-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsend_ttn.py
More file actions
138 lines (123 loc) · 4.07 KB
/
send_ttn.py
File metadata and controls
138 lines (123 loc) · 4.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
import sys
from time import sleep
from SX127x.LoRa import *
from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config import BOARD
import LoRaWAN
from LoRaWAN.MHDR import MHDR
import json,datetime
BOARD.setup()
parser = LoRaArgumentParser("LoRaWAN sender")
class LoRaWANsend(LoRa):
def __init__(self, devaddr = [], nwkey = [], appkey = [], verbose = False):
super(LoRaWANsend, self).__init__(verbose)
def on_tx_done(self):
print("TxDone\n")
# 切換到rx
self.set_mode(MODE.STDBY)
self.clear_irq_flags(TxDone=1)
self.set_mode(MODE.SLEEP)
self.set_dio_mapping([0,0,0,0,0,0])
self.set_invert_iq(1)
self.reset_ptr_rx()
sleep(1)
self.set_mode(MODE.RXSINGLE)
def on_rx_done(self):
global RxDone
RxDone = any([self.get_irq_flags()[s] for s in ['rx_done']])
print("RxDone")
self.clear_irq_flags(RxDone=1)
# 讀取 payload
payload = self.read_payload(nocheck=True)
lorawan = LoRaWAN.new(nwskey, appskey)
lorawan.read(payload)
print("get mic: ",lorawan.get_mic())
print("compute mic: ",lorawan.compute_mic())
print("valid mic: ",lorawan.valid_mic())
# 檢查mic
if lorawan.valid_mic():
print("ACK: ",lorawan.get_mac_payload().get_fhdr().get_fctrl()>>5&0x01)
print("direction: ",lorawan.get_direction())
print("devaddr: ",''.join(format(x, '02x') for x in lorawan.get_devaddr()))
write_config()
else:
print("Wrong MIC")
def send(self):
global fCnt
lorawan = LoRaWAN.new(nwskey, appskey)
message = "HELLO WORLD!"
# 資料打包,fCnt+1
lorawan.create(MHDR.CONF_DATA_UP, {'devaddr': devaddr, 'fcnt': fCnt, 'data': list(map(ord, message)) })
print("fCnt: ",fCnt)
print("Send Message: ",message)
fCnt = fCnt+1
self.write_payload(lorawan.to_raw())
self.set_mode(MODE.TX)
def time_checking(self):
global RxDone
# 檢查是否超時
TIMEOUT = any([self.get_irq_flags()[s] for s in ['rx_timeout']])
if TIMEOUT:
print("TIMEOUT!!")
write_config()
sys.exit(0)
elif RxDone:
print("SUCCESS!!")
sys.exit(0)
def start(self):
self.send()
while True:
self.time_checking()
sleep(1)
def binary_array_to_hex(array):
return ''.join(format(x, '02x') for x in array)
def write_config():
global devaddr,nwskey,appskey,fCnt
config = {'devaddr':binary_array_to_hex(devaddr),'nwskey':binary_array_to_hex(nwskey),'appskey':binary_array_to_hex(appskey),'fCnt':fCnt}
data = json.dumps(config, sort_keys = True, indent = 4, separators=(',', ': '))
fp = open("config.json","w")
fp.write(data)
fp.close()
def read_config():
global devaddr,nwskey,appskey,fCnt
config_file = open('config.json')
parsed_json = json.load(config_file)
devaddr = list(bytearray.fromhex(parsed_json['devaddr']))
nwskey = list(bytearray.fromhex(parsed_json['nwskey']))
appskey = list(bytearray.fromhex(parsed_json['appskey']))
fCnt = parsed_json['fCnt']
print("devaddr: ",parsed_json['devaddr'])
print("nwskey : ",parsed_json['nwskey'])
print("appskey: ",parsed_json['appskey'],"\n")
# Init
RxDone = False
fCnt = 0
devaddr = []
nwskey = []
appskey = []
read_config()
lora = LoRaWANsend(False)
# Setup
lora.set_mode(MODE.SLEEP)
lora.set_dio_mapping([1,0,0,0,0,0])
lora.set_freq(AS923.FREQ1)
lora.set_spreading_factor(SF.SF7)
lora.set_bw(BW.BW125)
lora.set_pa_config(pa_select=1)
lora.set_pa_config(max_power=0x0F, output_power=0x0E)
lora.set_sync_word(0x34)
lora.set_rx_crc(True)
#print(lora)
assert(lora.get_agc_auto_on() == 1)
try:
print("Sending LoRaWAN message")
lora.start()
except KeyboardInterrupt:
sys.stdout.flush()
print("\nKeyboardInterrupt")
finally:
sys.stdout.flush()
lora.set_mode(MODE.SLEEP)
BOARD.teardown()
write_config()