-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchannel.go
More file actions
127 lines (112 loc) · 3.37 KB
/
channel.go
File metadata and controls
127 lines (112 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package msbqclient
import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"github.com/francccisss/msbq-client-api/utils"
)
// ClientChannel is the client-side implementation of the Channel operations in
// this file: it writes queue assertions and endpoint messages to conn and
// exposes incoming messages through chanBuff.
type ClientChannel struct {
// StreamID identifies this channel's stream; it is copied into every Queue
// and EPMessage the channel sends.
StreamID string
// BoundTo is not read anywhere in this file.
// NOTE(review): presumably the route or queue this channel is bound to — confirm.
BoundTo string
// conn is the connection that framed outgoing messages are written to.
conn net.Conn
// chanBuff is the channel returned (receive-only) by Consume.
chanBuff chan utils.EPMessage
}
// Channel is the set of operations a client channel offers for talking to the
// message broker. In this file the methods are implemented on ClientChannel;
// because Consume is declared on a pointer receiver, only *ClientChannel
// satisfies this interface.
type Channel interface {
// AssertQueue sends a queue assertion built from the given name, type and
// durability flag, and returns the queue name on success.
AssertQueue(Name string, Type string, Durable bool) (string, error)
// DeliverMessage sends Message to the broker addressed to Route, tagged
// with QueueType.
DeliverMessage(Route string, Message []byte, QueueType string) error
// Consume returns a receive-only channel of endpoint messages.
Consume(route string) <-chan utils.EPMessage
// CloseChannel closes the channel.
// NOTE(review): the ClientChannel implementation in this file is an empty stub.
CloseChannel()
}
// AssertQueue asks the message broker to assert that a queue exists by writing
// a length-prefixed, JSON-encoded utils.Queue message to the channel's
// connection.
//
//   - Name identifies the queue. Per the original notes it is used to route
//     endpoint messages to the queue, which then pushes them to the consumers
//     subscribed to it.
//   - Type selects the delivery model: "P2P" (point to point) or "PUBSUB"
//     (publish/subscribe).
//   - Durable asks for the data in the queue to be persisted.
//
// It returns Name once the request has been written. No reply is read here, so
// a nil error only means the request was sent, not that the broker accepted
// it. On failure the returned string is empty and the error wraps the
// underlying cause, so errors.Is and errors.As still see it.
//
// TODO: the server has been observed failing to parse the queue assertion
// message; the cause is still unknown.
func (ch ClientChannel) AssertQueue(Name string, Type string, Durable bool) (string, error) {
	fmt.Println("NOTIF: Asserting a Queue")

	// A zero-value ClientChannel has no connection; calling Write on it would
	// panic, so report the problem as an error instead.
	if ch.conn == nil {
		fmt.Println("ERROR: channel has no connection")
		return "", errors.New("asserting queue: channel has no connection")
	}

	q := utils.Queue{
		Name:        Name,
		MessageType: "Queue",
		Type:        Type,
		Durable:     Durable,
		StreamID:    ch.StreamID,
	}

	payload, err := json.Marshal(q)
	if err != nil {
		fmt.Println("ERROR: Unable to Marshal Queue Message")
		return "", fmt.Errorf("asserting queue %q: marshaling queue message: %w", Name, err)
	}

	framed, err := utils.AppendPrefixLength(payload)
	if err != nil {
		fmt.Println("ERROR: Unable to append queue message prefix length")
		return "", fmt.Errorf("asserting queue %q: prefixing message length: %w", Name, err)
	}

	if _, err := ch.conn.Write(framed); err != nil {
		// Keep the two distinct log lines the original emitted for a closed
		// connection versus any other write failure.
		if errors.Is(err, io.EOF) {
			fmt.Println("ERROR: connection was closed")
		} else {
			fmt.Println("ERROR: Unable to write to message broker")
		}
		return "", fmt.Errorf("asserting queue %q: writing to message broker: %w", Name, err)
	}

	return q.Name, nil
}
// DeliverMessage sends Message to the message broker by writing a
// length-prefixed, JSON-encoded utils.EPMessage to the channel's connection.
//
//   - Route is placed in the message's Route field.
//   - Message is the payload, carried in the Body field.
//   - QueueType is forwarded unchanged in the Type field.
//     TODO (open question from the original author): is QueueType needed here?
//
// It returns nil once the framed message has been written; no acknowledgement
// is read from the broker. "NOTIF: Message delivered!" is printed only on that
// success path. On failure the returned error wraps the underlying cause, so
// errors.Is and errors.As still see it.
func (ch ClientChannel) DeliverMessage(Route string, Message []byte, QueueType string) error {
	// A zero-value ClientChannel has no connection; calling Write on it would
	// panic, so report the problem as an error instead.
	if ch.conn == nil {
		fmt.Println("ERROR: channel has no connection")
		return errors.New("delivering message: channel has no connection")
	}

	emsg := utils.EPMessage{
		MessageType: "EPMessage",
		Route:       Route,
		Type:        QueueType,
		Body:        Message,
		StreamID:    ch.StreamID,
	}

	payload, err := json.Marshal(emsg)
	if err != nil {
		fmt.Println("ERROR: Unable to Marshal EPMessage")
		return fmt.Errorf("delivering message to %q: marshaling endpoint message: %w", Route, err)
	}

	framed, err := utils.AppendPrefixLength(payload)
	if err != nil {
		fmt.Println("ERROR: Unable to append message prefix length")
		return fmt.Errorf("delivering message to %q: prefixing message length: %w", Route, err)
	}

	if _, err := ch.conn.Write(framed); err != nil {
		// Keep the two distinct log lines the original emitted for a closed
		// connection versus any other write failure.
		if errors.Is(err, io.EOF) {
			fmt.Println("ERROR: connection was closed")
		} else {
			fmt.Println("ERROR: Unable to write to message broker")
		}
		return fmt.Errorf("delivering message to %q: writing to message broker: %w", Route, err)
	}

	// Announce delivery only after the write succeeded; a deferred print would
	// also fire on every error return above.
	fmt.Println("NOTIF: Message delivered!")
	return nil
}
// Consumer pairs a message type with a route.
// NOTE(review): not referenced elsewhere in this file; presumably the payload
// used to register a consumer for a route with the broker — confirm.
type Consumer struct {
// MessageType is the message kind tag.
// NOTE(review): other messages in this file set it to a literal such as
// "Queue" or "EPMessage"; the value expected here is not shown — confirm.
MessageType string
// Route is the route the consumer refers to.
Route string
}
// Consume logs the requested route and returns the channel's message buffer as
// a receive-only channel.
//
// The route argument is currently used only for the log line: every call
// returns the same underlying buffer regardless of route.
//
// Author's design notes (not verifiable from this file): the Mudem pushes
// incoming messages into this buffer after a STREAM_POOL lookup, where each
// stream in the table points to a specific channel.
//
// TODO: register the channel for specific routes; a channel can have several
// consumers, each listening on a different route.
func (ch *ClientChannel) Consume(route string) <-chan utils.EPMessage {
	fmt.Printf("NOTIF: Consuming from \"%s\" \n", route)

	// Narrow the bidirectional buffer to a receive-only view for the caller.
	var inbox <-chan utils.EPMessage = ch.chanBuff
	return inbox
}
// CloseChannel is currently a no-op placeholder: it performs no teardown and
// leaves both the connection and the message buffer untouched.
func (ch ClientChannel) CloseChannel() {}