forked from chiangbing/hman
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmetrics.py
More file actions
107 lines (89 loc) · 3.12 KB
/
metrics.py
File metadata and controls
107 lines (89 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# -*- coding: utf-8 -*-
from collections import deque
import requests
class MetricsItem(object):
'''A single metrics item.'''
def __init__(self, name, func, requried=1, formatter=None):
self.name = name
self.func = func
self.formatter = formatter
self.requried = requried
self.snapshots = None
def update(self, snapshots):
self.snapshots = snapshots
def __str__(self):
if len(self.snapshots) < self.requried:
return 'WAITING'
val = self.func(self.snapshots)
if self.formatter is not None:
val = self.formatter(val)
return str(val)
class RegionServerMetrics(object):
'''Represent metrics for a region server.'''
keys = []
items = {}
max_snapshots = 0
@classmethod
def register(cls, name, func, requried=1, formatter=None):
cls.keys.append(name)
cls.items[name] = MetricsItem(name, func, requried, formatter)
cls.max_snapshots = max(requried, cls.max_snapshots)
def __init__(self, hostname, port):
self.hostname = hostname
self.port = port
self.snapshots = deque(maxlen=self.max_snapshots)
self.is_alive = True
def poll(self):
'''Poll metrics from region server.'''
metrics = self._get_metrics()
self.snapshots.appendleft(metrics)
def _get_metrics(self):
'''Get metrics from regionserver rs.
Return metrics as JSON.'''
try:
metrics_url = 'http://%s:%d/metrics?format=json' \
% (self.hostname, self.port)
r = requests.get(metrics_url)
if r.status_code == 400:
# TODO: oops! no /metrics ?
pass
r.raise_for_status()
# can be connected to, is alive or back into live again
self.is_alive = True
return r.json()
except requests.exceptions.ConnectionError:
self.is_alive = False
return None
def __getitem__(self, key):
if not self.is_alive:
return 'DEAD'
if key not in self.keys:
raise KeyError()
self.items[key].update(self.snapshots)
return str(self.items[key])
def _delta(valfunc):
return lambda pair: valfunc(pair[0]) - valfunc(pair[1])
# register known metrics
RegionServerMetrics.register(
'HOSTNAME',
lambda xs: xs[0]['rpc']['metrics'][0][0]['hostName'])
RegionServerMetrics.register(
'RPC_MULTIS',
_delta(lambda x: x['rpc']['metrics'][0][1]['multi_num_ops']),
requried=2)
RegionServerMetrics.register(
'REQS/S',
lambda xs: xs[0]['hbase']['regionserver'][0][1]['requests'],
formatter=int)
RegionServerMetrics.register(
'FLU_SIZE',
lambda xs: xs[0]['hbase']['regionserver'][0][1]['flushSize_avg_time'])
RegionServerMetrics.register(
'RGNS',
lambda xs: xs[0]['hbase']['regionserver'][0][1]['regions'])
RegionServerMetrics.register(
'STR_FILES',
lambda xs: xs[0]['hbase']['regionserver'][0][1]['storefiles'])
RegionServerMetrics.register(
'CMPCT_Q_SZ',
lambda xs: xs[0]['hbase']['regionserver'][0][1]['compactionQueueSize'])