From 9e25150f629b3a27aec1de88913003df69552509 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Tue, 21 Oct 2025 18:59:57 +0200 Subject: [PATCH 01/17] Implement spec-compliant chunk-stram-id serde --- lib/membrane_rtmp_plugin/rtmp/header.ex | 137 ++++--- .../membrane_rtmp_plugin/rtmp_header_test.exs | 365 ++++++++++++++++++ 2 files changed, 459 insertions(+), 43 deletions(-) create mode 100644 test/membrane_rtmp_plugin/rtmp_header_test.exs diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index b5eca2a8..b7bba995 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -39,11 +39,6 @@ defmodule Membrane.RTMP.Header do defmacro type(:amf_data), do: 0x12 defmacro type(:amf_command), do: 0x14 - @header_type_0 <<0x0::2>> - @header_type_1 <<0x1::2>> - @header_type_2 <<0x2::2>> - @header_type_3 <<0x3::2>> - @extended_timestamp_marker 0xFFFFFF @spec new(Keyword.t()) :: t() @@ -62,14 +57,50 @@ defmodule Membrane.RTMP.Header do * `0b11` - all values are derived from the previous header with the same `chunk_stream_id` """ @spec deserialize(binary(), t() | nil) :: {t(), rest :: binary()} | {:error, :need_more_data} - def deserialize(binary, previous_headers \\ nil) + + def deserialize(<>, previous_headers) do + # chunk basic header is made of the fmt and chunk_stream_id fields. + fmt = + case header_type do + 0 -> :type_0 + 1 -> :type_1 + 2 -> :type_2 + 3 -> :type_3 + end + + {chunk_stream_id, rest} = deserialize_chunk_stream_id(rest) + + deserialize_message_header( + %{fmt: fmt, chunk_stream_id: chunk_stream_id}, + rest, + previous_headers + ) + end + + defp deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do + # 2 byte format + {stream_id + 64, rest} + end + + defp deserialize_chunk_stream_id( + <<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>> + ) do + # 3 byte format + {stream_id_part_3 * 256 + stream_id_part_2 + 64, rest} + end + + defp deserialize_chunk_stream_id(<>) do + # 1 byte format + {stream_id, rest} + end # only the deserialization of the 0b00 type can have `nil` previous header - def deserialize( - <<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, - stream_id::little-integer-size(32), rest::binary>>, - _previous_headers - ) do + defp deserialize_message_header( + %{fmt: :type_0, chunk_stream_id: chunk_stream_id}, + <>, + _previous_headers + ) do with {timestamp, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp) do header = %__MODULE__{ chunk_stream_id: chunk_stream_id, @@ -84,11 +115,11 @@ defmodule Membrane.RTMP.Header do end end - def deserialize( - <<@header_type_1::bitstring, chunk_stream_id::6, timestamp_delta::24, body_size::24, - type_id::8, rest::binary>>, - previous_headers - ) do + defp deserialize_message_header( + %{fmt: :type_1, chunk_stream_id: chunk_stream_id}, + <>, + previous_headers + ) do with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do header = %__MODULE__{ chunk_stream_id: chunk_stream_id, @@ -104,10 +135,11 @@ defmodule Membrane.RTMP.Header do end end - def deserialize( - <<@header_type_2::bitstring, chunk_stream_id::6, timestamp_delta::24, rest::binary>>, - previous_headers - ) do + defp deserialize_message_header( + %{fmt: :type_2, chunk_stream_id: chunk_stream_id}, + <>, + previous_headers + ) do with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do header = %__MODULE__{ chunk_stream_id: chunk_stream_id, @@ -123,10 +155,11 @@ defmodule Membrane.RTMP.Header do end end - def deserialize( - <<@header_type_3::bitstring, chunk_stream_id::6, rest::binary>>, - previous_headers - ) do + defp deserialize_message_header( + %{fmt: :type_3, chunk_stream_id: chunk_stream_id}, + <>, + previous_headers + ) do previous_header = previous_headers[chunk_stream_id] if previous_header.extended_timestamp? do @@ -150,35 +183,53 @@ defmodule Membrane.RTMP.Header do end end - def deserialize( - <<@header_type_0::bitstring, _chunk_stream_id::6, _rest::binary>>, - _prev_header - ), - do: {:error, :need_more_data} + defp deserialize_message_header(_basic_header, _data, _prev_header), + do: {:error, :need_more_data} + + @spec serialize(t()) :: binary() + def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) + when chunk_stream_id >= 2 and chunk_stream_id <= 63 do + # 1-byte format: fmt (2 bits) + chunk_stream_id (6 bits) + %{ + timestamp: timestamp, + body_size: body_size, + type_id: type_id, + stream_id: stream_id + } = header + + <<0::2, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, + stream_id::little-integer-size(32)>> + end - def deserialize( - <<@header_type_1::bitstring, _chunk_stream_id::6, _rest::binary>>, - _prev_header - ), - do: {:error, :need_more_data} + def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) + when chunk_stream_id >= 64 and chunk_stream_id <= 319 do + # 2-byte format: fmt (2 bits) + marker 0 (6 bits) + (id - 64) (8 bits) + %{ + timestamp: timestamp, + body_size: body_size, + type_id: type_id, + stream_id: stream_id + } = header - def deserialize( - <<@header_type_2::bitstring, _chunk_stream_id::6, _rest::binary>>, - _prev_header - ), - do: {:error, :need_more_data} + <<0::2, 0::6, chunk_stream_id - 64::8, timestamp::24, body_size::24, type_id::8, + stream_id::little-integer-size(32)>> + end - @spec serialize(t()) :: binary() - def serialize(%__MODULE__{} = header) do + def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) + when chunk_stream_id >= 320 and chunk_stream_id <= 65599 do + # 3-byte format: fmt (2 bits) + marker 1 (6 bits) + low byte (8 bits) + high byte (8 bits) %{ - chunk_stream_id: chunk_stream_id, timestamp: timestamp, body_size: body_size, type_id: type_id, stream_id: stream_id } = header - <<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, + id_minus_64 = chunk_stream_id - 64 + low_byte = rem(id_minus_64, 256) + high_byte = div(id_minus_64, 256) + + <<0::2, 1::6, low_byte::8, high_byte::8, timestamp::24, body_size::24, type_id::8, stream_id::little-integer-size(32)>> end diff --git a/test/membrane_rtmp_plugin/rtmp_header_test.exs b/test/membrane_rtmp_plugin/rtmp_header_test.exs new file mode 100644 index 00000000..a3f5560a --- /dev/null +++ b/test/membrane_rtmp_plugin/rtmp_header_test.exs @@ -0,0 +1,365 @@ +defmodule Membrane.RTMP.HeaderTest do + use ExUnit.Case, async: true + + alias Membrane.RTMP.Header + + describe "Extended Chunk Stream ID encoding" do + test "deserialize header with 2-byte chunk stream ID format (ID 64)" do + # Spec: When 6-bit field is 0, the ID is (second byte + 64) + # For chunk stream ID 64: second byte should be 0 (64 - 64 = 0) + # Header type 0 (0b00) with chunk_stream_id_marker=0 + # Format: [fmt:2 | cs_id:6] [cs_id-64:8] [timestamp:24] [body_size:24] [type_id:8] [stream_id:32] + + chunk_stream_id = 64 + timestamp = 1000 + body_size = 256 + # audio + type_id = 8 + stream_id = 1 + + binary = + << + # Header type 0 with chunk_stream_id field = 0 (indicates 2-byte format) + 0b00::2, + 0::6, + # Second byte: chunk_stream_id - 64 + chunk_stream_id - 64::8, + # Standard header fields + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + assert header.type_id == type_id + assert header.stream_id == stream_id + end + + test "deserialize header with 2-byte chunk stream ID format (ID 319)" do + # For chunk stream ID 319: second byte should be 255 (319 - 64 = 255) + chunk_stream_id = 319 + timestamp = 2000 + body_size = 512 + # video + type_id = 9 + stream_id = 1 + + binary = + << + 0b00::2, + 0::6, + chunk_stream_id - 64::8, + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "deserialize header with 3-byte chunk stream ID format (ID 320)" do + # Spec: When 6-bit field is 1, the ID is ((third byte)*256 + (second byte) + 64) + # For chunk stream ID 320: (third byte)*256 + (second byte) + 64 = 320 + # So: (third byte)*256 + (second byte) = 256 + # third byte = 1, second byte = 0 + + chunk_stream_id = 320 + timestamp = 3000 + body_size = 1024 + type_id = 8 + stream_id = 1 + + # Calculate bytes for 3-byte format + # = 256 + id_minus_64 = chunk_stream_id - 64 + # = 0 + second_byte = rem(id_minus_64, 256) + # = 1 + third_byte = div(id_minus_64, 256) + + binary = + << + # Header type 0 with chunk_stream_id field = 1 (indicates 3-byte format) + 0b00::2, + 1::6, + # Second and third bytes for extended ID + second_byte::8, + third_byte::8, + # Standard header fields + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "deserialize header with 3-byte chunk stream ID format (ID 65599)" do + # Maximum chunk stream ID supported by 3-byte format + chunk_stream_id = 65599 + timestamp = 4000 + body_size = 2048 + type_id = 9 + stream_id = 1 + + # = 65535 + id_minus_64 = chunk_stream_id - 64 + # = 255 + second_byte = rem(id_minus_64, 256) + # = 255 + third_byte = div(id_minus_64, 256) + + binary = + << + 0b00::2, + 1::6, + second_byte::8, + third_byte::8, + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "deserialize header type 1 with 2-byte chunk stream ID" do + # Test extended chunk stream ID with header type 1 (no stream_id) + chunk_stream_id = 100 + timestamp_delta = 500 + body_size = 128 + type_id = 8 + + # Create a previous header for type 1 deserialization + previous_header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 1000, + body_size: 100, + type_id: 8, + stream_id: 1 + } + + binary = + << + 0b01::2, + 0::6, + chunk_stream_id - 64::8, + timestamp_delta::24, + body_size::24, + type_id::8 + >> + + {header, <<>>} = Header.deserialize(binary, %{chunk_stream_id => previous_header}) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == previous_header.timestamp + timestamp_delta + assert header.body_size == body_size + assert header.stream_id == previous_header.stream_id + end + + test "deserialize header type 2 with 3-byte chunk stream ID" do + # Test extended chunk stream ID with header type 2 (only timestamp delta) + chunk_stream_id = 500 + timestamp_delta = 42 + + id_minus_64 = chunk_stream_id - 64 + second_byte = rem(id_minus_64, 256) + third_byte = div(id_minus_64, 256) + + previous_header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 2000, + body_size: 256, + type_id: 9, + stream_id: 1 + } + + binary = + << + 0b10::2, + 1::6, + second_byte::8, + third_byte::8, + timestamp_delta::24 + >> + + {header, <<>>} = Header.deserialize(binary, %{chunk_stream_id => previous_header}) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == previous_header.timestamp + timestamp_delta + assert header.body_size == previous_header.body_size + assert header.type_id == previous_header.type_id + end + + test "deserialize header type 3 with 2-byte chunk stream ID" do + # Test extended chunk stream ID with header type 3 (no new fields) + chunk_stream_id = 200 + + previous_header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 3000, + timestamp_delta: 100, + body_size: 512, + type_id: 8, + stream_id: 1, + extended_timestamp?: false + } + + binary = + << + 0b11::2, + 0::6, + chunk_stream_id - 64::8 + >> + + {header, <<>>} = Header.deserialize(binary, %{chunk_stream_id => previous_header}) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == previous_header.timestamp + previous_header.timestamp_delta + end + + test "serialize header with 2-byte chunk stream ID format" do + # Test serialization of headers with extended chunk stream IDs + header = %Header{ + chunk_stream_id: 150, + timestamp: 5000, + body_size: 1024, + type_id: 9, + stream_id: 1 + } + + binary = Header.serialize(header) + + # Verify it can be deserialized back + {deserialized, <<>>} = Header.deserialize(binary, nil) + assert deserialized.chunk_stream_id == header.chunk_stream_id + assert deserialized.timestamp == header.timestamp + assert deserialized.body_size == header.body_size + end + + test "serialize header with 3-byte chunk stream ID format" do + # Test serialization with large chunk stream ID + header = %Header{ + chunk_stream_id: 1000, + timestamp: 6000, + body_size: 2048, + type_id: 8, + stream_id: 1 + } + + binary = Header.serialize(header) + + # Verify it can be deserialized back + {deserialized, <<>>} = Header.deserialize(binary, nil) + assert deserialized.chunk_stream_id == header.chunk_stream_id + assert deserialized.timestamp == header.timestamp + assert deserialized.body_size == header.body_size + end + + test "round-trip for chunk stream ID boundary values" do + # Test boundary values between different formats + test_ids = [ + # Minimum single-byte + 2, + # Maximum single-byte + 63, + # Minimum 2-byte + 64, + # Maximum 2-byte + 319, + # Minimum 3-byte + 320, + # Maximum 3-byte + 65599 + ] + + for chunk_stream_id <- test_ids do + header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 1000, + body_size: 100, + type_id: 8, + stream_id: 1 + } + + binary = Header.serialize(header) + {deserialized, <<>>} = Header.deserialize(binary, nil) + + assert deserialized.chunk_stream_id == chunk_stream_id, + "Failed round-trip for chunk_stream_id #{chunk_stream_id}" + end + end + end + + describe "Standard Chunk Stream ID encoding (existing behavior)" do + test "deserialize header with standard 6-bit chunk stream ID" do + # This test should pass with current implementation + chunk_stream_id = 5 + timestamp = 1000 + body_size = 256 + type_id = 8 + stream_id = 1 + + binary = + << + 0b00::2, + chunk_stream_id::6, + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "serialize header with standard chunk stream ID" do + # This should work with current implementation + header = %Header{ + chunk_stream_id: 10, + timestamp: 2000, + body_size: 512, + type_id: 9, + stream_id: 1 + } + + binary = Header.serialize(header) + + expected = + << + 0b00::2, + 10::6, + 2000::24, + 512::24, + 9::8, + 1::little-32 + >> + + assert binary == expected + end + end +end From 631ae3818df55fbb57d7858fdca35347c74253ae Mon Sep 17 00:00:00 2001 From: Philip Giuliani Date: Thu, 23 Oct 2025 09:26:37 +0200 Subject: [PATCH 02/17] Fix credo warnings --- lib/membrane_rtmp_plugin/rtmp/header.ex | 2 +- test/membrane_rtmp_plugin/rtmp_header_test.exs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index b7bba995..94493ffd 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -216,7 +216,7 @@ defmodule Membrane.RTMP.Header do end def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) - when chunk_stream_id >= 320 and chunk_stream_id <= 65599 do + when chunk_stream_id >= 320 and chunk_stream_id <= 65_599 do # 3-byte format: fmt (2 bits) + marker 1 (6 bits) + low byte (8 bits) + high byte (8 bits) %{ timestamp: timestamp, diff --git a/test/membrane_rtmp_plugin/rtmp_header_test.exs b/test/membrane_rtmp_plugin/rtmp_header_test.exs index a3f5560a..0c8f3c9a 100644 --- a/test/membrane_rtmp_plugin/rtmp_header_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_header_test.exs @@ -111,7 +111,7 @@ defmodule Membrane.RTMP.HeaderTest do test "deserialize header with 3-byte chunk stream ID format (ID 65599)" do # Maximum chunk stream ID supported by 3-byte format - chunk_stream_id = 65599 + chunk_stream_id = 65_599 timestamp = 4000 body_size = 2048 type_id = 9 @@ -290,7 +290,7 @@ defmodule Membrane.RTMP.HeaderTest do # Minimum 3-byte 320, # Maximum 3-byte - 65599 + 65_599 ] for chunk_stream_id <- test_ids do From faaa13cf6be1002cd34e5fc820ad88d0ab287026 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 23 Oct 2025 14:59:29 +0200 Subject: [PATCH 03/17] Handle chunked stream ids also for chunked messages --- lib/membrane_rtmp_plugin/rtmp/header.ex | 49 +++++++++- lib/membrane_rtmp_plugin/rtmp/message.ex | 5 +- .../rtmp/message_parser.ex | 92 +++++++------------ .../membrane_rtmp_plugin/rtmp_header_test.exs | 42 +++++++++ 4 files changed, 125 insertions(+), 63 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index 94493ffd..2a357a42 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -77,19 +77,30 @@ defmodule Membrane.RTMP.Header do ) end - defp deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do + @doc """ + Deserializes chunk stream ID from binary. + + Handles all three formats: + - 1-byte format: IDs 2-63 encoded directly in 6 bits + - 2-byte format: IDs 64-319 with marker 0 + (id-64) in 8 bits + - 3-byte format: IDs 320-65599 with marker 1 + low byte + high byte + + Returns tuple of {chunk_stream_id, remaining_binary} + """ + @spec deserialize_chunk_stream_id(binary()) :: {integer(), binary()} + def deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do # 2 byte format {stream_id + 64, rest} end - defp deserialize_chunk_stream_id( - <<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>> - ) do + def deserialize_chunk_stream_id( + <<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>> + ) do # 3 byte format {stream_id_part_3 * 256 + stream_id_part_2 + 64, rest} end - defp deserialize_chunk_stream_id(<>) do + def deserialize_chunk_stream_id(<>) do # 1 byte format {stream_id, rest} end @@ -186,6 +197,34 @@ defmodule Membrane.RTMP.Header do defp deserialize_message_header(_basic_header, _data, _prev_header), do: {:error, :need_more_data} + @doc """ + Serializes a chunk basic header (format type + chunk stream ID). + + Used for creating chunk headers, including Type 3 headers between message chunks. + The fmt parameter should be 0-3 corresponding to header types 0-3. + """ + @spec serialize_chunk_basic_header(0..3, integer()) :: binary() + def serialize_chunk_basic_header(fmt, chunk_stream_id) + when fmt in 0..3 and chunk_stream_id >= 2 and chunk_stream_id <= 63 do + # 1-byte format: fmt (2 bits) + chunk_stream_id (6 bits) + <> + end + + def serialize_chunk_basic_header(fmt, chunk_stream_id) + when fmt in 0..3 and chunk_stream_id >= 64 and chunk_stream_id <= 319 do + # 2-byte format: fmt (2 bits) + marker 0 (6 bits) + (id - 64) (8 bits) + <> + end + + def serialize_chunk_basic_header(fmt, chunk_stream_id) + when fmt in 0..3 and chunk_stream_id >= 320 and chunk_stream_id <= 65599 do + # 3-byte format: fmt (2 bits) + marker 1 (6 bits) + low byte (8 bits) + high byte (8 bits) + id_minus_64 = chunk_stream_id - 64 + low_byte = rem(id_minus_64, 256) + high_byte = div(id_minus_64, 256) + <> + end + @spec serialize(t()) :: binary() def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) when chunk_stream_id >= 2 and chunk_stream_id <= 63 do diff --git a/lib/membrane_rtmp_plugin/rtmp/message.ex b/lib/membrane_rtmp_plugin/rtmp/message.ex index 80206dc6..bc1d8069 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message.ex @@ -102,5 +102,8 @@ defmodule Membrane.RTMP.Message do end @compile {:inline, chunk_separator: 1} - defp chunk_separator(chunk_stream_id), do: <<0b11::2, chunk_stream_id::6>> + defp chunk_separator(chunk_stream_id) do + # Use Type 3 header (fmt=3) for chunk separators + Membrane.RTMP.Header.serialize_chunk_basic_header(3, chunk_stream_id) + end end diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index 34e17544..a1114911 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -178,19 +178,17 @@ defmodule Membrane.RTMP.MessageParser do defp read_frame(packet, previous_headers, chunk_size) do case Header.deserialize(packet, previous_headers) do - {%Header{} = header, rest} -> - chunked_body_size = calculate_chunked_body_size(header, chunk_size) + {%Header{chunk_stream_id: chunk_stream_id} = header, rest} -> + # Add the current header to previous_headers so Type 3 headers can reference it + updated_previous_headers = Map.put(previous_headers, chunk_stream_id, header) - case rest do - <> -> - combined_body = combine_body_chunks(body, chunk_size, header) - - message = Message.deserialize_message(header.type_id, combined_body) + case read_chunked_body(rest, header, chunk_size, updated_previous_headers) do + {:error, :need_more_data} = error -> + error + {body, rest} -> + message = Message.deserialize_message(header.type_id, body) {header, message, rest} - - _rest -> - {:error, :need_more_data} end {:error, :need_more_data} = error -> @@ -198,54 +196,34 @@ defmodule Membrane.RTMP.MessageParser do end end - defp calculate_chunked_body_size(%Header{body_size: body_size} = header, chunk_size) do - if body_size > chunk_size do - # if a message's body is greater than the chunk size then - # after every chunk_size's bytes there is a 0x03 one byte header that - # needs to be stripped and is not counted into the body_size - headers_to_strip = div(body_size - 1, chunk_size) - - # if the initial header contains a extended timestamp then - # every following chunk will contain the timestamp - timestamps_to_strip = if header.extended_timestamp?, do: headers_to_strip * 4, else: 0 - - body_size + headers_to_strip + timestamps_to_strip - else - body_size - end - end - - # message's size can exceed the defined chunk size - # in this case the message gets divided into - # a sequence of smaller packets separated by the a header type 3 byte - # (the first 2 bits has to be 0b11) - defp combine_body_chunks(body, chunk_size, header) do - if byte_size(body) <= chunk_size do - body - else - do_combine_body_chunks(body, chunk_size, header, []) - end - end - - defp do_combine_body_chunks(body, chunk_size, header, acc) do - case body do - <> - when header.extended_timestamp? and timestamp == header.timestamp -> - do_combine_body_chunks(rest, chunk_size, header, [acc, body]) - - # cut out the header byte (staring with 0b11) - <> -> - do_combine_body_chunks(rest, chunk_size, header, [acc, body]) - - <<_body::binary-size(chunk_size), header_type::2, _chunk_stream_id::6, _rest::binary>> -> - Membrane.Logger.warning( - "Unexpected header type when combining body chunks: #{header_type}" - ) - - IO.iodata_to_binary([acc, body]) + defp read_chunked_body(data, header, chunk_size, previous_headers, acc \\ <<>>) do + bytes_read = byte_size(acc) + bytes_remaining = header.body_size - bytes_read + bytes_to_read = min(bytes_remaining, chunk_size) + + case data do + <> -> + new_acc = <> + + if byte_size(new_acc) == header.body_size do + # Complete message body assembled + {new_acc, rest} + else + # Need more chunks - parse next Type 3 header + # Note: We don't update previous_headers because Type 3 headers are just + # continuation markers within the same message. The original message header + # in previous_headers is what all Type 3 headers should reference. + case Header.deserialize(rest, previous_headers) do + {%Header{}, rest_after_header} -> + read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) + + {:error, :need_more_data} = error -> + error + end + end - body -> - IO.iodata_to_binary([acc, body]) + _ -> + {:error, :need_more_data} end end diff --git a/test/membrane_rtmp_plugin/rtmp_header_test.exs b/test/membrane_rtmp_plugin/rtmp_header_test.exs index 0c8f3c9a..5360dde8 100644 --- a/test/membrane_rtmp_plugin/rtmp_header_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_header_test.exs @@ -311,6 +311,48 @@ defmodule Membrane.RTMP.HeaderTest do end end + describe "serialize_chunk_basic_header/2" do + test "creates Type 3 header with 1-byte chunk stream ID" do + chunk_stream_id = 10 + header = Header.serialize_chunk_basic_header(3, chunk_stream_id) + + # Should be 1 byte: fmt (2 bits) + id (6 bits) + assert byte_size(header) == 1 + assert header == <<0b11::2, chunk_stream_id::6>> + end + + test "creates Type 3 header with 2-byte chunk stream ID" do + chunk_stream_id = 150 + header = Header.serialize_chunk_basic_header(3, chunk_stream_id) + + # Should be 2 bytes: fmt (2 bits) + 0 (6 bits) + (id-64) (8 bits) + assert byte_size(header) == 2 + assert header == <<0b11::2, 0::6, chunk_stream_id - 64::8>> + end + + test "creates Type 3 header with 3-byte chunk stream ID" do + chunk_stream_id = 1000 + header = Header.serialize_chunk_basic_header(3, chunk_stream_id) + + # Should be 3 bytes: fmt (2 bits) + 1 (6 bits) + low (8 bits) + high (8 bits) + assert byte_size(header) == 3 + + id_minus_64 = chunk_stream_id - 64 + low_byte = rem(id_minus_64, 256) + high_byte = div(id_minus_64, 256) + + assert header == <<0b11::2, 1::6, low_byte::8, high_byte::8>> + end + + test "creates Type 0 header with extended chunk stream ID" do + chunk_stream_id = 200 + header = Header.serialize_chunk_basic_header(0, chunk_stream_id) + + assert byte_size(header) == 2 + assert header == <<0b00::2, 0::6, chunk_stream_id - 64::8>> + end + end + describe "Standard Chunk Stream ID encoding (existing behavior)" do test "deserialize header with standard 6-bit chunk stream ID" do # This test should pass with current implementation From 16e9fc9f641544142afbe56a42d1030f01fc7115 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 23 Oct 2025 15:15:24 +0200 Subject: [PATCH 04/17] Checkpoint --- .../rtmp/message_parser.ex | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index a1114911..dde24d99 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -205,21 +205,35 @@ defmodule Membrane.RTMP.MessageParser do <> -> new_acc = <> - if byte_size(new_acc) == header.body_size do - # Complete message body assembled - {new_acc, rest} - else - # Need more chunks - parse next Type 3 header - # Note: We don't update previous_headers because Type 3 headers are just - # continuation markers within the same message. The original message header - # in previous_headers is what all Type 3 headers should reference. - case Header.deserialize(rest, previous_headers) do - {%Header{}, rest_after_header} -> - read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) - - {:error, :need_more_data} = error -> - error - end + cond do + byte_size(new_acc) == header.body_size -> + # Complete message body assembled + {new_acc, rest} + + rest == <<>> -> + # Need more data but buffer is empty + {:error, :need_more_data} + + true -> + # Need more chunks - parse next header + # Note: We don't update previous_headers because Type 3 headers are just + # continuation markers within the same message. The original message header + # in previous_headers is what all Type 3 headers should reference. + case Header.deserialize(rest, previous_headers) do + {%Header{chunk_stream_id: parsed_chunk_stream_id}, rest_after_header} -> + # Check if the parsed header belongs to the current message + if parsed_chunk_stream_id == header.chunk_stream_id do + # It's a continuation of our message (Type 3), keep reading + read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) + else + # It's a header for a different stream (interleaved messages) + # The current message is incomplete, we need to wait for more data + {:error, :need_more_data} + end + + {:error, :need_more_data} = error -> + error + end end _ -> From dde4b1279fd4530229daf95ac90b12e754656cb5 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 23 Oct 2025 15:30:27 +0200 Subject: [PATCH 05/17] Fix infinite recursion --- .../rtmp/message_parser.ex | 109 ++++++++++-------- 1 file changed, 64 insertions(+), 45 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index dde24d99..2ef884ed 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -178,17 +178,19 @@ defmodule Membrane.RTMP.MessageParser do defp read_frame(packet, previous_headers, chunk_size) do case Header.deserialize(packet, previous_headers) do - {%Header{chunk_stream_id: chunk_stream_id} = header, rest} -> - # Add the current header to previous_headers so Type 3 headers can reference it - updated_previous_headers = Map.put(previous_headers, chunk_stream_id, header) + {%Header{} = header, rest} -> + chunked_body_size = calculate_chunked_body_size(header, chunk_size) - case read_chunked_body(rest, header, chunk_size, updated_previous_headers) do - {:error, :need_more_data} = error -> - error + case rest do + <> -> + combined_body = combine_body_chunks(body, chunk_size, header) + + message = Message.deserialize_message(header.type_id, combined_body) - {body, rest} -> - message = Message.deserialize_message(header.type_id, body) {header, message, rest} + + _rest -> + {:error, :need_more_data} end {:error, :need_more_data} = error -> @@ -196,48 +198,65 @@ defmodule Membrane.RTMP.MessageParser do end end - defp read_chunked_body(data, header, chunk_size, previous_headers, acc \\ <<>>) do - bytes_read = byte_size(acc) - bytes_remaining = header.body_size - bytes_read - bytes_to_read = min(bytes_remaining, chunk_size) + defp calculate_chunked_body_size(%Header{body_size: body_size, chunk_stream_id: chunk_stream_id} = header, chunk_size) do + if body_size > chunk_size do + # If a message's body is greater than the chunk size, then after every + # chunk_size bytes there is a Type 3 chunk header that needs to be stripped. + num_continuation_chunks = div(body_size - 1, chunk_size) - case data do - <> -> - new_acc = <> + # Calculate the size of each Type 3 header based on chunk_stream_id + type3_header_size = chunk_basic_header_size(chunk_stream_id) - cond do - byte_size(new_acc) == header.body_size -> - # Complete message body assembled - {new_acc, rest} + # If the initial header contains an extended timestamp, then every + # following Type 3 chunk will also contain the extended timestamp (4 bytes) + timestamps_to_strip = if header.extended_timestamp?, do: num_continuation_chunks * 4, else: 0 - rest == <<>> -> - # Need more data but buffer is empty - {:error, :need_more_data} + body_size + (num_continuation_chunks * type3_header_size) + timestamps_to_strip + else + body_size + end + end - true -> - # Need more chunks - parse next header - # Note: We don't update previous_headers because Type 3 headers are just - # continuation markers within the same message. The original message header - # in previous_headers is what all Type 3 headers should reference. - case Header.deserialize(rest, previous_headers) do - {%Header{chunk_stream_id: parsed_chunk_stream_id}, rest_after_header} -> - # Check if the parsed header belongs to the current message - if parsed_chunk_stream_id == header.chunk_stream_id do - # It's a continuation of our message (Type 3), keep reading - read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) - else - # It's a header for a different stream (interleaved messages) - # The current message is incomplete, we need to wait for more data - {:error, :need_more_data} - end - - {:error, :need_more_data} = error -> - error - end - end + # Calculate the size of a chunk basic header (Type 3) based on chunk_stream_id + defp chunk_basic_header_size(chunk_stream_id) when chunk_stream_id >= 2 and chunk_stream_id <= 63, do: 1 + defp chunk_basic_header_size(chunk_stream_id) when chunk_stream_id >= 64 and chunk_stream_id <= 319, do: 2 + defp chunk_basic_header_size(chunk_stream_id) when chunk_stream_id >= 320 and chunk_stream_id <= 65599, do: 3 + + # Combine body chunks by stripping out Type 3 headers + # Messages larger than chunk_size are split into chunks separated by Type 3 headers + defp combine_body_chunks(body, chunk_size, header) do + if byte_size(body) <= chunk_size do + body + else + do_combine_body_chunks(body, chunk_size, header, []) + end + end + + defp do_combine_body_chunks(body, chunk_size, header, acc) do + # Get the Type 3 header size for this chunk stream + type3_header_size = chunk_basic_header_size(header.chunk_stream_id) + + # If extended timestamp, add 4 bytes + separator_size = if header.extended_timestamp?, do: type3_header_size + 4, else: type3_header_size + + case body do + <> -> + # Validate it's a Type 3 header (optional but safe) + <<0b11::2, _::bitstring>> = separator + + # Continue with next chunk + do_combine_body_chunks(rest, chunk_size, header, [acc, chunk]) + + <<_chunk::binary-size(chunk_size), header_type::2, _rest::bitstring>> -> + Membrane.Logger.warning( + "Unexpected header type when combining body chunks: #{header_type}" + ) + + IO.iodata_to_binary([acc, body]) - _ -> - {:error, :need_more_data} + body -> + # Last chunk (smaller than chunk_size) + IO.iodata_to_binary([acc, body]) end end From b1be8a610d9720e132cd7c8dba01e391b0601c85 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 30 Oct 2025 14:11:30 +0100 Subject: [PATCH 06/17] Revert "Fix infinite recursion" This reverts commit dde4b1279fd4530229daf95ac90b12e754656cb5. --- .../rtmp/message_parser.ex | 109 ++++++++---------- 1 file changed, 45 insertions(+), 64 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index 2ef884ed..dde24d99 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -178,19 +178,17 @@ defmodule Membrane.RTMP.MessageParser do defp read_frame(packet, previous_headers, chunk_size) do case Header.deserialize(packet, previous_headers) do - {%Header{} = header, rest} -> - chunked_body_size = calculate_chunked_body_size(header, chunk_size) + {%Header{chunk_stream_id: chunk_stream_id} = header, rest} -> + # Add the current header to previous_headers so Type 3 headers can reference it + updated_previous_headers = Map.put(previous_headers, chunk_stream_id, header) - case rest do - <> -> - combined_body = combine_body_chunks(body, chunk_size, header) - - message = Message.deserialize_message(header.type_id, combined_body) + case read_chunked_body(rest, header, chunk_size, updated_previous_headers) do + {:error, :need_more_data} = error -> + error + {body, rest} -> + message = Message.deserialize_message(header.type_id, body) {header, message, rest} - - _rest -> - {:error, :need_more_data} end {:error, :need_more_data} = error -> @@ -198,65 +196,48 @@ defmodule Membrane.RTMP.MessageParser do end end - defp calculate_chunked_body_size(%Header{body_size: body_size, chunk_stream_id: chunk_stream_id} = header, chunk_size) do - if body_size > chunk_size do - # If a message's body is greater than the chunk size, then after every - # chunk_size bytes there is a Type 3 chunk header that needs to be stripped. - num_continuation_chunks = div(body_size - 1, chunk_size) - - # Calculate the size of each Type 3 header based on chunk_stream_id - type3_header_size = chunk_basic_header_size(chunk_stream_id) + defp read_chunked_body(data, header, chunk_size, previous_headers, acc \\ <<>>) do + bytes_read = byte_size(acc) + bytes_remaining = header.body_size - bytes_read + bytes_to_read = min(bytes_remaining, chunk_size) - # If the initial header contains an extended timestamp, then every - # following Type 3 chunk will also contain the extended timestamp (4 bytes) - timestamps_to_strip = if header.extended_timestamp?, do: num_continuation_chunks * 4, else: 0 + case data do + <> -> + new_acc = <> - body_size + (num_continuation_chunks * type3_header_size) + timestamps_to_strip - else - body_size - end - end + cond do + byte_size(new_acc) == header.body_size -> + # Complete message body assembled + {new_acc, rest} - # Calculate the size of a chunk basic header (Type 3) based on chunk_stream_id - defp chunk_basic_header_size(chunk_stream_id) when chunk_stream_id >= 2 and chunk_stream_id <= 63, do: 1 - defp chunk_basic_header_size(chunk_stream_id) when chunk_stream_id >= 64 and chunk_stream_id <= 319, do: 2 - defp chunk_basic_header_size(chunk_stream_id) when chunk_stream_id >= 320 and chunk_stream_id <= 65599, do: 3 - - # Combine body chunks by stripping out Type 3 headers - # Messages larger than chunk_size are split into chunks separated by Type 3 headers - defp combine_body_chunks(body, chunk_size, header) do - if byte_size(body) <= chunk_size do - body - else - do_combine_body_chunks(body, chunk_size, header, []) - end - end - - defp do_combine_body_chunks(body, chunk_size, header, acc) do - # Get the Type 3 header size for this chunk stream - type3_header_size = chunk_basic_header_size(header.chunk_stream_id) - - # If extended timestamp, add 4 bytes - separator_size = if header.extended_timestamp?, do: type3_header_size + 4, else: type3_header_size - - case body do - <> -> - # Validate it's a Type 3 header (optional but safe) - <<0b11::2, _::bitstring>> = separator - - # Continue with next chunk - do_combine_body_chunks(rest, chunk_size, header, [acc, chunk]) - - <<_chunk::binary-size(chunk_size), header_type::2, _rest::bitstring>> -> - Membrane.Logger.warning( - "Unexpected header type when combining body chunks: #{header_type}" - ) + rest == <<>> -> + # Need more data but buffer is empty + {:error, :need_more_data} - IO.iodata_to_binary([acc, body]) + true -> + # Need more chunks - parse next header + # Note: We don't update previous_headers because Type 3 headers are just + # continuation markers within the same message. The original message header + # in previous_headers is what all Type 3 headers should reference. + case Header.deserialize(rest, previous_headers) do + {%Header{chunk_stream_id: parsed_chunk_stream_id}, rest_after_header} -> + # Check if the parsed header belongs to the current message + if parsed_chunk_stream_id == header.chunk_stream_id do + # It's a continuation of our message (Type 3), keep reading + read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) + else + # It's a header for a different stream (interleaved messages) + # The current message is incomplete, we need to wait for more data + {:error, :need_more_data} + end + + {:error, :need_more_data} = error -> + error + end + end - body -> - # Last chunk (smaller than chunk_size) - IO.iodata_to_binary([acc, body]) + _ -> + {:error, :need_more_data} end end From 202dea31a978adb7a9eb623345c345c706a67a1b Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 30 Oct 2025 14:11:41 +0100 Subject: [PATCH 07/17] Revert "Checkpoint" This reverts commit 16e9fc9f641544142afbe56a42d1030f01fc7115. --- .../rtmp/message_parser.ex | 44 +++++++------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index dde24d99..a1114911 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -205,35 +205,21 @@ defmodule Membrane.RTMP.MessageParser do <> -> new_acc = <> - cond do - byte_size(new_acc) == header.body_size -> - # Complete message body assembled - {new_acc, rest} - - rest == <<>> -> - # Need more data but buffer is empty - {:error, :need_more_data} - - true -> - # Need more chunks - parse next header - # Note: We don't update previous_headers because Type 3 headers are just - # continuation markers within the same message. The original message header - # in previous_headers is what all Type 3 headers should reference. - case Header.deserialize(rest, previous_headers) do - {%Header{chunk_stream_id: parsed_chunk_stream_id}, rest_after_header} -> - # Check if the parsed header belongs to the current message - if parsed_chunk_stream_id == header.chunk_stream_id do - # It's a continuation of our message (Type 3), keep reading - read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) - else - # It's a header for a different stream (interleaved messages) - # The current message is incomplete, we need to wait for more data - {:error, :need_more_data} - end - - {:error, :need_more_data} = error -> - error - end + if byte_size(new_acc) == header.body_size do + # Complete message body assembled + {new_acc, rest} + else + # Need more chunks - parse next Type 3 header + # Note: We don't update previous_headers because Type 3 headers are just + # continuation markers within the same message. The original message header + # in previous_headers is what all Type 3 headers should reference. + case Header.deserialize(rest, previous_headers) do + {%Header{}, rest_after_header} -> + read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) + + {:error, :need_more_data} = error -> + error + end end _ -> From 0f136dd3612e3b9bdfb9e4927c4f08b2ff4eada2 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 30 Oct 2025 14:11:49 +0100 Subject: [PATCH 08/17] Revert "Handle chunked stream ids also for chunked messages" This reverts commit faaa13cf6be1002cd34e5fc820ad88d0ab287026. --- lib/membrane_rtmp_plugin/rtmp/header.ex | 49 +--------- lib/membrane_rtmp_plugin/rtmp/message.ex | 5 +- .../rtmp/message_parser.ex | 92 ++++++++++++------- .../membrane_rtmp_plugin/rtmp_header_test.exs | 42 --------- 4 files changed, 63 insertions(+), 125 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index 2a357a42..94493ffd 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -77,30 +77,19 @@ defmodule Membrane.RTMP.Header do ) end - @doc """ - Deserializes chunk stream ID from binary. - - Handles all three formats: - - 1-byte format: IDs 2-63 encoded directly in 6 bits - - 2-byte format: IDs 64-319 with marker 0 + (id-64) in 8 bits - - 3-byte format: IDs 320-65599 with marker 1 + low byte + high byte - - Returns tuple of {chunk_stream_id, remaining_binary} - """ - @spec deserialize_chunk_stream_id(binary()) :: {integer(), binary()} - def deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do + defp deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do # 2 byte format {stream_id + 64, rest} end - def deserialize_chunk_stream_id( - <<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>> - ) do + defp deserialize_chunk_stream_id( + <<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>> + ) do # 3 byte format {stream_id_part_3 * 256 + stream_id_part_2 + 64, rest} end - def deserialize_chunk_stream_id(<>) do + defp deserialize_chunk_stream_id(<>) do # 1 byte format {stream_id, rest} end @@ -197,34 +186,6 @@ defmodule Membrane.RTMP.Header do defp deserialize_message_header(_basic_header, _data, _prev_header), do: {:error, :need_more_data} - @doc """ - Serializes a chunk basic header (format type + chunk stream ID). - - Used for creating chunk headers, including Type 3 headers between message chunks. - The fmt parameter should be 0-3 corresponding to header types 0-3. - """ - @spec serialize_chunk_basic_header(0..3, integer()) :: binary() - def serialize_chunk_basic_header(fmt, chunk_stream_id) - when fmt in 0..3 and chunk_stream_id >= 2 and chunk_stream_id <= 63 do - # 1-byte format: fmt (2 bits) + chunk_stream_id (6 bits) - <> - end - - def serialize_chunk_basic_header(fmt, chunk_stream_id) - when fmt in 0..3 and chunk_stream_id >= 64 and chunk_stream_id <= 319 do - # 2-byte format: fmt (2 bits) + marker 0 (6 bits) + (id - 64) (8 bits) - <> - end - - def serialize_chunk_basic_header(fmt, chunk_stream_id) - when fmt in 0..3 and chunk_stream_id >= 320 and chunk_stream_id <= 65599 do - # 3-byte format: fmt (2 bits) + marker 1 (6 bits) + low byte (8 bits) + high byte (8 bits) - id_minus_64 = chunk_stream_id - 64 - low_byte = rem(id_minus_64, 256) - high_byte = div(id_minus_64, 256) - <> - end - @spec serialize(t()) :: binary() def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) when chunk_stream_id >= 2 and chunk_stream_id <= 63 do diff --git a/lib/membrane_rtmp_plugin/rtmp/message.ex b/lib/membrane_rtmp_plugin/rtmp/message.ex index bc1d8069..80206dc6 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message.ex @@ -102,8 +102,5 @@ defmodule Membrane.RTMP.Message do end @compile {:inline, chunk_separator: 1} - defp chunk_separator(chunk_stream_id) do - # Use Type 3 header (fmt=3) for chunk separators - Membrane.RTMP.Header.serialize_chunk_basic_header(3, chunk_stream_id) - end + defp chunk_separator(chunk_stream_id), do: <<0b11::2, chunk_stream_id::6>> end diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index a1114911..34e17544 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -178,17 +178,19 @@ defmodule Membrane.RTMP.MessageParser do defp read_frame(packet, previous_headers, chunk_size) do case Header.deserialize(packet, previous_headers) do - {%Header{chunk_stream_id: chunk_stream_id} = header, rest} -> - # Add the current header to previous_headers so Type 3 headers can reference it - updated_previous_headers = Map.put(previous_headers, chunk_stream_id, header) + {%Header{} = header, rest} -> + chunked_body_size = calculate_chunked_body_size(header, chunk_size) - case read_chunked_body(rest, header, chunk_size, updated_previous_headers) do - {:error, :need_more_data} = error -> - error + case rest do + <> -> + combined_body = combine_body_chunks(body, chunk_size, header) + + message = Message.deserialize_message(header.type_id, combined_body) - {body, rest} -> - message = Message.deserialize_message(header.type_id, body) {header, message, rest} + + _rest -> + {:error, :need_more_data} end {:error, :need_more_data} = error -> @@ -196,34 +198,54 @@ defmodule Membrane.RTMP.MessageParser do end end - defp read_chunked_body(data, header, chunk_size, previous_headers, acc \\ <<>>) do - bytes_read = byte_size(acc) - bytes_remaining = header.body_size - bytes_read - bytes_to_read = min(bytes_remaining, chunk_size) - - case data do - <> -> - new_acc = <> - - if byte_size(new_acc) == header.body_size do - # Complete message body assembled - {new_acc, rest} - else - # Need more chunks - parse next Type 3 header - # Note: We don't update previous_headers because Type 3 headers are just - # continuation markers within the same message. The original message header - # in previous_headers is what all Type 3 headers should reference. - case Header.deserialize(rest, previous_headers) do - {%Header{}, rest_after_header} -> - read_chunked_body(rest_after_header, header, chunk_size, previous_headers, new_acc) - - {:error, :need_more_data} = error -> - error - end - end + defp calculate_chunked_body_size(%Header{body_size: body_size} = header, chunk_size) do + if body_size > chunk_size do + # if a message's body is greater than the chunk size then + # after every chunk_size's bytes there is a 0x03 one byte header that + # needs to be stripped and is not counted into the body_size + headers_to_strip = div(body_size - 1, chunk_size) + + # if the initial header contains a extended timestamp then + # every following chunk will contain the timestamp + timestamps_to_strip = if header.extended_timestamp?, do: headers_to_strip * 4, else: 0 + + body_size + headers_to_strip + timestamps_to_strip + else + body_size + end + end + + # message's size can exceed the defined chunk size + # in this case the message gets divided into + # a sequence of smaller packets separated by the a header type 3 byte + # (the first 2 bits has to be 0b11) + defp combine_body_chunks(body, chunk_size, header) do + if byte_size(body) <= chunk_size do + body + else + do_combine_body_chunks(body, chunk_size, header, []) + end + end + + defp do_combine_body_chunks(body, chunk_size, header, acc) do + case body do + <> + when header.extended_timestamp? and timestamp == header.timestamp -> + do_combine_body_chunks(rest, chunk_size, header, [acc, body]) + + # cut out the header byte (staring with 0b11) + <> -> + do_combine_body_chunks(rest, chunk_size, header, [acc, body]) + + <<_body::binary-size(chunk_size), header_type::2, _chunk_stream_id::6, _rest::binary>> -> + Membrane.Logger.warning( + "Unexpected header type when combining body chunks: #{header_type}" + ) + + IO.iodata_to_binary([acc, body]) - _ -> - {:error, :need_more_data} + body -> + IO.iodata_to_binary([acc, body]) end end diff --git a/test/membrane_rtmp_plugin/rtmp_header_test.exs b/test/membrane_rtmp_plugin/rtmp_header_test.exs index 5360dde8..0c8f3c9a 100644 --- a/test/membrane_rtmp_plugin/rtmp_header_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_header_test.exs @@ -311,48 +311,6 @@ defmodule Membrane.RTMP.HeaderTest do end end - describe "serialize_chunk_basic_header/2" do - test "creates Type 3 header with 1-byte chunk stream ID" do - chunk_stream_id = 10 - header = Header.serialize_chunk_basic_header(3, chunk_stream_id) - - # Should be 1 byte: fmt (2 bits) + id (6 bits) - assert byte_size(header) == 1 - assert header == <<0b11::2, chunk_stream_id::6>> - end - - test "creates Type 3 header with 2-byte chunk stream ID" do - chunk_stream_id = 150 - header = Header.serialize_chunk_basic_header(3, chunk_stream_id) - - # Should be 2 bytes: fmt (2 bits) + 0 (6 bits) + (id-64) (8 bits) - assert byte_size(header) == 2 - assert header == <<0b11::2, 0::6, chunk_stream_id - 64::8>> - end - - test "creates Type 3 header with 3-byte chunk stream ID" do - chunk_stream_id = 1000 - header = Header.serialize_chunk_basic_header(3, chunk_stream_id) - - # Should be 3 bytes: fmt (2 bits) + 1 (6 bits) + low (8 bits) + high (8 bits) - assert byte_size(header) == 3 - - id_minus_64 = chunk_stream_id - 64 - low_byte = rem(id_minus_64, 256) - high_byte = div(id_minus_64, 256) - - assert header == <<0b11::2, 1::6, low_byte::8, high_byte::8>> - end - - test "creates Type 0 header with extended chunk stream ID" do - chunk_stream_id = 200 - header = Header.serialize_chunk_basic_header(0, chunk_stream_id) - - assert byte_size(header) == 2 - assert header == <<0b00::2, 0::6, chunk_stream_id - 64::8>> - end - end - describe "Standard Chunk Stream ID encoding (existing behavior)" do test "deserialize header with standard 6-bit chunk stream ID" do # This test should pass with current implementation From 8aa2c01852ceed0a4817c2449caf96cbaf0629e8 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 30 Oct 2025 15:14:32 +0100 Subject: [PATCH 09/17] Fix chunked message interleaving --- lib/membrane_rtmp_plugin/rtmp/header.ex | 84 ++++--- .../rtmp/message_parser.ex | 228 +++++++++++++----- 2 files changed, 214 insertions(+), 98 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index 94493ffd..66185259 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -120,18 +120,24 @@ defmodule Membrane.RTMP.Header do <>, previous_headers ) do - with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do - header = %__MODULE__{ - chunk_stream_id: chunk_stream_id, - timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta, - timestamp_delta: timestamp_delta, - extended_timestamp?: extended_timestamp?, - body_size: body_size, - type_id: type_id, - stream_id: previous_headers[chunk_stream_id].stream_id - } + previous_header = previous_headers[chunk_stream_id] - {header, rest} + if previous_header == nil do + {:error, {:missing_previous_header, chunk_stream_id, :type_1}} + else + with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do + header = %__MODULE__{ + chunk_stream_id: chunk_stream_id, + timestamp: previous_header.timestamp + timestamp_delta, + timestamp_delta: timestamp_delta, + extended_timestamp?: extended_timestamp?, + body_size: body_size, + type_id: type_id, + stream_id: previous_header.stream_id + } + + {header, rest} + end end end @@ -140,18 +146,24 @@ defmodule Membrane.RTMP.Header do <>, previous_headers ) do - with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do - header = %__MODULE__{ - chunk_stream_id: chunk_stream_id, - timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta, - timestamp_delta: timestamp_delta, - extended_timestamp?: extended_timestamp?, - body_size: previous_headers[chunk_stream_id].body_size, - type_id: previous_headers[chunk_stream_id].type_id, - stream_id: previous_headers[chunk_stream_id].stream_id - } + previous_header = previous_headers[chunk_stream_id] - {header, rest} + if previous_header == nil do + {:error, {:missing_previous_header, chunk_stream_id, :type_2}} + else + with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do + header = %__MODULE__{ + chunk_stream_id: chunk_stream_id, + timestamp: previous_header.timestamp + timestamp_delta, + timestamp_delta: timestamp_delta, + extended_timestamp?: extended_timestamp?, + body_size: previous_header.body_size, + type_id: previous_header.type_id, + stream_id: previous_header.stream_id + } + + {header, rest} + end end end @@ -162,24 +174,28 @@ defmodule Membrane.RTMP.Header do ) do previous_header = previous_headers[chunk_stream_id] - if previous_header.extended_timestamp? do - with {timestamp_delta, _extended_timestamp?, rest} <- - extract_timestamp(rest, @extended_timestamp_marker) do + if previous_header == nil do + {:error, {:missing_previous_header, chunk_stream_id, :type_3}} + else + if previous_header.extended_timestamp? do + with {timestamp_delta, _extended_timestamp?, rest} <- + extract_timestamp(rest, @extended_timestamp_marker) do + header = %__MODULE__{ + previous_header + | timestamp: previous_header.timestamp + timestamp_delta, + timestamp_delta: timestamp_delta + } + + {header, rest} + end + else header = %__MODULE__{ previous_header - | timestamp: previous_header.timestamp + timestamp_delta, - timestamp_delta: timestamp_delta + | timestamp: previous_header.timestamp + previous_header.timestamp_delta } {header, rest} end - else - header = %__MODULE__{ - previous_header - | timestamp: previous_header.timestamp + previous_header.timestamp_delta - } - - {header, rest} end end diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index 34e17544..0c61a716 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -6,13 +6,19 @@ defmodule Membrane.RTMP.MessageParser do alias Membrane.RTMP.{Handshake, Header, Message, Messages} @enforce_keys [:state_machine, :buffer, :chunk_size, :handshake] - defstruct @enforce_keys ++ [previous_headers: %{}, current_tx_id: 1] + defstruct @enforce_keys ++ [previous_headers: %{}, current_tx_id: 1, partial_messages: %{}] @type state_machine_t :: :handshake | :connecting | :connected @type packet_t :: binary() + @type partial_message :: %{ + header: Header.t(), + body_chunks: [binary()], + bytes_received: non_neg_integer() + } + @type t :: %__MODULE__{ state_machine: state_machine_t(), buffer: binary(), @@ -20,7 +26,9 @@ defmodule Membrane.RTMP.MessageParser do # the chunk size of incoming messages (the other side of connection) chunk_size: non_neg_integer(), current_tx_id: non_neg_integer(), - handshake: Handshake.State.t() + handshake: Handshake.State.t(), + # tracks partial messages per chunk stream ID to support interleaving + partial_messages: %{non_neg_integer() => partial_message()} } @doc """ @@ -39,7 +47,8 @@ defmodule Membrane.RTMP.MessageParser do # previous header for each of the stream chunks previous_headers: %{}, chunk_size: chunk_size, - handshake: handshake + handshake: handshake, + partial_messages: %{} } end @@ -104,14 +113,43 @@ defmodule Membrane.RTMP.MessageParser do ) do payload = buffer <> packet - case read_frame(payload, state.previous_headers, chunk_size) do + case read_frame(payload, state.previous_headers, chunk_size, state.partial_messages) do {:error, :need_more_data} -> {:need_more_data, %__MODULE__{state | buffer: payload}} - {header, message, rest} -> - state = update_state_with_message(state, header, message, rest) + {:error, :invalid_chunk_stream} -> + # Invalid chunk stream detected, likely due to corrupted data + # Clear buffer and partial messages to try to recover + Membrane.Logger.error( + "Invalid chunk stream detected. Clearing buffers to attempt recovery." + ) + + {:need_more_data, + %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} + + {:complete, header, message, rest, new_partial_messages} -> + # Complete message - update_state_with_message will handle previous_headers + state = + update_state_with_message(state, header, message, rest) + |> Map.put(:partial_messages, new_partial_messages) {header, message, state} + + {:partial, rest, new_partial_messages, new_previous_headers} -> + # Partial message - update previous_headers if needed + new_state = + if new_previous_headers == :no_change do + %__MODULE__{state | buffer: rest, partial_messages: new_partial_messages} + else + %__MODULE__{ + state + | buffer: rest, + partial_messages: new_partial_messages, + previous_headers: new_previous_headers + } + end + + {:need_more_data, new_state} end end @@ -165,87 +203,149 @@ defmodule Membrane.RTMP.MessageParser do ) do payload = buffer <> packet - case read_frame(payload, state.previous_headers, chunk_size) do + case read_frame(payload, state.previous_headers, chunk_size, state.partial_messages) do {:error, :need_more_data} -> {:need_more_data, %__MODULE__{state | buffer: payload}} - {header, message, rest} -> - state = update_state_with_message(state, header, message, rest) + {:error, :invalid_chunk_stream} -> + # Invalid chunk stream detected, likely due to corrupted data + # Clear buffer and partial messages to try to recover + Membrane.Logger.error( + "Invalid chunk stream detected. Clearing buffers to attempt recovery." + ) + + {:need_more_data, + %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} + + {:complete, header, message, rest, new_partial_messages} -> + # Complete message - update_state_with_message will handle previous_headers + state = + update_state_with_message(state, header, message, rest) + |> Map.put(:partial_messages, new_partial_messages) {header, message, state} + + {:partial, rest, new_partial_messages, new_previous_headers} -> + # Partial message - update previous_headers if needed + new_state = + if new_previous_headers == :no_change do + %__MODULE__{state | buffer: rest, partial_messages: new_partial_messages} + else + %__MODULE__{ + state + | buffer: rest, + partial_messages: new_partial_messages, + previous_headers: new_previous_headers + } + end + + {:need_more_data, new_state} end end - defp read_frame(packet, previous_headers, chunk_size) do + defp read_frame(packet, previous_headers, chunk_size, partial_messages) do case Header.deserialize(packet, previous_headers) do {%Header{} = header, rest} -> - chunked_body_size = calculate_chunked_body_size(header, chunk_size) - - case rest do - <> -> - combined_body = combine_body_chunks(body, chunk_size, header) + # Determine if this is a continuation chunk + # A chunk is a continuation if we have a partial message buffered for this chunk_stream_id + is_continuation = Map.has_key?(partial_messages, header.chunk_stream_id) + + if is_continuation do + # This is a continuation of an existing partial message + continue_partial_message(header, rest, chunk_size, partial_messages, previous_headers) + else + # This is a new message (Type 0, 1, 2, or Type 3 for a new chunk stream) + start_new_message(header, rest, chunk_size, partial_messages, previous_headers) + end - message = Message.deserialize_message(header.type_id, combined_body) + {:error, {:missing_previous_header, chunk_stream_id, header_type}} -> + Membrane.Logger.warning( + "Received #{header_type} header for unknown chunk_stream_id: #{chunk_stream_id}. " <> + "This may indicate chunk interleaving issue or corrupted stream data. Skipping." + ) - {header, message, rest} - - _rest -> - {:error, :need_more_data} - end + {:error, :invalid_chunk_stream} {:error, :need_more_data} = error -> error end end - defp calculate_chunked_body_size(%Header{body_size: body_size} = header, chunk_size) do - if body_size > chunk_size do - # if a message's body is greater than the chunk size then - # after every chunk_size's bytes there is a 0x03 one byte header that - # needs to be stripped and is not counted into the body_size - headers_to_strip = div(body_size - 1, chunk_size) - - # if the initial header contains a extended timestamp then - # every following chunk will contain the timestamp - timestamps_to_strip = if header.extended_timestamp?, do: headers_to_strip * 4, else: 0 - - body_size + headers_to_strip + timestamps_to_strip - else - body_size - end - end + defp start_new_message(header, rest, chunk_size, partial_messages, previous_headers) do + # Calculate how many bytes to read for this chunk + bytes_to_read = min(header.body_size, chunk_size) + + case rest do + <> -> + if bytes_to_read >= header.body_size do + # Message is complete in a single chunk + # Don't update previous_headers here - update_state_with_message will handle it + message = Message.deserialize_message(header.type_id, chunk) + {:complete, header, message, rest, partial_messages} + else + # Message spans multiple chunks, store as partial + # Also update previous_headers so Type 3 continuation headers can be deserialized + partial = %{ + header: header, + body_chunks: [chunk], + bytes_received: bytes_to_read + } + + new_partial_messages = Map.put(partial_messages, header.chunk_stream_id, partial) + new_previous_headers = Map.put(previous_headers, header.chunk_stream_id, header) + {:partial, rest, new_partial_messages, new_previous_headers} + end - # message's size can exceed the defined chunk size - # in this case the message gets divided into - # a sequence of smaller packets separated by the a header type 3 byte - # (the first 2 bits has to be 0b11) - defp combine_body_chunks(body, chunk_size, header) do - if byte_size(body) <= chunk_size do - body - else - do_combine_body_chunks(body, chunk_size, header, []) + _rest -> + {:error, :need_more_data} end end - defp do_combine_body_chunks(body, chunk_size, header, acc) do - case body do - <> - when header.extended_timestamp? and timestamp == header.timestamp -> - do_combine_body_chunks(rest, chunk_size, header, [acc, body]) - - # cut out the header byte (staring with 0b11) - <> -> - do_combine_body_chunks(rest, chunk_size, header, [acc, body]) + defp continue_partial_message(header, rest, chunk_size, partial_messages, _previous_headers) do + partial = Map.get(partial_messages, header.chunk_stream_id) - <<_body::binary-size(chunk_size), header_type::2, _chunk_stream_id::6, _rest::binary>> -> - Membrane.Logger.warning( - "Unexpected header type when combining body chunks: #{header_type}" - ) - - IO.iodata_to_binary([acc, body]) - - body -> - IO.iodata_to_binary([acc, body]) + if partial == nil do + # This shouldn't happen - Type 3 header for unknown chunk_stream_id + # This will be caught by the missing_previous_header error + {:error, :invalid_chunk_stream} + else + # Calculate remaining bytes for this message + remaining_bytes = partial.header.body_size - partial.bytes_received + bytes_to_read = min(remaining_bytes, chunk_size) + + case rest do + <> -> + new_bytes_received = partial.bytes_received + bytes_to_read + new_body_chunks = [partial.body_chunks, chunk] + + if new_bytes_received >= partial.header.body_size do + # Message is now complete + # Don't update previous_headers here - update_state_with_message will handle it + complete_body = IO.iodata_to_binary(new_body_chunks) + message = Message.deserialize_message(partial.header.type_id, complete_body) + new_partial_messages = Map.delete(partial_messages, header.chunk_stream_id) + + # Use the original header from when the message started + {:complete, partial.header, message, rest, new_partial_messages} + else + # Still partial, continue buffering + # Keep previous_headers as is (already has the header from first chunk) + updated_partial = %{ + partial + | body_chunks: new_body_chunks, + bytes_received: new_bytes_received + } + + new_partial_messages = + Map.put(partial_messages, header.chunk_stream_id, updated_partial) + + # Return :no_change for previous_headers to indicate no update needed + {:partial, rest, new_partial_messages, :no_change} + end + + _rest -> + {:error, :need_more_data} + end end end From 204f818b67b91dae53fb12147f83c9f8ed2a43c2 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 30 Oct 2025 17:07:08 +0100 Subject: [PATCH 10/17] Protect against duplicate connections --- lib/membrane_rtmp_plugin/rtmp/source/source.ex | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index 46c015b4..d88866c6 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -176,18 +176,31 @@ defmodule Membrane.RTMP.Source do def handle_info( {:client_ref, client_ref, app, stream_key}, _ctx, - %{mode: :builtin_server} = state + %{mode: :builtin_server, client_ref: nil} = state ) when app == state.app and stream_key == state.stream_key do {[setup: :complete], %{state | client_ref: client_ref}} end + @impl true + def handle_info( + {:client_ref, _client_ref, app, stream_key}, + _ctx, + %{mode: :builtin_server} = state + ) + when app == state.app and stream_key == state.stream_key do + # Duplicate connection on same app/stream_key - ignore it + Logger.warning("Duplicate client connection detected on /#{app}/#{stream_key}, ignoring") + {[], state} + end + @impl true def handle_info( {:client_ref, _client_ref, app, stream_key}, _ctx, %{mode: :builtin_server} = state ) do + # Connection on wrong app/stream_key Logger.warning("Unexpected client connected on /#{app}/#{stream_key}") {[], state} end From d8175c35832340a4eb96ccfa56e4fbce81ff6cd7 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Fri, 31 Oct 2025 10:24:48 +0100 Subject: [PATCH 11/17] Format --- lib/membrane_rtmp_plugin/rtmp/message_parser.ex | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index 0c61a716..a274871f 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -124,8 +124,7 @@ defmodule Membrane.RTMP.MessageParser do "Invalid chunk stream detected. Clearing buffers to attempt recovery." ) - {:need_more_data, - %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} + {:need_more_data, %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} {:complete, header, message, rest, new_partial_messages} -> # Complete message - update_state_with_message will handle previous_headers @@ -214,8 +213,7 @@ defmodule Membrane.RTMP.MessageParser do "Invalid chunk stream detected. Clearing buffers to attempt recovery." ) - {:need_more_data, - %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} + {:need_more_data, %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} {:complete, header, message, rest, new_partial_messages} -> # Complete message - update_state_with_message will handle previous_headers From 4c1cd9af730db0ad39433c0c212adb07a608628b Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Fri, 31 Oct 2025 10:33:30 +0100 Subject: [PATCH 12/17] format --- lib/membrane_rtmp_plugin/rtmp/header.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index 66185259..6111e817 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -125,7 +125,8 @@ defmodule Membrane.RTMP.Header do if previous_header == nil do {:error, {:missing_previous_header, chunk_stream_id, :type_1}} else - with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do + with {timestamp_delta, extended_timestamp?, rest} <- + extract_timestamp(rest, timestamp_delta) do header = %__MODULE__{ chunk_stream_id: chunk_stream_id, timestamp: previous_header.timestamp + timestamp_delta, @@ -151,7 +152,8 @@ defmodule Membrane.RTMP.Header do if previous_header == nil do {:error, {:missing_previous_header, chunk_stream_id, :type_2}} else - with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do + with {timestamp_delta, extended_timestamp?, rest} <- + extract_timestamp(rest, timestamp_delta) do header = %__MODULE__{ chunk_stream_id: chunk_stream_id, timestamp: previous_header.timestamp + timestamp_delta, From af71abdedb99b0d80a1a475024bcd29af8cc0979 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Mon, 17 Nov 2025 13:10:43 +0100 Subject: [PATCH 13/17] Handle multiple chunks in the same packet (fix ffmpeg) --- lib/membrane_rtmp_plugin/rtmp/message_parser.ex | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index a274871f..ea255dc9 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -148,7 +148,13 @@ defmodule Membrane.RTMP.MessageParser do } end - {:need_more_data, new_state} + # Continue processing if there's more data in the buffer + # This handles the case where multiple chunks arrive in the same packet + if rest != <<>> do + handle_packet(<<>>, new_state) + else + {:need_more_data, new_state} + end end end @@ -237,7 +243,13 @@ defmodule Membrane.RTMP.MessageParser do } end - {:need_more_data, new_state} + # Continue processing if there's more data in the buffer + # This handles the case where multiple chunks arrive in the same packet + if rest != <<>> do + handle_packet(<<>>, new_state) + else + {:need_more_data, new_state} + end end end From 9825c30307a7eed57639b71523d209e43740f619 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 19 Mar 2026 16:43:05 +0100 Subject: [PATCH 14/17] Fix credo nesting issue in continue_partial_message --- .../rtmp/message_parser.ex | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index 62ebeb2f..a4fe334a 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -323,39 +323,44 @@ defmodule Membrane.RTMP.MessageParser do remaining_bytes = partial.header.body_size - partial.bytes_received bytes_to_read = min(remaining_bytes, chunk_size) - case rest do - <> -> - new_bytes_received = partial.bytes_received + bytes_to_read - new_body_chunks = [partial.body_chunks, chunk] - - if new_bytes_received >= partial.header.body_size do - # Message is now complete - # Don't update previous_headers here - update_state_with_message will handle it - complete_body = IO.iodata_to_binary(new_body_chunks) - message = Message.deserialize_message(partial.header.type_id, complete_body) - new_partial_messages = Map.delete(partial_messages, header.chunk_stream_id) - - # Use the original header from when the message started - {:complete, partial.header, message, rest, new_partial_messages} - else - # Still partial, continue buffering - # Keep previous_headers as is (already has the header from first chunk) - updated_partial = %{ - partial - | body_chunks: new_body_chunks, - bytes_received: new_bytes_received - } + do_continue_partial_message(header, rest, bytes_to_read, partial, partial_messages) + end + end - new_partial_messages = - Map.put(partial_messages, header.chunk_stream_id, updated_partial) + defp do_continue_partial_message(header, rest, bytes_to_read, partial, partial_messages) do + case rest do + <> -> + complete_partial_chunk(header, chunk, rest, partial, partial_messages) - # Return :no_change for previous_headers to indicate no update needed - {:partial, rest, new_partial_messages, :no_change} - end + _rest -> + {:error, :need_more_data} + end + end + + defp complete_partial_chunk(header, chunk, rest, partial, partial_messages) do + new_bytes_received = partial.bytes_received + byte_size(chunk) + new_body_chunks = [partial.body_chunks, chunk] + + if new_bytes_received >= partial.header.body_size do + # Message is now complete + complete_body = IO.iodata_to_binary(new_body_chunks) + message = Message.deserialize_message(partial.header.type_id, complete_body) + new_partial_messages = Map.delete(partial_messages, header.chunk_stream_id) + + # Use the original header from when the message started + {:complete, partial.header, message, rest, new_partial_messages} + else + # Still partial, continue buffering + updated_partial = %{ + partial + | body_chunks: new_body_chunks, + bytes_received: new_bytes_received + } + + new_partial_messages = + Map.put(partial_messages, header.chunk_stream_id, updated_partial) - _rest -> - {:error, :need_more_data} - end + {:partial, rest, new_partial_messages, :no_change} end end From 72f84ffcb357198bd8dc719ed62ec467947ef438 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 19 Mar 2026 16:52:19 +0100 Subject: [PATCH 15/17] Stop ClientHandler on connection close and timeout ClientHandler never returned {:stop, ...} in any code path, causing leaked processes. This is especially problematic with TCP health checks (e.g. from NLB) that connect and immediately disconnect every 30s, each leaving a zombie GenServer behind. --- lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index bf0fd061..261cf0c2 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -117,7 +117,7 @@ defmodule Membrane.RTMPServer.ClientHandler do events = [:connection_closed] state = Enum.reduce(events, state, &handle_event/2) - {:noreply, state} + {:stop, :normal, state} end @impl true @@ -130,7 +130,7 @@ defmodule Membrane.RTMPServer.ClientHandler do events = [:connection_closed] state = Enum.reduce(events, state, &handle_event/2) - {:noreply, state} + {:stop, :normal, state} end @impl true @@ -153,7 +153,7 @@ defmodule Membrane.RTMPServer.ClientHandler do :gen_tcp.close(state.socket) end - {:noreply, state} + {:stop, :normal, state} end @impl true From 3ab116fb1a2b8b94dced9b9abf88521e8526f7ef Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 19 Mar 2026 16:54:51 +0100 Subject: [PATCH 16/17] Fix type checker warning in Header.deserialize_message_header/3 --- lib/membrane_rtmp_plugin/rtmp/header.ex | 36 ++++++++++++------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index 6111e817..62f7e950 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -174,30 +174,30 @@ defmodule Membrane.RTMP.Header do <>, previous_headers ) do - previous_header = previous_headers[chunk_stream_id] - - if previous_header == nil do - {:error, {:missing_previous_header, chunk_stream_id, :type_3}} - else - if previous_header.extended_timestamp? do - with {timestamp_delta, _extended_timestamp?, rest} <- - extract_timestamp(rest, @extended_timestamp_marker) do + case previous_headers[chunk_stream_id] do + nil -> + {:error, {:missing_previous_header, chunk_stream_id, :type_3}} + + %__MODULE__{} = previous_header -> + if previous_header.extended_timestamp? do + with {timestamp_delta, _extended_timestamp?, rest} <- + extract_timestamp(rest, @extended_timestamp_marker) do + header = %__MODULE__{ + previous_header + | timestamp: previous_header.timestamp + timestamp_delta, + timestamp_delta: timestamp_delta + } + + {header, rest} + end + else header = %__MODULE__{ previous_header - | timestamp: previous_header.timestamp + timestamp_delta, - timestamp_delta: timestamp_delta + | timestamp: previous_header.timestamp + previous_header.timestamp_delta } {header, rest} end - else - header = %__MODULE__{ - previous_header - | timestamp: previous_header.timestamp + previous_header.timestamp_delta - } - - {header, rest} - end end end From e40b0766ec5877656b0776a1fe428acce939a7a0 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Wed, 25 Mar 2026 11:19:46 +0100 Subject: [PATCH 17/17] Fix client_timeout unconditionally killing active RTMP connections The client_timeout handler always called {:stop, :normal, state} regardless of whether the client had successfully published. This caused all RTMP connections to be terminated after 5 seconds, even when data was actively flowing. Move the stop into the not-published branch so active connections are left alone when the timeout fires. --- lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 261cf0c2..cdb2b02e 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -151,9 +151,10 @@ defmodule Membrane.RTMPServer.ClientHandler do if not state.published? do Logger.warning("No demand made for client /#{app}/#{stream_key}, terminating connection.") :gen_tcp.close(state.socket) + {:stop, :normal, state} + else + {:noreply, state} end - - {:stop, :normal, state} end @impl true