From ead4baf874dab7628c2e326f66e8683c8e08a753 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 17:07:44 +1100 Subject: [PATCH 1/7] feat: increase length of LithologicModifier field to accommodate larger values --- alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py | 2 +- db/nma_legacy.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py index 97770d56..61173afa 100644 --- a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py +++ b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py @@ -46,7 +46,7 @@ def upgrade() -> None: sa.Column("StratBottom", sa.Float(), nullable=True), sa.Column("UnitIdentifier", sa.String(length=50), nullable=True), sa.Column("Lithology", sa.String(length=100), nullable=True), - sa.Column("LithologicModifier", sa.String(length=100), nullable=True), + sa.Column("LithologicModifier", sa.String(length=250), nullable=True), sa.Column("ContributingUnit", sa.String(length=10), nullable=True), sa.Column("StratSource", sa.Text(), nullable=True), sa.Column("StratNotes", sa.Text(), nullable=True), diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 72f39804..3d58f182 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -226,7 +226,7 @@ class NMA_Stratigraphy(Base): unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(50)) lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(100)) lithologic_modifier: Mapped[Optional[str]] = mapped_column( - "LithologicModifier", String(100) + "LithologicModifier", String(250) ) contributing_unit: Mapped[Optional[str]] = mapped_column( "ContributingUnit", String(10) From fa2463ea7d802f9647c1c6460e9c8f2a353af52b Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 17:19:25 +1100 Subject: [PATCH 2/7] feat: update NMA_Stratigraphy table schema with new constraints and field 
adjustments --- ...c3b4a5e67_create_nma_stratigraphy_table.py | 22 ++++++++------- db/nma_legacy.py | 27 ++++++++++++------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py index 61173afa..29c3cab8 100644 --- a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py +++ b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py @@ -35,22 +35,26 @@ def upgrade() -> None: nullable=False, ), sa.Column("WellID", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("PointID", sa.String(length=10), nullable=False), + sa.Column("PointID", sa.String(length=50), nullable=False), sa.Column( "thing_id", sa.Integer(), sa.ForeignKey("thing.id", ondelete="CASCADE"), nullable=False, ), - sa.Column("StratTop", sa.Float(), nullable=True), - sa.Column("StratBottom", sa.Float(), nullable=True), - sa.Column("UnitIdentifier", sa.String(length=50), nullable=True), - sa.Column("Lithology", sa.String(length=100), nullable=True), - sa.Column("LithologicModifier", sa.String(length=250), nullable=True), - sa.Column("ContributingUnit", sa.String(length=10), nullable=True), - sa.Column("StratSource", sa.Text(), nullable=True), - sa.Column("StratNotes", sa.Text(), nullable=True), + sa.Column("StratTop", sa.SmallInteger(), nullable=False), + sa.Column("StratBottom", sa.SmallInteger(), nullable=False), + sa.Column("UnitIdentifier", sa.String(length=20), nullable=True), + sa.Column("Lithology", sa.String(length=4), nullable=True), + sa.Column("LithologicModifier", sa.String(length=255), nullable=True), + sa.Column("ContributingUnit", sa.String(length=2), nullable=True), + sa.Column("StratSource", sa.String(100), nullable=True), + sa.Column("StratNotes", sa.String(255), nullable=True), sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.CheckConstraint( + 'char_length("PointID") > 0', + 
name="ck_nma_stratigraphy_pointid_len", + ), ) op.create_index( "ix_nma_stratigraphy_point_id", diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 3d58f182..5ea1337e 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -22,6 +22,7 @@ from sqlalchemy import ( Boolean, + CheckConstraint, Date, DateTime, Float, @@ -211,28 +212,36 @@ class NMA_Stratigraphy(Base): """Legacy stratigraphy (lithology log) data from AMPAPI.""" __tablename__ = "NMA_Stratigraphy" + __table_args__ = ( + CheckConstraint( + 'char_length("PointID") > 0', + name="ck_nma_stratigraphy_pointid_len", + ), + ) global_id: Mapped[uuid.UUID] = mapped_column( "GlobalID", UUID(as_uuid=True), primary_key=True ) well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) - point_id: Mapped[str] = mapped_column("PointID", String(10), nullable=False) + point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False) thing_id: Mapped[int] = mapped_column( Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False ) - strat_top: Mapped[Optional[float]] = mapped_column("StratTop", Float) - strat_bottom: Mapped[Optional[float]] = mapped_column("StratBottom", Float) - unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(50)) - lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(100)) + strat_top: Mapped[int] = mapped_column("StratTop", SmallInteger, nullable=False) + strat_bottom: Mapped[int] = mapped_column( + "StratBottom", SmallInteger, nullable=False + ) + unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(20)) + lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(4)) lithologic_modifier: Mapped[Optional[str]] = mapped_column( - "LithologicModifier", String(250) + "LithologicModifier", String(255) ) contributing_unit: Mapped[Optional[str]] = mapped_column( - "ContributingUnit", String(10) + "ContributingUnit", String(2) ) - strat_source: Mapped[Optional[str]] = 
mapped_column("StratSource", Text) - strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", Text) + strat_source: Mapped[Optional[str]] = mapped_column("StratSource", String(100)) + strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", String(255)) object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) thing: Mapped["Thing"] = relationship("Thing", back_populates="stratigraphy_logs") From 970019f3f25c45d5f0ab6da6906fd369614b03ad Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 20:03:08 +1100 Subject: [PATCH 3/7] feat: refactor waterlevels_transducer_transfer to use SQLAlchemy insert and update return type --- transfers/waterlevels_transducer_transfer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/transfers/waterlevels_transducer_transfer.py b/transfers/waterlevels_transducer_transfer.py index 991ee5c9..27f6bde7 100644 --- a/transfers/waterlevels_transducer_transfer.py +++ b/transfers/waterlevels_transducer_transfer.py @@ -18,6 +18,7 @@ import pandas as pd from pandas import Timestamp from pydantic import ValidationError +from sqlalchemy import insert from sqlalchemy.exc import DatabaseError from sqlalchemy.orm import Session @@ -134,7 +135,11 @@ def _transfer_hook(self, session: Session) -> None: ] observations = [obs for obs in observations if obs is not None] - session.bulk_save_objects(observations) + if observations: + session.execute( + insert(TransducerObservation), + observations, + ) session.add(block) logger.info( f"Added {len(observations)} water levels {release_status} block" @@ -164,7 +169,7 @@ def _make_observation( release_status: str, deps_sorted: list, nodeployments: dict, - ) -> TransducerObservation | None: + ) -> dict | None: deployment = _find_deployment(row.DateMeasured, deps_sorted) if deployment is None: @@ -195,7 +200,7 @@ def _make_observation( payload ).model_dump() legacy_payload = self._legacy_payload(row) - return 
TransducerObservation(**obspayload, **legacy_payload) + return {**obspayload, **legacy_payload} except ValidationError as e: logger.critical(f"Observation validation error: {e.errors()}") From 052c1e2038483e8e6ddfa77c09def2e8860d98d4 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 23:10:12 +1100 Subject: [PATCH 4/7] feat: add high-volume transfer playbook to optimize SQLAlchemy data handling --- AGENTS.MD | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 AGENTS.MD diff --git a/AGENTS.MD b/AGENTS.MD new file mode 100644 index 00000000..3ffa0911 --- /dev/null +++ b/AGENTS.MD @@ -0,0 +1,24 @@ +# AGENTS: High-Volume Transfer Playbook + +This repo pushes millions of legacy rows through SQLAlchemy. When Codex or any other agent has to work on +these transfers, keep the following rules in mind to avoid hour-long runs: + +## 1. Skip ORM object construction once volume climbs +- **Do not call `session.bulk_save_objects`** for high frequency tables (e.g., transducer observations, + water-levels, chemistry results). It still instantiates every mapped class and kills throughput. +- Instead, build plain dictionaries/tuples and call `session.execute(insert(Model), data)` (SQLAlchemy 2.0 bulk + insert); for bulk UPDATE/DELETE use `session.execute(stmt, execution_options={"synchronize_session": False})`. +- If validation is required (Pydantic models, bound schemas), validate first and dump to dicts before the + Core insert. + + +## 7. Running pytest safely +- Activate the repo virtualenv before testing: `source .venv/bin/activate` from the project root so all + dependencies (sqlalchemy, fastapi, etc.) are available. +- Load environment variables from `.env` so pytest sees the same DB creds the app uses. For quick shells: + `set -a; source .env; set +a`, or use `ENV_FILE=.env pytest ...` with `python-dotenv` installed.
+- Many tests expect a running Postgres bound to the vars in `.env`; confirm `POSTGRES_*` values point to the + right instance before running destructive suites. +- When done, `deactivate` to exit the venv and avoid polluting other shells. + +Following this playbook keeps ETL runs measured in seconds/minutes instead of hours. EOF From 4d32a1397c5dd357da52a79bc402718df87cde5b Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 23:14:28 +1100 Subject: [PATCH 5/7] feat: filter observation columns before inserting into TransducerObservation --- transfers/waterlevels_transducer_transfer.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/transfers/waterlevels_transducer_transfer.py b/transfers/waterlevels_transducer_transfer.py index 27f6bde7..fbfcf7cf 100644 --- a/transfers/waterlevels_transducer_transfer.py +++ b/transfers/waterlevels_transducer_transfer.py @@ -43,6 +43,9 @@ def __init__(self, *args, **kw): self.groundwater_parameter_id = get_groundwater_parameter_id() self._itertuples_field_map = {} self._df_columns = set() + self._observation_columns = { + column.key for column in TransducerObservation.__table__.columns + } if self._sensor_types is None: raise ValueError("_sensor_types must be set") if self._partition_field is None: @@ -136,9 +139,13 @@ def _transfer_hook(self, session: Session) -> None: observations = [obs for obs in observations if obs is not None] if observations: + filtered = [ + {k: v for k, v in obs.items() if k in self._observation_columns} + for obs in observations + ] session.execute( insert(TransducerObservation), - observations, + filtered, ) session.add(block) logger.info( From 844cd0d3df80f2d75bae8549fc744841dccd45de Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Sat, 24 Jan 2026 23:32:59 +1100 Subject: [PATCH 6/7] Update AGENTS.MD Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- AGENTS.MD | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AGENTS.MD b/AGENTS.MD 
index 3ffa0911..f4812ee3 100644 --- a/AGENTS.MD +++ b/AGENTS.MD @@ -12,7 +12,7 @@ these transfers, keep the following rules in mind to avoid hour-long runs: Core insert. -## 7. Running pytest safely +## 2. Running pytest safely - Activate the repo virtualenv before testing: `source .venv/bin/activate` from the project root so all dependencies (sqlalchemy, fastapi, etc.) are available. - Load environment variables from `.env` so pytest sees the same DB creds the app uses. For quick shells: From 3a5736410eb9a655371eb4f720365736dfaf8199 Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Sat, 24 Jan 2026 23:42:15 +1100 Subject: [PATCH 7/7] Update transfers/waterlevels_transducer_transfer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- transfers/waterlevels_transducer_transfer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transfers/waterlevels_transducer_transfer.py b/transfers/waterlevels_transducer_transfer.py index fbfcf7cf..d96b11d8 100644 --- a/transfers/waterlevels_transducer_transfer.py +++ b/transfers/waterlevels_transducer_transfer.py @@ -139,13 +139,13 @@ def _transfer_hook(self, session: Session) -> None: observations = [obs for obs in observations if obs is not None] if observations: - filtered = [ + filtered_observations = [ {k: v for k, v in obs.items() if k in self._observation_columns} for obs in observations ] session.execute( insert(TransducerObservation), - filtered, + filtered_observations, ) session.add(block) logger.info(