diff --git a/README.md b/README.md index e46c06c..4f0994d 100644 --- a/README.md +++ b/README.md @@ -62,11 +62,18 @@ To use it, just declare it in your lager config: It accepts exactly the same set of options as TCP backend. -### Chunking & compression - -This backend currently doesn't support neither [chunking](http://docs.graylog.org/en/2.4/pages/gelf.html#chunking) -nor [compression](http://docs.graylog.org/en/2.4/pages/gelf.html#compression). This means that too -big log messages won't be probably received by Graylog due to packet fragmentation. +### Chunking + +UDP backend will try to avoid packet fragmentation with using of [chunking](http://docs.graylog.org/en/2.4/pages/gelf.html#chunking). +Chunk size in bytes can be configured with `chunk_size` option (**optional**, default: `1472` minimum: `300`), +According to GELF protocol, maximum number of allowed chunks is 128. +Please, keep in mind that each chunk also has a fixed 12 bytes headers part, +so maximum allowed size of log message equal to (`chunk_size` - 12) x 128. +If message will not fit in that size, it will be dropped and +error log with some debug info about initial message will be sent instead. + +### Compression +This backend currently doesn't support [compression](http://docs.graylog.org/en/2.4/pages/gelf.html#compression). ## GELF formatter diff --git a/src/lager_graylog.app.src b/src/lager_graylog.app.src index 55554f6..1c3e3ad 100644 --- a/src/lager_graylog.app.src +++ b/src/lager_graylog.app.src @@ -8,7 +8,8 @@ kernel, stdlib, backoff, - jiffy + jiffy, + crypto ]}, {env, []}, {maintainers, ["Erlang Solutions", "Heinz N. Gies"]}, diff --git a/src/lager_graylog.erl b/src/lager_graylog.erl index af17548..cec535d 100644 --- a/src/lager_graylog.erl +++ b/src/lager_graylog.erl @@ -1,10 +1,12 @@ -module(lager_graylog). --export_type([host/0, port_number/0, mask/0, backend_option/0]). +-export_type([host/0, port_number/0, mask/0, chunk_size/0, + backend_option/0, udp_backend_option/0]). -type host() :: inet:hostname(). -type port_number() :: inet:port_number(). -type address_family() :: undefined | inet | inet6. +-type chunk_size() :: pos_integer(). %% log level mask - definition taken from lager -type mask() :: {mask, integer()}. @@ -15,3 +17,6 @@ | {level, lager:log_level()} | {formatter, module()} | {formatter_config, term()}. + +-type udp_backend_option() :: backend_option() | + {chunk_size, chunk_size()}. diff --git a/src/lager_graylog_udp_backend.erl b/src/lager_graylog_udp_backend.erl index 0e195a2..1faecf5 100644 --- a/src/lager_graylog_udp_backend.erl +++ b/src/lager_graylog_udp_backend.erl @@ -2,6 +2,8 @@ -behaviour(gen_event). +-include_lib("lager/include/lager.hrl"). + -export([init/1]). -export([handle_call/2]). -export([handle_event/2]). @@ -15,20 +17,30 @@ level := lager_graylog_utils:mask(), host := lager_graylog:host(), port := lager_graylog:port_number(), + chunk_size := lager_graylog:chunk_size(), + machine_id := binary(), socket := socket(), formatter := module(), formatter_config := any(), - formatter_state := any() + formatter_state := any(), + chunks_counter := non_neg_integer() }. +-define(HEADERS_BYTE_SIZE, 12). %% 12 bytes reserved for headers. + %% gen_event callbacks --spec init([lager_graylog:backend_option()]) -> +-spec init([lager_graylog:udp_backend_option()]) -> {ok, state()} | {error, {invalid_opts | gen_udp_open_failed, term()}}. init(Opts) -> case lager_graylog_utils:parse_common_opts(Opts) of {ok, Config} -> - open_socket_and_init_state(Config); + case parse_opts(Opts) of + {ok, UdpBackendConfig} -> + open_socket_and_init_state(maps:merge(Config, UdpBackendConfig)); + {error, Reason} -> + {error, {invalid_opts, Reason}} + end; {error, Reason} -> {error, {invalid_opts, Reason}} end. @@ -47,9 +59,6 @@ handle_call(_Request, State) -> handle_event({log, Message}, #{name := Name, level := Mask, - host := Host, - port := Port, - socket := Socket, formatter := Formatter, formatter_config := FormatterConfig, formatter_state := FormatterState @@ -57,11 +66,23 @@ handle_event({log, Message}, #{name := Name, case lager_util:is_loggable(Message, Mask, Name) of true -> FormattedLog = Formatter:format(Message, FormatterState, FormatterConfig), - gen_udp:send(Socket, Host, Port, FormattedLog); + case send(State, FormattedLog, byte_size(FormattedLog)) of + {ok, NewState} -> {ok, NewState}; + {error, Error} -> + Metadata = lager_msg:metadata(Message), + DebugMeta = + [ + {pid, proplists:get_value(pid, Metadata)}, + {module, proplists:get_value(module, Metadata)}, + {function, proplists:get_value(function, Metadata)}, + {line, proplists:get_value(line, Metadata)} + ], + ?INT_LOG(error, "~p DebugMeta ~p", [Error, DebugMeta]), + {ok, State} + end; false -> - ok - end, - {ok, State}. + {ok, State} + end. handle_info(_, State) -> {ok, State}. @@ -74,13 +95,50 @@ code_change(_OldVsn, State, _Extra) -> %% Helpers --spec open_socket_and_init_state(lager_graylog_utils:common_config()) -> - {ok, state()} | {error, {gen_udp_open_failed, term()}}. +send(#{host := Host, port := Port, socket := Socket, chunk_size := ChunkSize} = State, + Msg, MsgSize) when MsgSize =< ChunkSize -> + gen_udp:send(Socket, Host, Port, Msg), + {ok, State}; +send(#{chunk_size := ChunkSize, machine_id := MachineId, chunks_counter := ChunksCounter} = State, + Msg, MsgSize) -> + PayloadSize = ChunkSize - ?HEADERS_BYTE_SIZE, + ChunksPerPayload = MsgSize / PayloadSize, + WholeChunksNumber = trunc(ChunksPerPayload), + ChunksNumber = + case ChunksPerPayload - WholeChunksNumber == 0 of + true -> WholeChunksNumber; + false -> WholeChunksNumber + 1 + end, + case ChunksNumber > 128 of %% maximum number of chunks in GELF + true -> {error, {message_to_big, [{msg_size, MsgSize}, {chunk_size, ChunkSize}]}}; + false -> + {NewChunksCounter, ChunkId} = generate_chunk_id(MachineId, ChunksCounter), + chunk_send(ChunkId, 0, ChunksNumber, PayloadSize, MsgSize, Msg, State), + {ok, State#{chunks_counter => NewChunksCounter}} + end. + +chunk_send(_ChunkId, _SequenceNumber, _NumberOfChunks, _PayloadSize, _BodyLength, <<>>, _State) -> + ok; +chunk_send(ChunkId, SequenceNumber, NumberOfChunks, PayloadSize, BodyLength, Body, + #{host := Host, port := Port, socket := Socket} = State) -> + CurrentPayloadSize = erlang:min(PayloadSize, BodyLength), + <> = Body, + ChunkData = <<30,15, %% Chunked GELF magic bytes + ChunkId/binary, + SequenceNumber:8/integer, + NumberOfChunks:8/integer, + BodyPart/binary>>, + gen_udp:send(Socket, Host, Port, ChunkData), + chunk_send(ChunkId, SequenceNumber + 1, NumberOfChunks, PayloadSize, + BodyLength - CurrentPayloadSize, Rest, State). + +-spec open_socket_and_init_state(map()) -> {ok, state()} | {error, {gen_udp_open_failed, term()}}. open_socket_and_init_state(Config) -> #{level := Mask, host := Host, port := Port, address_family := AddressFamily, + chunk_size := ChunkSize, formatter := Formatter, formatter_config := FormatterConfig} = Config, case gen_udp:open(0, [binary, {active, false} | extra_open_opts(AddressFamily)]) of @@ -90,6 +148,9 @@ open_socket_and_init_state(Config) -> host => Host, port => Port, socket => Socket, + chunk_size => ChunkSize, + machine_id => crypto:strong_rand_bytes(4), + chunks_counter => 0, formatter => Formatter, formatter_config => FormatterConfig, formatter_state => Formatter:init(FormatterConfig) @@ -99,7 +160,37 @@ open_socket_and_init_state(Config) -> {error, {gen_udp_open_failed, Reason}} end. +generate_chunk_id(MachineId, ChunksCounter) -> + ChunkId = <>, + NewChunksCounter = + case ChunksCounter =:= 4294967295 of + true -> 0; + false -> ChunksCounter + 1 + end, + {NewChunksCounter, ChunkId}. + -spec extra_open_opts(lager_graylog:address_family()) -> [inet:address_family()]. extra_open_opts(undefined) -> []; extra_open_opts(inet) -> [inet]; extra_open_opts(inet6) -> [inet6]. + +parse_opts(Opts) when is_list(Opts) -> + %% 1472 looks like good default value to avoid fragmentation + %% 1500 bytes - 20 bytes for the IPv4 header and 8 bytes for the UDP header + ChunkSize = proplists:get_value(chunk_size, Opts, 1472), + OptsWithDefaults = [{chunk_size, ChunkSize}], + Errors = + lists:foldl( + fun({Opt, OptVal}, ErrorAcc) -> + case validate_opt(Opt, OptVal) of + ok -> ErrorAcc; + Error -> [Error | ErrorAcc] + end + end, [], OptsWithDefaults), + case Errors of + [] -> {ok, maps:from_list(OptsWithDefaults)}; + _ -> {error, Errors} + end. + +validate_opt(chunk_size, ChunkSize) when ChunkSize < 300 -> {error, chunk_size_too_small}; +validate_opt(_Opt, _OptVal) -> ok. diff --git a/src/lager_graylog_utils.erl b/src/lager_graylog_utils.erl index 0fd4fa8..6c238d1 100644 --- a/src/lager_graylog_utils.erl +++ b/src/lager_graylog_utils.erl @@ -30,7 +30,7 @@ parse_common_opts(Opts) when is_list(Opts) -> Host = proplists:get_value(host, Opts), Port = proplists:get_value(port, Opts), AddressFamily = proplists:get_value(address_family, Opts), - Formatter = proplists:get_value(formatter, Opts, lager_graylog_gelf_formatter), + Formatter = proplists:get_value(formatter, Opts, lager_graylog_gelf_formatter), FormatterConfig = proplists:get_value(formatter_config, Opts, []), OptsWithDefaults = [{level, Level}, @@ -81,4 +81,3 @@ validate_config_value(formatter, Formatter) when not is_atom(Formatter) -> {error, {invalid_formatter, Formatter}}; validate_config_value(_, _) -> ok. - diff --git a/test/lager_graylog_udp_backend_SUITE.erl b/test/lager_graylog_udp_backend_SUITE.erl index d46b316..152fbd3 100644 --- a/test/lager_graylog_udp_backend_SUITE.erl +++ b/test/lager_graylog_udp_backend_SUITE.erl @@ -17,7 +17,10 @@ groups() -> test_cases() -> [sends_log_messages_to_configured_endpoint, - doesnt_log_over_configured_level + doesnt_log_over_configured_level, + big_message_should_be_chunked, + too_big_message_should_be_dropped, + chunks_must_fit_in_specified_size ]. init_per_suite(Config) -> @@ -30,8 +33,9 @@ end_per_suite(_) -> init_per_testcase(_, Config) -> {Socket, Port} = open(), - start_lager_handler(Port), - [{socket, Socket}, {port, Port} | Config]. + AdditionalOptions = [{chunk_size, 1372}], + start_lager_handler(Port, AdditionalOptions), + AdditionalOptions ++ [{socket, Socket}, {port, Port} | Config]. end_per_testcase(_, Config) -> stop_lager_handler(?config(port, Config)). @@ -58,11 +62,43 @@ doesnt_log_over_configured_level(Config) -> assert_logged(Logs, [Log1]), assert_not_logged(Logs, [Log2]). +big_message_should_be_chunked(Config) -> + Socket = ?config(socket, Config), + ChunkSize = ?config(chunk_size, Config), + BigMessage = generate_message(ChunkSize * 15 + 100), + Log1 = log(info, BigMessage), + assert_logged([flush_chunked(Socket, 16)], [Log1]). + +too_big_message_should_be_dropped(Config) -> + Socket = ?config(socket, Config), + ChunkSize = ?config(chunk_size, Config), + VeryBigMessage = generate_message(ChunkSize * 129), + log(info, VeryBigMessage), + [#{<<"level">> := 3, <<"_module">> := <<"lager_graylog_udp_backend">>}] = flush(Socket). + +chunks_must_fit_in_specified_size(Config) -> + Socket = ?config(socket, Config), + ChunkSize = ?config(chunk_size, Config), + Message = generate_message(ChunkSize * 5 + 100), + log(info, Message), + Chunks = recv(Socket, 129, 50, []), + %% All chunks but last should be fully loaded + [_LastChunk] = + lists:filter( + fun + (Chunk) when byte_size(Chunk) =:= ChunkSize -> false; + (Chunk) when byte_size(Chunk) < ChunkSize -> true + end, Chunks). + %% Helpers --spec start_lager_handler(inet:port_number()) -> ok. -start_lager_handler(Port) -> - Opts = [{host, ?HOST}, {port, Port}], +generate_message(Length) -> + iolist_to_binary([integer_to_list(crypto:rand_uniform(0, 36), 36) || + _ <- lists:seq(1, Length)]). + +-spec start_lager_handler(inet:port_number(), [lager_graylog:udp_backend_option()]) -> ok. +start_lager_handler(Port, AdditionalOptions) -> + Opts = [{host, ?HOST}, {port, Port} | AdditionalOptions], ok = gen_event:add_handler(lager_event, handler_id(Port), Opts). -spec stop_lager_handler(inet:port_number()) -> ok. @@ -75,13 +111,32 @@ handler_id(Port) -> -spec open() -> {gen_udp:socket(), inet:port_number()}. open() -> - {ok, Socket} = gen_udp:open(0, [binary, + {ok, Socket} = gen_udp:open(0, [ + binary, {ip, ?HOST}, {active, true}, - {reuseaddr, true}]), + {reuseaddr, true}, + {recbuf, 191100} + ]), {ok, Port} = inet:port(Socket), {Socket, Port}. +flush_chunked(Socket, NumberOfChunks) -> + Chunks = recv(Socket, 129, 500, []), + DecodedChunks = + lists:map( + fun(<<30,15, MessageId:8/binary, SequenceNumber:8/integer, + ChunksTotal:8/integer, BodyPart/binary>>) -> + {{MessageId, ChunksTotal}, SequenceNumber , BodyPart} + end, Chunks), + {MessageIdsAndChunkTotals, SequenceNumbers, BodyParts} = lists:unzip3(DecodedChunks), + %% all MessageIds and ChunkTotals shold be the same for one message + 1 = length(lists:usort(MessageIdsAndChunkTotals)), + %% we should receive all chunks (possible reordered) + true = lists:seq(0, NumberOfChunks - 1) =:= lists:sort(SequenceNumbers), + NumberOfChunks = length(Chunks), + jiffy:decode(BodyParts, [return_maps]). + -spec flush(gen_udp:socket()) -> [map()]. flush(Socket) -> Packets = recv(Socket), @@ -89,26 +144,22 @@ flush(Socket) -> -spec recv(gen_udp:socket()) -> ok. recv(Socket) -> - recv(Socket, 10, 10, []). + recv(Socket, 10, 50, []). -spec recv(gen_udp:socket(), Tries :: non_neg_integer(), timeout(), list()) -> [binary()]. recv(Socket, Tries, Timeout, Acc0) when Tries > 0 -> - Acc = - receive - {udp, Socket, _, _, Packet} -> - [Acc0, Packet] - after - Timeout -> - Acc0 - end, - recv(Socket, Tries - 1, Timeout, Acc); -recv(_Socket, 0, _Timeout, Acc) -> - lists:flatten(Acc). + receive + {udp, Socket, _, _, Packet} -> + recv(Socket, Tries - 1, Timeout, [Acc0, Packet]) + after + Timeout -> lists:flatten(Acc0) + end; +recv(_Socket, 0, _Timeout, Acc) -> lists:flatten(Acc). -spec log(atom(), string()) -> pos_integer(). log(Level, Message) -> LogRef = erlang:unique_integer([positive, monotonic]), - lager:log(Level, [{test_log_ref, LogRef}], Message), + lager:log_unsafe(Level, [{test_log_ref, LogRef}], Message, []), LogRef. -spec assert_logged([map()], [pos_integer()]) -> ok | no_return(). @@ -140,4 +191,3 @@ assert_not_logged(Logs, LogRefs) -> _ -> error({found_logs_but_shouldnt, [{log_refs, LogRefs}, {found_logs, FilteredLogs}]}) end. -