From c9f1641e851822bb8bb936a7787d66bc51468898 Mon Sep 17 00:00:00 2001 From: uniray7 Date: Wed, 2 May 2018 15:53:05 +0800 Subject: [PATCH 1/2] add restart mechanism in analyzer.py --- apps/analyzer.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/apps/analyzer.py b/apps/analyzer.py index c593b1f..d85f36c 100644 --- a/apps/analyzer.py +++ b/apps/analyzer.py @@ -7,6 +7,9 @@ from dask.distributed import LocalCluster, Client from multiprocessing import Process, Pipe, TimeoutError +from pymongo import MongoClient +from pymongo.errors import ServerSelectionTimeoutError + from utils import AsyncTimer from intrusion_detection import IntrusionDetector @@ -17,6 +20,10 @@ from jagereye_ng.io import io_worker, notification, database from jagereye_ng import logging +# TODO: this must be configuration in the future +MONGODB_URL = 'mongodb://localhost:27017/' +MONGODB_NAME = 'jager_test' + class HotReconfigurationError(Exception): def __str__(self): @@ -353,6 +360,37 @@ def on_stop(self, sid): else: self._analyzers[sid].stop() + def restart_existed_analyzers(self): + # try to get analyzers info from mongodb + try: + mongo_cli = MongoClient(MONGODB_URL, serverSelectionTimeoutMS=5) + db = mongo_cli[MONGODB_NAME] + anal_coll = db['analyzers'] + analyzers_info = anal_coll.find({}) + + if analyzers_info.count() < 1: + return + + logging.info("Restart existed analyzers") + for analyzer_info in analyzers_info: + print(analyzer_info) + try: + sid = analyzer_info["_id"] + name = analyzer_info["name"] + source = analyzer_info["source"] + pipelines = analyzer_info["pipelines"] + + # Create analyzer object and insert info into self._analyzers + self._analyzers[sid] = Analyzer( + self._cluster, sid, name, source, pipelines) + # start self._analyzers + self._analyzers[sid].start() + except KeyError: + raise RuntimeError("Analyzer not found") + except ServerSelectionTimeoutError: + raise RuntimeError("Mongodb connect failed") + + if __name__ == "__main__": cluster = LocalCluster(n_workers=0) @@ -374,5 +412,6 @@ def on_stop(self, sid): # Start analyzer manager io_loop = asyncio.get_event_loop() manager = AnalyzerManager(cluster, io_loop, ["nats://localhost:4222"]) + manager.restart_existed_analyzers() io_loop.run_forever() io_loop.close() From 6cf0bc3c5f0d69d110e084acfb27339f61b2a097 Mon Sep 17 00:00:00 2001 From: uniray7 Date: Wed, 2 May 2018 15:57:06 +0800 Subject: [PATCH 2/2] modify exception handle in restart mechanism --- apps/analyzer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/analyzer.py b/apps/analyzer.py index d85f36c..b652534 100644 --- a/apps/analyzer.py +++ b/apps/analyzer.py @@ -21,7 +21,7 @@ from jagereye_ng import logging # TODO: this must be configuration in the future -MONGODB_URL = 'mongodb://localhost:27017/' +MONGODB_URL = 'mongodb://localhost:27019/' MONGODB_NAME = 'jager_test' @@ -386,9 +386,9 @@ def restart_existed_analyzers(self): # start self._analyzers self._analyzers[sid].start() except KeyError: - raise RuntimeError("Analyzer not found") - except ServerSelectionTimeoutError: - raise RuntimeError("Mongodb connect failed") + raise + except ServerSelectionTimeoutError as err: + raise RuntimeError("Mongodb connect failed", err)