-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathwg.go
More file actions
401 lines (341 loc) · 8.7 KB
/
wg.go
File metadata and controls
401 lines (341 loc) · 8.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
package proxy
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
// "strconv"
"sync"
"time"
"golang.org/x/exp/maps"
uwgtun "github.com/urnetwork/userwireguard/tun"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/urnetwork/glog"
"github.com/urnetwork/connect"
"github.com/urnetwork/userwireguard/conn"
"github.com/urnetwork/userwireguard/device"
"github.com/urnetwork/userwireguard/logger"
)
// FIXME: currently the client IPv4 is threaded through to the egress providers.
// This can allow a single client IPv4 to be traced across multiple providers.
// It should be NATed to a standard IPv4 instead.
// Sentinel errors reported by the proxy's packet paths.
var (
	// DidNotSendError is returned by Write when a tun's Send reports false.
	DidNotSendError = errors.New("did not send")

	// PacketTooLargeError is joined into the error returned by Read when a
	// received packet does not fit into the destination buffer.
	PacketTooLargeError = errors.New("packet too large for buffer")
)
// WgClient describes one WireGuard peer that the proxy accepts.
type WgClient struct {
// PublicKey is the peer's public key as a string; it is parsed with
// wgtypes.ParseKey when the peer config is built, and the client is skipped
// there if parsing fails.
PublicKey string
// TODO this is the signed proxy id
// PresharedKey is optional: an empty string means no preshared key is set
// on the peer config.
PresharedKey string
// ClientIpv4 is installed as the peer's single /32 allowed IP.
ClientIpv4 netip.Addr
// Tun is called to obtain the tun that carries this client's packets when
// the client is activated (and again for active clients in SetClients).
Tun func() (WgTun, error)
}
// WgTun is the per-client packet endpoint used by WgProxy.
type WgTun interface {
// CancelIfIdle is polled by the proxy's run loop; when it returns true the
// proxy detaches the tun (SetReceive(nil)) and drops it from the active set.
// NOTE(review): presumably the implementation also cancels itself when idle
// — confirm against the implementation.
CancelIfIdle() bool
// Send hands one packet to the tun. A false result is reported by
// WgProxy.Write as DidNotSendError.
Send([]byte) bool
// SetReceive sets the channel on which the tun delivers packets back to the
// proxy. The proxy passes nil when it detaches the tun.
SetReceive(chan []byte)
}
// WgProxySettings configures a WgProxy.
type WgProxySettings struct {
	// PrivateKey is the server private key string, parsed in ListenAndServe.
	PrivateKey string
	// ReceiveSequenceSize is the buffer capacity of the proxy's receive channel.
	ReceiveSequenceSize int
	// EventsSequenceSize is the buffer capacity of the proxy's events channel.
	EventsSequenceSize int
	// CheckTunIdleTimeout is the pause between idle sweeps of the active tuns.
	CheckTunIdleTimeout time.Duration
	// FirewallMark is applied to the device config only when it is positive.
	FirewallMark int
}

