diff --git a/README.md b/README.md index 5c63b864..05a363c3 100644 --- a/README.md +++ b/README.md @@ -142,5 +142,8 @@ context-use-data/ | Google | Available | Searches, YouTube, Shopping, Lens, Discover | [Export your data](https://support.google.com/accounts/answer/3024190) | | Netflix | Available | Viewing Activity, Search History, Ratings, My List, Messages, Preferences | [Export your data](https://help.netflix.com/en/node/100624) | | Airbnb | Available | Wishlists, Search History, Reviews, Reservations, Messages | [Export your data](https://www.airbnb.com/help/article/2273) | +| Amex | Available | Transactions | Manual CSV download from Amex | +| Barclays | Available | Transactions | Manual CSV download from Barclays | +| Revolut | Available | Transactions | Manual CSV download from Revolut | Want another provider? Contribute it by pointing your coding agent to the [Adding a Data Provider](docs/add-provider/AGENTS.md) guide. diff --git a/context_use/cli/bank_setup.py b/context_use/cli/bank_setup.py new file mode 100644 index 00000000..241cf9ae --- /dev/null +++ b/context_use/cli/bank_setup.py @@ -0,0 +1,192 @@ +"""Interactive column-mapping setup for the generic bank provider. + +Reads CSV headers from a zip archive and walks the user through mapping +columns to the fields required by :class:`BankMapping`. +""" + +from __future__ import annotations + +import csv +import io +import zipfile + +from context_use.cli import output as out +from context_use.providers.bank.mapping import AmountColumns, BankMapping + + +def _read_csv_headers(zip_path: str) -> tuple[str, list[str]] | None: + """Extract headers from the first CSV found inside *zip_path*. + + Returns ``(csv_filename, headers)`` or ``None`` if no CSV is found. 
+ """ + with zipfile.ZipFile(zip_path) as zf: + for name in sorted(zf.namelist()): + if name.startswith("__MACOSX"): + continue + if name.lower().endswith(".csv"): + with zf.open(name) as f: + reader = csv.reader(io.TextIOWrapper(f, encoding="utf-8")) + try: + headers = next(reader) + except StopIteration: + continue + return name, [h.strip() for h in headers] + return None + + +def _pick_column(headers: list[str], prompt_text: str) -> str | None: + """Let the user pick one column by number.""" + choice = input(prompt_text).strip() + try: + idx = int(choice) - 1 + if 0 <= idx < len(headers): + return headers[idx] + except ValueError: + pass + out.error("Invalid choice.") + return None + + +def _print_columns(headers: list[str]) -> None: + for i, h in enumerate(headers, 1): + print(f" {out.bold(str(i))}. {h}") + print() + + +def _ask_amount_columns(headers: list[str]) -> AmountColumns | None: + """Ask whether amount is a single column or split into in/out.""" + out.header("Amount columns") + print() + print(f" {out.bold('1')}. Single column (e.g. +100 / -50)") + print(f" {out.bold('2')}. Separate columns for money in / money out") + print() + + mode = input(" Amount format? 
def _ask_yes_no(prompt_text: str, *, default: bool = False) -> bool:
    """Ask a yes/no question on stdin.

    An empty answer returns *default*; otherwise only ``y`` / ``yes``
    (case-insensitive) count as yes and anything else is treated as no.
    """
    hint = "Y/n" if default else "y/N"
    answer = input(f" {prompt_text} [{hint}]: ").strip().lower()
    if not answer:
        return default
    return answer in ("y", "yes")


def run_bank_setup(zip_path: str) -> BankMapping | None:
    """Run the full interactive bank CSV setup.

    Reads the header row of the first CSV in *zip_path*, then prompts on
    stdin for the bank name, date column, amount column(s), description
    column, credit-card flag and currency code. A summary of the answers is
    printed and must be confirmed before the mapping is returned.

    Args:
        zip_path: Path to the zip archive that contains the bank CSV export.

    Returns:
        The confirmed :class:`BankMapping`, or ``None`` when the archive has
        no usable CSV, a required answer is empty or invalid, or the user
        declines the final confirmation. There is no retry: the first invalid
        answer ends the setup.
    """
    result = _read_csv_headers(zip_path)
    if result is None:
        out.error("No CSV files found in the archive.")
        return None

    csv_filename, headers = result
    if not headers:
        out.error(f"CSV file {csv_filename} has no columns.")
        return None

    out.header("Bank CSV setup")
    out.kv("File", csv_filename)
    print()

    out.info("CSV columns found:")
    print()
    _print_columns(headers)

    bank_name = input(" Bank name (e.g. Chase, HSBC): ").strip()
    if not bank_name:
        out.error("Bank name is required.")
        return None

    print()
    out.info("Which column contains the transaction date?")
    print()
    _print_columns(headers)
    date_col = _pick_column(headers, f" Date column [1-{len(headers)}]: ")
    if date_col is None:
        return None

    print()
    amount = _ask_amount_columns(headers)
    if amount is None:
        return None

    print()
    out.info("Which column contains the transaction description?")
    print()
    _print_columns(headers)
    desc_col = _pick_column(headers, f" Description column [1-{len(headers)}]: ")
    if desc_col is None:
        return None

    print()
    is_credit_card = _ask_yes_no(
        "Is this a credit card? (charges shown as positive amounts)"
    )

    print()
    # Upper-cased so "gbp" and "GBP" produce the same mapping.
    currency = input(" Currency code (e.g. GBP, USD, EUR): ").strip().upper()
    if not currency:
        out.error("Currency is required.")
        return None

    print()
    out.header("Mapping summary")
    out.kv("Bank", bank_name)
    out.kv("Date column", date_col)
    # Compare against None rather than truthiness: a CSV may have an empty
    # header cell, and picking it yields single == "" which is still the
    # single-column mode.
    if amount.single is not None:
        out.kv("Amount column", amount.single)
    else:
        out.kv("Money-in column", amount.money_in)
        out.kv("Money-out column", amount.money_out)
    out.kv("Description column", desc_col)
    out.kv("Credit card", "yes" if is_credit_card else "no")
    out.kv("Currency", currency)
    print()

    if not _ask_yes_no("Proceed with this mapping?", default=True):
        out.warn("Aborted.")
        return None

    return BankMapping(
        bank_name=bank_name,
        date_column=date_col,
        amount=amount,
        description_column=desc_col,
        currency=currency,
        is_credit_card=is_credit_card,
    )
TYPE_CHECKING: from context_use import ContextUse + from context_use.etl.core.pipe import Pipe class IngestCommand(ContextCommand): @@ -38,13 +40,29 @@ async def run( return provider_str, zip_path = picked + pipe_factory: Callable[[type[Pipe]], Pipe] | None = None + if provider_str == "bank": + from context_use.cli.bank_setup import run_bank_setup + from context_use.providers.bank.generic_pipe import GenericBankPipe + + mapping = run_bank_setup(zip_path) + if mapping is None: + return + + def _bank_factory(pipe_cls: type[Pipe]) -> Pipe: + return GenericBankPipe(mapping=mapping) + + pipe_factory = _bank_factory + print() out.header(f"Ingesting {provider_str} archive") out.kv("File", zip_path) out.kv("Provider", provider_str) print() - result = await ctx.process_archive(provider_str, zip_path) + result = await ctx.process_archive( + provider_str, zip_path, pipe_factory=pipe_factory + ) out.success("Archive processed") out.kv("Archive ID", result.archive_id) diff --git a/context_use/core.py b/context_use/core.py index 6d884371..fab2e97a 100644 --- a/context_use/core.py +++ b/context_use/core.py @@ -3,6 +3,7 @@ import logging import zipfile from collections import defaultdict +from collections.abc import Callable from pathlib import PurePosixPath from typing import TYPE_CHECKING @@ -35,6 +36,7 @@ from datetime import date, datetime from typing import Any + from context_use.etl.core.pipe import Pipe from context_use.llm.litellm.clients import LiteLLMBase from context_use.storage.base import StorageBackend from context_use.store.base import Store @@ -92,6 +94,7 @@ async def process_archive( self, provider: str, path: str, + pipe_factory: Callable[[type[Pipe]], Pipe] | None = None, ) -> PipelineResult: """Unzip, discover, and run ETL for the given archive. 
class FibreTransaction(Create, _BaseFibreMixin):
    # Payload for a single money transaction. The preview below reads the
    # signed amount from ``object.name`` and the description from
    # ``object.content``; ``actor`` is optional.
    fibreKind: Literal["Transaction"] = Field("Transaction", alias="fibre_kind")
    object: Note  # type: ignore[reportIncompatibleVariableOverride, reportGeneralTypeIssues]
    actor: Person | None = None  # type: ignore[reportIncompatibleVariableOverride]

    def _get_preview(self, provider: str | None) -> str | None:
        """Build a one-line human-readable summary of the transaction.

        A leading ``-`` on the amount reads as "Spent … at …", a leading
        ``+`` as "Received … from …", and anything else as
        "Transacted … at …". The sign character itself is not shown. The
        actor name and *provider* are appended when present.
        """
        raw_amount = self.object.name
        raw_description = self.object.content
        amount = str(raw_amount) if raw_amount else ""
        description = str(raw_description) if raw_description else ""

        wording = {"-": ("Spent", "at"), "+": ("Received", "from")}
        sign = amount[:1]
        if sign in wording:
            verb, preposition = wording[sign]
            shown_amount = amount[1:]
        else:
            verb, preposition = "Transacted", "at"
            shown_amount = amount

        segments = [verb, shown_amount]
        if description:
            segments += [preposition, description]
        if self.actor and self.actor.name:
            segments.append(f"(by {self.actor.name})")
        if provider:
            segments += ["via", provider]
        return " ".join(segments)
FibreSearch.model_rebuild() FibreAddObjectToCollection.model_rebuild() +FibreTransaction.model_rebuild() FibreFollowedBy.model_rebuild() FibreFollowing.model_rebuild() diff --git a/context_use/providers/__init__.py b/context_use/providers/__init__.py index 72b5f799..71f80810 100644 --- a/context_use/providers/__init__.py +++ b/context_use/providers/__init__.py @@ -1,10 +1,14 @@ from context_use.providers import ( # noqa: F401 — triggers provider registration airbnb, + amex, + bank, + barclays, chatgpt, claude, google, instagram, netflix, + revolut, ) from context_use.providers.registry import ( get_memory_config, diff --git a/context_use/providers/amex/__init__.py b/context_use/providers/amex/__init__.py new file mode 100644 index 00000000..cb360d7e --- /dev/null +++ b/context_use/providers/amex/__init__.py @@ -0,0 +1,8 @@ +from context_use.providers.amex import transactions +from context_use.providers.registry import register_provider + +PROVIDER = "amex" + +register_provider(PROVIDER, modules=[transactions]) + +__all__ = ["PROVIDER"] diff --git a/context_use/providers/amex/transactions/__init__.py b/context_use/providers/amex/transactions/__init__.py new file mode 100644 index 00000000..1aee99e3 --- /dev/null +++ b/context_use/providers/amex/transactions/__init__.py @@ -0,0 +1,3 @@ +from context_use.providers.amex.transactions.pipe import AmexTransactionsPipe + +__all__ = ["AmexTransactionsPipe"] diff --git a/context_use/providers/amex/transactions/pipe.py b/context_use/providers/amex/transactions/pipe.py new file mode 100644 index 00000000..c7a92673 --- /dev/null +++ b/context_use/providers/amex/transactions/pipe.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import csv +import io +import json +from collections.abc import Iterator + +from context_use.providers.amex.transactions.schemas import Model +from context_use.providers.bank.pipe import _BankTransactionPipe +from context_use.providers.bank.record import BankTransactionRecord +from 
class AmexTransactionsPipe(_BankTransactionPipe):
    """Extract transaction records from Amex statement CSV files.

    Files are matched by ``archive_path_pattern``. Each CSV row is validated
    against the Amex ``Model`` schema and converted to a
    :class:`BankTransactionRecord`.
    """

    provider = PROVIDER
    interaction_type = FALLBACK_INTERACTION_TYPE
    archive_path_pattern = "*/amex/*.csv"
    display_name = "Amex"
    # The row schema has no currency column, so every record is emitted with
    # this fixed code.
    currency = "GBP"

    def extract_file(
        self,
        source_uri: str,
        storage: StorageBackend,
    ) -> Iterator[BankTransactionRecord]:
        """Yield one record per row of the CSV at *source_uri*.

        Rows whose ``CR`` column equals ``"CR"`` are emitted with a ``+``
        amount and transaction type ``"Payment"``; all other rows get a ``-``
        amount and type ``"Card Payment"``. Empty foreign-spend cells become
        ``None``. The original row is kept as JSON in ``source``.

        The stream opened from *storage* is always closed, including when the
        consumer stops iterating early or a row fails validation.
        """
        stream = storage.open_stream(source_uri)
        try:
            # utf-8-sig drops a leading byte-order mark if one is present;
            # with plain utf-8 the first header would be read as
            # "\ufeffCardmember" and every row would fail validation.
            # newline="" lets the csv module do its own newline handling, as
            # its documentation requires.
            text = io.TextIOWrapper(stream, encoding="utf-8-sig", newline="")
            reader = csv.DictReader(text)
            for raw_row in reader:
                row = Model.model_validate(raw_row)
                if row.CR == "CR":
                    amount = f"+{row.Amount}"
                    transaction_type = "Payment"
                else:
                    amount = f"-{row.Amount}"
                    transaction_type = "Card Payment"
                foreign_amount = row.Foreign_Spend_Amount or None
                foreign_currency = row.Foreign_Spend_Currency or None
                yield BankTransactionRecord(
                    date=row.Process_Date,
                    authorized_date=row.Transaction_Date,
                    amount=amount,
                    currency=self.currency,
                    description=row.Description,
                    merchant_name=row.Description,
                    account_owner=row.Cardmember,
                    transaction_type=transaction_type,
                    foreign_amount=foreign_amount,
                    foreign_currency=foreign_currency,
                    source=json.dumps(raw_row),
                )
        finally:
            stream.close()
class Model(BaseModel):
    # Shape of one row of an Amex statement CSV. Field names mirror the CSV
    # column headers; headers that contain spaces are bound through ``alias``
    # because they are not valid Python identifiers. All eight columns are
    # required and kept as raw strings (no date or number parsing here).
    # This module's header marks it as generated by datamodel-codegen from
    # schema.json, so schema changes presumably belong there - confirm before
    # hand-editing.
    Cardmember: str
    Transaction_Date: str = Field(..., alias="Transaction Date")
    Process_Date: str = Field(..., alias="Process Date")
    Description: str
    Foreign_Spend_Amount: str = Field(..., alias="Foreign Spend Amount")
    Foreign_Spend_Currency: str = Field(..., alias="Foreign Spend Currency")
    Amount: str
    CR: str
context_use.providers.bank.pipe import _BankTransactionPipe +from context_use.providers.bank.record import BankTransactionRecord +from context_use.providers.bank.transaction_types import FALLBACK_INTERACTION_TYPE +from context_use.providers.registry import declare_interaction +from context_use.providers.types import InteractionConfig +from context_use.storage.base import StorageBackend + +PROVIDER = "bank" + + +def _resolve_amount( + raw_row: dict[str, str], + amount_cfg: AmountColumns, + *, + is_credit_card: bool, +) -> str | None: + """Derive a signed amount string from the CSV row. + + Returns ``None`` when the row has no usable amount (e.g. both + money-in and money-out are empty). + """ + if amount_cfg.single: + raw = raw_row.get(amount_cfg.single, "").strip() + if not raw: + return None + if is_credit_card: + try: + val = float(raw) + return str(-val) + except ValueError: + return raw + return raw + + assert amount_cfg.money_in is not None + assert amount_cfg.money_out is not None + money_in = raw_row.get(amount_cfg.money_in, "").strip() + money_out = raw_row.get(amount_cfg.money_out, "").strip() + if money_out: + return f"-{money_out.lstrip('-')}" + if money_in: + return f"+{money_in.lstrip('+')}" + return None + + +class GenericBankPipe(_BankTransactionPipe): + provider = PROVIDER + interaction_type = FALLBACK_INTERACTION_TYPE + archive_version = 1 + archive_path_pattern = "*/bank/*.csv" + display_name = "Bank" + + def __init__(self, mapping: BankMapping | None = None) -> None: + super().__init__() + self._mapping = mapping + if mapping: + self.display_name = mapping.bank_name + + def extract_file( + self, + source_uri: str, + storage: StorageBackend, + ) -> Iterator[BankTransactionRecord]: + if self._mapping is None: + raise RuntimeError( + "GenericBankPipe requires a BankMapping; " + "use the interactive 'context-use ingest' flow to configure one." 
@dataclass(frozen=True)
class AmountColumns:
    """Describes how to read the transaction amount from a CSV row.

    Exactly one of two modes must be configured:

    * ``single`` - one column holding signed values, or
    * ``money_in`` and ``money_out`` - separate columns for credits and
      debits; both must be given.

    Raises:
        ValueError: On construction, if both modes are given, if neither is
            given, or if only one of ``money_in`` / ``money_out`` is given.
    """

    single: str | None = None
    money_in: str | None = None
    money_out: str | None = None

    def __post_init__(self) -> None:
        has_single = self.single is not None
        has_split = self.money_in is not None or self.money_out is not None
        if has_single and has_split:
            raise ValueError(
                "Specify either 'single' or 'money_in'/'money_out', not both"
            )
        if not has_single and not has_split:
            # Previously this case reused the "not both" message, which is
            # misleading when nothing was specified at all.
            raise ValueError(
                "Specify either 'single' or both 'money_in' and 'money_out'"
            )
        if has_split and (self.money_in is None or self.money_out is None):
            raise ValueError(
                "Both 'money_in' and 'money_out' must be provided together"
            )
+ + Captures the user's interactive column-mapping choices so + ``GenericBankPipe`` can extract ``BankTransactionRecord`` from any CSV. + """ + + bank_name: str + date_column: str + amount: AmountColumns + description_column: str + currency: str + is_credit_card: bool = False diff --git a/context_use/providers/bank/pipe.py b/context_use/providers/bank/pipe.py new file mode 100644 index 00000000..a6aff5b4 --- /dev/null +++ b/context_use/providers/bank/pipe.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from datetime import UTC, datetime + +from context_use.activitystreams.actors import Person +from context_use.activitystreams.objects import Note +from context_use.etl.core.pipe import Pipe +from context_use.etl.core.types import ThreadRow +from context_use.etl.payload.models import ( + CURRENT_THREAD_PAYLOAD_VERSION, + FibreTransaction, +) +from context_use.models.etl_task import EtlTask +from context_use.providers.bank.record import BankTransactionRecord +from context_use.providers.bank.transaction_types import normalize_transaction_type + + +def _parse_date(value: str) -> datetime: + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"): + try: + return datetime.strptime(value, fmt).replace(tzinfo=UTC) + except ValueError: + continue + raise ValueError(f"Cannot parse date: {value!r}") + + +class _BankTransactionPipe(Pipe[BankTransactionRecord]): + archive_version = 1 + record_schema = BankTransactionRecord + display_name: str + + def transform( + self, + record: BankTransactionRecord, + task: EtlTask, + ) -> ThreadRow: + published = _parse_date(record.date) + note = Note( # type: ignore[reportCallIssue] + name=record.amount, + content=record.merchant_name or record.description, + published=published, + ) + actor = Person(name=record.account_owner) if record.account_owner else None # type: ignore[reportCallIssue] + payload = FibreTransaction( # type: ignore[reportCallIssue] + object=note, + published=published, + actor=actor, + ) + + return ThreadRow( + 
# Bank-specific transaction type labels mapped to the interaction type stored
# on each thread row.
_TRANSACTION_TYPE_MAP: dict[str, str] = {
    "Card Payment": "purchase",
    "Card Purchase": "purchase",
    "Direct Debit": "direct_debit",
    "Standing Order": "standing_order",
    "Transfer": "transfer",
    "Bank Transfer": "transfer",
    "Cash Withdrawal": "cash",
    "Cheque": "cheque",
    "Interest": "interest",
    "Payment": "payment",
}

# Interaction type used when a label is missing or not recognised.
FALLBACK_INTERACTION_TYPE = "transaction"


def _canonical_type_key(raw: str) -> str:
    """Fold case, underscores, hyphens and extra whitespace out of a label."""
    spaced = raw.replace("_", " ").replace("-", " ")
    return " ".join(spaced.split()).casefold()


# Lookup table keyed by canonical label, built once at import time.
_CANONICAL_TYPE_MAP: dict[str, str] = {
    _canonical_type_key(label): interaction
    for label, interaction in _TRANSACTION_TYPE_MAP.items()
}


def normalize_transaction_type(raw: str | None) -> str:
    """Map a bank's transaction type label to an interaction type.

    Matching ignores case, surrounding or repeated whitespace, and treats
    ``_`` and ``-`` as spaces, so ``"Card Payment"``, ``"card payment"`` and
    ``"CARD_PAYMENT"`` all resolve to ``"purchase"``.

    Args:
        raw: The label from the bank export, or ``None`` when the source has
            no type information.

    Returns:
        The mapped interaction type, or ``FALLBACK_INTERACTION_TYPE`` when
        *raw* is ``None`` or not recognised.
    """
    if raw is None:
        return FALLBACK_INTERACTION_TYPE
    return _CANONICAL_TYPE_MAP.get(
        _canonical_type_key(raw), FALLBACK_INTERACTION_TYPE
    )
/dev/null +++ b/context_use/providers/barclays/__init__.py @@ -0,0 +1,8 @@ +from context_use.providers.barclays import transactions +from context_use.providers.registry import register_provider + +PROVIDER = "barclays" + +register_provider(PROVIDER, modules=[transactions]) + +__all__ = ["PROVIDER"] diff --git a/context_use/providers/barclays/transactions/__init__.py b/context_use/providers/barclays/transactions/__init__.py new file mode 100644 index 00000000..bb607dec --- /dev/null +++ b/context_use/providers/barclays/transactions/__init__.py @@ -0,0 +1,3 @@ +from context_use.providers.barclays.transactions.pipe import BarclaysTransactionsPipe + +__all__ = ["BarclaysTransactionsPipe"] diff --git a/context_use/providers/barclays/transactions/pipe.py b/context_use/providers/barclays/transactions/pipe.py new file mode 100644 index 00000000..981a7c02 --- /dev/null +++ b/context_use/providers/barclays/transactions/pipe.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import csv +import io +import json +import re +from collections.abc import Iterator + +from context_use.providers.bank.pipe import _BankTransactionPipe +from context_use.providers.bank.record import BankTransactionRecord +from context_use.providers.bank.transaction_types import FALLBACK_INTERACTION_TYPE +from context_use.providers.barclays.transactions.schemas import Model +from context_use.providers.registry import declare_interaction +from context_use.providers.types import InteractionConfig +from context_use.storage.base import StorageBackend + +PROVIDER = "barclays" + +_TYPE_PREFIXES = [ + "Direct Debit", + "Card Purchase", + "Standing Order", + "Bank Transfer", + "Cash Withdrawal", + "Cheque", + "Interest", +] + +_TYPE_PATTERN = re.compile( + r"^(" + "|".join(re.escape(p) for p in _TYPE_PREFIXES) + r")\b" +) + + +def _extract_transaction_type(description: str) -> str | None: + m = _TYPE_PATTERN.match(description) + return m.group(1) if m else None + + +class 
BarclaysTransactionsPipe(_BankTransactionPipe): + provider = PROVIDER + interaction_type = FALLBACK_INTERACTION_TYPE + archive_path_pattern = "*/barclays/*.csv" + display_name = "Barclays" + currency = "GBP" + + def extract_file( + self, + source_uri: str, + storage: StorageBackend, + ) -> Iterator[BankTransactionRecord]: + stream = storage.open_stream(source_uri) + try: + reader = csv.DictReader(io.TextIOWrapper(stream, encoding="utf-8")) + for raw_row in reader: + row = Model.model_validate(raw_row) + if row.Money_Out: + amount = f"-{row.Money_Out}" + elif row.Money_In: + amount = f"+{row.Money_In}" + else: + continue + yield BankTransactionRecord( + date=row.Date, + amount=amount, + currency=self.currency, + description=row.Description, + transaction_type=_extract_transaction_type(row.Description), + source=json.dumps(raw_row), + ) + finally: + stream.close() + + +declare_interaction(InteractionConfig(pipe=BarclaysTransactionsPipe)) diff --git a/context_use/providers/barclays/transactions/schema.json b/context_use/providers/barclays/transactions/schema.json new file mode 100644 index 00000000..a5e34165 --- /dev/null +++ b/context_use/providers/barclays/transactions/schema.json @@ -0,0 +1,28 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "Date": { + "type": "string" + }, + "Description": { + "type": "string" + }, + "Money Out": { + "type": "string" + }, + "Money In": { + "type": "string" + }, + "Balance": { + "type": "string" + } + }, + "required": [ + "Balance", + "Date", + "Description", + "Money In", + "Money Out" + ] +} diff --git a/context_use/providers/barclays/transactions/schemas.py b/context_use/providers/barclays/transactions/schemas.py new file mode 100644 index 00000000..35612f9b --- /dev/null +++ b/context_use/providers/barclays/transactions/schemas.py @@ -0,0 +1,15 @@ +# generated by datamodel-codegen: +# filename: schema.json +# timestamp: 2026-03-21T18:32:42+00:00 + +from __future__ import 
class Model(BaseModel):
    # Shape of one row of a Barclays statement CSV. Field names mirror the CSV
    # column headers; "Money Out" and "Money In" are bound through ``alias``
    # because the headers contain a space. All five columns are required and
    # kept as raw strings, so an unused money column arrives as an empty
    # string rather than being absent.
    # This module's header marks it as generated by datamodel-codegen from
    # schema.json, so schema changes presumably belong there - confirm before
    # hand-editing.
    Date: str
    Description: str
    Money_Out: str = Field(..., alias="Money Out")
    Money_In: str = Field(..., alias="Money In")
    Balance: str
class RevolutTransactionsPipe(_BankTransactionPipe):
    """Extract transaction records from Revolut account-statement CSV files.

    Files are matched by ``archive_path_pattern``. Each CSV row is validated
    against the Revolut ``Model`` schema and converted to a
    :class:`BankTransactionRecord`; the currency is taken from each row.
    """

    provider = PROVIDER
    interaction_type = FALLBACK_INTERACTION_TYPE
    archive_path_pattern = "*/revolut/*.csv"
    display_name = "Revolut"

    def extract_file(
        self,
        source_uri: str,
        storage: StorageBackend,
    ) -> Iterator[BankTransactionRecord]:
        """Yield one record per non-reverted row of the CSV at *source_uri*.

        Rows whose ``State`` is ``"REVERTED"`` are skipped. The record date is
        the completed date when present, otherwise the started date; the
        started date is also reported as ``authorized_date``. Rows of type
        ``"Card Payment"`` get ``payment_channel="in_store"``. The original
        row is kept as JSON in ``source``.

        The stream opened from *storage* is always closed, including when the
        consumer stops iterating early or a row fails validation.
        """
        stream = storage.open_stream(source_uri)
        try:
            # utf-8-sig drops a leading byte-order mark if one is present;
            # with plain utf-8 the first header would be read as "\ufeffType"
            # and every row would fail validation. newline="" lets the csv
            # module do its own newline handling, as its documentation
            # requires.
            text = io.TextIOWrapper(stream, encoding="utf-8-sig", newline="")
            reader = csv.DictReader(text)
            for raw_row in reader:
                row = Model.model_validate(raw_row)
                if row.State == "REVERTED":
                    continue
                authorized_date = (
                    _parse_revolut_date(row.Started_Date) if row.Started_Date else None
                )
                date = (
                    _parse_revolut_date(row.Completed_Date)
                    if row.Completed_Date
                    else _parse_revolut_date(row.Started_Date)
                )
                payment_channel = "in_store" if row.Type == "Card Payment" else None
                yield BankTransactionRecord(
                    date=date,
                    authorized_date=authorized_date,
                    amount=row.Amount,
                    currency=row.Currency,
                    description=row.Description,
                    merchant_name=row.Description,
                    transaction_type=row.Type,
                    payment_channel=payment_channel,
                    source=json.dumps(raw_row),
                )
        finally:
            stream.close()
"Amount": { + "type": "string" + }, + "Fee": { + "type": "string" + }, + "Currency": { + "type": "string" + }, + "State": { + "type": "string" + }, + "Balance": { + "type": "string" + } + }, + "required": [ + "Amount", + "Currency", + "Description", + "Fee", + "Product", + "Started Date", + "State", + "Type" + ] +} diff --git a/context_use/providers/revolut/transactions/schemas.py b/context_use/providers/revolut/transactions/schemas.py new file mode 100644 index 00000000..3672f64e --- /dev/null +++ b/context_use/providers/revolut/transactions/schemas.py @@ -0,0 +1,20 @@ +# generated by datamodel-codegen: +# filename: schema.json +# timestamp: 2026-03-21T18:32:42+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class Model(BaseModel): + Type: str + Product: str + Started_Date: str = Field(..., alias="Started Date") + Completed_Date: str | None = Field(None, alias="Completed Date") + Description: str + Amount: str + Fee: str + Currency: str + State: str + Balance: str | None = None diff --git a/context_use/testing/pipe_test_kit.py b/context_use/testing/pipe_test_kit.py index 3b7efb8c..db542c5e 100644 --- a/context_use/testing/pipe_test_kit.py +++ b/context_use/testing/pipe_test_kit.py @@ -41,6 +41,7 @@ class TransformConformanceTests: pipe_class: ClassVar[type[Pipe]] expected_transform_count: ClassVar[int] expected_fibre_kind: ClassVar[str | None] + per_record_interaction_type: ClassVar[bool] = False def test_run_yields_well_formed_thread_rows( self, transformed_rows: list[ThreadRow] @@ -54,11 +55,14 @@ def test_run_yields_well_formed_thread_rows( assert row.provider == self.pipe_class.provider, ( f"provider mismatch: {row.provider!r} != {self.pipe_class.provider!r}" ) - assert row.interaction_type == self.pipe_class.interaction_type, ( - f"interaction_type mismatch: " - f"{row.interaction_type!r} " - f"!= {self.pipe_class.interaction_type!r}" - ) + if self.per_record_interaction_type: + assert row.interaction_type, 
"interaction_type must be set" + else: + assert row.interaction_type == self.pipe_class.interaction_type, ( + f"interaction_type mismatch: " + f"{row.interaction_type!r} " + f"!= {self.pipe_class.interaction_type!r}" + ) assert row.version, "ThreadRow.version must be set" assert row.asat is not None, "ThreadRow.asat must be set" diff --git a/tests/fixtures/users/alice/bank/v1/amex/statement.csv b/tests/fixtures/users/alice/bank/v1/amex/statement.csv new file mode 100644 index 00000000..3ff24ef9 --- /dev/null +++ b/tests/fixtures/users/alice/bank/v1/amex/statement.csv @@ -0,0 +1,4 @@ +Cardmember,Transaction Date,Process Date,Description,Foreign Spend Amount,Foreign Spend Currency,Amount,CR +Alice Smith,2025-11-01,2025-11-01,PAYMENT RECEIVED - THANK YOU,,,500.00,CR +Alice Smith,2025-11-02,2025-11-03,RESTAURANT LONDON,,,35.00, +Bob Smith,2025-11-04,2025-11-05,ONLINE STORE,12.50,USD,10.00, diff --git a/tests/fixtures/users/alice/bank/v1/barclays/statement.csv b/tests/fixtures/users/alice/bank/v1/barclays/statement.csv new file mode 100644 index 00000000..661b15d1 --- /dev/null +++ b/tests/fixtures/users/alice/bank/v1/barclays/statement.csv @@ -0,0 +1,4 @@ +Date,Description,Money Out,Money In,Balance +2025-11-01,Direct Debit to Energy Co,150.00,,850.00 +2025-11-02,Card Purchase to Supermarket,45.00,,805.00 +2025-11-03,Bank Transfer from Employer,,2000.00,2805.00 diff --git a/tests/fixtures/users/alice/bank/v1/revolut/account-statement.csv b/tests/fixtures/users/alice/bank/v1/revolut/account-statement.csv new file mode 100644 index 00000000..c5a57781 --- /dev/null +++ b/tests/fixtures/users/alice/bank/v1/revolut/account-statement.csv @@ -0,0 +1,4 @@ +Type,Product,Started Date,Completed Date,Description,Amount,Fee,Currency,State,Balance +Transfer,Current,2025-11-01 10:00:00,2025-11-01 10:00:00,From GBP Savings,100.00,0.00,GBP,COMPLETED,200.00 +Card Payment,Current,2025-11-02 14:30:00,2025-11-02 15:00:00,Coffee Shop,-4.50,0.00,GBP,COMPLETED,195.50 +Card 
Payment,Current,2025-11-03 09:00:00,,Grocery Store,-25.00,0.00,GBP,REVERTED, diff --git a/tests/unit/etl/bank/__init__.py b/tests/unit/etl/bank/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/etl/bank/conftest.py b/tests/unit/etl/bank/conftest.py new file mode 100644 index 00000000..47c546ce --- /dev/null +++ b/tests/unit/etl/bank/conftest.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from pathlib import Path + +_FIXTURES_ROOT = Path(__file__).parents[3] / "fixtures" +_BASE = "users/alice/bank/v1" + + +def load_csv_fixture(relative_path: str) -> bytes: + return (_FIXTURES_ROOT / relative_path).read_bytes() + + +REVOLUT_CSV = load_csv_fixture(f"{_BASE}/revolut/account-statement.csv") +AMEX_CSV = load_csv_fixture(f"{_BASE}/amex/statement.csv") +BARCLAYS_CSV = load_csv_fixture(f"{_BASE}/barclays/statement.csv") diff --git a/tests/unit/etl/bank/test_amex.py b/tests/unit/etl/bank/test_amex.py new file mode 100644 index 00000000..88bc2d83 --- /dev/null +++ b/tests/unit/etl/bank/test_amex.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import pytest + +from context_use.providers.amex.transactions.pipe import AmexTransactionsPipe +from context_use.providers.bank.pipe import _parse_date +from context_use.providers.bank.transaction_types import normalize_transaction_type +from context_use.storage.disk import DiskStorage +from context_use.testing import PipeTestKit +from tests.unit.etl.bank.conftest import AMEX_CSV + + +class TestAmexTransactionsPipe(PipeTestKit): + pipe_class = AmexTransactionsPipe + expected_extract_count = 3 + expected_transform_count = 3 + expected_fibre_kind = "Transaction" + per_record_interaction_type = True + + @pytest.fixture() + def pipe_fixture(self, tmp_path): + storage = DiskStorage(str(tmp_path / "store")) + key = "archive/amex/statement.csv" + storage.write(key, AMEX_CSV) + return storage, key + + def test_cr_row_has_positive_amount(self, extracted_records): + payment = next(r for r in 
extracted_records if "PAYMENT" in r.description) + assert payment.amount.startswith("+") + + def test_debit_row_has_negative_amount(self, extracted_records): + debit = next(r for r in extracted_records if "RESTAURANT" in r.description) + assert debit.amount.startswith("-") + + def test_account_owner_set(self, extracted_records): + for record in extracted_records: + assert record.account_owner is not None + + def test_foreign_currency_populated(self, extracted_records): + foreign = next(r for r in extracted_records if r.foreign_currency is not None) + assert foreign.foreign_currency == "USD" + assert foreign.foreign_amount == "12.50" + + def test_preview_shows_account_owner(self, transformed_rows): + for row in transformed_rows: + assert "(by " in row.preview + + def test_preview_contains_institution(self, transformed_rows): + for row in transformed_rows: + assert "Amex" in row.preview + + def test_provider_is_amex(self, transformed_rows): + for row in transformed_rows: + assert row.provider == "amex" + + def test_cr_row_interaction_type_is_payment(self, transformed_rows): + payment = next(r for r in transformed_rows if "PAYMENT" in r.preview) + assert payment.interaction_type == "payment" + + def test_debit_row_interaction_type_is_purchase(self, transformed_rows): + debit = next(r for r in transformed_rows if "RESTAURANT" in r.preview) + assert debit.interaction_type == "purchase" + + def test_transaction_type_set_on_records(self, extracted_records): + for record in extracted_records: + assert record.transaction_type is not None + + +class TestParseDate: + def test_date_only(self) -> None: + dt = _parse_date("2025-11-01") + assert dt.year == 2025 + assert dt.month == 11 + assert dt.day == 1 + + def test_datetime(self) -> None: + dt = _parse_date("2025-11-01 14:30:00") + assert dt.hour == 14 + + def test_invalid_raises(self) -> None: + with pytest.raises(ValueError, match="Cannot parse date"): + _parse_date("not-a-date") + + +class TestNormalizeTransactionType: + 
@pytest.mark.parametrize( + ("raw", "expected"), + [ + ("Card Payment", "purchase"), + ("Card Purchase", "purchase"), + ("Direct Debit", "direct_debit"), + ("Standing Order", "standing_order"), + ("Transfer", "transfer"), + ("Bank Transfer", "transfer"), + ("Cash Withdrawal", "cash"), + ("Cheque", "cheque"), + ("Interest", "interest"), + ("Payment", "payment"), + ("Unknown Type", "transaction"), + (None, "transaction"), + ], + ) + def test_normalizes(self, raw: str | None, expected: str) -> None: + assert normalize_transaction_type(raw) == expected diff --git a/tests/unit/etl/bank/test_barclays.py b/tests/unit/etl/bank/test_barclays.py new file mode 100644 index 00000000..b1da8660 --- /dev/null +++ b/tests/unit/etl/bank/test_barclays.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import pytest + +from context_use.providers.barclays.transactions.pipe import ( + BarclaysTransactionsPipe, + _extract_transaction_type, +) +from context_use.storage.disk import DiskStorage +from context_use.testing import PipeTestKit +from tests.unit.etl.bank.conftest import BARCLAYS_CSV + + +class TestBarclaysTransactionsPipe(PipeTestKit): + pipe_class = BarclaysTransactionsPipe + expected_extract_count = 3 + expected_transform_count = 3 + expected_fibre_kind = "Transaction" + per_record_interaction_type = True + + @pytest.fixture() + def pipe_fixture(self, tmp_path): + storage = DiskStorage(str(tmp_path / "store")) + key = "archive/barclays/statement.csv" + storage.write(key, BARCLAYS_CSV) + return storage, key + + def test_money_out_has_negative_amount(self, extracted_records): + debit = next(r for r in extracted_records if "Energy" in r.description) + assert debit.amount.startswith("-") + + def test_money_in_has_positive_amount(self, extracted_records): + credit = next(r for r in extracted_records if "Employer" in r.description) + assert credit.amount.startswith("+") + + def test_transaction_type_extracted(self, extracted_records): + dd = next(r for r in extracted_records 
if "Direct Debit" in r.description) + assert dd.transaction_type == "Direct Debit" + + def test_card_purchase_type_extracted(self, extracted_records): + cp = next(r for r in extracted_records if "Card Purchase" in r.description) + assert cp.transaction_type == "Card Purchase" + + def test_preview_contains_institution(self, transformed_rows): + for row in transformed_rows: + assert "Barclays" in row.preview + + def test_provider_is_barclays(self, transformed_rows): + for row in transformed_rows: + assert row.provider == "barclays" + + def test_direct_debit_interaction_type(self, transformed_rows): + dd = next(r for r in transformed_rows if "Direct Debit" in r.preview) + assert dd.interaction_type == "direct_debit" + + def test_card_purchase_interaction_type(self, transformed_rows): + cp = next(r for r in transformed_rows if "Card Purchase" in r.preview) + assert cp.interaction_type == "purchase" + + def test_transfer_interaction_type(self, transformed_rows): + xfer = next(r for r in transformed_rows if "Bank Transfer" in r.preview) + assert xfer.interaction_type == "transfer" + + def test_empty_money_in_and_out_skipped(self, tmp_path): + csv_with_empty = ( + b"Date,Description,Money Out,Money In,Balance\n" + b"2025-11-01,Mystery Row,,,500.00\n" + b"2025-11-02,Direct Debit to Energy Co,150.00,,350.00\n" + ) + storage = DiskStorage(str(tmp_path / "store")) + key = "archive/barclays/statement.csv" + storage.write(key, csv_with_empty) + pipe = BarclaysTransactionsPipe() + records = list(pipe.extract_file(key, storage)) + assert len(records) == 1 + assert "Energy" in records[0].description + + +class TestExtractTransactionType: + @pytest.mark.parametrize( + ("description", "expected"), + [ + ("Direct Debit to Skipton", "Direct Debit"), + ("Card Purchase to Tesco", "Card Purchase"), + ("Standing Order to MONZO", "Standing Order"), + ("Bank Transfer from Someone", "Bank Transfer"), + ("Cash Withdrawal at ATM", "Cash Withdrawal"), + ("Cheque 001234", "Cheque"), + ("Interest 
Payment", "Interest"), + ("Random description", None), + ("", None), + ], + ) + def test_extracts_type(self, description: str, expected: str | None) -> None: + assert _extract_transaction_type(description) == expected diff --git a/tests/unit/etl/bank/test_generic.py b/tests/unit/etl/bank/test_generic.py new file mode 100644 index 00000000..f1fac5ae --- /dev/null +++ b/tests/unit/etl/bank/test_generic.py @@ -0,0 +1,360 @@ +from __future__ import annotations + +import io +import json +import zipfile + +import pytest +from pydantic import BaseModel + +from context_use.etl.core.types import ThreadRow +from context_use.models.etl_task import EtlTask, EtlTaskStatus +from context_use.providers.bank.generic_pipe import GenericBankPipe, _resolve_amount +from context_use.providers.bank.mapping import AmountColumns, BankMapping +from context_use.providers.bank.record import BankTransactionRecord +from context_use.storage.disk import DiskStorage +from context_use.testing import PipeTestKit + +SINGLE_AMOUNT_CSV = ( + "Date,Description,Amount,Currency\n" + "2026-01-15,Coffee Shop,-4.50,GBP\n" + "2026-01-16,Salary,3000.00,GBP\n" +) + +SPLIT_AMOUNT_CSV = ( + "Date,Description,Money In,Money Out,Balance\n" + "2026-01-15,Coffee Shop,,4.50,100.00\n" + "2026-01-16,Salary,3000.00,,3100.00\n" +) + +CREDIT_CARD_CSV = ( + "Date,Description,Amount\n2026-01-15,Restaurant,25.00\n2026-01-16,Payment,-100.00\n" +) + + +def _make_mapping( + *, + single: str | None = "Amount", + money_in: str | None = None, + money_out: str | None = None, + is_credit_card: bool = False, + date_column: str = "Date", + description_column: str = "Description", +) -> BankMapping: + if single: + amount = AmountColumns(single=single) + else: + amount = AmountColumns(money_in=money_in, money_out=money_out) + return BankMapping( + bank_name="TestBank", + date_column=date_column, + amount=amount, + description_column=description_column, + currency="GBP", + is_credit_card=is_credit_card, + ) + + +def _make_task(key: str) 
-> EtlTask: + return EtlTask( + archive_id="a1", + provider="bank", + interaction_type="transaction", + source_uris=[key], + status=EtlTaskStatus.CREATED.value, + ) + + +class TestAmountColumns: + def test_single_column(self) -> None: + ac = AmountColumns(single="Amount") + assert ac.single == "Amount" + assert ac.money_in is None + + def test_split_columns(self) -> None: + ac = AmountColumns(money_in="In", money_out="Out") + assert ac.money_in == "In" + assert ac.money_out == "Out" + assert ac.single is None + + def test_neither_raises(self) -> None: + with pytest.raises(ValueError, match="Specify either"): + AmountColumns() + + def test_both_raises(self) -> None: + with pytest.raises(ValueError, match="Specify either"): + AmountColumns(single="Amount", money_in="In", money_out="Out") + + def test_partial_split_raises(self) -> None: + with pytest.raises(ValueError, match="Both"): + AmountColumns(money_in="In") + + +_SINGLE = AmountColumns(single="Amount") + + +class TestResolveAmount: + def test_single_positive(self) -> None: + row = {"Amount": "100.00"} + result = _resolve_amount(row, _SINGLE, is_credit_card=False) + assert result == "100.00" + + def test_single_negative(self) -> None: + row = {"Amount": "-50.00"} + result = _resolve_amount(row, _SINGLE, is_credit_card=False) + assert result == "-50.00" + + def test_single_empty_returns_none(self) -> None: + row = {"Amount": ""} + result = _resolve_amount(row, _SINGLE, is_credit_card=False) + assert result is None + + def test_credit_card_flips_sign(self) -> None: + row = {"Amount": "25.00"} + result = _resolve_amount(row, _SINGLE, is_credit_card=True) + assert result == "-25.0" + + def test_credit_card_payment_becomes_positive(self) -> None: + row = {"Amount": "-100.00"} + result = _resolve_amount(row, _SINGLE, is_credit_card=True) + assert result == "100.0" + + def test_split_money_out(self) -> None: + cols = AmountColumns(money_in="In", money_out="Out") + row = {"In": "", "Out": "4.50"} + result = 
_resolve_amount(row, cols, is_credit_card=False) + assert result == "-4.50" + + def test_split_money_in(self) -> None: + cols = AmountColumns(money_in="In", money_out="Out") + row = {"In": "3000.00", "Out": ""} + result = _resolve_amount(row, cols, is_credit_card=False) + assert result == "+3000.00" + + def test_split_both_empty_returns_none(self) -> None: + cols = AmountColumns(money_in="In", money_out="Out") + row = {"In": "", "Out": ""} + result = _resolve_amount(row, cols, is_credit_card=False) + assert result is None + + +class TestGenericBankPipeSingleAmount: + @pytest.fixture() + def storage_and_key(self, tmp_path: object) -> tuple[DiskStorage, str]: + storage = DiskStorage(str(tmp_path)) # type: ignore[arg-type] + key = "archive/bank/statement.csv" + storage.write(key, SINGLE_AMOUNT_CSV.encode()) + return storage, key + + @pytest.fixture() + def records( + self, storage_and_key: tuple[DiskStorage, str] + ) -> list[BankTransactionRecord]: + storage, key = storage_and_key + mapping = _make_mapping() + pipe = GenericBankPipe(mapping=mapping) + task = _make_task(key) + return list(pipe.extract(task, storage)) + + @pytest.fixture() + def rows(self, storage_and_key: tuple[DiskStorage, str]) -> list[ThreadRow]: + storage, key = storage_and_key + mapping = _make_mapping() + pipe = GenericBankPipe(mapping=mapping) + task = _make_task(key) + return list(pipe.run(task, storage)) + + def test_extract_count(self, records: list[BankTransactionRecord]) -> None: + assert len(records) == 2 + + def test_amounts(self, records: list[BankTransactionRecord]) -> None: + assert records[0].amount == "-4.50" + assert records[1].amount == "3000.00" + + def test_descriptions(self, records: list[BankTransactionRecord]) -> None: + assert records[0].description == "Coffee Shop" + assert records[1].description == "Salary" + + def test_dates(self, records: list[BankTransactionRecord]) -> None: + assert records[0].date == "2026-01-15" + assert records[1].date == "2026-01-16" + + def 
test_transform_yields_thread_rows(self, rows: list[ThreadRow]) -> None: + assert len(rows) == 2 + for row in rows: + assert isinstance(row, ThreadRow) + assert row.provider == "bank" + assert row.payload["fibreKind"] == "Transaction" + + def test_preview_contains_bank_name(self, rows: list[ThreadRow]) -> None: + for row in rows: + assert "TestBank" in row.preview + + def test_spent_preview(self, rows: list[ThreadRow]) -> None: + assert "Spent" in rows[0].preview + assert "Coffee Shop" in rows[0].preview + + def test_source_preserved(self, records: list[BankTransactionRecord]) -> None: + for r in records: + assert r.source is not None + parsed = json.loads(r.source) + assert "Date" in parsed + + +class TestGenericBankPipeSplitAmount: + @pytest.fixture() + def records(self, tmp_path: object) -> list[BankTransactionRecord]: + storage = DiskStorage(str(tmp_path)) # type: ignore[arg-type] + key = "archive/bank/statement.csv" + storage.write(key, SPLIT_AMOUNT_CSV.encode()) + mapping = _make_mapping(single=None, money_in="Money In", money_out="Money Out") + pipe = GenericBankPipe(mapping=mapping) + task = _make_task(key) + return list(pipe.extract(task, storage)) + + def test_extract_count(self, records: list[BankTransactionRecord]) -> None: + assert len(records) == 2 + + def test_money_out_is_negative(self, records: list[BankTransactionRecord]) -> None: + assert records[0].amount == "-4.50" + + def test_money_in_is_positive(self, records: list[BankTransactionRecord]) -> None: + assert records[1].amount == "+3000.00" + + +class TestGenericBankPipeCreditCard: + @pytest.fixture() + def records(self, tmp_path: object) -> list[BankTransactionRecord]: + storage = DiskStorage(str(tmp_path)) # type: ignore[arg-type] + key = "archive/bank/statement.csv" + storage.write(key, CREDIT_CARD_CSV.encode()) + mapping = _make_mapping(is_credit_card=True) + pipe = GenericBankPipe(mapping=mapping) + task = _make_task(key) + return list(pipe.extract(task, storage)) + + def 
test_charge_becomes_negative( + self, records: list[BankTransactionRecord] + ) -> None: + assert records[0].amount == "-25.0" + + def test_payment_becomes_positive( + self, records: list[BankTransactionRecord] + ) -> None: + assert records[1].amount == "100.0" + + +class TestGenericBankPipeNoMapping: + def test_no_records_without_mapping(self, tmp_path: object) -> None: + storage = DiskStorage(str(tmp_path)) # type: ignore[arg-type] + key = "archive/bank/statement.csv" + storage.write(key, SINGLE_AMOUNT_CSV.encode()) + pipe = GenericBankPipe() + task = _make_task(key) + rows = list(pipe.run(task, storage)) + assert rows == [] + assert pipe.error_count == 1 + + +class TestReadCsvHeaders: + def test_reads_headers_from_zip(self, tmp_path: object) -> None: + from context_use.cli.bank_setup import _read_csv_headers + + zip_path = str(tmp_path) + "/test.zip" # type: ignore[operator] + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("bank/statement.csv", SINGLE_AMOUNT_CSV) + with open(zip_path, "wb") as f: + f.write(buf.getvalue()) + + result = _read_csv_headers(zip_path) + assert result is not None + filename, headers = result + assert filename == "bank/statement.csv" + assert headers == ["Date", "Description", "Amount", "Currency"] + + def test_skips_macosx(self, tmp_path: object) -> None: + from context_use.cli.bank_setup import _read_csv_headers + + zip_path = str(tmp_path) + "/test.zip" # type: ignore[operator] + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("__MACOSX/._statement.csv", "junk") + zf.writestr("bank/statement.csv", SINGLE_AMOUNT_CSV) + with open(zip_path, "wb") as f: + f.write(buf.getvalue()) + + result = _read_csv_headers(zip_path) + assert result is not None + assert result[0] == "bank/statement.csv" + + def test_no_csv_returns_none(self, tmp_path: object) -> None: + from context_use.cli.bank_setup import _read_csv_headers + + zip_path = str(tmp_path) + "/test.zip" # type: ignore[operator] + buf = 
io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("data.json", "{}") + with open(zip_path, "wb") as f: + f.write(buf.getvalue()) + + assert _read_csv_headers(zip_path) is None + + +_DEFAULT_MAPPING = _make_mapping() + + +class TestGenericBankPipeKit(PipeTestKit): + """PipeTestKit conformance for GenericBankPipe.""" + + pipe_class = GenericBankPipe + expected_extract_count = 2 + expected_transform_count = 2 + expected_fibre_kind = "Transaction" + per_record_interaction_type = True + + @pytest.fixture() + def pipe_fixture(self, tmp_path: object) -> tuple[DiskStorage, str]: + storage = DiskStorage(str(tmp_path)) # type: ignore[arg-type] + key = "archive/bank/statement.csv" + storage.write(key, SINGLE_AMOUNT_CSV.encode()) + return storage, key + + def _make_pipe(self) -> GenericBankPipe: + return GenericBankPipe(mapping=_DEFAULT_MAPPING) + + @pytest.fixture() + def extracted_records( + self, pipe_fixture: tuple[DiskStorage, str] + ) -> list[BaseModel]: + storage, key = pipe_fixture + pipe = self._make_pipe() + task = _make_task(key) + return list(pipe.extract(task, storage)) + + @pytest.fixture() + def transformed_rows( + self, pipe_fixture: tuple[DiskStorage, str] + ) -> list[ThreadRow]: + storage, key = pipe_fixture + pipe = self._make_pipe() + task = _make_task(key) + return list(pipe.run(task, storage)) + + def test_counts_tracked(self, pipe_fixture: tuple[DiskStorage, str]) -> None: + storage, key = pipe_fixture + pipe = self._make_pipe() + task = _make_task(key) + list(pipe.run(task, storage)) + assert pipe.extracted_count == self.expected_extract_count + assert pipe.transformed_count == self.expected_transform_count + assert pipe.error_count == 0 + + def test_unique_keys_are_stable( + self, pipe_fixture: tuple[DiskStorage, str] + ) -> None: + storage, key = pipe_fixture + task = _make_task(key) + first = [r.unique_key for r in self._make_pipe().run(task, storage)] + second = [r.unique_key for r in self._make_pipe().run(task, storage)] + assert 
first == second diff --git a/tests/unit/etl/bank/test_revolut.py b/tests/unit/etl/bank/test_revolut.py new file mode 100644 index 00000000..ada2b03b --- /dev/null +++ b/tests/unit/etl/bank/test_revolut.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import pytest + +from context_use.providers.revolut.transactions.pipe import ( + RevolutTransactionsPipe, + _parse_revolut_date, +) +from context_use.storage.disk import DiskStorage +from context_use.testing import PipeTestKit +from tests.unit.etl.bank.conftest import REVOLUT_CSV + + +class TestRevolutTransactionsPipe(PipeTestKit): + pipe_class = RevolutTransactionsPipe + expected_extract_count = 2 + expected_transform_count = 2 + expected_fibre_kind = "Transaction" + per_record_interaction_type = True + + @pytest.fixture() + def pipe_fixture(self, tmp_path): + storage = DiskStorage(str(tmp_path / "store")) + key = "archive/revolut/account-statement.csv" + storage.write(key, REVOLUT_CSV) + return storage, key + + def test_reverted_rows_filtered(self, extracted_records): + for record in extracted_records: + assert "REVERTED" not in (record.source or "") + + def test_preview_contains_institution(self, transformed_rows): + for row in transformed_rows: + assert "Revolut" in row.preview + + def test_provider_is_revolut(self, transformed_rows): + for row in transformed_rows: + assert row.provider == "revolut" + + def test_transfer_has_positive_amount(self, extracted_records): + transfer = next( + r for r in extracted_records if r.transaction_type == "Transfer" + ) + assert not transfer.amount.startswith("-") + + def test_card_payment_has_negative_amount(self, extracted_records): + card = next( + r for r in extracted_records if r.transaction_type == "Card Payment" + ) + assert card.amount.startswith("-") + + def test_transfer_interaction_type(self, transformed_rows): + xfer = next(r for r in transformed_rows if "From GBP" in r.preview) + assert xfer.interaction_type == "transfer" + + def 
test_purchase_interaction_type(self, transformed_rows): + card = next(r for r in transformed_rows if "Coffee" in r.preview) + assert card.interaction_type == "purchase" + + +class TestParseRevolutDate: + def test_valid_datetime(self) -> None: + assert _parse_revolut_date("2025-11-01 10:00:00") == "2025-11-01" + + def test_valid_date_only(self) -> None: + assert _parse_revolut_date("2025-11-01") == "2025-11-01" + + def test_invalid_date_raises(self) -> None: + with pytest.raises(ValueError, match="Cannot parse Revolut date"): + _parse_revolut_date("not-a-date")