-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.go
More file actions
119 lines (103 loc) · 2.58 KB
/
models.go
File metadata and controls
119 lines (103 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"sync"
"github.com/gorilla/websocket"
)
// pool is the package-level registry of socket hubs. Hub.remove calls
// pool.Remove when a hub loses its last client.
var pool Pool

// init resets pool to a fresh Pool and initializes it. Go runs init
// automatically during package initialization.
func init() {
	pool = Pool{}
	pool.Init()
}
// Pool is a registry of socket hubs indexed by name. Get creates hubs on
// demand and Remove drops them; both hold guard while touching hubs.
type Pool struct {
	// guard serializes access to hubs.
	guard sync.Mutex
	// hubs maps a hub name to its Hub; it is allocated by Init.
	hubs map[string]*Hub
	// Callback func(map[string]interface{}, *Hub)
}
// Init puts the pool into its empty starting state: an empty hub registry
// and an unlocked guard. Any hubs registered before the call are dropped
// from the registry.
func (p *Pool) Init() {
	p.hubs = map[string]*Hub{}
	p.guard = sync.Mutex{}
}
// Get returns the hub registered under name. If no hub exists for that name
// yet, a new one is created, stored in the registry, initialized, and
// returned. The whole lookup-or-create sequence runs under the pool lock.
func (p *Pool) Get(name string) *Hub {
	p.guard.Lock()
	defer p.guard.Unlock()

	if existing, found := p.hubs[name]; found {
		return existing
	}

	logger.Warnf("Adding socket hub: %v", name)
	created := &Hub{id: name}
	// Register before Init, matching the order callers have always seen.
	p.hubs[name] = created
	created.Init()
	return created
}
// Remove drops the hub registered under name from the pool. Removing a name
// that is not registered is a silent no-op (nothing is logged).
func (p *Pool) Remove(name string) {
	p.guard.Lock()
	defer p.guard.Unlock()

	if _, found := p.hubs[name]; !found {
		return
	}

	logger.Warnf("Removing socket hub: %v", name)
	delete(p.hubs, name)
}
// Hub groups the websocket connections that share one hub id. Every message
// read from any connected client is sent on broadcast and written out to all
// clients of the hub by the broadcaster goroutine.
type Hub struct {
	// id is the hub's name; Pool uses the same string as its registry key.
	id string
	// count is the number of connected clients; add increments it and remove decrements it.
	count int
	// guard is held by add and remove while they change count and clients.
	guard sync.Mutex
	// clients is the set of connected sockets; stored values are always true.
	clients map[*websocket.Conn]bool
	// broadcast carries messages read from clients to the broadcaster goroutine.
	broadcast chan map[string]interface{}
}
// Init prepares the hub for use: it resets the lock, allocates an empty
// client set and an unbuffered broadcast channel, and then starts the
// broadcaster goroutine that drains that channel.
func (h *Hub) Init() {
	logger.Debugf("Socket hub initializing [%v]", h.id)

	h.broadcast = make(chan map[string]interface{}) // unbuffered broadcast channel
	h.clients = map[*websocket.Conn]bool{}          // no clients connected yet
	h.guard = sync.Mutex{}

	// Must come last: the broadcaster ranges over h.broadcast.
	h.broadcaster()
}
// add registers ws as a connected client of the hub, bumps the client
// counter, and logs the new connection. All of it happens under the hub lock.
func (h *Hub) add(ws *websocket.Conn) {
	h.guard.Lock()
	defer h.guard.Unlock()

	h.clients[ws] = true
	h.count++
	logger.Infof("Connection from %v [%v] (%v clients connected)", ws.RemoteAddr(), h.id, h.count)
}
// remove closes ws and unregisters it from the hub. When the last client
// leaves, the hub also removes itself from the package-level pool.
//
// remove can run twice for the same connection: once from the broadcaster
// after a failed write, and again from the reader goroutine's deferred
// cleanup once its read fails on the closed socket. Only the first call for a
// registered connection changes count, so a repeated call can no longer push
// count to zero (or below) and evict the hub while other clients are still
// connected.
func (h *Hub) remove(ws *websocket.Conn) {
	h.guard.Lock()
	defer h.guard.Unlock()

	// Always close so the socket is released no matter which path got here
	// first. The error is deliberately ignored: the connection is being
	// discarded either way, and a repeated Close may report that it is
	// already closed.
	ws.Close()

	if _, registered := h.clients[ws]; !registered {
		// Already removed by an earlier call; leave the bookkeeping alone.
		return
	}

	delete(h.clients, ws)
	h.count--
	logger.Infof("Disconnection from %v [%v] (%v clients connected)", ws.RemoteAddr(), h.id, h.count)

	if h.count == 0 {
		pool.Remove(h.id)
	}
}
// Add attaches ws to the hub and returns immediately. A dedicated goroutine
// registers the connection, then reads JSON messages from it in a loop and
// forwards each one to the hub's broadcast channel. The first read error ends
// the loop, and the connection is removed from the hub on the way out.
func (h *Hub) Add(ws *websocket.Conn) {
	reader := func() {
		// Scheduled before add so cleanup runs however this goroutine exits.
		defer h.remove(ws)
		h.add(ws)

		for {
			// A fresh map per message: the previous one has been handed to
			// the broadcast channel and must not be decoded into again.
			var payload map[string]interface{}
			if err := ws.ReadJSON(&payload); err != nil {
				logger.Warn(err)
				return
			}

			logger.Debugf("Received message from %v [%v]", ws.RemoteAddr(), h.id)
			// Hand the message to the broadcaster; blocks until it is received.
			h.broadcast <- payload
		}
	}
	go reader()
}
// broadcaster starts the goroutine that fans messages out to clients. For
// every message received on the broadcast channel it writes the message as
// JSON to each connected client; a client whose write fails is logged and
// removed from the hub.
//
// The client set is copied while holding the hub lock and the writes happen
// after the lock is released. Ranging over h.clients directly would race with
// add and remove, which mutate the map from other goroutines, and the lock
// cannot be held across the writes because remove acquires it again.
//
// The goroutine runs until the broadcast channel is closed; nothing in this
// file closes it.
func (h *Hub) broadcaster() {
	go func() {
		for msg := range h.broadcast {
			// Snapshot the current clients under the lock.
			h.guard.Lock()
			targets := make([]*websocket.Conn, 0, len(h.clients))
			for client := range h.clients {
				targets = append(targets, client)
			}
			h.guard.Unlock()

			// Report the number of clients this message is actually sent to.
			logger.Debugf("Broadcasting message to %v clients [%v]", len(targets), h.id)

			// Send it out to every client that was connected at snapshot time.
			for _, client := range targets {
				if err := client.WriteJSON(msg); err != nil {
					logger.Error(err)
					h.remove(client)
				}
			}
		}
	}()
}