2020After a downtime of collector, pygtail is missing logs from rotational files
2121'''
2222
def load_formatter_fn(formatter):
    '''Import and return the formatter callable named by a dotted path.

    The returned callable is guaranteed to expose an `ispartial`
    attribute; when the formatter module does not define one, the
    generic `util.ispartial` is attached in its place.

    >>> load_formatter_fn('logagg.formatters.basescript') #doctest: +ELLIPSIS
    <function basescript at 0x...>
    '''
    fn = util.load_object(formatter)
    if not hasattr(fn, 'ispartial'):
        fn.ispartial = util.ispartial
    return fn
32+
2333class LogCollector (object ):
2434 DESC = 'Collects the log information and sends to NSQTopic'
2535
@@ -33,7 +43,6 @@ class LogCollector(object):
3343 SCAN_FPATTERNS_INTERVAL = 30 # How often to scan filesystem for files matching fpatterns
3444 HOST = socket .gethostname ()
3545 HEARTBEAT_RESTART_INTERVAL = 30 # Wait time if heartbeat sending stops
36- #TODO check for structure in validate_log_format
3746
3847 LOG_STRUCTURE = {
3948 'id' : basestring ,
@@ -50,8 +59,11 @@ class LogCollector(object):
5059 'error_tb' : basestring ,
5160 }
5261
53- def __init__ (self , fpaths , nsq_sender ,
54- heartbeat_interval , log = util .DUMMY_LOGGER ):
62+ def __init__ (self ,
63+ fpaths ,
64+ heartbeat_interval ,
65+ nsq_sender = util .DUMMY ,
66+ log = util .DUMMY ):
5567 self .fpaths = fpaths
5668 self .nsq_sender = nsq_sender
5769 self .heartbeat_interval = heartbeat_interval
@@ -64,52 +76,168 @@ def __init__(self, fpaths, nsq_sender,
6476 self .queue = Queue .Queue (maxsize = self .QUEUE_MAX_SIZE )
6577
6678 def _remove_redundancy (self , log ):
79+ """Removes duplicate data from 'data' inside log dict and brings it
80+ out.
81+
82+ >>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)
83+
84+ >>> log = {'id' : 46846876, 'type' : 'log',
85+ ... 'data' : {'a' : 1, 'b' : 2, 'type' : 'metric'}}
86+ >>> lc._remove_redundancy(log)
87+ {'data': {'a': 1, 'b': 2}, 'type': 'metric', 'id': 46846876}
88+ """
6789 for key in log :
6890 if key in log and key in log ['data' ]:
6991 log [key ] = log ['data' ].pop (key )
7092 return log
7193
7294 def validate_log_format (self , log ):
95+ '''
96+ >>> lc = LogCollector('file=/path/to/file.log:formatter=logagg.formatters.basescript', 30)
97+
98+ >>> incomplete_log = {'data' : {'x' : 1, 'y' : 2},
99+ ... 'raw' : 'Not all keys present'}
100+ >>> lc.validate_log_format(incomplete_log)
101+ 'failed'
102+
103+ >>> redundant_log = {'one_invalid_key' : 'Extra information',
104+ ... 'data': {'x' : 1, 'y' : 2},
105+ ... 'error': False,
106+ ... 'error_tb': '',
107+ ... 'event': 'event',
108+ ... 'file': '/path/to/file.log',
109+ ... 'formatter': 'logagg.formatters.mongodb',
110+ ... 'host': 'deepcompute-ThinkPad-E470',
111+ ... 'id': '0112358',
112+ ... 'level': 'debug',
113+ ... 'raw': 'some log line here',
114+ ... 'timestamp': '2018-04-07T14:06:17.404818',
115+ ... 'type': 'log'}
116+ >>> lc.validate_log_format(redundant_log)
117+ 'failed'
118+
119+ >>> correct_log = {'data': {'x' : 1, 'y' : 2},
120+ ... 'error': False,
121+ ... 'error_tb': '',
122+ ... 'event': 'event',
123+ ... 'file': '/path/to/file.log',
124+ ... 'formatter': 'logagg.formatters.mongodb',
125+ ... 'host': 'deepcompute-ThinkPad-E470',
126+ ... 'id': '0112358',
127+ ... 'level': 'debug',
128+ ... 'raw': 'some log line here',
129+ ... 'timestamp': '2018-04-07T14:06:17.404818',
130+ ... 'type': 'log'}
131+ >>> lc.validate_log_format(correct_log)
132+ 'passed'
133+ '''
134+
135+ keys_in_log = set (log )
136+ keys_in_log_structure = set (self .LOG_STRUCTURE )
137+ try :
138+ assert (keys_in_log == keys_in_log_structure )
139+ except AssertionError as e :
140+ self .log .warning ('formatted_log_structure_rejected' ,
141+ key_not_found = list (keys_in_log_structure - keys_in_log ),
142+ extra_keys_found = list (keys_in_log - keys_in_log_structure ),
143+ num_logs = 1 ,
144+ type = 'metric' )
145+ return 'failed'
146+
73147 for key in log :
74- assert (key in self .LOG_STRUCTURE )
75148 try :
76149 assert isinstance (log [key ], self .LOG_STRUCTURE [key ])
77150 except AssertionError as e :
78- self .log .exception ('formatted_log_structure_rejected' , log = log )
151+ self .log .warning ('formatted_log_structure_rejected' ,
152+ key_datatype_not_matched = key ,
153+ datatype_expected = type (self .LOG_STRUCTURE [key ]),
154+ datatype_got = type (log [key ]),
155+ num_logs = 1 ,
156+ type = 'metric' )
157+ return 'failed'
158+
159+ return 'passed'
160+
161+ def _full_from_frags (self , frags ):
162+ full_line = '\n ' .join ([l for l , _ in frags ])
163+ line_info = frags [- 1 ][- 1 ]
164+ return full_line , line_info
165+
166+ def _iter_logs (self , freader , fmtfn ):
167+ # FIXME: does not handle partial lines
168+ # at the start of a file properly
169+
170+ frags = []
171+
172+ for line_info in freader :
173+ line = line_info ['line' ][:- 1 ] # remove new line char at the end
174+
175+ if not fmtfn .ispartial (line ) and frags :
176+ yield self ._full_from_frags (frags )
177+ frags = []
178+
179+ frags .append ((line , line_info ))
180+
181+ if frags :
182+ yield self ._full_from_frags (frags )
183+
184+ def assign_default_log_values (self , fpath , line , formatter ):
185+ '''
186+ >>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)
187+ >>> from pprint import pprint
188+
189+ >>> formatter = 'logagg.formatters.mongodb'
190+ >>> fpath = '/var/log/mongodb/mongodb.log'
191+ >>> line = 'some log line here'
192+
193+ >>> default_log = lc.assign_default_log_values(fpath, line, formatter)
194+ >>> pprint(default_log) #doctest: +ELLIPSIS
195+ {'data': {},
196+ 'error': False,
197+ 'error_tb': '',
198+ 'event': 'event',
199+ 'file': '/var/log/mongodb/mongodb.log',
200+ 'formatter': 'logagg.formatters.mongodb',
201+ 'host': '...',
202+ 'id': None,
203+ 'level': 'debug',
204+ 'raw': 'some log line here',
205+ 'timestamp': '...',
206+ 'type': 'log'}
207+ '''
208+ return dict (
209+ id = None ,
210+ file = fpath ,
211+ host = self .HOST ,
212+ formatter = formatter ,
213+ event = 'event' ,
214+ data = {},
215+ raw = line ,
216+ timestamp = datetime .datetime .utcnow ().isoformat (),
217+ type = 'log' ,
218+ level = 'debug' ,
219+ error = False ,
220+ error_tb = '' ,
221+ )
79222
80223 @keeprunning (LOG_FILE_POLL_INTERVAL , on_error = util .log_exception )
81224 def collect_log_lines (self , log_file ):
82225 L = log_file
83226 fpath = L ['fpath' ]
84- self .log .debug ('tracking_file_for_log_lines' , fpath = fpath )
227+ fmtfn = L ['formatter_fn' ]
228+ formatter = L ['formatter' ]
85229
86230 freader = Pygtail (fpath )
87- for line_info in freader :
88- line = line_info ['line' ][:- 1 ] # remove new line char at the end
89-
90- # assign default values
91- log = dict (
92- id = None ,
93- file = fpath ,
94- host = self .HOST ,
95- formatter = L ['formatter' ],
96- event = 'event' ,
97- data = {},
98- raw = line ,
99- timestamp = datetime .datetime .utcnow ().isoformat (),
100- type = 'log' ,
101- level = 'debug' ,
102- error = False ,
103- error_tb = '' ,
104- )
231+ for line , line_info in self ._iter_logs (freader , fmtfn ):
232+ log = self .assign_default_log_values (fpath , line , formatter )
105233
106234 try :
107- _log = L [ 'formatter_fn' ] (line )
235+ _log = fmtfn (line )
108236
109237 if isinstance (_log , RawLog ):
110238 formatter , raw_log = _log ['formatter' ], _log ['raw' ]
111239 log .update (_log )
112- _log = util . load_object (formatter )(raw_log )
240+ _log = load_formatter_fn (formatter )(raw_log )
113241
114242 log .update (_log )
115243 except (SystemExit , KeyboardInterrupt ) as e : raise
@@ -122,7 +250,7 @@ def collect_log_lines(self, log_file):
122250 log ['id' ] = uuid .uuid1 ().hex
123251
124252 log = self ._remove_redundancy (log )
125- self .validate_log_format (log )
253+ if self .validate_log_format (log ) == 'failed' : continue
126254
127255 self .queue .put (dict (log = json .dumps (log ),
128256 freader = freader , line_info = line_info ))
@@ -175,10 +303,8 @@ def _get_msgs_from_queue(self, msgs, timeout):
175303 self .log .debug ('got_msgs_from_mem_queue' )
176304 return msgs_pending , msgs_nbytes , read_from_q
177305
178-
179306 @keeprunning (0 , on_error = util .log_exception ) # FIXME: what wait time var here?
180307 def send_to_nsq (self , state ):
181- self .log .debug ('send_to_nsq' )
182308 msgs = []
183309 should_push = False
184310
@@ -199,10 +325,14 @@ def send_to_nsq(self, state):
199325 msgs_nbytes = msgs_nbytes )
200326
201327 try :
202- self .log .debug ('trying_to_push_to_nsq' , msgs_length = len (msgs ))
203- self .nsq_sender .handle_logs (msgs )
328+ if isinstance (self .nsq_sender , type (util .DUMMY )):
329+ for m in msgs :
330+ self .log .info ('final_log_format' , log = m ['log' ])
331+ else :
332+ self .log .debug ('trying_to_push_to_nsq' , msgs_length = len (msgs ))
333+ self .nsq_sender .handle_logs (msgs )
334+ self .log .debug ('pushed_to_nsq' , msgs_length = len (msgs ))
204335 self .confirm_success (msgs )
205- self .log .debug ('pushed_to_nsq' , msgs_length = len (msgs ))
206336 msgs = msgs_pending
207337 state .last_push_ts = time .time ()
208338 except (SystemExit , KeyboardInterrupt ): raise
@@ -225,37 +355,60 @@ def confirm_success(self, msgs):
225355 @keeprunning (SCAN_FPATTERNS_INTERVAL , on_error = util .log_exception )
226356 def _scan_fpatterns (self , state ):
227357 '''
228- fpaths = 'file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access'
229- fpattern = '/var/log/nginx/access.log'
358+ For a list of given fpatterns, this starts a thread
359+ collecting log lines from file
360+
361+ >>> os.path.isfile = lambda path: path == '/path/to/log_file.log'
362+ >>> lc = LogCollector('file=/path/to/log_file.log:formatter=logagg.formatters.basescript', 30)
363+
364+ >>> print(lc.fpaths)
365+ file=/path/to/log_file.log:formatter=logagg.formatters.basescript
366+
367+ >>> print('formatters loaded:', lc.formatters)
368+ {}
369+ >>> print('log file reader threads started:', lc.log_reader_threads)
370+ {}
371+ >>> state = AttrDict(files_tracked=list())
372+ >>> print('files bieng tracked:', state.files_tracked)
373+ []
374+
375+
376+ >>> if not state.files_tracked:
377+ >>> lc._scan_fpatterns(state)
378+ >>> print('formatters loaded:', lc.formatters)
379+ >>> print('log file reader threads started:', lc.log_reader_threads)
380+ >>> print('files bieng tracked:', state.files_tracked)
381+
382+
230383 '''
231384 for f in self .fpaths :
232385 fpattern , formatter = (a .split ('=' )[1 ] for a in f .split (':' , 1 ))
233386 self .log .debug ('scan_fpatterns' , fpattern = fpattern , formatter = formatter )
234387 # TODO code for scanning fpatterns for the files not yet present goes here
235388 fpaths = glob .glob (fpattern )
236389 # Load formatter_fn if not in list
390+ fpaths = list (set (fpaths ) - set (state .files_tracked ))
237391 for fpath in fpaths :
238- if fpath not in state .files_tracked :
239- try :
240- formatter_fn = self .formatters .get (formatter ,
241- util .load_object (formatter ))
242- self .log .info ('found_formatter_fn' , fn = formatter )
243- self .formatters [formatter ] = formatter_fn
244- except (SystemExit , KeyboardInterrupt ): raise
245- except (ImportError , AttributeError ):
246- self .log .exception ('formatter_fn_not_found' , fn = formatter )
247- sys .exit (- 1 )
248- # Start a thread for every file
249- self .log .info ('found_log_file' , log_file = fpath )
250- log_f = dict (fpath = fpath , fpattern = fpattern ,
251- formatter = formatter , formatter_fn = formatter_fn )
252- log_key = (fpath , fpattern , formatter )
253- if log_key not in self .log_reader_threads :
254- self .log .info ('starting_collect_log_lines_thread' , log_key = log_key )
255- # There is no existing thread tracking this log file. Start one
256- log_reader_thread = util .start_daemon_thread (self .collect_log_lines , (log_f ,))
257- self .log_reader_threads [log_key ] = log_reader_thread
258- state .files_tracked .append (fpath )
392+ try :
393+ formatter_fn = self .formatters .get (formatter ,
394+ load_formatter_fn (formatter ))
395+ self .log .info ('found_formatter_fn' , fn = formatter )
396+ self .formatters [formatter ] = formatter_fn
397+ except (SystemExit , KeyboardInterrupt ): raise
398+ except (ImportError , AttributeError ):
399+ self .log .exception ('formatter_fn_not_found' , fn = formatter )
400+ sys .exit (- 1 )
401+ # Start a thread for every file
402+ self .log .info ('found_log_file' , log_file = fpath )
403+ log_f = dict (fpath = fpath , fpattern = fpattern ,
404+ formatter = formatter , formatter_fn = formatter_fn )
405+ log_key = (fpath , fpattern , formatter )
406+ if log_key not in self .log_reader_threads :
407+ self .log .info ('starting_collect_log_lines_thread' , log_key = log_key )
408+ # There is no existing thread tracking this log file. Start one
409+ log_reader_thread = util .start_daemon_thread (self .collect_log_lines , (log_f ,))
410+ self .log_reader_threads [log_key ] = log_reader_thread
411+ state .files_tracked .append (fpath )
259412 time .sleep (self .SCAN_FPATTERNS_INTERVAL )
260413
261414 @keeprunning (HEARTBEAT_RESTART_INTERVAL , on_error = util .log_exception )
0 commit comments