
Commit 0dfe233

Merge pull request #66 from supriyopaul/custom_handlers
README.md update on how to create and use custom handlers
2 parents 5c8de8b + 85e4bdd

5 files changed: 104 additions & 30 deletions


.travis.yml

Lines changed: 2 additions & 2 deletions
```diff
@@ -11,8 +11,8 @@ deploy:
   skip_cleanup: true
   api-key:
     secure: Rxl45qbTHWIbOhst3PS60ETfW5wDByxp0xv4ZbtgRGe4SPvHtOLHRNGiajsQX37pgUFF9ALcCseY2cTk46jNEA1jOzFx4DDSKyH+Wu4H5F4M8JDBBlIsvsgezumLsYMqOL18caZA8J84N9UyuzgdPBDb0B0mMclRa9xRaxWncrUZgXwW9r3N2zU1LvGtd0Su4zLXXP6HC6mKHdOOaNSDONqaesx1njYTGr5fbWy7IXrjSg75wWCtHW1dKDPXmyyWZomwpmhURYfYXn/o9lRaXSDpLWx4xTsbJQdG9EiSPm5fLjfv9tZTxIF7jB0tTrOB63gGAgrLu0zC5Z5MJ1Y0+sbotI8eySI4w0GTffhi4WQjTTyO02vgPuSCm9JV5aW+YeNJtSncEgaVgsuUmZUiWdqMsvPG+bqOjh/i0eIkHr/v7cyf3HndFieZH9H3XdlEDtyr4SRExQSjG+be6mcGOJMWMrXervcW6kGP3pcX7EWgrFxnkz9lSgx/0meNMP4JDo8pZWg50b0xpni3zUcweTgCIeYUBd5aIKUvPaCqSHC1BAyZI5z3Cvdlq0tjCS726drQcV4OJNjrnmb301/K6MBbXhAsyhbkB1NpUZ0k0ZwmGxQ7iE4N1pod2BQbTPxjNUL1KNQJXFvjr9Clrw9Arqo6X9S9t//GP2DDl5Ke5KQ=
-  name: logagg-0.2.6
-  tag_name: 0.2.6
+  name: logagg-0.2.7
+  tag_name: 0.2.7
   on:
     branch: master
     repo: deep-compute/logagg
```

README.md

Lines changed: 79 additions & 4 deletions
````diff
@@ -167,7 +167,7 @@ Collects all the logs from the server and parses it for making a common schema f
 ##### or
 - Docker run
 ```bash
-$ sudo docker run --name collector --volume /var/log:/var/log deepcompute/logagg logagg --log-level INFO collect --file file=/var/log/serverstats.log:formatter=logagg.formatters.basescript --nsqtopic logagg --nsqd-http-address <nsq-server-ip-or-DNS>:4151
+$ sudo docker run --name collector --volume /var/log:/var/log deepcompute/logagg logagg collect --file file=/var/log/serverstats.log:formatter=logagg.formatters.basescript --nsqtopic logagg --nsqd-http-address <nsq-server-ip-or-DNS>:4151
 ```
 - **Note**: Replace **<nsq-server-ip-or-DNS>** with the IP of the `nsq` server, e.g. **192.168.0.211**
 - **Note**: The **--volume** argument mounts the local directory of the log file into the `Docker` `container`
@@ -322,7 +322,7 @@ $ python
 - You can see the basic format of a record below:
 ```json
 {
-    "_id" : ObjectId("some_id"),
+    "_id" : "20180301T065838_f7e042841d1d11e8bcf1000c2925b24d",
     "level" : "info",
     "timestamp" : "isoformat_time. Ex: 2017-08-01T07:32:24.183981Z",
     "data" : {},
@@ -332,7 +332,6 @@ $ python
     "event" : "default_event",
     "file" : "/path/to/log/file",
     "type" : "log | metric"
-    "id" : "20180301T065838_f7e042841d1d11e8bcf1000c2925b24d"
 }
 ```
@@ -390,8 +389,84 @@ time request_time
 |MongoDBForwarder|`--target forwarder=logagg.forwarders.MongoDBForwarder:host=<mongoDB-server-ip>:port=<mongod-port-number>:user=<user-name>:password=<passwd>:db=<db-name>:collection=<collection name>`|
 |InfluxDBForwarder|`--target forwarder=logagg.forwarders.InfluxDBForwarder:host=<influxDB-server-ip>:port=<influxd-port-number>:user=<user-name>:password=<passwd>:db=<db-name>:collection=nothing`|
 
+**Note:** For using multiple forwarders, use the format ``--target <forwarder1> <forwarder2>`` and not ``--target <forwarder1> --target <forwarder2>``
+
+### How to create and use custom formatters for log files
+#### Step 1: Make a directory and append its path to the environment variable $PYTHONPATH
+```bash
+$ echo $PYTHONPATH
+
+$ mkdir customformatters
+$ # Now append the path to $PYTHONPATH
+$ export PYTHONPATH=$PYTHONPATH:/home/path/to/customformatters/
+
+$ echo $PYTHONPATH
+:/home/path/to/customformatters
+```
+#### Step 2: Create another directory inside it and put your formatter file(s) there.
+
+```bash
+$ cd customformatters/
+$ mkdir myformatters
+$ cd myformatters/
+$ touch formatters.py
+$ touch __init__.py
+$ echo 'import formatters' >> __init__.py
+$ # Now write your formatter functions inside the formatters.py file
+```
+#### Step 3: Write your formatter functions inside the formatters.py file
+
+**Important:**
+1. Only **Python standard modules** can be imported in the formatters.py file.
+2. A formatter function should return a **dict()** datatype.
+3. The dict() should contain only keys which are mentioned in the above [log structure](https://github.com/deep-compute/logagg#features).
+4. Sample formatter functions:
+```python
+import json
+import re
+
+sample_log_line = '2018-02-07T06:37:00.297610Z [Some_event] [Info] [Hello_there]'
+
+def sample_formatter(log_line):
+    log = re.sub('[\[+\]]', '', log_line).split(' ')
+    timestamp = log[0]
+    event = log[1]
+    level = log[2]
+    data = dict({'message': log[3]})
+
+    return dict(timestamp=timestamp,
+                event=event,
+                level=level,
+                data=data,
+                )
+```
+To see more examples, look [here](https://github.com/deep-compute/logagg/blob/master/logagg/formatters.py)
+
+5. Check that the custom formatter works in the `python` interpreter, the way logagg would call it:
+```python
+>>> import myformatters
+>>> sample_log_line = '2018-02-07T06:37:00.297610Z [Some_event] [Info] [Hello_there]'
+>>> output = myformatters.formatters.sample_formatter(sample_log_line)
+>>> from pprint import pprint
+>>> pprint(output)
+{'data': {'message': 'Hello_there'},
+ 'event': 'Some_event',
+ 'level': 'Info',
+ 'timestamp': '2018-02-07T06:37:00.297610Z'}
+```
+6. Sample `logagg collect` commands:
+```
+$ sudo logagg collect --file file=logfile.log:formatter=myformatters.formatters.sample_formatter --nsqtopic logagg --nsqd-http-address localhost:4151
+```
+**or**
+docker run
+```
+$ sudo docker run --name collector --env PYTHONPATH=$PYTHONPATH --volume /var/log:/var/log deepcompute/logagg logagg collect --file file=logfile.log:formatter=myformatters.formatters.sample_formatter --nsqtopic logagg --nsqd-http-address <nsq-server-ip-or-DNS>:4151
+```
+
 ---
-## Build on it
+
+## Build on logagg
 
 You're more than welcome to hack on this:-)
````
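The how-to above passes only a dotted path such as `myformatters.formatters.sample_formatter` on the command line, which is why Step 1 puts `customformatters` on `$PYTHONPATH`. As a rough illustration of the mechanism (a minimal sketch, not logagg's actual loader; `load_formatter` is a hypothetical name), resolving such a path boils down to:

```python
import importlib

def load_formatter(dotted_path):
    # "myformatters.formatters.sample_formatter" splits into the module
    # "myformatters.formatters" and the attribute "sample_formatter".
    module_path, func_name = dotted_path.rsplit('.', 1)
    module = importlib.import_module(module_path)  # requires $PYTHONPATH
    return getattr(module, func_name)

fn = load_formatter('myformatters.formatters.sample_formatter')
print(fn('2018-02-07T06:37:00.297610Z [Some_event] [Info] [Hello_there]'))
```

If the import fails with an `ImportError`, re-check the `$PYTHONPATH` export from Step 1.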

logagg/formatters.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -9,7 +9,7 @@ def docker_log_file_driver(line):
     log = json.loads(json.loads(line)['msg'])
     if 'formatter' in log.get('extra'):
         return RawLog(dict(formatter=log.get('extra').get('formatter'),
-                           raw=str(log.get('message')),
+                           raw=log.get('message'),
                            host=log.get('host'),
                            timestamp=log.get('timestamp'),
                            )
```
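The change above stops coercing `message` with `str()`, presumably so that an already-parsed (structured) message reaches `RawLog` unchanged. A small illustration of why the coercion is lossy, using a hypothetical payload:

```python
import json

# Hypothetical structured payload, as the docker log file driver might carry.
message = {'status': 200, 'path': '/health'}

coerced = str(message)  # "{'status': 200, 'path': '/health'}" -- a Python repr
try:
    json.loads(coerced)
except ValueError:
    print('repr() output is not valid JSON:', coerced)

# Passing the object through (or serializing it explicitly) round-trips cleanly.
print(json.loads(json.dumps(message)))
```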

logagg/forwarders.py

Lines changed: 21 additions & 22 deletions
```diff
@@ -53,14 +53,14 @@ def _ensure_connection(self):
                        self.host,
                        self.port)
         client = MongoClient(url, serverSelectionTimeoutMS=self.SERVER_SELECTION_TIMEOUT)
-        self.log.info('MongoDB_server_connection_established', host=self.host)
+        self.log.info('mongodb_server_connection_established', host=self.host)
         self.database = client[self.db_name]
-        self.log.info('MongoDB_database_created', db=self.db_name)
+        self.log.info('mongodb_database_created', db=self.db_name)
         self.collection = self.database[self.coll]
-        self.log.info('MongoDB_collection_created',
+        self.log.info('mongodb_collection_created',
                       collection=self.collection, db=self.db_name)
 
-    def _parse_msg_for_mongoDB(self, msgs):
+    def _parse_msg_for_mongodb(self, msgs):
         msgs_list = []
         #TODO: We need to do this by using iteration object.
         for msg in msgs:
@@ -75,19 +75,19 @@ def _insert_1by1(self, records):
             try:
                 self.collection.insert_one(r, ordered=False)
             except pymongo.errors.OperationFailure as opfail:
-                self.log.exception('failed_to_insert_record_in_mongoDB',
+                self.log.exception('failed_to_insert_record_in_mongodb',
                                    record=msg, tb=opfail.details)
 
     def handle_logs(self, msgs):
-        msgs_list = self._parse_msg_for_mongoDB(msgs)
+        msgs_list = self._parse_msg_for_mongodb(msgs)
         try:
             self.log.debug('inserting_msgs_mongodb')
             self.collection.insert_many([msg for msg in msgs_list], ordered=False)
             self.log.info('logs_inserted_into_mongodb', num_msgs=len(msgs), type='metric')
-        except pymongo.errors.AutoReconnect(message='connection_to_mongoDB_failed'):
+        except pymongo.errors.AutoReconnect(message='connection_to_mongodb_failed'):
             self._ensure_connection()
         except pymongo.errors.BulkWriteError as bwe:
-            self.log.exception('bulk_write_to_mongoDB_failed', tb=bwe.details)
+            self.log.exception('bulk_write_to_mongodb_failed', tb=bwe.details)
             self._insert_1by1(msgs_list)
 
 
@@ -98,9 +98,9 @@ def handle_logs(self, msgs):
 from logagg.util import flatten_dict, is_number
 
 class InfluxDBForwarder(BaseForwarder):
-    EXCLUDE_TAGS = ["raw", "timestamp", "type", "event"]
+    EXCLUDE_TAGS = ["id", "raw", "timestamp", "type", "event"]
 
-    influxDB_records = []
+    influxdb_records = []
 
     def __init__(self,
                  host, port,
@@ -116,12 +116,12 @@ def __init__(self,
         self._ensure_connection()
 
     def _ensure_connection(self):
-        # Establish connection to influxdb to store metrics
+        # Establish connection to influxDB to store metrics
         self.influxdb_client = InfluxDBClient(self.host, self.port, self.user,
                                               self.passwd, self.db_name)
-        self.log.info('InfluxDB_server_connection_established', host=self.host)
+        self.log.info('influxdb_server_connection_established', host=self.host)
         self.influxdb_database = self.influxdb_client.create_database(self.db_name)
-        self.log.info('InfluxDB_database_created', dbname=self.db_name)
+        self.log.info('influxdb_database_created', dbname=self.db_name)
 
     def _tag_and_field_maker(self,event):
         t = dict()
@@ -134,7 +134,7 @@ def _tag_and_field_maker(self,event):
             t[key] = event[key]
         return t, f
 
-    def parse_msg_for_influxDB(self, msgs):
+    def parse_msg_for_influxdb(self, msgs):
         #TODO: We need to do this by using iteration object.
         series = []
         for msg in msgs:
@@ -161,19 +161,18 @@ def handle_logs(self, msgs):
             msgs_list.append(msg_body)
 
         self.log.debug('parsing_of_metrics_started')
-        records = self.parse_msg_for_influxDB(msgs_list)
-        self.influxDB_records.extend(records)
+        records = self.parse_msg_for_influxdb(msgs_list)
+        self.influxdb_records.extend(records)
         self.log.debug('parsing_of_metrics_completed')
 
-        self.influxDB_records = [record for record in self.influxDB_records if record]
         try:
             self.log.debug('inserting_the_metrics_into_influxdb')
-            self.influxdb_client.write_points(self.influxDB_records)
+            self.influxdb_client.write_points(self.influxdb_records)
             self.log.info('metrics_inserted_into_influxdb',
-                          length=len(self.influxDB_records),
+                          length=len(self.influxdb_records),
                           type='metric')
-            self.influxDB_records = []
+            self.influxdb_records = []
         except (InfluxDBClientError, InfluxDBServerError) as e:
             self.log.exception('failed_to_insert metric',
-                               record=self.influxDB_records,
-                               length=len(self.influxDB_records))
+                               record=self.influxdb_records,
+                               length=len(self.influxdb_records))
```
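Worth noting in `MongoDBForwarder.handle_logs` above: one unordered bulk write, with a per-record retry (`_insert_1by1`) when the bulk write fails. A self-contained sketch of that pattern against stock `pymongo` (the function name and sample records are illustrative, not logagg's API):

```python
from pymongo import MongoClient
from pymongo.errors import BulkWriteError, PyMongoError

def insert_with_fallback(collection, records):
    try:
        # ordered=False lets MongoDB continue inserting past individual failures
        collection.insert_many(records, ordered=False)
    except BulkWriteError:
        # Some documents may already be in; retry one by one so a single
        # bad record does not discard the rest of the batch.
        for record in records:
            try:
                collection.insert_one(record)
            except PyMongoError:
                pass  # a real forwarder would log and skip the record

coll = MongoClient('localhost', 27017)['logs']['lines']
insert_with_fallback(coll, [{'_id': 'a', 'event': 'ok'}, {'_id': 'a'}])
```

Because the README change above makes the record's deterministic id the Mongo `_id`, the retry is also idempotent: a duplicate insert fails with a duplicate-key error instead of writing a second copy.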

setup.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -3,7 +3,7 @@
 
 setup(
     name="logagg",
-    version="0.2.6",
+    version="0.2.7",
     description="logs aggregation framework",
     keywords="logagg",
     author="Deep Compute, LLC",
```
