-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathoverloadDetection.go
More file actions
146 lines (123 loc) · 4.7 KB
/
overloadDetection.go
File metadata and controls
146 lines (123 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package rajomon
import (
"context"
"runtime/metrics"
"sync/atomic"
"time"
)
// ctxKey is an unexported context-key type so that values stored in a
// context by this package cannot collide with keys from other packages.
type ctxKey string

// GapLatencyKey is the context key under which queuingCheck stores the
// measured incremental queuing delay (a float64, in milliseconds) for
// consumption by overloadDetection / UpdatePrice.
const GapLatencyKey ctxKey = "gapLatency"
// Increment atomically adds one to the throughput counter. Safe for
// concurrent use by request handlers.
func (pt *PriceTable) Increment() {
	atomic.AddInt64(&pt.throughputCounter, 1)
}
// Decrement atomically subtracts step from the throughput counter. Safe for
// concurrent use.
func (pt *PriceTable) Decrement(step int64) {
	atomic.AddInt64(&pt.throughputCounter, -step)
}
// GetCount atomically drains the throughput counter: it returns everything
// accumulated since the previous call and leaves the counter at zero.
// Note that despite the name this is a destructive read, not a plain load.
func (pt *PriceTable) GetCount() int64 {
	drained := atomic.SwapInt64(&pt.throughputCounter, 0)
	return drained
}
// latencyCheck runs once per priceUpdateRate tick and flags overload when the
// delay observed in the window exceeds latencyThreshold multiplied by the
// number of requests drained from the throughput counter — i.e. when the
// average per-request delay exceeds the SLO.
func (pt *PriceTable) latencyCheck() {
	for range time.Tick(pt.priceUpdateRate) {
		// create a new incoming context with the "request-id" as "0"
		// ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("request-id", "0"))
		// change to using the average latency
		// NOTE(review): observedDelay is read and then reset here with no
		// synchronization; if another goroutine accumulates into it, this is
		// a data race — confirm how observedDelay is written elsewhere.
		pt.UpdateOwnPrice(pt.observedDelay.Milliseconds() > pt.latencyThreshold.Milliseconds()*pt.GetCount())
		pt.observedDelay = time.Duration(0)
	}
}
// queuingCheck periodically samples the runtime's scheduling-latency
// histogram, derives the maximum queuing delay accrued since the previous
// sample, and feeds that signal into the price-update logic.
func (pt *PriceTable) queuingCheck() {
	// lastHist holds the histogram snapshot from the previous tick; nil on
	// the very first iteration.
	var lastHist *metrics.Float64Histogram
	for range time.Tick(pt.priceUpdateRate) {
		// Time the whole sampling + price-update pass so its overhead can
		// be logged at the bottom of the loop.
		begin := time.Now()
		hist := readHistogram()
		// First sample: nothing to diff against yet, just remember it.
		if lastHist == nil {
			lastHist = hist
			continue
		}
		gapLatency := maximumQueuingDelayms(lastHist, hist)
		logger("[Incremental Waiting Time Maximum]: %f ms.\n", gapLatency)
		// Hand the measured delay to the price-update path via the context.
		ctx := context.WithValue(context.Background(), GapLatencyKey, gapLatency)
		if pt.priceStrategy == "step" {
			pt.UpdateOwnPrice(pt.overloadDetection(ctx))
		} else {
			pt.UpdatePrice(ctx)
		}
		// Current snapshot becomes the baseline for the next tick.
		lastHist = hist
		logger("[Query Latency]: Overhead is %.2f milliseconds\n", float64(time.Since(begin).Microseconds())/1000)
	}
}
// throughputCheck runs once per priceUpdateRate tick: it drains the
// throughput counter and raises or lowers the price depending on whether the
// drained count exceeded throughputThreshold.
func (pt *PriceTable) throughputCheck() {
	for range time.Tick(pt.priceUpdateRate) {
		// Fix: load the counter atomically. It is concurrently mutated via
		// atomic.AddInt64 in Increment/Decrement, so the original plain
		// field read was a data race (flagged by `go test -race`).
		logger("[Throughput Counter]: The throughtput counter is %d\n", atomic.LoadInt64(&pt.throughputCounter))
		// GetCount drains the counter; overload iff the window's count
		// exceeded the threshold.
		pt.UpdateOwnPrice(pt.GetCount() > pt.throughputThreshold)
	}
}
// checkBoth combines the throughput and queuing-latency overload signals:
// the price is raised only when BOTH the drained request count exceeds
// throughputThreshold AND the incremental maximum queuing delay exceeds
// latencyThreshold.
func (pt *PriceTable) checkBoth() {
	// prevHist is the histogram snapshot from the previous tick; nil on the
	// first iteration.
	var prevHist *metrics.Float64Histogram
	for range time.Tick(pt.priceUpdateRate) {
		// Fix: load the counter atomically — it is concurrently mutated via
		// atomic.AddInt64, so the original plain field read was a data race.
		logger("[Throughput Counter]: The throughtput counter is %d\n", atomic.LoadInt64(&pt.throughputCounter))
		currHist := readHistogram()
		// Diff against the previous snapshot; on the first tick there is no
		// baseline, so fall back to the cumulative histogram.
		var diff metrics.Float64Histogram
		if prevHist == nil {
			diff = *currHist
		} else {
			diff = GetHistogramDifference(*prevHist, *currHist)
		}
		// gapLatency is the maximum bucket of the incremental histogram,
		// in milliseconds.
		gapLatency := maximumBucket(&diff)
		cumulativeLat := medianBucket(currHist)
		logger("[Cumulative Waiting Time Median]: %f ms.\n", cumulativeLat)
		logger("[Incremental Waiting Time 90-tile]: %f ms.\n", percentileBucket(&diff, 90))
		logger("[Incremental Waiting Time Median]: %f ms.\n", medianBucket(&diff))
		// Fix: reuse gapLatency instead of recomputing maximumBucket(&diff)
		// a second time for the log line.
		logger("[Incremental Waiting Time Maximum]: %f ms.\n", gapLatency)
		// ms -> µs conversion for the comparison against the threshold.
		pt.UpdateOwnPrice(pt.GetCount() > pt.throughputThreshold && int64(gapLatency*1000) > pt.latencyThreshold.Microseconds())
		// Current snapshot becomes the baseline for the next tick.
		prevHist = currHist
	}
}
// overloadDetection compares the queuing-delay signal carried in ctx against
// the configured latency threshold and reports whether the node should be
// treated as overloaded. It returns true only when pinpointQueuing is enabled
// and the gap latency (milliseconds) exceeds latencyThreshold; a missing
// signal is treated as zero delay.
func (pt *PriceTable) overloadDetection(ctx context.Context) bool {
	if !pt.pinpointQueuing {
		return false
	}
	// Fix: comma-ok type assertion. The original asserted val.(float64)
	// unconditionally after a nil check, which would panic if a value of any
	// other type were stored under the key; here a missing or mistyped value
	// safely yields 0.0.
	gapLatency, _ := ctx.Value(GapLatencyKey).(float64)
	// ms -> µs conversion for the comparison against the threshold.
	return int64(gapLatency*1000) > pt.latencyThreshold.Microseconds()
}