-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbroker.py
More file actions
168 lines (129 loc) · 5.03 KB
/
broker.py
File metadata and controls
168 lines (129 loc) · 5.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import socket
from time import sleep
class Producer:
    """A packet source the broker knows about.

    ``prod_id`` is the identifier the producer is looked up by, and
    ``subs_list`` holds the subscriber objects registered for it.
    """

    def __init__(self, prod_id):
        # A fresh list per instance, so producers never share subscribers.
        self.prod_id, self.subs_list = prod_id, []
class Subscriber:
    """A consumer registered with the broker.

    ``sub_id`` identifies the subscriber, ``sub_address_and_port`` is the
    address packets are sent to, and ``subscribed_to_list`` holds the
    producers this subscriber is currently subscribed to.
    """

    def __init__(self, sub_id, sub_address_and_port):
        # A new subscriber starts with no subscriptions.
        self.subscribed_to_list = []
        self.sub_address_and_port = sub_address_and_port
        self.sub_id = sub_id
# Broker endpoint: listen on every interface. Both the individual parts and
# the combined (ip, port) tuple are exposed as module-level names.
broker_ip, broker_port = "0.0.0.0", 50010
broker_address_and_port = (broker_ip, broker_port)

# Subscriber endpoints as (host, port) pairs. They are not referenced by the
# rest of the visible code; subscribers are addressed by the source address
# of their subscribe request instead.
sub1_ip, sub1_port = "subscriber", 50020
sub1_address_and_port = (sub1_ip, sub1_port)
sub2_ip, sub2_port = "subscriber2", 50021
sub2_address_and_port = (sub2_ip, sub2_port)

# Producers are fixed up front; their position in producers_list matters,
# because incoming packets select a producer by index.
prod1 = Producer("P01")
prod2 = Producer("P02")
producers_list: [Producer] = [prod1, prod2]

# Subscribers register at runtime through subscribe(), so this starts empty.
subscribers_list: [Subscriber] = []

# Maximum number of bytes read per recvfrom() call.
bufferSize = 50000

# One IPv4 datagram socket serves all traffic in and out of the broker.
broker_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
broker_socket.bind(broker_address_and_port)
print("Broker up and running.")
def subscribe(prod_id, sub_id, sub_address_and_port):
    """
    Subscribe a consumer to a producer and acknowledge the request.

    The producer id is validated before anything is stored, so a request
    naming an unknown producer does not register a subscriber. Subscribing
    again to a producer the consumer already follows is idempotent: no
    duplicate entry is created (duplicates would make the broker forward
    every packet twice), but the ack is still re-sent.

    Args:
        prod_id: id of the producer to subscribe to (e.g. "P01").
        sub_id: id of the subscribing consumer (e.g. "S01").
        sub_address_and_port: address the ack is sent to; also stored as
            the delivery address when the subscriber is seen for the
            first time.

    Returns:
        -1 if no producer with ``prod_id`` exists, otherwise None.
    """
    # Check that producer id is correct before touching any state.
    prod = next((p for p in producers_list if p.prod_id == prod_id), None)
    if prod is None:
        print("Subscribe failed, no producer found.")
        return -1

    # Check that subscriber is in our list, if not, add it.
    sub = next((s for s in subscribers_list if s.sub_id == sub_id), None)
    if sub is None:
        sub = Subscriber(sub_id, sub_address_and_port)
        subscribers_list.append(sub)

    # Link both directions, but only once per (producer, subscriber) pair.
    if sub not in prod.subs_list:
        prod.subs_list.append(sub)
    if prod not in sub.subscribed_to_list:
        sub.subscribed_to_list.append(prod)

    print(sub_id + " has successfully subscribed to " + prod_id)

    # Send ack
    msg = "09A" + prod_id + sub_id
    # NOTE(review): presumably this pause gives the subscriber time to start
    # listening for the ack - confirm; it blocks the broker for 0.5 s.
    sleep(0.5)
    broker_socket.sendto(str.encode(msg), sub_address_and_port)
