Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions pylogstash/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import zmq
import socket
import datetime
import threading

MAX_MESSAGES = 1000

Expand All @@ -20,19 +19,10 @@ def __init__(self, connect_string="tcp://127.0.0.1:2120", fields=[],
self._connect_string = connect_string
self._fields = fields
self._input_type = input_type
self._local = threading.local()
self._queue_length = queue_length

@property
def publisher(self):
if not hasattr(self._local, 'publisher'):
print("creating publisher")
# 0mq sockets aren't threadsafe, so bind them into a
# threadlocal
self._local.publisher = self._context.socket(zmq.PUB)
self._local.publisher.setsockopt(zmq.HWM, self._queue_length)
self._local.publisher.connect(self._connect_string)
return self._local.publisher
self.publisher = self._context.socket(zmq.PUB)
self.publisher.setsockopt(zmq.HWM, self._queue_length)
self.publisher.connect(self._connect_string)

def emit(self, record):
field_dict = dict([(field, getattr(record, field)) for field in self._fields if hasattr(record, field)])
Expand All @@ -52,6 +42,4 @@ def emit(self, record):
"@source_host": host,
"@message": self.format(record)
}
print("Shipping log")
self.publisher.send_json(message)
print("message shipped")