Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 150 additions & 81 deletions lib/membrane_rtmp_plugin/rtmp/header.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ defmodule Membrane.RTMP.Header do
defmacro type(:amf_data), do: 0x12
defmacro type(:amf_command), do: 0x14

@header_type_0 <<0x0::2>>
@header_type_1 <<0x1::2>>
@header_type_2 <<0x2::2>>
@header_type_3 <<0x3::2>>

@extended_timestamp_marker 0xFFFFFF

@spec new(Keyword.t()) :: t()
Expand All @@ -62,14 +57,50 @@ defmodule Membrane.RTMP.Header do
* `0b11` - all values are derived from the previous header with the same `chunk_stream_id`
"""
@spec deserialize(binary(), t() | nil) :: {t(), rest :: binary()} | {:error, :need_more_data}
def deserialize(binary, previous_headers \\ nil)

def deserialize(<<header_type::2, rest::bitstring>>, previous_headers) do
# chunk basic header is made of the fmt and chunk_stream_id fields.
fmt =
case header_type do
0 -> :type_0
1 -> :type_1
2 -> :type_2
3 -> :type_3
end

{chunk_stream_id, rest} = deserialize_chunk_stream_id(rest)

deserialize_message_header(
%{fmt: fmt, chunk_stream_id: chunk_stream_id},
rest,
previous_headers
)
end

defp deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do
# 2 byte format
{stream_id + 64, rest}
end

defp deserialize_chunk_stream_id(
<<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>>
) do
# 3 byte format
{stream_id_part_3 * 256 + stream_id_part_2 + 64, rest}
end

defp deserialize_chunk_stream_id(<<stream_id::6, rest::binary>>) do
# 1 byte format
{stream_id, rest}
end

# only the deserialization of the 0b00 type can have `nil` previous header
def deserialize(
<<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32), rest::binary>>,
_previous_headers
) do
defp deserialize_message_header(
%{fmt: :type_0, chunk_stream_id: chunk_stream_id},
<<timestamp::24, body_size::24, type_id::8, stream_id::little-integer-size(32),
rest::binary>>,
_previous_headers
) do
with {timestamp, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
Expand All @@ -84,101 +115,139 @@ defmodule Membrane.RTMP.Header do
end
end

def deserialize(
<<@header_type_1::bitstring, chunk_stream_id::6, timestamp_delta::24, body_size::24,
type_id::8, rest::binary>>,
previous_headers
) do
with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: body_size,
type_id: type_id,
stream_id: previous_headers[chunk_stream_id].stream_id
}

{header, rest}
end
end
defp deserialize_message_header(
%{fmt: :type_1, chunk_stream_id: chunk_stream_id},
<<timestamp_delta::24, body_size::24, type_id::8, rest::binary>>,
previous_headers
) do
previous_header = previous_headers[chunk_stream_id]

def deserialize(
<<@header_type_2::bitstring, chunk_stream_id::6, timestamp_delta::24, rest::binary>>,
previous_headers
) do
with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: previous_headers[chunk_stream_id].body_size,
type_id: previous_headers[chunk_stream_id].type_id,
stream_id: previous_headers[chunk_stream_id].stream_id
}
if previous_header == nil do
{:error, {:missing_previous_header, chunk_stream_id, :type_1}}
else
with {timestamp_delta, extended_timestamp?, rest} <-
extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: body_size,
type_id: type_id,
stream_id: previous_header.stream_id
}

{header, rest}
{header, rest}
end
end
end

def deserialize(
<<@header_type_3::bitstring, chunk_stream_id::6, rest::binary>>,
previous_headers
) do
%__MODULE__{} = previous_header = previous_headers[chunk_stream_id]
defp deserialize_message_header(
%{fmt: :type_2, chunk_stream_id: chunk_stream_id},
<<timestamp_delta::24, rest::binary>>,
previous_headers
) do
previous_header = previous_headers[chunk_stream_id]

if previous_header.extended_timestamp? do
with {timestamp_delta, _extended_timestamp?, rest} <-
extract_timestamp(rest, @extended_timestamp_marker) do
if previous_header == nil do
{:error, {:missing_previous_header, chunk_stream_id, :type_2}}
else
with {timestamp_delta, extended_timestamp?, rest} <-
extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta
chunk_stream_id: chunk_stream_id,
timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: previous_header.body_size,
type_id: previous_header.type_id,
stream_id: previous_header.stream_id
}

{header, rest}
end
else
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + previous_header.timestamp_delta
}

{header, rest}
end
end

def deserialize(
<<@header_type_0::bitstring, _chunk_stream_id::6, _rest::binary>>,
_prev_header
),
do: {:error, :need_more_data}
defp deserialize_message_header(
%{fmt: :type_3, chunk_stream_id: chunk_stream_id},
<<rest::binary>>,
previous_headers
) do
case previous_headers[chunk_stream_id] do
nil ->
{:error, {:missing_previous_header, chunk_stream_id, :type_3}}

%__MODULE__{} = previous_header ->
if previous_header.extended_timestamp? do
with {timestamp_delta, _extended_timestamp?, rest} <-
extract_timestamp(rest, @extended_timestamp_marker) do
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta
}

{header, rest}
end
else
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + previous_header.timestamp_delta
}

def deserialize(
<<@header_type_1::bitstring, _chunk_stream_id::6, _rest::binary>>,
_prev_header
),
do: {:error, :need_more_data}
{header, rest}
end
end
end

def deserialize(
<<@header_type_2::bitstring, _chunk_stream_id::6, _rest::binary>>,
_prev_header
),
do: {:error, :need_more_data}
defp deserialize_message_header(_basic_header, _data, _prev_header),
do: {:error, :need_more_data}

@spec serialize(t()) :: binary()
def serialize(%__MODULE__{} = header) do
def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header)
when chunk_stream_id >= 2 and chunk_stream_id <= 63 do
# 1-byte format: fmt (2 bits) + chunk_stream_id (6 bits)
%{
timestamp: timestamp,
body_size: body_size,
type_id: type_id,
stream_id: stream_id
} = header

<<0::2, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32)>>
end

def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header)
when chunk_stream_id >= 64 and chunk_stream_id <= 319 do
# 2-byte format: fmt (2 bits) + marker 0 (6 bits) + (id - 64) (8 bits)
%{
timestamp: timestamp,
body_size: body_size,
type_id: type_id,
stream_id: stream_id
} = header

<<0::2, 0::6, chunk_stream_id - 64::8, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32)>>
end

def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header)
when chunk_stream_id >= 320 and chunk_stream_id <= 65_599 do
# 3-byte format: fmt (2 bits) + marker 1 (6 bits) + low byte (8 bits) + high byte (8 bits)
%{
chunk_stream_id: chunk_stream_id,
timestamp: timestamp,
body_size: body_size,
type_id: type_id,
stream_id: stream_id
} = header

<<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8,
id_minus_64 = chunk_stream_id - 64
low_byte = rem(id_minus_64, 256)
high_byte = div(id_minus_64, 256)

<<0::2, 1::6, low_byte::8, high_byte::8, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32)>>
end

Expand Down
Loading