diff --git a/lib/membrane_rtmp_plugin/rtmp/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex index 5ccf486..62f7e95 100644 --- a/lib/membrane_rtmp_plugin/rtmp/header.ex +++ b/lib/membrane_rtmp_plugin/rtmp/header.ex @@ -39,11 +39,6 @@ defmodule Membrane.RTMP.Header do defmacro type(:amf_data), do: 0x12 defmacro type(:amf_command), do: 0x14 - @header_type_0 <<0x0::2>> - @header_type_1 <<0x1::2>> - @header_type_2 <<0x2::2>> - @header_type_3 <<0x3::2>> - @extended_timestamp_marker 0xFFFFFF @spec new(Keyword.t()) :: t() @@ -62,14 +57,50 @@ defmodule Membrane.RTMP.Header do * `0b11` - all values are derived from the previous header with the same `chunk_stream_id` """ @spec deserialize(binary(), t() | nil) :: {t(), rest :: binary()} | {:error, :need_more_data} - def deserialize(binary, previous_headers \\ nil) + + def deserialize(<>, previous_headers) do + # chunk basic header is made of the fmt and chunk_stream_id fields. + fmt = + case header_type do + 0 -> :type_0 + 1 -> :type_1 + 2 -> :type_2 + 3 -> :type_3 + end + + {chunk_stream_id, rest} = deserialize_chunk_stream_id(rest) + + deserialize_message_header( + %{fmt: fmt, chunk_stream_id: chunk_stream_id}, + rest, + previous_headers + ) + end + + defp deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do + # 2 byte format + {stream_id + 64, rest} + end + + defp deserialize_chunk_stream_id( + <<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>> + ) do + # 3 byte format + {stream_id_part_3 * 256 + stream_id_part_2 + 64, rest} + end + + defp deserialize_chunk_stream_id(<>) do + # 1 byte format + {stream_id, rest} + end # only the deserialization of the 0b00 type can have `nil` previous header - def deserialize( - <<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, - stream_id::little-integer-size(32), rest::binary>>, - _previous_headers - ) do + defp deserialize_message_header( + %{fmt: :type_0, chunk_stream_id: chunk_stream_id}, + <>, + _previous_headers + ) do with {timestamp, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp) do header = %__MODULE__{ chunk_stream_id: chunk_stream_id, @@ -84,101 +115,139 @@ defmodule Membrane.RTMP.Header do end end - def deserialize( - <<@header_type_1::bitstring, chunk_stream_id::6, timestamp_delta::24, body_size::24, - type_id::8, rest::binary>>, - previous_headers - ) do - with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do - header = %__MODULE__{ - chunk_stream_id: chunk_stream_id, - timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta, - timestamp_delta: timestamp_delta, - extended_timestamp?: extended_timestamp?, - body_size: body_size, - type_id: type_id, - stream_id: previous_headers[chunk_stream_id].stream_id - } - - {header, rest} - end - end + defp deserialize_message_header( + %{fmt: :type_1, chunk_stream_id: chunk_stream_id}, + <>, + previous_headers + ) do + previous_header = previous_headers[chunk_stream_id] - def deserialize( - <<@header_type_2::bitstring, chunk_stream_id::6, timestamp_delta::24, rest::binary>>, - previous_headers - ) do - with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do - header = %__MODULE__{ - chunk_stream_id: chunk_stream_id, - timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta, - timestamp_delta: timestamp_delta, - extended_timestamp?: extended_timestamp?, - body_size: previous_headers[chunk_stream_id].body_size, - type_id: previous_headers[chunk_stream_id].type_id, - stream_id: previous_headers[chunk_stream_id].stream_id - } + if previous_header == nil do + {:error, {:missing_previous_header, chunk_stream_id, :type_1}} + else + with {timestamp_delta, extended_timestamp?, rest} <- + extract_timestamp(rest, timestamp_delta) do + header = %__MODULE__{ + chunk_stream_id: chunk_stream_id, + timestamp: previous_header.timestamp + timestamp_delta, + timestamp_delta: timestamp_delta, + extended_timestamp?: extended_timestamp?, + body_size: body_size, + type_id: type_id, + stream_id: previous_header.stream_id + } - {header, rest} + {header, rest} + end end end - def deserialize( - <<@header_type_3::bitstring, chunk_stream_id::6, rest::binary>>, - previous_headers - ) do - %__MODULE__{} = previous_header = previous_headers[chunk_stream_id] + defp deserialize_message_header( + %{fmt: :type_2, chunk_stream_id: chunk_stream_id}, + <>, + previous_headers + ) do + previous_header = previous_headers[chunk_stream_id] - if previous_header.extended_timestamp? do - with {timestamp_delta, _extended_timestamp?, rest} <- - extract_timestamp(rest, @extended_timestamp_marker) do + if previous_header == nil do + {:error, {:missing_previous_header, chunk_stream_id, :type_2}} + else + with {timestamp_delta, extended_timestamp?, rest} <- + extract_timestamp(rest, timestamp_delta) do header = %__MODULE__{ - previous_header - | timestamp: previous_header.timestamp + timestamp_delta, - timestamp_delta: timestamp_delta + chunk_stream_id: chunk_stream_id, + timestamp: previous_header.timestamp + timestamp_delta, + timestamp_delta: timestamp_delta, + extended_timestamp?: extended_timestamp?, + body_size: previous_header.body_size, + type_id: previous_header.type_id, + stream_id: previous_header.stream_id } {header, rest} end - else - header = %__MODULE__{ - previous_header - | timestamp: previous_header.timestamp + previous_header.timestamp_delta - } - - {header, rest} end end - def deserialize( - <<@header_type_0::bitstring, _chunk_stream_id::6, _rest::binary>>, - _prev_header - ), - do: {:error, :need_more_data} + defp deserialize_message_header( + %{fmt: :type_3, chunk_stream_id: chunk_stream_id}, + <>, + previous_headers + ) do + case previous_headers[chunk_stream_id] do + nil -> + {:error, {:missing_previous_header, chunk_stream_id, :type_3}} + + %__MODULE__{} = previous_header -> + if previous_header.extended_timestamp? do + with {timestamp_delta, _extended_timestamp?, rest} <- + extract_timestamp(rest, @extended_timestamp_marker) do + header = %__MODULE__{ + previous_header + | timestamp: previous_header.timestamp + timestamp_delta, + timestamp_delta: timestamp_delta + } + + {header, rest} + end + else + header = %__MODULE__{ + previous_header + | timestamp: previous_header.timestamp + previous_header.timestamp_delta + } - def deserialize( - <<@header_type_1::bitstring, _chunk_stream_id::6, _rest::binary>>, - _prev_header - ), - do: {:error, :need_more_data} + {header, rest} + end + end + end - def deserialize( - <<@header_type_2::bitstring, _chunk_stream_id::6, _rest::binary>>, - _prev_header - ), - do: {:error, :need_more_data} + defp deserialize_message_header(_basic_header, _data, _prev_header), + do: {:error, :need_more_data} @spec serialize(t()) :: binary() - def serialize(%__MODULE__{} = header) do + def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) + when chunk_stream_id >= 2 and chunk_stream_id <= 63 do + # 1-byte format: fmt (2 bits) + chunk_stream_id (6 bits) + %{ + timestamp: timestamp, + body_size: body_size, + type_id: type_id, + stream_id: stream_id + } = header + + <<0::2, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, + stream_id::little-integer-size(32)>> + end + + def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) + when chunk_stream_id >= 64 and chunk_stream_id <= 319 do + # 2-byte format: fmt (2 bits) + marker 0 (6 bits) + (id - 64) (8 bits) + %{ + timestamp: timestamp, + body_size: body_size, + type_id: type_id, + stream_id: stream_id + } = header + + <<0::2, 0::6, chunk_stream_id - 64::8, timestamp::24, body_size::24, type_id::8, + stream_id::little-integer-size(32)>> + end + + def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header) + when chunk_stream_id >= 320 and chunk_stream_id <= 65_599 do + # 3-byte format: fmt (2 bits) + marker 1 (6 bits) + low byte (8 bits) + high byte (8 bits) %{ - chunk_stream_id: chunk_stream_id, timestamp: timestamp, body_size: body_size, type_id: type_id, stream_id: stream_id } = header - <<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, + id_minus_64 = chunk_stream_id - 64 + low_byte = rem(id_minus_64, 256) + high_byte = div(id_minus_64, 256) + + <<0::2, 1::6, low_byte::8, high_byte::8, timestamp::24, body_size::24, type_id::8, stream_id::little-integer-size(32)>> end diff --git a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex index 7931e2f..a4fe334 100644 --- a/lib/membrane_rtmp_plugin/rtmp/message_parser.ex +++ b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex @@ -6,13 +6,19 @@ defmodule Membrane.RTMP.MessageParser do alias Membrane.RTMP.{Handshake, Header, Message, Messages} @enforce_keys [:state_machine, :buffer, :chunk_size, :handshake] - defstruct @enforce_keys ++ [previous_headers: %{}, current_tx_id: 1] + defstruct @enforce_keys ++ [previous_headers: %{}, current_tx_id: 1, partial_messages: %{}] @type state_machine_t :: :handshake | :connecting | :connected @type packet_t :: binary() + @type partial_message :: %{ + header: Header.t(), + body_chunks: [binary()], + bytes_received: non_neg_integer() + } + @type t :: %__MODULE__{ state_machine: state_machine_t(), buffer: binary(), @@ -20,7 +26,9 @@ defmodule Membrane.RTMP.MessageParser do # the chunk size of incoming messages (the other side of connection) chunk_size: non_neg_integer(), current_tx_id: non_neg_integer(), - handshake: Handshake.State.t() + handshake: Handshake.State.t(), + # tracks partial messages per chunk stream ID to support interleaving + partial_messages: %{non_neg_integer() => partial_message()} } @doc """ @@ -39,7 +47,8 @@ defmodule Membrane.RTMP.MessageParser do # previous header for each of the stream chunks previous_headers: %{}, chunk_size: chunk_size, - handshake: handshake + handshake: handshake, + partial_messages: %{} } end @@ -104,14 +113,48 @@ defmodule Membrane.RTMP.MessageParser do ) do payload = buffer <> packet - case read_frame(payload, state.previous_headers, chunk_size) do + case read_frame(payload, state.previous_headers, chunk_size, state.partial_messages) do {:error, :need_more_data} -> {:need_more_data, %__MODULE__{state | buffer: payload}} - {header, message, rest} -> - state = update_state_with_message(state, header, message, rest) + {:error, :invalid_chunk_stream} -> + # Invalid chunk stream detected, likely due to corrupted data + # Clear buffer and partial messages to try to recover + Membrane.Logger.error( + "Invalid chunk stream detected. Clearing buffers to attempt recovery." + ) + + {:need_more_data, %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} + + {:complete, header, message, rest, new_partial_messages} -> + # Complete message - update_state_with_message will handle previous_headers + state = + update_state_with_message(state, header, message, rest) + |> Map.put(:partial_messages, new_partial_messages) {header, message, state} + + {:partial, rest, new_partial_messages, new_previous_headers} -> + # Partial message - update previous_headers if needed + new_state = + if new_previous_headers == :no_change do + %__MODULE__{state | buffer: rest, partial_messages: new_partial_messages} + else + %__MODULE__{ + state + | buffer: rest, + partial_messages: new_partial_messages, + previous_headers: new_previous_headers + } + end + + # Continue processing if there's more data in the buffer + # This handles the case where multiple chunks arrive in the same packet + if rest != <<>> do + handle_packet(<<>>, new_state) + else + {:need_more_data, new_state} + end end end @@ -165,87 +208,159 @@ defmodule Membrane.RTMP.MessageParser do ) do payload = buffer <> packet - case read_frame(payload, state.previous_headers, chunk_size) do + case read_frame(payload, state.previous_headers, chunk_size, state.partial_messages) do {:error, :need_more_data} -> {:need_more_data, %__MODULE__{state | buffer: payload}} - {header, message, rest} -> - state = update_state_with_message(state, header, message, rest) + {:error, :invalid_chunk_stream} -> + # Invalid chunk stream detected, likely due to corrupted data + # Clear buffer and partial messages to try to recover + Membrane.Logger.error( + "Invalid chunk stream detected. Clearing buffers to attempt recovery." + ) + + {:need_more_data, %__MODULE__{state | buffer: <<>>, partial_messages: %{}}} + + {:complete, header, message, rest, new_partial_messages} -> + # Complete message - update_state_with_message will handle previous_headers + state = + update_state_with_message(state, header, message, rest) + |> Map.put(:partial_messages, new_partial_messages) {header, message, state} + + {:partial, rest, new_partial_messages, new_previous_headers} -> + # Partial message - update previous_headers if needed + new_state = + if new_previous_headers == :no_change do + %__MODULE__{state | buffer: rest, partial_messages: new_partial_messages} + else + %__MODULE__{ + state + | buffer: rest, + partial_messages: new_partial_messages, + previous_headers: new_previous_headers + } + end + + # Continue processing if there's more data in the buffer + # This handles the case where multiple chunks arrive in the same packet + if rest != <<>> do + handle_packet(<<>>, new_state) + else + {:need_more_data, new_state} + end end end - defp read_frame(packet, previous_headers, chunk_size) do + defp read_frame(packet, previous_headers, chunk_size, partial_messages) do case Header.deserialize(packet, previous_headers) do {%Header{} = header, rest} -> - chunked_body_size = calculate_chunked_body_size(header, chunk_size) - - case rest do - <> -> - combined_body = combine_body_chunks(body, chunk_size, header) - - message = Message.deserialize_message(header.type_id, combined_body) + # Determine if this is a continuation chunk + # A chunk is a continuation if we have a partial message buffered for this chunk_stream_id + is_continuation = Map.has_key?(partial_messages, header.chunk_stream_id) + + if is_continuation do + # This is a continuation of an existing partial message + continue_partial_message(header, rest, chunk_size, partial_messages, previous_headers) + else + # This is a new message (Type 0, 1, 2, or Type 3 for a new chunk stream) + start_new_message(header, rest, chunk_size, partial_messages, previous_headers) + end - {header, message, rest} + {:error, {:missing_previous_header, chunk_stream_id, header_type}} -> + Membrane.Logger.warning( + "Received #{header_type} header for unknown chunk_stream_id: #{chunk_stream_id}. " <> + "This may indicate chunk interleaving issue or corrupted stream data. Skipping." + ) - _rest -> - {:error, :need_more_data} - end + {:error, :invalid_chunk_stream} {:error, :need_more_data} = error -> error end end - defp calculate_chunked_body_size(%Header{body_size: body_size} = header, chunk_size) do - if body_size > chunk_size do - # if a message's body is greater than the chunk size then - # after every chunk_size's bytes there is a 0x03 one byte header that - # needs to be stripped and is not counted into the body_size - headers_to_strip = div(body_size - 1, chunk_size) + defp start_new_message(header, rest, chunk_size, partial_messages, previous_headers) do + # Calculate how many bytes to read for this chunk + bytes_to_read = min(header.body_size, chunk_size) + + case rest do + <> -> + if bytes_to_read >= header.body_size do + # Message is complete in a single chunk + # Don't update previous_headers here - update_state_with_message will handle it + message = Message.deserialize_message(header.type_id, chunk) + {:complete, header, message, rest, partial_messages} + else + # Message spans multiple chunks, store as partial + # Also update previous_headers so Type 3 continuation headers can be deserialized + partial = %{ + header: header, + body_chunks: [chunk], + bytes_received: bytes_to_read + } + + new_partial_messages = Map.put(partial_messages, header.chunk_stream_id, partial) + new_previous_headers = Map.put(previous_headers, header.chunk_stream_id, header) + {:partial, rest, new_partial_messages, new_previous_headers} + end + + _rest -> + {:error, :need_more_data} + end + end - # if the initial header contains a extended timestamp then - # every following chunk will contain the timestamp - timestamps_to_strip = if header.extended_timestamp?, do: headers_to_strip * 4, else: 0 + defp continue_partial_message(header, rest, chunk_size, partial_messages, _previous_headers) do + partial = Map.get(partial_messages, header.chunk_stream_id) - body_size + headers_to_strip + timestamps_to_strip + if partial == nil do + # This shouldn't happen - Type 3 header for unknown chunk_stream_id + # This will be caught by the missing_previous_header error + {:error, :invalid_chunk_stream} else - body_size + # Calculate remaining bytes for this message + remaining_bytes = partial.header.body_size - partial.bytes_received + bytes_to_read = min(remaining_bytes, chunk_size) + + do_continue_partial_message(header, rest, bytes_to_read, partial, partial_messages) end end - # message's size can exceed the defined chunk size - # in this case the message gets divided into - # a sequence of smaller packets separated by the a header type 3 byte - # (the first 2 bits has to be 0b11) - defp combine_body_chunks(body, chunk_size, header) do - if byte_size(body) <= chunk_size do - body - else - do_combine_body_chunks(body, chunk_size, header, []) + defp do_continue_partial_message(header, rest, bytes_to_read, partial, partial_messages) do + case rest do + <> -> + complete_partial_chunk(header, chunk, rest, partial, partial_messages) + + _rest -> + {:error, :need_more_data} end end - defp do_combine_body_chunks(body, chunk_size, header, acc) do - case body do - <> - when header.extended_timestamp? and timestamp == header.timestamp -> - do_combine_body_chunks(rest, chunk_size, header, [acc, body]) + defp complete_partial_chunk(header, chunk, rest, partial, partial_messages) do + new_bytes_received = partial.bytes_received + byte_size(chunk) + new_body_chunks = [partial.body_chunks, chunk] - # cut out the header byte (staring with 0b11) - <> -> - do_combine_body_chunks(rest, chunk_size, header, [acc, body]) + if new_bytes_received >= partial.header.body_size do + # Message is now complete + complete_body = IO.iodata_to_binary(new_body_chunks) + message = Message.deserialize_message(partial.header.type_id, complete_body) + new_partial_messages = Map.delete(partial_messages, header.chunk_stream_id) - <<_body::binary-size(chunk_size), header_type::2, _chunk_stream_id::6, _rest::binary>> -> - Membrane.Logger.warning( - "Unexpected header type when combining body chunks: #{header_type}" - ) + # Use the original header from when the message started + {:complete, partial.header, message, rest, new_partial_messages} + else + # Still partial, continue buffering + updated_partial = %{ + partial + | body_chunks: new_body_chunks, + bytes_received: new_bytes_received + } - IO.iodata_to_binary([acc, body]) + new_partial_messages = + Map.put(partial_messages, header.chunk_stream_id, updated_partial) - body -> - IO.iodata_to_binary([acc, body]) + {:partial, rest, new_partial_messages, :no_change} end end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index 46c015b..d88866c 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -176,18 +176,31 @@ defmodule Membrane.RTMP.Source do def handle_info( {:client_ref, client_ref, app, stream_key}, _ctx, - %{mode: :builtin_server} = state + %{mode: :builtin_server, client_ref: nil} = state ) when app == state.app and stream_key == state.stream_key do {[setup: :complete], %{state | client_ref: client_ref}} end + @impl true + def handle_info( + {:client_ref, _client_ref, app, stream_key}, + _ctx, + %{mode: :builtin_server} = state + ) + when app == state.app and stream_key == state.stream_key do + # Duplicate connection on same app/stream_key - ignore it + Logger.warning("Duplicate client connection detected on /#{app}/#{stream_key}, ignoring") + {[], state} + end + @impl true def handle_info( {:client_ref, _client_ref, app, stream_key}, _ctx, %{mode: :builtin_server} = state ) do + # Connection on wrong app/stream_key Logger.warning("Unexpected client connected on /#{app}/#{stream_key}") {[], state} end diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index bf0fd06..cdb2b02 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -117,7 +117,7 @@ defmodule Membrane.RTMPServer.ClientHandler do events = [:connection_closed] state = Enum.reduce(events, state, &handle_event/2) - {:noreply, state} + {:stop, :normal, state} end @impl true @@ -130,7 +130,7 @@ defmodule Membrane.RTMPServer.ClientHandler do events = [:connection_closed] state = Enum.reduce(events, state, &handle_event/2) - {:noreply, state} + {:stop, :normal, state} end @impl true @@ -151,9 +151,10 @@ defmodule Membrane.RTMPServer.ClientHandler do if not state.published? do Logger.warning("No demand made for client /#{app}/#{stream_key}, terminating connection.") :gen_tcp.close(state.socket) + {:stop, :normal, state} + else + {:noreply, state} end - - {:noreply, state} end @impl true diff --git a/test/membrane_rtmp_plugin/rtmp_header_test.exs b/test/membrane_rtmp_plugin/rtmp_header_test.exs new file mode 100644 index 0000000..0c8f3c9 --- /dev/null +++ b/test/membrane_rtmp_plugin/rtmp_header_test.exs @@ -0,0 +1,365 @@ +defmodule Membrane.RTMP.HeaderTest do + use ExUnit.Case, async: true + + alias Membrane.RTMP.Header + + describe "Extended Chunk Stream ID encoding" do + test "deserialize header with 2-byte chunk stream ID format (ID 64)" do + # Spec: When 6-bit field is 0, the ID is (second byte + 64) + # For chunk stream ID 64: second byte should be 0 (64 - 64 = 0) + # Header type 0 (0b00) with chunk_stream_id_marker=0 + # Format: [fmt:2 | cs_id:6] [cs_id-64:8] [timestamp:24] [body_size:24] [type_id:8] [stream_id:32] + + chunk_stream_id = 64 + timestamp = 1000 + body_size = 256 + # audio + type_id = 8 + stream_id = 1 + + binary = + << + # Header type 0 with chunk_stream_id field = 0 (indicates 2-byte format) + 0b00::2, + 0::6, + # Second byte: chunk_stream_id - 64 + chunk_stream_id - 64::8, + # Standard header fields + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + assert header.type_id == type_id + assert header.stream_id == stream_id + end + + test "deserialize header with 2-byte chunk stream ID format (ID 319)" do + # For chunk stream ID 319: second byte should be 255 (319 - 64 = 255) + chunk_stream_id = 319 + timestamp = 2000 + body_size = 512 + # video + type_id = 9 + stream_id = 1 + + binary = + << + 0b00::2, + 0::6, + chunk_stream_id - 64::8, + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "deserialize header with 3-byte chunk stream ID format (ID 320)" do + # Spec: When 6-bit field is 1, the ID is ((third byte)*256 + (second byte) + 64) + # For chunk stream ID 320: (third byte)*256 + (second byte) + 64 = 320 + # So: (third byte)*256 + (second byte) = 256 + # third byte = 1, second byte = 0 + + chunk_stream_id = 320 + timestamp = 3000 + body_size = 1024 + type_id = 8 + stream_id = 1 + + # Calculate bytes for 3-byte format + # = 256 + id_minus_64 = chunk_stream_id - 64 + # = 0 + second_byte = rem(id_minus_64, 256) + # = 1 + third_byte = div(id_minus_64, 256) + + binary = + << + # Header type 0 with chunk_stream_id field = 1 (indicates 3-byte format) + 0b00::2, + 1::6, + # Second and third bytes for extended ID + second_byte::8, + third_byte::8, + # Standard header fields + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "deserialize header with 3-byte chunk stream ID format (ID 65599)" do + # Maximum chunk stream ID supported by 3-byte format + chunk_stream_id = 65_599 + timestamp = 4000 + body_size = 2048 + type_id = 9 + stream_id = 1 + + # = 65535 + id_minus_64 = chunk_stream_id - 64 + # = 255 + second_byte = rem(id_minus_64, 256) + # = 255 + third_byte = div(id_minus_64, 256) + + binary = + << + 0b00::2, + 1::6, + second_byte::8, + third_byte::8, + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "deserialize header type 1 with 2-byte chunk stream ID" do + # Test extended chunk stream ID with header type 1 (no stream_id) + chunk_stream_id = 100 + timestamp_delta = 500 + body_size = 128 + type_id = 8 + + # Create a previous header for type 1 deserialization + previous_header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 1000, + body_size: 100, + type_id: 8, + stream_id: 1 + } + + binary = + << + 0b01::2, + 0::6, + chunk_stream_id - 64::8, + timestamp_delta::24, + body_size::24, + type_id::8 + >> + + {header, <<>>} = Header.deserialize(binary, %{chunk_stream_id => previous_header}) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == previous_header.timestamp + timestamp_delta + assert header.body_size == body_size + assert header.stream_id == previous_header.stream_id + end + + test "deserialize header type 2 with 3-byte chunk stream ID" do + # Test extended chunk stream ID with header type 2 (only timestamp delta) + chunk_stream_id = 500 + timestamp_delta = 42 + + id_minus_64 = chunk_stream_id - 64 + second_byte = rem(id_minus_64, 256) + third_byte = div(id_minus_64, 256) + + previous_header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 2000, + body_size: 256, + type_id: 9, + stream_id: 1 + } + + binary = + << + 0b10::2, + 1::6, + second_byte::8, + third_byte::8, + timestamp_delta::24 + >> + + {header, <<>>} = Header.deserialize(binary, %{chunk_stream_id => previous_header}) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == previous_header.timestamp + timestamp_delta + assert header.body_size == previous_header.body_size + assert header.type_id == previous_header.type_id + end + + test "deserialize header type 3 with 2-byte chunk stream ID" do + # Test extended chunk stream ID with header type 3 (no new fields) + chunk_stream_id = 200 + + previous_header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 3000, + timestamp_delta: 100, + body_size: 512, + type_id: 8, + stream_id: 1, + extended_timestamp?: false + } + + binary = + << + 0b11::2, + 0::6, + chunk_stream_id - 64::8 + >> + + {header, <<>>} = Header.deserialize(binary, %{chunk_stream_id => previous_header}) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == previous_header.timestamp + previous_header.timestamp_delta + end + + test "serialize header with 2-byte chunk stream ID format" do + # Test serialization of headers with extended chunk stream IDs + header = %Header{ + chunk_stream_id: 150, + timestamp: 5000, + body_size: 1024, + type_id: 9, + stream_id: 1 + } + + binary = Header.serialize(header) + + # Verify it can be deserialized back + {deserialized, <<>>} = Header.deserialize(binary, nil) + assert deserialized.chunk_stream_id == header.chunk_stream_id + assert deserialized.timestamp == header.timestamp + assert deserialized.body_size == header.body_size + end + + test "serialize header with 3-byte chunk stream ID format" do + # Test serialization with large chunk stream ID + header = %Header{ + chunk_stream_id: 1000, + timestamp: 6000, + body_size: 2048, + type_id: 8, + stream_id: 1 + } + + binary = Header.serialize(header) + + # Verify it can be deserialized back + {deserialized, <<>>} = Header.deserialize(binary, nil) + assert deserialized.chunk_stream_id == header.chunk_stream_id + assert deserialized.timestamp == header.timestamp + assert deserialized.body_size == header.body_size + end + + test "round-trip for chunk stream ID boundary values" do + # Test boundary values between different formats + test_ids = [ + # Minimum single-byte + 2, + # Maximum single-byte + 63, + # Minimum 2-byte + 64, + # Maximum 2-byte + 319, + # Minimum 3-byte + 320, + # Maximum 3-byte + 65_599 + ] + + for chunk_stream_id <- test_ids do + header = %Header{ + chunk_stream_id: chunk_stream_id, + timestamp: 1000, + body_size: 100, + type_id: 8, + stream_id: 1 + } + + binary = Header.serialize(header) + {deserialized, <<>>} = Header.deserialize(binary, nil) + + assert deserialized.chunk_stream_id == chunk_stream_id, + "Failed round-trip for chunk_stream_id #{chunk_stream_id}" + end + end + end + + describe "Standard Chunk Stream ID encoding (existing behavior)" do + test "deserialize header with standard 6-bit chunk stream ID" do + # This test should pass with current implementation + chunk_stream_id = 5 + timestamp = 1000 + body_size = 256 + type_id = 8 + stream_id = 1 + + binary = + << + 0b00::2, + chunk_stream_id::6, + timestamp::24, + body_size::24, + type_id::8, + stream_id::little-32 + >> + + {header, <<>>} = Header.deserialize(binary, nil) + + assert header.chunk_stream_id == chunk_stream_id + assert header.timestamp == timestamp + assert header.body_size == body_size + end + + test "serialize header with standard chunk stream ID" do + # This should work with current implementation + header = %Header{ + chunk_stream_id: 10, + timestamp: 2000, + body_size: 512, + type_id: 9, + stream_id: 1 + } + + binary = Header.serialize(header) + + expected = + << + 0b00::2, + 10::6, + 2000::24, + 512::24, + 9::8, + 1::little-32 + >> + + assert binary == expected + end + end +end