-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtokenAndPrice.go
More file actions
234 lines (208 loc) · 9.55 KB
/
tokenAndPrice.go
File metadata and controls
234 lines (208 loc) · 9.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package rajomon
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// SplitTokens divides the tokens left on a request among the downstream
// services that methodName calls.
//
// The aggregated downstream price of methodName is subtracted from tokenleft
// and the remainder is divided (integer division) by the number of downstream
// services. Each service is handed that share plus its own recorded price; the
// share is negative when tokenleft is smaller than the aggregated price.
//
// The result is a flat slice of alternating keys and values:
// "tokens-<downstream>" followed by the token amount in base 10, one pair per
// entry of callMap[methodName], in that order. It returns (nil, nil) when
// methodName has no downstream services. The returned error is always nil.
func (pt *PriceTable) SplitTokens(ctx context.Context, tokenleft int64, methodName string) ([]string, error) {
	downstreamNames := pt.callMap[methodName]
	size := len(downstreamNames)
	if size == 0 {
		return nil, nil
	}

	// Best effort: RetrieveDSPrice yields 0 together with any error, so a
	// failed lookup is treated as "downstream is free" rather than aborting.
	downstreamPriceSum, _ := pt.RetrieveDSPrice(ctx, methodName)
	tokenleftPerDownstream := (tokenleft - downstreamPriceSum) / int64(size)
	logger("[Split tokens]: downstream total price is %d, from %d downstream services for %s, extra token left for each ds is %d\n",
		downstreamPriceSum, size, pt.nodeName, tokenleftPerDownstream)

	// Two entries (key, value) are appended per downstream service.
	downstreamTokens := make([]string, 0, 2*size)
	for _, downstreamName := range downstreamNames {
		// Per-call prices are stored under "<method>-<downstream>" to
		// distinguish calls to the same service from different methods.
		priceKey := methodName + "-" + downstreamName
		stored, _ := pt.priceTableMap.Load(priceKey)
		// Checked assertion: a missing (nil) or non-int64 entry must not
		// panic the request path; such a downstream is priced at 0.
		downstreamPrice, ok := stored.(int64)
		if !ok {
			logger("[Split tokens]: no usable price recorded for %s, assuming 0\n", priceKey)
		}
		downstreamToken := tokenleftPerDownstream + downstreamPrice
		downstreamTokens = append(downstreamTokens, "tokens-"+downstreamName, strconv.FormatInt(downstreamToken, 10))
	}
	return downstreamTokens, nil
}
// RetrieveDSPrice returns the downstream price stored in the price table under
// methodName.
//
// It returns (0, nil) when callMap lists no downstream services for
// methodName. It returns an error when the entry is absent, is nil, or does
// not hold an int64. ctx is not used.
func (pt *PriceTable) RetrieveDSPrice(ctx context.Context, methodName string) (int64, error) {
	if downstreams := pt.callMap[methodName]; len(downstreams) == 0 {
		logger("[Retrieve DS Price]: No downstream service for %s\n", methodName)
		return 0, nil
	}

	// The price of the downstream calls is keyed by the bare method name.
	stored, found := pt.priceTableMap.Load(methodName)
	if !found {
		return 0, errors.New("[retrieve ds price] downstream price not found")
	}

	switch price := stored.(type) {
	case nil:
		// An entry that exists but holds nil is reported like a missing one.
		return 0, errors.New("[retrieve ds price] downstream price not found")
	case int64:
		logger("[Retrieve DS Price]: Downstream price of %s is %d\n", methodName, price)
		return price, nil
	default:
		return 0, errors.New("[retrieve ds price] downstream price wrong type")
	}
}
// RetrieveTotalPrice combines this node's own price with the downstream price
// of methodName and returns the result formatted as a base-10 string.
//
// The combination depends on pt.priceAggregation:
//   - "maximal":  the larger of own price and downstream price
//   - "additive": own price plus downstream price
//   - "mean":     (own price + downstream price) / 2, using integer division
//
// Any other aggregation value leaves the total at 0.
//
// It returns an error when the "ownprice" entry is missing from the price
// table or does not hold an int64.
func (pt *PriceTable) RetrieveTotalPrice(ctx context.Context, methodName string) (string, error) {
	stored, _ := pt.priceTableMap.Load("ownprice")
	// Checked assertion: a missing entry loads as nil, and an unchecked
	// .(int64) on it would panic instead of using the error return.
	ownPrice, ok := stored.(int64)
	if !ok {
		return "", errors.New("[retrieve total price] own price not found or wrong type")
	}

	// Best effort: RetrieveDSPrice yields 0 together with any error, so a
	// failed downstream lookup contributes nothing to the total.
	downstreamPrice, _ := pt.RetrieveDSPrice(ctx, methodName)

	var totalPrice int64
	switch pt.priceAggregation {
	case "maximal":
		totalPrice = ownPrice
		if downstreamPrice > ownPrice {
			totalPrice = downstreamPrice
		}
	case "additive":
		totalPrice = ownPrice + downstreamPrice
	case "mean":
		totalPrice = (ownPrice + downstreamPrice) / 2
	}

	logger("[Retrieve Total Price]: Downstream price of %s is now %d, total price %d \n", methodName, downstreamPrice, totalPrice)
	return strconv.FormatInt(totalPrice, 10), nil
}
// UpdateOwnPrice adjusts the "ownprice" entry of the price table according to
// the congestion signal.
//
// The own price is kept per service; it is a single entry that is not keyed by
// request type or interface.
//
// When congestion is true the price is set to pt.guidePrice if guidePrice is
// greater than -1, and is otherwise raised by pt.priceStep. When congestion is
// false a positive price is lowered by 1; a price of 0 or below is left as is.
//
// It returns an error, without modifying the table, when the "ownprice" entry
// is missing or does not hold an int64.
func (pt *PriceTable) UpdateOwnPrice(congestion bool) error {
	stored, _ := pt.priceTableMap.Load("ownprice")
	// Checked assertion: a missing entry loads as nil, and an unchecked
	// .(int64) on it would panic instead of using the error return.
	ownPrice, ok := stored.(int64)
	if !ok {
		return errors.New("[update own price] own price not found or wrong type")
	}

	logger("[Update OwnPrice]: congestion is %t, own price %d, step %d\n", congestion, ownPrice, pt.priceStep)
	switch {
	case congestion && pt.guidePrice > -1:
		// A configured guide price overrides the step-based increase.
		ownPrice = pt.guidePrice
	case congestion:
		ownPrice += pt.priceStep
	case ownPrice > 0:
		ownPrice--
	}

	pt.priceTableMap.Store("ownprice", ownPrice)
	logger("[Update OwnPrice]: Own price updated to %d\n", ownPrice)
	return nil
}
// UpdatePrice adjusts the "ownprice" entry in proportion to how far the gap
// latency carried in ctx is from pt.latencyThreshold. It serves both the
// linear strategy and the "expdecay" strategy.
//
// The gap latency is read from ctx under GapLatencyKey as a float64 and is
// taken to be 0 when the key is absent. It is multiplied by 1000 before being
// compared with the threshold in microseconds, and is logged as milliseconds.
// The adjustment is diff * pt.priceStep / 10000 and may be negative.
//
// With pt.priceStrategy == "expdecay", a positive adjustment that follows at
// least two consecutive increases is scaled by
// pt.decayRate ** pt.consecutiveIncreases; a negative adjustment resets the
// counter. The resulting price never drops below max(pt.guidePrice, 0).
//
// It returns an error, before changing any state, when "ownprice" is missing
// or is not an int64, or when the context value is present but not a float64.
func (pt *PriceTable) UpdatePrice(ctx context.Context) error {
	// 1. Retrieve the current own price. The assertion is checked because a
	// missing entry loads as nil and would otherwise panic.
	stored, _ := pt.priceTableMap.Load("ownprice")
	ownPrice, ok := stored.(int64)
	if !ok {
		return errors.New("[update price] own price not found or wrong type")
	}

	// 2. Extract the gap latency; an absent value counts as 0.
	var gapLatency float64
	if val := ctx.Value(GapLatencyKey); val != nil {
		gapLatency, ok = val.(float64)
		if !ok {
			return fmt.Errorf("[update price] gap latency has unexpected type %T", val)
		}
	}

	// 3. Adjust the price in proportion to the distance from the threshold.
	diff := int64(gapLatency*1000) - pt.latencyThreshold.Microseconds()
	adjustment := diff * pt.priceStep / 10000

	// 4. Damp repeated increases when the decay strategy is selected.
	if pt.priceStrategy == "expdecay" {
		if adjustment > 0 {
			if pt.consecutiveIncreases >= 2 {
				// Shrink the step by decayRate raised to the number of
				// consecutive increases seen so far.
				adjustment = int64(float64(adjustment) * math.Pow(pt.decayRate, float64(pt.consecutiveIncreases)))
				logger("[Price Step Decay]: Price step decreased by %f ** %d\n", pt.decayRate, pt.consecutiveIncreases)
			}
			pt.consecutiveIncreases++
		} else if adjustment < 0 {
			// A decrease ends the streak, restoring the undamped step.
			pt.consecutiveIncreases = 0
		}
	}

	logger("[Update Price by Queue Delay]: Own price %d, step %d\n", ownPrice, adjustment)
	ownPrice += adjustment

	// 5. Clamp to the reserve price, the larger of guidePrice and 0. This is
	// done in integer arithmetic so large prices are not rounded by a
	// float64 round trip.
	reservePrice := pt.guidePrice
	if reservePrice < 0 {
		reservePrice = 0
	}
	if ownPrice < reservePrice {
		ownPrice = reservePrice
	}
	pt.priceTableMap.Store("ownprice", ownPrice)

	// 6. Record the price at most once every 200 milliseconds.
	const recordInterval = 200 * time.Millisecond
	if time.Since(pt.lastUpdateTime) > recordInterval {
		msgToLog := fmt.Sprintf("[Own price]: %d [Incremental Waiting Time Maximum]: %.2f ms.\n", ownPrice, gapLatency)
		recordPrice(msgToLog)
		pt.lastUpdateTime = time.Now()
	}
	return nil
}
// UpdateDownstreamPrice incorporates a price reported by downstream service
// nodeName for a call made on behalf of method into this node's price table.
//
// The per-call price is stored under "<method>-<nodeName>". How the aggregated
// price stored under the bare method name is refreshed depends on
// pt.priceAggregation:
//   - "maximal" and "mean": it becomes the largest per-call price recorded for
//     the method.
//   - "additive": the change relative to the previous per-call price is added
//     to every method in callMap that lists nodeName as a downstream service.
//
// Any other aggregation value leaves the table untouched.
//
// It returns downstreamPrice on success, except that the additive mode returns
// (0, nil) when the price is unchanged. In additive mode it returns a
// codes.Aborted status error when no previous per-call price was stored or the
// stored value is not an int64. ctx is not used.
func (pt *PriceTable) UpdateDownstreamPrice(ctx context.Context, method string, nodeName string, downstreamPrice int64) (int64, error) {
	switch pt.priceAggregation {
	case "maximal", "mean":
		pt.updateMaxDownstreamPrice(method, nodeName, downstreamPrice)
	case "additive":
		changed, err := pt.updateAdditiveDownstreamPrice(method, nodeName, downstreamPrice)
		if err != nil {
			return 0, err
		}
		if !changed {
			return 0, nil
		}
	}
	return downstreamPrice, nil
}

