diff --git a/src/asobi_telemetry.erl b/src/asobi_telemetry.erl index ef65e55..92a66c9 100644 --- a/src/asobi_telemetry.erl +++ b/src/asobi_telemetry.erl @@ -7,6 +7,7 @@ -export([matchmaker_queued/2, matchmaker_removed/2, matchmaker_formed/3]). -export([session_connected/1, session_disconnected/2]). -export([ws_connected/0, ws_disconnected/0, ws_message_in/1, ws_message_out/1]). +-export([anticheat_violation/3]). -export([economy_transaction/4, store_purchase/3]). -export([chat_message_sent/2]). -export([vote_started/2, vote_cast/2, vote_resolved/3]). @@ -157,6 +158,14 @@ ws_connected() -> ws_disconnected() -> telemetry:execute([asobi, ws, disconnected], #{count => 1}, #{}). +-spec anticheat_violation(binary(), atom(), map()) -> ok. +anticheat_violation(PlayerId, Type, Details) -> + telemetry:execute( + [asobi, anticheat, violation], + #{count => 1}, + #{player_id => PlayerId, type => Type, details => Details} + ). + -spec ws_message_in(binary()) -> ok. ws_message_in(Type) -> telemetry:execute([asobi, ws, message_in], #{count => 1}, #{type => Type}). diff --git a/src/world/asobi_world_server.erl b/src/world/asobi_world_server.erl index 9a76835..9b1fa3e 100644 --- a/src/world/asobi_world_server.erl +++ b/src/world/asobi_world_server.erl @@ -2,6 +2,7 @@ -behaviour(gen_statem). -export([start_link/1, join/2, leave/2, move_player/3, post_tick/2, get_info/1, cancel/1]). +-export([reconnect/2]). -export([start_vote/2, cast_vote/4, use_veto/3]). -export([whereis/1]). -export([callback_mode/0, init/1, terminate/3]). @@ -40,6 +41,10 @@ post_tick(Pid, TickN) -> get_info(Pid) -> gen_statem:call(Pid, get_info). +-spec reconnect(pid(), binary()) -> ok | {error, term()}. +reconnect(Pid, PlayerId) -> + gen_statem:call(Pid, {reconnect, PlayerId}). + -spec cancel(pid()) -> ok. cancel(Pid) -> gen_statem:cast(Pid, cancel). @@ -119,6 +124,7 @@ init(Config) -> frustration_bonus => FrustrationBonus, active_votes => #{}, persistent => Persistent, + reconnect_state => init_reconnect(Config), chat_state => ChatState, phase_state => PhaseState }, @@ -178,7 +184,8 @@ running( ) -> case Mod:post_tick(TickN, GS) of {ok, GS1} -> - State1 = tick_phases(TickRate, State#{game_state => GS1}), + State0 = tick_reconnect(State#{game_state => GS1}), + State1 = tick_phases(TickRate, State0), case maps:get(phase_state, State1) of PS when is_map(PS) -> case asobi_phase:info(PS) of @@ -215,7 +222,65 @@ running(info, {vote_vetoed, VoteId, _Template}, State) -> {keep_state, State#{active_votes => Active}}; running(info, {asobi_message, _}, _State) -> %% Zone snapshots/deltas forwarded here in tests — ignore - keep_state_and_data. + keep_state_and_data; +running( + info, {'DOWN', _MonRef, process, DownPid, _Reason}, #{reconnect_state := undefined} = State +) -> + %% No reconnect policy — no action (entity stays, player can rejoin) + case find_player_by_pid(DownPid, State) of + {ok, _PlayerId} -> keep_state_and_data; + none -> keep_state_and_data + end; +running(info, {'DOWN', _MonRef, process, DownPid, _Reason}, #{reconnect_state := RS} = State) -> + case find_player_by_pid(DownPid, State) of + {ok, PlayerId} -> + Now = erlang:system_time(millisecond), + {Events, RS1} = asobi_reconnect:disconnect(PlayerId, Now, RS), + State1 = handle_reconnect_events(Events, State#{reconnect_state => RS1}), + {keep_state, State1}; + none -> + keep_state_and_data + end; +running({call, From}, {reconnect, PlayerId}, State) -> + #{ + reconnect_state := RS, + player_zones := PZ, + zone_pids := ZP, + view_radius := _ViewRadius, + grid_size := _GridSize + } = State, + case {RS, maps:get(PlayerId, PZ, undefined)} of + {undefined, _} -> + {keep_state_and_data, [{reply, From, {error, no_reconnect_policy}}]}; + {_, undefined} -> + {keep_state_and_data, [{reply, From, {error, not_in_world}}]}; + {_, #{zone := ZoneCoords, interest := InterestZones}} -> + {_Events, RS1} = asobi_reconnect:reconnect(PlayerId, RS), + PlayerPid = find_player_pid(PlayerId), + MonRef = erlang:monitor(process, PlayerPid), + %% Re-subscribe to all interest zones + lists:foreach( + fun(Coords) -> + case maps:get(Coords, ZP, undefined) of + undefined -> ok; + ZPid -> asobi_zone:subscribe(ZPid, {PlayerId, PlayerPid}) + end + end, + InterestZones + ), + %% Notify session of world/zone + ZonePid = maps:get(ZoneCoords, ZP, undefined), + asobi_presence:send(PlayerId, {world_joined, self(), ZonePid}), + %% Update player entry with new session + Players = maps:get(players, State), + PlayerMeta = maps:get(PlayerId, Players, #{}), + Players1 = Players#{ + PlayerId => PlayerMeta#{ + session_pid => PlayerPid, monitor_ref => MonRef + } + }, + {keep_state, State#{reconnect_state => RS1, players => Players1}, [{reply, From, ok}]} + end. %% --- finished state --- @@ -307,12 +372,22 @@ handle_join( {ok, SpawnPos} = Mod:spawn_position(PlayerId, GS1), State1 = State#{game_state => GS1}, State2 = place_player(PlayerId, SpawnPos, State1), + %% Monitor player session for reconnection handling + PlayerPid = find_player_pid(PlayerId), + MonRef = erlang:monitor(process, PlayerPid), Players1 = Players#{ PlayerId => #{ joined_at => erlang:system_time(millisecond), - position => SpawnPos + position => SpawnPos, + session_pid => PlayerPid, + monitor_ref => MonRef } }, + %% Track player→world for reconnection lookup + case ets:info(asobi_player_worlds) of + undefined -> ok; + _ -> ets:insert(asobi_player_worlds, {PlayerId, self()}) + end, VetoTokens = maps:get(veto_tokens, State2), VetoCount = maps:get(veto_tokens_per_player, State2), State3 = State2#{ @@ -665,7 +740,13 @@ persist_result(#{world_id := WorldId, players := Players} = State) -> notify_players(Event, #{players := Players, world_id := WorldId} = State) -> Payload = case Event of - finished -> #{world_id => WorldId, result => maps:get(result, State, #{})} + finished -> + #{world_id => WorldId, result => maps:get(result, State, #{})}; + phase_changed -> + case maps:get(phase_state, State, undefined) of + undefined -> #{world_id => WorldId}; + PS -> maps:merge(#{world_id => WorldId}, asobi_phase:info(PS)) + end end, maps:foreach( fun(PlayerId, _Meta) -> @@ -690,6 +771,55 @@ world_info(Status, #{world_id := WorldId, players := Players} = State) -> PS -> Base#{phase => asobi_phase:info(PS)} end. +%% --- Internal: Reconnection --- + +init_reconnect(Config) -> + case maps:get(reconnect, Config, undefined) of + undefined -> undefined; + Policy when is_map(Policy) -> asobi_reconnect:new(Policy) + end. + +tick_reconnect(#{reconnect_state := undefined} = State) -> + State; +tick_reconnect(#{reconnect_state := RS, tick_rate := TickRate} = State) -> + {Events, RS1} = asobi_reconnect:tick(TickRate, RS), + State1 = State#{reconnect_state => RS1}, + handle_reconnect_events(Events, State1). + +handle_reconnect_events([], State) -> + State; +handle_reconnect_events([{grace_expired, PlayerId, Action} | Rest], State) -> + State1 = + case Action of + remove -> + #{world_id := WorldId} = State, + case ets:info(asobi_player_worlds) of + undefined -> ok; + _ -> ets:delete(asobi_player_worlds, PlayerId) + end, + asobi_telemetry:world_player_left(WorldId, PlayerId), + State2 = remove_player_from_zones(PlayerId, State), + Players = maps:remove(PlayerId, maps:get(players, State2)), + State2#{players => Players}; + _ -> + State + end, + handle_reconnect_events(Rest, State1); +handle_reconnect_events([_ | Rest], State) -> + handle_reconnect_events(Rest, State). + +find_player_by_pid(Pid, #{players := Players}) -> + maps:fold( + fun + (PlayerId, #{session_pid := SPid}, none) when SPid =:= Pid -> + {ok, PlayerId}; + (_, _, Acc) -> + Acc + end, + none, + Players + ). + resolve_siblings(#{instance_sup := InstanceSup}) -> ZoneSupPid = asobi_world_instance:get_child(InstanceSup, asobi_zone_sup), TickerPid = asobi_world_instance:get_child(InstanceSup, asobi_world_ticker), @@ -723,10 +853,25 @@ tick_phases( undefined -> ~"complete"; P -> P end - ); + ), + %% Broadcast phase change to all players + notify_players(phase_changed, State#{phase_state => PS1}); false -> ok end, + %% Periodically send phase info (every ~50 ticks = ~2.5s) + case erlang:system_time(second) rem 3 of + 0 -> + PhaseInfo2 = asobi_phase:info(PS1), + maps:foreach( + fun(PlayerId, _) -> + asobi_presence:send(PlayerId, {world_event, phase_changed, PhaseInfo2}) + end, + maps:get(players, State, #{}) + ); + _ -> + ok + end, GS1 = handle_phase_events(Events, Mod, GS), State#{phase_state => PS1, game_state => GS1}. diff --git a/src/world/asobi_world_sup.erl b/src/world/asobi_world_sup.erl index ec8dd12..ac18d46 100644 --- a/src/world/asobi_world_sup.erl +++ b/src/world/asobi_world_sup.erl @@ -10,6 +10,21 @@ start_link() -> -spec init([]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init([]) -> + %% ETS tables for world state backup and player-world mapping + _ = + case ets:info(asobi_world_state) of + undefined -> + ets:new(asobi_world_state, [named_table, public, set, {read_concurrency, true}]); + _ -> + ok + end, + _ = + case ets:info(asobi_player_worlds) of + undefined -> + ets:new(asobi_player_worlds, [named_table, public, set, {read_concurrency, true}]); + _ -> + ok + end, SupFlags = #{ strategy => one_for_one, intensity => 10, diff --git a/src/world/asobi_zone.erl b/src/world/asobi_zone.erl index 9e9e46d..3d1018a 100644 --- a/src/world/asobi_zone.erl +++ b/src/world/asobi_zone.erl @@ -66,12 +66,14 @@ init(Config) -> GameModule = maps:get(game_module, Config), ZoneState = maps:get(zone_state, Config, #{}), pg:join(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()), + %% Recover entity state from ETS backup if available (zone crash recovery) + RecoveredEntities = recover_zone_state(WorldId, Coords), {ok, #{ world_id => WorldId, coords => Coords, ticker_pid => TickerPid, game_module => GameModule, - entities => #{}, + entities => RecoveredEntities, prev_entities => #{}, broadcast_entities => #{}, broadcast_interval => maps:get(broadcast_interval, Config, 3), @@ -138,7 +140,17 @@ handle_info(_Info, State) -> {noreply, State}. -spec terminate(term(), map()) -> ok. -terminate(_Reason, #{world_id := WorldId, coords := Coords}) -> +terminate(normal, #{world_id := WorldId, coords := Coords}) -> + clear_zone_backup(WorldId, Coords), + pg:leave(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()), + ok; +terminate({shutdown, _}, #{world_id := WorldId, coords := Coords}) -> + clear_zone_backup(WorldId, Coords), + pg:leave(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()), + ok; +terminate(_Reason, #{world_id := WorldId, coords := Coords, entities := Entities}) -> + %% Abnormal termination — save state for recovery + backup_zone_state(WorldId, Coords, Entities), pg:leave(?PG_SCOPE, {asobi_zone, WorldId, Coords}, self()), ok. @@ -147,6 +159,8 @@ terminate(_Reason, #{world_id := WorldId, coords := Coords}) -> do_tick( TickN, #{ + world_id := WorldId, + coords := Coords, game_module := GameMod, entities := Entities, prev_entities := _PrevEntities, @@ -160,7 +174,8 @@ do_tick( } = State ) -> Entities1 = apply_inputs(GameMod, Queue, Entities), - {Entities2, ZoneState1} = GameMod:zone_tick(Entities1, ZoneState), + ZoneStateWithTick = ZoneState#{tick => TickN}, + {Entities2, ZoneState1} = GameMod:zone_tick(Entities1, ZoneStateWithTick), Now = erlang:system_time(millisecond), {TimerEvents, ET1} = asobi_entity_timer:tick(Now, ET), Entities3 = apply_timer_events(TimerEvents, Entities2), @@ -175,6 +190,11 @@ do_tick( State end, asobi_world_ticker:tick_done(TickerPid, self(), TickN), + %% Periodic backup for crash recovery (every 20 ticks ≈ 1 second) + case TickN rem 20 of + 0 -> backup_zone_state(WorldId, Coords, Entities3); + _ -> ok + end, State1#{ entities => Entities3, prev_entities => Entities3, @@ -259,3 +279,31 @@ encode_delta({added, Id, FullState}) -> FullState#{~"op" => ~"a", ~"id" => Id}; encode_delta({removed, Id}) -> #{~"op" => ~"r", ~"id" => Id}. + +%% --- Zone State Backup/Recovery --- + +backup_zone_state(WorldId, Coords, Entities) -> + case ets:info(asobi_world_state) of + undefined -> ok; + _ -> ets:insert(asobi_world_state, {{WorldId, Coords}, Entities}) + end. + +recover_zone_state(WorldId, Coords) -> + case ets:info(asobi_world_state) of + undefined -> + #{}; + _ -> + case ets:lookup(asobi_world_state, {WorldId, Coords}) of + [{{WorldId, Coords}, Entities}] -> + ets:delete(asobi_world_state, {WorldId, Coords}), + Entities; + [] -> + #{} + end + end. + +clear_zone_backup(WorldId, Coords) -> + case ets:info(asobi_world_state) of + undefined -> ok; + _ -> ets:delete(asobi_world_state, {WorldId, Coords}) + end. diff --git a/src/ws/asobi_ws_handler.erl b/src/ws/asobi_ws_handler.erl index 9d04b82..7270f65 100644 --- a/src/ws/asobi_ws_handler.erl +++ b/src/ws/asobi_ws_handler.erl @@ -33,7 +33,7 @@ websocket_handle({text, Raw}, State) -> try json:decode(Raw) of #{~"type" := Type} = Msg when is_binary(Type) -> asobi_telemetry:ws_message_in(Type), - handle_message(Msg, State1); + safe_handle_message(Msg, State1); _ -> Reply = encode_reply(undefined, ~"error", #{ reason => ~"invalid_message" @@ -100,6 +100,16 @@ handle_message(#{~"type" := ~"session.connect", ~"payload" := Payload} = Msg, St {ok, PlayerId} -> {ok, SessionPid} = asobi_player_session_sup:start_session(PlayerId, self()), asobi_telemetry:session_connected(PlayerId), + %% Check for pending world reconnection + _ = + case ets:lookup(asobi_player_worlds, PlayerId) of + [{PlayerId, WorldPid}] -> + _ = spawn(fun() -> + catch asobi_world_server:reconnect(WorldPid, PlayerId) + end); + [] -> + ok + end, Reply = encode_reply(Cid, ~"session.connected", #{player_id => PlayerId}), {reply, {text, Reply}, State#{session => SessionPid, player_id => PlayerId}}; {error, Reason} -> @@ -152,8 +162,13 @@ handle_message( is_binary(PlayerId) -> #{~"channel_id" := ChannelId, ~"content" := Content} = Payload, - asobi_chat_channel:send_message(ChannelId, PlayerId, Content), - {ok, State}; + case is_binary(Content) andalso byte_size(Content) =< 2000 of + true -> + asobi_chat_channel:send_message(ChannelId, PlayerId, Content), + {ok, State}; + false -> + {ok, State} + end; handle_message( #{~"type" := ~"dm.send", ~"payload" := Payload} = Msg, #{player_id := PlayerId} = State ) when is_binary(PlayerId) -> @@ -408,6 +423,35 @@ handle_message(#{~"type" := Type} = Msg, State) -> handle_message(_Msg, State) -> {ok, State}. +%% --- Safe Message Dispatch --- + +safe_handle_message(Msg, State) -> + try + handle_message(Msg, State) + catch + error:{badmatch, _}:_Stack -> + reply_error(Msg, ~"invalid_payload", State); + error:{badkey, _}:_Stack -> + reply_error(Msg, ~"missing_field", State); + error:function_clause:_Stack -> + reply_error(Msg, ~"invalid_payload", State); + error:{case_clause, _}:_Stack -> + reply_error(Msg, ~"invalid_payload", State); + Class:Reason:Stack -> + logger:warning(#{ + msg => ~"ws_handler_crash", + class => Class, + reason => Reason, + stacktrace => Stack + }), + reply_error(Msg, ~"internal_error", State) + end. + +reply_error(Msg, Reason, State) -> + Cid = maps:get(~"cid", Msg, undefined), + Reply = encode_reply(Cid, ~"error", #{reason => Reason}), + {reply, {text, Reply}, State}. + %% --- Internal --- authenticate(#{~"token" := Token}) ->