5 changes: 4 additions & 1 deletion csvapi/cli.py
@@ -29,21 +29,24 @@ def cli():
help='Automatically reload if code change detected')
@click.option('--cache/--no-cache', default=True,
help='Do not parse CSV again if DB already exists')
@click.option('--cache-max-age', default=7,
help='Cache expiration time in days')
@click.option('-w', '--max-workers', default=3,
help='Max number of ThreadPoolExecutor workers')
@click.option('--ssl-cert', default=None,
help='Path to SSL certificate')
@click.option('--ssl-key', default=None,
help='Path to SSL key')
@cli.command()
def serve(dbs, host, port, debug, reload, cache, max_workers, ssl_cert, ssl_key):
def serve(dbs, host, port, debug, reload, cache, cache_max_age, max_workers, ssl_cert, ssl_key):
ssl_context = None
if ssl_cert and ssl_key:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=ssl_cert, keyfile=ssl_key)
app.config.update({
'DB_ROOT_DIR': dbs,
'CSV_CACHE_ENABLED': cache,
'CACHE_MAX_AGE': cache_max_age,
'MAX_WORKERS': max_workers,
'DEBUG': debug,
# TODO this probably does not exist in Quart
2 changes: 1 addition & 1 deletion csvapi/exportview.py
@@ -15,7 +15,7 @@ class ExportView(TableView):

async def get(self, urlhash):
"This will inherit sorting and filtering from TableView"
db_info = get_db_info(urlhash)
db_info = get_db_info(urlhash=urlhash)
p = Path(db_info['db_path'])
if not p.exists():
raise APIError('Database has probably been removed.', status=404)
15 changes: 9 additions & 6 deletions csvapi/parser.py
@@ -1,10 +1,11 @@
import os
import uuid

import agate
import agatesql # noqa
import cchardet as chardet

from csvapi.utils import get_db_info
from csvapi.utils import add_entry_to_sys_db
from csvapi.type_tester import agate_tester

SNIFF_LIMIT = 4096
@@ -33,15 +34,17 @@ def from_excel(filepath):
return agate.Table.from_xls(filepath, column_types=agate_tester())


def to_sql(table, urlhash, storage):
db_info = get_db_info(urlhash, storage=storage)
table.to_sql(db_info['dsn'], db_info['db_name'], overwrite=True)
def to_sql(table, urlhash, filehash, storage):
db_name = str(uuid.uuid4())
db_dsn = f"sqlite:///{storage}/{db_name}.db"
table.to_sql(db_dsn, urlhash, overwrite=True)
add_entry_to_sys_db(db_name, urlhash, filehash)


def parse(filepath, urlhash, storage, encoding=None, sniff_limit=SNIFF_LIMIT):
def parse(filepath, urlhash, filehash, storage, encoding=None, sniff_limit=SNIFF_LIMIT):
if is_binary(filepath):
table = from_excel(filepath)
else:
encoding = detect_encoding(filepath) if not encoding else encoding
table = from_csv(filepath, encoding=encoding, sniff_limit=sniff_limit)
return to_sql(table, urlhash, storage)
return to_sql(table, urlhash, filehash, storage)
Contributor: Missing line at EOF
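A hedged usage example of the updated parse() signature (the path, hashes, and storage directory below are illustrative, not from this PR):

    # urlhash identifies the source URL; filehash fingerprints the downloaded bytes
    parse('/tmp/source.csv', 'a1b2c3', 'd4e5f6', '/var/csvapi',
          encoding='utf-8', sniff_limit=4096)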

50 changes: 33 additions & 17 deletions csvapi/parseview.py
@@ -1,6 +1,8 @@
import os
import tempfile
import time

import xxhash
import aiohttp
import validators

@@ -9,14 +11,15 @@

from csvapi.errors import APIError
from csvapi.parser import parse
from csvapi.utils import already_exists, get_hash
from csvapi.utils import is_hash_relevant, get_hash, age_valid


class ParseView(MethodView):

@staticmethod
async def do_parse(url, urlhash, encoding, storage, logger, sniff_limit, max_file_size):
logger.debug('* do_parse %s (%s)', urlhash, url)
logger.debug('* do_parse (%s)', url)
tmp = tempfile.NamedTemporaryFile(delete=False)
chunk_count = 0
chunk_size = 1024
# Per-request hasher: a shared module-level hasher would mix bytes from concurrent downloads
hasher = xxhash.xxh64()
@@ -30,13 +33,23 @@ async def do_parse(url, urlhash, encoding, storage, logger, sniff_limit, max_fil
raise Exception('File too big (max size is %s bytes)' % max_file_size)
if not chunk:
break
hasher.update(chunk)
tmp.write(chunk)
chunk_count += 1
tmp.close()
logger.debug('* Downloaded %s', urlhash)
logger.debug('* Parsing %s...', urlhash)
parse(tmp.name, urlhash, storage, encoding=encoding, sniff_limit=sniff_limit)
logger.debug('* Parsed %s', urlhash)
filehash = hasher.hexdigest()
logger.debug('* Downloaded %s', filehash)
if not is_hash_relevant(urlhash, filehash):
print("HASH IS NOT RELEVANT")
Contributor: remove

try:
logger.debug('* Parsing %s...', filehash)
parse(tmp.name, urlhash, filehash, storage, encoding=encoding, sniff_limit=sniff_limit)
logger.debug('* Parsed %s', filehash)
except Exception as e:
raise APIError('Error parsing CSV: %s' % e)
else:
print("HASH IS RELEVANT")
Contributor: remove

logger.info(f"File hash for {urlhash} is relevant, skipping parse.")
finally:
logger.debug('Removing tmp file: %s', tmp.name)
os.unlink(tmp.name)
@@ -50,22 +63,25 @@ async def get(self):
if not validators.url(url):
raise APIError('Malformed url parameter.', status=400)
urlhash = get_hash(url)
storage = app.config['DB_ROOT_DIR']

if not already_exists(urlhash):
if not age_valid(storage, urlhash):
print("AGE IS NOT OK")
try:
storage = app.config['DB_ROOT_DIR']
await self.do_parse(url=url,
urlhash=urlhash,
encoding=encoding,
storage=storage,
logger=app.logger,
sniff_limit=app.config.get('CSV_SNIFF_LIMIT'),
max_file_size=app.config.get('MAX_FILE_SIZE')
)
await self.do_parse(
url=url,
urlhash=urlhash,
encoding=encoding,
storage=storage,
logger=app.logger,
sniff_limit=app.config.get('CSV_SNIFF_LIMIT'),
max_file_size=app.config.get('MAX_FILE_SIZE')
)
Contributor: Indentation

except Exception as e:
raise APIError('Error parsing CSV: %s' % e)
else:
app.logger.info(f"{urlhash}.db already exists, skipping parse.")
print("AGE IS OK")
Contributor: Remove

app.logger.info(f"Db for {urlhash} is young enough, serving as is.")
scheme = 'https' if app.config.get('FORCE_SSL') else request.scheme
return jsonify({
'ok': True,
9 changes: 5 additions & 4 deletions csvapi/tableview.py
@@ -146,10 +146,11 @@ async def data(self, db_info, export=False):
return res

async def get(self, urlhash):
db_info = get_db_info(urlhash)
p = Path(db_info['db_path'])
if not p.exists():
raise APIError('Database has probably been removed.', status=404)
db_info = get_db_info(urlhash=urlhash)
if db_info is not None:
p = Path(db_info['db_path'])
if not p.exists():
raise APIError('Database has probably been removed.', status=404)

start = time.time()
try:
2 changes: 2 additions & 0 deletions csvapi/uploadview.py
@@ -26,6 +26,8 @@ async def post(self):
_file.save(_tmpfile)
_tmpfile.close()
parse(_tmpfile.name, content_hash, storage, sniff_limit=sniff_limit)
# No urlhash here: we would need to change the parse code to allow urlhash to be None (see the sketch below).
# But how would get_db_info in tableview work with such an entry in the sys DB?
finally:
os.unlink(_tmpfile.name)
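With parse() now taking a filehash parameter before storage, the call above no longer matches the new signature. A minimal sketch of one way to adapt it, assuming the upload's content hash can stand in for both urlhash and filehash until None handling is settled (an assumption, not part of this PR):

    # Hypothetical adaptation: reuse the content hash for both roles so the
    # sys DB entry stays findable by get_db_info(urlhash=...) in tableview.
    parse(_tmpfile.name, content_hash, content_hash, storage, sniff_limit=sniff_limit)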

125 changes: 117 additions & 8 deletions csvapi/utils.py
@@ -1,4 +1,8 @@
import hashlib
import os
import datetime

import sqlite3
import xxhash

from concurrent import futures
from pathlib import Path
@@ -8,15 +12,78 @@
executor = None


def get_db_info(urlhash, storage=None):
# app.config not thread safe, sometimes we need to pass storage directly
def create_sys_db(app, storage=None):
# We do not use get_sys_db_info here because the call is made outside of the app context.
storage = storage or app.config['DB_ROOT_DIR']
dbpath = f"{storage}/{urlhash}.db"
dbpath = f"{storage}/sys.db"

conn = sqlite3.connect(dbpath)
with conn:
conn.execute("CREATE TABLE IF NOT EXISTS csvapi_sys (id integer primary key, db_uuid text, urlhash text, filehash text, creation_time date)")
conn.close()


def get_sys_db_info():
storage = app.config['DB_ROOT_DIR']
dbpath = f"{storage}/sys.db"
return {
'dsn': f"sqlite:///{dbpath}",
'db_name': urlhash,
'db_name': "sys.db",
'table_name': "csvapi_sys",
'db_path': dbpath,
}

def add_entry_to_sys_db(uuid, urlhash, filehash):
now = datetime.datetime.now()
now_str = now.strftime('%Y-%m-%d')

sys_db = get_sys_db_info()
conn = sqlite3.connect(sys_db['db_path'])
with conn:
conn.execute("INSERT INTO csvapi_sys (db_uuid, urlhash, filehash, creation_time) values (?, ?, ?, ?)", (uuid, urlhash, filehash, now_str))
conn.close()



def get_db_info(urlhash=None, filehash=None, storage=None):
storage = storage or app.config['DB_ROOT_DIR']

sys_db = get_sys_db_info()
conn = sqlite3.connect(sys_db['db_path'])
c = conn.cursor()

# This function allows lookups by urlhash or by filehash because of the uploadview.
# Do we want to keep things this way?

if urlhash is not None:
c.execute('SELECT * FROM csvapi_sys WHERE urlhash=?', (urlhash,))
elif filehash is not None:
c.execute('SELECT * FROM csvapi_sys WHERE filehash=?', (filehash,))
else:
raise RuntimeError('get_db_info needs at least one non-None argument')

res = c.fetchone()
Contributor: The code below can probably be made cleaner/shorter by getting column values in a dict instead of a list, e.g. res['uuid'].

if not res:
conn.close()
return None

dbuuid = res[1]
urlhash = res[2]
filehash = res[3]
creadate = res[4]
dbpath = f"{storage}/{dbuuid}.db"
dbname = dbuuid

conn.close()
return {
'db_uuid': dbuuid,
'urlhash': urlhash,
'filehash': filehash,
'creation_date': creadate,
'table_name': urlhash,
'db_path': dbpath,
'db_name': dbname,
'dsn': f"sqlite:///{dbpath}"

}
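A minimal sketch of the reviewer's suggestion above, using sqlite3.Row so columns can be read by name instead of by position (assuming the csvapi_sys schema from create_sys_db):

    conn = sqlite3.connect(sys_db['db_path'])
    conn.row_factory = sqlite3.Row  # rows now support res['db_uuid'] etc.
    c = conn.cursor()
    c.execute('SELECT * FROM csvapi_sys WHERE urlhash=?', (urlhash,))
    res = c.fetchone()
    conn.close()
    if res is None:
        return None
    dbpath = f"{storage}/{res['db_uuid']}.db"
    return {
        'db_uuid': res['db_uuid'],
        'urlhash': res['urlhash'],
        'filehash': res['filehash'],
        'creation_date': res['creation_time'],
        'table_name': res['urlhash'],
        'db_path': dbpath,
        'db_name': res['db_uuid'],
        'dsn': f"sqlite:///{dbpath}",
    }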


Expand All @@ -34,11 +101,53 @@ def get_hash(to_hash):


def get_hash_bytes(to_hash):
return hashlib.md5(to_hash).hexdigest()
return xxhash.xxh64(to_hash).hexdigest()


def already_exists(urlhash):
def already_exists(filehash):
cache_enabled = app.config.get('CSV_CACHE_ENABLED')
if not cache_enabled:
return False
return Path(get_db_info(urlhash)['db_path']).exists()

db = get_db_info(filehash=filehash)
if db is None:
return False

return True


def is_hash_relevant(urlhash, filehash):
cache_enabled = app.config.get('CSV_CACHE_ENABLED')
if not cache_enabled:
return False

db = get_db_info(urlhash=urlhash)
if db is None:
return False

# The question here is whether to look up by urlhash or directly by filehash.
# Looking up by filehash would save the hash comparison, but are we sure we would get the right entry for the urlhash we wanted?
# The answer is yes if there can be no more than one entry per urlhash.
if db['filehash'] == filehash:
return True

return False
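If there should never be more than one entry per urlhash, one option is to enforce that in the schema and upsert on insert; a sketch under that assumption (SQLite's ON CONFLICT ... DO UPDATE needs SQLite 3.24+, and this is not part of this PR):

    # In create_sys_db: declare urlhash UNIQUE
    conn.execute("CREATE TABLE IF NOT EXISTS csvapi_sys (id integer primary key, "
                 "db_uuid text, urlhash text UNIQUE, filehash text, creation_time date)")
    # In add_entry_to_sys_db: replace the existing row for this urlhash
    conn.execute("INSERT INTO csvapi_sys (db_uuid, urlhash, filehash, creation_time) "
                 "VALUES (?, ?, ?, ?) ON CONFLICT(urlhash) DO UPDATE SET "
                 "db_uuid=excluded.db_uuid, filehash=excluded.filehash, "
                 "creation_time=excluded.creation_time",
                 (uuid, urlhash, filehash, now_str))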


def age_valid(storage, urlhash):
max_age = app.config['CACHE_MAX_AGE']

db = get_db_info(urlhash=urlhash, storage=storage)
if db is None:
return False

date_time_obj = datetime.datetime.strptime(db['creation_date'], '%Y-%m-%d')
later_time = datetime.datetime.now()
file_age = later_time - date_time_obj
if file_age.days > max_age:
return False

return True
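For illustration, with the default --cache-max-age of 7 days (dates below are hypothetical):

    import datetime
    created = datetime.datetime.strptime('2020-03-01', '%Y-%m-%d')
    now = datetime.datetime.strptime('2020-03-11', '%Y-%m-%d')
    print((now - created).days > 7)  # True -> age_valid returns False, re-parse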



2 changes: 2 additions & 0 deletions csvapi/webservice.py
@@ -11,6 +11,7 @@
from csvapi.uploadview import UploadView
from csvapi.parseview import ParseView
from csvapi.security import filter_referrers
from csvapi.utils import create_sys_db

app = Quart(__name__)
app = cors(app, allow_origin='*')
@@ -29,6 +30,7 @@
from raven import Client
app.extensions['sentry'] = Client(app.config['SENTRY_DSN'])

create_sys_db(app)

def handle_and_print_error():
sentry_id = None
1 change: 1 addition & 0 deletions requirements/install.pip
@@ -10,3 +10,4 @@ quart-cors==0.2.0
raven==6.10.0
cchardet==2.1.4
python-stdnum==1.11
xxhash==1.4.3