-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchannel_proxy.go
More file actions
120 lines (96 loc) · 2.75 KB
/
channel_proxy.go
File metadata and controls
120 lines (96 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package cache_proxy_demo
func UseChannel(baseProxy *BaseCacheProxy) CacheProxy {
proxy := &ChannelProxy{
transform: baseProxy.Transform,
cache: baseProxy.Cache,
mqCommandGet: make(chan CommandGetReadModel),
mqEventGot: make(chan EventGotReadModel),
firstGoroutineRecords: make(map[string]*Entry),
closeManager: make(chan struct{}),
baseProxy: baseProxy,
}
go proxy.manager()
return proxy
}
type ChannelProxy struct {
transform TransformQryOptionToCacheKey
cache Cache
mqCommandGet chan CommandGetReadModel
mqEventGot chan EventGotReadModel
firstGoroutineRecords map[string]*Entry
closeManager chan struct{}
baseProxy *BaseCacheProxy
}
func (proxy *ChannelProxy) Execute(qryOption any, readModelType any) (readModel any, err error) {
return proxy.execute1(qryOption, readModelType)
}
func (proxy *ChannelProxy) execute1(qryOption any, readModelType any) (readModel any, err error) {
command := CommandGetReadModel{
qryOption: qryOption,
readModelType: readModelType,
replyAddress: make(chan Result, 1),
}
proxy.mqCommandGet <- command
result := <-command.replyAddress
return result.val, result.err
}
func (proxy *ChannelProxy) execute3(qryOption any, readModelType any) (readModel any, err error) {
key := proxy.transform(qryOption)
val, err := proxy.cache.GetValue(key, readModelType)
if err == nil {
return val, nil
}
return proxy.execute1(qryOption, readModelType)
}
func (proxy *ChannelProxy) manager() {
for {
select {
case <-proxy.closeManager:
return
default:
}
select {
case cmd := <-proxy.mqCommandGet:
key := proxy.transform(cmd.qryOption)
entry := proxy.firstGoroutineRecords[key]
if entry == nil {
entry = &Entry{ready: make(chan struct{})}
proxy.firstGoroutineRecords[key] = entry
go proxy.mainReader(cmd, key, entry)
}
go proxy.replier(cmd.replyAddress, entry)
case key := <-proxy.mqEventGot:
proxy.firstGoroutineRecords[key] = nil
case <-proxy.closeManager:
return
// default: // default 移除註解, 可以讓效能更好
}
}
}
func (proxy *ChannelProxy) mainReader(cmd CommandGetReadModel, key string, entry *Entry) {
defer func() {
close(entry.ready)
// time.Sleep(time.Second)
proxy.mqEventGot <- key
}()
readModel, err := proxy.baseProxy.Execute(cmd.qryOption, cmd.readModelType)
entry.result = Result{val: readModel, err: err}
}
func (proxy *ChannelProxy) replier(replyAddress chan Result, entry *Entry) {
<-entry.ready
replyAddress <- entry.result
}
type Entry struct {
ready chan struct{}
result Result
}
type Result struct {
val any
err error
}
type CommandGetReadModel struct {
qryOption any
readModelType any
replyAddress chan Result
}
type EventGotReadModel = string