Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions bw2data/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@
from bw2data import calculation_setups, config, databases, geomapping
from bw2data.backends import sqlite3_lci_db
from bw2data.backends.proxies import Activity
from bw2data.backends.schema import (
ActivityDataset,
ExchangeDataset,
_insert_many_activities,
_insert_many_exchanges,
get_id,
)
from bw2data.backends.schema import ActivityDataset, ExchangeDataset, get_id
from bw2data.backends.typos import (
check_activity_keys,
check_activity_type,
Expand Down Expand Up @@ -607,7 +601,7 @@ def _efficient_write_dataset(
exchanges: list,
activities: list,
check_typos: bool = True,
) -> None:
) -> (list, list):
for exchange in ds.get("exchanges", []):
if "input" not in exchange or "amount" not in exchange:
raise InvalidExchange
Expand All @@ -622,6 +616,15 @@ def _efficient_write_dataset(
exchange["output"] = (ds["database"], ds["code"])
exchanges.append(dict_as_exchangedataset(exchange))

# The rows are sent as a single INSERT INTO x VALUES (?, ?, ...) statement,
# with one bound variable per field per row.
# SQLite3 has a limit of 999 bound variables per statement,
# so 6 fields * 125 rows stays under that limit.
# Exceeding the limit raises:
# peewee.OperationalError: too many SQL variables
if len(exchanges) > 125:
ExchangeDataset.insert_many(exchanges).execute()
exchanges = []

ds = {k: v for k, v in ds.items() if k != "exchanges"}

if check_typos:
Expand All @@ -630,6 +633,12 @@ def _efficient_write_dataset(

activities.append(dict_as_activitydataset(ds, add_snowflake_id=True))

if len(activities) > 125:
ActivityDataset.insert_many(activities).execute()
activities = []

return exchanges, activities

def _efficient_write_many_data(
self, data: list, indices: bool = True, check_typos: bool = True
) -> None:
Expand All @@ -643,10 +652,14 @@ def _efficient_write_many_data(
exchanges, activities = [], []

for ds in tqdm_wrapper(data, getattr(config, "is_test", False)):
self._efficient_write_dataset(ds, exchanges, activities, check_typos)
exchanges, activities = self._efficient_write_dataset(
ds, exchanges, activities, check_typos
)

_insert_many_activities(activities)
_insert_many_exchanges(exchanges)
if activities:
ActivityDataset.insert_many(activities).execute()
if exchanges:
ExchangeDataset.insert_many(exchanges).execute()
sqlite3_lci_db.db.commit()
sqlite3_lci_db.vacuum()
except:
Expand Down
56 changes: 0 additions & 56 deletions bw2data/backends/schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import pickle

from peewee import DoesNotExist, TextField

from bw2data.errors import UnknownObject
Expand Down Expand Up @@ -90,57 +88,3 @@ def _remove_changed_activity_key_from_get_id_cache(sender, old=None, **kwargs):
signaleddataset_on_delete.connect(_remove_activity_from_get_id_cache)
on_activity_database_change.connect(_remove_changed_activity_key_from_get_id_cache)
on_activity_code_change.connect(_remove_changed_activity_key_from_get_id_cache)


def _insert_many_activities(activities: list) -> None:
"""Bulk-insert activity dicts via raw SQLite executemany, bypassing Peewee ORM overhead.

Must be called within an active transaction. Each dict must be in the format
produced by dict_as_activitydataset(ds, add_snowflake_id=True).
"""
if not activities:
return
conn = ActivityDataset._meta.database.connection()
conn.cursor().executemany(
'INSERT INTO "activitydataset" ("id", "data", "code", "database", "location", "name", "product", "type") VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
[
(
row["id"],
# Protocol 4 matches PickleField.db_value() so both write paths produce identical blobs.
pickle.dumps(row["data"], protocol=4),
row["code"],
row["database"],
str(v) if (v := row.get("location")) is not None else None,
row.get("name"),
row.get("product"),
row.get("type"),
)
for row in activities
],
)


def _insert_many_exchanges(exchanges: list) -> None:
"""Bulk-insert exchange dicts via raw SQLite executemany, bypassing Peewee ORM overhead.

Must be called within an active transaction. Each dict must be in the format
produced by dict_as_exchangedataset(exc).
"""
if not exchanges:
return
conn = ExchangeDataset._meta.database.connection()
conn.cursor().executemany(
'INSERT INTO "exchangedataset" ("data", "input_code", "input_database", "output_code", "output_database", "type") VALUES (?, ?, ?, ?, ?, ?)',
[
(
# Protocol 4 matches PickleField.db_value() so both write paths produce identical blobs.
pickle.dumps(row["data"], protocol=4),
row["input_code"],
row["input_database"],
row["output_code"],
row["output_database"],
row["type"],
)
for row in exchanges
],
)
6 changes: 3 additions & 3 deletions bw2data/updates.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import os
import pickle
import re
Expand Down Expand Up @@ -163,8 +164,7 @@ def set_initial_updates(cls):
if "updates" in preferences:
return
SQL = "PRAGMA table_info(activitydataset)"
with sqlite3.connect(sqlite3_lci_db.db.database) as conn:
column_names = {x[1] for x in conn.execute(SQL)}
column_names = {x[1] for x in sqlite3_lci_db.execute_sql(SQL)}
if "code" in column_names:
preferences["updates"] = {key: True for key in cls.UPDATES}
else:
Expand All @@ -189,7 +189,7 @@ def reprocess_all_1_0(cls):

@classmethod
def schema_change_20_compound_keys(cls):
with sqlite3.connect(sqlite3_lci_db.db.database) as conn:
with contextlib.closing(sqlite3.connect(sqlite3_lci_db.db.database)) as conn:
stdout_feedback_logger.info("Update ActivityDataset table schema and data")
conn.executescript(UPDATE_ACTIVITYDATASET)
stdout_feedback_logger.info("Updating ExchangeDataset table schema and data")
Expand Down
Loading