// updateMaxDownstreamPrice records the per-call price and refreshes the
// method's aggregated price to the maximum of its per-call prices.
func (pt *PriceTable) updateMaxDownstreamPrice(method, nodeName string, downstreamPrice int64) {
	callKey := method + "-" + nodeName
	pt.priceTableMap.Store(callKey, downstreamPrice)
	logger("[Received Resp]: Downstream price of %s updated to %d\n", callKey, downstreamPrice)

	// A missing or non-int64 aggregated price is treated as 0 so that the
	// comparison below cannot panic on a failed type assertion.
	var previous int64
	stored, loaded := pt.priceTableMap.Load(method)
	if v, ok := stored.(int64); loaded && ok {
		previous = v
		logger("[Previous DS Price]: The DS price of %s was %d before the update.\n", method, previous)
	} else {
		logger("[Error]: Cannot find the previous downstream price of %s\n", method)
	}

	// Fast path: a price above the current aggregate is the new maximum.
	if downstreamPrice > previous {
		pt.priceTableMap.Store(method, downstreamPrice)
		logger("[Updating DS Price]: Downstream price of %s updated to %d\n", method, downstreamPrice)
		return
	}

	// Otherwise the previous maximum may just have been lowered, so rescan
	// every per-call price recorded for this method.
	// NOTE(review): the prefix match also picks up keys of any other method
	// whose name starts with method+"-" — confirm method names cannot
	// collide this way.
	prefix := method + "-"
	maxPrice := int64(math.MinInt64)
	pt.priceTableMap.Range(func(key, value interface{}) bool {
		if k, ok := key.(string); ok && strings.HasPrefix(k, prefix) {
			if v, valid := value.(int64); valid && v > maxPrice {
				maxPrice = v
			}
		}
		return true
	})
	logger("[Updated DS Price]: The price of %s is now %d\n", method, maxPrice)
	pt.priceTableMap.Store(method, maxPrice)
}

