Skip to content
Open
3 changes: 3 additions & 0 deletions src/merge_index.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
{compact_mod_fun, {mi_segment, compact_by_average}},
{compact_staleness_threshold, {1, hours}},
{max_compact_segments, 20},
{segment_similarity_ratio, 0.5},
{min_segment_size, 50000000},
{compaction_throughput_mb_per_sec, 30},
{segment_query_read_ahead_size, 65536},
{segment_compact_read_ahead_size, 5242880},
{segment_file_buffer_size, 20971520},
Expand Down
69 changes: 41 additions & 28 deletions src/mi_locks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,58 +29,71 @@
claim/2,
claim_many/2,
release/2,
when_free/3
when_free/3,
claim_compact/2,
release_compact/2,
is_compact_free/2
]).

%% Per-key lock entry stored in the locks dict (see new/0).
-record(lock, {
          key,                  %% NOTE(review): appears unused after the dict conversion — claim/2 no longer sets it; confirm before removing
          count,                %% number of outstanding claims on this key
          not_compacting=true,  %% false while a compaction holds this key (claim_compact/2)
          funs=[]               %% callbacks to run when the key becomes free (when_free/3)
         }).

%% @doc Create an empty lock table. Backed by a dict keyed on the
%% locked term, holding #lock{} entries.
new() -> dict:new().

%% @doc Claim every key in Keys, one claim/2 per key, threading the
%% lock table through the calls.
claim_many([], Locks) ->
    Locks;
claim_many([Key | Rest], Locks) ->
    claim_many(Rest, claim(Key, Locks)).

