diff --git a/README.md b/README.md index 28e0e09..f9c0efa 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ cd analytics-agent bash quickstart.sh ``` -The script starts a local DataHub instance, loads the Olist e-commerce sample dataset and catalog metadata, then builds and launches Analytics Agent at **http://localhost:8100**. Postgres data is persisted to `~/.datahub/analytics-agent/postgres-data/` so it survives container restarts. +The script starts a local DataHub instance, loads the Fiction Retail sample dataset and catalog metadata, then builds and launches Analytics Agent at **http://localhost:8100**. Postgres data is persisted to `~/.datahub/analytics-agent/postgres-data/` so it survives container restarts. **Using AWS Bedrock?** Export `LLM_PROVIDER=bedrock` before running the script. The script will verify your AWS credentials and Bedrock access before starting the container, and mount `~/.aws` read-only so boto3 picks up your profiles and SSO cache automatically. diff --git a/backend/src/analytics_agent/cli.py b/backend/src/analytics_agent/cli.py index be265d1..5da699c 100644 --- a/backend/src/analytics_agent/cli.py +++ b/backend/src/analytics_agent/cli.py @@ -85,7 +85,7 @@ async def _run_all_seeds() -> None: "--demo", is_flag=True, default=False, - help="Full demo: start DataHub, load Olist sample data, and launch the agent.", + help="Full demo: start DataHub, load Fiction Retail sample data, and launch the agent.", ) @click.option( "--reconfigure", diff --git a/backend/src/analytics_agent/demo/config.demo.yaml b/backend/src/analytics_agent/demo/config.demo.yaml index da53b8f..75f4522 100644 --- a/backend/src/analytics_agent/demo/config.demo.yaml +++ b/backend/src/analytics_agent/demo/config.demo.yaml @@ -7,7 +7,7 @@ context_platforms: engines: - type: mysql - name: olist_ecommerce + name: fiction_retail connection: dialect: mysql+pymysql host: "${MYSQL_HOST}" diff --git a/backend/src/analytics_agent/demo/ingest_metadata.py b/backend/src/analytics_agent/demo/ingest_metadata.py index c65b157..10112df 100644 --- a/backend/src/analytics_agent/demo/ingest_metadata.py +++ b/backend/src/analytics_agent/demo/ingest_metadata.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Register the Olist MySQL tables in DataHub via GraphQL: +Register the Fiction Retail MySQL tables in DataHub via GraphQL: 1. createIngestionSource — upsert a MySQL ingestion recipe 2. createIngestionExecutionRequest — run it inside DataHub's executor 3. Poll until SUCCESS, then patch in human-readable descriptions @@ -21,14 +21,16 @@ import urllib.request TABLE_DESCRIPTIONS: dict[str, str] = { - "olist_customers": "Brazilian e-commerce customers with city and state information", - "olist_orders": "Order lifecycle records including status, purchase timestamp, and delivery timestamps", - "olist_order_items": "Line items in each order, linking orders to products and sellers with price and freight", - "olist_order_payments": "Payment method and installment details for each order", - "olist_order_reviews": "Customer satisfaction reviews with scores and comments per order", - "olist_products": "Product catalog with category, dimensions, and weight", - "olist_sellers": "Marketplace sellers with city and state location", - "product_category_name_translation": "Portuguese to English category name translations", + "customers": "Retail customers with contact details, location, and segment classification", + "orders": "Order records including status, payment method, and total amount", + "order_items": "Line items linking orders to products with quantity, unit price, and discount", + "products": "Product catalog with category, brand, price, and physical dimensions", + "suppliers": "Supplier directory with country and contract information", + "inventory": "Stock levels per product and warehouse with reorder thresholds", + "warehouses": "Warehouse locations with capacity and operational details", + "shipments": "Shipment tracking records with carrier, dates, and delivery state", + "returns": "Return and refund records with reason codes", + "promotions": "Promotional campaigns with discount rules, validity windows, and category scope", } @@ -95,7 +97,7 @@ def _upsert_ingestion_source( input_fields = { "name": "Analytics Agent Demo — MySQL", "type": "mysql", - "description": f"Olist e-commerce sample data in {database}", + "description": f"Fiction Retail sample data in {database}", "config": { "recipe": recipe, "executorId": "default", @@ -196,7 +198,7 @@ def _patch_descriptions(gms_url: str, token: str, database: str) -> None: def _seed_demo_context(gms_url: str, token: str, database: str) -> None: - """Seed demo-ready tags, glossary terms, and table ownership for Olist tables.""" + """Seed demo-ready tags, glossary terms, and table ownership for Fiction Retail tables.""" from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.schema_classes import ( AuditStampClass, @@ -224,9 +226,10 @@ def _dataset_urn(table: str) -> str: # ── 1. Create tag entities ────────────────────────────────────────────── TAGS: dict[str, str] = { - "pii": "Contains personally identifiable information (customer IDs, locations, free-text review comments)", - "identifier": "Primary or foreign key columns used for joining tables", - "financial": "Contains monetary values (prices, payments, freight costs)", + "pii": "Contains personally identifiable information", + "financial": "Contains monetary values", + "transactional": "Records individual business transactions", + "reference_data": "Lookup or configuration data that changes infrequently", } for tag_name, description in TAGS.items(): try: @@ -244,32 +247,46 @@ def _dataset_urn(table: str) -> str: # ── 2. Create glossary term entities ──────────────────────────────────── TERMS: dict[str, dict[str, str]] = { - "revenue": { - "name": "Revenue", + "order_status": { + "name": "Order Status", "definition": ( - "Total sales value from delivered orders. Calculated as SUM(price + freight_value) " - "from olist_order_items, filtered to orders where olist_orders.order_status = 'delivered'. " - "Non-delivered orders (canceled, unavailable) are excluded. For per-seller revenue, group by " - "olist_order_items.seller_id. For per-category revenue, join via product_id to olist_products " - "and then to product_category_name_translation for English category names." + "Lifecycle state of an order. Values: pending, processing, shipped, delivered, " + "cancelled, returned." ), }, - "delivery_sla": { - "name": "Delivery SLA", + "customer_segment": { + "name": "Customer Segment", "definition": ( - "Delivery performance metric. An order meets SLA if order_delivered_customer_date <= " - "order_estimated_delivery_date. SLA breach = actual delivery later than estimated. " - "Per-seller SLA breach rate = count(breached orders) / count(delivered orders) for orders " - "shipped by that seller. Only computed for delivered orders; canceled and undelivered orders " - "are excluded from the denominator." + "Classification of a customer based on purchase behavior or profile. " + "Used to target promotions and personalize recommendations." ), }, - "active_seller": { - "name": "Active Seller", + "discount_pct": { + "name": "Discount Percentage", "definition": ( - "A seller with at least one delivered order in the trailing 30 days, measured from the most " - "recent order_purchase_timestamp in the dataset. For historical Olist data (2016-2018), " - "'trailing 30 days' is computed relative to MAX(order_purchase_timestamp), not today's date." + "Percentage reduction applied to the unit price of an order item. " + "Sourced from the applied promotion or negotiated directly." + ), + }, + "reorder_threshold": { + "name": "Reorder Threshold", + "definition": ( + "Minimum quantity_on_hand below which a restock should be triggered " + "for a given product at a warehouse." + ), + }, + "return_reason_code": { + "name": "Return Reason Code", + "definition": ( + "Standardized code classifying why an item was returned. " + "Used for quality tracking and supplier performance analysis." + ), + }, + "shipment_state": { + "name": "Shipment State", + "definition": ( + "Current status of a shipment in the fulfillment pipeline. " + "Values: pending, in_transit, delivered, failed." ), }, } @@ -298,28 +315,35 @@ def _dataset_urn(table: str) -> str: # so GlobalTagsClass and GlossaryTermsClass are each written exactly once — # a second emit of either aspect would replace the first (overwrite bug). TAG_MAP: dict[str, list[str]] = { - "olist_customers": ["pii", "identifier"], - "olist_orders": ["identifier"], - "olist_order_items": ["identifier", "financial"], - "olist_order_payments": ["financial"], - "olist_order_reviews": ["pii"], - "olist_products": ["identifier"], - "olist_sellers": ["pii", "identifier"], + "customers": ["pii"], + "orders": ["financial", "transactional"], + "order_items": ["financial", "transactional"], + "shipments": ["transactional"], + "returns": ["financial", "transactional"], + "products": ["reference_data"], + "suppliers": ["reference_data"], + "warehouses": ["reference_data"], + "promotions": ["reference_data"], } TERM_MAP: dict[str, list[str]] = { - "olist_order_items": ["revenue"], - "olist_orders": ["revenue", "delivery_sla"], - "olist_sellers": ["delivery_sla", "active_seller"], + "orders": ["order_status"], + "customers": ["customer_segment"], + "order_items": ["discount_pct"], + "inventory": ["reorder_threshold"], + "returns": ["return_reason_code"], + "shipments": ["shipment_state"], } OWNER_MAP: dict[str, str] = { - "olist_sellers": "logistics_team", - "olist_order_items": "logistics_team", - "olist_order_payments": "finance_team", - "olist_order_reviews": "customer_experience_team", - "olist_customers": "customer_experience_team", - "olist_products": "product_team", - "product_category_name_translation": "product_team", - "olist_orders": "data_platform_team", + "customers": "customer_team", + "orders": "commerce_team", + "order_items": "commerce_team", + "products": "catalog_team", + "suppliers": "catalog_team", + "inventory": "logistics_team", + "warehouses": "logistics_team", + "shipments": "logistics_team", + "returns": "finance_team", + "promotions": "marketing_team", } for table in sorted(set(TAG_MAP) | set(TERM_MAP) | set(OWNER_MAP)): aspects: list = [] @@ -367,7 +391,9 @@ def _dataset_urn(table: str) -> str: def main() -> None: - parser = argparse.ArgumentParser(description="Ingest Olist metadata into DataHub via GraphQL") + parser = argparse.ArgumentParser( + description="Ingest Fiction Retail metadata into DataHub via GraphQL" + ) parser.add_argument("--gms-url", default="http://localhost:8080") parser.add_argument("--token", default="") parser.add_argument("--database", default="analytics_agent_demo") @@ -414,7 +440,7 @@ def main() -> None: print() print("[→] Seeding demo context (tags, glossary terms, ownership)...") _seed_demo_context(args.gms_url, args.token, args.database) - print("[✓] Demo context seeded — 3 tags, 3 glossary terms, 8 table owners") + print("[✓] Demo context seeded — 4 tags, 6 glossary terms, 10 table owners") print() print(f"[✓] Done — {len(TABLE_DESCRIPTIONS)} tables indexed and described in DataHub.") diff --git a/backend/src/analytics_agent/demo/load_sample_data.py b/backend/src/analytics_agent/demo/load_sample_data.py index 3b7e993..4aee5bd 100644 --- a/backend/src/analytics_agent/demo/load_sample_data.py +++ b/backend/src/analytics_agent/demo/load_sample_data.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Download the Olist e-commerce SQLite dataset and load it into MySQL. +Download the Fiction Retail SQLite dataset and load it into MySQL. Usage (from repo root): uv run python scripts/load_sample_data.py [options] @@ -15,31 +15,23 @@ import urllib.request from pathlib import Path -OLIST_URL = ( - "https://github.com/datahub-project/static-assets/raw/main/datasets/olist-ecommerce/olist.db" -) +FICTION_RETAIL_URL = "https://github.com/datahub-project/static-assets/raw/main/datasets/fiction-retail/fiction-retail.db" TABLES = [ - "olist_customers", - "olist_orders", - "olist_order_items", - "olist_order_payments", - "olist_order_reviews", - "olist_products", - "olist_sellers", - "product_category_name_translation", + "customers", + "orders", + "order_items", + "products", + "suppliers", + "inventory", + "warehouses", + "shipments", + "returns", + "promotions", ] # Fields whose content is long free-text — map to MySQL TEXT instead of VARCHAR(255). -# NOTE: product_category_name and product_category_name_english are kept as VARCHAR(255) -# because they are join keys between tables; TEXT columns cannot be indexed without a -# prefix length in MySQL. -LONG_TEXT_FIELDS = { - "review_comment_message", - "review_comment_title", - "product_description_lenght", # intentional Olist typo - "product_description_length", -} +LONG_TEXT_FIELDS: set[str] = set() BATCH_SIZE = 500 @@ -105,7 +97,7 @@ def _build_create_table(table: str, columns: list[tuple]) -> str: def main() -> None: - parser = argparse.ArgumentParser(description="Load Olist sample data into MySQL") + parser = argparse.ArgumentParser(description="Load Fiction Retail sample data into MySQL") parser.add_argument("--host", default="localhost", help="MySQL host (default: localhost)") parser.add_argument("--port", type=int, default=3306, help="MySQL port (default: 3306)") parser.add_argument( @@ -136,12 +128,12 @@ def main() -> None: sys.exit(1) # --- 1. Download SQLite file --- - print("[→] Downloading Olist dataset from GitHub...") + print("[→] Downloading Fiction Retail dataset from GitHub...") with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: tmp_path = Path(tmp.name) try: - _download(OLIST_URL, tmp_path) + _download(FICTION_RETAIL_URL, tmp_path) print(f"[✓] Downloaded to {tmp_path} ({tmp_path.stat().st_size:,} bytes)") # --- 2. Open SQLite --- diff --git a/backend/src/analytics_agent/quickstart.py b/backend/src/analytics_agent/quickstart.py index c6a74e5..e22c8c3 100644 --- a/backend/src/analytics_agent/quickstart.py +++ b/backend/src/analytics_agent/quickstart.py @@ -404,7 +404,7 @@ def _bootstrap_and_launch(config_dir: Path, port: int, *, open_setup: bool = Fal sys.exit(1) -# ── Demo mode (full DataHub + Olist sample data) ─────────────────────────────── +# ── Demo mode (full DataHub + Fiction Retail sample data) ────────────────────── # On Linux, host.docker.internal doesn't resolve — use Docker's default bridge gateway. _HOST_INTERNAL = "host.docker.internal" if sys.platform == "darwin" else "172.17.0.1" @@ -501,7 +501,7 @@ def _provision_datahub_token() -> str: return "" -def _load_olist_data() -> None: +def _load_fiction_retail_data() -> None: """Run load_sample_data.py from the bundled demo package.""" from analytics_agent.demo import load_sample_data # noqa: F401 @@ -524,9 +524,9 @@ def _load_olist_data() -> None: check=False, ) if result.returncode != 0: - click.echo(" ✗ Olist data loading failed.", err=True) + click.echo(" ✗ Fiction Retail data loading failed.", err=True) sys.exit(1) - click.echo(" ✓ Olist sample data loaded") + click.echo(" ✓ Fiction Retail sample data loaded") def _ingest_metadata(gms_token: str) -> None: @@ -556,11 +556,11 @@ def _ingest_metadata(gms_token: str) -> None: if result.returncode != 0: click.echo(" ✗ Metadata ingestion failed.", err=True) sys.exit(1) - click.echo(" ✓ Olist metadata ingested into DataHub") + click.echo(" ✓ Fiction Retail metadata ingested into DataHub") def _write_demo_config(config_dir: Path, gms_token: str, llm_env: dict[str, str]) -> None: - """Write .env and config.yaml for the demo (DataHub + Olist MySQL).""" + """Write .env and config.yaml for the demo (DataHub + Fiction Retail MySQL).""" import shutil # .env @@ -589,7 +589,7 @@ def _write_demo_config(config_dir: Path, gms_token: str, llm_env: dict[str, str] def run_demo(port: int = 8100) -> None: - """Full demo: DataHub quickstart + Olist data + analytics agent.""" + """Full demo: DataHub quickstart + Fiction Retail data + analytics agent.""" click.echo( textwrap.dedent(""" ╔══════════════════════════════════════════╗ @@ -628,8 +628,8 @@ def run_demo(port: int = 8100) -> None: click.echo("\n→ Starting DataHub…") _start_datahub() - click.echo("\n→ Loading Olist sample data…") - _load_olist_data() + click.echo("\n→ Loading Fiction Retail sample data…") + _load_fiction_retail_data() click.echo("\n→ Ingesting metadata into DataHub…") gms_token = _provision_datahub_token() diff --git a/config.demo.yaml b/config.demo.yaml index da53b8f..75f4522 100644 --- a/config.demo.yaml +++ b/config.demo.yaml @@ -7,7 +7,7 @@ context_platforms: engines: - type: mysql - name: olist_ecommerce + name: fiction_retail connection: dialect: mysql+pymysql host: "${MYSQL_HOST}" diff --git a/config.yaml.example b/config.yaml.example index 2983671..7df92da 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -39,7 +39,7 @@ engines: [] # MySQL example: # - type: mysql -# name: olist_ecommerce +# name: fiction_retail # connection: # dialect: mysql+pymysql # host: "${MYSQL_HOST}" diff --git a/quickstart.sh b/quickstart.sh index 0d2189b..21979f8 100755 --- a/quickstart.sh +++ b/quickstart.sh @@ -323,14 +323,14 @@ fi # ────────────────────────────────────────────────────────────────────────────── # 4. Load sample data (idempotent) # ────────────────────────────────────────────────────────────────────────────── -go "Checking if Olist sample data is already loaded..." +go "Checking if Fiction Retail sample data is already loaded..." -# Returns 0 only if ALL 7 expected tables exist AND at least one key table +# Returns 0 only if ALL 10 expected tables exist AND at least one key table # (orders) has rows — so empty or partial loads always re-trigger. _sample_data_loaded() { uv run python -c " import pymysql, sys -REQUIRED = {'olist_customers','olist_orders','olist_order_items','olist_order_payments','olist_order_reviews','olist_products','olist_sellers','product_category_name_translation'} +REQUIRED = {'customers','orders','order_items','products','suppliers','inventory','warehouses','shipments','returns','promotions'} try: conn = pymysql.connect( host='localhost', port=3306, @@ -346,7 +346,7 @@ try: missing = REQUIRED - found print(f'Missing tables: {missing}', file=sys.stderr) conn.close(); sys.exit(1) - cur.execute(\"SELECT COUNT(*) FROM \`${MYSQL_DATABASE:-analytics_agent_demo}\`.\`olist_orders\`\") + cur.execute(\"SELECT COUNT(*) FROM \`${MYSQL_DATABASE:-analytics_agent_demo}\`.\`orders\`\") row_count = cur.fetchone()[0] conn.close() if row_count == 0: @@ -363,10 +363,10 @@ except Exception as e: # NOTE: assignment inside `if` suppresses set -e for the subshell exit code, # which is what we want — a non-zero exit means "not loaded yet", not a fatal error. if _check_result=$(_sample_data_loaded); then - ok "Olist sample data already loaded — ${_check_result}" + ok "Fiction Retail sample data already loaded — ${_check_result}" else [[ -n "${_check_result:-}" ]] && warn "${_check_result}" - go "Loading Olist sample data into MySQL..." + go "Loading Fiction Retail sample data into MySQL..." cd "$REPO_ROOT" uv run python scripts/load_sample_data.py \ --user "$MYSQL_USER" \ @@ -558,11 +558,11 @@ fi echo -e " ${BOLD}DataHub UI:${NC} http://localhost:9002 (datahub / datahub)" echo "" echo -e " ${BOLD}Try asking:${NC}" -echo " • Top 10 categories by revenue?" +echo " • Top 5 product categories by revenue?" echo " • Monthly order volumes (chart)" -echo " • States with best review scores?" -echo " • Average delivery time by category" -echo " • Sellers with most late deliveries" +echo " • Which warehouses have the most shipments?" +echo " • What products are below their reorder threshold?" +echo " • Which suppliers have the most products?" echo "" echo " Stop: docker stop analytics-agent-quickstart" echo " Logs: docker logs -f analytics-agent-quickstart" diff --git a/scripts/ingest_metadata.py b/scripts/ingest_metadata.py index d862117..71b6d22 100644 --- a/scripts/ingest_metadata.py +++ b/scripts/ingest_metadata.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Register the Olist MySQL tables in DataHub via GraphQL: +Register the Fiction Retail MySQL tables in DataHub via GraphQL: 1. createIngestionSource — upsert a MySQL ingestion recipe 2. createIngestionExecutionRequest — run it inside DataHub's executor 3. Poll until SUCCESS, then patch in human-readable descriptions @@ -20,14 +20,16 @@ import urllib.request TABLE_DESCRIPTIONS: dict[str, str] = { - "olist_customers": "Brazilian e-commerce customers with city and state information", - "olist_orders": "Order lifecycle records including status, purchase timestamp, and delivery timestamps", - "olist_order_items": "Line items in each order, linking orders to products and sellers with price and freight", - "olist_order_payments": "Payment method and installment details for each order", - "olist_order_reviews": "Customer satisfaction reviews with scores and comments per order", - "olist_products": "Product catalog with category, dimensions, and weight", - "olist_sellers": "Marketplace sellers with city and state location", - "product_category_name_translation": "Portuguese to English category name translations", + "customers": "Retail customers with contact details, location, and segment classification", + "orders": "Order records including status, payment method, and total amount", + "order_items": "Line items linking orders to products with quantity, unit price, and discount", + "products": "Product catalog with category, brand, price, and physical dimensions", + "suppliers": "Supplier directory with country and contract information", + "inventory": "Stock levels per product and warehouse with reorder thresholds", + "warehouses": "Warehouse locations with capacity and operational details", + "shipments": "Shipment tracking records with carrier, dates, and delivery state", + "returns": "Return and refund records with reason codes", + "promotions": "Promotional campaigns with discount rules, validity windows, and category scope", } @@ -79,7 +81,7 @@ def _upsert_ingestion_source(gms_url: str, token: str, mysql_host_port: str, dat input_fields = { "name": "Analytics Agent Demo — MySQL", "type": "mysql", - "description": f"Olist e-commerce sample data in {database}", + "description": f"Fiction Retail sample data in {database}", "config": { "recipe": recipe, "executorId": "default", @@ -169,7 +171,7 @@ def _patch_descriptions(gms_url: str, token: str, database: str) -> None: def _seed_demo_context(gms_url: str, token: str, database: str) -> None: - """Seed demo-ready tags, glossary terms, and table ownership for Olist tables.""" + """Seed demo-ready tags, glossary terms, and table ownership for Fiction Retail tables.""" from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.schema_classes import ( AuditStampClass, @@ -197,9 +199,10 @@ def _dataset_urn(table: str) -> str: # ── 1. Create tag entities ────────────────────────────────────────────── TAGS: dict[str, str] = { - "pii": "Contains personally identifiable information (customer IDs, locations, free-text review comments)", - "identifier": "Primary or foreign key columns used for joining tables", - "financial": "Contains monetary values (prices, payments, freight costs)", + "pii": "Contains personally identifiable information", + "financial": "Contains monetary values", + "transactional": "Records individual business transactions", + "reference_data": "Lookup or configuration data that changes infrequently", } for tag_name, description in TAGS.items(): try: @@ -215,32 +218,46 @@ def _dataset_urn(table: str) -> str: # ── 2. Create glossary term entities ──────────────────────────────────── TERMS: dict[str, dict[str, str]] = { - "revenue": { - "name": "Revenue", + "order_status": { + "name": "Order Status", "definition": ( - "Total sales value from delivered orders. Calculated as SUM(price + freight_value) " - "from olist_order_items, filtered to orders where olist_orders.order_status = 'delivered'. " - "Non-delivered orders (canceled, unavailable) are excluded. For per-seller revenue, group by " - "olist_order_items.seller_id. For per-category revenue, join via product_id to olist_products " - "and then to product_category_name_translation for English category names." + "Lifecycle state of an order. Values: pending, processing, shipped, delivered, " + "cancelled, returned." ), }, - "delivery_sla": { - "name": "Delivery SLA", + "customer_segment": { + "name": "Customer Segment", "definition": ( - "Delivery performance metric. An order meets SLA if order_delivered_customer_date <= " - "order_estimated_delivery_date. SLA breach = actual delivery later than estimated. " - "Per-seller SLA breach rate = count(breached orders) / count(delivered orders) for orders " - "shipped by that seller. Only computed for delivered orders; canceled and undelivered orders " - "are excluded from the denominator." + "Classification of a customer based on purchase behavior or profile. " + "Used to target promotions and personalize recommendations." ), }, - "active_seller": { - "name": "Active Seller", + "discount_pct": { + "name": "Discount Percentage", "definition": ( - "A seller with at least one delivered order in the trailing 30 days, measured from the most " - "recent order_purchase_timestamp in the dataset. For historical Olist data (2016-2018), " - "'trailing 30 days' is computed relative to MAX(order_purchase_timestamp), not today's date." + "Percentage reduction applied to the unit price of an order item. " + "Sourced from the applied promotion or negotiated directly." + ), + }, + "reorder_threshold": { + "name": "Reorder Threshold", + "definition": ( + "Minimum quantity_on_hand below which a restock should be triggered " + "for a given product at a warehouse." + ), + }, + "return_reason_code": { + "name": "Return Reason Code", + "definition": ( + "Standardized code classifying why an item was returned. " + "Used for quality tracking and supplier performance analysis." + ), + }, + "shipment_state": { + "name": "Shipment State", + "definition": ( + "Current status of a shipment in the fulfillment pipeline. " + "Values: pending, in_transit, delivered, failed." ), }, } @@ -265,28 +282,35 @@ def _dataset_urn(table: str) -> str: # so GlobalTagsClass and GlossaryTermsClass are each written exactly once — # a second emit of either aspect would replace the first (overwrite bug). TAG_MAP: dict[str, list[str]] = { - "olist_customers": ["pii", "identifier"], - "olist_orders": ["identifier"], - "olist_order_items": ["identifier", "financial"], - "olist_order_payments": ["financial"], - "olist_order_reviews": ["pii"], - "olist_products": ["identifier"], - "olist_sellers": ["pii", "identifier"], + "customers": ["pii"], + "orders": ["financial", "transactional"], + "order_items": ["financial", "transactional"], + "shipments": ["transactional"], + "returns": ["financial", "transactional"], + "products": ["reference_data"], + "suppliers": ["reference_data"], + "warehouses": ["reference_data"], + "promotions": ["reference_data"], } TERM_MAP: dict[str, list[str]] = { - "olist_order_items": ["revenue"], - "olist_orders": ["revenue", "delivery_sla"], - "olist_sellers": ["delivery_sla", "active_seller"], + "orders": ["order_status"], + "customers": ["customer_segment"], + "order_items": ["discount_pct"], + "inventory": ["reorder_threshold"], + "returns": ["return_reason_code"], + "shipments": ["shipment_state"], } OWNER_MAP: dict[str, str] = { - "olist_sellers": "logistics_team", - "olist_order_items": "logistics_team", - "olist_order_payments": "finance_team", - "olist_order_reviews": "customer_experience_team", - "olist_customers": "customer_experience_team", - "olist_products": "product_team", - "product_category_name_translation": "product_team", - "olist_orders": "data_platform_team", + "customers": "customer_team", + "orders": "commerce_team", + "order_items": "commerce_team", + "products": "catalog_team", + "suppliers": "catalog_team", + "inventory": "logistics_team", + "warehouses": "logistics_team", + "shipments": "logistics_team", + "returns": "finance_team", + "promotions": "marketing_team", } for table in sorted(set(TAG_MAP) | set(TERM_MAP) | set(OWNER_MAP)): aspects: list = [] @@ -324,7 +348,7 @@ def _dataset_urn(table: str) -> str: def main() -> None: - parser = argparse.ArgumentParser(description="Ingest Olist metadata into DataHub via GraphQL") + parser = argparse.ArgumentParser(description="Ingest Fiction Retail metadata into DataHub via GraphQL") parser.add_argument("--gms-url", default="http://localhost:8080") parser.add_argument("--token", default="") parser.add_argument("--database", default="analytics_agent_demo") @@ -365,7 +389,7 @@ def main() -> None: print() print("[→] Seeding demo context (tags, glossary terms, ownership)...") _seed_demo_context(args.gms_url, args.token, args.database) - print("[✓] Demo context seeded — 3 tags, 3 glossary terms, 8 table owners") + print("[✓] Demo context seeded — 4 tags, 6 glossary terms, 10 table owners") print() print(f"[✓] Done — {len(TABLE_DESCRIPTIONS)} tables indexed and described in DataHub.") diff --git a/scripts/load_sample_data.py b/scripts/load_sample_data.py index 95306a1..2a84b4b 100644 --- a/scripts/load_sample_data.py +++ b/scripts/load_sample_data.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Download the Olist e-commerce SQLite dataset and load it into MySQL. +Download the Fiction Retail SQLite dataset and load it into MySQL. Usage (from repo root): uv run python scripts/load_sample_data.py [options] @@ -14,32 +14,26 @@ import urllib.request from pathlib import Path -OLIST_URL = ( +FICTION_RETAIL_URL = ( "https://github.com/datahub-project/static-assets/raw/main/" - "datasets/olist-ecommerce/olist.db" + "datasets/fiction-retail/fiction-retail.db" ) TABLES = [ - "olist_customers", - "olist_orders", - "olist_order_items", - "olist_order_payments", - "olist_order_reviews", - "olist_products", - "olist_sellers", - "product_category_name_translation", + "customers", + "orders", + "order_items", + "products", + "suppliers", + "inventory", + "warehouses", + "shipments", + "returns", + "promotions", ] # Fields whose content is long free-text — map to MySQL TEXT instead of VARCHAR(255). -# NOTE: product_category_name and product_category_name_english are kept as VARCHAR(255) -# because they are join keys between tables; TEXT columns cannot be indexed without a -# prefix length in MySQL. -LONG_TEXT_FIELDS = { - "review_comment_message", - "review_comment_title", - "product_description_lenght", # intentional Olist typo - "product_description_length", -} +LONG_TEXT_FIELDS: set[str] = set() BATCH_SIZE = 500 @@ -102,7 +96,7 @@ def _build_create_table(table: str, columns: list[tuple]) -> str: def main() -> None: - parser = argparse.ArgumentParser(description="Load Olist sample data into MySQL") + parser = argparse.ArgumentParser(description="Load Fiction Retail sample data into MySQL") parser.add_argument("--host", default="localhost", help="MySQL host (default: localhost)") parser.add_argument("--port", type=int, default=3306, help="MySQL port (default: 3306)") parser.add_argument("--user", default="datahub", help="MySQL user for data loading (default: datahub)") @@ -123,12 +117,12 @@ def main() -> None: sys.exit(1) # --- 1. Download SQLite file --- - print(f"[→] Downloading Olist dataset from GitHub...") + print("[→] Downloading Fiction Retail dataset from GitHub...") with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: tmp_path = Path(tmp.name) try: - _download(OLIST_URL, tmp_path) + _download(FICTION_RETAIL_URL, tmp_path) print(f"[✓] Downloaded to {tmp_path} ({tmp_path.stat().st_size:,} bytes)") # --- 2. Open SQLite --- diff --git a/tests/integration/test_duckdb_e2e.py b/tests/integration/test_duckdb_e2e.py index 74e101c..c14fccf 100644 --- a/tests/integration/test_duckdb_e2e.py +++ b/tests/integration/test_duckdb_e2e.py @@ -2,8 +2,8 @@ Integration test: DuckDB query engine + DataHub metadata, end-to-end. Setup: - - Creates a temporary DuckDB file with three Olist-like tables - (olist_orders, olist_order_items, olist_products — ~50 rows total). + - Creates a temporary DuckDB file with three tables + (orders, order_items, products — ~50 rows total). - Pushes table descriptions to the configured DataHub instance under platform=duckdb, env=DEV so the agent can discover them via search. @@ -113,16 +113,16 @@ def _delete_entity(gms_url: str, token: str, urn: str) -> None: # Tables we create + their descriptions for DataHub. _TABLES: dict[str, str] = { - "olist_orders": ( + "orders": ( "Order lifecycle records. Columns: order_id (PK), customer_id, " "order_status ('delivered' or 'canceled'), order_purchase_timestamp." ), - "olist_order_items": ( + "order_items": ( "Line items inside each order. Columns: order_id (FK), product_id (FK), " - "price (item price in BRL), freight_value (shipping cost in BRL). " + "price (item price), freight_value (shipping cost). " "Revenue = SUM(price + freight_value) for delivered orders." ), - "olist_products": ( + "products": ( "Product catalog. Columns: product_id (PK), product_category_name " "(e.g. 'electronics', 'furniture', 'clothing', 'books', 'toys')." ), @@ -138,15 +138,15 @@ def _dataset_urn(table: str) -> str: @pytest.fixture(scope="module") def duckdb_path(tmp_path_factory): - """Build a temp DuckDB file with three Olist-like tables.""" + """Build a temp DuckDB file with three test tables.""" import duckdb db_file = tmp_path_factory.mktemp("duckdb") / "test.duckdb" con = duckdb.connect(str(db_file)) - # olist_orders — 50 rows, 5 canceled (i % 10 == 0) + # orders — 50 rows, 5 canceled (i % 10 == 0) con.execute(""" - CREATE TABLE olist_orders ( + CREATE TABLE orders ( order_id VARCHAR PRIMARY KEY, customer_id VARCHAR, order_status VARCHAR, @@ -154,7 +154,7 @@ def duckdb_path(tmp_path_factory): ) """) con.execute(""" - INSERT INTO olist_orders + INSERT INTO orders SELECT 'order_' || i::VARCHAR, 'customer_' || (i % 20)::VARCHAR, @@ -163,10 +163,10 @@ def duckdb_path(tmp_path_factory): FROM range(1, 51) t(i) """) - # olist_order_items — 2 items per order (100 rows) + # order_items — 2 items per order (100 rows) # product_id cycles through 0-9 so each maps to a distinct category con.execute(""" - CREATE TABLE olist_order_items ( + CREATE TABLE order_items ( order_id VARCHAR, product_id VARCHAR, price DOUBLE, @@ -174,7 +174,7 @@ def duckdb_path(tmp_path_factory): ) """) con.execute(""" - INSERT INTO olist_order_items + INSERT INTO order_items SELECT 'order_' || (i % 50 + 1)::VARCHAR, 'product_' || (i % 10)::VARCHAR, @@ -183,15 +183,15 @@ def duckdb_path(tmp_path_factory): FROM range(0, 100) t(i) """) - # olist_products — 10 products across 5 categories (2 products each) + # products — 10 products across 5 categories (2 products each) con.execute(""" - CREATE TABLE olist_products ( + CREATE TABLE products ( product_id VARCHAR PRIMARY KEY, product_category_name VARCHAR ) """) con.executemany( - "INSERT INTO olist_products VALUES (?, ?)", + "INSERT INTO products VALUES (?, ?)", [ ("product_0", "electronics"), ("product_1", "furniture"), @@ -326,7 +326,7 @@ async def test_delivered_vs_canceled_order_count(agent_graph): event_types = {e["event"] for e in events} assert "COMPLETE" in event_types - assert "SQL" in event_types, "Agent should query olist_orders for status counts" + assert "SQL" in event_types, "Agent should query orders for status counts" complete_text = next(e["payload"].get("text", "") for e in events if e["event"] == "COMPLETE") # Dataset has 45 delivered (i % 10 != 0) and 5 canceled (i % 10 == 0) @@ -348,9 +348,7 @@ async def test_engine_list_tables(duckdb_engine): result = tools["list_tables"].invoke({"schema": ""}) tables = orjson.loads(result) table_names = {t["name"] for t in tables} - assert {"olist_orders", "olist_order_items", "olist_products"} == table_names, ( - f"Unexpected tables: {table_names}" - ) + assert {"orders", "order_items", "products"} == table_names, f"Unexpected tables: {table_names}" @pytest.mark.asyncio @@ -361,7 +359,7 @@ async def test_engine_execute_sql(duckdb_engine): tools = {t.name: t for t in duckdb_engine.get_tools()} result = tools["execute_sql"].invoke( { - "sql": "SELECT order_status, COUNT(*) AS cnt FROM olist_orders GROUP BY order_status ORDER BY cnt DESC" + "sql": "SELECT order_status, COUNT(*) AS cnt FROM orders GROUP BY order_status ORDER BY cnt DESC" } ) parsed = orjson.loads(result)