// updateAdditiveDownstreamPrice swaps in the new per-call price and applies
// the difference to every method that calls nodeName. It reports whether the
// price actually changed.
func (pt *PriceTable) updateAdditiveDownstreamPrice(method, nodeName string, downstreamPrice int64) (bool, error) {
	callKey := method + "-" + nodeName

	// Swap stores the new price and hands back the previous one in a single
	// step; the new price is therefore recorded even when an error is
	// returned below.
	stored, loaded := pt.priceTableMap.Swap(callKey, downstreamPrice)
	if !loaded {
		return false, status.Errorf(codes.Aborted, "Downstream price of %s is not loaded", callKey)
	}
	previous, ok := stored.(int64)
	if !ok {
		return false, status.Errorf(codes.Aborted, "Downstream price of %s has unexpected type %T", callKey, stored)
	}

	diff := downstreamPrice - previous
	if diff == 0 {
		return false, nil
	}

	// Shift the aggregated price of every method that has nodeName among its
	// downstream services.
	for methodName, downstreamNames := range pt.callMap {
		for _, downstreamName := range downstreamNames {
			if downstreamName != nodeName {
				continue
			}
			current, _ := pt.priceTableMap.Load(methodName)
			// A method without a recorded aggregate starts from 0 instead
			// of panicking on a nil type assertion.
			methodPrice, _ := current.(int64)
			methodPrice += diff
			pt.priceTableMap.Store(methodName, methodPrice)
			logger("[Updated DS Price]: The price of %s is now %d\n", methodName, methodPrice)
		}
	}

	logger("[Received Resp]: Downstream price of %s updated to %d\n", callKey, downstreamPrice)
	return true, nil
}