Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from dask.distributed import LocalCluster, Client
from multiprocessing import Process, Pipe, TimeoutError

from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError

from utils import AsyncTimer
from intrusion_detection import IntrusionDetector

Expand All @@ -17,6 +20,10 @@
from jagereye_ng.io import io_worker, notification, database
from jagereye_ng import logging

# TODO: this must be configuration in the future
MONGODB_URL = 'mongodb://localhost:27019/'
MONGODB_NAME = 'jager_test'


class HotReconfigurationError(Exception):
def __str__(self):
Expand Down Expand Up @@ -353,6 +360,37 @@ def on_stop(self, sid):
else:
self._analyzers[sid].stop()

def restart_existed_analyzers(self):
# try to get analyzers info from mongodb
try:
mongo_cli = MongoClient(MONGODB_URL, serverSelectionTimeoutMS=5)
db = mongo_cli[MONGODB_NAME]
anal_coll = db['analyzers']
analyzers_info = anal_coll.find({})

if analyzers_info.count() < 1:
return

logging.info("Restart existed analyzers")
for analyzer_info in analyzers_info:
print(analyzer_info)
try:
sid = analyzer_info["_id"]
name = analyzer_info["name"]
source = analyzer_info["source"]
pipelines = analyzer_info["pipelines"]

# Create analyzer object and insert info into self._analyzers
self._analyzers[sid] = Analyzer(
self._cluster, sid, name, source, pipelines)
# start self._analyzers
self._analyzers[sid].start()
except KeyError:
raise
except ServerSelectionTimeoutError as err:
raise RuntimeError("Mongodb connect failed", err)



if __name__ == "__main__":
cluster = LocalCluster(n_workers=0)
Expand All @@ -374,5 +412,6 @@ def on_stop(self, sid):
# Start analyzer manager
io_loop = asyncio.get_event_loop()
manager = AnalyzerManager(cluster, io_loop, ["nats://localhost:4222"])
manager.restart_existed_analyzers()
io_loop.run_forever()
io_loop.close()