diff --git a/src/merge_index.app.src b/src/merge_index.app.src index 0a8696b..7c95d11 100644 --- a/src/merge_index.app.src +++ b/src/merge_index.app.src @@ -14,6 +14,9 @@ {compact_mod_fun, {mi_segment, compact_by_average}}, {compact_staleness_threshold, {1, hours}}, {max_compact_segments, 20}, + {segment_similarity_ratio, 0.5}, + {min_segment_size, 50000000}, + {compaction_throughput_mb_per_sec, 30}, {segment_query_read_ahead_size, 65536}, {segment_compact_read_ahead_size, 5242880}, {segment_file_buffer_size, 20971520}, diff --git a/src/mi_locks.erl b/src/mi_locks.erl index 7ae0944..f6a91df 100644 --- a/src/mi_locks.erl +++ b/src/mi_locks.erl @@ -29,58 +29,71 @@ claim/2, claim_many/2, release/2, - when_free/3 + when_free/3, + claim_compact/2, + release_compact/2, + is_compact_free/2 ]). -record(lock, { - key, count, + not_compacting=true, funs=[] }). -new() -> []. +new() -> dict:new(). claim_many(Keys, Locks) -> lists:foldl(fun claim/2, Locks, Keys). claim(Key, Locks) -> - case lists:keyfind(Key, #lock.key, Locks) of - Lock = #lock { count=Count } -> - NewLock = Lock#lock { count=Count + 1 }, - lists:keystore(Key, #lock.key, Locks, NewLock); - - false -> - NewLock = #lock { key=Key, count=1, funs=[] }, - lists:keystore(Key, #lock.key, Locks, NewLock) + case dict:find(Key,Locks) of + {ok,#lock{count=Count}=Lock} -> + dict:store(Key,Lock#lock{count=Count + 1},Locks); + error -> + dict:store(Key,#lock{count=1,funs=[]},Locks) end. release(Key, Locks) -> - case lists:keyfind(Key, #lock.key, Locks) of - #lock { count=1, funs=Funs } -> + case dict:find(Key,Locks) of + {ok,#lock{count=1,funs=Funs}} -> [X() || X <- Funs], - lists:keydelete(Key, #lock.key, Locks); - - Lock = #lock { count=Count } -> - NewLock = Lock#lock { count = Count - 1 }, - lists:keystore(Key, #lock.key, Locks, NewLock); - - false -> + dict:erase(Key,Locks); + {ok,#lock{count=Count}=Lock} -> + dict:store(Key,Lock#lock{count=Count - 1},Locks); + error -> throw({lock_does_not_exist, Key}) end. 
%% Run the provided function when the key is free. If the key is %% currently free, then this is run immeditaely. when_free(Key, Fun, Locks) -> - case lists:keyfind(Key, #lock.key, Locks) of - false -> + case dict:find(Key,Locks) of + error -> Fun(), Locks; - - #lock { count=0, funs=Funs } -> + {ok,#lock{count=0,funs=Funs}} -> [X() || X <- [Fun|Funs]], - lists:keydelete(Key, #lock.key, Locks); + dict:erase(Key,Locks); + {ok,#lock{funs=Funs}=Lock} -> + dict:store(Key,Lock#lock{funs=[Fun|Funs]},Locks) + end. + +claim_compact(Key,Locks) -> + case dict:find(Key,Locks) of + {ok,#lock{count=Count}=Lock} -> dict:store(Key,Lock#lock{not_compacting=false,count=Count+1},Locks); + error -> dict:store(Key,#lock{count=1,funs=[],not_compacting=false},Locks) + end. + +release_compact(Key,Locks) -> + NewLocks = case dict:find(Key,Locks) of + {ok,Lock} -> dict:store(Key,Lock#lock{not_compacting=true},Locks); + error -> throw({lock_does_not_exist, Key}) + end, + release(Key,NewLocks). - Lock = #lock { funs=Funs} -> - NewLock = Lock#lock { funs=[Fun|Funs] }, - lists:keystore(Key, #lock.key, Locks, NewLock) +is_compact_free(Key,Locks) -> + case dict:find(Key,Locks) of + {ok,Lock} -> Lock#lock.not_compacting; + error -> true end. diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index ef8d974..fd24edb 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -26,7 +26,10 @@ -export([ start_link/0, start/0, - schedule_compaction/1 + schedule_compaction/1, + ms_before_replace/2, + new_timering/1, + replace_oldest/1 ]). %% Private export -export([worker_loop/1]). @@ -38,6 +41,7 @@ terminate/2, code_change/3]). -record(state, { queue, + ready, worker }). %% ==================================================================== @@ -67,11 +71,16 @@ init([]) -> %% and we want to ensure that our message queue doesn't fill up with %% a bunch of dup requests for the same directory. 
Self = self(), - WorkerPid = spawn_link(fun() -> worker_loop(Self) end), + WorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self)) end), {ok, #state{ queue = queue:new(), - worker = WorkerPid }}. - -handle_call({schedule_compaction, Pid}, _From, #state { queue = Q } = State) -> + worker = WorkerPid, + ready = true}}. + +handle_call({schedule_compaction, Pid}, _From, #state { ready = true, worker = WorkerPid } = State) -> + WorkerPid ! {compaction, Pid}, + {reply, ok, State#state {ready = false}}; + +handle_call({schedule_compaction, Pid}, _From, #state { ready = false, queue = Q } = State) -> case queue:member(Pid, Q) of true -> {reply, already_queued, State}; @@ -91,18 +100,18 @@ handle_cast(Msg, State) -> handle_info({worker_ready, WorkerPid}, #state { queue = Q } = State) -> case queue:out(Q) of {empty, Q} -> - {noreply, State}; + {noreply, State#state{ ready = true }}; {{value, Pid}, NewQ} -> WorkerPid ! {compaction, Pid}, - NewState = State#state { queue=NewQ }, + NewState = State#state { queue=NewQ , ready = false }, {noreply, NewState} end; handle_info({'EXIT', WorkerPid, Reason}, #state { worker = WorkerPid } = State) -> lager:error("Compaction worker ~p exited: ~p", [WorkerPid, Reason]), %% Start a new worker. - Self=self(), - NewWorkerPid = spawn_link(fun() -> worker_loop(Self) end), - NewState = State#state { worker=NewWorkerPid }, + Self = self(), + NewWorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self)) end), + NewState = State#state { worker=NewWorkerPid , ready = true}, {noreply, NewState}; handle_info(Info, State) -> @@ -118,32 +127,72 @@ code_change(_OldVsn, State, _Extra) -> %% ==================================================================== %% Internal worker %% ==================================================================== - -worker_loop(Parent) -> - Parent ! {worker_ready, self()}, +-define(TIMERING_SIZE, 10). +-define(TIMERING_AJUST_EVERY, 5). 
+ +-record(wstate, { parent, + timering, + timering_span, + test_start, + test_compactions}). + +%% THROTTLING: Run M processes in N milliseconds waiting ms_before_replace(Ring,N) before doing something +%% and replace_oldest (impl is a set of size M of timestamps representing running processes. +%% Replace the oldest one if older than N seconds else wait (oldest_ts-Nsec) : {Ts_Set, Oldest_Idx} +new_timering(M)->{erlang:make_tuple(M,now()),1}. +replace_oldest({Set,Idx})-> + {erlang:setelement(Idx,Set,now()),case Idx+1 of X when X>erlang:size(Set) ->1; X->X end}. +ms_before_replace({Set,Idx},N)-> + max(0,N - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). + +worker_init_state(Parent)-> + #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE), + timering_span=1000,test_start=now(),test_compactions=[]}. + +worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, + test_compactions=TestCompactions}=State) when length(TestCompactions)==(?TIMERING_SIZE*?TIMERING_AJUST_EVERY) -> + {ok,WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec), + TestBytes = lists:sum([OldBytes || {ok, _, OldBytes}<-TestCompactions]), + TestSegments = lists:sum([OldSegments || {ok, OldSegments, _}<-TestCompactions]), + TestElapsedMSecs = timer:now_diff(os:timestamp(), TestStart) / 1000, + ThroughputBms = TestBytes/TestElapsedMSecs, + WantedThroughputBms = WantedThroughput *1024*1024 / 1000, + lager:info("Overall Compaction: ~p segments for ~p MBytes in ~.2f seconds, ~.2f MB/sec", + [TestSegments, TestBytes/(1024*1024), TestElapsedMSecs/1000, (ThroughputBms*1000) / (1024*1024)]), + %% We need to adjust timering span window in order to have the good throughput + %% ensure a kind of continuity not allowing more than 30% adjustment + Span = case trunc(TestBytes/WantedThroughputBms/?TIMERING_AJUST_EVERY)+1 of + NewTime when NewTime < TimeRingSpan -> max(trunc(TimeRingSpan*0.3)+1,NewTime); + NewTime -> min(trunc(TimeRingSpan*1.3)+1,NewTime) + 
end, + lager:info("Adjust throttling to have ~p compaction every ~p milliseconds",[?TIMERING_SIZE,Span]), + worker_loop(State#wstate{timering_span=Span,test_start=now(),test_compactions=[]}); + +worker_loop(#wstate{parent=Parent,timering=TimeRing, + timering_span=TimeRingSpan, test_compactions=TestCompactions}=State) -> + Worker = self(), receive + {compaction_res,Result}-> + ?MODULE:worker_loop(State#wstate{test_compactions=[Result|TestCompactions]}); {compaction, Pid} -> - Start = os:timestamp(), - Result = merge_index:compact(Pid), - ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000, - case Result of - {ok, OldSegments, OldBytes} -> - case ElapsedSecs > 1 of - true -> - lager:info( - "Pid ~p compacted ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", - [Pid, OldSegments, OldBytes, ElapsedSecs, OldBytes/ElapsedSecs/(1024*1024)]); - false -> - ok - end; - - {Error, Reason} when Error == error; Error == 'EXIT' -> - lager:error("Failed to compact ~p: ~p", [Pid, Reason]) - end, - ?MODULE:worker_loop(Parent); + spawn_link(fun()-> + Start = os:timestamp(), + Result = merge_index:compact(Pid), + case Result of + {ok, 0, 0}->ok; + {ok, OldSegments, OldBytes} -> + Worker ! {compaction_res,Result}, + ElapsedMSecs = timer:now_diff(os:timestamp(), Start) / 1000, + lager:debug( + "Single Compaction ~p: ~p segments for ~p bytes in ~p milliseconds, ~.2f MB/sec", + [Pid, OldSegments, OldBytes, ElapsedMSecs, OldBytes/ElapsedMSecs/1024]); + {Error, Reason} when Error == error; Error == 'EXIT' -> + lager:error("Failed to compact ~p: ~p", [Pid, Reason]) + end + end), + erlang:send_after(ms_before_replace(TimeRing,TimeRingSpan),Parent,{worker_ready,Worker}), + ?MODULE:worker_loop(State#wstate{timering=replace_oldest(TimeRing)}); _ -> %% ignore unknown messages - ?MODULE:worker_loop(Parent) - after 1000 -> - ?MODULE:worker_loop(Parent) + ?MODULE:worker_loop(State) end. 
diff --git a/src/mi_server.erl b/src/mi_server.erl index 79418b8..f286643 100644 --- a/src/mi_server.erl +++ b/src/mi_server.erl @@ -59,7 +59,8 @@ segments, buffers, next_id, - is_compacting, + config, + compacting_pids, lookup_range_pids, buffer_rollover_size, converter, @@ -74,6 +75,14 @@ segments }). +-record(config, { + max_compact_segments, + bucket_low, + bucket_high, + min_segment_size + }). +-define(MIN_COMPACT_SEGMENTS,5). + -define(RESULTVEC_SIZE, 1000). -define(DELETEME_FLAG, ".deleted"). @@ -146,6 +155,17 @@ init([Root]) -> %% don't pull down this merge_index if they fail process_flag(trap_exit, true), + %% Cache config in order to avoid application:get_env overhead + {ok,SegSimilarityRatio} = application:get_env(merge_index, segment_similarity_ratio), + {ok,MaxCompactSegments} = application:get_env(merge_index, max_compact_segments), + {ok,MinSegmentSize} = application:get_env(merge_index, min_segment_size), + Config = #config{ + max_compact_segments=MaxCompactSegments, + min_segment_size=MinSegmentSize, + bucket_low=1-SegSimilarityRatio, + bucket_high=1+SegSimilarityRatio + }, + %% Create the state... State = #state { root = Root, @@ -153,7 +173,8 @@ init([Root]) -> buffers = [Buffer], segments = Segments, next_id = NextID, - is_compacting = false, + config = Config, + compacting_pids = [], lookup_range_pids = [], buffer_rollover_size=fuzzed_rollover_size(), to_convert = queue:new() @@ -224,52 +245,40 @@ handle_call({index, Postings}, _From, State) -> end; handle_call(start_compaction, _From, State) - when is_tuple(State#state.is_compacting) -> - %% Don't compact if we are already compacting, or if we have fewer - %% than five open segments. + when length(State#state.segments) =< ?MIN_COMPACT_SEGMENTS -> + %% Don't compact if we have fewer than MIN_COMPACT_SEGMENTS {reply, {ok, 0, 0}, State}; handle_call(start_compaction, From, State) -> - %% Get list of segments to compact. 
Do this by getting filesizes, - %% and then lopping off files larger than the average. This could be - %% optimized with tuning, but probably a good enough solution. - Segments = State#state.segments, - {ok, MaxSegments} = application:get_env(merge_index, max_compact_segments), - {ok, {M,F}} = application:get_env(merge_index, compact_mod_fun), - SegmentsToCompact = case M:F(Segments) of - STC when length(STC) > MaxSegments -> - lists:sublist(STC, MaxSegments); - STC -> - STC - end, - - case SegmentsToCompact of - [] -> - {reply, {ok, 0, 0}, State}; - _ -> + %% Get list of segments to compact : as Cassandra SizeTieredCompactionStrategy + #state{segments=Segments,locks=Locks,config=Config,compacting_pids=CompactingPids} = State, + case get_segments_to_merge(Segments,Config,Locks) of + [] -> {reply, {ok, 0, 0}, State}; + SegmentsToCompact -> BytesToCompact = lists:sum([mi_segment:filesize(X) || X <- SegmentsToCompact]), %% Spawn a function to merge a bunch of segments into one... Pid = self(), - CF = - fun() -> - %% Create the group iterator... - SegmentIterators = [mi_segment:iterator(X) || X <- SegmentsToCompact], - GroupIterator = build_iterator_tree(SegmentIterators), - - %% Create the new compaction segment... - <<MD5:128/integer>> = erlang:md5(term_to_binary({now, make_ref()})), - SName = join(State, io_lib:format("segment.~.16B", [MD5])), - set_deleteme_flag(SName), - CompactSegment = mi_segment:open_write(SName), - - %% Run the compaction... - mi_segment:from_iterator(GroupIterator, CompactSegment), - gen_server:cast(Pid, {compacted, CompactSegment, SegmentsToCompact, BytesToCompact, From}) - end, - CompactingPid = spawn_opt(CF, [link, {fullsweep_after, 0}]), - {noreply, State#state { is_compacting={From, CompactingPid} }} - end; + CompactingPid = spawn_opt(fun() -> + %% Create the group iterator... + SegmentIterators = [mi_segment:iterator(X) || X <- SegmentsToCompact], + GroupIterator = build_iterator_tree(SegmentIterators), + + %% Create the new compaction segment... 
+ <<MD5:128/integer>> = erlang:md5(term_to_binary({now, make_ref()})), + SName = join(State, io_lib:format("segment.~.16B", [MD5])), + set_deleteme_flag(SName), + CompactSegment = mi_segment:open_write(SName), + + %% Run the compaction... + mi_segment:from_iterator(GroupIterator, CompactSegment), + gen_server:cast(Pid, {compacted, CompactSegment, SegmentsToCompact, BytesToCompact, From}) + end, [link, {fullsweep_after, 0}]), + NewLocks = lists:foldl(fun (S,Acc)-> + mi_locks:claim_compact(mi_segment:filename(S),Acc) + end,Locks,SegmentsToCompact), + {noreply, State#state { compacting_pids=[{From, CompactingPid}|CompactingPids] , locks=NewLocks}} + end; handle_call({info, Index, Field, Term}, _From, State) -> %% Calculate the IFT... @@ -411,7 +420,7 @@ handle_call(Msg, _From, State) -> {reply, ok, State}. handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) -> - #state { locks=Locks, segments=Segments } = State, + #state { locks=Locks, segments=Segments , compacting_pids=CompactingPids } = State, %% Clean up. Remove delete flag on the new segment. Add delete %% flags to the old segments. Register to delete the old segments @@ -423,7 +432,9 @@ handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) - [set_deleteme_flag(mi_segment:filename(X)) || X <- OldSegments], F = fun(X, Acc) -> - mi_locks:when_free(mi_segment:filename(X), fun() -> mi_segment:delete(X) end, Acc) + Key = mi_segment:filename(X), + Acc1 = mi_locks:when_free(Key, fun() -> mi_segment:delete(X) end, Acc), + mi_locks:release_compact(Key,Acc1) end, NewLocks = lists:foldl(F, Locks, OldSegments), @@ -431,7 +442,7 @@ handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) - NewState = State#state { locks=NewLocks, segments=[CompactSegmentRO|(Segments -- OldSegments)], - is_compacting=false + compacting_pids=lists:keydelete(From,1,CompactingPids) }, %% Tell the awaiting process that we've finished compaction. 
@@ -440,8 +451,7 @@ handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) - handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) -> #state { root=Root, locks=Locks, buffers=Buffers, segments=Segments, - to_convert=ToConvert, - is_compacting=IsCompacting } = State, + to_convert=ToConvert } = State, %% Clean up by clearing delete flag on the segment, adding delete %% flag to the buffer, and telling the system to delete the buffer @@ -481,14 +491,7 @@ handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) -> }, %% Give us the opportunity to do a merge... - {ok, {M,F}} = application:get_env(merge_index, compact_mod_fun), - SegmentsToMerge = M:F(NewSegments), - case length(SegmentsToMerge) of - Num when Num =< 2 orelse is_tuple(IsCompacting) -> - ok; - _ -> - mi_scheduler:schedule_compaction(self()) - end, + mi_scheduler:schedule_compaction(self()), {noreply, NewState}; false -> lager:warning("`buffer_to_segment` cast received" @@ -501,27 +504,8 @@ handle_cast(Msg, State) -> lager:error("Unexpected cast ~p", [Msg]), {noreply, State}. -handle_info({'EXIT', CompactingPid, Reason}, - #state{is_compacting={From, CompactingPid}}=State) -> - %% the spawned compaction process exited - case Reason of - normal -> - %% compaction finished normally: nothing to be done - %% handle_call({compacted... 
already sent the reply - ok; - _ -> - %% compaction failed: not too much to worry about - %% (it should be safe to try again later) - %% but we need to let the compaction-requester know - %% that we're not compacting any more - gen_server:reply(From, {error, Reason}) - end, - - %% clear out compaction flags, so we try again when necessary - {noreply, State#state{is_compacting=false}}; - handle_info({'EXIT', Pid, Reason}, - #state{lookup_range_pids=SRPids}=State) -> + #state{lookup_range_pids=SRPids,compacting_pids=CompactingPids}=State) -> case lists:keytake(Pid, #stream_range.pid, SRPids) of {value, SR, NewSRPids} -> @@ -553,8 +537,21 @@ handle_info({'EXIT', Pid, Reason}, {noreply, State#state { locks=NewLocks1, lookup_range_pids=NewSRPids }}; false -> - %% some random other process exited: ignore - {noreply, State} + case lists:keytake(Pid, 2, CompactingPids) of + {value, {From,_CompactingPid}, NewCompactingPids}-> + %% a spawned compaction process exited + case Reason of + normal -> ok; %% compaction finished normally: nothing to be done + _ -> %% compaction failed: not too much to worry about (it should be safe to try again later) + %% but we need to let the compaction-requester know + %% that we're not compacting any more + gen_server:reply(From, {error, Reason}) + end, + %% clear out compaction flags, so we try again when necessary + {noreply, State#state{compacting_pids=NewCompactingPids}}; + false -> %% some random other process exited: ignore + {noreply, State} + end end; handle_info(Msg, State) -> @@ -745,6 +742,44 @@ group_iterator(Iterator, eof) -> clear_deleteme_flag(Filename) -> file:delete(Filename ++ ?DELETEME_FLAG). 
+%% Figure out which files to merge +get_segments_to_merge(Segments,#config{max_compact_segments=MaxCompactSegments}=Config,Locks) -> + NotCompactingSegments = lists:filter(fun(S)-> + mi_locks:is_compact_free(mi_segment:filename(S),Locks) + end,Segments), + %% Group segments by similar size (buckets) + Buckets = get_buckets(NotCompactingSegments,Config), + %% Keep only the groups with at least min_compact_segments entries and take the max_compact_segments first + PrunedBuckets = dict:fold( + fun (_,Bucket,Acc0) when length(Bucket) < ?MIN_COMPACT_SEGMENTS -> Acc0; + (_,Bucket,Acc0)-> + SortedBucket = lists:reverse(Bucket), %% already sorted by construction but reversed + ToMerge = lists:sublist(SortedBucket, min(length(Bucket),MaxCompactSegments)), + Avg = lists:sum([Size || {Size,_}<-Bucket]) div length(Bucket), + [{Avg,ToMerge}|Acc0] + end, [],Buckets), + %% then take the group with the smallest segment average size + case lists:sort(PrunedBuckets) of + [] -> []; + [{_,Segs}|_] -> [Seg || {_,Seg}<-Segs] + end. + +get_buckets(Segments,#config{bucket_low=BucketLow, bucket_high=BucketHigh, min_segment_size=MinSegSize})-> + %% sort segs to group them in average size groups in a deterministic way + SortedSizedSegments = lists:sort([{mi_segment:filesize(X),X} || X <- Segments]), + lists:foldl(fun({Size,_}=Seg,Acc0)-> + NotSimilar = fun({AverageSize,_})-> + not ((Size > AverageSize*BucketLow andalso Size < AverageSize*BucketHigh) + orelse (Size < MinSegSize andalso AverageSize < MinSegSize)) + end, + case lists:dropwhile(NotSimilar,dict:to_list(Acc0)) of % if a bucket is similar, add seg to bucket and change averagesize + [{AverageSize,Bucket}|_] -> NbSeg = length(Bucket), + dict:store((AverageSize*NbSeg+Size)/(NbSeg+1),[Seg|Bucket],dict:erase(AverageSize,Acc0)); + [] -> % else create a single bucket with the seg + dict:store(Size,[Seg],Acc0) + end + end,dict:new(),SortedSizedSegments). 
+ fold_itr(_Fun, Acc, eof) -> Acc; fold_itr(Fun, Acc, {Term, IteratorFun}) -> fold_itr(Fun, Fun(Term, Acc), IteratorFun()).