@@ -86,17 +86,21 @@ def collect_log_lines(self, log_file):
8686 freader = Pygtail (fpath )
8787 for line_info in freader :
8888 line = line_info ['line' ][:- 1 ] # remove new line char at the end
89+
90+ # assign default values
8991 log = dict (
9092 id = None ,
9193 file = fpath ,
9294 host = self .HOST ,
9395 formatter = L ['formatter' ],
9496 event = 'event' ,
97+ data = {},
9598 raw = line ,
9699 timestamp = datetime .datetime .utcnow ().isoformat (),
97100 type = 'log' ,
98101 level = 'debug' ,
99102 error = False ,
103+ error_tb = '' ,
100104 )
101105
102106 try :
@@ -108,15 +112,17 @@ def collect_log_lines(self, log_file):
108112 _log = util .load_object (formatter )(raw_log )
109113
110114 log .update (_log )
111- if log ['id' ] == None :
112- log ['id' ] = uuid .uuid1 ().hex
113- log = self ._remove_redundancy (log )
114- self .validate_log_format (log )
115115 except (SystemExit , KeyboardInterrupt ) as e : raise
116116 except :
117- self .log .exception ('Error during handling log line' , log = log )
118117 log ['error' ] = True
119118 log ['error_tb' ] = traceback .format_exc ()
119+ self .log .exception ('error_during_handling_log_line' , log = log ['raw' ])
120+
121+ if log ['id' ] == None :
122+ log ['id' ] = uuid .uuid1 ().hex
123+
124+ log = self ._remove_redundancy (log )
125+ self .validate_log_format (log )
120126
121127 self .queue .put (dict (log = json .dumps (log ),
122128 freader = freader , line_info = line_info ))
@@ -144,7 +150,7 @@ def _get_msgs_from_queue(self, msgs, timeout):
144150 _msgs_nbytes = msgs_nbytes + len (msg ['log' ])
145151 _msgs_nbytes += 1 # for newline char
146152
147- if _msgs_nbytes > self .MAX_NBYTES_TO_SEND :
153+ if _msgs_nbytes > self .MAX_NBYTES_TO_SEND :
148154 msgs_pending .append (msg )
149155 self .log .debug ('msg_bytes_read_mem_queue_exceeded' )
150156 break
@@ -216,7 +222,7 @@ def confirm_success(self, msgs):
216222 ack_fnames .add (fname )
217223 freader .update_offset_file (msg ['line_info' ])
218224
219- @keeprunning (SCAN_FPATTERNS_INTERVAL ,on_error = util .log_exception )
225+ @keeprunning (SCAN_FPATTERNS_INTERVAL , on_error = util .log_exception )
220226 def _scan_fpatterns (self , state ):
221227 '''
222228 fpaths = 'file=/var/log/nginx/access.log:formatter=logagg.formatters.nginx_access'
@@ -246,9 +252,9 @@ def _scan_fpatterns(self, state):
246252 log_key = (fpath , fpattern , formatter )
247253 if log_key not in self .log_reader_threads :
248254 self .log .info ('starting_collect_log_lines_thread' , log_key = log_key )
249- #self.collect_log_lines(log_f)
250255 # There is no existing thread tracking this log file. Start one
251- self .log_reader_threads [log_key ] = util .start_daemon_thread (self .collect_log_lines , (log_f ,))
256+ log_reader_thread = util .start_daemon_thread (self .collect_log_lines , (log_f ,))
257+ self .log_reader_threads [log_key ] = log_reader_thread
252258 state .files_tracked .append (fpath )
253259 time .sleep (self .SCAN_FPATTERNS_INTERVAL )
254260
@@ -265,12 +271,14 @@ def send_heartbeat(self, state):
265271
266272 def start (self ):
267273 state = AttrDict (files_tracked = list ())
268- #self._scan_fpatterns(state)
269274 util .start_daemon_thread (self ._scan_fpatterns , (state ,))
270275
271276 state = AttrDict (last_push_ts = time .time ())
272277 util .start_daemon_thread (self .send_to_nsq , (state ,))
273278
274279 state = AttrDict (heartbeat_number = 0 )
275- util .start_daemon_thread (self .send_heartbeat , (state ,)). join ( )
280+ th_heartbeat = util .start_daemon_thread (self .send_heartbeat , (state ,))
276281
282+ while True :
283+ th_heartbeat .join (1 )
284+ if not th_heartbeat .isAlive (): break