diff --git a/csvapi/cli.py b/csvapi/cli.py
index 6d5675c..9ceec4a 100644
--- a/csvapi/cli.py
+++ b/csvapi/cli.py
@@ -29,6 +29,8 @@ def cli():
               help='Automatically reload if code change detected')
 @click.option('--cache/--no-cache', default=True,
               help='Do not parse CSV again if DB already exists')
+@click.option('--cache-max-age', default=7,
+              help='Cache expiration time in days')
 @click.option('-w', '--max-workers', default=3,
               help='Max number of ThreadPoolExecutor workers')
 @click.option('--ssl-cert', default=None,
@@ -36,7 +38,7 @@ def cli():
 @click.option('--ssl-key', default=None,
               help='Path to SSL key')
 @cli.command()
-def serve(dbs, host, port, debug, reload, cache, max_workers, ssl_cert, ssl_key):
+def serve(dbs, host, port, debug, reload, cache, cache_max_age, max_workers, ssl_cert, ssl_key):
     ssl_context = None
     if ssl_cert and ssl_key:
         ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
@@ -44,6 +46,7 @@ def serve(dbs, host, port, debug, reload, cache, max_workers, ssl_cert, ssl_key)
     app.config.update({
         'DB_ROOT_DIR': dbs,
         'CSV_CACHE_ENABLED': cache,
+        'CACHE_MAX_AGE': cache_max_age,
         'MAX_WORKERS': max_workers,
         'DEBUG': debug,  # TODO this probably does not exist in Quart
diff --git a/csvapi/exportview.py b/csvapi/exportview.py
index 8fbb7be..9488f64 100644
--- a/csvapi/exportview.py
+++ b/csvapi/exportview.py
@@ -15,7 +15,7 @@ class ExportView(TableView):
 
     async def get(self, urlhash):
         "This will inherit sorting and filtering from TableView"
-        db_info = get_db_info(urlhash)
-        p = Path(db_info['db_path'])
-        if not p.exists():
-            raise APIError('Database has probably been removed.', status=404)
+        db_info = get_db_info(urlhash=urlhash)
+        # Same guard as in tableview: get_db_info() may now return None.
+        if db_info is None or not Path(db_info['db_path']).exists():
+            raise APIError('Database has probably been removed.', status=404)
diff --git a/csvapi/parser.py b/csvapi/parser.py
index 467a85d..28df041 100644
--- a/csvapi/parser.py
+++ b/csvapi/parser.py
@@ -1,10 +1,11 @@
 import os
+import uuid
 
 import agate
 import agatesql  # noqa
 import cchardet as chardet
 
-from csvapi.utils import get_db_info
+from csvapi.utils import add_entry_to_sys_db
 from csvapi.type_tester import agate_tester
 
 SNIFF_LIMIT = 4096
@@ -33,15 +34,17 @@ def from_excel(filepath):
     return agate.Table.from_xls(filepath, column_types=agate_tester())
 
 
-def to_sql(table, urlhash, storage):
-    db_info = get_db_info(urlhash, storage=storage)
-    table.to_sql(db_info['dsn'], db_info['db_name'], overwrite=True)
+def to_sql(table, urlhash, filehash, storage):
+    # Each parse gets its own UUID-named SQLite file; the mapping to the
+    # urlhash/filehash pair is recorded in the sys DB.
+    db_name = str(uuid.uuid4())
+    db_dsn = f"sqlite:///{storage}/{db_name}.db"
+    table.to_sql(db_dsn, urlhash, overwrite=True)
+    add_entry_to_sys_db(db_name, urlhash, filehash)
 
 
-def parse(filepath, urlhash, storage, encoding=None, sniff_limit=SNIFF_LIMIT):
+def parse(filepath, urlhash, filehash, storage, encoding=None, sniff_limit=SNIFF_LIMIT):
     if is_binary(filepath):
         table = from_excel(filepath)
     else:
         encoding = detect_encoding(filepath) if not encoding else encoding
         table = from_csv(filepath, encoding=encoding, sniff_limit=sniff_limit)
-    return to_sql(table, urlhash, storage)
+    return to_sql(table, urlhash, filehash, storage)
\ No newline at end of file
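For orientation, here is roughly how a caller would use the new `parse()` signature: both the urlhash (identity of the source URL) and the filehash (identity of the downloaded content) are now required, since the sys DB stores the pair. This is a hypothetical sketch; the URL, file paths, and storage directory below are stand-ins, not values used anywhere in this patch.

```python
# Hypothetical caller of the new parse() signature.
import xxhash

from csvapi.parser import parse
from csvapi.utils import get_hash

url = 'https://example.com/data.csv'               # made-up example URL
urlhash = get_hash(url)                            # identity of the source URL
with open('/tmp/data.csv', 'rb') as f:
    filehash = xxhash.xxh64(f.read()).hexdigest()  # identity of the content

parse('/tmp/data.csv', urlhash, filehash, '/tmp/dbs')
```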
diff --git a/csvapi/parseview.py b/csvapi/parseview.py
index bcb533b..64311a4 100644
--- a/csvapi/parseview.py
+++ b/csvapi/parseview.py
@@ -1,6 +1,7 @@
 import os
 import tempfile
+import xxhash
 
 import aiohttp
 import validators
@@ -9,8 +10,8 @@
 from csvapi.errors import APIError
 from csvapi.parser import parse
-from csvapi.utils import already_exists, get_hash
+from csvapi.utils import is_hash_relevant, get_hash, age_valid
 
 
 class ParseView(MethodView):
 
     @staticmethod
     async def do_parse(url, urlhash, encoding, storage, logger, sniff_limit, max_file_size):
-        logger.debug('* do_parse %s (%s)', urlhash, url)
+        logger.debug('* do_parse (%s)', url)
+        # A fresh hasher per call: a module-level instance would accumulate
+        # chunks across requests and produce wrong digests.
+        hasher = xxhash.xxh64()
         tmp = tempfile.NamedTemporaryFile(delete=False)
         chunk_count = 0
         chunk_size = 1024
@@ -30,13 +34,18 @@
                     raise Exception('File too big (max size is %s bytes)' % max_file_size)
                 if not chunk:
                     break
+                hasher.update(chunk)
                 tmp.write(chunk)
                 chunk_count += 1
             tmp.close()
-            logger.debug('* Downloaded %s', urlhash)
-            logger.debug('* Parsing %s...', urlhash)
-            parse(tmp.name, urlhash, storage, encoding=encoding, sniff_limit=sniff_limit)
-            logger.debug('* Parsed %s', urlhash)
+            filehash = hasher.hexdigest()
+            logger.debug('* Downloaded %s', filehash)
+            if not is_hash_relevant(urlhash, filehash):
+                logger.debug('* Parsing %s...', filehash)
+                parse(tmp.name, urlhash, filehash, storage, encoding=encoding, sniff_limit=sniff_limit)
+                logger.debug('* Parsed %s', filehash)
+            else:
+                logger.info(f"File hash for {urlhash} is unchanged, skipping parse.")
         finally:
             logger.debug('Removing tmp file: %s', tmp.name)
             os.unlink(tmp.name)
@@ -50,22 +60,23 @@ async def get(self):
         if not validators.url(url):
             raise APIError('Malformed url parameter.', status=400)
         urlhash = get_hash(url)
+        storage = app.config['DB_ROOT_DIR']
 
-        if not already_exists(urlhash):
+        if not age_valid(storage, urlhash):
             try:
-                storage = app.config['DB_ROOT_DIR']
-                await self.do_parse(url=url,
-                                    urlhash=urlhash,
-                                    encoding=encoding,
-                                    storage=storage,
-                                    logger=app.logger,
-                                    sniff_limit=app.config.get('CSV_SNIFF_LIMIT'),
-                                    max_file_size=app.config.get('MAX_FILE_SIZE')
-                                    )
+                await self.do_parse(
+                    url=url,
+                    urlhash=urlhash,
+                    encoding=encoding,
+                    storage=storage,
+                    logger=app.logger,
+                    sniff_limit=app.config.get('CSV_SNIFF_LIMIT'),
+                    max_file_size=app.config.get('MAX_FILE_SIZE')
+                )
             except Exception as e:
                 raise APIError('Error parsing CSV: %s' % e)
         else:
-            app.logger.info(f"{urlhash}.db already exists, skipping parse.")
+            app.logger.info(f"DB for {urlhash} is fresh enough, serving it as is.")
         scheme = 'https' if app.config.get('FORCE_SSL') else request.scheme
         return jsonify({
             'ok': True,
diff --git a/csvapi/tableview.py b/csvapi/tableview.py
index cf29e54..1cd7437 100644
--- a/csvapi/tableview.py
+++ b/csvapi/tableview.py
@@ -146,10 +146,10 @@ async def data(self, db_info, export=False):
         return res
 
     async def get(self, urlhash):
-        db_info = get_db_info(urlhash)
-        p = Path(db_info['db_path'])
-        if not p.exists():
-            raise APIError('Database has probably been removed.', status=404)
+        db_info = get_db_info(urlhash=urlhash)
+        # get_db_info() now returns None for unknown urlhashes; treat that the
+        # same as a missing file instead of falling through with None.
+        if db_info is None or not Path(db_info['db_path']).exists():
+            raise APIError('Database has probably been removed.', status=404)
 
         start = time.time()
         try:
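A note on the chunked hashing in `do_parse()`: xxhash exposes a hashlib-style API, so the digest can be built incrementally while the download is streamed to disk, and a fresh hasher must be created for every file. Below is a minimal self-contained sketch of the same pattern over a local file; the function name and default chunk size are illustrative only.

```python
# Sketch of the per-file chunked hashing pattern used in do_parse().
import xxhash

def hash_file_in_chunks(path: str, chunk_size: int = 1024) -> str:
    hasher = xxhash.xxh64()          # fresh state for each file
    with open(path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            hasher.update(chunk)     # same call applied to downloaded chunks
    return hasher.hexdigest()
```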
diff --git a/csvapi/uploadview.py b/csvapi/uploadview.py
index 85bce83..7f3005f 100644
--- a/csvapi/uploadview.py
+++ b/csvapi/uploadview.py
@@ -26,6 +26,9 @@ async def post(self):
             _file.save(_tmpfile)
             _tmpfile.close()
             parse(_tmpfile.name, content_hash, storage, sniff_limit=sniff_limit)
+            # FIXME: there is no urlhash here; parse() would have to accept
+            # urlhash=None. But how would get_db_info() in tableview work
+            # with such an entry in the sys DB?
         finally:
             os.unlink(_tmpfile.name)
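The sys DB that utils.py introduces below is a single SQLite file mapping each urlhash to its UUID-named database file, content hash, and creation date. A rough sketch of the lookup that `get_db_info()` performs, to make the schema concrete; the helper name is invented, and the column order follows the `CREATE TABLE` statement in this patch.

```python
# Illustrative lookup against sys.db, mirroring get_db_info(urlhash=...).
import sqlite3

def lookup_sketch(storage: str, urlhash: str):
    conn = sqlite3.connect(f"{storage}/sys.db")
    try:
        row = conn.execute(
            'SELECT db_uuid, urlhash, filehash, creation_time '
            'FROM csvapi_sys WHERE urlhash=?', (urlhash,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        return None                  # unknown urlhash: nothing cached yet
    db_uuid, urlhash, filehash, created = row
    return {'db_path': f"{storage}/{db_uuid}.db",
            'filehash': filehash, 'creation_date': created}
```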
diff --git a/csvapi/utils.py b/csvapi/utils.py
index f526cea..accbe96 100644
--- a/csvapi/utils.py
+++ b/csvapi/utils.py
@@ -1,4 +1,7 @@
-import hashlib
+import datetime
+
+import sqlite3
+import xxhash
 
 from concurrent import futures
 from pathlib import Path
@@ -8,15 +11,76 @@
 executor = None
 
 
-def get_db_info(urlhash, storage=None):
-    # app.config not thread safe, sometimes we need to pass storage directly
+def create_sys_db(app, storage=None):
+    # We do not use get_sys_db_info() here because this is called outside
+    # of the app context.
     storage = storage or app.config['DB_ROOT_DIR']
-    dbpath = f"{storage}/{urlhash}.db"
+    dbpath = f"{storage}/sys.db"
+
+    conn = sqlite3.connect(dbpath)
+    with conn:
+        conn.execute(
+            "CREATE TABLE IF NOT EXISTS csvapi_sys "
+            "(id integer primary key, db_uuid text, urlhash text, "
+            "filehash text, creation_time date)"
+        )
+    conn.close()
+
+
+def get_sys_db_info():
+    storage = app.config['DB_ROOT_DIR']
+    dbpath = f"{storage}/sys.db"
     return {
         'dsn': f"sqlite:///{dbpath}",
-        'db_name': urlhash,
+        'db_name': "sys.db",
+        'table_name': "csvapi_sys",
+        'db_path': dbpath,
+    }
+
+
+def add_entry_to_sys_db(uuid, urlhash, filehash):
+    now_str = datetime.datetime.now().strftime('%Y-%m-%d')
+
+    sys_db = get_sys_db_info()
+    conn = sqlite3.connect(sys_db['db_path'])
+    with conn:
+        conn.execute(
+            "INSERT INTO csvapi_sys (db_uuid, urlhash, filehash, creation_time) "
+            "VALUES (?, ?, ?, ?)",
+            (uuid, urlhash, filehash, now_str)
+        )
+    conn.close()
+
+
+def get_db_info(urlhash=None, filehash=None, storage=None):
+    # app.config is not thread safe, so sometimes we need to pass storage directly.
+    storage = storage or app.config['DB_ROOT_DIR']
+
+    sys_db = get_sys_db_info()
+    conn = sqlite3.connect(sys_db['db_path'])
+    c = conn.cursor()
+
+    # Lookup by either urlhash or filehash is allowed because of the
+    # uploadview. Do we want to keep it this way?
+    if urlhash is not None:
+        c.execute('SELECT * FROM csvapi_sys WHERE urlhash=?', (urlhash,))
+    elif filehash is not None:
+        c.execute('SELECT * FROM csvapi_sys WHERE filehash=?', (filehash,))
+    else:
+        raise RuntimeError('get_db_info() needs at least one non-None argument')
+
+    res = c.fetchone()
+    conn.close()
+    if not res:
+        return None
+
+    _, db_uuid, urlhash, filehash, creation_date = res
+    dbpath = f"{storage}/{db_uuid}.db"
+    return {
+        'db_uuid': db_uuid,
+        'urlhash': urlhash,
+        'filehash': filehash,
+        'creation_date': creation_date,
         'table_name': urlhash,
         'db_path': dbpath,
+        'db_name': db_uuid,
+        'dsn': f"sqlite:///{dbpath}",
+    }
@@ -34,11 +101,41 @@ def get_hash(to_hash):
 
 
 def get_hash_bytes(to_hash):
-    return hashlib.md5(to_hash).hexdigest()
+    return xxhash.xxh64(to_hash).hexdigest()
 
 
-def already_exists(urlhash):
+def already_exists(filehash):
     cache_enabled = app.config.get('CSV_CACHE_ENABLED')
     if not cache_enabled:
         return False
-    return Path(get_db_info(urlhash)['db_path']).exists()
+    return get_db_info(filehash=filehash) is not None
+
+
+def is_hash_relevant(urlhash, filehash):
+    cache_enabled = app.config.get('CSV_CACHE_ENABLED')
+    if not cache_enabled:
+        return False
+
+    db = get_db_info(urlhash=urlhash)
+    if db is None:
+        return False
+
+    # Should we seek by urlhash or directly by filehash? Seeking by filehash
+    # would save the hash comparison, but are we sure we would get the right
+    # entry for the urlhash we wanted? Yes, as long as there cannot be more
+    # than one entry per urlhash.
+    return db['filehash'] == filehash
+
+
+def age_valid(storage, urlhash):
+    max_age = app.config['CACHE_MAX_AGE']
+
+    db = get_db_info(urlhash=urlhash, storage=storage)
+    if db is None:
+        return False
+
+    created = datetime.datetime.strptime(db['creation_date'], '%Y-%m-%d')
+    age = datetime.datetime.now() - created
+    return age.days <= max_age
diff --git a/csvapi/webservice.py b/csvapi/webservice.py
index 510c0ad..9533a5b 100644
--- a/csvapi/webservice.py
+++ b/csvapi/webservice.py
@@ -11,6 +11,7 @@
 from csvapi.uploadview import UploadView
 from csvapi.parseview import ParseView
 from csvapi.security import filter_referrers
+from csvapi.utils import create_sys_db
 
 app = Quart(__name__)
 app = cors(app, allow_origin='*')
@@ -29,6 +30,7 @@
     from raven import Client
     app.extensions['sentry'] = Client(app.config['SENTRY_DSN'])
 
+create_sys_db(app)
 
 def handle_and_print_error():
     sentry_id = None
diff --git a/requirements/install.pip b/requirements/install.pip
index e2ebf03..72c891a 100644
--- a/requirements/install.pip
+++ b/requirements/install.pip
@@ -10,3 +10,4 @@ quart-cors==0.2.0
 raven==6.10.0
 cchardet==2.1.4
 python-stdnum==1.11
+xxhash==1.4.3
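Taken together, the patch gives two cache levels: `age_valid()` short-circuits before any download when the DB is younger than `CACHE_MAX_AGE`, and `is_hash_relevant()` short-circuits the re-parse after the download when the content is unchanged. A condensed sketch of that decision follows; the helper function is illustrative (the real logic lives in `ParseView.get()` and `do_parse()`, where the age check happens before the download and the hash check after it).

```python
# Condensed view of the two-level cache decision introduced by this patch.
from csvapi.utils import age_valid, is_hash_relevant

def should_parse(storage, urlhash, filehash):
    # Level 1: skip even the download when the existing DB is fresh enough.
    if age_valid(storage, urlhash):
        return False
    # Level 2: skip the re-parse when the downloaded content's hash matches
    # the one recorded in the sys DB for this urlhash.
    if is_hash_relevant(urlhash, filehash):
        return False
    return True  # new or stale content: parse again
```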