From 0f4bfc4f8aae7f8ef7065a5e61a5d80c0e1623f9 Mon Sep 17 00:00:00 2001 From: Casey Litton Date: Thu, 8 Oct 2020 14:08:25 -0700 Subject: [PATCH 1/6] SNO-174 Add local storage with redis --- .circleci/config.yml | 3 +- base.ini | 3 + development.ini | 9 ++ src/snovault/dev_servers.py | 22 ++- src/snovault/local_storage.py | 77 ++++++++++ src/snovault/tests/local_storage_fixture.py | 149 ++++++++++++++++++++ src/snovault/tests/serverfixtures.py | 33 +++++ src/snovault/tests/test_key.py | 6 +- src/snovault/tests/test_local_storage.py | 99 +++++++++++++ src/snovault/tests/testappfixtures.py | 4 + src/snowflakes/tests/conftest.py | 4 + 11 files changed, 403 insertions(+), 6 deletions(-) create mode 100644 src/snovault/local_storage.py create mode 100644 src/snovault/tests/local_storage_fixture.py create mode 100644 src/snovault/tests/test_local_storage.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 944ab45935..76d22c0328 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -47,7 +47,8 @@ commands: openjdk-11-jdk \ postgresql-${PG_VERSION} \ ruby2.3 \ - ruby2.3-dev + ruby2.3-dev \ + redis-server sudo chown -R circleci /etc/elasticsearch sed -i "1s;^;export PATH=${ES_BIN}:${PG_BIN}:$PATH\n;" $BASH_ENV sudo apt-get install -y python3.7-dev python3-pip diff --git a/base.ini b/base.ini index aad6e6f68b..ca133380a2 100644 --- a/base.ini +++ b/base.ini @@ -1,5 +1,6 @@ [app:app] use = egg:snovault#snowflakes +config_name = app sqlalchemy.url = postgresql:///snowflakes retry.attempts = 3 file_upload_bucket = snowflakes-files-dev @@ -41,6 +42,7 @@ embed_cache.capacity = 5000 [composite:indexer] use = egg:snovault#indexer +config_name = indexer app = app path = /index timeout = 60 @@ -55,6 +57,7 @@ set queue_worker_batch_size = 2000000 [composite:regionindexer] use = egg:snovault#indexer +config_name = regionindexer app = app path = /index_file timeout = 60 diff --git a/development.ini b/development.ini index f30543ec58..485299e3cc 100644 --- a/development.ini +++ b/development.ini @@ -7,7 +7,12 @@ use = config:base.ini#app sqlalchemy.url = postgresql://postgres@:5432/postgres?host=/tmp/snovault/pgdata snp_search.server = localhost:9200 +local_storage_db = 1 +local_storage_host = localhost +local_storage_port = 6378 +local_storage_timeout = 5 load_test_only = true +local_tz = US/Pacific create_tables = true testing = true postgresql.statement_timeout = 20 @@ -36,9 +41,13 @@ use = egg:rutter#urlmap [composite:indexer] use = config:base.ini#indexer +local_storage_db = 2 +local_storage_port = ${app:local_storage_db} [composite:regionindexer] use = config:base.ini#regionindexer +local_storage_db = 3 +local_storage_port = ${app:local_storage_db} ### # wsgi server configuration diff --git a/src/snovault/dev_servers.py b/src/snovault/dev_servers.py index 38c08bc588..b50ba0f533 100644 --- a/src/snovault/dev_servers.py +++ b/src/snovault/dev_servers.py @@ -6,7 +6,7 @@ """ from pkg_resources import resource_filename -from pyramid.paster import get_app +from pyramid.paster import get_app, get_appsettings from multiprocessing import Process import atexit @@ -62,17 +62,20 @@ def main(): parser.add_argument('--datadir', default='/tmp/snovault', help="path to datadir") args = parser.parse_args() + appsettings = get_appsettings(args.config_uri, name='app') + logging.basicConfig() # Loading app will have configured from config file. Reconfigure here: logging.getLogger('snovault').setLevel(logging.INFO) - from snovault.tests import elasticsearch_fixture, postgresql_fixture + from snovault.tests import elasticsearch_fixture, postgresql_fixture, local_storage_fixture from snovault.elasticsearch import create_mapping datadir = os.path.abspath(args.datadir) pgdata = os.path.join(datadir, 'pgdata') esdata = os.path.join(datadir, 'esdata') + local_storage_data = os.path.join(datadir, 'local_storage_data') if args.clear: - for dirname in [pgdata, esdata]: + for dirname in [pgdata, esdata, local_storage_data]: if os.path.exists(dirname): shutil.rmtree(dirname) if args.init: @@ -81,7 +84,18 @@ def main(): postgres = postgresql_fixture.server_process(pgdata, echo=True) elasticsearch = elasticsearch_fixture.server_process(esdata, echo=True) nginx = nginx_server_process(echo=True) - processes = [postgres, elasticsearch, nginx] + local_storage_port = appsettings.get('local_storage_port', 6379) + local_storage_config_path = local_storage_fixture.initdb( + local_storage_data, + local_storage_port, + echo=True, + ) + local_storage = local_storage_fixture.server_process( + local_storage_config_path, + local_storage_port, + echo=True, + ) + processes = [postgres, elasticsearch, nginx, local_storage] print_processes = [] diff --git a/src/snovault/local_storage.py b/src/snovault/local_storage.py new file mode 100644 index 0000000000..7d1e2f3478 --- /dev/null +++ b/src/snovault/local_storage.py @@ -0,0 +1,77 @@ +import binascii + +from os import urandom +from datetime import datetime +from pytz import timezone + +from redis import StrictRedis + + +def base_result(local_store): + local_dt = datetime.now(timezone(local_store.local_tz)) + return { + 'utc_now': str(datetime.utcnow()), + 'lcl_now': f"{local_store.local_tz}: {local_dt}", + } + + +class LocalStoreClient(): + ''' + Light redis wrapper and redis examples + - get_tag function was added to return hex str + - Can access client directly for full functionality + - Server connection issues do not occur until accessing redis post __init__ + ''' + def __init__( + self, + db_index=0, + host='localhost', + local_tz='GMT', + port=6379, + timeout=5 + ): + self.local_tz = local_tz + redis_info = { + 'db': db_index, + 'host': host, + 'port': port, + 'socket_timeout': timeout, + } + self.client = StrictRedis( + charset='utf-8', + decode_responses=True, + **redis_info + ) + + @staticmethod + def get_tag(tag, num_bytes=8): + ''' + Tags are the tag and a bytes string + - Bytes string length is 2 * num bytes + ''' + rand_hex_str = binascii.b2a_hex(urandom(num_bytes)).decode('utf-8') + return f"{tag}:{rand_hex_str}" + + def ping(self): + return self.client.ping() + + def dict_get(self, key): + return self.client.hgetall(key) + + def dict_set(self, key, hash_dict): + return self.client.hmset(key, hash_dict) + + def get_tag_keys(self, tag): + return self.client.keys(f"{tag}:*") + + def item_get(self, key): + return self.client.get(key) + + def item_set(self, key, item): + return self.client.set(key, item) + + def list_add(self, key, item): + return self.client.lpush(key, item) + + def list_get(self, key, start=0, stop=-1): + return self.client.lrange(key, start, stop) diff --git a/src/snovault/tests/local_storage_fixture.py b/src/snovault/tests/local_storage_fixture.py new file mode 100644 index 0000000000..8c7d4dbca9 --- /dev/null +++ b/src/snovault/tests/local_storage_fixture.py @@ -0,0 +1,149 @@ +'''Local storage redis Fixture''' +import os +import sys + +import subprocess + + +REDIS_DEFAULTS = [ + # 'bind 127.0.0.1 ::1', Disabled ipv6 since circleci does not support + 'protected-mode yes', + # port 6379', + 'tcp-backlog 511', + 'timeout 0', + 'tcp-keepalive 300', + 'daemonize no', + 'supervised no', + # 'pidfile /var/run/redis_6379.pid', + 'loglevel notice', + 'logfile ""', + 'databases 16', + 'always-show-logo yes', + 'save 900 1', + 'save 300 10', + 'save 60 10000', + 'stop-writes-on-bgsave-error yes', + 'rdbcompression yes', + 'rdbchecksum yes', + 'dbfilename dump.rdb', + # 'dir /usr/local/var/db/redis/', + 'slave-serve-stale-data yes', + 'slave-read-only yes', + 'repl-diskless-sync no', + 'repl-diskless-sync-delay 5', + 'repl-disable-tcp-nodelay no', + 'slave-priority 100', + 'lazyfree-lazy-eviction no', + 'lazyfree-lazy-expire no', + 'lazyfree-lazy-server-del no', + 'slave-lazy-flush no', + 'appendonly no', + 'appendfilename "appendonly.aof"', + 'appendfsync everysec', + 'no-appendfsync-on-rewrite no', + 'auto-aof-rewrite-percentage 100', + 'auto-aof-rewrite-min-size 64mb', + 'aof-load-truncated yes', + 'aof-use-rdb-preamble no', + 'lua-time-limit 5000', + 'slowlog-max-len 128', + 'latency-monitor-threshold 0', + 'notify-keyspace-events ""', + 'hash-max-ziplist-entries 512', + 'hash-max-ziplist-value 64', + 'list-max-ziplist-size -2', + 'list-compress-depth 0', + 'set-max-intset-entries 512', + 'zset-max-ziplist-entries 128', + 'zset-max-ziplist-value 64', + 'hll-sparse-max-bytes 3000', + 'activerehashing yes', + 'client-output-buffer-limit normal 0 0 0', + 'client-output-buffer-limit slave 256mb 64mb 60', + 'client-output-buffer-limit pubsub 32mb 8mb 60', + 'hz 10', + 'aof-rewrite-incremental-fsync yes', +] + + +def initdb(datadir, redis_port, echo=False): + '''Create redis db config in data dir''' + redis_config_lines = REDIS_DEFAULTS.copy() + redis_config_lines.append(f"port {redis_port}") + redis_config_lines.append(f"pidfile /var/run/redis_{redis_port}.pid") + redis_config_lines.append(f"dir {datadir}") + redis_config_lines.append('bind 127.0.0.1') + redis_config_path = f"{datadir}/redis.conf" + if not os.path.exists(datadir): + os.makedirs(datadir) + with open(redis_config_path, '+w') as file_handler: + file_handler.writelines('\n'.join(redis_config_lines)) + if echo: + print(f"Redis Config created: {redis_config_path}") + return redis_config_path + + +def server_process(redis_config_path, redis_port, echo=False): + '''Start redis server''' + args = [ + os.path.join('redis-server'), + redis_config_path, + ] + if echo: + print(f"Starting redis server: {' '.join(args)}") + print(f"Connect with 'redis-cli -p {redis_port}'") + redis_process = subprocess.Popen( + args, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + return redis_process + + +def _main(): + import atexit + import shutil + import tempfile + datadir = tempfile.mkdtemp() + + def _cleanup(): + shutil.rmtree(datadir) + print(f"Redis Cleaned dir: {datadir}") + + try: + print(f"Starting in dir: {datadir}") + redis_config_dir = initdb(datadir, 6378, echo=True) + redis_process = server_process(redis_config_dir, 6378, echo=True) + except Exception as ecp: # pylint: disable=broad-except + _cleanup() + shutil.rmtree(datadir) + print('Cleaned dir: %s' % datadir) + raise ecp + + @atexit.register + def cleanup_process(): # pylint: disable=unused-variable + '''System Exit hook to remove db and kill redis server process''' + try: + if redis_process.poll() is None: + redis_process.terminate() + for line in redis_process.stdout: + sys.stdout.write(line.decode('utf-8')) + redis_process.wait() + finally: + _cleanup() + + try: + break_line = b'Ready to accept connections' + for line in iter(redis_process.stdout.readline, b''): + sys.stdout.write(line.decode('utf-8')) + strip_line = line.strip() + if strip_line.endswith(break_line) and break_line: + print('Redis Started. ^C to exit.') + break_line = None + except KeyboardInterrupt: + raise SystemExit(0) + + +if __name__ == '__main__': + _main() diff --git a/src/snovault/tests/serverfixtures.py b/src/snovault/tests/serverfixtures.py index 05f064d8fc..16001dc57a 100644 --- a/src/snovault/tests/serverfixtures.py +++ b/src/snovault/tests/serverfixtures.py @@ -1,5 +1,7 @@ import pytest import os + +from time import sleep from subprocess import TimeoutExpired @@ -82,6 +84,37 @@ def elasticsearch_server(request, elasticsearch_host_port): process.kill() +@pytest.mark.fixture_cost(10) +@pytest.yield_fixture(scope='session') +def redis_server(request, app_settings): + from .local_storage_fixture import initdb, server_process + local_storage_testdata = str( + request.config._tmpdirhandler.mktemp('local_storage_testdata', numbered=True) + ) + local_storage_testdata = '/tmp/snovault/local_storage_data' + redis_testdb = app_settings['local_storage_db'] + redis_testport = app_settings['local_storage_port'] + local_storage_config_testpath = initdb( + local_storage_testdata, + redis_testport, + echo=True, + ) + process = server_process( + local_storage_config_testpath, + redis_testport, + echo=True, + ) + sleep(0.25) + yield f"local test storage: redis-cli -p {redis_testport} -n {redis_testdb})" + + if 'process' in locals() and process.poll() is None: + process.terminate() + try: + process.wait(timeout=10) + except TimeoutExpired: + process.kill() + + # http://docs.sqlalchemy.org/en/rel_0_8/orm/session.html#joining-a-session-into-an-external-transaction # By binding the SQLAlchemy Session to an external transaction multiple testapp # requests can be rolled back at the end of the test. diff --git a/src/snovault/tests/test_key.py b/src/snovault/tests/test_key.py index fa9384b2fd..0ef09dd9d1 100644 --- a/src/snovault/tests/test_key.py +++ b/src/snovault/tests/test_key.py @@ -24,10 +24,14 @@ def autouse_external_tx(external_tx): @pytest.fixture(scope='session') -def app(DBSession): +def app(DBSession, app_settings): from pyramid.config import Configurator from snovault import DBSESSION config = Configurator() + config.registry.settings['local_storage_db'] = app_settings['local_storage_db'] + config.registry.settings['local_storage_host'] = app_settings['local_storage_host'] + config.registry.settings['local_storage_port'] = app_settings['local_storage_port'] + config.registry.settings['local_storage_timeout'] = app_settings['local_storage_timeout'] config.registry[DBSESSION] = DBSession config.include('snovault') config.include('.testing_key') diff --git a/src/snovault/tests/test_local_storage.py b/src/snovault/tests/test_local_storage.py new file mode 100644 index 0000000000..08ad703734 --- /dev/null +++ b/src/snovault/tests/test_local_storage.py @@ -0,0 +1,99 @@ +import pytest + +from snovault.local_storage import LocalStoreClient + + +def test_local_storage_server(redis_server, app_settings): + local_store = LocalStoreClient( + db_index=app_settings['local_storage_db'], + host=app_settings['local_storage_host'], + port=app_settings['local_storage_port'], + timeout=app_settings['local_storage_timeout'], + ) + try: + local_store.client.ping() + except Exception as excp: # pylint: disable=broad-except + print(excp) + assert False + assert True + + +class TestLocalStore(): + dict_key = 'fakedictkey' + item_key = 'fakeitemkey' + list_key = 'fakelistkey' + local_store = None + + @pytest.fixture(autouse=True) + def setup_method(self, redis_server, app_settings): + ''' + Add local store to test class and cleans up standard redis keys + - Uses the exposed redis client directly + ''' + self.local_store = LocalStoreClient( + db=app_settings['local_storage_db'], + host=app_settings['local_storage_host'], + port=app_settings['local_storage_port'], + timeout=app_settings['local_storage_timeout'], + ) + self.local_store.client.delete(self.dict_key) + self.local_store.client.delete(self.item_key) + for item in self.local_store.client.lrange(self.list_key, 0, -1): + self.local_store.client.delete(item) + self.local_store.client.delete(self.list_key) + + def test_init(self): + try: + self.local_store.ping() + except Exception as excp: # pylint: disable=broad-except + assert False + assert True + + def test_get_tag(self): + tag = self.local_store.get_tag() + assert isinstance(tag, str) + assert len(tag) == 16 + tag = self.local_store.get_tag(num_bytes=4) + assert len(tag) == 8 + + def test_dict_get(self): + hash_dict = self.local_store.dict_get(self.dict_key) + assert isinstance(hash_dict, dict) + assert hash_dict == {} + + def test_dict_set(self): + update_dict = {'somekey': 'somevalue'} + self.local_store.dict_set(self.dict_key, update_dict) + hash_dict = self.local_store.dict_get(self.dict_key) + assert hash_dict == update_dict + + def test_item_get(self): + item = self.local_store.item_get(self.item_key) + assert item is None + + def test_item_set(self): + set_item = 'someitem' + self.local_store.item_set(self.item_key, set_item) + item = self.local_store.item_get(self.item_key) + assert isinstance(item, str) + assert item == set_item + + def test_list_add(self): + # Lists are LIFO + items_to_add = ['A', 'B', 'C'] + for item in items_to_add: + self.local_store.list_add(self.list_key, item) + items = self.local_store.list_get(self.list_key, 0, -1) + items.sort() + assert items_to_add == items + + def test_list_get(self): + # Lists are LIFO + items_to_add = ['A', 'B', 'C', 'D', 'E'] + for item in items_to_add: + self.local_store.list_add(self.list_key, item) + items = self.local_store.list_get(self.list_key, 0, 0) + assert items[0] == items_to_add[-1] + items = self.local_store.list_get(self.list_key, 1, 3) + items_to_add.sort(reverse=True) + assert items == items_to_add[1:4] diff --git a/src/snovault/tests/testappfixtures.py b/src/snovault/tests/testappfixtures.py index 6a4f7fce2b..2e92e9c687 100644 --- a/src/snovault/tests/testappfixtures.py +++ b/src/snovault/tests/testappfixtures.py @@ -4,6 +4,10 @@ 'collection_datastore': 'database', 'item_datastore': 'database', 'load_test_only': True, + 'local_storage_db': 0, + 'local_storage_host': 'localhost', + 'local_storage_port': 6378, + 'local_storage_timeout': 5, 'testing': True, 'pyramid.debug_authorization': True, 'postgresql.statement_timeout': 20, diff --git a/src/snowflakes/tests/conftest.py b/src/snowflakes/tests/conftest.py index aac0234c38..f8779bb3a1 100644 --- a/src/snowflakes/tests/conftest.py +++ b/src/snowflakes/tests/conftest.py @@ -39,6 +39,10 @@ def autouse_external_tx(external_tx): 'multiauth.policy.webuser.namespace' : 'webuser', 'multiauth.policy.webuser.base' : 'snovault.authentication.WebUserAuthenticationPolicy', 'load_test_only': True, + 'local_storage_db': 0, + 'local_storage_host': 'localhost', + 'local_storage_port': 6378, + 'local_storage_timeout': 5, 'testing': True, 'pyramid.debug_authorization': True, 'postgresql.statement_timeout': 20, From 7fca08e1f3dee6b7d53b27340194ef759002fade Mon Sep 17 00:00:00 2001 From: Casey Litton Date: Wed, 21 Oct 2020 16:02:12 -0700 Subject: [PATCH 2/6] SNO-174-codereview-1 --- base.ini | 6 ++- development.ini | 16 +++--- requirements.osx.txt | 2 + requirements.txt | 2 + src/snovault/dev_servers.py | 26 ++++------ src/snovault/local_storage.py | 28 ++++------ src/snovault/tests/indexfixtures.py | 2 +- ...ge_fixture.py => redis_storage_fixture.py} | 19 +++---- src/snovault/tests/serverfixtures.py | 36 ++++++------- src/snovault/tests/test_indexing.py | 2 +- src/snovault/tests/test_key.py | 3 +- ...t_local_storage.py => test_redis_store.py} | 52 +++++++++++-------- src/snovault/tests/testappfixtures.py | 14 ++--- src/snowflakes/tests/conftest.py | 12 +++-- src/snowflakes/tests/features/conftest.py | 2 +- 15 files changed, 112 insertions(+), 110 deletions(-) rename src/snovault/tests/{local_storage_fixture.py => redis_storage_fixture.py} (85%) rename src/snovault/tests/{test_local_storage.py => test_redis_store.py} (69%) diff --git a/base.ini b/base.ini index ca133380a2..90c4a23d70 100644 --- a/base.ini +++ b/base.ini @@ -45,7 +45,8 @@ use = egg:snovault#indexer config_name = indexer app = app path = /index -timeout = 60 +set timeout = 60 +set config_name = indexer set embed_cache.capacity = 5000 set indexer = true set queue_type = Simple @@ -60,7 +61,8 @@ use = egg:snovault#indexer config_name = regionindexer app = app path = /index_file -timeout = 60 +set timeout = 60 +set config_name = regionindexer set embed_cache.capacity = 5000 set regionindexer = true diff --git a/development.ini b/development.ini index 485299e3cc..4ee96deeea 100644 --- a/development.ini +++ b/development.ini @@ -7,10 +7,6 @@ use = config:base.ini#app sqlalchemy.url = postgresql://postgres@:5432/postgres?host=/tmp/snovault/pgdata snp_search.server = localhost:9200 -local_storage_db = 1 -local_storage_host = localhost -local_storage_port = 6378 -local_storage_timeout = 5 load_test_only = true local_tz = US/Pacific create_tables = true @@ -18,6 +14,7 @@ testing = true postgresql.statement_timeout = 20 indexer.processes = + pyramid.reload_templates = true pyramid.debug_authorization = false pyramid.debug_notfound = true @@ -25,6 +22,13 @@ pyramid.debug_routematch = false pyramid.default_locale_name = en snovault.load_test_data = snowflakes.loadxl:load_test_data +# Local Storage: Settings must exist in... +# snovault/tests/[testappsettings.py, test_key.py] +# snowflakes/tests/conftest.py +local_storage_host = localhost +local_storage_port = 6378 +local_storage_redis_index = 1 +local_storage_timeout = 5 [pipeline:debug] pipeline = @@ -41,13 +45,9 @@ use = egg:rutter#urlmap [composite:indexer] use = config:base.ini#indexer -local_storage_db = 2 -local_storage_port = ${app:local_storage_db} [composite:regionindexer] use = config:base.ini#regionindexer -local_storage_db = 3 -local_storage_port = ${app:local_storage_db} ### # wsgi server configuration diff --git a/requirements.osx.txt b/requirements.osx.txt index a349892e3e..7c3173084b 100644 --- a/requirements.osx.txt +++ b/requirements.osx.txt @@ -1,4 +1,6 @@ pip==20.0.2 psycopg2==2.8.4 +redis==3.5.3 +redis-server==5.0.7 setuptools==45.1.0 zc.buildout==2.13.2 diff --git a/requirements.txt b/requirements.txt index a349892e3e..7c3173084b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ pip==20.0.2 psycopg2==2.8.4 +redis==3.5.3 +redis-server==5.0.7 setuptools==45.1.0 zc.buildout==2.13.2 diff --git a/src/snovault/dev_servers.py b/src/snovault/dev_servers.py index b50ba0f533..aaa00fcaaf 100644 --- a/src/snovault/dev_servers.py +++ b/src/snovault/dev_servers.py @@ -63,19 +63,24 @@ def main(): args = parser.parse_args() appsettings = get_appsettings(args.config_uri, name='app') + # Required settings in config + local_storage_host = appsettings['local_storage_host'] + local_storage_port = appsettings['local_storage_port'] + local_storage_redis_index = appsettings['local_storage_redis_index'] + local_storage_timeout = appsettings['local_storage_timeout'] logging.basicConfig() # Loading app will have configured from config file. Reconfigure here: logging.getLogger('snovault').setLevel(logging.INFO) - from snovault.tests import elasticsearch_fixture, postgresql_fixture, local_storage_fixture + from snovault.tests import elasticsearch_fixture, postgresql_fixture, redis_storage_fixture from snovault.elasticsearch import create_mapping datadir = os.path.abspath(args.datadir) pgdata = os.path.join(datadir, 'pgdata') esdata = os.path.join(datadir, 'esdata') - local_storage_data = os.path.join(datadir, 'local_storage_data') + redisdata = os.path.join(datadir, 'redisdata') if args.clear: - for dirname in [pgdata, esdata, local_storage_data]: + for dirname in [pgdata, esdata, redisdata]: if os.path.exists(dirname): shutil.rmtree(dirname) if args.init: @@ -84,18 +89,9 @@ def main(): postgres = postgresql_fixture.server_process(pgdata, echo=True) elasticsearch = elasticsearch_fixture.server_process(esdata, echo=True) nginx = nginx_server_process(echo=True) - local_storage_port = appsettings.get('local_storage_port', 6379) - local_storage_config_path = local_storage_fixture.initdb( - local_storage_data, - local_storage_port, - echo=True, - ) - local_storage = local_storage_fixture.server_process( - local_storage_config_path, - local_storage_port, - echo=True, - ) - processes = [postgres, elasticsearch, nginx, local_storage] + redis_config_path = redis_storage_fixture.initdb(redisdata, local_storage_port, echo=True) + redis = redis_storage_fixture.server_process(redis_config_path, local_storage_port, local_storage_redis_index, echo=True) + processes = [postgres, elasticsearch, nginx, redis] print_processes = [] diff --git a/src/snovault/local_storage.py b/src/snovault/local_storage.py index 7d1e2f3478..cff4e852a6 100644 --- a/src/snovault/local_storage.py +++ b/src/snovault/local_storage.py @@ -10,6 +10,7 @@ def base_result(local_store): local_dt = datetime.now(timezone(local_store.local_tz)) return { + '@type': ['result'], 'utc_now': str(datetime.utcnow()), 'lcl_now': f"{local_store.local_tz}: {local_dt}", } @@ -20,33 +21,22 @@ class LocalStoreClient(): Light redis wrapper and redis examples - get_tag function was added to return hex str - Can access client directly for full functionality - - Server connection issues do not occur until accessing redis post __init__ ''' - def __init__( - self, - db_index=0, - host='localhost', - local_tz='GMT', - port=6379, - timeout=5 - ): - self.local_tz = local_tz - redis_info = { - 'db': db_index, - 'host': host, - 'port': port, - 'socket_timeout': timeout, - } + def __init__(self, **kwargs): + self.local_tz = kwargs.get('local_tz', 'GMT') self.client = StrictRedis( charset='utf-8', decode_responses=True, - **redis_info + db=kwargs['db_index'], + host=kwargs['host'], + port=kwargs['port'], + socket_timeout=kwargs['socket_timeout'], ) @staticmethod - def get_tag(tag, num_bytes=8): + def get_tag(tag, num_bytes=2): ''' - Tags are the tag and a bytes string + Tags are the tag plus a random hex bytes string - Bytes string length is 2 * num bytes ''' rand_hex_str = binascii.b2a_hex(urandom(num_bytes)).decode('utf-8') diff --git a/src/snovault/tests/indexfixtures.py b/src/snovault/tests/indexfixtures.py index f28cb18f6a..893e60693a 100644 --- a/src/snovault/tests/indexfixtures.py +++ b/src/snovault/tests/indexfixtures.py @@ -7,7 +7,7 @@ def autouse_external_tx(external_tx): @pytest.fixture(scope='session') -def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server): +def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server, redis_server): from .testappfixtures import _app_settings settings = _app_settings.copy() settings['create_tables'] = True diff --git a/src/snovault/tests/local_storage_fixture.py b/src/snovault/tests/redis_storage_fixture.py similarity index 85% rename from src/snovault/tests/local_storage_fixture.py rename to src/snovault/tests/redis_storage_fixture.py index 8c7d4dbca9..be0f4b20f9 100644 --- a/src/snovault/tests/local_storage_fixture.py +++ b/src/snovault/tests/redis_storage_fixture.py @@ -5,16 +5,17 @@ import subprocess +# Commented out config is added in 'initdb' below REDIS_DEFAULTS = [ - # 'bind 127.0.0.1 ::1', Disabled ipv6 since circleci does not support + # 'bind 127.0.0.1 ::1', Disabled ipv6 since circleci does not support 'protected-mode yes', - # port 6379', + # port 6379', Port change 'tcp-backlog 511', 'timeout 0', 'tcp-keepalive 300', 'daemonize no', 'supervised no', - # 'pidfile /var/run/redis_6379.pid', + # 'pidfile /var/run/redis_6379.pid', Port change 'loglevel notice', 'logfile ""', 'databases 16', @@ -26,7 +27,7 @@ 'rdbcompression yes', 'rdbchecksum yes', 'dbfilename dump.rdb', - # 'dir /usr/local/var/db/redis/', + # 'dir /usr/local/var/db/redis/', Variable database location 'slave-serve-stale-data yes', 'slave-read-only yes', 'repl-diskless-sync no', @@ -83,15 +84,15 @@ def initdb(datadir, redis_port, echo=False): return redis_config_path -def server_process(redis_config_path, redis_port, echo=False): +def server_process(redis_config_path, redis_port, redis_index, echo=False): '''Start redis server''' args = [ os.path.join('redis-server'), - redis_config_path, + f"{redis_config_path}", ] if echo: print(f"Starting redis server: {' '.join(args)}") - print(f"Connect with 'redis-cli -p {redis_port}'") + print(f"Connect with 'redis-cli -p {redis_port} -n {redis_index}'") redis_process = subprocess.Popen( args, close_fds=True, @@ -113,8 +114,8 @@ def _cleanup(): try: print(f"Starting in dir: {datadir}") - redis_config_dir = initdb(datadir, 6378, echo=True) - redis_process = server_process(redis_config_dir, 6378, echo=True) + redis_config_path = initdb(datadir, 6378, echo=True) + redis_process = server_process(redis_config_path, 6378, echo=True) except Exception as ecp: # pylint: disable=broad-except _cleanup() shutil.rmtree(datadir) diff --git a/src/snovault/tests/serverfixtures.py b/src/snovault/tests/serverfixtures.py index 16001dc57a..e0fde7be0c 100644 --- a/src/snovault/tests/serverfixtures.py +++ b/src/snovault/tests/serverfixtures.py @@ -4,6 +4,8 @@ from time import sleep from subprocess import TimeoutExpired +from pyramid.paster import get_appsettings + def pytest_configure(): import logging @@ -86,27 +88,21 @@ def elasticsearch_server(request, elasticsearch_host_port): @pytest.mark.fixture_cost(10) @pytest.yield_fixture(scope='session') -def redis_server(request, app_settings): - from .local_storage_fixture import initdb, server_process - local_storage_testdata = str( - request.config._tmpdirhandler.mktemp('local_storage_testdata', numbered=True) - ) - local_storage_testdata = '/tmp/snovault/local_storage_data' - redis_testdb = app_settings['local_storage_db'] - redis_testport = app_settings['local_storage_port'] - local_storage_config_testpath = initdb( - local_storage_testdata, - redis_testport, - echo=True, - ) - process = server_process( - local_storage_config_testpath, - redis_testport, - echo=True, - ) +def redis_server(request): + from .redis_storage_fixture import initdb, server_process + datadir = str(request.config._tmpdirhandler.mktemp('redisdatatest', numbered=True)) + appsettings = get_appsettings('development.ini', name='app') + # Required settings in config + local_storage_host = appsettings['local_storage_host'] + local_storage_port = appsettings['local_storage_port'] + local_storage_redis_index = appsettings['local_storage_redis_index'] + local_storage_timeout = appsettings['local_storage_timeout'] + # Build fixture + redis_config_path = initdb(datadir, local_storage_port, echo=True) + process = server_process(redis_config_path, local_storage_port, local_storage_redis_index, echo=True) + # Sleep for short time to allow redis db to initialize sleep(0.25) - yield f"local test storage: redis-cli -p {redis_testport} -n {redis_testdb})" - + yield f"Redis testing: redis-cli -p {local_storage_port} -n {local_storage_redis_index})" if 'process' in locals() and process.poll() is None: process.terminate() try: diff --git a/src/snovault/tests/test_indexing.py b/src/snovault/tests/test_indexing.py index c395a0994c..d46fffc3c5 100644 --- a/src/snovault/tests/test_indexing.py +++ b/src/snovault/tests/test_indexing.py @@ -16,7 +16,7 @@ def autouse_external_tx(external_tx): @pytest.fixture(scope='session') -def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server): +def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server, redis_server): from .testappfixtures import _app_settings settings = _app_settings.copy() settings['create_tables'] = True diff --git a/src/snovault/tests/test_key.py b/src/snovault/tests/test_key.py index 0ef09dd9d1..b48266fe12 100644 --- a/src/snovault/tests/test_key.py +++ b/src/snovault/tests/test_key.py @@ -28,10 +28,11 @@ def app(DBSession, app_settings): from pyramid.config import Configurator from snovault import DBSESSION config = Configurator() - config.registry.settings['local_storage_db'] = app_settings['local_storage_db'] config.registry.settings['local_storage_host'] = app_settings['local_storage_host'] config.registry.settings['local_storage_port'] = app_settings['local_storage_port'] + config.registry.settings['local_storage_redis_index'] = app_settings['local_storage_redis_index'] config.registry.settings['local_storage_timeout'] = app_settings['local_storage_timeout'] + config.registry.settings['local_tz'] = app_settings['local_tz'] config.registry[DBSESSION] = DBSession config.include('snovault') config.include('.testing_key') diff --git a/src/snovault/tests/test_local_storage.py b/src/snovault/tests/test_redis_store.py similarity index 69% rename from src/snovault/tests/test_local_storage.py rename to src/snovault/tests/test_redis_store.py index 08ad703734..e96ef558cc 100644 --- a/src/snovault/tests/test_local_storage.py +++ b/src/snovault/tests/test_redis_store.py @@ -1,17 +1,23 @@ import pytest +from redis import StrictRedis + from snovault.local_storage import LocalStoreClient -def test_local_storage_server(redis_server, app_settings): - local_store = LocalStoreClient( - db_index=app_settings['local_storage_db'], - host=app_settings['local_storage_host'], - port=app_settings['local_storage_port'], - timeout=app_settings['local_storage_timeout'], +def _get_client(local_settings): + return LocalStoreClient( + db_index=local_settings['local_storage_redis_index'], + host=local_settings['local_storage_host'], + port=local_settings['local_storage_port'], + socket_timeout=local_settings['local_storage_timeout'], ) + + +def test_local_storage_server_fixture(app_settings): + local_store = _get_client(app_settings) try: - local_store.client.ping() + local_store.ping() except Exception as excp: # pylint: disable=broad-except print(excp) assert False @@ -22,20 +28,14 @@ class TestLocalStore(): dict_key = 'fakedictkey' item_key = 'fakeitemkey' list_key = 'fakelistkey' - local_store = None @pytest.fixture(autouse=True) - def setup_method(self, redis_server, app_settings): + def setup_method(self, app_settings): ''' Add local store to test class and cleans up standard redis keys - Uses the exposed redis client directly ''' - self.local_store = LocalStoreClient( - db=app_settings['local_storage_db'], - host=app_settings['local_storage_host'], - port=app_settings['local_storage_port'], - timeout=app_settings['local_storage_timeout'], - ) + self.local_store = _get_client(app_settings) self.local_store.client.delete(self.dict_key) self.local_store.client.delete(self.item_key) for item in self.local_store.client.lrange(self.list_key, 0, -1): @@ -43,19 +43,27 @@ def setup_method(self, redis_server, app_settings): self.local_store.client.delete(self.list_key) def test_init(self): + assert hasattr(self.local_store, 'local_tz') + assert isinstance(self.local_store.local_tz, str) + assert self.local_store.local_tz == 'GMT' + assert hasattr(self.local_store, 'client') + assert isinstance(self.local_store.client, StrictRedis) + + def test_get_tag(self): + config_tag = 'localtesting' + tag = self.local_store.get_tag(config_tag) + assert isinstance(tag, str) + assert len(tag) == len(config_tag) + len(':') + 4 + tag = self.local_store.get_tag(config_tag, num_bytes=4) + assert len(tag) == len(config_tag) + len(':') + 8 + + def test_ping(self): try: self.local_store.ping() except Exception as excp: # pylint: disable=broad-except assert False assert True - def test_get_tag(self): - tag = self.local_store.get_tag() - assert isinstance(tag, str) - assert len(tag) == 16 - tag = self.local_store.get_tag(num_bytes=4) - assert len(tag) == 8 - def test_dict_get(self): hash_dict = self.local_store.dict_get(self.dict_key) assert isinstance(hash_dict, dict) diff --git a/src/snovault/tests/testappfixtures.py b/src/snovault/tests/testappfixtures.py index 2e92e9c687..27eefbefcb 100644 --- a/src/snovault/tests/testappfixtures.py +++ b/src/snovault/tests/testappfixtures.py @@ -4,10 +4,6 @@ 'collection_datastore': 'database', 'item_datastore': 'database', 'load_test_only': True, - 'local_storage_db': 0, - 'local_storage_host': 'localhost', - 'local_storage_port': 6378, - 'local_storage_timeout': 5, 'testing': True, 'pyramid.debug_authorization': True, 'postgresql.statement_timeout': 20, @@ -26,12 +22,18 @@ 'multiauth.policy.accesskey.check': 'snovault.authentication.basic_auth_check', 'multiauth.policy.webuser.use': 'snovault.authentication.NamespacedAuthenticationPolicy', 'multiauth.policy.webuser.namespace': 'webuser', - 'multiauth.policy.webuser.base': 'snovault.authentication.WebUserAuthenticationPolicy' + 'multiauth.policy.webuser.base': 'snovault.authentication.WebUserAuthenticationPolicy', + # Local Storage + 'local_storage_host': 'localhost', + 'local_storage_port': 6378, + 'local_storage_redis_index': 0, + 'local_storage_timeout': 5, + 'local_tz': 'GMT', } @pytest.fixture(scope='session') -def app_settings(request, wsgi_server_host_port, conn, DBSession): +def app_settings(request, wsgi_server_host_port, conn, DBSession, redis_server): from snovault import DBSESSION settings = _app_settings.copy() settings[DBSESSION] = DBSession diff --git a/src/snowflakes/tests/conftest.py b/src/snowflakes/tests/conftest.py index f8779bb3a1..ab467adbf7 100644 --- a/src/snowflakes/tests/conftest.py +++ b/src/snowflakes/tests/conftest.py @@ -39,20 +39,22 @@ def autouse_external_tx(external_tx): 'multiauth.policy.webuser.namespace' : 'webuser', 'multiauth.policy.webuser.base' : 'snovault.authentication.WebUserAuthenticationPolicy', 'load_test_only': True, - 'local_storage_db': 0, - 'local_storage_host': 'localhost', - 'local_storage_port': 6378, - 'local_storage_timeout': 5, 'testing': True, 'pyramid.debug_authorization': True, 'postgresql.statement_timeout': 20, 'retry.attempts': 3, 'ontology_path': pkg_resources.resource_filename('snowflakes', '../../ontology.json'), + # Local Storage + 'local_storage_host': 'localhost', + 'local_storage_port': 6378, + 'local_storage_redis_index': 0, + 'local_storage_timeout': 5, + 'local_tz': 'GMT', } @fixture(scope='session') -def app_settings(request, wsgi_server_host_port, conn, DBSession): +def app_settings(request, wsgi_server_host_port, conn, DBSession, redis_server): from snovault import DBSESSION settings = _app_settings.copy() settings[DBSESSION] = DBSession diff --git a/src/snowflakes/tests/features/conftest.py b/src/snowflakes/tests/features/conftest.py index 44049d80b9..5f66e73c54 100644 --- a/src/snowflakes/tests/features/conftest.py +++ b/src/snowflakes/tests/features/conftest.py @@ -19,7 +19,7 @@ def external_tx(): @pytest.fixture(scope='session') -def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server): +def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server, redis_server): from snovault.tests.testappfixtures import _app_settings settings = _app_settings.copy() settings['create_tables'] = True From c8a6a1c382940755483b47d20116828cb9b41b51 Mon Sep 17 00:00:00 2001 From: Casey Litton Date: Wed, 21 Oct 2020 16:02:32 -0700 Subject: [PATCH 3/6] SNO-173-Report-clear-indexer-state --- base.ini | 2 - src/snovault/__init__.py | 1 + src/snovault/elasticsearch/__init__.py | 1 + src/snovault/elasticsearch/indexer.py | 22 +- src/snovault/elasticsearch/interfaces.py | 4 + .../elasticsearch/local_indexer_store.py | 312 ++++++++++ .../tests/test_indexer_store_views.py | 570 ++++++++++++++++++ .../tests/test_local_indexer_store.py | 314 ++++++++++ src/snovault/local_storage.py | 1 - 9 files changed, 1223 insertions(+), 4 deletions(-) create mode 100644 src/snovault/elasticsearch/local_indexer_store.py create mode 100644 src/snovault/elasticsearch/tests/test_indexer_store_views.py create mode 100644 src/snovault/elasticsearch/tests/test_local_indexer_store.py diff --git a/base.ini b/base.ini index 90c4a23d70..3d9772661b 100644 --- a/base.ini +++ b/base.ini @@ -42,7 +42,6 @@ embed_cache.capacity = 5000 [composite:indexer] use = egg:snovault#indexer -config_name = indexer app = app path = /index set timeout = 60 @@ -58,7 +57,6 @@ set queue_worker_batch_size = 2000000 [composite:regionindexer] use = egg:snovault#indexer -config_name = regionindexer app = app path = /index_file set timeout = 60 diff --git a/src/snovault/__init__.py b/src/snovault/__init__.py index 3cee982490..f9fe86366e 100644 --- a/src/snovault/__init__.py +++ b/src/snovault/__init__.py @@ -68,6 +68,7 @@ def includeme(config): config.include('.crud_views') config.include('.indexing_views') config.include('.resource_views') + config.include('.elasticsearch.local_indexer_store') def main(global_config, **local_config): diff --git a/src/snovault/elasticsearch/__init__.py b/src/snovault/elasticsearch/__init__.py index 97e5949d71..eea412cdaf 100644 --- a/src/snovault/elasticsearch/__init__.py +++ b/src/snovault/elasticsearch/__init__.py @@ -42,6 +42,7 @@ def includeme(config): config.include('.indexer') config.include('.indexer_state') + config.include('.local_indexer_store') if asbool(settings.get('indexer')) and not PY2: config.include('.mpindexer') diff --git a/src/snovault/elasticsearch/indexer.py b/src/snovault/elasticsearch/indexer.py index 2fb9aab14c..c5410a5041 100644 --- a/src/snovault/elasticsearch/indexer.py +++ b/src/snovault/elasticsearch/indexer.py @@ -20,10 +20,13 @@ from snovault.storage import ( TransactionRecord, ) +from snovault.elasticsearch.local_indexer_store import IndexerStore + from urllib3.exceptions import ReadTimeoutError from .interfaces import ( ELASTIC_SEARCH, INDEXER, + INDEXER_STORE, RESOURCES_INDEX, ) from .indexer_state import ( @@ -90,6 +93,9 @@ def includeme(config): _update_for_uuid_queues(registry) if not processes: registry[INDEXER] = Indexer(registry) + if not registry.get(INDEXER_STORE): + registry[INDEXER_STORE] = IndexerStore(config.registry.settings) + def get_related_uuids(request, es, updated, renamed): @@ -336,6 +342,8 @@ def _get_nodes(request, indexer_state): @view_config(route_name='index', request_method='POST', permission="index") def index(request): + indexer_store = request.registry[INDEXER_STORE] + indexer_store.set_state(indexer_store.state_endpoint_start) # Setting request.datastore here only works because routed views are not traversed. request.datastore = 'database' session = request.registry[DBSESSION]() @@ -366,6 +374,7 @@ def index(request): return {'this_node': this_node, 'other_node': other_node} def _load_indexing(request, session, connection, indexer_state): + indexer_store.set_state(indexer_store.state_load_indexing) first_txn = None snapshot_id = None (xmin, invalidated, restart) = indexer_state.priority_cycle(request) @@ -493,7 +502,11 @@ def _run_indexing( result = indexer_state.start_cycle(invalidated, result) # Do the work... - + local_state, event_tag = indexer_store.set_state( + indexer_store.state_run_indexing, + # 'invalidated' indicates the start of indexing + invalidated_cnt=len(invalidated) + ) indexing_update_infos, errors, err_msg = request.registry[INDEXER].serve_objects( request, invalidated, @@ -501,6 +514,11 @@ def _run_indexing( snapshot_id=snapshot_id, restart=restart, ) + indexer_store.set_state( + indexer_store.state_run_indexing, + errors_cnt=len(errors), + event_tag=event_tag, + ) if err_msg: log.warning('Could not start indexing: %s', err_msg) result = indexer_state.finish_cycle(result,errors) @@ -544,6 +562,7 @@ def _run_indexing( output_tuple = _load_indexing(request, session, connection, indexer_state) result, invalidated, flush, first_txn, snapshot_id, restart, xmin, return_now = output_tuple if return_now: + indexer_store.set_state(indexer_store.state_waiting) return result indexing_update_infos = [] dry_run = request.json.get('dry_run', False) @@ -625,6 +644,7 @@ def _run_indexing( # opposed to in the indexer or just after serve_objects, # so a crash in logging does not interupt indexing complietion request.registry[INDEXER].check_log_indexing_times(indexing_update_infos) + indexer_store.set_state(indexer_store.state_waiting) return result diff --git a/src/snovault/elasticsearch/interfaces.py b/src/snovault/elasticsearch/interfaces.py index ae3f28961b..8e5ac948a8 100644 --- a/src/snovault/elasticsearch/interfaces.py +++ b/src/snovault/elasticsearch/interfaces.py @@ -5,6 +5,10 @@ ELASTIC_SEARCH = 'elasticsearch' SNP_SEARCH_ES = 'snp_search' INDEXER = 'indexer' +INDEXER_STORE = f"{INDEXER}_store" +INDEXER_STATE_TAG = f"{INDEXER}_state:hash" +INDEXER_EVENTS_TAG = f"{INDEXER}_event" +INDEXER_EVENTS_LIST = f"{INDEXER_EVENTS_TAG}:list" RESOURCES_INDEX = 'snovault-resources' diff --git a/src/snovault/elasticsearch/local_indexer_store.py b/src/snovault/elasticsearch/local_indexer_store.py new file mode 100644 index 0000000000..fae193572a --- /dev/null +++ b/src/snovault/elasticsearch/local_indexer_store.py @@ -0,0 +1,312 @@ +import datetime +import re +import time + +from pyramid.view import view_config + +from snovault.local_storage import ( + LocalStoreClient, + base_result, +) +from snovault.elasticsearch.interfaces import ( + INDEXER, + INDEXER_STORE, + INDEXER_STATE_TAG, + INDEXER_EVENTS_TAG, + INDEXER_EVENTS_LIST, +) + + +_EVENT_TAG_LEN = 4 +_EVENT_PATTERN = re.compile("^" + INDEXER_EVENTS_TAG + ":[a-z0-9]{" + str(2*_EVENT_TAG_LEN) + "}$") +_EVENTS_PATTERN = re.compile("^-?\d+:-?\d+$") +_EVENTS_DEFAULT_RANGE = '0:100' + + +def includeme(config): + config.scan(__name__) + config.add_route('indexer_store', '/indexer_store') + config.registry[INDEXER_STORE] = IndexerStore(config.registry.settings) + + +def _duration_with_unis_str(ts_start): + '''Return duration in seconds, minutes, or hours''' + seconds = float(time.time()) - int(ts_start) + div = 1.0 + unit = 'seconds' + if seconds > 60: + div = 60.0 + unit = 'minutes' + if seconds > 3600: + div = 3600.0 + unit = 'hours' + return f"{seconds/div:0.2f} {unit}" + + +@view_config(route_name='indexer_store', request_method='GET', request_param='events') +def indexer_store_events(request): + ''' + Get indexer event(s) based on regex of events value + * This redis list is LIFO + + 1. Value looks like an event tag: + * indexer_event:4fc592b0 + 2. Value looks like a range: + * All events=0:-1 + * First events=-1:-1 + * Last/Most recent events=0:0 + * Range recent events=4:94 + 3. Default no events value uses _EVENTS_DEFAULT_RANGE + ''' + request_argument = request.params.get("events") + if not request_argument: + # Test for falsey, request params returns empty string if not events value given + request_argument = _EVENTS_DEFAULT_RANGE + indexer_store = request.registry[INDEXER_STORE] + result = base_result(indexer_store) + if _EVENT_PATTERN.match(request_argument): + # request argument looks like an event_tag + result['event'] = {} + for event_key in indexer_store.get_tag_keys(request_argument): + result['event'][event_key] = indexer_store.item_get(event_key) + elif _EVENTS_PATTERN.match(request_argument): + # request argument looks like range string + colon = request_argument.index(':') + start = int(request_argument[:colon]) + stop = int(request_argument[colon+1:]) + event_tags = indexer_store.list_get(INDEXER_EVENTS_LIST, start, stop) + result['events'] = [] + for event_tag in event_tags: + result['events'].append(indexer_store.get_event(event_tag)) + else: + # Events arg is bad format? + result['error'] = f"Bad events value '{request_argument}'" + result['error_desc'] = 'Must be key(indexer_event:4fc592b0) or range(4:94) like' + request.query_string = "format=json" + return result + + +@view_config(route_name='indexer_store', request_method='GET', request_param='state=raw') +def indexer_store_state_raw(request): + '''Only return raw state object from redis''' + request.query_string = "format=json" + return request.registry[INDEXER_STORE].get_state() + + +@view_config(route_name='indexer_store', request_method='GET', request_param='state=split') +def indexer_store_state_split(request): + '''All state info organized into sections with some extra info and error checking''' + indexer_store = request.registry[INDEXER_STORE] + state_obj = indexer_store.get_state() + current_state = state_obj.get('state') + # Early return with raw full state due lack of state or still initializing + if not current_state: + return { + 'early_return': 'state_obj state keys is Falsey', + 'state_obj': state_obj, + } + elif current_state == IndexerStore.state_initialized[0]: + return { + 'early_return': 'state_obj state key is in intial state', + 'state_obj': state_obj, + } + # Normal return + result = base_result(indexer_store) + # Add Static Info + result['static'] = {} + for key in IndexerStore.static_keys: + result['static'][key] = state_obj[key] + # Add Dynamic State + result['dynamic'] = {} + for key in IndexerStore.dynamic_keys: + result['dynamic'][key] = state_obj[key] + # Determine if run events and run state + event_key = None + if current_state in [ + IndexerStore.state_endpoint_start[0], + IndexerStore.state_waiting[0], + IndexerStore.state_load_indexing[0], + ]: + event_key = 'previous_event' + elif current_state == IndexerStore.state_run_indexing[0]: + event_key = 'previous_event' + if state_obj['end_time'] == 'unknown': + event_key = 'current_event' + # Add current or previous run event keys + if event_key: + result[event_key] = {} + for key in indexer_store.event_keys: + result[event_key][key] = state_obj[key] + # Add formatted duration and event_tag if running + result[event_key]['formated_duration'] = _duration_with_unis_str(state_obj['start_ts']) + request.query_string = "format=json" + return result + + +@view_config(route_name='indexer_store', request_method='GET') +def indexer_store_state(request): + '''Minimal view for current state and additional info if running''' + request.query_string = "format=json" + state_obj = request.registry[INDEXER_STORE].get_state() + current_state = state_obj.get('state', 'not initialized') + result = { + 'state': current_state, + } + if state_obj.get('state') in [IndexerStore.state_endpoint_start[0], IndexerStore.state_load_indexing[0]]: + result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start_ts']) + elif state_obj.get('state') == IndexerStore.state_run_indexing[0]: + # Add formatted duration and event_tag if running + result['current_event_tag'] = state_obj.get('event_tag', 'not initialized') + result['current_duration'] = _duration_with_unis_str(state_obj['start_ts']) + result['current_invalidated_cnt'] = state_obj['invalidated_cnt'] + request.query_string = "format=json" + return result + + +class IndexerStore(LocalStoreClient): + config_name = INDEXER + state_initialized = ('state_init', 'initialized') + state_endpoint_start = ('state_endpoint_start', 'Endpoint started running') + state_load_indexing = ('state_load_indexing', 'Endpoint checking for uuids to index') + state_run_indexing = ('state_run_indexing', 'Endpoint found uuids and started indexing') + state_waiting = ('state_waiting', 'Waiting to call endpoint') + static_keys = [ + 'addr', + 'config_name', + 'init_dt', + 'loop_time', + 'pg_ip', + 'remote_indexing', + 'state_key', + ] + dynamic_keys = [ + 'endpoint_end', + 'endpoint_start', + 'endpoint_start_ts', + 'state', + 'state_desc', + 'state_error', + ] + event_keys = [ + 'duration', + 'end_time', + 'errors_cnt', + 'event_tag', + 'invalidated_cnt', + 'start_ts', + ] + + def __init__(self, settings): + super().__init__( + db_index=settings['local_storage_redis_index'], + host=settings['local_storage_host'], + port=settings['local_storage_port'], + local_tz=settings['local_tz'], + socket_timeout=int(settings['local_storage_timeout']), + ) + if settings.get('config_name', 'no name') == IndexerStore.config_name: + # only the indexer process should set these attributes and initialize this store + self.pg_ip = str(settings.get('pg_ip', 'localhost')) + self.remote_indexing = str(settings.get('remote_indexing', 'false')) + self.loop_time = str(settings.get('timeout', 'unknown')) + curr_state = self.get_state() + if not curr_state: + self._init_state() + + def _init_state(self): + '''Indexer state in redis is named after the process in the config''' + init_state = { + # Static + 'addr': f"{self.config_name}:{id(self)}", + 'config_name': self.config_name, + 'init_dt': str(datetime.datetime.utcnow()), + 'loop_time': str(self.loop_time), + 'pg_ip': self.pg_ip, + 'remote_indexing': self.remote_indexing, + 'state_key': INDEXER_STATE_TAG, + } + # Dynamic + for key in self.dynamic_keys: + init_state[key] = 'unknown' + init_state['state'] = self.state_initialized[0] + init_state['state_desc'] = self.state_initialized[1] + # Run event + for key in self.event_keys: + init_state[key] = 'unknown' + self.dict_set(INDEXER_STATE_TAG, init_state) + + def _end_event(self, event_tag, state): + '''Close event with only certain event keys in state. Also add human readable date time''' + for event_key in ['duration', 'end_time', 'errors_cnt']: + self.item_set(f"{event_tag}:{event_key}", state[event_key]) + self.item_set(f"{event_tag}:end", str(datetime.datetime.utcnow())) + + def _start_event(self, event_tag, state): + '''Create new event with info from state in events keys''' + self.list_add(INDEXER_EVENTS_LIST, event_tag) + for event_key in self.event_keys: + self.item_set(f"{event_tag}:{event_key}", state[event_key]) + + def get_event(self, event_tag): + end = str(self.item_get(event_tag + ':end')) + invalidated_cnt = str(self.item_get(event_tag + ':invalidated_cnt')) + duration = str(self.item_get(event_tag + ':duration')) + msg = f"Indexed '{invalidated_cnt}' uuids in '{duration}' seconds. Ended at '{end}'" + return f"{event_tag}: {msg}" + + def get_state(self): + return self.dict_get(INDEXER_STATE_TAG) + + def set_state(self, state_tuple, errors_cnt=0, invalidated_cnt=0, event_tag=None): + state = self.get_state() + if not isinstance(state_tuple, tuple): + # Invalid State type + state['state_error'] = f"state, '{str(state_tuple)}', is not a tuple" + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), None + elif not len(state_tuple) == 2: + # Invalid State len + state['state_error'] = f"state, {state_tuple}, is wrong length" + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), None + elif state_tuple[0] == self.state_endpoint_start[0]: + # Reset + state['endpoint_end'] = 'tbd' + state['endpoint_start'] = str(datetime.datetime.utcnow()) + state['endpoint_start_ts'] = str(int(time.time())) + state['state_error'] = 'tbd' + elif state_tuple[0] == self.state_load_indexing[0]: + # Checking for uuids to index + pass + elif state_tuple[0] == self.state_run_indexing[0] and invalidated_cnt: + # Start indexing + event_tag = self.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) + state['event_tag'] = event_tag + state['invalidated_cnt'] = str(invalidated_cnt) + state['start_ts'] = str(int(time.time())) + self._start_event(state['event_tag'], state) + elif state_tuple[0] == self.state_run_indexing[0] and event_tag: + # End indexing + state['end_time'] = str(datetime.datetime.utcnow()) + state['errors_cnt'] = str(errors_cnt) + duration = int(time.time()) - int(state['start_ts']) + state['duration'] = str(duration) + self._end_event(event_tag, state) + elif state_tuple[0] == self.state_waiting[0]: + # Waiting in es_index_listener for timeout + state['endpoint_end'] = str(datetime.datetime.utcnow()) + elif state_tuple[0] == self.state_run_indexing[0]: + # Invalid State + state['state_error'] = f"{str(state_tuple)} requries additional arguments" + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), None + else: + # Invalid State + state['state_error'] = f"{str(state_tuple)} is not a valid state change" + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), None + # Set valid state and store + state['state'] = state_tuple[0] + state['state_desc'] = state_tuple[1] + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), event_tag diff --git a/src/snovault/elasticsearch/tests/test_indexer_store_views.py b/src/snovault/elasticsearch/tests/test_indexer_store_views.py new file mode 100644 index 0000000000..151094f000 --- /dev/null +++ b/src/snovault/elasticsearch/tests/test_indexer_store_views.py @@ -0,0 +1,570 @@ +import datetime +import pytest +import time + +from unittest.mock import Mock, patch + +from snovault.elasticsearch.interfaces import ( + INDEXER, + INDEXER_EVENTS_LIST, + INDEXER_EVENTS_TAG, + INDEXER_STATE_TAG, +) +from snovault.elasticsearch.local_indexer_store import ( + IndexerStore, + _EVENT_TAG_LEN, +) + +def _setup_indexer_store(settings): + app_settings['config_name'] = INDEXER + return IndexerStore(app_settings) + + +class TestLocalStoreNoInitViews(): + ''' + No init. IndexerStore with 'app' configuration. + Calling the views prior to IndexerStore 'indexer' configuration initializing + ''' + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + # Event Views + def test_indexer_store_events_tag(self): + some_tag = 'a'*(2*_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == {} + + def test_indexer_store_events_bad_tag(self): + event_tag = 'not-a-tag' + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + some_tag = 'a'*(_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events_range(self): + url = '/indexer_store?events=3:4' + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events(self): + url = '/indexer_store?events' + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + # State Views + def test_indexer_store_state_only(self): + url = '/indexer_store?state=state' + result = self.testapp.get(url, status=200).json + assert 'state' in result + assert result['state'] == 'not initialized' + + def test_indexer_store_state_full(self): + url = '/indexer_store?state=full' + result = self.testapp.get(url, status=200).json + assert result == {} + + def test_indexer_store_state(self): + url = '/indexer_store?state' + result = self.testapp.get(url, status=200).json + assert result['early_return'] == 'state_obj state keys is Falsey' + assert result['state_obj'] == self.local_store.get_state() + + +class TestLocalStoreInitViews(): + ''' + Init. IndexerStore with 'indexer' configuration. + Calling the views prior to IndexerStore 'indexer' configuration being set in the index endpoint + ''' + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + # Delete previous redis keys + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + + # Event Views + def test_indexer_store_events_tag(self): + some_tag = 'a'*(2*_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + print(result) + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == {} + + def test_indexer_store_events_bad_tag(self): + event_tag = 'not-a-tag' + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + some_tag = 'a'*(_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events_range(self): + url = '/indexer_store?events=3:4' + result = self.testapp.get(url, status=200).json + print(result) + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events(self): + url = '/indexer_store?events' + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + # State Views + def test_indexer_store_state_only(self): + url = '/indexer_store?state=state' + result = self.testapp.get(url, status=200).json + assert 'state' in result + assert result['state'] == IndexerStore.state_initialized[0] + + def test_indexer_store_state_full(self): + url = '/indexer_store?state=full' + result = self.testapp.get(url, status=200).json + assert result == self.local_store.get_state() + + def test_indexer_store_state(self): + url = '/indexer_store?state' + result = self.testapp.get(url, status=200).json + assert result['early_return'] == 'state_obj state key is in intial state' + assert result['state_obj'] == self.local_store.get_state() + + +class TestLocalStoreStateViews(): + ''' + IndexerStore with 'indexer' configuration. + - IndexerStore endpoint has been called + ''' + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + # Delete previous redis keys + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + print(key) + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + + # State Views + @pytest.mark.parametrize( + 'state', + ( + IndexerStore.state_endpoint_start, + IndexerStore.state_load_indexing, + IndexerStore.state_waiting, + ) + ) + def test_indexer_store_state_no_event(self, state): + self.local_store.set_state(state) + url = '/indexer_store' + result = self.testapp.get(url, status=200).json + # Static + assert 'static' in result + assert isinstance(result['static'], dict) + res_keys = list(result['static'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.static_keys + # Dynamic + assert 'dynamic' in result + assert isinstance(result['dynamic'], dict) + res_keys = list(result['dynamic'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.dynamic_keys + # Event + assert 'previous_event' in result + assert isinstance(result['previous_event'], dict) + res_keys = list(result['previous_event'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.event_keys + for key in IndexerStore.event_keys: + assert result['previous_event'][key] == 'unknown' + + @pytest.mark.parametrize( + 'state', + ( + IndexerStore.state_endpoint_start, + IndexerStore.state_load_indexing, + IndexerStore.state_waiting, + ) + ) + def test_indexer_store_state_prev_event(self, state): + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 38 + time_mock.return_value = time_start + invalidated_cnt=83 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # End run event + time_end = 48 + time_mock.return_value = time_end + errors_cnt = 81 + with patch('time.time', new=time_mock): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + errors_cnt=errors_cnt, + event_tag=event_tag, + ) + # Test in other states + self.local_store.set_state(state) + url = '/indexer_store' + result = self.testapp.get(url, status=200).json + # Static + assert 'static' in result + assert isinstance(result['static'], dict) + res_keys = list(result['static'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.static_keys + # Dynamic + assert 'dynamic' in result + assert isinstance(result['dynamic'], dict) + res_keys = list(result['dynamic'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.dynamic_keys + # Event + assert 'previous_event' in result + assert isinstance(result['previous_event'], dict) + res_keys = list(result['previous_event'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.event_keys + assert result['previous_event']['duration'] == str(time_end - time_start) + assert result['previous_event']['end_time'] == str(datetime.datetime(1999, 1, 1)) + assert result['previous_event']['errors_cnt'] == str(errors_cnt) + assert result['previous_event']['event_tag'] == event_tag + assert result['previous_event']['invalidated_cnt'] == str(invalidated_cnt) + assert result['previous_event']['start_ts'] == str(time_start) + + def test_indexer_store_state_curr_event(self): + state, _ = self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 38 + time_mock.return_value = time_start + invalidated_cnt=83 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # Test endpoint + url = '/indexer_store' + time_mock = Mock(wraps=time.time) + time_now = 49 + time_mock.return_value = time_now + result = None + with patch('time.time', new=time_mock): + result = self.testapp.get(url, status=200).json + # Static + assert 'static' in result + assert isinstance(result['static'], dict) + res_keys = list(result['static'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.static_keys + # Dynamic + assert 'dynamic' in result + assert isinstance(result['dynamic'], dict) + res_keys = list(result['dynamic'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.dynamic_keys + # Event + assert 'current_event' in result + assert isinstance(result['current_event'], dict) + assert 'current_duration' in result + duration = time_now - time_start + assert result['current_duration'] == f"{duration:0.2f} seconds" + res_keys = list(result['current_event'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.event_keys + assert result['current_event']['duration'] == 'unknown' + assert result['current_event']['end_time'] == 'unknown' + assert result['current_event']['errors_cnt'] == 'unknown' + assert result['current_event']['event_tag'] == event_tag + assert result['current_event']['invalidated_cnt'] == str(invalidated_cnt) + assert result['current_event']['start_ts'] == str(time_start) + + + def test_indexer_store_state_curr_event_minutes(self): + state, _ = self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 10 + time_mock.return_value = time_start + invalidated_cnt=38 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # Test endpoint + url = '/indexer_store' + time_mock = Mock(wraps=time.time) + time_now = 130 + time_mock.return_value = time_now + result = None + with patch('time.time', new=time_mock): + result = self.testapp.get(url, status=200).json + # Current Event + assert 'current_event' in result + assert isinstance(result['current_event'], dict) + assert 'current_duration' in result + duration = (time_now - time_start)/60.0 + assert result['current_duration'] == f"{duration:0.2f} minutes" + + + def test_indexer_store_state_curr_event_hours(self): + state, _ = self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 120 + time_mock.return_value = time_start + invalidated_cnt=38 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # Test endpoint + url = '/indexer_store' + time_mock = Mock(wraps=time.time) + time_now = 7320 + time_mock.return_value = time_now + result = None + with patch('time.time', new=time_mock): + result = self.testapp.get(url, status=200).json + # Current Event + assert 'current_event' in result + assert isinstance(result['current_event'], dict) + assert 'current_duration' in result + duration = (time_now - time_start)/3600.0 + assert result['current_duration'] == f"{duration:0.2f} hours" + + +class TestLocalStoreEventViews(): + + def _add_event(self, event_number): + event_dict = { + 'error_cnt': event_number*10, + 'invalidated_cnt': event_number*100, + 'time_start': event_number*1000, + 'time_end': event_number*10000, + } + self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_mock.return_value = event_dict['time_start'] + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=event_dict['invalidated_cnt'] + ) + event_dict['event_tag'] = event_tag + # End run event + time_mock.return_value = event_dict['time_end'] + with patch('time.time', new=time_mock): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + self.local_store.set_state( + IndexerStore.state_run_indexing, + errors_cnt=event_dict['error_cnt'], + event_tag=event_dict['event_tag'], + ) + self.local_store.set_state(IndexerStore.state_waiting) + expected_result = {} + event_dict['duration'] = event_dict['time_end'] - event_dict['time_start'] + expected_result[f"{event_tag}:duration"] = str(event_dict['duration']) + expected_result[f"{event_tag}:invalidated_cnt"] = str(event_dict['invalidated_cnt']) + expected_result[f"{event_tag}:end"] = str(datetime.datetime(1999, 1, 1)) + return event_dict, expected_result + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + # Delete previous redis keys + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + print(key) + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + + def test_indexer_store_events_one(self): + event_dict_one, expected_result_one = self._add_event(1) + # Test event + url = f"/indexer_store?events={event_dict_one['event_tag']}" + result = self.testapp.get(url, status=200).json + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == expected_result_one + # Test Range + url = f"/indexer_store?events=0:1" + result = self.testapp.get(url, status=200).json + expected_result_str_one = f"{event_dict_one['event_tag']}: Indexed '{event_dict_one['invalidated_cnt']}' " + expected_result_str_one += f"uuids in '{event_dict_one['duration']}' seconds. Ended at '1999-01-01 00:00:00'" + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert len(result['events']) == 1 + assert result['events'][0] == expected_result_str_one + # Test Tags + url = f"/indexer_store?events" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 1 + assert 'events' in result + assert len(result['events']) == 1 + assert result['events'][0] == event_dict_one['event_tag'] + + def test_indexer_store_events_three(self): + event_dict_one, expected_result_one = self._add_event(1) + event_dict_two, expected_result_two = self._add_event(2) + event_dict_three, expected_result_three = self._add_event(3) + # Test event + url = f"/indexer_store?events={event_dict_two['event_tag']}" + result = self.testapp.get(url, status=200).json + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == expected_result_two + # Test Range + url = f"/indexer_store?events=1:-1" + result = self.testapp.get(url, status=200).json + expected_result_str_one = f"{event_dict_one['event_tag']}: Indexed '{event_dict_one['invalidated_cnt']}' " + expected_result_str_one += f"uuids in '{event_dict_one['duration']}' seconds. Ended at '1999-01-01 00:00:00'" + expected_result_str_two = f"{event_dict_two['event_tag']}: Indexed '{event_dict_two['invalidated_cnt']}' " + expected_result_str_two += f"uuids in '{event_dict_two['duration']}' seconds. Ended at '1999-01-01 00:00:00'" + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert len(result['events']) == 2 + for ev in result['events']: + print(ev) + assert result['events'][0] == expected_result_str_two + assert result['events'][1] == expected_result_str_one + # Test Tags + url = f"/indexer_store?events" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 3 + assert 'events' in result + assert len(result['events']) == 3 + assert result['events'][0] == event_dict_three['event_tag'] + assert result['events'][1] == event_dict_two['event_tag'] + assert result['events'][2] == event_dict_one['event_tag'] diff --git a/src/snovault/elasticsearch/tests/test_local_indexer_store.py b/src/snovault/elasticsearch/tests/test_local_indexer_store.py new file mode 100644 index 0000000000..7ad36d7906 --- /dev/null +++ b/src/snovault/elasticsearch/tests/test_local_indexer_store.py @@ -0,0 +1,314 @@ +import datetime +import pytest +import time + +from copy import copy +from unittest.mock import Mock, patch + +from snovault.local_storage import LocalStoreClient + +from snovault.elasticsearch.local_indexer_store import ( + IndexerStore, + _EVENT_TAG_LEN, +) +from snovault.elasticsearch.interfaces import ( + INDEXER, + INDEXER_EVENTS_LIST, + INDEXER_EVENTS_TAG, + INDEXER_STATE_TAG, +) + + + +def _get_local_app_settings(settings): + local_settings = copy(settings) + local_settings['config_name'] = 'app' + return local_settings + + +def _get_local_indexer_settings(local_settings): + local_settings.update(_get_local_app_settings(local_settings)) + local_settings['config_name'] = INDEXER + local_settings['pg_ip'] = '1.2.3.4' + local_settings['remote_indexing'] = 'false' + local_settings['timeout'] = '10' + return local_settings + + +def test_init_app_config(app_settings): + local_settings = _get_local_app_settings(app_settings) + indexer_store = IndexerStore(local_settings) + assert not hasattr(indexer_store, 'pg_ip') + assert not hasattr(indexer_store, 'remote_indexing') + assert not hasattr(indexer_store, 'loop_time') + assert indexer_store.get_state() == {} + + +def test_init_indexer_config(app_settings): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + local_settings = _get_local_indexer_settings(app_settings) + indexer_store = IndexerStore(local_settings) + assert hasattr(indexer_store, 'pg_ip') + assert hasattr(indexer_store, 'remote_indexing') + assert hasattr(indexer_store, 'loop_time') + state_hash = indexer_store.get_state() + # Keys + state_hash_keys = list(state_hash.keys()) + state_hash_keys.sort() + all_keys = list(IndexerStore.static_keys) + all_keys.extend(list(IndexerStore.dynamic_keys)) + all_keys.extend(list(IndexerStore.event_keys)) + all_keys.sort() + assert state_hash_keys == all_keys + # Static values + assert state_hash['addr'] == f"{INDEXER}:{id(indexer_store)}" + assert state_hash['config_name'] == INDEXER + assert state_hash['init_dt'] == str(datetime.datetime(1999, 1, 1)) + assert state_hash['loop_time'] == '10' + assert state_hash['pg_ip'] == '1.2.3.4' + assert state_hash['remote_indexing'] == 'false' + assert state_hash['state_key'] == INDEXER_STATE_TAG + # Dynamic + assert state_hash['endpoint_end'] == 'unknown' + assert state_hash['endpoint_start'] == 'unknown' + assert state_hash['state'] == indexer_store.state_initialized[0] + assert state_hash['state_error'] == 'unknown' + assert state_hash['state_desc'] == indexer_store.state_initialized[1] + # Event + for key in IndexerStore.event_keys: + assert state_hash[key] == 'unknown' + + +class TestLocalStore(): + fake_event_tag = 'fakeeventtag' + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + ''' + Add local store to test class and cleans up standard redis keys + - Uses the exposed redis client directly + ''' + # Delete previous redis keys + local_settings = _get_local_indexer_settings(copy(app_settings)) + self.local_store = IndexerStore(local_settings) + for key in self.local_store.get_tag_keys(self.fake_event_tag): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + local_settings = _get_local_indexer_settings(copy(app_settings)) + self.local_store = IndexerStore(local_settings) + + def test_end_event(self): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + duration = '4' + self.local_store._end_event(self.fake_event_tag, duration) + event_duration_tag = f"{self.fake_event_tag}:duration" + assert self.local_store.client.get(event_duration_tag) == duration + event_end_tag = f"{self.fake_event_tag}:end" + assert self.local_store.client.get(event_end_tag), str(datetime.datetime(1999, 1, 1)) + + def test_start_event(self): + invalidated_cnt = '4' + self.local_store._start_event(self.fake_event_tag, invalidated_cnt) + event_invalidated_cnt_tag = f"{self.fake_event_tag}:invalidated_cnt" + assert self.local_store.client.get(event_invalidated_cnt_tag) == invalidated_cnt + all_keys = self.local_store.client.lrange(INDEXER_EVENTS_LIST, 0, -1) + assert len(all_keys) == 1 + assert all_keys[0] == self.fake_event_tag + + def test_get_event(self): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + invalidated_cnt = '14' + self.local_store._start_event(self.fake_event_tag, invalidated_cnt) + duration = '2' + self.local_store._end_event(self.fake_event_tag, duration) + event_str = self.local_store.get_event(self.fake_event_tag) + expected_event_str = f"Indexed '{invalidated_cnt}' uuids in '{duration}' seconds." + expected_event_str = f"{expected_event_str} Ended at '{datetime.datetime.utcnow()}'" + expected_event_str = f"{self.fake_event_tag}: {expected_event_str}" + assert event_str == expected_event_str + + def test_get_state(self): + test_dict = {'test': 'testing'} + self.local_store.client.delete(INDEXER_STATE_TAG) + self.local_store.client.hmset(INDEXER_STATE_TAG, test_dict) + assert self.local_store.get_state() == test_dict + + def test_set_state_endpoint_start(self): + original_state = self.local_store.get_state() + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + self.local_store.set_state(self.local_store.state_endpoint_start) + new_state = self.local_store.get_state() + for key, val in original_state.items(): + if key not in IndexerStore.dynamic_keys: + assert val == new_state[key] + assert new_state['endpoint_end'] == 'tbd' + assert new_state['endpoint_start'] == str(datetime.datetime(1999, 1, 1)) + assert new_state['state_error'] == 'tbd' + assert new_state['state'] == self.local_store.state_endpoint_start[0] + assert new_state['state_desc'] == self.local_store.state_endpoint_start[1] + + def test_set_state_load_indexing(self): + original_state = self.local_store.get_state() + self.local_store.set_state(self.local_store.state_load_indexing) + new_state = self.local_store.get_state() + for key, val in original_state.items(): + if key not in ['state', 'state_desc']: + assert val == new_state[key] + assert new_state['state'] == self.local_store.state_load_indexing[0] + assert new_state['state_desc'] == self.local_store.state_load_indexing[1] + + def test_set_state_run_indexing_start(self): + invalidated_cnt = 9 + original_state = self.local_store.get_state() + time_mock = Mock(wraps=time.time) + time_start = time.time() + time_mock.return_value = time_start + new_event_tag = None + new_state = None + with patch('time.time', new=time_mock): + new_state, new_event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + invalidated_cnt=invalidated_cnt, + ) + for key in IndexerStore.static_keys: + assert original_state[key] == new_state[key] + + assert new_state['endpoint_end'] == original_state['endpoint_end'] + assert new_state['endpoint_start'] == original_state['endpoint_start'] + assert new_state['state_error'] == original_state['state_error'] + assert new_state['state'] == self.local_store.state_run_indexing[0] + assert new_state['state_desc'] == self.local_store.state_run_indexing[1] + + assert new_state['end_time'] == original_state['end_time'] + assert new_state['errors_cnt'] == original_state['errors_cnt'] + some_tag = self.local_store.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) + assert len(new_state['event_tag']) == len(some_tag) + assert new_state['event_tag'] == new_event_tag + assert new_state['duration'] == original_state['duration'] + assert new_state['invalidated_cnt'] == str(invalidated_cnt) + assert new_state['start_ts'] == str(int(time_start)) + + def test_set_state_run_indexing_end(self): + # start event + invalidated_cnt = 11 + time_mock = Mock(wraps=time.time) + time_start = 30 + time_mock.return_value = time_start + event_tag = None + original_state = None + with patch('time.time', new=time_mock): + original_state, event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + invalidated_cnt=invalidated_cnt, + ) + # end event + errors_cnt = 3 + time_mock = Mock(wraps=time.time) + time_end= 40 + time_mock.return_value = time_end + new_event_tag = None + new_state = None + with patch('time.time', new=time_mock): + new_state, new_event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + errors_cnt=errors_cnt, + event_tag=original_state['event_tag'], + ) + for key in IndexerStore.static_keys: + assert original_state[key] == new_state[key] + for key in IndexerStore.dynamic_keys: + assert original_state[key] == new_state[key] + + assert isinstance(new_state['end_time'], str) + assert new_state['errors_cnt'] == str(errors_cnt) + assert new_state['event_tag'] == event_tag + assert new_state['duration'] == str(time_end - time_start) + assert new_state['invalidated_cnt'] == original_state['invalidated_cnt'] + assert new_state['start_ts'] == original_state['start_ts'] + + def test_set_state_waiting(self): + # Start endpoint + original_state, _ = self.local_store.set_state(self.local_store.state_endpoint_start) + # Load indexing + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + self.local_store.set_state(self.local_store.state_load_indexing) + # Start Event + time_mock = Mock(wraps=time.time) + time_start = int(500) + time_mock.return_value = time_start + event_tag = None + original_state = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + invalidated_cnt=7, + ) + # End Event + time_end = int(700) + time_mock.return_value = time_end + with patch('time.time', new=time_mock): + self.local_store.set_state(self.local_store.state_run_indexing, event_tag=event_tag) + # Endpoint End + new_state, _ = self.local_store.set_state(self.local_store.state_waiting) + # Make sure the last event exists during waiting state + assert new_state['event_tag'] == event_tag + assert new_state['duration'] == str(time_end - time_start) + # Check state + assert new_state['state'] == self.local_store.state_waiting[0] + assert new_state['state_desc'] == self.local_store.state_waiting[1] + + def test_set_state_run_indexing_no_args(self): + run_state = self.local_store.state_run_indexing + original_state, event_tag = self.local_store.set_state(self.local_store.state_run_indexing) + assert event_tag is None + assert original_state['state_error'] == f"{str(run_state)} requries additional arguments" + + def test_set_state_error_type(self): + bad_state = 'not a tuple' + expected_state_error = f"state, '{str(bad_state)}', is not a tuple" + state, _ = self.local_store.set_state(bad_state) + assert state['state_error'] == expected_state_error + + bad_state = IndexerStore + expected_state_error = f"state, '{str(bad_state)}', is not a tuple" + state, _ = self.local_store.set_state(bad_state) + assert state['state_error'] == expected_state_error + + def test_set_state_error_len(self): + bad_state = tuple() + expected_state_error = f"state, {bad_state}, is wrong length" + state, _ = self.local_store.set_state(bad_state) + assert state['state_error'] == expected_state_error + + bad_state = tuple(['a']) + expected_state_error = f"state, {bad_state}, is wrong length" + state, _ = self.local_store.set_state(bad_state) + assert state['state_error'] == expected_state_error + + bad_state = tuple(['a', 'b', 'c']) + expected_state_error = f"state, {bad_state}, is wrong length" + state, _ = self.local_store.set_state(bad_state) + assert state['state_error'] == expected_state_error + + def test_set_state_error_dne(self): + not_allow_state_change = tuple(['state_init', 'initialized']) + expected_state_error = f"{str(not_allow_state_change)} is not a valid state change" + state, _ = self.local_store.set_state(not_allow_state_change) + assert state['state_error'] == expected_state_error + + dne_state = tuple(['state_dne', 'does not exist']) + expected_state_error = f"{str(dne_state)} is not a valid state change" + state, _ = self.local_store.set_state(dne_state) + assert state['state_error'] == expected_state_error diff --git a/src/snovault/local_storage.py b/src/snovault/local_storage.py index cff4e852a6..1de0ecb8a8 100644 --- a/src/snovault/local_storage.py +++ b/src/snovault/local_storage.py @@ -10,7 +10,6 @@ def base_result(local_store): local_dt = datetime.now(timezone(local_store.local_tz)) return { - '@type': ['result'], 'utc_now': str(datetime.utcnow()), 'lcl_now': f"{local_store.local_tz}: {local_dt}", } From 5d28a38da64b2dc63f0de9fc5eec71fcc3ce58ae Mon Sep 17 00:00:00 2001 From: Casey Litton Date: Mon, 26 Oct 2020 10:27:45 -0700 Subject: [PATCH 4/6] pre and post review changes --- .../elasticsearch/local_indexer_store.py | 143 ++++++++++++++---- 1 file changed, 111 insertions(+), 32 deletions(-) diff --git a/src/snovault/elasticsearch/local_indexer_store.py b/src/snovault/elasticsearch/local_indexer_store.py index fae193572a..b231055989 100644 --- a/src/snovault/elasticsearch/local_indexer_store.py +++ b/src/snovault/elasticsearch/local_indexer_store.py @@ -29,9 +29,35 @@ def includeme(config): config.registry[INDEXER_STORE] = IndexerStore(config.registry.settings) -def _duration_with_unis_str(ts_start): - '''Return duration in seconds, minutes, or hours''' - seconds = float(time.time()) - int(ts_start) +def _convert_dt_str_to_ts(dt_str): + '''Expecting dt_str to be in the format 2020-10-23 17:09:36.442405''' + try: + return datetime.datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f').timestamp() + except ValueError: + return None + + +def _duration_with_unis_str(ts_start, ts_end=None): + ''' + Return duration in seconds, minutes, or hours + - From now by default + - from provided end timestamp + ''' + try: + ts_start = int(ts_start) + except ValueError: + ts_start = _convert_dt_str_to_ts(ts_start) + if not ts_start: + return f"Could not determine duration with ts_start={ts_start}" + try: + if not ts_end: + ts_end = time.time() + ts_end = int(ts_end) + except ValueError: + ts_end = _convert_dt_str_to_ts(ts_end) + if not ts_end: + return f"Could not determine duration with ts_end={ts_end}" + seconds = float(int(ts_end) - int(ts_start)) div = 1.0 unit = 'seconds' if seconds > 60: @@ -122,23 +148,27 @@ def indexer_store_state_split(request): result['dynamic'][key] = state_obj[key] # Determine if run events and run state event_key = None + formatted_duration = None if current_state in [ IndexerStore.state_endpoint_start[0], IndexerStore.state_waiting[0], IndexerStore.state_load_indexing[0], ]: event_key = 'previous_event' + formatted_duration = _duration_with_unis_str(state_obj['start_ts'], ts_end=state_obj['end_time']) elif current_state == IndexerStore.state_run_indexing[0]: - event_key = 'previous_event' - if state_obj['end_time'] == 'unknown': + if state_obj['end_time'] == 'tbd': event_key = 'current_event' + formatted_duration = _duration_with_unis_str(state_obj['start_ts']) + else: + event_key = 'previous_event' + formatted_duration = _duration_with_unis_str(state_obj['start_ts'], ts_end=state_obj['end_time']) # Add current or previous run event keys if event_key: result[event_key] = {} for key in indexer_store.event_keys: result[event_key][key] = state_obj[key] - # Add formatted duration and event_tag if running - result[event_key]['formated_duration'] = _duration_with_unis_str(state_obj['start_ts']) + result[event_key]['formated_duration'] = formatted_duration request.query_string = "format=json" return result @@ -146,23 +176,40 @@ def indexer_store_state_split(request): @view_config(route_name='indexer_store', request_method='GET') def indexer_store_state(request): '''Minimal view for current state and additional info if running''' - request.query_string = "format=json" state_obj = request.registry[INDEXER_STORE].get_state() current_state = state_obj.get('state', 'not initialized') result = { 'state': current_state, + 'state_duration': 'could not calculate', } - if state_obj.get('state') in [IndexerStore.state_endpoint_start[0], IndexerStore.state_load_indexing[0]]: - result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start_ts']) - elif state_obj.get('state') == IndexerStore.state_run_indexing[0]: - # Add formatted duration and event_tag if running - result['current_event_tag'] = state_obj.get('event_tag', 'not initialized') - result['current_duration'] = _duration_with_unis_str(state_obj['start_ts']) - result['current_invalidated_cnt'] = state_obj['invalidated_cnt'] + if current_state == IndexerStore.state_initialized[0]: + result['state_duration'] = _duration_with_unis_str(state_obj['init_dt']) + result['description'] = 'Very short duration. Happens once during deployment' + elif current_state == IndexerStore.state_endpoint_start[0]: + result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start']) + result['description'] = 'Very short duration. Happens once a minute.' + elif current_state == IndexerStore.state_load_indexing[0]: + result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start']) + result['description'] = 'Time depends on number of uuids to index. Could take minutes.' + elif current_state == IndexerStore.state_run_indexing[0]: + if state_obj['end_time'] == 'tbd': + result['state_duration'] = _duration_with_unis_str(state_obj['start_ts']) + result['description'] = 'Time depends on number of uuids to index. Could take hours.' + result['current_event_tag'] = state_obj['event_tag'] + result['current_invalidated_cnt'] = state_obj['invalidated_cnt'] + else: + result['state_duration'] = _duration_with_unis_str(state_obj['end_time']) + result['description'] = 'Short duration. Should go to waiting soon.' + result['just_finished_event_tag'] = state_obj.get('event_tag', 'not initialized') + result['just_finished_invalidated_cnt'] = state_obj['invalidated_cnt'] + elif current_state == IndexerStore.state_waiting[0]: + result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_end']) + result['description'] = f"Remains in state for {state_obj['loop_time']} seconds." request.query_string = "format=json" return result + class IndexerStore(LocalStoreClient): config_name = INDEXER state_initialized = ('state_init', 'initialized') @@ -186,6 +233,7 @@ class IndexerStore(LocalStoreClient): 'state', 'state_desc', 'state_error', + 'sub_state', ] event_keys = [ 'duration', @@ -242,6 +290,7 @@ def _end_event(self, event_tag, state): self.item_set(f"{event_tag}:end", str(datetime.datetime.utcnow())) def _start_event(self, event_tag, state): + print(state) '''Create new event with info from state in events keys''' self.list_add(INDEXER_EVENTS_LIST, event_tag) for event_key in self.event_keys: @@ -257,7 +306,31 @@ def get_event(self, event_tag): def get_state(self): return self.dict_get(INDEXER_STATE_TAG) - def set_state(self, state_tuple, errors_cnt=0, invalidated_cnt=0, event_tag=None): + def _set_state(self, state_tuple, new_state): + # Set valid state and store + new_state['state'] = state_tuple[0] + new_state['state_desc'] = state_tuple[1] + self.dict_set(INDEXER_STATE_TAG, new_state) + + def _set_state_load_indexing(self, **kwargs): + state = self.get_state() + state['sub_state'] = kwargs.get('sub_state', 'where is your sub_state!?') + # Set sub state keys + allowed_keys = [] + if state['sub_state'] == 'priority_cycle': + allowed_keys = ['priority_xmin', 'priority_invalidated', 'priority_did_restart'] + elif state['sub_state'] == 'current_cycle': + allowed_keys = ['xmin', 'last_xmin'] + # Update state + for key in allowed_keys: + if key in kwargs: + if isinstance(kwargs[key], list): + kwargs[key] = f"[{', '.join(kwargs[key])}]" + state[key] = str(kwargs[key]) + self._set_state(self.state_load_indexing, state) + return self.get_state(), None + + def set_state(self, state_tuple, **kwargs): state = self.get_state() if not isinstance(state_tuple, tuple): # Invalid State type @@ -275,38 +348,44 @@ def set_state(self, state_tuple, errors_cnt=0, invalidated_cnt=0, event_tag=None state['endpoint_start'] = str(datetime.datetime.utcnow()) state['endpoint_start_ts'] = str(int(time.time())) state['state_error'] = 'tbd' + self._set_state(state_tuple, state) + return self.get_state(), None elif state_tuple[0] == self.state_load_indexing[0]: - # Checking for uuids to index - pass - elif state_tuple[0] == self.state_run_indexing[0] and invalidated_cnt: + # Indexer is checking for uuids to index + return self._set_state_load_indexing(**kwargs) + elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('invalidated_cnt'): + # Reset event keys + for event_key in self.event_keys: + state[event_key] = 'tbd' # Start indexing - event_tag = self.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) - state['event_tag'] = event_tag - state['invalidated_cnt'] = str(invalidated_cnt) + state['event_tag'] = self.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) + state['invalidated_cnt'] = str(kwargs['invalidated_cnt']) state['start_ts'] = str(int(time.time())) self._start_event(state['event_tag'], state) - elif state_tuple[0] == self.state_run_indexing[0] and event_tag: + self._set_state(state_tuple, state) + return self.get_state(), state['event_tag'] + elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('event_tag'): # End indexing state['end_time'] = str(datetime.datetime.utcnow()) - state['errors_cnt'] = str(errors_cnt) - duration = int(time.time()) - int(state['start_ts']) - state['duration'] = str(duration) - self._end_event(event_tag, state) + state['end_ts'] = int(time.time()) + state['errors_cnt'] = str(kwargs.get('errors_cnt', 0)) + state['duration'] = _duration_with_unis_str(state['start_ts'], ts_end=state['end_ts']) + self._end_event(kwargs['event_tag'], state) + self._set_state(state_tuple, state) + return self.get_state(), kwargs['event_tag'] elif state_tuple[0] == self.state_waiting[0]: # Waiting in es_index_listener for timeout state['endpoint_end'] = str(datetime.datetime.utcnow()) + self._set_state(state_tuple, state) + return self.get_state(), None elif state_tuple[0] == self.state_run_indexing[0]: # Invalid State state['state_error'] = f"{str(state_tuple)} requries additional arguments" self.dict_set(INDEXER_STATE_TAG, state) + self._set_state(state_tuple, state) return self.get_state(), None else: # Invalid State state['state_error'] = f"{str(state_tuple)} is not a valid state change" self.dict_set(INDEXER_STATE_TAG, state) return self.get_state(), None - # Set valid state and store - state['state'] = state_tuple[0] - state['state_desc'] = state_tuple[1] - self.dict_set(INDEXER_STATE_TAG, state) - return self.get_state(), event_tag From 948a8b2b8142562d8c595689b3ee9ebb3ed89149 Mon Sep 17 00:00:00 2001 From: Casey Litton Date: Mon, 26 Oct 2020 11:29:57 -0700 Subject: [PATCH 5/6] Fixing dates timestamps --- src/snovault/elasticsearch/indexer.py | 1 + .../elasticsearch/local_indexer_store.py | 183 ++++++++---------- 2 files changed, 84 insertions(+), 100 deletions(-) diff --git a/src/snovault/elasticsearch/indexer.py b/src/snovault/elasticsearch/indexer.py index c5410a5041..2e46041580 100644 --- a/src/snovault/elasticsearch/indexer.py +++ b/src/snovault/elasticsearch/indexer.py @@ -378,6 +378,7 @@ def _load_indexing(request, session, connection, indexer_state): first_txn = None snapshot_id = None (xmin, invalidated, restart) = indexer_state.priority_cycle(request) + indexer_store.set_state(indexer_store.state_load_indexing) indexer_state.log_reindex_init_state() # OPTIONAL: restart support if restart: # Currently not bothering with restart!!! diff --git a/src/snovault/elasticsearch/local_indexer_store.py b/src/snovault/elasticsearch/local_indexer_store.py index b231055989..b13dea8bd6 100644 --- a/src/snovault/elasticsearch/local_indexer_store.py +++ b/src/snovault/elasticsearch/local_indexer_store.py @@ -29,44 +29,6 @@ def includeme(config): config.registry[INDEXER_STORE] = IndexerStore(config.registry.settings) -def _convert_dt_str_to_ts(dt_str): - '''Expecting dt_str to be in the format 2020-10-23 17:09:36.442405''' - try: - return datetime.datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f').timestamp() - except ValueError: - return None - - -def _duration_with_unis_str(ts_start, ts_end=None): - ''' - Return duration in seconds, minutes, or hours - - From now by default - - from provided end timestamp - ''' - try: - ts_start = int(ts_start) - except ValueError: - ts_start = _convert_dt_str_to_ts(ts_start) - if not ts_start: - return f"Could not determine duration with ts_start={ts_start}" - try: - if not ts_end: - ts_end = time.time() - ts_end = int(ts_end) - except ValueError: - ts_end = _convert_dt_str_to_ts(ts_end) - if not ts_end: - return f"Could not determine duration with ts_end={ts_end}" - seconds = float(int(ts_end) - int(ts_start)) - div = 1.0 - unit = 'seconds' - if seconds > 60: - div = 60.0 - unit = 'minutes' - if seconds > 3600: - div = 3600.0 - unit = 'hours' - return f"{seconds/div:0.2f} {unit}" @view_config(route_name='indexer_store', request_method='GET', request_param='events') @@ -103,7 +65,7 @@ def indexer_store_events(request): event_tags = indexer_store.list_get(INDEXER_EVENTS_LIST, start, stop) result['events'] = [] for event_tag in event_tags: - result['events'].append(indexer_store.get_event(event_tag)) + result['events'].append(indexer_store.get_event_msg(event_tag)) else: # Events arg is bad format? result['error'] = f"Bad events value '{request_argument}'" @@ -148,27 +110,27 @@ def indexer_store_state_split(request): result['dynamic'][key] = state_obj[key] # Determine if run events and run state event_key = None - formatted_duration = None + duration = None if current_state in [ IndexerStore.state_endpoint_start[0], IndexerStore.state_waiting[0], IndexerStore.state_load_indexing[0], ]: event_key = 'previous_event' - formatted_duration = _duration_with_unis_str(state_obj['start_ts'], ts_end=state_obj['end_time']) + duration = indexer_store._duration_with_unis_str(state_obj['start_dt'], ts_end=state_obj['end_dt']) elif current_state == IndexerStore.state_run_indexing[0]: - if state_obj['end_time'] == 'tbd': + if state_obj['end_dt'] == 'tbd': event_key = 'current_event' - formatted_duration = _duration_with_unis_str(state_obj['start_ts']) + duration = indexer_store._duration_with_unis_str(state_obj['start_dt']) else: event_key = 'previous_event' - formatted_duration = _duration_with_unis_str(state_obj['start_ts'], ts_end=state_obj['end_time']) + duration = indexer_store._duration_with_unis_str(state_obj['start_dt'], ts_end=state_obj['end_dt']) # Add current or previous run event keys if event_key: result[event_key] = {} for key in indexer_store.event_keys: result[event_key][key] = state_obj[key] - result[event_key]['formated_duration'] = formatted_duration + result[event_key]['duration'] = duration request.query_string = "format=json" return result @@ -176,35 +138,36 @@ def indexer_store_state_split(request): @view_config(route_name='indexer_store', request_method='GET') def indexer_store_state(request): '''Minimal view for current state and additional info if running''' - state_obj = request.registry[INDEXER_STORE].get_state() + indexer_store = request.registry[INDEXER_STORE] + state_obj = indexer_store.get_state() current_state = state_obj.get('state', 'not initialized') result = { 'state': current_state, - 'state_duration': 'could not calculate', + 'time_in_state': 'could not calculate', } - if current_state == IndexerStore.state_initialized[0]: - result['state_duration'] = _duration_with_unis_str(state_obj['init_dt']) + if current_state == IndexerStore.state_waiting[0]: + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_end_dt']) + result['description'] = f"Remains in state for {state_obj['loop_time']} seconds." + elif current_state == IndexerStore.state_initialized[0]: + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['init_dt']) result['description'] = 'Very short duration. Happens once during deployment' elif current_state == IndexerStore.state_endpoint_start[0]: - result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start']) + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_start']) result['description'] = 'Very short duration. Happens once a minute.' elif current_state == IndexerStore.state_load_indexing[0]: - result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_start']) + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_start']) result['description'] = 'Time depends on number of uuids to index. Could take minutes.' elif current_state == IndexerStore.state_run_indexing[0]: - if state_obj['end_time'] == 'tbd': - result['state_duration'] = _duration_with_unis_str(state_obj['start_ts']) + if state_obj['end_dt'] == 'tbd': + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['start_dt']) result['description'] = 'Time depends on number of uuids to index. Could take hours.' result['current_event_tag'] = state_obj['event_tag'] result['current_invalidated_cnt'] = state_obj['invalidated_cnt'] else: - result['state_duration'] = _duration_with_unis_str(state_obj['end_time']) + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['end_dt']) result['description'] = 'Short duration. Should go to waiting soon.' result['just_finished_event_tag'] = state_obj.get('event_tag', 'not initialized') result['just_finished_invalidated_cnt'] = state_obj['invalidated_cnt'] - elif current_state == IndexerStore.state_waiting[0]: - result['state_duration'] = _duration_with_unis_str(state_obj['endpoint_end']) - result['description'] = f"Remains in state for {state_obj['loop_time']} seconds." request.query_string = "format=json" return result @@ -227,21 +190,19 @@ class IndexerStore(LocalStoreClient): 'state_key', ] dynamic_keys = [ - 'endpoint_end', - 'endpoint_start', - 'endpoint_start_ts', + 'endpoint_end_dt', + 'endpoint_start_dt', 'state', 'state_desc', 'state_error', - 'sub_state', ] event_keys = [ 'duration', - 'end_time', + 'end_dt', 'errors_cnt', 'event_tag', 'invalidated_cnt', - 'start_ts', + 'start_dt', ] def __init__(self, settings): @@ -261,6 +222,43 @@ def __init__(self, settings): if not curr_state: self._init_state() + @staticmethod + def _dt_now(): + return str(datetime.datetime.utcnow()) + + @staticmethod + def _convert_dt_to_ts(dt_str): + '''Expecting dt_str to be in the format 2020-10-23 17:09:36.442405''' + try: + return datetime.datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f').timestamp() + except ValueError: + return None + + def _duration_with_unis_str(self, start_dt, end_dt=None): + ''' + Return duration in seconds, minutes, or hours + - Input expected to be in '2020-10-23 17:09:36.442405' format + - From now if end_dt not provided + ''' + start_ts = self._convert_dt_to_ts(start_dt) + if not start_ts: + return f"Could not determine duration with start_dt={start_dt}" + if not end_dt: + end_dt = self._dt_now() + end_ts = self._convert_dt_to_ts(end_dt) + if not end_ts: + return f"Could not determine duration with end_dt={end_dt}" + seconds = float(end_ts - start_ts) + div = 1.0 + unit = 'seconds' + if seconds > 60: + div = 60.0 + unit = 'minutes' + if seconds > 3600: + div = 3600.0 + unit = 'hours' + return f"{seconds/div:0.2f} {unit}" + def _init_state(self): '''Indexer state in redis is named after the process in the config''' init_state = { @@ -285,7 +283,7 @@ def _init_state(self): def _end_event(self, event_tag, state): '''Close event with only certain event keys in state. Also add human readable date time''' - for event_key in ['duration', 'end_time', 'errors_cnt']: + for event_key in ['end_dt', 'errors_cnt']: self.item_set(f"{event_tag}:{event_key}", state[event_key]) self.item_set(f"{event_tag}:end", str(datetime.datetime.utcnow())) @@ -296,11 +294,16 @@ def _start_event(self, event_tag, state): for event_key in self.event_keys: self.item_set(f"{event_tag}:{event_key}", state[event_key]) - def get_event(self, event_tag): - end = str(self.item_get(event_tag + ':end')) + def get_event_msg(self, event_tag): + end_dt = str(self.item_get(event_tag + ':end_dt')) + start_dt = str(self.item_get(event_tag + ':start_dt')) + errors_cnt = str(self.item_get(event_tag + ':errors_cnt')) invalidated_cnt = str(self.item_get(event_tag + ':invalidated_cnt')) - duration = str(self.item_get(event_tag + ':duration')) - msg = f"Indexed '{invalidated_cnt}' uuids in '{duration}' seconds. Ended at '{end}'" + duration = self._duration_with_unis_str(start_dt, end=end_dt) + msg = ( + f"Indexed '{invalidated_cnt}' uuids in '{duration}'" + f"with '{errors_cnt}' errors. Ended at '{end_dt}'." + ) return f"{event_tag}: {msg}" def get_state(self): @@ -312,31 +315,20 @@ def _set_state(self, state_tuple, new_state): new_state['state_desc'] = state_tuple[1] self.dict_set(INDEXER_STATE_TAG, new_state) - def _set_state_load_indexing(self, **kwargs): - state = self.get_state() - state['sub_state'] = kwargs.get('sub_state', 'where is your sub_state!?') - # Set sub state keys - allowed_keys = [] - if state['sub_state'] == 'priority_cycle': - allowed_keys = ['priority_xmin', 'priority_invalidated', 'priority_did_restart'] - elif state['sub_state'] == 'current_cycle': - allowed_keys = ['xmin', 'last_xmin'] - # Update state - for key in allowed_keys: - if key in kwargs: - if isinstance(kwargs[key], list): - kwargs[key] = f"[{', '.join(kwargs[key])}]" - state[key] = str(kwargs[key]) - self._set_state(self.state_load_indexing, state) - return self.get_state(), None - def set_state(self, state_tuple, **kwargs): state = self.get_state() + state['state_error'] = '' if not isinstance(state_tuple, tuple): # Invalid State type state['state_error'] = f"state, '{str(state_tuple)}', is not a tuple" self.dict_set(INDEXER_STATE_TAG, state) return self.get_state(), None + elif state_tuple[0] == self.state_waiting[0]: + # Waiting in es_index_listener for timeout + state['endpoint_start_dt'] = 'tbd' + state['endpoint_end_dt'] = self._dt_now() + self._set_state(state_tuple, state) + return self.get_state(), None elif not len(state_tuple) == 2: # Invalid State len state['state_error'] = f"state, {state_tuple}, is wrong length" @@ -344,15 +336,13 @@ def set_state(self, state_tuple, **kwargs): return self.get_state(), None elif state_tuple[0] == self.state_endpoint_start[0]: # Reset - state['endpoint_end'] = 'tbd' - state['endpoint_start'] = str(datetime.datetime.utcnow()) - state['endpoint_start_ts'] = str(int(time.time())) - state['state_error'] = 'tbd' + state['endpoint_end_dt'] = 'tbd' + state['endpoint_start_dt'] = self._dt_now() self._set_state(state_tuple, state) return self.get_state(), None elif state_tuple[0] == self.state_load_indexing[0]: # Indexer is checking for uuids to index - return self._set_state_load_indexing(**kwargs) + pass elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('invalidated_cnt'): # Reset event keys for event_key in self.event_keys: @@ -360,24 +350,17 @@ def set_state(self, state_tuple, **kwargs): # Start indexing state['event_tag'] = self.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) state['invalidated_cnt'] = str(kwargs['invalidated_cnt']) - state['start_ts'] = str(int(time.time())) + state['start_dt'] = self._dt_now() self._start_event(state['event_tag'], state) self._set_state(state_tuple, state) return self.get_state(), state['event_tag'] elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('event_tag'): # End indexing - state['end_time'] = str(datetime.datetime.utcnow()) - state['end_ts'] = int(time.time()) + state['end_dt'] = self._dt_now() state['errors_cnt'] = str(kwargs.get('errors_cnt', 0)) - state['duration'] = _duration_with_unis_str(state['start_ts'], ts_end=state['end_ts']) self._end_event(kwargs['event_tag'], state) self._set_state(state_tuple, state) return self.get_state(), kwargs['event_tag'] - elif state_tuple[0] == self.state_waiting[0]: - # Waiting in es_index_listener for timeout - state['endpoint_end'] = str(datetime.datetime.utcnow()) - self._set_state(state_tuple, state) - return self.get_state(), None elif state_tuple[0] == self.state_run_indexing[0]: # Invalid State state['state_error'] = f"{str(state_tuple)} requries additional arguments" From fae41f411f2cb23a8ea3f00a5f9803ff53e7eba9 Mon Sep 17 00:00:00 2001 From: Casey Litton Date: Wed, 28 Oct 2020 12:44:51 -0700 Subject: [PATCH 6/6] fixing basic indexer store tests --- .../elasticsearch/local_indexer_store.py | 21 ++- .../tests/test_local_indexer_store.py | 169 +++++++++--------- 2 files changed, 98 insertions(+), 92 deletions(-) diff --git a/src/snovault/elasticsearch/local_indexer_store.py b/src/snovault/elasticsearch/local_indexer_store.py index b13dea8bd6..94437d38ab 100644 --- a/src/snovault/elasticsearch/local_indexer_store.py +++ b/src/snovault/elasticsearch/local_indexer_store.py @@ -117,14 +117,20 @@ def indexer_store_state_split(request): IndexerStore.state_load_indexing[0], ]: event_key = 'previous_event' - duration = indexer_store._duration_with_unis_str(state_obj['start_dt'], ts_end=state_obj['end_dt']) + duration = indexer_store._duration_with_unis_str( + state_obj['start_dt'], + end_dt=state_obj['end_dt'] + ) elif current_state == IndexerStore.state_run_indexing[0]: if state_obj['end_dt'] == 'tbd': event_key = 'current_event' duration = indexer_store._duration_with_unis_str(state_obj['start_dt']) else: event_key = 'previous_event' - duration = indexer_store._duration_with_unis_str(state_obj['start_dt'], ts_end=state_obj['end_dt']) + duration = indexer_store._duration_with_unis_str( + state_obj['start_dt'], + end_dt=state_obj['end_dt'] + ) # Add current or previous run event keys if event_key: result[event_key] = {} @@ -197,7 +203,6 @@ class IndexerStore(LocalStoreClient): 'state_error', ] event_keys = [ - 'duration', 'end_dt', 'errors_cnt', 'event_tag', @@ -276,6 +281,7 @@ def _init_state(self): init_state[key] = 'unknown' init_state['state'] = self.state_initialized[0] init_state['state_desc'] = self.state_initialized[1] + init_state['state_error'] = '' # Run event for key in self.event_keys: init_state[key] = 'unknown' @@ -285,10 +291,8 @@ def _end_event(self, event_tag, state): '''Close event with only certain event keys in state. Also add human readable date time''' for event_key in ['end_dt', 'errors_cnt']: self.item_set(f"{event_tag}:{event_key}", state[event_key]) - self.item_set(f"{event_tag}:end", str(datetime.datetime.utcnow())) def _start_event(self, event_tag, state): - print(state) '''Create new event with info from state in events keys''' self.list_add(INDEXER_EVENTS_LIST, event_tag) for event_key in self.event_keys: @@ -299,9 +303,9 @@ def get_event_msg(self, event_tag): start_dt = str(self.item_get(event_tag + ':start_dt')) errors_cnt = str(self.item_get(event_tag + ':errors_cnt')) invalidated_cnt = str(self.item_get(event_tag + ':invalidated_cnt')) - duration = self._duration_with_unis_str(start_dt, end=end_dt) + duration = self._duration_with_unis_str(start_dt, end_dt=end_dt) msg = ( - f"Indexed '{invalidated_cnt}' uuids in '{duration}'" + f"Indexed '{invalidated_cnt}' uuids in '{duration}' " f"with '{errors_cnt}' errors. Ended at '{end_dt}'." ) return f"{event_tag}: {msg}" @@ -342,7 +346,8 @@ def set_state(self, state_tuple, **kwargs): return self.get_state(), None elif state_tuple[0] == self.state_load_indexing[0]: # Indexer is checking for uuids to index - pass + self._set_state(state_tuple, state) + return self.get_state(), None elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('invalidated_cnt'): # Reset event keys for event_key in self.event_keys: diff --git a/src/snovault/elasticsearch/tests/test_local_indexer_store.py b/src/snovault/elasticsearch/tests/test_local_indexer_store.py index 7ad36d7906..e80bcd67c0 100644 --- a/src/snovault/elasticsearch/tests/test_local_indexer_store.py +++ b/src/snovault/elasticsearch/tests/test_local_indexer_store.py @@ -1,6 +1,5 @@ import datetime import pytest -import time from copy import copy from unittest.mock import Mock, patch @@ -71,10 +70,10 @@ def test_init_indexer_config(app_settings): assert state_hash['remote_indexing'] == 'false' assert state_hash['state_key'] == INDEXER_STATE_TAG # Dynamic - assert state_hash['endpoint_end'] == 'unknown' - assert state_hash['endpoint_start'] == 'unknown' + assert state_hash['endpoint_end_dt'] == 'unknown' + assert state_hash['endpoint_start_dt'] == 'unknown' assert state_hash['state'] == indexer_store.state_initialized[0] - assert state_hash['state_error'] == 'unknown' + assert state_hash['state_error'] == '' assert state_hash['state_desc'] == indexer_store.state_initialized[1] # Event for key in IndexerStore.event_keys: @@ -102,38 +101,57 @@ def setup_method(self, app_settings): self.local_store = IndexerStore(local_settings) def test_end_event(self): + state = self.local_store.get_state() + end_dt = datetime.datetime(1999, 1, 1) datetime_mock = Mock(wraps=datetime.datetime) - datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + datetime_mock.utcnow.return_value = end_dt with patch('datetime.datetime', new=datetime_mock): - duration = '4' - self.local_store._end_event(self.fake_event_tag, duration) - event_duration_tag = f"{self.fake_event_tag}:duration" - assert self.local_store.client.get(event_duration_tag) == duration - event_end_tag = f"{self.fake_event_tag}:end" - assert self.local_store.client.get(event_end_tag), str(datetime.datetime(1999, 1, 1)) + state['errors_cnt'] = str(39) + state['end_dt'] = str(end_dt) + self.local_store._end_event( + self.fake_event_tag, + state, + ) + tag_end_dt = f"{self.fake_event_tag}:end_dt" + assert self.local_store.client.get(tag_end_dt) == state['end_dt'] + tag_errors_cnt = f"{self.fake_event_tag}:errors_cnt" + assert self.local_store.client.get(tag_errors_cnt) == state['errors_cnt'] def test_start_event(self): - invalidated_cnt = '4' - self.local_store._start_event(self.fake_event_tag, invalidated_cnt) - event_invalidated_cnt_tag = f"{self.fake_event_tag}:invalidated_cnt" - assert self.local_store.client.get(event_invalidated_cnt_tag) == invalidated_cnt + state = self.local_store.get_state() + state['event_tag'] = self.fake_event_tag + state['invalidated_cnt'] ='4' + state['start_dt'] = str(datetime.datetime(1999, 1, 1)) + self.local_store._start_event(self.fake_event_tag, state) + for event_key in self.local_store.event_keys: + tag = f"{self.fake_event_tag}:{event_key}" + assert self.local_store.client.get(tag) == state[event_key] all_keys = self.local_store.client.lrange(INDEXER_EVENTS_LIST, 0, -1) assert len(all_keys) == 1 assert all_keys[0] == self.fake_event_tag - def test_get_event(self): - datetime_mock = Mock(wraps=datetime.datetime) - datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) - with patch('datetime.datetime', new=datetime_mock): - invalidated_cnt = '14' - self.local_store._start_event(self.fake_event_tag, invalidated_cnt) - duration = '2' - self.local_store._end_event(self.fake_event_tag, duration) - event_str = self.local_store.get_event(self.fake_event_tag) - expected_event_str = f"Indexed '{invalidated_cnt}' uuids in '{duration}' seconds." - expected_event_str = f"{expected_event_str} Ended at '{datetime.datetime.utcnow()}'" - expected_event_str = f"{self.fake_event_tag}: {expected_event_str}" - assert event_str == expected_event_str + def test_get_event_msg(self): + # expected values + end_dt = datetime.datetime.utcnow() + start_dt = datetime.datetime.utcnow() + errors_cnt = 123 + invalidated_cnt = 1234 + duration = self.local_store._duration_with_unis_str( + str(start_dt), + end_dt=str(end_dt), + ) + # Add to store directly + self.local_store.item_set(self.fake_event_tag + ':end_dt', str(end_dt)) + self.local_store.item_set(self.fake_event_tag + ':start_dt', str(start_dt)) + self.local_store.item_set(self.fake_event_tag + ':errors_cnt', str(errors_cnt)) + self.local_store.item_set(self.fake_event_tag + ':invalidated_cnt', str(invalidated_cnt)) + expected_event_str = ( + f"{self.fake_event_tag}: " + f"Indexed '{invalidated_cnt}' uuids in '{duration}' " + f"with '{errors_cnt}' errors. Ended at '{end_dt}'." + ) + event_str = self.local_store.get_event_msg(self.fake_event_tag) + assert event_str == expected_event_str def test_get_state(self): test_dict = {'test': 'testing'} @@ -151,9 +169,9 @@ def test_set_state_endpoint_start(self): for key, val in original_state.items(): if key not in IndexerStore.dynamic_keys: assert val == new_state[key] - assert new_state['endpoint_end'] == 'tbd' - assert new_state['endpoint_start'] == str(datetime.datetime(1999, 1, 1)) - assert new_state['state_error'] == 'tbd' + assert new_state['endpoint_end_dt'] == 'tbd' + assert new_state['endpoint_start_dt'] == str(datetime.datetime(1999, 1, 1)) + assert new_state['state_error'] == '' assert new_state['state'] == self.local_store.state_endpoint_start[0] assert new_state['state_desc'] == self.local_store.state_endpoint_start[1] @@ -170,12 +188,12 @@ def test_set_state_load_indexing(self): def test_set_state_run_indexing_start(self): invalidated_cnt = 9 original_state = self.local_store.get_state() - time_mock = Mock(wraps=time.time) - time_start = time.time() - time_mock.return_value = time_start new_event_tag = None new_state = None - with patch('time.time', new=time_mock): + start_dt = datetime.datetime(1999, 1, 1) + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = start_dt + with patch('datetime.datetime', new=datetime_mock): new_state, new_event_tag = self.local_store.set_state( self.local_store.state_run_indexing, invalidated_cnt=invalidated_cnt, @@ -183,42 +201,41 @@ def test_set_state_run_indexing_start(self): for key in IndexerStore.static_keys: assert original_state[key] == new_state[key] - assert new_state['endpoint_end'] == original_state['endpoint_end'] - assert new_state['endpoint_start'] == original_state['endpoint_start'] + assert new_state['endpoint_end_dt'] == original_state['endpoint_end_dt'] + assert new_state['endpoint_start_dt'] == original_state['endpoint_start_dt'] assert new_state['state_error'] == original_state['state_error'] assert new_state['state'] == self.local_store.state_run_indexing[0] assert new_state['state_desc'] == self.local_store.state_run_indexing[1] - assert new_state['end_time'] == original_state['end_time'] - assert new_state['errors_cnt'] == original_state['errors_cnt'] + assert new_state['end_dt'] == 'tbd' + assert new_state['errors_cnt'] == 'tbd' some_tag = self.local_store.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) assert len(new_state['event_tag']) == len(some_tag) assert new_state['event_tag'] == new_event_tag - assert new_state['duration'] == original_state['duration'] assert new_state['invalidated_cnt'] == str(invalidated_cnt) - assert new_state['start_ts'] == str(int(time_start)) + assert new_state['start_dt'] == str(start_dt) def test_set_state_run_indexing_end(self): # start event invalidated_cnt = 11 - time_mock = Mock(wraps=time.time) - time_start = 30 - time_mock.return_value = time_start event_tag = None original_state = None - with patch('time.time', new=time_mock): + start_dt = datetime.datetime(1999, 1, 1) + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = start_dt + with patch('datetime.datetime', new=datetime_mock): original_state, event_tag = self.local_store.set_state( self.local_store.state_run_indexing, invalidated_cnt=invalidated_cnt, ) # end event errors_cnt = 3 - time_mock = Mock(wraps=time.time) - time_end= 40 - time_mock.return_value = time_end new_event_tag = None new_state = None - with patch('time.time', new=time_mock): + end_dt = datetime.datetime(1999, 1, 1) + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = end_dt + with patch('datetime.datetime', new=datetime_mock): new_state, new_event_tag = self.local_store.set_state( self.local_store.state_run_indexing, errors_cnt=errors_cnt, @@ -229,42 +246,42 @@ def test_set_state_run_indexing_end(self): for key in IndexerStore.dynamic_keys: assert original_state[key] == new_state[key] - assert isinstance(new_state['end_time'], str) + assert isinstance(new_state['end_dt'], str) assert new_state['errors_cnt'] == str(errors_cnt) assert new_state['event_tag'] == event_tag - assert new_state['duration'] == str(time_end - time_start) assert new_state['invalidated_cnt'] == original_state['invalidated_cnt'] - assert new_state['start_ts'] == original_state['start_ts'] + assert new_state['start_dt'] == original_state['start_dt'] def test_set_state_waiting(self): + start_dt = datetime.datetime(1999, 1, 2) + end_dt = datetime.datetime(1999, 1, 3) # Start endpoint - original_state, _ = self.local_store.set_state(self.local_store.state_endpoint_start) + _, _ = self.local_store.set_state(self.local_store.state_endpoint_start) # Load indexing datetime_mock = Mock(wraps=datetime.datetime) datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) with patch('datetime.datetime', new=datetime_mock): self.local_store.set_state(self.local_store.state_load_indexing) - # Start Event - time_mock = Mock(wraps=time.time) - time_start = int(500) - time_mock.return_value = time_start + # Run Indexing Start event_tag = None - original_state = None - with patch('time.time', new=time_mock): - _, event_tag = self.local_store.set_state( - self.local_store.state_run_indexing, - invalidated_cnt=7, - ) - # End Event - time_end = int(700) - time_mock.return_value = time_end - with patch('time.time', new=time_mock): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = start_dt + with patch('datetime.datetime', new=datetime_mock): + _, event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + invalidated_cnt=7, + ) + # Run Indexing End + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = end_dt + with patch('datetime.datetime', new=datetime_mock): self.local_store.set_state(self.local_store.state_run_indexing, event_tag=event_tag) # Endpoint End new_state, _ = self.local_store.set_state(self.local_store.state_waiting) # Make sure the last event exists during waiting state assert new_state['event_tag'] == event_tag - assert new_state['duration'] == str(time_end - time_start) + assert new_state['end_dt'] == str(end_dt) + assert new_state['start_dt'] == str(start_dt) # Check state assert new_state['state'] == self.local_store.state_waiting[0] assert new_state['state_desc'] == self.local_store.state_waiting[1] @@ -286,22 +303,6 @@ def test_set_state_error_type(self): state, _ = self.local_store.set_state(bad_state) assert state['state_error'] == expected_state_error - def test_set_state_error_len(self): - bad_state = tuple() - expected_state_error = f"state, {bad_state}, is wrong length" - state, _ = self.local_store.set_state(bad_state) - assert state['state_error'] == expected_state_error - - bad_state = tuple(['a']) - expected_state_error = f"state, {bad_state}, is wrong length" - state, _ = self.local_store.set_state(bad_state) - assert state['state_error'] == expected_state_error - - bad_state = tuple(['a', 'b', 'c']) - expected_state_error = f"state, {bad_state}, is wrong length" - state, _ = self.local_store.set_state(bad_state) - assert state['state_error'] == expected_state_error - def test_set_state_error_dne(self): not_allow_state_change = tuple(['state_init', 'initialized']) expected_state_error = f"{str(not_allow_state_change)} is not a valid state change"