diff --git a/.circleci/config.yml b/.circleci/config.yml index 944ab45935..76d22c0328 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -47,7 +47,8 @@ commands: openjdk-11-jdk \ postgresql-${PG_VERSION} \ ruby2.3 \ - ruby2.3-dev + ruby2.3-dev \ + redis-server sudo chown -R circleci /etc/elasticsearch sed -i "1s;^;export PATH=${ES_BIN}:${PG_BIN}:$PATH\n;" $BASH_ENV sudo apt-get install -y python3.7-dev python3-pip diff --git a/base.ini b/base.ini index aad6e6f68b..3d9772661b 100644 --- a/base.ini +++ b/base.ini @@ -1,5 +1,6 @@ [app:app] use = egg:snovault#snowflakes +config_name = app sqlalchemy.url = postgresql:///snowflakes retry.attempts = 3 file_upload_bucket = snowflakes-files-dev @@ -43,7 +44,8 @@ embed_cache.capacity = 5000 use = egg:snovault#indexer app = app path = /index -timeout = 60 +set timeout = 60 +set config_name = indexer set embed_cache.capacity = 5000 set indexer = true set queue_type = Simple @@ -57,7 +59,8 @@ set queue_worker_batch_size = 2000000 use = egg:snovault#indexer app = app path = /index_file -timeout = 60 +set timeout = 60 +set config_name = regionindexer set embed_cache.capacity = 5000 set regionindexer = true diff --git a/development.ini b/development.ini index f30543ec58..4ee96deeea 100644 --- a/development.ini +++ b/development.ini @@ -8,11 +8,13 @@ use = config:base.ini#app sqlalchemy.url = postgresql://postgres@:5432/postgres?host=/tmp/snovault/pgdata snp_search.server = localhost:9200 load_test_only = true +local_tz = US/Pacific create_tables = true testing = true postgresql.statement_timeout = 20 indexer.processes = + pyramid.reload_templates = true pyramid.debug_authorization = false pyramid.debug_notfound = true @@ -20,6 +22,13 @@ pyramid.debug_routematch = false pyramid.default_locale_name = en snovault.load_test_data = snowflakes.loadxl:load_test_data +# Local Storage: Settings must exist in... +# snovault/tests/[testappsettings.py, test_key.py] +# snowflakes/tests/conftest.py +local_storage_host = localhost +local_storage_port = 6378 +local_storage_redis_index = 1 +local_storage_timeout = 5 [pipeline:debug] pipeline = diff --git a/requirements.osx.txt b/requirements.osx.txt index a349892e3e..7c3173084b 100644 --- a/requirements.osx.txt +++ b/requirements.osx.txt @@ -1,4 +1,6 @@ pip==20.0.2 psycopg2==2.8.4 +redis==3.5.3 +redis-server==5.0.7 setuptools==45.1.0 zc.buildout==2.13.2 diff --git a/requirements.txt b/requirements.txt index a349892e3e..7c3173084b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ pip==20.0.2 psycopg2==2.8.4 +redis==3.5.3 +redis-server==5.0.7 setuptools==45.1.0 zc.buildout==2.13.2 diff --git a/src/snovault/__init__.py b/src/snovault/__init__.py index 3cee982490..f9fe86366e 100644 --- a/src/snovault/__init__.py +++ b/src/snovault/__init__.py @@ -68,6 +68,7 @@ def includeme(config): config.include('.crud_views') config.include('.indexing_views') config.include('.resource_views') + config.include('.elasticsearch.local_indexer_store') def main(global_config, **local_config): diff --git a/src/snovault/dev_servers.py b/src/snovault/dev_servers.py index 38c08bc588..aaa00fcaaf 100644 --- a/src/snovault/dev_servers.py +++ b/src/snovault/dev_servers.py @@ -6,7 +6,7 @@ """ from pkg_resources import resource_filename -from pyramid.paster import get_app +from pyramid.paster import get_app, get_appsettings from multiprocessing import Process import atexit @@ -62,17 +62,25 @@ def main(): parser.add_argument('--datadir', default='/tmp/snovault', help="path to datadir") args = parser.parse_args() + appsettings = get_appsettings(args.config_uri, name='app') + # Required settings in config + local_storage_host = appsettings['local_storage_host'] + local_storage_port = appsettings['local_storage_port'] + local_storage_redis_index = appsettings['local_storage_redis_index'] + local_storage_timeout = appsettings['local_storage_timeout'] + logging.basicConfig() # Loading app will have configured from config file. Reconfigure here: logging.getLogger('snovault').setLevel(logging.INFO) - from snovault.tests import elasticsearch_fixture, postgresql_fixture + from snovault.tests import elasticsearch_fixture, postgresql_fixture, redis_storage_fixture from snovault.elasticsearch import create_mapping datadir = os.path.abspath(args.datadir) pgdata = os.path.join(datadir, 'pgdata') esdata = os.path.join(datadir, 'esdata') + redisdata = os.path.join(datadir, 'redisdata') if args.clear: - for dirname in [pgdata, esdata]: + for dirname in [pgdata, esdata, redisdata]: if os.path.exists(dirname): shutil.rmtree(dirname) if args.init: @@ -81,7 +89,9 @@ def main(): postgres = postgresql_fixture.server_process(pgdata, echo=True) elasticsearch = elasticsearch_fixture.server_process(esdata, echo=True) nginx = nginx_server_process(echo=True) - processes = [postgres, elasticsearch, nginx] + redis_config_path = redis_storage_fixture.initdb(redisdata, local_storage_port, echo=True) + redis = redis_storage_fixture.server_process(redis_config_path, local_storage_port, local_storage_redis_index, echo=True) + processes = [postgres, elasticsearch, nginx, redis] print_processes = [] diff --git a/src/snovault/elasticsearch/__init__.py b/src/snovault/elasticsearch/__init__.py index 97e5949d71..eea412cdaf 100644 --- a/src/snovault/elasticsearch/__init__.py +++ b/src/snovault/elasticsearch/__init__.py @@ -42,6 +42,7 @@ def includeme(config): config.include('.indexer') config.include('.indexer_state') + config.include('.local_indexer_store') if asbool(settings.get('indexer')) and not PY2: config.include('.mpindexer') diff --git a/src/snovault/elasticsearch/indexer.py b/src/snovault/elasticsearch/indexer.py index 2fb9aab14c..2e46041580 100644 --- a/src/snovault/elasticsearch/indexer.py +++ b/src/snovault/elasticsearch/indexer.py @@ -20,10 +20,13 @@ from snovault.storage import ( TransactionRecord, ) +from snovault.elasticsearch.local_indexer_store import IndexerStore + from urllib3.exceptions import ReadTimeoutError from .interfaces import ( ELASTIC_SEARCH, INDEXER, + INDEXER_STORE, RESOURCES_INDEX, ) from .indexer_state import ( @@ -90,6 +93,9 @@ def includeme(config): _update_for_uuid_queues(registry) if not processes: registry[INDEXER] = Indexer(registry) + if not registry.get(INDEXER_STORE): + registry[INDEXER_STORE] = IndexerStore(config.registry.settings) + def get_related_uuids(request, es, updated, renamed): @@ -336,6 +342,8 @@ def _get_nodes(request, indexer_state): @view_config(route_name='index', request_method='POST', permission="index") def index(request): + indexer_store = request.registry[INDEXER_STORE] + indexer_store.set_state(indexer_store.state_endpoint_start) # Setting request.datastore here only works because routed views are not traversed. request.datastore = 'database' session = request.registry[DBSESSION]() @@ -366,9 +374,11 @@ def index(request): return {'this_node': this_node, 'other_node': other_node} def _load_indexing(request, session, connection, indexer_state): + indexer_store.set_state(indexer_store.state_load_indexing) first_txn = None snapshot_id = None (xmin, invalidated, restart) = indexer_state.priority_cycle(request) + indexer_store.set_state(indexer_store.state_load_indexing) indexer_state.log_reindex_init_state() # OPTIONAL: restart support if restart: # Currently not bothering with restart!!! @@ -493,7 +503,11 @@ def _run_indexing( result = indexer_state.start_cycle(invalidated, result) # Do the work... - + local_state, event_tag = indexer_store.set_state( + indexer_store.state_run_indexing, + # 'invalidated' indicates the start of indexing + invalidated_cnt=len(invalidated) + ) indexing_update_infos, errors, err_msg = request.registry[INDEXER].serve_objects( request, invalidated, @@ -501,6 +515,11 @@ def _run_indexing( snapshot_id=snapshot_id, restart=restart, ) + indexer_store.set_state( + indexer_store.state_run_indexing, + errors_cnt=len(errors), + event_tag=event_tag, + ) if err_msg: log.warning('Could not start indexing: %s', err_msg) result = indexer_state.finish_cycle(result,errors) @@ -544,6 +563,7 @@ def _run_indexing( output_tuple = _load_indexing(request, session, connection, indexer_state) result, invalidated, flush, first_txn, snapshot_id, restart, xmin, return_now = output_tuple if return_now: + indexer_store.set_state(indexer_store.state_waiting) return result indexing_update_infos = [] dry_run = request.json.get('dry_run', False) @@ -625,6 +645,7 @@ def _run_indexing( # opposed to in the indexer or just after serve_objects, # so a crash in logging does not interupt indexing complietion request.registry[INDEXER].check_log_indexing_times(indexing_update_infos) + indexer_store.set_state(indexer_store.state_waiting) return result diff --git a/src/snovault/elasticsearch/interfaces.py b/src/snovault/elasticsearch/interfaces.py index ae3f28961b..8e5ac948a8 100644 --- a/src/snovault/elasticsearch/interfaces.py +++ b/src/snovault/elasticsearch/interfaces.py @@ -5,6 +5,10 @@ ELASTIC_SEARCH = 'elasticsearch' SNP_SEARCH_ES = 'snp_search' INDEXER = 'indexer' +INDEXER_STORE = f"{INDEXER}_store" +INDEXER_STATE_TAG = f"{INDEXER}_state:hash" +INDEXER_EVENTS_TAG = f"{INDEXER}_event" +INDEXER_EVENTS_LIST = f"{INDEXER_EVENTS_TAG}:list" RESOURCES_INDEX = 'snovault-resources' diff --git a/src/snovault/elasticsearch/local_indexer_store.py b/src/snovault/elasticsearch/local_indexer_store.py new file mode 100644 index 0000000000..94437d38ab --- /dev/null +++ b/src/snovault/elasticsearch/local_indexer_store.py @@ -0,0 +1,379 @@ +import datetime +import re +import time + +from pyramid.view import view_config + +from snovault.local_storage import ( + LocalStoreClient, + base_result, +) +from snovault.elasticsearch.interfaces import ( + INDEXER, + INDEXER_STORE, + INDEXER_STATE_TAG, + INDEXER_EVENTS_TAG, + INDEXER_EVENTS_LIST, +) + + +_EVENT_TAG_LEN = 4 +_EVENT_PATTERN = re.compile("^" + INDEXER_EVENTS_TAG + ":[a-z0-9]{" + str(2*_EVENT_TAG_LEN) + "}$") +_EVENTS_PATTERN = re.compile("^-?\d+:-?\d+$") +_EVENTS_DEFAULT_RANGE = '0:100' + + +def includeme(config): + config.scan(__name__) + config.add_route('indexer_store', '/indexer_store') + config.registry[INDEXER_STORE] = IndexerStore(config.registry.settings) + + + + +@view_config(route_name='indexer_store', request_method='GET', request_param='events') +def indexer_store_events(request): + ''' + Get indexer event(s) based on regex of events value + * This redis list is LIFO + + 1. Value looks like an event tag: + * indexer_event:4fc592b0 + 2. Value looks like a range: + * All events=0:-1 + * First events=-1:-1 + * Last/Most recent events=0:0 + * Range recent events=4:94 + 3. Default no events value uses _EVENTS_DEFAULT_RANGE + ''' + request_argument = request.params.get("events") + if not request_argument: + # Test for falsey, request params returns empty string if not events value given + request_argument = _EVENTS_DEFAULT_RANGE + indexer_store = request.registry[INDEXER_STORE] + result = base_result(indexer_store) + if _EVENT_PATTERN.match(request_argument): + # request argument looks like an event_tag + result['event'] = {} + for event_key in indexer_store.get_tag_keys(request_argument): + result['event'][event_key] = indexer_store.item_get(event_key) + elif _EVENTS_PATTERN.match(request_argument): + # request argument looks like range string + colon = request_argument.index(':') + start = int(request_argument[:colon]) + stop = int(request_argument[colon+1:]) + event_tags = indexer_store.list_get(INDEXER_EVENTS_LIST, start, stop) + result['events'] = [] + for event_tag in event_tags: + result['events'].append(indexer_store.get_event_msg(event_tag)) + else: + # Events arg is bad format? + result['error'] = f"Bad events value '{request_argument}'" + result['error_desc'] = 'Must be key(indexer_event:4fc592b0) or range(4:94) like' + request.query_string = "format=json" + return result + + +@view_config(route_name='indexer_store', request_method='GET', request_param='state=raw') +def indexer_store_state_raw(request): + '''Only return raw state object from redis''' + request.query_string = "format=json" + return request.registry[INDEXER_STORE].get_state() + + +@view_config(route_name='indexer_store', request_method='GET', request_param='state=split') +def indexer_store_state_split(request): + '''All state info organized into sections with some extra info and error checking''' + indexer_store = request.registry[INDEXER_STORE] + state_obj = indexer_store.get_state() + current_state = state_obj.get('state') + # Early return with raw full state due lack of state or still initializing + if not current_state: + return { + 'early_return': 'state_obj state keys is Falsey', + 'state_obj': state_obj, + } + elif current_state == IndexerStore.state_initialized[0]: + return { + 'early_return': 'state_obj state key is in intial state', + 'state_obj': state_obj, + } + # Normal return + result = base_result(indexer_store) + # Add Static Info + result['static'] = {} + for key in IndexerStore.static_keys: + result['static'][key] = state_obj[key] + # Add Dynamic State + result['dynamic'] = {} + for key in IndexerStore.dynamic_keys: + result['dynamic'][key] = state_obj[key] + # Determine if run events and run state + event_key = None + duration = None + if current_state in [ + IndexerStore.state_endpoint_start[0], + IndexerStore.state_waiting[0], + IndexerStore.state_load_indexing[0], + ]: + event_key = 'previous_event' + duration = indexer_store._duration_with_unis_str( + state_obj['start_dt'], + end_dt=state_obj['end_dt'] + ) + elif current_state == IndexerStore.state_run_indexing[0]: + if state_obj['end_dt'] == 'tbd': + event_key = 'current_event' + duration = indexer_store._duration_with_unis_str(state_obj['start_dt']) + else: + event_key = 'previous_event' + duration = indexer_store._duration_with_unis_str( + state_obj['start_dt'], + end_dt=state_obj['end_dt'] + ) + # Add current or previous run event keys + if event_key: + result[event_key] = {} + for key in indexer_store.event_keys: + result[event_key][key] = state_obj[key] + result[event_key]['duration'] = duration + request.query_string = "format=json" + return result + + +@view_config(route_name='indexer_store', request_method='GET') +def indexer_store_state(request): + '''Minimal view for current state and additional info if running''' + indexer_store = request.registry[INDEXER_STORE] + state_obj = indexer_store.get_state() + current_state = state_obj.get('state', 'not initialized') + result = { + 'state': current_state, + 'time_in_state': 'could not calculate', + } + if current_state == IndexerStore.state_waiting[0]: + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_end_dt']) + result['description'] = f"Remains in state for {state_obj['loop_time']} seconds." + elif current_state == IndexerStore.state_initialized[0]: + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['init_dt']) + result['description'] = 'Very short duration. Happens once during deployment' + elif current_state == IndexerStore.state_endpoint_start[0]: + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_start']) + result['description'] = 'Very short duration. Happens once a minute.' + elif current_state == IndexerStore.state_load_indexing[0]: + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['endpoint_start']) + result['description'] = 'Time depends on number of uuids to index. Could take minutes.' + elif current_state == IndexerStore.state_run_indexing[0]: + if state_obj['end_dt'] == 'tbd': + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['start_dt']) + result['description'] = 'Time depends on number of uuids to index. Could take hours.' + result['current_event_tag'] = state_obj['event_tag'] + result['current_invalidated_cnt'] = state_obj['invalidated_cnt'] + else: + result['time_in_state'] = indexer_store._duration_with_unis_str(state_obj['end_dt']) + result['description'] = 'Short duration. Should go to waiting soon.' + result['just_finished_event_tag'] = state_obj.get('event_tag', 'not initialized') + result['just_finished_invalidated_cnt'] = state_obj['invalidated_cnt'] + request.query_string = "format=json" + return result + + + +class IndexerStore(LocalStoreClient): + config_name = INDEXER + state_initialized = ('state_init', 'initialized') + state_endpoint_start = ('state_endpoint_start', 'Endpoint started running') + state_load_indexing = ('state_load_indexing', 'Endpoint checking for uuids to index') + state_run_indexing = ('state_run_indexing', 'Endpoint found uuids and started indexing') + state_waiting = ('state_waiting', 'Waiting to call endpoint') + static_keys = [ + 'addr', + 'config_name', + 'init_dt', + 'loop_time', + 'pg_ip', + 'remote_indexing', + 'state_key', + ] + dynamic_keys = [ + 'endpoint_end_dt', + 'endpoint_start_dt', + 'state', + 'state_desc', + 'state_error', + ] + event_keys = [ + 'end_dt', + 'errors_cnt', + 'event_tag', + 'invalidated_cnt', + 'start_dt', + ] + + def __init__(self, settings): + super().__init__( + db_index=settings['local_storage_redis_index'], + host=settings['local_storage_host'], + port=settings['local_storage_port'], + local_tz=settings['local_tz'], + socket_timeout=int(settings['local_storage_timeout']), + ) + if settings.get('config_name', 'no name') == IndexerStore.config_name: + # only the indexer process should set these attributes and initialize this store + self.pg_ip = str(settings.get('pg_ip', 'localhost')) + self.remote_indexing = str(settings.get('remote_indexing', 'false')) + self.loop_time = str(settings.get('timeout', 'unknown')) + curr_state = self.get_state() + if not curr_state: + self._init_state() + + @staticmethod + def _dt_now(): + return str(datetime.datetime.utcnow()) + + @staticmethod + def _convert_dt_to_ts(dt_str): + '''Expecting dt_str to be in the format 2020-10-23 17:09:36.442405''' + try: + return datetime.datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f').timestamp() + except ValueError: + return None + + def _duration_with_unis_str(self, start_dt, end_dt=None): + ''' + Return duration in seconds, minutes, or hours + - Input expected to be in '2020-10-23 17:09:36.442405' format + - From now if end_dt not provided + ''' + start_ts = self._convert_dt_to_ts(start_dt) + if not start_ts: + return f"Could not determine duration with start_dt={start_dt}" + if not end_dt: + end_dt = self._dt_now() + end_ts = self._convert_dt_to_ts(end_dt) + if not end_ts: + return f"Could not determine duration with end_dt={end_dt}" + seconds = float(end_ts - start_ts) + div = 1.0 + unit = 'seconds' + if seconds > 60: + div = 60.0 + unit = 'minutes' + if seconds > 3600: + div = 3600.0 + unit = 'hours' + return f"{seconds/div:0.2f} {unit}" + + def _init_state(self): + '''Indexer state in redis is named after the process in the config''' + init_state = { + # Static + 'addr': f"{self.config_name}:{id(self)}", + 'config_name': self.config_name, + 'init_dt': str(datetime.datetime.utcnow()), + 'loop_time': str(self.loop_time), + 'pg_ip': self.pg_ip, + 'remote_indexing': self.remote_indexing, + 'state_key': INDEXER_STATE_TAG, + } + # Dynamic + for key in self.dynamic_keys: + init_state[key] = 'unknown' + init_state['state'] = self.state_initialized[0] + init_state['state_desc'] = self.state_initialized[1] + init_state['state_error'] = '' + # Run event + for key in self.event_keys: + init_state[key] = 'unknown' + self.dict_set(INDEXER_STATE_TAG, init_state) + + def _end_event(self, event_tag, state): + '''Close event with only certain event keys in state. Also add human readable date time''' + for event_key in ['end_dt', 'errors_cnt']: + self.item_set(f"{event_tag}:{event_key}", state[event_key]) + + def _start_event(self, event_tag, state): + '''Create new event with info from state in events keys''' + self.list_add(INDEXER_EVENTS_LIST, event_tag) + for event_key in self.event_keys: + self.item_set(f"{event_tag}:{event_key}", state[event_key]) + + def get_event_msg(self, event_tag): + end_dt = str(self.item_get(event_tag + ':end_dt')) + start_dt = str(self.item_get(event_tag + ':start_dt')) + errors_cnt = str(self.item_get(event_tag + ':errors_cnt')) + invalidated_cnt = str(self.item_get(event_tag + ':invalidated_cnt')) + duration = self._duration_with_unis_str(start_dt, end_dt=end_dt) + msg = ( + f"Indexed '{invalidated_cnt}' uuids in '{duration}' " + f"with '{errors_cnt}' errors. Ended at '{end_dt}'." + ) + return f"{event_tag}: {msg}" + + def get_state(self): + return self.dict_get(INDEXER_STATE_TAG) + + def _set_state(self, state_tuple, new_state): + # Set valid state and store + new_state['state'] = state_tuple[0] + new_state['state_desc'] = state_tuple[1] + self.dict_set(INDEXER_STATE_TAG, new_state) + + def set_state(self, state_tuple, **kwargs): + state = self.get_state() + state['state_error'] = '' + if not isinstance(state_tuple, tuple): + # Invalid State type + state['state_error'] = f"state, '{str(state_tuple)}', is not a tuple" + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), None + elif state_tuple[0] == self.state_waiting[0]: + # Waiting in es_index_listener for timeout + state['endpoint_start_dt'] = 'tbd' + state['endpoint_end_dt'] = self._dt_now() + self._set_state(state_tuple, state) + return self.get_state(), None + elif not len(state_tuple) == 2: + # Invalid State len + state['state_error'] = f"state, {state_tuple}, is wrong length" + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), None + elif state_tuple[0] == self.state_endpoint_start[0]: + # Reset + state['endpoint_end_dt'] = 'tbd' + state['endpoint_start_dt'] = self._dt_now() + self._set_state(state_tuple, state) + return self.get_state(), None + elif state_tuple[0] == self.state_load_indexing[0]: + # Indexer is checking for uuids to index + self._set_state(state_tuple, state) + return self.get_state(), None + elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('invalidated_cnt'): + # Reset event keys + for event_key in self.event_keys: + state[event_key] = 'tbd' + # Start indexing + state['event_tag'] = self.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) + state['invalidated_cnt'] = str(kwargs['invalidated_cnt']) + state['start_dt'] = self._dt_now() + self._start_event(state['event_tag'], state) + self._set_state(state_tuple, state) + return self.get_state(), state['event_tag'] + elif state_tuple[0] == self.state_run_indexing[0] and kwargs.get('event_tag'): + # End indexing + state['end_dt'] = self._dt_now() + state['errors_cnt'] = str(kwargs.get('errors_cnt', 0)) + self._end_event(kwargs['event_tag'], state) + self._set_state(state_tuple, state) + return self.get_state(), kwargs['event_tag'] + elif state_tuple[0] == self.state_run_indexing[0]: + # Invalid State + state['state_error'] = f"{str(state_tuple)} requries additional arguments" + self.dict_set(INDEXER_STATE_TAG, state) + self._set_state(state_tuple, state) + return self.get_state(), None + else: + # Invalid State + state['state_error'] = f"{str(state_tuple)} is not a valid state change" + self.dict_set(INDEXER_STATE_TAG, state) + return self.get_state(), None diff --git a/src/snovault/elasticsearch/tests/test_indexer_store_views.py b/src/snovault/elasticsearch/tests/test_indexer_store_views.py new file mode 100644 index 0000000000..151094f000 --- /dev/null +++ b/src/snovault/elasticsearch/tests/test_indexer_store_views.py @@ -0,0 +1,570 @@ +import datetime +import pytest +import time + +from unittest.mock import Mock, patch + +from snovault.elasticsearch.interfaces import ( + INDEXER, + INDEXER_EVENTS_LIST, + INDEXER_EVENTS_TAG, + INDEXER_STATE_TAG, +) +from snovault.elasticsearch.local_indexer_store import ( + IndexerStore, + _EVENT_TAG_LEN, +) + +def _setup_indexer_store(settings): + app_settings['config_name'] = INDEXER + return IndexerStore(app_settings) + + +class TestLocalStoreNoInitViews(): + ''' + No init. IndexerStore with 'app' configuration. + Calling the views prior to IndexerStore 'indexer' configuration initializing + ''' + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + # Event Views + def test_indexer_store_events_tag(self): + some_tag = 'a'*(2*_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == {} + + def test_indexer_store_events_bad_tag(self): + event_tag = 'not-a-tag' + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + some_tag = 'a'*(_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events_range(self): + url = '/indexer_store?events=3:4' + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events(self): + url = '/indexer_store?events' + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + # State Views + def test_indexer_store_state_only(self): + url = '/indexer_store?state=state' + result = self.testapp.get(url, status=200).json + assert 'state' in result + assert result['state'] == 'not initialized' + + def test_indexer_store_state_full(self): + url = '/indexer_store?state=full' + result = self.testapp.get(url, status=200).json + assert result == {} + + def test_indexer_store_state(self): + url = '/indexer_store?state' + result = self.testapp.get(url, status=200).json + assert result['early_return'] == 'state_obj state keys is Falsey' + assert result['state_obj'] == self.local_store.get_state() + + +class TestLocalStoreInitViews(): + ''' + Init. IndexerStore with 'indexer' configuration. + Calling the views prior to IndexerStore 'indexer' configuration being set in the index endpoint + ''' + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + # Delete previous redis keys + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + + # Event Views + def test_indexer_store_events_tag(self): + some_tag = 'a'*(2*_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + print(result) + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == {} + + def test_indexer_store_events_bad_tag(self): + event_tag = 'not-a-tag' + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + some_tag = 'a'*(_EVENT_TAG_LEN) + event_tag = f"{INDEXER_EVENTS_TAG}:{some_tag}" + url = f"/indexer_store?events={event_tag}" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events_range(self): + url = '/indexer_store?events=3:4' + result = self.testapp.get(url, status=200).json + print(result) + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert result['events'] == [] + + def test_indexer_store_events(self): + url = '/indexer_store?events' + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 0 + assert 'events' in result + assert result['events'] == [] + + # State Views + def test_indexer_store_state_only(self): + url = '/indexer_store?state=state' + result = self.testapp.get(url, status=200).json + assert 'state' in result + assert result['state'] == IndexerStore.state_initialized[0] + + def test_indexer_store_state_full(self): + url = '/indexer_store?state=full' + result = self.testapp.get(url, status=200).json + assert result == self.local_store.get_state() + + def test_indexer_store_state(self): + url = '/indexer_store?state' + result = self.testapp.get(url, status=200).json + assert result['early_return'] == 'state_obj state key is in intial state' + assert result['state_obj'] == self.local_store.get_state() + + +class TestLocalStoreStateViews(): + ''' + IndexerStore with 'indexer' configuration. + - IndexerStore endpoint has been called + ''' + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + # Delete previous redis keys + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + print(key) + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + + # State Views + @pytest.mark.parametrize( + 'state', + ( + IndexerStore.state_endpoint_start, + IndexerStore.state_load_indexing, + IndexerStore.state_waiting, + ) + ) + def test_indexer_store_state_no_event(self, state): + self.local_store.set_state(state) + url = '/indexer_store' + result = self.testapp.get(url, status=200).json + # Static + assert 'static' in result + assert isinstance(result['static'], dict) + res_keys = list(result['static'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.static_keys + # Dynamic + assert 'dynamic' in result + assert isinstance(result['dynamic'], dict) + res_keys = list(result['dynamic'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.dynamic_keys + # Event + assert 'previous_event' in result + assert isinstance(result['previous_event'], dict) + res_keys = list(result['previous_event'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.event_keys + for key in IndexerStore.event_keys: + assert result['previous_event'][key] == 'unknown' + + @pytest.mark.parametrize( + 'state', + ( + IndexerStore.state_endpoint_start, + IndexerStore.state_load_indexing, + IndexerStore.state_waiting, + ) + ) + def test_indexer_store_state_prev_event(self, state): + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 38 + time_mock.return_value = time_start + invalidated_cnt=83 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # End run event + time_end = 48 + time_mock.return_value = time_end + errors_cnt = 81 + with patch('time.time', new=time_mock): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + errors_cnt=errors_cnt, + event_tag=event_tag, + ) + # Test in other states + self.local_store.set_state(state) + url = '/indexer_store' + result = self.testapp.get(url, status=200).json + # Static + assert 'static' in result + assert isinstance(result['static'], dict) + res_keys = list(result['static'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.static_keys + # Dynamic + assert 'dynamic' in result + assert isinstance(result['dynamic'], dict) + res_keys = list(result['dynamic'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.dynamic_keys + # Event + assert 'previous_event' in result + assert isinstance(result['previous_event'], dict) + res_keys = list(result['previous_event'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.event_keys + assert result['previous_event']['duration'] == str(time_end - time_start) + assert result['previous_event']['end_time'] == str(datetime.datetime(1999, 1, 1)) + assert result['previous_event']['errors_cnt'] == str(errors_cnt) + assert result['previous_event']['event_tag'] == event_tag + assert result['previous_event']['invalidated_cnt'] == str(invalidated_cnt) + assert result['previous_event']['start_ts'] == str(time_start) + + def test_indexer_store_state_curr_event(self): + state, _ = self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 38 + time_mock.return_value = time_start + invalidated_cnt=83 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # Test endpoint + url = '/indexer_store' + time_mock = Mock(wraps=time.time) + time_now = 49 + time_mock.return_value = time_now + result = None + with patch('time.time', new=time_mock): + result = self.testapp.get(url, status=200).json + # Static + assert 'static' in result + assert isinstance(result['static'], dict) + res_keys = list(result['static'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.static_keys + # Dynamic + assert 'dynamic' in result + assert isinstance(result['dynamic'], dict) + res_keys = list(result['dynamic'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.dynamic_keys + # Event + assert 'current_event' in result + assert isinstance(result['current_event'], dict) + assert 'current_duration' in result + duration = time_now - time_start + assert result['current_duration'] == f"{duration:0.2f} seconds" + res_keys = list(result['current_event'].keys()) + res_keys.sort() + assert res_keys == IndexerStore.event_keys + assert result['current_event']['duration'] == 'unknown' + assert result['current_event']['end_time'] == 'unknown' + assert result['current_event']['errors_cnt'] == 'unknown' + assert result['current_event']['event_tag'] == event_tag + assert result['current_event']['invalidated_cnt'] == str(invalidated_cnt) + assert result['current_event']['start_ts'] == str(time_start) + + + def test_indexer_store_state_curr_event_minutes(self): + state, _ = self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 10 + time_mock.return_value = time_start + invalidated_cnt=38 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # Test endpoint + url = '/indexer_store' + time_mock = Mock(wraps=time.time) + time_now = 130 + time_mock.return_value = time_now + result = None + with patch('time.time', new=time_mock): + result = self.testapp.get(url, status=200).json + # Current Event + assert 'current_event' in result + assert isinstance(result['current_event'], dict) + assert 'current_duration' in result + duration = (time_now - time_start)/60.0 + assert result['current_duration'] == f"{duration:0.2f} minutes" + + + def test_indexer_store_state_curr_event_hours(self): + state, _ = self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_start = 120 + time_mock.return_value = time_start + invalidated_cnt=38 + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=invalidated_cnt + ) + # Test endpoint + url = '/indexer_store' + time_mock = Mock(wraps=time.time) + time_now = 7320 + time_mock.return_value = time_now + result = None + with patch('time.time', new=time_mock): + result = self.testapp.get(url, status=200).json + # Current Event + assert 'current_event' in result + assert isinstance(result['current_event'], dict) + assert 'current_duration' in result + duration = (time_now - time_start)/3600.0 + assert result['current_duration'] == f"{duration:0.2f} hours" + + +class TestLocalStoreEventViews(): + + def _add_event(self, event_number): + event_dict = { + 'error_cnt': event_number*10, + 'invalidated_cnt': event_number*100, + 'time_start': event_number*1000, + 'time_end': event_number*10000, + } + self.local_store.set_state(IndexerStore.state_endpoint_start) + self.local_store.set_state(IndexerStore.state_load_indexing) + # Start run event + time_mock = Mock(wraps=time.time) + time_mock.return_value = event_dict['time_start'] + event_tag = None + with patch('time.time', new=time_mock): + _, event_tag = self.local_store.set_state( + IndexerStore.state_run_indexing, + invalidated_cnt=event_dict['invalidated_cnt'] + ) + event_dict['event_tag'] = event_tag + # End run event + time_mock.return_value = event_dict['time_end'] + with patch('time.time', new=time_mock): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + self.local_store.set_state( + IndexerStore.state_run_indexing, + errors_cnt=event_dict['error_cnt'], + event_tag=event_dict['event_tag'], + ) + self.local_store.set_state(IndexerStore.state_waiting) + expected_result = {} + event_dict['duration'] = event_dict['time_end'] - event_dict['time_start'] + expected_result[f"{event_tag}:duration"] = str(event_dict['duration']) + expected_result[f"{event_tag}:invalidated_cnt"] = str(event_dict['invalidated_cnt']) + expected_result[f"{event_tag}:end"] = str(datetime.datetime(1999, 1, 1)) + return event_dict, expected_result + + @pytest.fixture(autouse=True) + def setup_class(self, testapp, app_settings): + self.testapp = testapp + # Delete previous redis keys + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + # Delete previous redis keys + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + for key in self.local_store.get_tag_keys(INDEXER_EVENTS_TAG): + print(key) + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + app_settings['config_name'] = INDEXER + self.local_store = IndexerStore(app_settings) + + def test_indexer_store_events_one(self): + event_dict_one, expected_result_one = self._add_event(1) + # Test event + url = f"/indexer_store?events={event_dict_one['event_tag']}" + result = self.testapp.get(url, status=200).json + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == expected_result_one + # Test Range + url = f"/indexer_store?events=0:1" + result = self.testapp.get(url, status=200).json + expected_result_str_one = f"{event_dict_one['event_tag']}: Indexed '{event_dict_one['invalidated_cnt']}' " + expected_result_str_one += f"uuids in '{event_dict_one['duration']}' seconds. Ended at '1999-01-01 00:00:00'" + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert len(result['events']) == 1 + assert result['events'][0] == expected_result_str_one + # Test Tags + url = f"/indexer_store?events" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 1 + assert 'events' in result + assert len(result['events']) == 1 + assert result['events'][0] == event_dict_one['event_tag'] + + def test_indexer_store_events_three(self): + event_dict_one, expected_result_one = self._add_event(1) + event_dict_two, expected_result_two = self._add_event(2) + event_dict_three, expected_result_three = self._add_event(3) + # Test event + url = f"/indexer_store?events={event_dict_two['event_tag']}" + result = self.testapp.get(url, status=200).json + assert 'events_cnt' not in result + assert 'events' not in result + assert 'event' in result + assert result['event'] == expected_result_two + # Test Range + url = f"/indexer_store?events=1:-1" + result = self.testapp.get(url, status=200).json + expected_result_str_one = f"{event_dict_one['event_tag']}: Indexed '{event_dict_one['invalidated_cnt']}' " + expected_result_str_one += f"uuids in '{event_dict_one['duration']}' seconds. Ended at '1999-01-01 00:00:00'" + expected_result_str_two = f"{event_dict_two['event_tag']}: Indexed '{event_dict_two['invalidated_cnt']}' " + expected_result_str_two += f"uuids in '{event_dict_two['duration']}' seconds. Ended at '1999-01-01 00:00:00'" + assert 'event' not in result + assert 'events_cnt' not in result + assert 'events' in result + assert len(result['events']) == 2 + for ev in result['events']: + print(ev) + assert result['events'][0] == expected_result_str_two + assert result['events'][1] == expected_result_str_one + # Test Tags + url = f"/indexer_store?events" + result = self.testapp.get(url, status=200).json + assert 'event' not in result + assert 'events_cnt' in result + assert result['events_cnt'] == 3 + assert 'events' in result + assert len(result['events']) == 3 + assert result['events'][0] == event_dict_three['event_tag'] + assert result['events'][1] == event_dict_two['event_tag'] + assert result['events'][2] == event_dict_one['event_tag'] diff --git a/src/snovault/elasticsearch/tests/test_local_indexer_store.py b/src/snovault/elasticsearch/tests/test_local_indexer_store.py new file mode 100644 index 0000000000..e80bcd67c0 --- /dev/null +++ b/src/snovault/elasticsearch/tests/test_local_indexer_store.py @@ -0,0 +1,315 @@ +import datetime +import pytest + +from copy import copy +from unittest.mock import Mock, patch + +from snovault.local_storage import LocalStoreClient + +from snovault.elasticsearch.local_indexer_store import ( + IndexerStore, + _EVENT_TAG_LEN, +) +from snovault.elasticsearch.interfaces import ( + INDEXER, + INDEXER_EVENTS_LIST, + INDEXER_EVENTS_TAG, + INDEXER_STATE_TAG, +) + + + +def _get_local_app_settings(settings): + local_settings = copy(settings) + local_settings['config_name'] = 'app' + return local_settings + + +def _get_local_indexer_settings(local_settings): + local_settings.update(_get_local_app_settings(local_settings)) + local_settings['config_name'] = INDEXER + local_settings['pg_ip'] = '1.2.3.4' + local_settings['remote_indexing'] = 'false' + local_settings['timeout'] = '10' + return local_settings + + +def test_init_app_config(app_settings): + local_settings = _get_local_app_settings(app_settings) + indexer_store = IndexerStore(local_settings) + assert not hasattr(indexer_store, 'pg_ip') + assert not hasattr(indexer_store, 'remote_indexing') + assert not hasattr(indexer_store, 'loop_time') + assert indexer_store.get_state() == {} + + +def test_init_indexer_config(app_settings): + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + local_settings = _get_local_indexer_settings(app_settings) + indexer_store = IndexerStore(local_settings) + assert hasattr(indexer_store, 'pg_ip') + assert hasattr(indexer_store, 'remote_indexing') + assert hasattr(indexer_store, 'loop_time') + state_hash = indexer_store.get_state() + # Keys + state_hash_keys = list(state_hash.keys()) + state_hash_keys.sort() + all_keys = list(IndexerStore.static_keys) + all_keys.extend(list(IndexerStore.dynamic_keys)) + all_keys.extend(list(IndexerStore.event_keys)) + all_keys.sort() + assert state_hash_keys == all_keys + # Static values + assert state_hash['addr'] == f"{INDEXER}:{id(indexer_store)}" + assert state_hash['config_name'] == INDEXER + assert state_hash['init_dt'] == str(datetime.datetime(1999, 1, 1)) + assert state_hash['loop_time'] == '10' + assert state_hash['pg_ip'] == '1.2.3.4' + assert state_hash['remote_indexing'] == 'false' + assert state_hash['state_key'] == INDEXER_STATE_TAG + # Dynamic + assert state_hash['endpoint_end_dt'] == 'unknown' + assert state_hash['endpoint_start_dt'] == 'unknown' + assert state_hash['state'] == indexer_store.state_initialized[0] + assert state_hash['state_error'] == '' + assert state_hash['state_desc'] == indexer_store.state_initialized[1] + # Event + for key in IndexerStore.event_keys: + assert state_hash[key] == 'unknown' + + +class TestLocalStore(): + fake_event_tag = 'fakeeventtag' + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + ''' + Add local store to test class and cleans up standard redis keys + - Uses the exposed redis client directly + ''' + # Delete previous redis keys + local_settings = _get_local_indexer_settings(copy(app_settings)) + self.local_store = IndexerStore(local_settings) + for key in self.local_store.get_tag_keys(self.fake_event_tag): + self.local_store.client.delete(key) + self.local_store.client.delete(INDEXER_EVENTS_LIST) + self.local_store.client.delete(INDEXER_STATE_TAG) + # Create new local store + local_settings = _get_local_indexer_settings(copy(app_settings)) + self.local_store = IndexerStore(local_settings) + + def test_end_event(self): + state = self.local_store.get_state() + end_dt = datetime.datetime(1999, 1, 1) + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = end_dt + with patch('datetime.datetime', new=datetime_mock): + state['errors_cnt'] = str(39) + state['end_dt'] = str(end_dt) + self.local_store._end_event( + self.fake_event_tag, + state, + ) + tag_end_dt = f"{self.fake_event_tag}:end_dt" + assert self.local_store.client.get(tag_end_dt) == state['end_dt'] + tag_errors_cnt = f"{self.fake_event_tag}:errors_cnt" + assert self.local_store.client.get(tag_errors_cnt) == state['errors_cnt'] + + def test_start_event(self): + state = self.local_store.get_state() + state['event_tag'] = self.fake_event_tag + state['invalidated_cnt'] ='4' + state['start_dt'] = str(datetime.datetime(1999, 1, 1)) + self.local_store._start_event(self.fake_event_tag, state) + for event_key in self.local_store.event_keys: + tag = f"{self.fake_event_tag}:{event_key}" + assert self.local_store.client.get(tag) == state[event_key] + all_keys = self.local_store.client.lrange(INDEXER_EVENTS_LIST, 0, -1) + assert len(all_keys) == 1 + assert all_keys[0] == self.fake_event_tag + + def test_get_event_msg(self): + # expected values + end_dt = datetime.datetime.utcnow() + start_dt = datetime.datetime.utcnow() + errors_cnt = 123 + invalidated_cnt = 1234 + duration = self.local_store._duration_with_unis_str( + str(start_dt), + end_dt=str(end_dt), + ) + # Add to store directly + self.local_store.item_set(self.fake_event_tag + ':end_dt', str(end_dt)) + self.local_store.item_set(self.fake_event_tag + ':start_dt', str(start_dt)) + self.local_store.item_set(self.fake_event_tag + ':errors_cnt', str(errors_cnt)) + self.local_store.item_set(self.fake_event_tag + ':invalidated_cnt', str(invalidated_cnt)) + expected_event_str = ( + f"{self.fake_event_tag}: " + f"Indexed '{invalidated_cnt}' uuids in '{duration}' " + f"with '{errors_cnt}' errors. Ended at '{end_dt}'." + ) + event_str = self.local_store.get_event_msg(self.fake_event_tag) + assert event_str == expected_event_str + + def test_get_state(self): + test_dict = {'test': 'testing'} + self.local_store.client.delete(INDEXER_STATE_TAG) + self.local_store.client.hmset(INDEXER_STATE_TAG, test_dict) + assert self.local_store.get_state() == test_dict + + def test_set_state_endpoint_start(self): + original_state = self.local_store.get_state() + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + self.local_store.set_state(self.local_store.state_endpoint_start) + new_state = self.local_store.get_state() + for key, val in original_state.items(): + if key not in IndexerStore.dynamic_keys: + assert val == new_state[key] + assert new_state['endpoint_end_dt'] == 'tbd' + assert new_state['endpoint_start_dt'] == str(datetime.datetime(1999, 1, 1)) + assert new_state['state_error'] == '' + assert new_state['state'] == self.local_store.state_endpoint_start[0] + assert new_state['state_desc'] == self.local_store.state_endpoint_start[1] + + def test_set_state_load_indexing(self): + original_state = self.local_store.get_state() + self.local_store.set_state(self.local_store.state_load_indexing) + new_state = self.local_store.get_state() + for key, val in original_state.items(): + if key not in ['state', 'state_desc']: + assert val == new_state[key] + assert new_state['state'] == self.local_store.state_load_indexing[0] + assert new_state['state_desc'] == self.local_store.state_load_indexing[1] + + def test_set_state_run_indexing_start(self): + invalidated_cnt = 9 + original_state = self.local_store.get_state() + new_event_tag = None + new_state = None + start_dt = datetime.datetime(1999, 1, 1) + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = start_dt + with patch('datetime.datetime', new=datetime_mock): + new_state, new_event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + invalidated_cnt=invalidated_cnt, + ) + for key in IndexerStore.static_keys: + assert original_state[key] == new_state[key] + + assert new_state['endpoint_end_dt'] == original_state['endpoint_end_dt'] + assert new_state['endpoint_start_dt'] == original_state['endpoint_start_dt'] + assert new_state['state_error'] == original_state['state_error'] + assert new_state['state'] == self.local_store.state_run_indexing[0] + assert new_state['state_desc'] == self.local_store.state_run_indexing[1] + + assert new_state['end_dt'] == 'tbd' + assert new_state['errors_cnt'] == 'tbd' + some_tag = self.local_store.get_tag(INDEXER_EVENTS_TAG, num_bytes=_EVENT_TAG_LEN) + assert len(new_state['event_tag']) == len(some_tag) + assert new_state['event_tag'] == new_event_tag + assert new_state['invalidated_cnt'] == str(invalidated_cnt) + assert new_state['start_dt'] == str(start_dt) + + def test_set_state_run_indexing_end(self): + # start event + invalidated_cnt = 11 + event_tag = None + original_state = None + start_dt = datetime.datetime(1999, 1, 1) + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = start_dt + with patch('datetime.datetime', new=datetime_mock): + original_state, event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + invalidated_cnt=invalidated_cnt, + ) + # end event + errors_cnt = 3 + new_event_tag = None + new_state = None + end_dt = datetime.datetime(1999, 1, 1) + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = end_dt + with patch('datetime.datetime', new=datetime_mock): + new_state, new_event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + errors_cnt=errors_cnt, + event_tag=original_state['event_tag'], + ) + for key in IndexerStore.static_keys: + assert original_state[key] == new_state[key] + for key in IndexerStore.dynamic_keys: + assert original_state[key] == new_state[key] + + assert isinstance(new_state['end_dt'], str) + assert new_state['errors_cnt'] == str(errors_cnt) + assert new_state['event_tag'] == event_tag + assert new_state['invalidated_cnt'] == original_state['invalidated_cnt'] + assert new_state['start_dt'] == original_state['start_dt'] + + def test_set_state_waiting(self): + start_dt = datetime.datetime(1999, 1, 2) + end_dt = datetime.datetime(1999, 1, 3) + # Start endpoint + _, _ = self.local_store.set_state(self.local_store.state_endpoint_start) + # Load indexing + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = datetime.datetime(1999, 1, 1) + with patch('datetime.datetime', new=datetime_mock): + self.local_store.set_state(self.local_store.state_load_indexing) + # Run Indexing Start + event_tag = None + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = start_dt + with patch('datetime.datetime', new=datetime_mock): + _, event_tag = self.local_store.set_state( + self.local_store.state_run_indexing, + invalidated_cnt=7, + ) + # Run Indexing End + datetime_mock = Mock(wraps=datetime.datetime) + datetime_mock.utcnow.return_value = end_dt + with patch('datetime.datetime', new=datetime_mock): + self.local_store.set_state(self.local_store.state_run_indexing, event_tag=event_tag) + # Endpoint End + new_state, _ = self.local_store.set_state(self.local_store.state_waiting) + # Make sure the last event exists during waiting state + assert new_state['event_tag'] == event_tag + assert new_state['end_dt'] == str(end_dt) + assert new_state['start_dt'] == str(start_dt) + # Check state + assert new_state['state'] == self.local_store.state_waiting[0] + assert new_state['state_desc'] == self.local_store.state_waiting[1] + + def test_set_state_run_indexing_no_args(self): + run_state = self.local_store.state_run_indexing + original_state, event_tag = self.local_store.set_state(self.local_store.state_run_indexing) + assert event_tag is None + assert original_state['state_error'] == f"{str(run_state)} requries additional arguments" + + def test_set_state_error_type(self): + bad_state = 'not a tuple' + expected_state_error = f"state, '{str(bad_state)}', is not a tuple" + state, _ = self.local_store.set_state(bad_state) + assert state['state_error'] == expected_state_error + + bad_state = IndexerStore + expected_state_error = f"state, '{str(bad_state)}', is not a tuple" + state, _ = self.local_store.set_state(bad_state) + assert state['state_error'] == expected_state_error + + def test_set_state_error_dne(self): + not_allow_state_change = tuple(['state_init', 'initialized']) + expected_state_error = f"{str(not_allow_state_change)} is not a valid state change" + state, _ = self.local_store.set_state(not_allow_state_change) + assert state['state_error'] == expected_state_error + + dne_state = tuple(['state_dne', 'does not exist']) + expected_state_error = f"{str(dne_state)} is not a valid state change" + state, _ = self.local_store.set_state(dne_state) + assert state['state_error'] == expected_state_error diff --git a/src/snovault/local_storage.py b/src/snovault/local_storage.py new file mode 100644 index 0000000000..1de0ecb8a8 --- /dev/null +++ b/src/snovault/local_storage.py @@ -0,0 +1,66 @@ +import binascii + +from os import urandom +from datetime import datetime +from pytz import timezone + +from redis import StrictRedis + + +def base_result(local_store): + local_dt = datetime.now(timezone(local_store.local_tz)) + return { + 'utc_now': str(datetime.utcnow()), + 'lcl_now': f"{local_store.local_tz}: {local_dt}", + } + + +class LocalStoreClient(): + ''' + Light redis wrapper and redis examples + - get_tag function was added to return hex str + - Can access client directly for full functionality + ''' + def __init__(self, **kwargs): + self.local_tz = kwargs.get('local_tz', 'GMT') + self.client = StrictRedis( + charset='utf-8', + decode_responses=True, + db=kwargs['db_index'], + host=kwargs['host'], + port=kwargs['port'], + socket_timeout=kwargs['socket_timeout'], + ) + + @staticmethod + def get_tag(tag, num_bytes=2): + ''' + Tags are the tag plus a random hex bytes string + - Bytes string length is 2 * num bytes + ''' + rand_hex_str = binascii.b2a_hex(urandom(num_bytes)).decode('utf-8') + return f"{tag}:{rand_hex_str}" + + def ping(self): + return self.client.ping() + + def dict_get(self, key): + return self.client.hgetall(key) + + def dict_set(self, key, hash_dict): + return self.client.hmset(key, hash_dict) + + def get_tag_keys(self, tag): + return self.client.keys(f"{tag}:*") + + def item_get(self, key): + return self.client.get(key) + + def item_set(self, key, item): + return self.client.set(key, item) + + def list_add(self, key, item): + return self.client.lpush(key, item) + + def list_get(self, key, start=0, stop=-1): + return self.client.lrange(key, start, stop) diff --git a/src/snovault/tests/indexfixtures.py b/src/snovault/tests/indexfixtures.py index f28cb18f6a..893e60693a 100644 --- a/src/snovault/tests/indexfixtures.py +++ b/src/snovault/tests/indexfixtures.py @@ -7,7 +7,7 @@ def autouse_external_tx(external_tx): @pytest.fixture(scope='session') -def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server): +def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server, redis_server): from .testappfixtures import _app_settings settings = _app_settings.copy() settings['create_tables'] = True diff --git a/src/snovault/tests/redis_storage_fixture.py b/src/snovault/tests/redis_storage_fixture.py new file mode 100644 index 0000000000..be0f4b20f9 --- /dev/null +++ b/src/snovault/tests/redis_storage_fixture.py @@ -0,0 +1,150 @@ +'''Local storage redis Fixture''' +import os +import sys + +import subprocess + + +# Commented out config is added in 'initdb' below +REDIS_DEFAULTS = [ + # 'bind 127.0.0.1 ::1', Disabled ipv6 since circleci does not support + 'protected-mode yes', + # port 6379', Port change + 'tcp-backlog 511', + 'timeout 0', + 'tcp-keepalive 300', + 'daemonize no', + 'supervised no', + # 'pidfile /var/run/redis_6379.pid', Port change + 'loglevel notice', + 'logfile ""', + 'databases 16', + 'always-show-logo yes', + 'save 900 1', + 'save 300 10', + 'save 60 10000', + 'stop-writes-on-bgsave-error yes', + 'rdbcompression yes', + 'rdbchecksum yes', + 'dbfilename dump.rdb', + # 'dir /usr/local/var/db/redis/', Variable database location + 'slave-serve-stale-data yes', + 'slave-read-only yes', + 'repl-diskless-sync no', + 'repl-diskless-sync-delay 5', + 'repl-disable-tcp-nodelay no', + 'slave-priority 100', + 'lazyfree-lazy-eviction no', + 'lazyfree-lazy-expire no', + 'lazyfree-lazy-server-del no', + 'slave-lazy-flush no', + 'appendonly no', + 'appendfilename "appendonly.aof"', + 'appendfsync everysec', + 'no-appendfsync-on-rewrite no', + 'auto-aof-rewrite-percentage 100', + 'auto-aof-rewrite-min-size 64mb', + 'aof-load-truncated yes', + 'aof-use-rdb-preamble no', + 'lua-time-limit 5000', + 'slowlog-max-len 128', + 'latency-monitor-threshold 0', + 'notify-keyspace-events ""', + 'hash-max-ziplist-entries 512', + 'hash-max-ziplist-value 64', + 'list-max-ziplist-size -2', + 'list-compress-depth 0', + 'set-max-intset-entries 512', + 'zset-max-ziplist-entries 128', + 'zset-max-ziplist-value 64', + 'hll-sparse-max-bytes 3000', + 'activerehashing yes', + 'client-output-buffer-limit normal 0 0 0', + 'client-output-buffer-limit slave 256mb 64mb 60', + 'client-output-buffer-limit pubsub 32mb 8mb 60', + 'hz 10', + 'aof-rewrite-incremental-fsync yes', +] + + +def initdb(datadir, redis_port, echo=False): + '''Create redis db config in data dir''' + redis_config_lines = REDIS_DEFAULTS.copy() + redis_config_lines.append(f"port {redis_port}") + redis_config_lines.append(f"pidfile /var/run/redis_{redis_port}.pid") + redis_config_lines.append(f"dir {datadir}") + redis_config_lines.append('bind 127.0.0.1') + redis_config_path = f"{datadir}/redis.conf" + if not os.path.exists(datadir): + os.makedirs(datadir) + with open(redis_config_path, '+w') as file_handler: + file_handler.writelines('\n'.join(redis_config_lines)) + if echo: + print(f"Redis Config created: {redis_config_path}") + return redis_config_path + + +def server_process(redis_config_path, redis_port, redis_index, echo=False): + '''Start redis server''' + args = [ + os.path.join('redis-server'), + f"{redis_config_path}", + ] + if echo: + print(f"Starting redis server: {' '.join(args)}") + print(f"Connect with 'redis-cli -p {redis_port} -n {redis_index}'") + redis_process = subprocess.Popen( + args, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + return redis_process + + +def _main(): + import atexit + import shutil + import tempfile + datadir = tempfile.mkdtemp() + + def _cleanup(): + shutil.rmtree(datadir) + print(f"Redis Cleaned dir: {datadir}") + + try: + print(f"Starting in dir: {datadir}") + redis_config_path = initdb(datadir, 6378, echo=True) + redis_process = server_process(redis_config_path, 6378, echo=True) + except Exception as ecp: # pylint: disable=broad-except + _cleanup() + shutil.rmtree(datadir) + print('Cleaned dir: %s' % datadir) + raise ecp + + @atexit.register + def cleanup_process(): # pylint: disable=unused-variable + '''System Exit hook to remove db and kill redis server process''' + try: + if redis_process.poll() is None: + redis_process.terminate() + for line in redis_process.stdout: + sys.stdout.write(line.decode('utf-8')) + redis_process.wait() + finally: + _cleanup() + + try: + break_line = b'Ready to accept connections' + for line in iter(redis_process.stdout.readline, b''): + sys.stdout.write(line.decode('utf-8')) + strip_line = line.strip() + if strip_line.endswith(break_line) and break_line: + print('Redis Started. ^C to exit.') + break_line = None + except KeyboardInterrupt: + raise SystemExit(0) + + +if __name__ == '__main__': + _main() diff --git a/src/snovault/tests/serverfixtures.py b/src/snovault/tests/serverfixtures.py index 05f064d8fc..e0fde7be0c 100644 --- a/src/snovault/tests/serverfixtures.py +++ b/src/snovault/tests/serverfixtures.py @@ -1,7 +1,11 @@ import pytest import os + +from time import sleep from subprocess import TimeoutExpired +from pyramid.paster import get_appsettings + def pytest_configure(): import logging @@ -82,6 +86,31 @@ def elasticsearch_server(request, elasticsearch_host_port): process.kill() +@pytest.mark.fixture_cost(10) +@pytest.yield_fixture(scope='session') +def redis_server(request): + from .redis_storage_fixture import initdb, server_process + datadir = str(request.config._tmpdirhandler.mktemp('redisdatatest', numbered=True)) + appsettings = get_appsettings('development.ini', name='app') + # Required settings in config + local_storage_host = appsettings['local_storage_host'] + local_storage_port = appsettings['local_storage_port'] + local_storage_redis_index = appsettings['local_storage_redis_index'] + local_storage_timeout = appsettings['local_storage_timeout'] + # Build fixture + redis_config_path = initdb(datadir, local_storage_port, echo=True) + process = server_process(redis_config_path, local_storage_port, local_storage_redis_index, echo=True) + # Sleep for short time to allow redis db to initialize + sleep(0.25) + yield f"Redis testing: redis-cli -p {local_storage_port} -n {local_storage_redis_index})" + if 'process' in locals() and process.poll() is None: + process.terminate() + try: + process.wait(timeout=10) + except TimeoutExpired: + process.kill() + + # http://docs.sqlalchemy.org/en/rel_0_8/orm/session.html#joining-a-session-into-an-external-transaction # By binding the SQLAlchemy Session to an external transaction multiple testapp # requests can be rolled back at the end of the test. diff --git a/src/snovault/tests/test_indexing.py b/src/snovault/tests/test_indexing.py index c395a0994c..d46fffc3c5 100644 --- a/src/snovault/tests/test_indexing.py +++ b/src/snovault/tests/test_indexing.py @@ -16,7 +16,7 @@ def autouse_external_tx(external_tx): @pytest.fixture(scope='session') -def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server): +def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server, redis_server): from .testappfixtures import _app_settings settings = _app_settings.copy() settings['create_tables'] = True diff --git a/src/snovault/tests/test_key.py b/src/snovault/tests/test_key.py index fa9384b2fd..b48266fe12 100644 --- a/src/snovault/tests/test_key.py +++ b/src/snovault/tests/test_key.py @@ -24,10 +24,15 @@ def autouse_external_tx(external_tx): @pytest.fixture(scope='session') -def app(DBSession): +def app(DBSession, app_settings): from pyramid.config import Configurator from snovault import DBSESSION config = Configurator() + config.registry.settings['local_storage_host'] = app_settings['local_storage_host'] + config.registry.settings['local_storage_port'] = app_settings['local_storage_port'] + config.registry.settings['local_storage_redis_index'] = app_settings['local_storage_redis_index'] + config.registry.settings['local_storage_timeout'] = app_settings['local_storage_timeout'] + config.registry.settings['local_tz'] = app_settings['local_tz'] config.registry[DBSESSION] = DBSession config.include('snovault') config.include('.testing_key') diff --git a/src/snovault/tests/test_redis_store.py b/src/snovault/tests/test_redis_store.py new file mode 100644 index 0000000000..e96ef558cc --- /dev/null +++ b/src/snovault/tests/test_redis_store.py @@ -0,0 +1,107 @@ +import pytest + +from redis import StrictRedis + +from snovault.local_storage import LocalStoreClient + + +def _get_client(local_settings): + return LocalStoreClient( + db_index=local_settings['local_storage_redis_index'], + host=local_settings['local_storage_host'], + port=local_settings['local_storage_port'], + socket_timeout=local_settings['local_storage_timeout'], + ) + + +def test_local_storage_server_fixture(app_settings): + local_store = _get_client(app_settings) + try: + local_store.ping() + except Exception as excp: # pylint: disable=broad-except + print(excp) + assert False + assert True + + +class TestLocalStore(): + dict_key = 'fakedictkey' + item_key = 'fakeitemkey' + list_key = 'fakelistkey' + + @pytest.fixture(autouse=True) + def setup_method(self, app_settings): + ''' + Add local store to test class and cleans up standard redis keys + - Uses the exposed redis client directly + ''' + self.local_store = _get_client(app_settings) + self.local_store.client.delete(self.dict_key) + self.local_store.client.delete(self.item_key) + for item in self.local_store.client.lrange(self.list_key, 0, -1): + self.local_store.client.delete(item) + self.local_store.client.delete(self.list_key) + + def test_init(self): + assert hasattr(self.local_store, 'local_tz') + assert isinstance(self.local_store.local_tz, str) + assert self.local_store.local_tz == 'GMT' + assert hasattr(self.local_store, 'client') + assert isinstance(self.local_store.client, StrictRedis) + + def test_get_tag(self): + config_tag = 'localtesting' + tag = self.local_store.get_tag(config_tag) + assert isinstance(tag, str) + assert len(tag) == len(config_tag) + len(':') + 4 + tag = self.local_store.get_tag(config_tag, num_bytes=4) + assert len(tag) == len(config_tag) + len(':') + 8 + + def test_ping(self): + try: + self.local_store.ping() + except Exception as excp: # pylint: disable=broad-except + assert False + assert True + + def test_dict_get(self): + hash_dict = self.local_store.dict_get(self.dict_key) + assert isinstance(hash_dict, dict) + assert hash_dict == {} + + def test_dict_set(self): + update_dict = {'somekey': 'somevalue'} + self.local_store.dict_set(self.dict_key, update_dict) + hash_dict = self.local_store.dict_get(self.dict_key) + assert hash_dict == update_dict + + def test_item_get(self): + item = self.local_store.item_get(self.item_key) + assert item is None + + def test_item_set(self): + set_item = 'someitem' + self.local_store.item_set(self.item_key, set_item) + item = self.local_store.item_get(self.item_key) + assert isinstance(item, str) + assert item == set_item + + def test_list_add(self): + # Lists are LIFO + items_to_add = ['A', 'B', 'C'] + for item in items_to_add: + self.local_store.list_add(self.list_key, item) + items = self.local_store.list_get(self.list_key, 0, -1) + items.sort() + assert items_to_add == items + + def test_list_get(self): + # Lists are LIFO + items_to_add = ['A', 'B', 'C', 'D', 'E'] + for item in items_to_add: + self.local_store.list_add(self.list_key, item) + items = self.local_store.list_get(self.list_key, 0, 0) + assert items[0] == items_to_add[-1] + items = self.local_store.list_get(self.list_key, 1, 3) + items_to_add.sort(reverse=True) + assert items == items_to_add[1:4] diff --git a/src/snovault/tests/testappfixtures.py b/src/snovault/tests/testappfixtures.py index 6a4f7fce2b..27eefbefcb 100644 --- a/src/snovault/tests/testappfixtures.py +++ b/src/snovault/tests/testappfixtures.py @@ -22,12 +22,18 @@ 'multiauth.policy.accesskey.check': 'snovault.authentication.basic_auth_check', 'multiauth.policy.webuser.use': 'snovault.authentication.NamespacedAuthenticationPolicy', 'multiauth.policy.webuser.namespace': 'webuser', - 'multiauth.policy.webuser.base': 'snovault.authentication.WebUserAuthenticationPolicy' + 'multiauth.policy.webuser.base': 'snovault.authentication.WebUserAuthenticationPolicy', + # Local Storage + 'local_storage_host': 'localhost', + 'local_storage_port': 6378, + 'local_storage_redis_index': 0, + 'local_storage_timeout': 5, + 'local_tz': 'GMT', } @pytest.fixture(scope='session') -def app_settings(request, wsgi_server_host_port, conn, DBSession): +def app_settings(request, wsgi_server_host_port, conn, DBSession, redis_server): from snovault import DBSESSION settings = _app_settings.copy() settings[DBSESSION] = DBSession diff --git a/src/snowflakes/tests/conftest.py b/src/snowflakes/tests/conftest.py index aac0234c38..ab467adbf7 100644 --- a/src/snowflakes/tests/conftest.py +++ b/src/snowflakes/tests/conftest.py @@ -44,11 +44,17 @@ def autouse_external_tx(external_tx): 'postgresql.statement_timeout': 20, 'retry.attempts': 3, 'ontology_path': pkg_resources.resource_filename('snowflakes', '../../ontology.json'), + # Local Storage + 'local_storage_host': 'localhost', + 'local_storage_port': 6378, + 'local_storage_redis_index': 0, + 'local_storage_timeout': 5, + 'local_tz': 'GMT', } @fixture(scope='session') -def app_settings(request, wsgi_server_host_port, conn, DBSession): +def app_settings(request, wsgi_server_host_port, conn, DBSession, redis_server): from snovault import DBSESSION settings = _app_settings.copy() settings[DBSESSION] = DBSession diff --git a/src/snowflakes/tests/features/conftest.py b/src/snowflakes/tests/features/conftest.py index 44049d80b9..5f66e73c54 100644 --- a/src/snowflakes/tests/features/conftest.py +++ b/src/snowflakes/tests/features/conftest.py @@ -19,7 +19,7 @@ def external_tx(): @pytest.fixture(scope='session') -def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server): +def app_settings(wsgi_server_host_port, elasticsearch_server, postgresql_server, redis_server): from snovault.tests.testappfixtures import _app_settings settings = _app_settings.copy() settings['create_tables'] = True