Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,18 @@ To use it, just declare it in your lager config:

It accepts exactly the same set of options as TCP backend.

### Chunking & compression

This backend currently doesn't support neither [chunking](http://docs.graylog.org/en/2.4/pages/gelf.html#chunking)
nor [compression](http://docs.graylog.org/en/2.4/pages/gelf.html#compression). This means that too
big log messages won't be probably received by Graylog due to packet fragmentation.
### Chunking

UDP backend will try to avoid packet fragmentation with using of [chunking](http://docs.graylog.org/en/2.4/pages/gelf.html#chunking).
Chunk size in bytes can be configured with `chunk_size` option (**optional**, default: `1472` minimum: `300`),
According to GELF protocol, maximum number of allowed chunks is 128.
Please, keep in mind that each chunk also has a fixed 12 bytes headers part,
so maximum allowed size of log message equal to (`chunk_size` - 12) x 128.
If message will not fit in that size, it will be dropped and
error log with some debug info about initial message will be sent instead.

### Compression
This backend currently doesn't support [compression](http://docs.graylog.org/en/2.4/pages/gelf.html#compression).

## GELF formatter

Expand Down
3 changes: 2 additions & 1 deletion src/lager_graylog.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
kernel,
stdlib,
backoff,
jiffy
jiffy,
crypto
]},
{env, []},
{maintainers, ["Erlang Solutions", "Heinz N. Gies"]},
Expand Down
7 changes: 6 additions & 1 deletion src/lager_graylog.erl
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
-module(lager_graylog).

-export_type([host/0, port_number/0, mask/0, backend_option/0]).
-export_type([host/0, port_number/0, mask/0, chunk_size/0,
backend_option/0, udp_backend_option/0]).

-type host() :: inet:hostname().
-type port_number() :: inet:port_number().
-type address_family() :: undefined | inet | inet6.
-type chunk_size() :: pos_integer().

%% log level mask - definition taken from lager
-type mask() :: {mask, integer()}.
Expand All @@ -15,3 +17,6 @@
| {level, lager:log_level()}
| {formatter, module()}
| {formatter_config, term()}.

-type udp_backend_option() :: backend_option() |
{chunk_size, chunk_size()}.
115 changes: 103 additions & 12 deletions src/lager_graylog_udp_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

-behaviour(gen_event).

-include_lib("lager/include/lager.hrl").

-export([init/1]).
-export([handle_call/2]).
-export([handle_event/2]).
Expand All @@ -15,20 +17,30 @@
level := lager_graylog_utils:mask(),
host := lager_graylog:host(),
port := lager_graylog:port_number(),
chunk_size := lager_graylog:chunk_size(),
machine_id := binary(),
socket := socket(),
formatter := module(),
formatter_config := any(),
formatter_state := any()
formatter_state := any(),
chunks_counter := non_neg_integer()
}.

-define(HEADERS_BYTE_SIZE, 12). %% 12 bytes reserved for headers.

%% gen_event callbacks

-spec init([lager_graylog:backend_option()]) ->
-spec init([lager_graylog:udp_backend_option()]) ->
{ok, state()} | {error, {invalid_opts | gen_udp_open_failed, term()}}.
init(Opts) ->
case lager_graylog_utils:parse_common_opts(Opts) of
{ok, Config} ->
open_socket_and_init_state(Config);
case parse_opts(Opts) of
{ok, UdpBackendConfig} ->
open_socket_and_init_state(maps:merge(Config, UdpBackendConfig));
{error, Reason} ->
{error, {invalid_opts, Reason}}
end;
{error, Reason} ->
{error, {invalid_opts, Reason}}
end.
Expand All @@ -47,21 +59,30 @@ handle_call(_Request, State) ->