// DefaultWgProxySettings returns a freshly allocated settings value with the
// default channel sizes and idle-check interval. PrivateKey and FirewallMark
// are left at their zero values.
func DefaultWgProxySettings() *WgProxySettings {
	settings := new(WgProxySettings)
	settings.ReceiveSequenceSize = 1024
	settings.EventsSequenceSize = 16
	settings.CheckTunIdleTimeout = time.Minute
	return settings
}
// WgProxy implements the tun side of the wg device:
// - packets written by wg are parsed by source ip and forwarded to that client's tun
// - all packets from tuns are put back into wg via the shared receive channel
// A client's tun is activated on the first packet seen from that client.
type WgProxy struct {
// ctx is cancelled when the run loop exits; Read and AddEvent stop on it.
ctx context.Context
cancel context.CancelFunc
settings *WgProxySettings
// events is returned by Events and fed by AddEvent.
events chan uwgtun.Event
// receive is handed to every active tun and drained by Read.
receive chan []byte
device *device.Device
// stateLock guards clients and activeClients.
stateLock sync.Mutex
// clients holds all configured clients, keyed by address.
clients map[netip.Addr]*WgClient
// activeClients holds the tuns of clients that have been activated.
activeClients map[netip.Addr]WgTun
}
// NewWgProxyWithDefaults creates a WgProxy configured with
// DefaultWgProxySettings.
func NewWgProxyWithDefaults(ctx context.Context) *WgProxy {
	defaults := DefaultWgProxySettings()
	return NewWgProxy(ctx, defaults)
}
// NewWgProxy builds a WgProxy from settings, creates the wg device with the
// proxy itself as the device's tun, and starts the background run loop.
// The returned proxy owns a context derived from ctx.
func NewWgProxy(ctx context.Context, settings *WgProxySettings) *WgProxy {
	proxyCtx, proxyCancel := context.WithCancel(ctx)

	proxy := &WgProxy{
		ctx:           proxyCtx,
		cancel:        proxyCancel,
		settings:      settings,
		events:        make(chan uwgtun.Event, settings.EventsSequenceSize),
		receive:       make(chan []byte, settings.ReceiveSequenceSize),
		clients:       map[netip.Addr]*WgClient{},
		activeClients: map[netip.Addr]WgTun{},
	}

	// route device logging through glog with a "[wg]" prefix
	logVerbose := func(format string, args ...any) {
		glog.Infof("[wg]"+format, args...)
	}
	logError := func(format string, args ...any) {
		glog.Errorf("[wg]"+format, args...)
	}
	deviceLogger := &logger.Logger{
		Verbosef: logVerbose,
		Errorf:   logError,
	}

	bind := conn.NewDefaultBind()
	proxy.device = device.NewDevice(proxy, bind, deviceLogger)

	go connect.HandleError(proxy.run)

	return proxy
}
// run is the proxy's background loop. It sweeps the active tuns, detaching
// and removing every tun whose CancelIfIdle reports true, then waits
// CheckTunIdleTimeout before sweeping again. The loop ends when the proxy
// context is done, and the context is cancelled on exit.
func (self *WgProxy) run() {
	defer self.cancel()

	sweepIdle := func() {
		self.stateLock.Lock()
		defer self.stateLock.Unlock()

		for clientAddr, tun := range self.activeClients {
			if !tun.CancelIfIdle() {
				continue
			}
			tun.SetReceive(nil)
			delete(self.activeClients, clientAddr)
		}
	}

	for {
		sweepIdle()

		select {
		case <-self.ctx.Done():
			return
		case <-time.After(self.settings.CheckTunIdleTimeout):
		}
	}
}
// ListenAndServe configures the device with the settings private key, the
// given bind addresses and listen port, and an initially empty peer list,
// then brings the device up and blocks until the device's Wait channel
// yields. It returns an error if the private key cannot be parsed or the
// device rejects the configuration; otherwise it returns nil.
func (self *WgProxy) ListenAndServe(ipv4 string, ipv6 string, port int) error {
	privateKey, err := wgtypes.ParseKey(self.settings.PrivateKey)
	if err != nil {
		return err
	}

	glog.Infof("[wg]ipv4=%s ipv6=%s port=%d fwmark=%d\n", ipv4, ipv6, port, self.settings.FirewallMark)

	config := &device.Config{
		BindIpv4: &ipv4,
		BindIpv6: &ipv6,
		Config: wgtypes.Config{
			Peers:        []wgtypes.PeerConfig{},
			ReplacePeers: true,
			ListenPort:   &port,
			PrivateKey:   &privateKey,
		},
	}
	// the firewall mark is only set when it is positive
	if self.settings.FirewallMark > 0 {
		config.FirewallMark = &self.settings.FirewallMark
	}

	err = self.device.IpcSet2(config)
	if err != nil {
		return err
	}

	self.device.AddEvent(uwgtun.EventUp)

	// block until the device signals on its wait channel
	<-self.device.Wait()

	return nil
}
// SetClients replaces the full client set on the running device.
// The device peers are replaced first; if that fails nothing else changes
// and the error is returned. Afterwards each currently active client is
// either refreshed (its Tun is fetched again and attached to the receive
// channel) when it is still present, or detached and removed when it is
// gone or its Tun call fails. Tun errors are joined into the returned error.
func (self *WgProxy) SetClients(clients map[netip.Addr]*WgClient) (returnErr error) {
	self.stateLock.Lock()
	defer self.stateLock.Unlock()

	config := wgtypes.Config{
		ReplacePeers: true,
		Peers:        createPeerConfigs(clients),
	}
	returnErr = self.device.IpcSet(&config)
	if returnErr != nil {
		return returnErr
	}

	// swap in the new client set
	clear(self.clients)
	for clientAddr, client := range clients {
		self.clients[clientAddr] = client
	}

	// reconcile the active tuns against the new client set
	for clientAddr, previousTun := range self.activeClients {
		client, known := self.clients[clientAddr]
		if !known {
			previousTun.SetReceive(nil)
			delete(self.activeClients, clientAddr)
			continue
		}

		nextTun, err := client.Tun()
		if err != nil {
			previousTun.SetReceive(nil)
			delete(self.activeClients, clientAddr)
			returnErr = errors.Join(returnErr, err)
			continue
		}

		nextTun.SetReceive(self.receive)
		self.activeClients[clientAddr] = nextTun
	}

	return returnErr
}
// AddClients adds the given clients to the device without replacing the
// existing peers. Clients whose address is already registered are ignored.
// If nothing remains to add, it returns nil without touching the device.
// The proxy's client table is only updated after the device accepts the
// new peers.
func (self *WgProxy) AddClients(clients map[netip.Addr]*WgClient) (returnErr error) {
	self.stateLock.Lock()
	defer self.stateLock.Unlock()

	// work on a copy so the caller's map is not modified
	pending := maps.Clone(clients)
	for clientAddr := range pending {
		if _, known := self.clients[clientAddr]; known {
			delete(pending, clientAddr)
		}
	}
	if len(pending) == 0 {
		return returnErr
	}

	config := wgtypes.Config{
		ReplacePeers: false,
		Peers:        createPeerConfigs(pending),
	}
	returnErr = self.device.IpcSet(&config)
	if returnErr != nil {
		return returnErr
	}

	for clientAddr, client := range pending {
		self.clients[clientAddr] = client
	}
	return returnErr
}
// activateClient returns the tun for addr, activating the client if needed.
// An already active tun is returned as is. Otherwise the registered client's
// Tun function is called, the new tun is attached to the proxy's receive
// channel and recorded as active. It returns an error if the Tun call fails
// or if no client is registered for addr.
func (self *WgProxy) activateClient(addr netip.Addr) (WgTun, error) {
	self.stateLock.Lock()
	defer self.stateLock.Unlock()

	if activeTun, active := self.activeClients[addr]; active {
		return activeTun, nil
	}

	client, known := self.clients[addr]
	if !known {
		return nil, fmt.Errorf("No client found for %s.", addr)
	}

	clientTun, err := client.Tun()
	if err != nil {
		return nil, err
	}
	clientTun.SetReceive(self.receive)
	self.activeClients[addr] = clientTun
	return clientTun, nil
}
// MTU always reports 0.
// NOTE(review): presumably 0 means "no MTU configured" to the device —
// confirm against the uwgtun device contract.
func (self *WgProxy) MTU() int {
return 0
}
// Events returns the receive-only view of the proxy's events channel, which
// is fed by AddEvent.
func (self *WgProxy) Events() <-chan uwgtun.Event {
return self.events
}
// AddEvent queues event on the proxy's events channel. It blocks until the
// event is accepted or the proxy context is done, in which case the event
// is dropped.
func (self *WgProxy) AddEvent(event uwgtun.Event) {
	select {
	case self.events <- event:
	case <-self.ctx.Done():
	}
}
// BatchSize always reports 1; Read fills at most one buffer (bufs[0]) per call.
func (self *WgProxy) BatchSize() int {
return 1
}
// `uwgtun.Device` implementation
//
// Write forwards each packet in bufs (starting at offset within each buffer)
// to the tun of the client identified by the packet's source ip, activating
// that client if necessary. count is the number of packets that were sent
// successfully; the errors of all failed packets are joined into returnErr.
func (self *WgProxy) Write(bufs [][]byte, offset int) (count int, returnErr error) {
	// forward routes a single packet to its client's tun
	forward := func(packet []byte) error {
		ipPath, err := connect.ParseIpPath(packet)
		if err != nil {
			return err
		}

		sourceAddr, ok := netip.AddrFromSlice(ipPath.SourceIp)
		if !ok {
			return fmt.Errorf("Unknown source ip")
		}

		clientTun, err := self.activateClient(sourceAddr)
		if err != nil {
			return err
		}

		if !clientTun.Send(packet) {
			return DidNotSendError
		}
		return nil
	}

	for i := range bufs {
		if err := forward(bufs[i][offset:]); err != nil {
			returnErr = errors.Join(returnErr, err)
			continue
		}
		count++
	}
	return count, returnErr
}
// `uwgtun.Device` implementation
//
// Read blocks until one packet is available on the proxy's receive channel
// or the proxy context is done. The packet is copied into bufs[0] starting
// at offset, and the number of bytes copied is stored in sizes[0]. At most
// one packet is delivered per call (see BatchSize).
//
// Returns count == 1 when a packet was delivered. If the packet did not fit
// into bufs[0][offset:], the truncated bytes are still delivered and
// PacketTooLargeError is joined into returnErr. Returns (0, error) when the
// proxy context is done.
//
// NOTE(review): the truncation check was previously `len(packet) < n`, which
// can never hold because copy returns min(len(dst), len(src)), so
// PacketTooLargeError was never reported. Confirm that the device's read
// loop handles a non-nil error alongside count == 1 as intended.
func (self *WgProxy) Read(bufs [][]byte, sizes []int, offset int) (count int, returnErr error) {
	select {
	case <-self.ctx.Done():
		return 0, fmt.Errorf("Done.")
	case packet := <-self.receive:
		// hand the packet buffer back once it has been copied out
		defer connect.MessagePoolReturn(packet)

		n := copy(bufs[0][offset:], packet)
		// fewer bytes copied than the packet holds means the destination
		// buffer was too small
		if n < len(packet) {
			returnErr = errors.Join(returnErr, PacketTooLargeError)
		}
		sizes[0] = n
		count += 1
		return
	}
}
// `uwgtun.Device` implementation
//
// Close closes the underlying wg device and always returns nil.
// The proxy's own context is not cancelled here.
func (self *WgProxy) Close() error {
self.device.Close()
return nil
}
// createPeerConfigs converts clients into wg peer configs. Each peer gets
// the client's public key, its optional preshared key, and a single /32
// allowed IP built from ClientIpv4. Clients whose public key or non-empty
// preshared key fails to parse are silently skipped. The result is nil when
// no client yields a config.
func createPeerConfigs(clients map[netip.Addr]*WgClient) []wgtypes.PeerConfig {
	var configs []wgtypes.PeerConfig

	for _, client := range clients {
		publicKey, err := wgtypes.ParseKey(client.PublicKey)
		if err != nil {
			continue
		}

		// the preshared key stays nil unless the client provides one
		var presharedKey *wgtypes.Key
		if client.PresharedKey != "" {
			parsedKey, err := wgtypes.ParseKey(client.PresharedKey)
			if err != nil {
				continue
			}
			presharedKey = &parsedKey
		}

		allowedIp := net.IPNet{
			IP:   net.IP(client.ClientIpv4.AsSlice()),
			Mask: net.CIDRMask(32, 32),
		}

		configs = append(configs, wgtypes.PeerConfig{
			PublicKey:         publicKey,
			PresharedKey:      presharedKey,
			ReplaceAllowedIPs: true,
			AllowedIPs:        []net.IPNet{allowedIp},
		})
	}

	return configs
}
// WgGenKeyPair generates a new private key and derives its public key.
// If key generation fails, the error is returned and publicKey is left at
// its zero value.
func WgGenKeyPair() (privateKey wgtypes.Key, publicKey wgtypes.Key, err error) {
	if privateKey, err = wgtypes.GeneratePrivateKey(); err != nil {
		return privateKey, publicKey, err
	}
	return privateKey, privateKey.PublicKey(), nil
}
func WgGenKeyPairStrings() (privateKeyStr string, publicKeyStr string, err error) {
var privateKey wgtypes.Key
var publicKey wgtypes.Key
privateKey, publicKey, err = WgGenKeyPair()
if err != nil {
return
}
privateKeyStr = privateKey.String()
publicKeyStr = publicKey.String()
return
}