%% @doc Claim the lock on Key: create it with a refcount of 1 if it does
%% not exist yet, otherwise increment its refcount. Returns the updated
%% lock table.
claim(Key, Locks) ->
    case dict:find(Key, Locks) of
        {ok, #lock{count=Count}=Lock} ->
            dict:store(Key, Lock#lock{count=Count + 1}, Locks);
        error ->
            dict:store(Key, #lock{count=1, funs=[]}, Locks)
    end.

%% @doc Release one claim on Key. When the final claim is released, run
%% all callbacks queued by when_free/3 and remove the entry. Throws
%% {lock_does_not_exist, Key} if the key has no lock entry.
release(Key, Locks) ->
    case dict:find(Key, Locks) of
        {ok, #lock{count=1, funs=Funs}} ->
            %% Last reference: fire the waiting callbacks, drop the entry.
            [X() || X <- Funs],
            dict:erase(Key, Locks);
        {ok, #lock{count=Count}=Lock} ->
            dict:store(Key, Lock#lock{count=Count - 1}, Locks);
        error ->
            throw({lock_does_not_exist, Key})
    end.

%% Run the provided function when the key is free. If the key is
%% currently free, then this is run immediately.
when_free(Key, Fun, Locks) ->
    case dict:find(Key, Locks) of
        error ->
            %% No lock entry at all: the key is free right now.
            Fun(),
            Locks;
        {ok, #lock{count=0, funs=Funs}} ->
            %% Entry exists but holds no claims: run everything and drop it.
            [X() || X <- [Fun|Funs]],
            dict:erase(Key, Locks);
        {ok, #lock{funs=Funs}=Lock} ->
            %% Still claimed: queue the callback for release/2 to fire later.
            dict:store(Key, Lock#lock{funs=[Fun|Funs]}, Locks)
    end.

%% Take a compaction claim on Key: bump the refcount and mark the entry
%% as being compacted (not_compacting=false). Creates the entry if absent.
claim_compact(Key, Locks) ->
    NewLock =
        case dict:find(Key, Locks) of
            {ok, #lock{count=N}=L} ->
                L#lock{not_compacting=false, count=N+1};
            error ->
                #lock{count=1, funs=[], not_compacting=false}
        end,
    dict:store(Key, NewLock, Locks).

%% Clear the compaction flag on Key, then drop the claim that
%% claim_compact/2 took. Throws {lock_does_not_exist, Key} when the key
%% has no lock entry.
release_compact(Key, Locks) ->
    case dict:find(Key, Locks) of
        {ok, L} ->
            release(Key, dict:store(Key, L#lock{not_compacting=true}, Locks));
        error ->
            throw({lock_does_not_exist, Key})
    end.

%% @doc True when no compaction currently holds the lock on Key. A key
%% with no lock entry at all is considered free.
is_compact_free(Key, Locks) ->
    case dict:find(Key, Locks) of
        {ok, Lock} -> Lock#lock.not_compacting;
        error -> true
    end.
117 changes: 83 additions & 34 deletions src/mi_scheduler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
-export([
start_link/0,
start/0,
schedule_compaction/1
schedule_compaction/1,
ms_before_replace/2,
new_timering/1,
replace_oldest/1
]).
%% Private export
-export([worker_loop/1]).
Expand All @@ -38,6 +41,7 @@
terminate/2, code_change/3]).

%% Scheduler state: queue of pids awaiting compaction, whether the worker
%% is idle (ready), and the worker pid itself.
-record(state, { queue,   %% queue:queue() of pids waiting for a compaction slot
                 ready,   %% true when the worker can accept work immediately
                 worker }).

%% ====================================================================
Expand Down Expand Up @@ -67,11 +71,16 @@ init([]) ->
%% and we want to ensure that our message queue doesn't fill up with
%% a bunch of dup requests for the same directory.
Self = self(),
WorkerPid = spawn_link(fun() -> worker_loop(Self) end),
WorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self)) end),
{ok, #state{ queue = queue:new(),
worker = WorkerPid }}.

handle_call({schedule_compaction, Pid}, _From, #state { queue = Q } = State) ->
worker = WorkerPid,
ready = true}}.

handle_call({schedule_compaction, Pid}, _From, #state { ready = true, worker = WorkerPid } = State) ->
WorkerPid ! {compaction, Pid},
{reply, ok, State#state {ready = false}};

handle_call({schedule_compaction, Pid}, _From, #state { ready = false, queue = Q } = State) ->
case queue:member(Pid, Q) of
true ->
{reply, already_queued, State};
Expand All @@ -91,18 +100,18 @@ handle_cast(Msg, State) ->
%% The worker's throttling delay has elapsed and it can take more work:
%% hand it the next queued pid, or mark the scheduler ready for direct
%% dispatch when the queue is empty.
handle_info({worker_ready, WorkerPid}, #state { queue = Q } = State) ->
    case queue:out(Q) of
        {empty, Q} ->  %% Q deliberately re-matched: empty queue is unchanged
            {noreply, State#state{ ready = true }};
        {{value, Pid}, NewQ} ->
            WorkerPid ! {compaction, Pid},
            {noreply, State#state{ queue = NewQ, ready = false }}
    end;
%% The linked worker died: log it and start a replacement with a fresh
%% throttling state. The new worker is idle, so the scheduler is ready.
handle_info({'EXIT', WorkerPid, Reason}, #state { worker = WorkerPid } = State) ->
    lager:error("Compaction worker ~p exited: ~p", [WorkerPid, Reason]),
    Self = self(),
    NewWorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self)) end),
    {noreply, State#state{ worker = NewWorkerPid, ready = true }};

handle_info(Info, State) ->
Expand All @@ -118,32 +127,72 @@ code_change(_OldVsn, State, _Extra) ->
%% ====================================================================
%% Internal worker
%% ====================================================================

worker_loop(Parent) ->
Parent ! {worker_ready, self()},
%% Number of timestamp slots in the throttling time ring.
-define(TIMERING_SIZE, 10).
%% Recompute the ring span after ?TIMERING_SIZE * ?TIMERING_AJUST_EVERY results.
-define(TIMERING_AJUST_EVERY, 5).

-record(wstate, { parent,           %% pid of the mi_scheduler gen_server
                  timering,         %% {TimestampTuple, OldestIdx} throttling ring
                  timering_span,    %% window (ms) in which ?TIMERING_SIZE compactions may start
                  test_start,       %% timestamp when the current measurement window began
                  test_compactions  %% [{ok, Segments, Bytes}] results collected this window
                }).

%% THROTTLING: allow at most ?TIMERING_SIZE compactions per `timering_span`
%% milliseconds. The ring is a fixed-size tuple of start timestamps plus the
%% index of its oldest slot: ms_before_replace/2 returns how long to wait
%% before the oldest slot may be recycled, and replace_oldest/1 recycles it
%% with the current time, advancing the index (with wrap-around).
%% @doc Create a throttling ring: a tuple of M slots all initialised to
%% the current time, paired with the index (1) of the slot to recycle
%% next. Uses os:timestamp/0 instead of the deprecated erlang:now/0.
new_timering(M) -> {erlang:make_tuple(M, os:timestamp()), 1}.
%% @doc Recycle the oldest slot with the current time and advance the
%% oldest-slot index, wrapping past the end of the tuple. Replaces the
%% deprecated now/0 with os:timestamp/0 and size/1 with tuple_size/1.
replace_oldest({Set, Idx}) ->
    NextIdx = case Idx + 1 of
                  X when X > tuple_size(Set) -> 1;
                  X -> X
              end,
    {erlang:setelement(Idx, Set, os:timestamp()), NextIdx}.
%% @doc Milliseconds to wait until the oldest slot is more than N ms old
%% and may be recycled; 0 when it already is. Uses os:timestamp/0
%% instead of the deprecated erlang:now/0.
ms_before_replace({Set, Idx}, N) ->
    max(0, N - trunc(timer:now_diff(os:timestamp(), element(Idx, Set)) / 1000)).

%% Build the initial worker state: a full time ring, a 1-second span, and
%% an empty measurement window. os:timestamp/0 replaces deprecated now/0.
worker_init_state(Parent) ->
    #wstate{parent=Parent,
            timering=new_timering(?TIMERING_SIZE),
            timering_span=1000,
            test_start=os:timestamp(),
            test_compactions=[]}.

%% A full measurement window (?TIMERING_SIZE * ?TIMERING_AJUST_EVERY results)
%% has been collected: recompute the throttling span so actual throughput
%% approaches the compaction_throughput_mb_per_sec app setting, log the
%% window's stats, and start a new window. (now/0 replaced by os:timestamp/0.)
worker_loop(#wstate{timering_span=TimeRingSpan, test_start=TestStart,
                    test_compactions=TestCompactions}=State)
  when length(TestCompactions) == (?TIMERING_SIZE * ?TIMERING_AJUST_EVERY) ->
    {ok, WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec),
    TestBytes = lists:sum([OldBytes || {ok, _, OldBytes} <- TestCompactions]),
    TestSegments = lists:sum([OldSegments || {ok, OldSegments, _} <- TestCompactions]),
    TestElapsedMSecs = timer:now_diff(os:timestamp(), TestStart) / 1000,
    ThroughputBms = TestBytes / TestElapsedMSecs,                 %% achieved, bytes per ms
    WantedThroughputBms = WantedThroughput * 1024 * 1024 / 1000,  %% target, bytes per ms
    lager:info("Overall Compaction: ~p segments for ~p MBytes in ~.2f seconds, ~.2f MB/sec",
               [TestSegments, TestBytes/(1024*1024), TestElapsedMSecs/1000,
                (ThroughputBms*1000) / (1024*1024)]),
    %% We need to adjust the timering span window in order to reach the wanted
    %% throughput, while keeping some continuity in the adjustment.
    %% NOTE(review): the decrease branch floors at 30% of the old span (up to a
    %% 70% drop) while the increase branch caps at 130%; the stated "no more
    %% than 30% adjustment" would need 0.7 on the way down — confirm intent.
    Span = case trunc(TestBytes/WantedThroughputBms/?TIMERING_AJUST_EVERY) + 1 of
               NewTime when NewTime < TimeRingSpan -> max(trunc(TimeRingSpan*0.3)+1, NewTime);
               NewTime -> min(trunc(TimeRingSpan*1.3)+1, NewTime)
           end,
    lager:info("Adjust throttling to have ~p compaction every ~p milliseconds",
               [?TIMERING_SIZE, Span]),
    worker_loop(State#wstate{timering_span=Span, test_start=os:timestamp(), test_compactions=[]});

%% Main loop: run each requested compaction in a linked helper process,
%% collect its result for throughput accounting, and throttle by delaying
%% the {worker_ready, ...} notification according to the time ring.
worker_loop(#wstate{parent=Parent, timering=TimeRing,
                    timering_span=TimeRingSpan,
                    test_compactions=TestCompactions}=State) ->
    Worker = self(),
    receive
        {compaction_res, Result} ->
            %% A helper finished a non-empty compaction; record it so the
            %% window-adjustment clause can recompute the span later.
            ?MODULE:worker_loop(State#wstate{test_compactions=[Result|TestCompactions]});
        {compaction, Pid} ->
            spawn_link(fun() ->
                Start = os:timestamp(),
                Result = merge_index:compact(Pid),
                %% Clamp to >= 1 microsecond so the rate division below can
                %% never hit badarith (which would kill the linked worker).
                ElapsedMSecs = max(timer:now_diff(os:timestamp(), Start), 1) / 1000,
                case Result of
                    {ok, 0, 0} ->
                        %% Nothing was compacted; don't skew throughput stats.
                        ok;
                    {ok, OldSegments, OldBytes} ->
                        Worker ! {compaction_res, Result},
                        lager:debug(
                          "Single Compaction ~p: ~p segments for ~p bytes in ~p milliseconds, ~.2f MB/sec",
                          [Pid, OldSegments, OldBytes, ElapsedMSecs,
                           %% bytes/ms -> MB/s: * 1000 (ms/s) / 1048576 (bytes/MB)
                           OldBytes / ElapsedMSecs * 1000 / (1024*1024)]);
                    {Error, Reason} when Error == error; Error == 'EXIT' ->
                        lager:error("Failed to compact ~p: ~p", [Pid, Reason])
                end
            end),
            %% Tell the scheduler we are ready again only after the throttling
            %% delay dictated by the oldest time-ring slot has elapsed.
            erlang:send_after(ms_before_replace(TimeRing, TimeRingSpan), Parent,
                              {worker_ready, Worker}),
            ?MODULE:worker_loop(State#wstate{timering=replace_oldest(TimeRing)});
        _ ->
            %% Ignore unknown messages. Must recurse with State (the #wstate{}
            %% record): recursing with the bare Parent pid, as the diff
            %% residue did, crashes with function_clause on the next message.
            ?MODULE:worker_loop(State)
    after 1000 ->
        ?MODULE:worker_loop(State)
    end.
Loading