Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/asobi_telemetry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
-export([matchmaker_queued/2, matchmaker_removed/2, matchmaker_formed/3]).
-export([session_connected/1, session_disconnected/2]).
-export([ws_connected/0, ws_disconnected/0, ws_message_in/1, ws_message_out/1]).
-export([anticheat_violation/3]).
-export([economy_transaction/4, store_purchase/3]).
-export([chat_message_sent/2]).
-export([vote_started/2, vote_cast/2, vote_resolved/3]).
Expand Down Expand Up @@ -157,6 +158,14 @@ ws_connected() ->
ws_disconnected() ->
telemetry:execute([asobi, ws, disconnected], #{count => 1}, #{}).

-spec anticheat_violation(binary(), atom(), map()) -> ok.
anticheat_violation(PlayerId, Type, Details) ->
telemetry:execute(
[asobi, anticheat, violation],
#{count => 1},
#{player_id => PlayerId, type => Type, details => Details}
).

-spec ws_message_in(binary()) -> ok.
ws_message_in(Type) ->
telemetry:execute([asobi, ws, message_in], #{count => 1}, #{type => Type}).
Expand Down
155 changes: 150 additions & 5 deletions src/world/asobi_world_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
-behaviour(gen_statem).

-export([start_link/1, join/2, leave/2, move_player/3, post_tick/2, get_info/1, cancel/1]).
-export([reconnect/2]).
-export([start_vote/2, cast_vote/4, use_veto/3]).
-export([whereis/1]).
-export([callback_mode/0, init/1, terminate/3]).
Expand Down Expand Up @@ -40,6 +41,10 @@ post_tick(Pid, TickN) ->
get_info(Pid) ->
gen_statem:call(Pid, get_info).

-spec reconnect(pid(), binary()) -> ok | {error, term()}.
reconnect(Pid, PlayerId) ->
gen_statem:call(Pid, {reconnect, PlayerId}).

-spec cancel(pid()) -> ok.
cancel(Pid) ->
gen_statem:cast(Pid, cancel).
Expand Down Expand Up @@ -119,6 +124,7 @@ init(Config) ->
frustration_bonus => FrustrationBonus,
active_votes => #{},
persistent => Persistent,
reconnect_state => init_reconnect(Config),
chat_state => ChatState,
phase_state => PhaseState
},
Expand Down Expand Up @@ -178,7 +184,8 @@ running(
) ->
case Mod:post_tick(TickN, GS) of
{ok, GS1} ->
State1 = tick_phases(TickRate, State#{game_state => GS1}),
State0 = tick_reconnect(State#{game_state => GS1}),
State1 = tick_phases(TickRate, State0),
case maps:get(phase_state, State1) of
PS when is_map(PS) ->
case asobi_phase:info(PS) of
Expand Down Expand Up @@ -215,7 +222,65 @@ running(info, {vote_vetoed, VoteId, _Template}, State) ->
{keep_state, State#{active_votes => Active}};
running(info, {asobi_message, _}, _State) ->
%% Zone snapshots/deltas forwarded here in tests — ignore
keep_state_and_data.
keep_state_and_data;
running(
info, {'DOWN', _MonRef, process, DownPid, _Reason}, #{reconnect_state := undefined} = State
) ->
%% No reconnect policy — no action (entity stays, player can rejoin)
case find_player_by_pid(DownPid, State) of
{ok, _PlayerId} -> keep_state_and_data;
none -> keep_state_and_data
end;
running(info, {'DOWN', _MonRef, process, DownPid, _Reason}, #{reconnect_state := RS} = State) ->
case find_player_by_pid(DownPid, State) of
{ok, PlayerId} ->
Now = erlang:system_time(millisecond),
{Events, RS1} = asobi_reconnect:disconnect(PlayerId, Now, RS),
State1 = handle_reconnect_events(Events, State#{reconnect_state => RS1}),
{keep_state, State1};
none ->
keep_state_and_data
end;
running({call, From}, {reconnect, PlayerId}, State) ->
#{
reconnect_state := RS,
player_zones := PZ,
zone_pids := ZP,
view_radius := _ViewRadius,
grid_size := _GridSize
} = State,
case {RS, maps:get(PlayerId, PZ, undefined)} of
{undefined, _} ->
{keep_state_and_data, [{reply, From, {error, no_reconnect_policy}}]};
{_, undefined} ->
{keep_state_and_data, [{reply, From, {error, not_in_world}}]};
{_, #{zone := ZoneCoords, interest := InterestZones}} ->
{_Events, RS1} = asobi_reconnect:reconnect(PlayerId, RS),
PlayerPid = find_player_pid(PlayerId),
MonRef = erlang:monitor(process, PlayerPid),
%% Re-subscribe to all interest zones
lists:foreach(
fun(Coords) ->
case maps:get(Coords, ZP, undefined) of
undefined -> ok;
ZPid -> asobi_zone:subscribe(ZPid, {PlayerId, PlayerPid})
end
end,
InterestZones
),
%% Notify session of world/zone
ZonePid = maps:get(ZoneCoords, ZP, undefined),
asobi_presence:send(PlayerId, {world_joined, self(), ZonePid}),
%% Update player entry with new session
Players = maps:get(players, State),
PlayerMeta = maps:get(PlayerId, Players, #{}),
Players1 = Players#{
PlayerId => PlayerMeta#{
session_pid => PlayerPid, monitor_ref => MonRef
}
},
{keep_state, State#{reconnect_state => RS1, players => Players1}, [{reply, From, ok}]}
end.

%% --- finished state ---

Expand Down Expand Up @@ -307,12 +372,22 @@ handle_join(
{ok, SpawnPos} = Mod:spawn_position(PlayerId, GS1),
State1 = State#{game_state => GS1},
State2 = place_player(PlayerId, SpawnPos, State1),
%% Monitor player session for reconnection handling
PlayerPid = find_player_pid(PlayerId),
MonRef = erlang:monitor(process, PlayerPid),
Players1 = Players#{
PlayerId => #{
joined_at => erlang:system_time(millisecond),
position => SpawnPos
position => SpawnPos,
session_pid => PlayerPid,
monitor_ref => MonRef
}
},
%% Track player→world for reconnection lookup
case ets:info(asobi_player_worlds) of
undefined -> ok;
_ -> ets:insert(asobi_player_worlds, {PlayerId, self()})
end,
VetoTokens = maps:get(veto_tokens, State2),
VetoCount = maps:get(veto_tokens_per_player, State2),
State3 = State2#{
Expand Down Expand Up @@ -665,7 +740,13 @@ persist_result(#{world_id := WorldId, players := Players} = State) ->
notify_players(Event, #{players := Players, world_id := WorldId} = State) ->
Payload =
case Event of
finished -> #{world_id => WorldId, result => maps:get(result, State, #{})}
finished ->
#{world_id => WorldId, result => maps:get(result, State, #{})};
phase_changed ->
case maps:get(phase_state, State, undefined) of
undefined -> #{world_id => WorldId};
PS -> maps:merge(#{world_id => WorldId}, asobi_phase:info(PS))
end
end,
maps:foreach(
fun(PlayerId, _Meta) ->
Expand All @@ -690,6 +771,55 @@ world_info(Status, #{world_id := WorldId, players := Players} = State) ->
PS -> Base#{phase => asobi_phase:info(PS)}
end.

%% --- Internal: Reconnection ---

init_reconnect(Config) ->
case maps:get(reconnect, Config, undefined) of
undefined -> undefined;
Policy when is_map(Policy) -> asobi_reconnect:new(Policy)
end.

tick_reconnect(#{reconnect_state := undefined} = State) ->
State;
tick_reconnect(#{reconnect_state := RS, tick_rate := TickRate} = State) ->
{Events, RS1} = asobi_reconnect:tick(TickRate, RS),
State1 = State#{reconnect_state => RS1},
handle_reconnect_events(Events, State1).

handle_reconnect_events([], State) ->
State;
handle_reconnect_events([{grace_expired, PlayerId, Action} | Rest], State) ->
State1 =
case Action of
remove ->
#{world_id := WorldId} = State,
case ets:info(asobi_player_worlds) of
undefined -> ok;
_ -> ets:delete(asobi_player_worlds, PlayerId)
end,
asobi_telemetry:world_player_left(WorldId, PlayerId),
State2 = remove_player_from_zones(PlayerId, State),
Players = maps:remove(PlayerId, maps:get(players, State2)),
State2#{players => Players};
_ ->
State
end,
handle_reconnect_events(Rest, State1);
handle_reconnect_events([_ | Rest], State) ->
handle_reconnect_events(Rest, State).

find_player_by_pid(Pid, #{players := Players}) ->
maps:fold(
fun
(PlayerId, #{session_pid := SPid}, none) when SPid =:= Pid ->
{ok, PlayerId};
(_, _, Acc) ->
Acc
end,
none,
Players
).

resolve_siblings(#{instance_sup := InstanceSup}) ->
ZoneSupPid = asobi_world_instance:get_child(InstanceSup, asobi_zone_sup),
TickerPid = asobi_world_instance:get_child(InstanceSup, asobi_world_ticker),
Expand Down Expand Up @@ -723,10 +853,25 @@ tick_phases(
undefined -> ~"complete";
P -> P
end
);
),
%% Broadcast phase change to all players
notify_players(phase_changed, State#{phase_state => PS1});
false ->
ok
end,
%% Periodically send phase info (every ~50 ticks = ~2.5s)
case erlang:system_time(second) rem 3 of
0 ->
PhaseInfo2 = asobi_phase:info(PS1),
maps:foreach(
fun(PlayerId, _) ->
asobi_presence:send(PlayerId, {world_event, phase_changed, PhaseInfo2})
end,
maps:get(players, State, #{})
);
_ ->
ok
end,
GS1 = handle_phase_events(Events, Mod, GS),
State#{phase_state => PS1, game_state => GS1}.

Expand Down
15 changes: 15 additions & 0 deletions src/world/asobi_world_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ start_link() ->

-spec init([]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([]) ->
%% ETS tables for world state backup and player-world mapping
_ =
case ets:info(asobi_world_state) of
undefined ->
ets:new(asobi_world_state, [named_table, public, set, {read_concurrency, true}]);
_ ->
ok
end,
_ =
case ets:info(asobi_player_worlds) of
undefined ->
ets:new(asobi_player_worlds, [named_table, public, set, {read_concurrency, true}]);
_ ->
ok
end,
SupFlags = #{
strategy => one_for_one,
intensity => 10,
Expand Down
54 changes: 51 additions & 3 deletions src/world/asobi_zone.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ init(Config) ->
GameModule = maps:get(game_module, Config),
ZoneState = maps:get(zone_state, Config, #{}),
pg:join(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()),
%% Recover entity state from ETS backup if available (zone crash recovery)
RecoveredEntities = recover_zone_state(WorldId, Coords),
{ok, #{
world_id => WorldId,
coords => Coords,
ticker_pid => TickerPid,
game_module => GameModule,
entities => #{},
entities => RecoveredEntities,
prev_entities => #{},
broadcast_entities => #{},
broadcast_interval => maps:get(broadcast_interval, Config, 3),
Expand Down Expand Up @@ -138,7 +140,17 @@ handle_info(_Info, State) ->
{noreply, State}.

-spec terminate(term(), map()) -> ok.
terminate(_Reason, #{world_id := WorldId, coords := Coords}) ->
terminate(normal, #{world_id := WorldId, coords := Coords}) ->
clear_zone_backup(WorldId, Coords),
pg:leave(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()),
ok;
terminate({shutdown, _}, #{world_id := WorldId, coords := Coords}) ->
clear_zone_backup(WorldId, Coords),
pg:leave(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()),
ok;
terminate(_Reason, #{world_id := WorldId, coords := Coords, entities := Entities}) ->
%% Abnormal termination — save state for recovery
backup_zone_state(WorldId, Coords, Entities),
pg:leave(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()),
ok.

Expand All @@ -147,6 +159,8 @@ terminate(_Reason, #{world_id := WorldId, coords := Coords}) ->
do_tick(
TickN,
#{
world_id := WorldId,
coords := Coords,
game_module := GameMod,
entities := Entities,
prev_entities := _PrevEntities,
Expand All @@ -160,7 +174,8 @@ do_tick(
} = State
) ->
Entities1 = apply_inputs(GameMod, Queue, Entities),
{Entities2, ZoneState1} = GameMod:zone_tick(Entities1, ZoneState),
ZoneStateWithTick = ZoneState#{tick => TickN},
{Entities2, ZoneState1} = GameMod:zone_tick(Entities1, ZoneStateWithTick),
Now = erlang:system_time(millisecond),
{TimerEvents, ET1} = asobi_entity_timer:tick(Now, ET),
Entities3 = apply_timer_events(TimerEvents, Entities2),
Expand All @@ -175,6 +190,11 @@ do_tick(
State
end,
asobi_world_ticker:tick_done(TickerPid, self(), TickN),
%% Periodic backup for crash recovery (every 20 ticks ≈ 1 second)
case TickN rem 20 of
0 -> backup_zone_state(WorldId, Coords, Entities3);
_ -> ok
end,
State1#{
entities => Entities3,
prev_entities => Entities3,
Expand Down Expand Up @@ -259,3 +279,31 @@ encode_delta({added, Id, FullState}) ->
FullState#{~"op" => ~"a", ~"id" => Id};
encode_delta({removed, Id}) ->
#{~"op" => ~"r", ~"id" => Id}.

%% --- Zone State Backup/Recovery ---

backup_zone_state(WorldId, Coords, Entities) ->
case ets:info(asobi_world_state) of
undefined -> ok;
_ -> ets:insert(asobi_world_state, {{WorldId, Coords}, Entities})
end.

recover_zone_state(WorldId, Coords) ->
case ets:info(asobi_world_state) of
undefined ->
#{};
_ ->
case ets:lookup(asobi_world_state, {WorldId, Coords}) of
[{{WorldId, Coords}, Entities}] ->
ets:delete(asobi_world_state, {WorldId, Coords}),
Entities;
[] ->
#{}
end
end.

clear_zone_backup(WorldId, Coords) ->
case ets:info(asobi_world_state) of
undefined -> ok;
_ -> ets:delete(asobi_world_state, {WorldId, Coords})
end.
Loading
Loading