Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 156 additions & 82 deletions lib/membrane_rtmp_plugin/rtmp/header.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ defmodule Membrane.RTMP.Header do
defmacro type(:amf_data), do: 0x12
defmacro type(:amf_command), do: 0x14

@header_type_0 <<0x0::2>>
@header_type_1 <<0x1::2>>
@header_type_2 <<0x2::2>>
@header_type_3 <<0x3::2>>

@extended_timestamp_marker 0xFFFFFF

@spec new(Keyword.t()) :: t()
Expand All @@ -61,15 +56,56 @@ defmodule Membrane.RTMP.Header do
* `0b10` - same as above plus derives `type_id` and `body_size`
* `0b11` - all values are derived from the previous header with the same `chunk_stream_id`
"""
@spec deserialize(binary(), t() | nil) :: {t(), rest :: binary()} | {:error, :need_more_data}
def deserialize(binary, previous_headers \\ nil)
@type deserialize_error_t ::
:need_more_data
| {:missing_previous_header, chunk_stream_id :: non_neg_integer(), fmt :: atom()}

@spec deserialize(binary(), t() | nil) ::
{t(), rest :: binary()} | {:error, deserialize_error_t()}

def deserialize(<<header_type::2, rest::bitstring>>, previous_headers) do
# chunk basic header is made of the fmt and chunk_stream_id fields.
fmt =
case header_type do
0 -> :type_0
1 -> :type_1
2 -> :type_2
3 -> :type_3
end

{chunk_stream_id, rest} = deserialize_chunk_stream_id(rest)

deserialize_message_header(
%{fmt: fmt, chunk_stream_id: chunk_stream_id},
rest,
previous_headers
)
end

defp deserialize_chunk_stream_id(<<0::6, stream_id::8, rest::binary>>) do
# 2 byte format
{stream_id + 64, rest}
end

defp deserialize_chunk_stream_id(
<<1::6, stream_id_part_2::8, stream_id_part_3::8, rest::binary>>
) do
# 3 byte format
{stream_id_part_3 * 256 + stream_id_part_2 + 64, rest}
end

defp deserialize_chunk_stream_id(<<stream_id::6, rest::binary>>) do
# 1 byte format
{stream_id, rest}
end

# only the deserialization of the 0b00 type can have `nil` previous header
def deserialize(
<<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32), rest::binary>>,
_previous_headers
) do
defp deserialize_message_header(
%{fmt: :type_0, chunk_stream_id: chunk_stream_id},
<<timestamp::24, body_size::24, type_id::8, stream_id::little-integer-size(32),
rest::binary>>,
_previous_headers
) do
with {timestamp, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
Expand All @@ -84,101 +120,139 @@ defmodule Membrane.RTMP.Header do
end
end

def deserialize(
<<@header_type_1::bitstring, chunk_stream_id::6, timestamp_delta::24, body_size::24,
type_id::8, rest::binary>>,
previous_headers
) do
with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: body_size,
type_id: type_id,
stream_id: previous_headers[chunk_stream_id].stream_id
}

{header, rest}
end
end
defp deserialize_message_header(
%{fmt: :type_1, chunk_stream_id: chunk_stream_id},
<<timestamp_delta::24, body_size::24, type_id::8, rest::binary>>,
previous_headers
) do
previous_header = previous_headers[chunk_stream_id]

def deserialize(
<<@header_type_2::bitstring, chunk_stream_id::6, timestamp_delta::24, rest::binary>>,
previous_headers
) do
with {timestamp_delta, extended_timestamp?, rest} <- extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: previous_headers[chunk_stream_id].body_size,
type_id: previous_headers[chunk_stream_id].type_id,
stream_id: previous_headers[chunk_stream_id].stream_id
}
if previous_header == nil do
{:error, {:missing_previous_header, chunk_stream_id, :type_1}}
else
with {timestamp_delta, extended_timestamp?, rest} <-
extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
chunk_stream_id: chunk_stream_id,
timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: body_size,
type_id: type_id,
stream_id: previous_header.stream_id
}

{header, rest}
{header, rest}
end
end
end

def deserialize(
<<@header_type_3::bitstring, chunk_stream_id::6, rest::binary>>,
previous_headers
) do
%__MODULE__{} = previous_header = previous_headers[chunk_stream_id]
defp deserialize_message_header(
%{fmt: :type_2, chunk_stream_id: chunk_stream_id},
<<timestamp_delta::24, rest::binary>>,
previous_headers
) do
previous_header = previous_headers[chunk_stream_id]

if previous_header.extended_timestamp? do
with {timestamp_delta, _extended_timestamp?, rest} <-
extract_timestamp(rest, @extended_timestamp_marker) do
if previous_header == nil do
{:error, {:missing_previous_header, chunk_stream_id, :type_2}}
else
with {timestamp_delta, extended_timestamp?, rest} <-
extract_timestamp(rest, timestamp_delta) do
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta
chunk_stream_id: chunk_stream_id,
timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta,
extended_timestamp?: extended_timestamp?,
body_size: previous_header.body_size,
type_id: previous_header.type_id,
stream_id: previous_header.stream_id
}

{header, rest}
end
else
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + previous_header.timestamp_delta
}

{header, rest}
end
end

def deserialize(
<<@header_type_0::bitstring, _chunk_stream_id::6, _rest::binary>>,
_prev_header
),
do: {:error, :need_more_data}
defp deserialize_message_header(
%{fmt: :type_3, chunk_stream_id: chunk_stream_id},
<<rest::binary>>,
previous_headers
) do
case previous_headers[chunk_stream_id] do
%__MODULE__{} = previous_header ->
if previous_header.extended_timestamp? do
with {timestamp_delta, _extended_timestamp?, rest} <-
extract_timestamp(rest, @extended_timestamp_marker) do
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + timestamp_delta,
timestamp_delta: timestamp_delta
}

{header, rest}
end
else
header = %__MODULE__{
previous_header
| timestamp: previous_header.timestamp + previous_header.timestamp_delta
}

{header, rest}
end

def deserialize(
<<@header_type_1::bitstring, _chunk_stream_id::6, _rest::binary>>,
_prev_header
),
do: {:error, :need_more_data}
_nil ->
{:error, {:missing_previous_header, chunk_stream_id, :type_3}}
end
end

def deserialize(
<<@header_type_2::bitstring, _chunk_stream_id::6, _rest::binary>>,
_prev_header
),
do: {:error, :need_more_data}
defp deserialize_message_header(_basic_header, _data, _prev_header),
do: {:error, :need_more_data}

@spec serialize(t()) :: binary()
def serialize(%__MODULE__{} = header) do
def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header)
when chunk_stream_id >= 2 and chunk_stream_id <= 63 do
# 1-byte format: fmt (2 bits) + chunk_stream_id (6 bits)
%{
timestamp: timestamp,
body_size: body_size,
type_id: type_id,
stream_id: stream_id
} = header

<<0::2, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32)>>
end

def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header)
when chunk_stream_id >= 64 and chunk_stream_id <= 319 do
# 2-byte format: fmt (2 bits) + marker 0 (6 bits) + (id - 64) (8 bits)
%{
timestamp: timestamp,
body_size: body_size,
type_id: type_id,
stream_id: stream_id
} = header

<<0::2, 0::6, chunk_stream_id - 64::8, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32)>>
end

def serialize(%__MODULE__{chunk_stream_id: chunk_stream_id} = header)
when chunk_stream_id >= 320 and chunk_stream_id <= 65_599 do
# 3-byte format: fmt (2 bits) + marker 1 (6 bits) + low byte (8 bits) + high byte (8 bits)
%{
chunk_stream_id: chunk_stream_id,
timestamp: timestamp,
body_size: body_size,
type_id: type_id,
stream_id: stream_id
} = header

<<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8,
id_minus_64 = chunk_stream_id - 64
low_byte = rem(id_minus_64, 256)
high_byte = div(id_minus_64, 256)

<<0::2, 1::6, low_byte::8, high_byte::8, timestamp::24, body_size::24, type_id::8,
stream_id::little-integer-size(32)>>
end

Expand Down
Loading