def unsubscribe(prod_id, sub_id, sub_address_and_port):
    """
    Remove a consumer's subscription to a producer and acknowledge it.

    Every occurrence of the link is removed in both directions, so a
    subscription that was recorded more than once is fully cancelled by a
    single unsubscribe.

    Args:
        prod_id: id of the producer to unsubscribe from (e.g. "P01").
        sub_id: id of the consumer that is unsubscribing (e.g. "S01").
        sub_address_and_port: address the ack is sent to.

    Returns:
        -1 if the subscriber is unknown, -2 if the producer is unknown,
        -3 if the subscriber is not subscribed to that producer,
        otherwise None.
    """
    # Check sub exists in our list
    sub = next((s for s in subscribers_list if s.sub_id == sub_id), None)
    if sub is None:
        print("sub not found!")
        return -1  # Error code ?

    # Check prod exists in our list
    prod = next((p for p in producers_list if p.prod_id == prod_id), None)
    if prod is None:
        print("prod not found!")
        return -2  # Error code ?

    # Check subscribed
    if prod not in sub.subscribed_to_list:
        print(sub_id + " is not subscribed to " + prod_id)
        return -3

    # Filter in place (slice assignment) so other references to these lists
    # stay valid, and so duplicate entries cannot survive the removal.
    prod.subs_list[:] = [s for s in prod.subs_list if s is not sub]
    sub.subscribed_to_list[:] = [
        p for p in sub.subscribed_to_list if p is not prod
    ]
    print(sub_id + " has successfully unsubscribed from " + prod_id)

    # Send ack
    # NOTE(review): the ack is sent only when a subscription was actually
    # removed, mirroring subscribe(); confirm that clients do not wait for
    # an ack after a rejected unsubscribe.
    sleep(0.5)
    msg = "09A" + prod_id + sub_id
    broker_socket.sendto(str.encode(msg), sub_address_and_port)
def _parse_request(message):
    """
    Decode one raw datagram into ``(request_type, first, second)``.

    The first two bytes are the header length as decimal digits and the
    third byte is the request type. The meaning of ``first`` / ``second``
    depends on the type:

    * "S" / "U": ``first`` is the producer id and ``second`` the subscriber
      id, taken from the six characters after the header
      (example message = 03SP01S01).
    * "P": ``first`` is the 1-based index into ``producers_list`` (the last
      digit of the header) and ``second`` is the text of the last three
      header bytes, used for logging.
    * anything else: both are None.

    Raises:
        ValueError: if the datagram is malformed (non-numeric or
            out-of-range header length, undecodable bytes, or a producer
            index that does not match any producer). UnicodeDecodeError is
            a ValueError subclass, so decoding failures are covered too.
    """
    header_length = int(message[:2])
    # The header must at least hold its own length digits plus the type
    # byte, and cannot be longer than the datagram itself.
    if not 3 <= header_length <= len(message):
        raise ValueError("header length " + str(header_length) + " out of range")

    header = message[:header_length]
    packet_request_type = header[2:3].decode()

    if packet_request_type in ("S", "U"):
        payload = message.decode()[header_length:]
        return packet_request_type, payload[:3], payload[3:6]

    if packet_request_type == "P":
        prod_index = int(header[-1:])
        # Reject 0 explicitly: index 0 - 1 would silently wrap around to
        # the last producer instead of failing.
        if not 1 <= prod_index <= len(producers_list):
            raise ValueError("no producer with index " + str(prod_index))
        return packet_request_type, prod_index, header[-3:].decode()

    return packet_request_type, None, None


while True:
    # breaking up the header and payload
    message, address = broker_socket.recvfrom(bufferSize)
    try:
        packet_request_type, first, second = _parse_request(message)
    except ValueError as err:
        # A single bad datagram must not take the whole broker down.
        print("Ignoring malformed datagram from " + str(address) + ": " + str(err))
        continue

    if packet_request_type == "S":
        subscribe(first, second, address)
    elif packet_request_type == "U":
        unsubscribe(first, second, address)
    elif packet_request_type == "P":  # packet
        prod_index, producer_label = first, second
        print("Received packet from " + producer_label)
        recipients = producers_list[prod_index - 1].subs_list
        if recipients:
            # Forward the datagram unchanged to every subscriber.
            for subscriber in recipients:
                broker_socket.sendto(message, subscriber.sub_address_and_port)
        else:
            print("Received packet from P0" + str(prod_index) + " but P0" + str(prod_index) + " has no subscribers")
    # Any other request type is ignored.