@@ -24,7 +24,8 @@ class LogCollector(object):
2424 DESC = 'Collects the log information and sends to NSQTopic'
2525
2626 QUEUE_MAX_SIZE = 2000 # Maximum number of messages in in-mem queue
27- NBYTES_TO_SEND = 5 * (1024 ** 2 ) # Number of bytes from in-mem queue minimally required to push
27+ MAX_NBYTES_TO_SEND = 4.5 * (1024 ** 2 ) # Maximum number of bytes from in-mem queue to send in a single push
28+ MIN_NBYTES_TO_SEND = 512 * 1024 # Minimum number of bytes to send to nsq in mpub
2829 MAX_SECONDS_TO_PUSH = 1 # Wait till this much time elapses before pushing
2930 LOG_FILE_POLL_INTERVAL = 0.25 # Wait time between polls of the log file for newly added lines
3031 QUEUE_READ_TIMEOUT = 1 # Wait time when doing blocking read on the in-mem q
@@ -71,7 +72,10 @@ def _remove_redundancy(self, log):
7172 def validate_log_format (self , log ):
7273 for key in log :
7374 assert (key in self .LOG_STRUCTURE )
74- assert isinstance (log [key ], self .LOG_STRUCTURE [key ])
75+ try :
76+ assert isinstance (log [key ], self .LOG_STRUCTURE [key ])
77+ except AssertionError as e :
78+ self .log .exception ('formatted_log_structure_rejected' , log = log )
7579
7680 @keeprunning (LOG_FILE_POLL_INTERVAL , on_error = util .log_exception )
7781 def collect_log_lines (self , log_file ):
@@ -83,7 +87,6 @@ def collect_log_lines(self, log_file):
8387 for line_info in freader :
8488 line = line_info ['line' ][:- 1 ] # remove new line char at the end
8589 log = dict (
86- id = None ,
8790 file = fpath ,
8891 host = self .HOST ,
8992 formatter = L ['formatter' ],
@@ -123,58 +126,67 @@ def collect_log_lines(self, log_file):
123126 self .log .debug ('waiting_for_pygtail_to_fully_ack' , wait_time = t )
124127 time .sleep (t )
125128
126- def _get_msgs_from_queue (self , msgs , msgs_nbytes , timeout ):
129+ def _get_msgs_from_queue (self , msgs , timeout ):
130+ msgs_pending = []
127131 read_from_q = False
128132 ts = time .time ()
129133
134+ msgs_nbytes = sum (len (m ['log' ]) for m in msgs )
135+
130136 while 1 :
131137 try :
132138 msg = self .queue .get (block = True , timeout = self .QUEUE_READ_TIMEOUT )
133139 read_from_q = True
134140 self .log .debug ("tally:get_from_self.queue" )
135- msgs .append (msg )
136- msgs_nbytes += len (msg ['log' ])
137141
138- if msgs_nbytes > self .NBYTES_TO_SEND :
142+ _msgs_nbytes = msgs_nbytes + len (msg ['log' ])
143+ _msgs_nbytes += 1 # for newline char
144+
145+ if _msgs_nbytes > self .MAX_NBYTES_TO_SEND :
146+ msgs_pending .append (msg )
139147 self .log .debug ('msg_bytes_read_mem_queue_exceeded' )
140148 break
149+
150+ msgs .append (msg )
151+ msgs_nbytes = _msgs_nbytes
152+
141153 #FIXME condition never met
142154 if time .time () - ts >= timeout and msgs :
143155 self .log .debug ('msg_reading_timeout_from_mem_queue_got_exceeded' )
144156 break
145157 # TODO: What if a single log message itself is bigger than max bytes limit?
158+
146159 except Queue .Empty :
147160 self .log .debug ('queue_empty' )
148161 time .sleep (self .QUEUE_READ_TIMEOUT )
149162 if not msgs :
150163 continue
151164 else :
152- return msgs , msgs_nbytes , read_from_q
165+ return msgs_pending , msgs_nbytes , read_from_q
166+
153167 self .log .debug ('got_msgs_from_mem_queue' )
154- return msgs , msgs_nbytes , read_from_q
168+ return msgs_pending , msgs_nbytes , read_from_q
155169
156170
157171 @keeprunning (0 , on_error = util .log_exception ) # FIXME: what wait time var here?
158172 def send_to_nsq (self , state ):
159173 self .log .debug ('send_to_nsq' )
160174 msgs = []
161- msgs_nbytes = 0
162175 should_push = False
163176
164177 while not should_push :
165178 cur_ts = time .time ()
166179 self .log .debug ('should_push' , should_push = should_push )
167180 time_since_last_push = cur_ts - state .last_push_ts
168181
169- msgs , msgs_nbytes , read_from_q = self ._get_msgs_from_queue (msgs ,
170- msgs_nbytes ,
182+ msgs_pending , msgs_nbytes , read_from_q = self ._get_msgs_from_queue (msgs ,
171183 self .MAX_SECONDS_TO_PUSH )
172184
173- have_enough_msgs = msgs_nbytes >= self .NBYTES_TO_SEND
185+ have_enough_msgs = msgs_nbytes >= self .MIN_NBYTES_TO_SEND
174186 is_max_time_elapsed = time_since_last_push >= self .MAX_SECONDS_TO_PUSH
175187
176188 should_push = len (msgs ) > 0 and (is_max_time_elapsed or have_enough_msgs )
177- self .log .debug ('desciding_to_push ' , should_push = should_push ,
189+ self .log .debug ('deciding_to_push ' , should_push = should_push ,
178190 time_since_last_push = time_since_last_push ,
179191 msgs_nbytes = msgs_nbytes )
180192
@@ -183,7 +195,7 @@ def send_to_nsq(self, state):
183195 self .nsq_sender .handle_logs (msgs )
184196 self .confirm_success (msgs )
185197 self .log .debug ('pushed_to_nsq' , msgs_length = len (msgs ))
186- msgs = []
198+ msgs = msgs_pending
187199 state .last_push_ts = time .time ()
188200 except (SystemExit , KeyboardInterrupt ): raise
189201 finally :
0 commit comments