diff --git a/apps/analyzer.py b/apps/analyzer.py index c593b1f..b652534 100644 --- a/apps/analyzer.py +++ b/apps/analyzer.py @@ -7,6 +7,9 @@ from dask.distributed import LocalCluster, Client from multiprocessing import Process, Pipe, TimeoutError +from pymongo import MongoClient +from pymongo.errors import ServerSelectionTimeoutError + from utils import AsyncTimer from intrusion_detection import IntrusionDetector @@ -17,6 +20,10 @@ from jagereye_ng.io import io_worker, notification, database from jagereye_ng import logging +# TODO: this must be configuration in the future +MONGODB_URL = 'mongodb://localhost:27019/' +MONGODB_NAME = 'jager_test' + class HotReconfigurationError(Exception): def __str__(self): @@ -353,6 +360,37 @@ def on_stop(self, sid): else: self._analyzers[sid].stop() + def restart_existed_analyzers(self): + # try to get analyzers info from mongodb + try: + mongo_cli = MongoClient(MONGODB_URL, serverSelectionTimeoutMS=5) + db = mongo_cli[MONGODB_NAME] + anal_coll = db['analyzers'] + analyzers_info = anal_coll.find({}) + + if analyzers_info.count() < 1: + return + + logging.info("Restart existed analyzers") + for analyzer_info in analyzers_info: + print(analyzer_info) + try: + sid = analyzer_info["_id"] + name = analyzer_info["name"] + source = analyzer_info["source"] + pipelines = analyzer_info["pipelines"] + + # Create analyzer object and insert info into self._analyzers + self._analyzers[sid] = Analyzer( + self._cluster, sid, name, source, pipelines) + # start self._analyzers + self._analyzers[sid].start() + except KeyError: + raise + except ServerSelectionTimeoutError as err: + raise RuntimeError("Mongodb connect failed", err) + + if __name__ == "__main__": cluster = LocalCluster(n_workers=0) @@ -374,5 +412,6 @@ def on_stop(self, sid): # Start analyzer manager io_loop = asyncio.get_event_loop() manager = AnalyzerManager(cluster, io_loop, ["nats://localhost:4222"]) + manager.restart_existed_analyzers() io_loop.run_forever() io_loop.close()