handle_event({log, Message}, #{name := Name,
level := Mask,
host := Host,
port := Port,
socket := Socket,
formatter := Formatter,
formatter_config := FormatterConfig,
formatter_state := FormatterState
} = State) ->
case lager_util:is_loggable(Message, Mask, Name) of
true ->
FormattedLog = Formatter:format(Message, FormatterState, FormatterConfig),
gen_udp:send(Socket, Host, Port, FormattedLog);
case send(State, FormattedLog, byte_size(FormattedLog)) of
{ok, NewState} -> {ok, NewState};
{error, Error} ->
Metadata = lager_msg:metadata(Message),
DebugMeta =
[
{pid, proplists:get_value(pid, Metadata)},
{module, proplists:get_value(module, Metadata)},
{function, proplists:get_value(function, Metadata)},
{line, proplists:get_value(line, Metadata)}
],
?INT_LOG(error, "~p DebugMeta ~p", [Error, DebugMeta]),
{ok, State}
end;
false ->
ok
end,
{ok, State}.
{ok, State}
end.

handle_info(_, State) ->
{ok, State}.
Expand All @@ -74,13 +95,50 @@ code_change(_OldVsn, State, _Extra) ->

%% Helpers

-spec open_socket_and_init_state(lager_graylog_utils:common_config()) ->
{ok, state()} | {error, {gen_udp_open_failed, term()}}.
send(#{host := Host, port := Port, socket := Socket, chunk_size := ChunkSize} = State,
Msg, MsgSize) when MsgSize =< ChunkSize ->
gen_udp:send(Socket, Host, Port, Msg),
{ok, State};
send(#{chunk_size := ChunkSize, machine_id := MachineId, chunks_counter := ChunksCounter} = State,
Msg, MsgSize) ->
PayloadSize = ChunkSize - ?HEADERS_BYTE_SIZE,
ChunksPerPayload = MsgSize / PayloadSize,
WholeChunksNumber = trunc(ChunksPerPayload),
ChunksNumber =
case ChunksPerPayload - WholeChunksNumber == 0 of
true -> WholeChunksNumber;
false -> WholeChunksNumber + 1
end,
case ChunksNumber > 128 of %% maximum number of chunks in GELF
true -> {error, {message_to_big, [{msg_size, MsgSize}, {chunk_size, ChunkSize}]}};
false ->
{NewChunksCounter, ChunkId} = generate_chunk_id(MachineId, ChunksCounter),
chunk_send(ChunkId, 0, ChunksNumber, PayloadSize, MsgSize, Msg, State),
{ok, State#{chunks_counter => NewChunksCounter}}
end.

chunk_send(_ChunkId, _SequenceNumber, _NumberOfChunks, _PayloadSize, _BodyLength, <<>>, _State) ->
ok;
chunk_send(ChunkId, SequenceNumber, NumberOfChunks, PayloadSize, BodyLength, Body,
#{host := Host, port := Port, socket := Socket} = State) ->
CurrentPayloadSize = erlang:min(PayloadSize, BodyLength),
<<BodyPart:CurrentPayloadSize/binary, Rest/binary>> = Body,
ChunkData = <<30,15, %% Chunked GELF magic bytes
ChunkId/binary,
SequenceNumber:8/integer,
NumberOfChunks:8/integer,
BodyPart/binary>>,
gen_udp:send(Socket, Host, Port, ChunkData),
chunk_send(ChunkId, SequenceNumber + 1, NumberOfChunks, PayloadSize,
BodyLength - CurrentPayloadSize, Rest, State).

-spec open_socket_and_init_state(map()) -> {ok, state()} | {error, {gen_udp_open_failed, term()}}.
open_socket_and_init_state(Config) ->
#{level := Mask,
host := Host,
port := Port,
address_family := AddressFamily,
chunk_size := ChunkSize,
formatter := Formatter,
formatter_config := FormatterConfig} = Config,
case gen_udp:open(0, [binary, {active, false} | extra_open_opts(AddressFamily)]) of
Expand All @@ -90,6 +148,9 @@ open_socket_and_init_state(Config) ->
host => Host,
port => Port,
socket => Socket,
chunk_size => ChunkSize,
machine_id => crypto:strong_rand_bytes(4),
chunks_counter => 0,
formatter => Formatter,
formatter_config => FormatterConfig,
formatter_state => Formatter:init(FormatterConfig)
Expand All @@ -99,7 +160,37 @@ open_socket_and_init_state(Config) ->
{error, {gen_udp_open_failed, Reason}}
end.

generate_chunk_id(MachineId, ChunksCounter) ->
ChunkId = <<MachineId/binary, ChunksCounter:4/unsigned-integer-unit:8>>,
NewChunksCounter =
case ChunksCounter =:= 4294967295 of
true -> 0;
false -> ChunksCounter + 1
end,
{NewChunksCounter, ChunkId}.

-spec extra_open_opts(lager_graylog:address_family()) -> [inet:address_family()].
extra_open_opts(undefined) -> [];
extra_open_opts(inet) -> [inet];
extra_open_opts(inet6) -> [inet6].

parse_opts(Opts) when is_list(Opts) ->
%% 1472 looks like good default value to avoid fragmentation
%% 1500 bytes - 20 bytes for the IPv4 header and 8 bytes for the UDP header
ChunkSize = proplists:get_value(chunk_size, Opts, 1472),
OptsWithDefaults = [{chunk_size, ChunkSize}],
Errors =
lists:foldl(
fun({Opt, OptVal}, ErrorAcc) ->
case validate_opt(Opt, OptVal) of
ok -> ErrorAcc;
Error -> [Error | ErrorAcc]
end
end, [], OptsWithDefaults),
case Errors of
[] -> {ok, maps:from_list(OptsWithDefaults)};
_ -> {error, Errors}
end.

validate_opt(chunk_size, ChunkSize) when ChunkSize < 300 -> {error, chunk_size_too_small};
validate_opt(_Opt, _OptVal) -> ok.
3 changes: 1 addition & 2 deletions src/lager_graylog_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ parse_common_opts(Opts) when is_list(Opts) ->
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
AddressFamily = proplists:get_value(address_family, Opts),
Formatter = proplists:get_value(formatter, Opts, lager_graylog_gelf_formatter),
Formatter = proplists:get_value(formatter, Opts, lager_graylog_gelf_formatter),
FormatterConfig = proplists:get_value(formatter_config, Opts, []),

OptsWithDefaults = [{level, Level},
Expand Down Expand Up @@ -81,4 +81,3 @@ validate_config_value(formatter, Formatter) when not is_atom(Formatter) ->
{error, {invalid_formatter, Formatter}};
validate_config_value(_, _) ->
ok.

94 changes: 72 additions & 22 deletions test/lager_graylog_udp_backend_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ groups() ->

test_cases() ->
[sends_log_messages_to_configured_endpoint,
doesnt_log_over_configured_level
doesnt_log_over_configured_level,
big_message_should_be_chunked,
too_big_message_should_be_dropped,
chunks_must_fit_in_specified_size
].

init_per_suite(Config) ->
Expand All @@ -30,8 +33,9 @@ end_per_suite(_) ->

init_per_testcase(_, Config) ->
{Socket, Port} = open(),
start_lager_handler(Port),
[{socket, Socket}, {port, Port} | Config].
AdditionalOptions = [{chunk_size, 1372}],
start_lager_handler(Port, AdditionalOptions),
AdditionalOptions ++ [{socket, Socket}, {port, Port} | Config].

end_per_testcase(_, Config) ->
stop_lager_handler(?config(port, Config)).
Expand All @@ -58,11 +62,43 @@ doesnt_log_over_configured_level(Config) ->
assert_logged(Logs, [Log1]),
assert_not_logged(Logs, [Log2]).

big_message_should_be_chunked(Config) ->
Socket = ?config(socket, Config),
ChunkSize = ?config(chunk_size, Config),
BigMessage = generate_message(ChunkSize * 15 + 100),
Log1 = log(info, BigMessage),
assert_logged([flush_chunked(Socket, 16)], [Log1]).

too_big_message_should_be_dropped(Config) ->
Socket = ?config(socket, Config),
ChunkSize = ?config(chunk_size, Config),
VeryBigMessage = generate_message(ChunkSize * 129),
log(info, VeryBigMessage),
[#{<<"level">> := 3, <<"_module">> := <<"lager_graylog_udp_backend">>}] = flush(Socket).

chunks_must_fit_in_specified_size(Config) ->
Socket = ?config(socket, Config),
ChunkSize = ?config(chunk_size, Config),
Message = generate_message(ChunkSize * 5 + 100),
log(info, Message),
Chunks = recv(Socket, 129, 50, []),
%% All chunks but last should be fully loaded
[_LastChunk] =
lists:filter(
fun
(Chunk) when byte_size(Chunk) =:= ChunkSize -> false;
(Chunk) when byte_size(Chunk) < ChunkSize -> true
end, Chunks).

%% Helpers

-spec start_lager_handler(inet:port_number()) -> ok.
start_lager_handler(Port) ->
Opts = [{host, ?HOST}, {port, Port}],
generate_message(Length) ->
iolist_to_binary([integer_to_list(crypto:rand_uniform(0, 36), 36) ||
_ <- lists:seq(1, Length)]).

-spec start_lager_handler(inet:port_number(), [lager_graylog:udp_backend_option()]) -> ok.
start_lager_handler(Port, AdditionalOptions) ->
Opts = [{host, ?HOST}, {port, Port} | AdditionalOptions],
ok = gen_event:add_handler(lager_event, handler_id(Port), Opts).

-spec stop_lager_handler(inet:port_number()) -> ok.
Expand All @@ -75,40 +111,55 @@ handler_id(Port) ->

-spec open() -> {gen_udp:socket(), inet:port_number()}.
open() ->
{ok, Socket} = gen_udp:open(0, [binary,
{ok, Socket} = gen_udp:open(0, [
binary,
{ip, ?HOST},
{active, true},
{reuseaddr, true}]),
{reuseaddr, true},
{recbuf, 191100}
]),
{ok, Port} = inet:port(Socket),
{Socket, Port}.

flush_chunked(Socket, NumberOfChunks) ->
Chunks = recv(Socket, 129, 500, []),
DecodedChunks =
lists:map(
fun(<<30,15, MessageId:8/binary, SequenceNumber:8/integer,
ChunksTotal:8/integer, BodyPart/binary>>) ->
{{MessageId, ChunksTotal}, SequenceNumber , BodyPart}
end, Chunks),
{MessageIdsAndChunkTotals, SequenceNumbers, BodyParts} = lists:unzip3(DecodedChunks),
%% all MessageIds and ChunkTotals shold be the same for one message
1 = length(lists:usort(MessageIdsAndChunkTotals)),
%% we should receive all chunks (possible reordered)
true = lists:seq(0, NumberOfChunks - 1) =:= lists:sort(SequenceNumbers),
NumberOfChunks = length(Chunks),
jiffy:decode(BodyParts, [return_maps]).

-spec flush(gen_udp:socket()) -> [map()].
flush(Socket) ->
Packets = recv(Socket),
[jiffy:decode(Packet, [return_maps]) || Packet <- Packets].

-spec recv(gen_udp:socket()) -> ok.
recv(Socket) ->
recv(Socket, 10, 10, []).
recv(Socket, 10, 50, []).

-spec recv(gen_udp:socket(), Tries :: non_neg_integer(), timeout(), list()) -> [binary()].
recv(Socket, Tries, Timeout, Acc0) when Tries > 0 ->
Acc =
receive
{udp, Socket, _, _, Packet} ->
[Acc0, Packet]
after
Timeout ->
Acc0
end,
recv(Socket, Tries - 1, Timeout, Acc);
recv(_Socket, 0, _Timeout, Acc) ->
lists:flatten(Acc).
receive
{udp, Socket, _, _, Packet} ->
recv(Socket, Tries - 1, Timeout, [Acc0, Packet])
after
Timeout -> lists:flatten(Acc0)
end;
recv(_Socket, 0, _Timeout, Acc) -> lists:flatten(Acc).

-spec log(atom(), string()) -> pos_integer().
log(Level, Message) ->
LogRef = erlang:unique_integer([positive, monotonic]),
lager:log(Level, [{test_log_ref, LogRef}], Message),
lager:log_unsafe(Level, [{test_log_ref, LogRef}], Message, []),
LogRef.

-spec assert_logged([map()], [pos_integer()]) -> ok | no_return().
Expand Down Expand Up @@ -140,4 +191,3 @@ assert_not_logged(Logs, LogRefs) ->
_ ->
error({found_logs_but_shouldnt, [{log_refs, LogRefs}, {found_logs, FilteredLogs}]})
end.