From 00c646020759879e9b1652693346139a4620e6cf Mon Sep 17 00:00:00 2001 From: Saeid Alizadeh Date: Sun, 7 Jul 2024 16:40:42 -0600 Subject: [PATCH 1/6] Rebased with the new structure --- api/sessions_api.go | 340 +++++++++++++++++++++++++++++++++++ models/connection/message.go | 4 + models/connection/signal.go | 3 + 3 files changed, 347 insertions(+) create mode 100644 api/sessions_api.go diff --git a/api/sessions_api.go b/api/sessions_api.go new file mode 100644 index 0000000..03eced5 --- /dev/null +++ b/api/sessions_api.go @@ -0,0 +1,340 @@ +package api + +import ( + "encoding/json" + "log" + "time" + + "github.com/gorilla/websocket" + mb "github.com/saeidalz13/battleship-backend/models/battleship" + mc "github.com/saeidalz13/battleship-backend/models/connection" +) + +const ( + gracePeriod time.Duration = time.Minute * 2 +) + +type Session struct { + ID string + Conn *websocket.Conn + GameUuid string + Player *mb.Player + StopRetry chan struct{} + GameManager *GameManager + SessionManager *SessionManager + CreatedAt time.Time +} + +func NewSession(conn *websocket.Conn, sessionID string, gameManager *GameManager, sessionManager *SessionManager) *Session { + return &Session{ + ID: sessionID, + Conn: conn, + StopRetry: make(chan struct{}), + GameManager: gameManager, + SessionManager: sessionManager, + CreatedAt: time.Now(), + } +} + +func (s *Session) run() { + defer s.terminate() + +sessionLoop: + for { + // A WebSocket frame can be one of 6 types: text=1, binary=2, ping=9, pong=10, close=8 and continuation=0 + // https://www.rfc-editor.org/rfc/rfc6455.html#section-11.8 + _, payload, err := s.Conn.ReadMessage() + if err != nil { + if s.handleReadErr(err) == ConnLoopCodeBreak { + break sessionLoop + } + } + var signal mc.Signal + if err := json.Unmarshal(payload, &signal); err != nil { + log.Println("incoming msg does not contain 'code':", err) + resp := mc.NewMessage[mc.NoPayload](mc.CodeSignalAbsent) + resp.AddError("incoming req payload must contain 'code' field", "") + + if s.writeToConn(resp) == ConnLoopCodeBreak { + break sessionLoop + } + continue sessionLoop + } + + switch signal.Code { + case mc.CodeCreateGame: + req := NewRequest(s, payload) + resp := req.HandleCreateGame() + + if s.writeToConn(resp) == ConnLoopCodeBreak { + break sessionLoop + } + + case mc.CodeAttack: + req := NewRequest(s, payload) + // response will have the IsTurn as false field of attacker + resp, defender := req.HandleAttack() + + if s.writeToConn(resp) == ConnLoopCodeBreak { + break sessionLoop + } + if resp.Error != nil { + continue sessionLoop + } + + // defender turn is set to true + resp.Payload.IsTurn = true + s.notifyOtherSession(defender.SessionID, resp) + + if defender.MatchStatus == mb.PlayerMatchStatusLost { + respAttacker := mc.NewMessage[mc.RespEndGame](mc.CodeEndGame) + respAttacker.AddPayload(mc.RespEndGame{PlayerMatchStatus: mb.PlayerMatchStatusWon}) + if s.writeToConn(respAttacker) == ConnLoopCodeBreak { + break sessionLoop + } + + respDefender := mc.NewMessage[mc.RespEndGame](mc.CodeEndGame) + respDefender.AddPayload(mc.RespEndGame{PlayerMatchStatus: mb.PlayerMatchStatusLost}) + s.notifyOtherSession(defender.SessionID, respDefender) + } + + case mc.CodeReady: + req := NewRequest(s, payload) + resp, game := req.HandleReadyPlayer() + + if s.writeToConn(resp) == ConnLoopCodeBreak { + break sessionLoop + } + if resp.Error != nil { + continue sessionLoop + } + + if game.HostPlayer.IsReady && game.JoinPlayer.IsReady { + respStartGame := mc.NewMessage[mc.NoPayload](mc.CodeStartGame) + if s.writeToConn(respStartGame) == ConnLoopCodeBreak { + break sessionLoop + } + + otherPlayer := game.GetOtherPlayer(s.Player) + s.notifyOtherSession(otherPlayer.SessionID, respStartGame) + } + + case mc.CodeJoinGame: + req := NewRequest(s, payload) + resp, game := req.HandleJoinPlayer() + + if s.writeToConn(resp) == ConnLoopCodeBreak { + break sessionLoop + } + if resp.Error != nil { + break sessionLoop + } + + readyResp := mc.NewMessage[mc.NoPayload](mc.CodeSelectGrid) + if s.writeToConn(readyResp) == ConnLoopCodeBreak { + break sessionLoop + } + s.notifyOtherSession(game.HostPlayer.SessionID, readyResp) + + case mc.CodeRematchCall: + // 1. See if the game still exists + game, err := s.GameManager.FindGame(s.GameUuid) + if err != nil { + break sessionLoop + } + + if game.IsRematchAlreadyCalled() { + continue sessionLoop + } + + game.CallRematch() + + otherPlayer := game.GetOtherPlayer(s.Player) + if otherPlayer == nil { + break sessionLoop + } + + s.Player.IsTurn = true + // Notify the other player if they want a rematch + msg := mc.NewMessage[mc.NoPayload](mc.CodeRematchCall) + s.notifyOtherSession(otherPlayer.SessionID, msg) + + case mc.CodeRematchCallAccepted: + // Send the rematch call acceptance to other player + game, err := s.GameManager.FindGame(s.GameUuid) + if err != nil { + break sessionLoop + } + + if err := game.Reset(); err != nil { + break sessionLoop + } + + // Notify the other player with their turn + msgOtherPlayer := mc.NewMessage[mc.RespRematch](mc.CodeRematch) + otherPlayer := game.GetOtherPlayer(s.Player) + if otherPlayer == nil { + break sessionLoop + } + msgOtherPlayer.AddPayload(mc.RespRematch{IsTurn: otherPlayer.IsTurn}) + s.notifyOtherSession(otherPlayer.SessionID, msgOtherPlayer) + + s.Player.IsTurn = false + msgPlayer := mc.NewMessage[mc.RespRematch](mc.CodeRematch) + msgPlayer.AddPayload(mc.RespRematch{IsTurn: s.Player.IsTurn}) + + // Notify the acceptor with their turn + if s.writeToConn(msgPlayer) == ConnLoopCodeBreak { + break sessionLoop + } + + case mc.CodeRematchCallRejected: + game, err := s.GameManager.FindGame(s.GameUuid) + if err != nil { + break sessionLoop + } + + // Notify the other player that no rematch is wanted now + msg := mc.NewMessage[mc.NoPayload](mc.CodeRematchCallRejected) + otherPlayer := game.GetOtherPlayer(s.Player) + if otherPlayer != nil { + s.notifyOtherSession(otherPlayer.SessionID, msg) + } + + break sessionLoop + + case mc.CodePlayerInteraction: + game, err := s.GameManager.FindGame(s.GameUuid) + if err != nil { + break sessionLoop + } + otherPlayer := game.GetOtherPlayer(s.Player) + if otherPlayer != nil { + s.notifyOtherSession(otherPlayer.SessionID, payload) + } + + default: + respInvalidSignal := mc.NewMessage[mc.NoPayload](mc.CodeInvalidSignal) + respInvalidSignal.AddError("", "invalid code in the incoming payload") + if s.writeToConn(respInvalidSignal) == ConnLoopCodeBreak { + break sessionLoop + } + } + } +} + +// This is to send a message to the other session. +func (s *Session) notifyOtherSession(otherSessionId string, msg interface{}) { + s.SessionManager.CommunicationChan <- NewSessionMessage(s, otherSessionId, s.GameUuid, msg) +} + +// This will delete player from the game players map +// and delete the player session +func (s *Session) terminate() { + if s.Player != nil { + s.GameManager.DeletePlayerFromGame(s.GameUuid, s.Player.Uuid) + } + s.SessionManager.DeleteSession(s.ID) +} + +// Writes to the connection of that session. It also +// handles the abnormal or other types of errors of +// writing to a websocket connection. +func (s *Session) writeToConn(p interface{}) int { + switch WriteJSONWithRetry(s.Conn, p) { + case ConnLoopAbnormalClosureRetry: + switch s.handleAbnormalClosure() { + case ConnLoopCodeBreak: + return ConnLoopCodeBreak + + case ConnLoopCodeContinue: + } + case ConnLoopCodeBreak: + return ConnLoopCodeBreak + default: + } + + return ConnLoopCodeContinue +} + +// This function takes care of abnormal closures happening +// to either of the clients. This happens due to backgrounding +// in IOS clients or any other unexpected reasons for web apps. +func (s *Session) handleAbnormalClosure() int { + // This means there is no game and abnormal closure is happening + // which means this session is invalid and should end + game, err := s.GameManager.FindGame(s.GameUuid) + if err != nil { + return ConnLoopCodeBreak + } + + otherPlayer := game.GetOtherPlayer(s.Player) + if otherPlayer == nil { + return ConnLoopCodeBreak + } + + // Absence of otherPlayer session means this game is invalid + otherSession, err := s.SessionManager.FindSession(otherPlayer.SessionID) + if err != nil { + return ConnLoopCodeBreak + } + + if err := otherSession.Conn.WriteJSON(mc.NewMessage[mc.NoPayload](mc.CodeOtherPlayerGracePeriod)); err != nil { + // If other player connection is disrupted as well, then end the session + return ConnLoopCodeBreak + } + + log.Printf("starting grace period for %s\n", s.ID) + timer := time.NewTimer(gracePeriod) + + select { + case <-timer.C: + if otherSession != nil { + _ = otherSession.Conn.WriteJSON(mc.NewMessage[mc.NoPayload](mc.CodeOtherPlayerDisconnected)) + } + log.Printf("session terminated: %s\n", s.ID) + return ConnLoopCodeBreak + + case <-s.StopRetry: + if otherSession != nil { + _ = otherSession.Conn.WriteJSON(mc.NewMessage[mc.NoPayload](mc.CodeOtherPlayerReconnected)) + } + log.Printf("player reconnected, session: %s\n", s.ID) + return ConnLoopCodeContinue + } +} + +// Handles the errors that occurs when reading from +// ws connection. `ConnLoopCodeContinue` will results in +// terminating the session and removing `run` from stack +func (s *Session) handleReadErr(err error) int { + retries := 0 + + switch IdentifyWsConnErrAction(err) { + case ConnLoopAbnormalClosureRetry: + switch s.handleAbnormalClosure() { + case ConnLoopCodeBreak: + return ConnLoopCodeBreak + case ConnLoopCodeContinue: + } + + case ConnLoopCodeRetry: + if retries < maxWriteWsRetries { + retries++ + log.Printf("failed to read from ws conn [%s]; retrying... (retry no. %d)\n", s.Conn.RemoteAddr().String(), retries) + time.Sleep(time.Duration(retries*backOffFactor) * time.Second) + + } else { + return ConnLoopCodeBreak + + } + + case ConnLoopCodeBreak: + log.Printf("break ws conn loop [%s] due to: %s\n", s.Conn.RemoteAddr().String(), err) + return ConnLoopCodeBreak + + case ConnLoopCodeContinue: + } + + return ConnLoopCodeContinue +} diff --git a/models/connection/message.go b/models/connection/message.go index 8e279a2..42d5f7e 100644 --- a/models/connection/message.go +++ b/models/connection/message.go @@ -18,3 +18,7 @@ func (m *Message[T]) AddPayload(payload T) { func (m *Message[T]) AddError(errorDetails, message string) { m.Error = NewRespErr(errorDetails, message) } + +type PlayerInteraction struct { + Content string `json:"content"` +} diff --git a/models/connection/signal.go b/models/connection/signal.go index 1f9f6c4..0f65a90 100644 --- a/models/connection/signal.go +++ b/models/connection/signal.go @@ -28,6 +28,9 @@ const ( CodeRematchCallAccepted CodeRematchCallRejected CodeRematch + + // Players can send template texts and emojis to each other + CodePlayerInteraction ) type Signal struct { From 7bb8244cbee1a475e32e00597cccf057464f1919 Mon Sep 17 00:00:00 2001 From: Saeid Alizadeh Date: Mon, 8 Jul 2024 13:39:41 -0600 Subject: [PATCH 2/6] wrote unit tests for player interaction --- test/ws_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/test/ws_test.go b/test/ws_test.go index 619284d..d54abe7 100644 --- a/test/ws_test.go +++ b/test/ws_test.go @@ -281,6 +281,51 @@ func TestReadyGame(t *testing.T) { } } +func TestPlayerInteraction(t *testing.T) { + msg := mc.NewMessage[mc.PlayerInteraction](mc.CodePlayerInteraction) + + tests := []Test[mc.Message[mc.PlayerInteraction], mc.Message[mc.PlayerInteraction]]{ + { + name: "successful msg host to join", + expectedCode: mc.CodePlayerInteraction, + reqPayload: msg, + respPayload: mc.Message[mc.PlayerInteraction]{}, + conn: HostConn, + otherConn: JoinConn, + }, + { + name: "successful msg join to host", + expectedCode: mc.CodePlayerInteraction, + reqPayload: msg, + respPayload: mc.Message[mc.PlayerInteraction]{}, + conn: JoinConn, + otherConn: HostConn, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Client writes to its own connection + if err := test.conn.WriteJSON(test.reqPayload); err != nil { + t.Fatal(err) + } + + // Server writes it to ther other connection + if err := test.otherConn.ReadJSON(&test.respPayload); err != nil { + t.Fatal(err) + } + + if test.respPayload.Code != test.expectedCode { + t.Fatalf("expected status: %d\t got: %d", test.expectedCode, test.respPayload.Code) + } + + if !reflect.DeepEqual(test.reqPayload, test.respPayload) { + t.Fatalf("expected resp payload: %+v\n got: %+v", test.expectedRespPayload, test.respPayload) + } + }) + } +} + func TestAttack(t *testing.T) { tests := []Test[mc.Message[mc.ReqAttack], mc.Message[mc.RespAttack]]{ { @@ -754,6 +799,7 @@ func TestAttack(t *testing.T) { } if test.respPayload.Error != nil { + log.Printf("%+v\n", test.respPayload.Error) if test.respPayload.Error.ErrorDetails != test.expectedErr { t.Fatalf("expected error: %s\t got: %s", test.expectedErr, test.respPayload.Error.ErrorDetails) } From 93b95ab20ef66d920e0c57465651371abab1a0b0 Mon Sep 17 00:00:00 2001 From: Saeid Alizadeh Date: Mon, 8 Jul 2024 13:40:11 -0600 Subject: [PATCH 3/6] modified notifying the other player based on message type --- api/sessions_api.go | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/api/sessions_api.go b/api/sessions_api.go index 03eced5..91938fe 100644 --- a/api/sessions_api.go +++ b/api/sessions_api.go @@ -84,7 +84,7 @@ sessionLoop: // defender turn is set to true resp.Payload.IsTurn = true - s.notifyOtherSession(defender.SessionID, resp) + s.notifyOtherSession(defender.SessionID, resp, TypeSessionMessageJSON) if defender.MatchStatus == mb.PlayerMatchStatusLost { respAttacker := mc.NewMessage[mc.RespEndGame](mc.CodeEndGame) @@ -95,7 +95,7 @@ sessionLoop: respDefender := mc.NewMessage[mc.RespEndGame](mc.CodeEndGame) respDefender.AddPayload(mc.RespEndGame{PlayerMatchStatus: mb.PlayerMatchStatusLost}) - s.notifyOtherSession(defender.SessionID, respDefender) + s.notifyOtherSession(defender.SessionID, respDefender, TypeSessionMessageJSON) } case mc.CodeReady: @@ -116,7 +116,7 @@ sessionLoop: } otherPlayer := game.GetOtherPlayer(s.Player) - s.notifyOtherSession(otherPlayer.SessionID, respStartGame) + s.notifyOtherSession(otherPlayer.SessionID, respStartGame, TypeSessionMessageJSON) } case mc.CodeJoinGame: @@ -134,7 +134,7 @@ sessionLoop: if s.writeToConn(readyResp) == ConnLoopCodeBreak { break sessionLoop } - s.notifyOtherSession(game.HostPlayer.SessionID, readyResp) + s.notifyOtherSession(game.HostPlayer.SessionID, readyResp, TypeSessionMessageJSON) case mc.CodeRematchCall: // 1. See if the game still exists @@ -157,7 +157,7 @@ sessionLoop: s.Player.IsTurn = true // Notify the other player if they want a rematch msg := mc.NewMessage[mc.NoPayload](mc.CodeRematchCall) - s.notifyOtherSession(otherPlayer.SessionID, msg) + s.notifyOtherSession(otherPlayer.SessionID, msg, TypeSessionMessageJSON) case mc.CodeRematchCallAccepted: // Send the rematch call acceptance to other player @@ -177,7 +177,7 @@ sessionLoop: break sessionLoop } msgOtherPlayer.AddPayload(mc.RespRematch{IsTurn: otherPlayer.IsTurn}) - s.notifyOtherSession(otherPlayer.SessionID, msgOtherPlayer) + s.notifyOtherSession(otherPlayer.SessionID, msgOtherPlayer, TypeSessionMessageJSON) s.Player.IsTurn = false msgPlayer := mc.NewMessage[mc.RespRematch](mc.CodeRematch) @@ -198,7 +198,7 @@ sessionLoop: msg := mc.NewMessage[mc.NoPayload](mc.CodeRematchCallRejected) otherPlayer := game.GetOtherPlayer(s.Player) if otherPlayer != nil { - s.notifyOtherSession(otherPlayer.SessionID, msg) + s.notifyOtherSession(otherPlayer.SessionID, msg, TypeSessionMessageJSON) } break sessionLoop @@ -209,10 +209,13 @@ sessionLoop: break sessionLoop } otherPlayer := game.GetOtherPlayer(s.Player) - if otherPlayer != nil { - s.notifyOtherSession(otherPlayer.SessionID, payload) + if otherPlayer == nil { + break sessionLoop } + s.notifyOtherSession(otherPlayer.SessionID, payload, TypeSessionMessageBytes) + continue sessionLoop + default: respInvalidSignal := mc.NewMessage[mc.NoPayload](mc.CodeInvalidSignal) respInvalidSignal.AddError("", "invalid code in the incoming payload") @@ -224,8 +227,13 @@ sessionLoop: } // This is to send a message to the other session. -func (s *Session) notifyOtherSession(otherSessionId string, msg interface{}) { - s.SessionManager.CommunicationChan <- NewSessionMessage(s, otherSessionId, s.GameUuid, msg) +func (s *Session) notifyOtherSession(otherSessionId string, msg interface{}, payloadType int8) { + switch payloadType { + case TypeSessionMessageJSON: + s.SessionManager.CommunicationChan <- NewSessionMessageJSON(otherSessionId, s.GameUuid, msg) + case TypeSessionMessageBytes: + s.SessionManager.CommunicationChan <- NewSessionMessageBytes(otherSessionId, s.GameUuid, msg) + } } // This will delete player from the game players map From 567760702621a5bcd170022555d1cfef47daaf21 Mon Sep 17 00:00:00 2001 From: Saeid Alizadeh Date: Tue, 16 Jul 2024 10:36:14 -0600 Subject: [PATCH 4/6] deleted extra files --- api/session_communication_api.go | 31 ++++++++ api/session_manager_api.go | 129 +++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 api/session_communication_api.go create mode 100644 api/session_manager_api.go diff --git a/api/session_communication_api.go b/api/session_communication_api.go new file mode 100644 index 0000000..a9ca186 --- /dev/null +++ b/api/session_communication_api.go @@ -0,0 +1,31 @@ +package api + +const ( + TypeSessionMessageBytes int8 = iota + TypeSessionMessageJSON +) + +type SessionMessage struct { + PayloadType int8 + ReceiverID string + GameUuid string + Payload interface{} +} + +func NewSessionMessageJSON(receiverId string, gameUuid string, p interface{}) SessionMessage { + return SessionMessage{ + PayloadType: TypeSessionMessageJSON, + ReceiverID: receiverId, + GameUuid: gameUuid, + Payload: p, + } +} + +func NewSessionMessageBytes(receiverId string, gameUuid string, p interface{}) SessionMessage { + return SessionMessage{ + PayloadType: TypeSessionMessageBytes, + ReceiverID: receiverId, + GameUuid: gameUuid, + Payload: p, + } +} diff --git a/api/session_manager_api.go b/api/session_manager_api.go new file mode 100644 index 0000000..e07388c --- /dev/null +++ b/api/session_manager_api.go @@ -0,0 +1,129 @@ +package api + +import ( + "log" + "sync" + "time" + + "github.com/gorilla/websocket" + cerr "github.com/saeidalz13/battleship-backend/internal/error" +) + +const ( + // Assuming this capacity for the slice when + // we're cleaning up the sessions map. + assumedClosedConns = 5 + cleanupInterval time.Duration = time.Minute * 20 +) + +type SessionManager struct { + Sessions map[string]*Session + CommunicationChan chan SessionMessage + mu sync.RWMutex +} + +func NewSessionManager() *SessionManager { + return &SessionManager{ + Sessions: make(map[string]*Session), + CommunicationChan: make(chan SessionMessage), + } +} + +func (sm *SessionManager) FindSession(sessionId string) (*Session, error) { + sm.mu.RLock() + defer sm.mu.RUnlock() + + session, prs := sm.Sessions[sessionId] + if !prs { + return nil, cerr.ErrSessionNotFound(sessionId) + } + + if session == nil { + return nil, cerr.ErrSessionIsNil(sessionId) + } + + return session, nil +} + +func (sm *SessionManager) DeleteSession(sessionId string) { + sm.mu.Lock() + delete(sm.Sessions, sessionId) + log.Printf("session deleted: %s", sessionId) + sm.mu.Unlock() +} + +// Function to faciliate the communication between +// two sessions through a channel +func (sm *SessionManager) ManageCommunication() { + for { + msg := <-sm.CommunicationChan + + sm.mu.Lock() + receiverSession, prs := sm.Sessions[msg.ReceiverID] + if !prs { + // It should never be the case that the other session + // is not found. The sender session should terminate + // msg.SenderSession.terminate() + continue + } + + if receiverSession.GameUuid != msg.GameUuid { + panic("receiver session msg game is not the same as game uuid; this error should never happen") + } + + switch msg.PayloadType { + case TypeSessionMessageJSON: + switch WriteJSONWithRetry(receiverSession.Conn, msg.Payload) { + case ConnLoopAbnormalClosureRetry: + switch receiverSession.handleAbnormalClosure() { + case ConnLoopCodeBreak: + receiverSession.terminate() + + case ConnLoopCodeContinue: + } + + case ConnLoopCodeBreak: + receiverSession.terminate() + + case ConnLoopCodePassThrough: + } + + case TypeSessionMessageBytes: + msgByte, ok := msg.Payload.([]byte) + if ok { + receiverSession.Conn.WriteMessage(websocket.TextMessage, msgByte) + } + + default: + log.Println("invalid message type for intersession communication") + continue + } + + sm.mu.Unlock() + } +} + +// To ensure that there is no dangling connections, +// server session manager marks the connections with a +// lifetime of more than 20 mins as stale and deletes them. +func (sm *SessionManager) CleanUpPeriodically() { + for { + time.Sleep(cleanupInterval) + + sm.mu.Lock() + toDelete := make([]string, 0, assumedClosedConns) + + for ID, session := range sm.Sessions { + if time.Since(session.CreatedAt) > cleanupInterval { + toDelete = append(toDelete, ID) + } + } + + log.Println("Clean up sessions:") + for _, ID := range toDelete { + delete(sm.Sessions, ID) + log.Printf("removed: %s", ID) + } + sm.mu.Unlock() + } +} From 10b8a468d343cff3834123fe5fbe59a7eed1f1a7 Mon Sep 17 00:00:00 2001 From: Saeid Alizadeh Date: Tue, 16 Jul 2024 14:23:15 -0600 Subject: [PATCH 5/6] rebased with restructure and modified tests --- api/request_processor.go | 5 + api/session_communication_api.go | 31 --- api/session_manager_api.go | 129 ------------ api/sessions_api.go | 348 ------------------------------- test/ws_test.go | 2 +- 5 files changed, 6 insertions(+), 509 deletions(-) delete mode 100644 api/session_communication_api.go delete mode 100644 api/session_manager_api.go delete mode 100644 api/sessions_api.go diff --git a/api/request_processor.go b/api/request_processor.go index 9ec116b..7b6e320 100644 --- a/api/request_processor.go +++ b/api/request_processor.go @@ -333,6 +333,11 @@ sessionLoop: rp.sessionManager.Communicate(sessionId, receiverSessionId, msg, mc.MessageTypeJSON) break sessionLoop + case mc.CodePlayerInteraction: + if err := rp.sessionManager.Communicate(sessionId, receiverSessionId, payload, mc.MessageTypeBytes); err != nil { + break sessionLoop + } + default: respInvalidSignal := mc.NewMessage[mc.NoPayload](mc.CodeInvalidSignal) respInvalidSignal.AddError("", "invalid code in the incoming payload") diff --git a/api/session_communication_api.go b/api/session_communication_api.go deleted file mode 100644 index a9ca186..0000000 --- a/api/session_communication_api.go +++ /dev/null @@ -1,31 +0,0 @@ -package api - -const ( - TypeSessionMessageBytes int8 = iota - TypeSessionMessageJSON -) - -type SessionMessage struct { - PayloadType int8 - ReceiverID string - GameUuid string - Payload interface{} -} - -func NewSessionMessageJSON(receiverId string, gameUuid string, p interface{}) SessionMessage { - return SessionMessage{ - PayloadType: TypeSessionMessageJSON, - ReceiverID: receiverId, - GameUuid: gameUuid, - Payload: p, - } -} - -func NewSessionMessageBytes(receiverId string, gameUuid string, p interface{}) SessionMessage { - return SessionMessage{ - PayloadType: TypeSessionMessageBytes, - ReceiverID: receiverId, - GameUuid: gameUuid, - Payload: p, - } -} diff --git a/api/session_manager_api.go b/api/session_manager_api.go deleted file mode 100644 index e07388c..0000000 --- a/api/session_manager_api.go +++ /dev/null @@ -1,129 +0,0 @@ -package api - -import ( - "log" - "sync" - "time" - - "github.com/gorilla/websocket" - cerr "github.com/saeidalz13/battleship-backend/internal/error" -) - -const ( - // Assuming this capacity for the slice when - // we're cleaning up the sessions map. - assumedClosedConns = 5 - cleanupInterval time.Duration = time.Minute * 20 -) - -type SessionManager struct { - Sessions map[string]*Session - CommunicationChan chan SessionMessage - mu sync.RWMutex -} - -func NewSessionManager() *SessionManager { - return &SessionManager{ - Sessions: make(map[string]*Session), - CommunicationChan: make(chan SessionMessage), - } -} - -func (sm *SessionManager) FindSession(sessionId string) (*Session, error) { - sm.mu.RLock() - defer sm.mu.RUnlock() - - session, prs := sm.Sessions[sessionId] - if !prs { - return nil, cerr.ErrSessionNotFound(sessionId) - } - - if session == nil { - return nil, cerr.ErrSessionIsNil(sessionId) - } - - return session, nil -} - -func (sm *SessionManager) DeleteSession(sessionId string) { - sm.mu.Lock() - delete(sm.Sessions, sessionId) - log.Printf("session deleted: %s", sessionId) - sm.mu.Unlock() -} - -// Function to faciliate the communication between -// two sessions through a channel -func (sm *SessionManager) ManageCommunication() { - for { - msg := <-sm.CommunicationChan - - sm.mu.Lock() - receiverSession, prs := sm.Sessions[msg.ReceiverID] - if !prs { - // It should never be the case that the other session - // is not found. The sender session should terminate - // msg.SenderSession.terminate() - continue - } - - if receiverSession.GameUuid != msg.GameUuid { - panic("receiver session msg game is not the same as game uuid; this error should never happen") - } - - switch msg.PayloadType { - case TypeSessionMessageJSON: - switch WriteJSONWithRetry(receiverSession.Conn, msg.Payload) { - case ConnLoopAbnormalClosureRetry: - switch receiverSession.handleAbnormalClosure() { - case ConnLoopCodeBreak: - receiverSession.terminate() - - case ConnLoopCodeContinue: - } - - case ConnLoopCodeBreak: - receiverSession.terminate() - - case ConnLoopCodePassThrough: - } - - case TypeSessionMessageBytes: - msgByte, ok := msg.Payload.([]byte) - if ok { - receiverSession.Conn.WriteMessage(websocket.TextMessage, msgByte) - } - - default: - log.Println("invalid message type for intersession communication") - continue - } - - sm.mu.Unlock() - } -} - -// To ensure that there is no dangling connections, -// server session manager marks the connections with a -// lifetime of more than 20 mins as stale and deletes them. -func (sm *SessionManager) CleanUpPeriodically() { - for { - time.Sleep(cleanupInterval) - - sm.mu.Lock() - toDelete := make([]string, 0, assumedClosedConns) - - for ID, session := range sm.Sessions { - if time.Since(session.CreatedAt) > cleanupInterval { - toDelete = append(toDelete, ID) - } - } - - log.Println("Clean up sessions:") - for _, ID := range toDelete { - delete(sm.Sessions, ID) - log.Printf("removed: %s", ID) - } - sm.mu.Unlock() - } -} diff --git a/api/sessions_api.go b/api/sessions_api.go deleted file mode 100644 index 91938fe..0000000 --- a/api/sessions_api.go +++ /dev/null @@ -1,348 +0,0 @@ -package api - -import ( - "encoding/json" - "log" - "time" - - "github.com/gorilla/websocket" - mb "github.com/saeidalz13/battleship-backend/models/battleship" - mc "github.com/saeidalz13/battleship-backend/models/connection" -) - -const ( - gracePeriod time.Duration = time.Minute * 2 -) - -type Session struct { - ID string - Conn *websocket.Conn - GameUuid string - Player *mb.Player - StopRetry chan struct{} - GameManager *GameManager - SessionManager *SessionManager - CreatedAt time.Time -} - -func NewSession(conn *websocket.Conn, sessionID string, gameManager *GameManager, sessionManager *SessionManager) *Session { - return &Session{ - ID: sessionID, - Conn: conn, - StopRetry: make(chan struct{}), - GameManager: gameManager, - SessionManager: sessionManager, - CreatedAt: time.Now(), - } -} - -func (s *Session) run() { - defer s.terminate() - -sessionLoop: - for { - // A WebSocket frame can be one of 6 types: text=1, binary=2, ping=9, pong=10, close=8 and continuation=0 - // https://www.rfc-editor.org/rfc/rfc6455.html#section-11.8 - _, payload, err := s.Conn.ReadMessage() - if err != nil { - if s.handleReadErr(err) == ConnLoopCodeBreak { - break sessionLoop - } - } - var signal mc.Signal - if err := json.Unmarshal(payload, &signal); err != nil { - log.Println("incoming msg does not contain 'code':", err) - resp := mc.NewMessage[mc.NoPayload](mc.CodeSignalAbsent) - resp.AddError("incoming req payload must contain 'code' field", "") - - if s.writeToConn(resp) == ConnLoopCodeBreak { - break sessionLoop - } - continue sessionLoop - } - - switch signal.Code { - case mc.CodeCreateGame: - req := NewRequest(s, payload) - resp := req.HandleCreateGame() - - if s.writeToConn(resp) == ConnLoopCodeBreak { - break sessionLoop - } - - case mc.CodeAttack: - req := NewRequest(s, payload) - // response will have the IsTurn as false field of attacker - resp, defender := req.HandleAttack() - - if s.writeToConn(resp) == ConnLoopCodeBreak { - break sessionLoop - } - if resp.Error != nil { - continue sessionLoop - } - - // defender turn is set to true - resp.Payload.IsTurn = true - s.notifyOtherSession(defender.SessionID, resp, TypeSessionMessageJSON) - - if defender.MatchStatus == mb.PlayerMatchStatusLost { - respAttacker := mc.NewMessage[mc.RespEndGame](mc.CodeEndGame) - respAttacker.AddPayload(mc.RespEndGame{PlayerMatchStatus: mb.PlayerMatchStatusWon}) - if s.writeToConn(respAttacker) == ConnLoopCodeBreak { - break sessionLoop - } - - respDefender := mc.NewMessage[mc.RespEndGame](mc.CodeEndGame) - respDefender.AddPayload(mc.RespEndGame{PlayerMatchStatus: mb.PlayerMatchStatusLost}) - s.notifyOtherSession(defender.SessionID, respDefender, TypeSessionMessageJSON) - } - - case mc.CodeReady: - req := NewRequest(s, payload) - resp, game := req.HandleReadyPlayer() - - if s.writeToConn(resp) == ConnLoopCodeBreak { - break sessionLoop - } - if resp.Error != nil { - continue sessionLoop - } - - if game.HostPlayer.IsReady && game.JoinPlayer.IsReady { - respStartGame := mc.NewMessage[mc.NoPayload](mc.CodeStartGame) - if s.writeToConn(respStartGame) == ConnLoopCodeBreak { - break sessionLoop - } - - otherPlayer := game.GetOtherPlayer(s.Player) - s.notifyOtherSession(otherPlayer.SessionID, respStartGame, TypeSessionMessageJSON) - } - - case mc.CodeJoinGame: - req := NewRequest(s, payload) - resp, game := req.HandleJoinPlayer() - - if s.writeToConn(resp) == ConnLoopCodeBreak { - break sessionLoop - } - if resp.Error != nil { - break sessionLoop - } - - readyResp := mc.NewMessage[mc.NoPayload](mc.CodeSelectGrid) - if s.writeToConn(readyResp) == ConnLoopCodeBreak { - break sessionLoop - } - s.notifyOtherSession(game.HostPlayer.SessionID, readyResp, TypeSessionMessageJSON) - - case mc.CodeRematchCall: - // 1. See if the game still exists - game, err := s.GameManager.FindGame(s.GameUuid) - if err != nil { - break sessionLoop - } - - if game.IsRematchAlreadyCalled() { - continue sessionLoop - } - - game.CallRematch() - - otherPlayer := game.GetOtherPlayer(s.Player) - if otherPlayer == nil { - break sessionLoop - } - - s.Player.IsTurn = true - // Notify the other player if they want a rematch - msg := mc.NewMessage[mc.NoPayload](mc.CodeRematchCall) - s.notifyOtherSession(otherPlayer.SessionID, msg, TypeSessionMessageJSON) - - case mc.CodeRematchCallAccepted: - // Send the rematch call acceptance to other player - game, err := s.GameManager.FindGame(s.GameUuid) - if err != nil { - break sessionLoop - } - - if err := game.Reset(); err != nil { - break sessionLoop - } - - // Notify the other player with their turn - msgOtherPlayer := mc.NewMessage[mc.RespRematch](mc.CodeRematch) - otherPlayer := game.GetOtherPlayer(s.Player) - if otherPlayer == nil { - break sessionLoop - } - msgOtherPlayer.AddPayload(mc.RespRematch{IsTurn: otherPlayer.IsTurn}) - s.notifyOtherSession(otherPlayer.SessionID, msgOtherPlayer, TypeSessionMessageJSON) - - s.Player.IsTurn = false - msgPlayer := mc.NewMessage[mc.RespRematch](mc.CodeRematch) - msgPlayer.AddPayload(mc.RespRematch{IsTurn: s.Player.IsTurn}) - - // Notify the acceptor with their turn - if s.writeToConn(msgPlayer) == ConnLoopCodeBreak { - break sessionLoop - } - - case mc.CodeRematchCallRejected: - game, err := s.GameManager.FindGame(s.GameUuid) - if err != nil { - break sessionLoop - } - - // Notify the other player that no rematch is wanted now - msg := mc.NewMessage[mc.NoPayload](mc.CodeRematchCallRejected) - otherPlayer := game.GetOtherPlayer(s.Player) - if otherPlayer != nil { - s.notifyOtherSession(otherPlayer.SessionID, msg, TypeSessionMessageJSON) - } - - break sessionLoop - - case mc.CodePlayerInteraction: - game, err := s.GameManager.FindGame(s.GameUuid) - if err != nil { - break sessionLoop - } - otherPlayer := game.GetOtherPlayer(s.Player) - if otherPlayer == nil { - break sessionLoop - } - - s.notifyOtherSession(otherPlayer.SessionID, payload, TypeSessionMessageBytes) - continue sessionLoop - - default: - respInvalidSignal := mc.NewMessage[mc.NoPayload](mc.CodeInvalidSignal) - respInvalidSignal.AddError("", "invalid code in the incoming payload") - if s.writeToConn(respInvalidSignal) == ConnLoopCodeBreak { - break sessionLoop - } - } - } -} - -// This is to send a message to the other session. -func (s *Session) notifyOtherSession(otherSessionId string, msg interface{}, payloadType int8) { - switch payloadType { - case TypeSessionMessageJSON: - s.SessionManager.CommunicationChan <- NewSessionMessageJSON(otherSessionId, s.GameUuid, msg) - case TypeSessionMessageBytes: - s.SessionManager.CommunicationChan <- NewSessionMessageBytes(otherSessionId, s.GameUuid, msg) - } -} - -// This will delete player from the game players map -// and delete the player session -func (s *Session) terminate() { - if s.Player != nil { - s.GameManager.DeletePlayerFromGame(s.GameUuid, s.Player.Uuid) - } - s.SessionManager.DeleteSession(s.ID) -} - -// Writes to the connection of that session. It also -// handles the abnormal or other types of errors of -// writing to a websocket connection. -func (s *Session) writeToConn(p interface{}) int { - switch WriteJSONWithRetry(s.Conn, p) { - case ConnLoopAbnormalClosureRetry: - switch s.handleAbnormalClosure() { - case ConnLoopCodeBreak: - return ConnLoopCodeBreak - - case ConnLoopCodeContinue: - } - case ConnLoopCodeBreak: - return ConnLoopCodeBreak - default: - } - - return ConnLoopCodeContinue -} - -// This function takes care of abnormal closures happening -// to either of the clients. This happens due to backgrounding -// in IOS clients or any other unexpected reasons for web apps. -func (s *Session) handleAbnormalClosure() int { - // This means there is no game and abnormal closure is happening - // which means this session is invalid and should end - game, err := s.GameManager.FindGame(s.GameUuid) - if err != nil { - return ConnLoopCodeBreak - } - - otherPlayer := game.GetOtherPlayer(s.Player) - if otherPlayer == nil { - return ConnLoopCodeBreak - } - - // Absence of otherPlayer session means this game is invalid - otherSession, err := s.SessionManager.FindSession(otherPlayer.SessionID) - if err != nil { - return ConnLoopCodeBreak - } - - if err := otherSession.Conn.WriteJSON(mc.NewMessage[mc.NoPayload](mc.CodeOtherPlayerGracePeriod)); err != nil { - // If other player connection is disrupted as well, then end the session - return ConnLoopCodeBreak - } - - log.Printf("starting grace period for %s\n", s.ID) - timer := time.NewTimer(gracePeriod) - - select { - case <-timer.C: - if otherSession != nil { - _ = otherSession.Conn.WriteJSON(mc.NewMessage[mc.NoPayload](mc.CodeOtherPlayerDisconnected)) - } - log.Printf("session terminated: %s\n", s.ID) - return ConnLoopCodeBreak - - case <-s.StopRetry: - if otherSession != nil { - _ = otherSession.Conn.WriteJSON(mc.NewMessage[mc.NoPayload](mc.CodeOtherPlayerReconnected)) - } - log.Printf("player reconnected, session: %s\n", s.ID) - return ConnLoopCodeContinue - } -} - -// Handles the errors that occurs when reading from -// ws connection. `ConnLoopCodeContinue` will results in -// terminating the session and removing `run` from stack -func (s *Session) handleReadErr(err error) int { - retries := 0 - - switch IdentifyWsConnErrAction(err) { - case ConnLoopAbnormalClosureRetry: - switch s.handleAbnormalClosure() { - case ConnLoopCodeBreak: - return ConnLoopCodeBreak - case ConnLoopCodeContinue: - } - - case ConnLoopCodeRetry: - if retries < maxWriteWsRetries { - retries++ - log.Printf("failed to read from ws conn [%s]; retrying... (retry no. %d)\n", s.Conn.RemoteAddr().String(), retries) - time.Sleep(time.Duration(retries*backOffFactor) * time.Second) - - } else { - return ConnLoopCodeBreak - - } - - case ConnLoopCodeBreak: - log.Printf("break ws conn loop [%s] due to: %s\n", s.Conn.RemoteAddr().String(), err) - return ConnLoopCodeBreak - - case ConnLoopCodeContinue: - } - - return ConnLoopCodeContinue -} diff --git a/test/ws_test.go b/test/ws_test.go index d54abe7..e9707ef 100644 --- a/test/ws_test.go +++ b/test/ws_test.go @@ -283,6 +283,7 @@ func TestReadyGame(t *testing.T) { func TestPlayerInteraction(t *testing.T) { msg := mc.NewMessage[mc.PlayerInteraction](mc.CodePlayerInteraction) + msg.AddPayload(mc.PlayerInteraction{Content: "salam!"}) tests := []Test[mc.Message[mc.PlayerInteraction], mc.Message[mc.PlayerInteraction]]{ { @@ -799,7 +800,6 @@ func TestAttack(t *testing.T) { } if test.respPayload.Error != nil { - log.Printf("%+v\n", test.respPayload.Error) if test.respPayload.Error.ErrorDetails != test.expectedErr { t.Fatalf("expected error: %s\t got: %s", test.expectedErr, test.respPayload.Error.ErrorDetails) } From afdb9df25623e9d76b2acad2d8b1734bec2a2eaa Mon Sep 17 00:00:00 2001 From: Saeid Alizadeh Date: Tue, 16 Jul 2024 18:06:57 -0600 Subject: [PATCH 6/6] implemented graceful shutdown for server --- cmd/main.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 0f767df..4341633 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,9 +1,13 @@ package main import ( + "context" "log" "net/http" "os" + "os/signal" + "syscall" + "time" "github.com/joho/godotenv" "github.com/saeidalz13/battleship-backend/api" @@ -42,6 +46,23 @@ func main() { mux := http.NewServeMux() mux.Handle("GET /battleship", requestProcessor) - log.Printf("Listening to port %s\n", port) - log.Fatalln(http.ListenAndServe("0.0.0.0:"+port, mux)) + s := &http.Server{ + Addr: ":" + port, + Handler: mux, + } + + go func() { + log.Printf("Listening to port %s\n", port) + log.Fatalln(s.ListenAndServe()) + }() + + sigChan := make(chan os.Signal, 1) + + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + sig := <-sigChan + log.Println("Server termination signal from OS, graceful shutdown\treason:", sig) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + s.Shutdown(ctx) }