-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.go
More file actions
82 lines (74 loc) · 2.51 KB
/
client.go
File metadata and controls
82 lines (74 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package msbqclient
import (
"fmt"
"net"
"github.com/francccisss/msbq-client-api/utils"
"github.com/google/uuid"
)
const (
// MAX_STREAMS is presumably the upper bound on streams (channels) per
// connection. NOTE(review): no code visible in this file reads it, so
// confirm where (or whether) the limit is enforced.
MAX_STREAMS = 3
// DEFAULT_READ_SIZE is presumably the default size used when reading from
// the broker socket. TODO confirm the unit (bytes?) against the reader code.
DEFAULT_READ_SIZE = 50
)
// clientConnection is the concrete Connection returned by Connect. It pairs
// the socket to the message broker with the lookup table of channels created
// on that socket.
type clientConnection struct {
// conn is the socket opened by Connect; every channel created from this
// connection is handed the same conn.
conn net.Conn
// streamPool maps a stream ID to the channel bound to that stream.
// CreateChannel inserts an entry under a freshly generated ID. Per the
// CreateChannel documentation, the mudem uses this map as a lookup table
// to deliver data arriving from the server to the matching channel.
//
// NOTE(review): Connect hands this struct to the mudem goroutine while
// CreateChannel writes to this map without any visible locking; confirm
// whether the two can touch the map concurrently and need a mutex.
streamPool map[string]*ClientChannel
}
// Connection is the handle returned by Connect for a session with the
// message broker.
type Connection interface {
// CreateChannel creates a new channel on this connection and returns it,
// or an error if the channel could not be created.
CreateChannel() (Channel, error)
// Close is called when the caller is done with the connection. It has no
// return value, so a failure cannot be reported to the caller.
Close()
}
// Connect dials the message broker at address over TCP and returns a
// Connection on which channels can be created.
//
// On success it starts the mudem goroutine with the new connection and prints
// a notification to stdout.
//
// A dial failure is returned wrapped with the target address; the underlying
// error stays reachable through errors.Is and errors.As. The error is not
// printed here, so the caller decides how to report it.
func Connect(address string) (Connection, error) {
	// TODO: the transport is hard-coded to TCP (original note: "need to change this").
	conn, err := net.Dial("tcp", address)
	if err != nil {
		return nil, fmt.Errorf("connecting to message broker on %q: %w", address, err)
	}

	newConnection := &clientConnection{
		conn:       conn,
		streamPool: map[string]*ClientChannel{},
	}

	// The mudem runs in the background for this connection; see the
	// CreateChannel documentation for how it routes incoming messages.
	go mudem(newConnection)

	fmt.Printf("NOTIF: Successfully Connected to message broker on %s\n", address)
	return newConnection, nil
}
// CreateChannel creates a channel together with the stream it listens on.
//   - A Channel is an abstracted logical connection between two application endpoints.
//   - A Stream represents data flowing from the TCP socket connection.
//
// Multiplexing and demultiplexing let the protocol carry several streams over
// one socket, each dedicated to a specific channel; they can be viewed as
// lightweight TCP connections.
//
// # How are different streams handled?
//
// Each entry in the stream pool map holds a pointer to the channel created for
// that stream ID. The mudem pushes messages to a channel through the channel's
// buffered Go channel (a Go channel used for in-process communication, not to
// be confused with the messaging-system concept of a channel).
//
// The mudem handles the messages coming from the message broker: it parses
// each one and uses the stream pool as a lookup table, keyed by the StreamID
// field included in every message type, to push the message into the matching
// stream.
//
// The returned error is always nil.
func (c *clientConnection) CreateChannel() (Channel, error) {
	id := uuid.NewString()

	// Register a channel under the new ID unless the ID is already taken.
	if _, taken := c.streamPool[id]; !taken {
		c.streamPool[id] = &ClientChannel{
			StreamID: id,
			conn:     c.conn,
			chanBuff: make(chan utils.EPMessage, 10),
		}
	}
	channel := c.streamPool[id]

	fmt.Printf("NOTIF: New Channel created bound to stream %s\n", id)
	fmt.Printf("NOTIF: ^^ %+v\n", channel)
	fmt.Printf("NOTIF: Stream Pool Length %d\n", len(c.streamPool))
	return channel, nil
}
// Close closes the socket to the message broker that Connect opened.
//
// Calling it on a nil connection, or on one whose socket was never set, is a
// no-op. Because the Connection interface gives Close no return value, a
// failure to close the socket is printed to stdout rather than returned.
//
// The receiver is a pointer to match CreateChannel.
//
// NOTE(review): the mudem goroutine started by Connect presumably reads from
// this same socket; confirm it exits cleanly once the socket is closed.
func (c *clientConnection) Close() {
	if c == nil || c.conn == nil {
		return
	}
	if err := c.conn.Close(); err != nil {
		fmt.Printf("ERROR: closing connection to message broker: %s\n", err.Error())
	}
}