Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cd analytics-agent
bash quickstart.sh
```

The script starts a local DataHub instance, loads the Olist e-commerce sample dataset and catalog metadata, then builds and launches Analytics Agent at **http://localhost:8100**. Postgres data is persisted to `~/.datahub/analytics-agent/postgres-data/` so it survives container restarts.
The script starts a local DataHub instance, loads the Fiction Retail sample dataset and catalog metadata, then builds and launches Analytics Agent at **http://localhost:8100**. Postgres data is persisted to `~/.datahub/analytics-agent/postgres-data/` so it survives container restarts.

**Using AWS Bedrock?** Export `LLM_PROVIDER=bedrock` before running the script. The script will verify your AWS credentials and Bedrock access before starting the container, and mount `~/.aws` read-only so boto3 picks up your profiles and SSO cache automatically.

Expand Down
2 changes: 1 addition & 1 deletion backend/src/analytics_agent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async def _run_all_seeds() -> None:
"--demo",
is_flag=True,
default=False,
help="Full demo: start DataHub, load Olist sample data, and launch the agent.",
help="Full demo: start DataHub, load Fiction Retail sample data, and launch the agent.",
)
@click.option(
"--reconfigure",
Expand Down
2 changes: 1 addition & 1 deletion backend/src/analytics_agent/demo/config.demo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ context_platforms:

engines:
- type: mysql
name: olist_ecommerce
name: fiction_retail
connection:
dialect: mysql+pymysql
host: "${MYSQL_HOST}"
Expand Down
132 changes: 79 additions & 53 deletions backend/src/analytics_agent/demo/ingest_metadata.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Register the Olist MySQL tables in DataHub via GraphQL:
Register the Fiction Retail MySQL tables in DataHub via GraphQL:
1. createIngestionSource — upsert a MySQL ingestion recipe
2. createIngestionExecutionRequest — run it inside DataHub's executor
3. Poll until SUCCESS, then patch in human-readable descriptions
Expand All @@ -21,14 +21,16 @@
import urllib.request

TABLE_DESCRIPTIONS: dict[str, str] = {
"olist_customers": "Brazilian e-commerce customers with city and state information",
"olist_orders": "Order lifecycle records including status, purchase timestamp, and delivery timestamps",
"olist_order_items": "Line items in each order, linking orders to products and sellers with price and freight",
"olist_order_payments": "Payment method and installment details for each order",
"olist_order_reviews": "Customer satisfaction reviews with scores and comments per order",
"olist_products": "Product catalog with category, dimensions, and weight",
"olist_sellers": "Marketplace sellers with city and state location",
"product_category_name_translation": "Portuguese to English category name translations",
"customers": "Retail customers with contact details, location, and segment classification",
"orders": "Order records including status, payment method, and total amount",
"order_items": "Line items linking orders to products with quantity, unit price, and discount",
"products": "Product catalog with category, brand, price, and physical dimensions",
"suppliers": "Supplier directory with country and contract information",
"inventory": "Stock levels per product and warehouse with reorder thresholds",
"warehouses": "Warehouse locations with capacity and operational details",
"shipments": "Shipment tracking records with carrier, dates, and delivery state",
"returns": "Return and refund records with reason codes",
"promotions": "Promotional campaigns with discount rules, validity windows, and category scope",
}


Expand Down Expand Up @@ -95,7 +97,7 @@ def _upsert_ingestion_source(
input_fields = {
"name": "Analytics Agent Demo — MySQL",
"type": "mysql",
"description": f"Olist e-commerce sample data in {database}",
"description": f"Fiction Retail sample data in {database}",
"config": {
"recipe": recipe,
"executorId": "default",
Expand Down Expand Up @@ -196,7 +198,7 @@ def _patch_descriptions(gms_url: str, token: str, database: str) -> None:


def _seed_demo_context(gms_url: str, token: str, database: str) -> None:
"""Seed demo-ready tags, glossary terms, and table ownership for Olist tables."""
"""Seed demo-ready tags, glossary terms, and table ownership for Fiction Retail tables."""
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
AuditStampClass,
Expand Down Expand Up @@ -224,9 +226,10 @@ def _dataset_urn(table: str) -> str:

# ── 1. Create tag entities ──────────────────────────────────────────────
TAGS: dict[str, str] = {
"pii": "Contains personally identifiable information (customer IDs, locations, free-text review comments)",
"identifier": "Primary or foreign key columns used for joining tables",
"financial": "Contains monetary values (prices, payments, freight costs)",
"pii": "Contains personally identifiable information",
"financial": "Contains monetary values",
"transactional": "Records individual business transactions",
"reference_data": "Lookup or configuration data that changes infrequently",
}
for tag_name, description in TAGS.items():
try:
Expand All @@ -244,32 +247,46 @@ def _dataset_urn(table: str) -> str:

# ── 2. Create glossary term entities ────────────────────────────────────
TERMS: dict[str, dict[str, str]] = {
"revenue": {
"name": "Revenue",
"order_status": {
"name": "Order Status",
"definition": (
"Total sales value from delivered orders. Calculated as SUM(price + freight_value) "
"from olist_order_items, filtered to orders where olist_orders.order_status = 'delivered'. "
"Non-delivered orders (canceled, unavailable) are excluded. For per-seller revenue, group by "
"olist_order_items.seller_id. For per-category revenue, join via product_id to olist_products "
"and then to product_category_name_translation for English category names."
"Lifecycle state of an order. Values: pending, processing, shipped, delivered, "
"cancelled, returned."
),
},
"delivery_sla": {
"name": "Delivery SLA",
"customer_segment": {
"name": "Customer Segment",
"definition": (
"Delivery performance metric. An order meets SLA if order_delivered_customer_date <= "
"order_estimated_delivery_date. SLA breach = actual delivery later than estimated. "
"Per-seller SLA breach rate = count(breached orders) / count(delivered orders) for orders "
"shipped by that seller. Only computed for delivered orders; canceled and undelivered orders "
"are excluded from the denominator."
"Classification of a customer based on purchase behavior or profile. "
"Used to target promotions and personalize recommendations."
),
},
"active_seller": {
"name": "Active Seller",
"discount_pct": {
"name": "Discount Percentage",
"definition": (
"A seller with at least one delivered order in the trailing 30 days, measured from the most "
"recent order_purchase_timestamp in the dataset. For historical Olist data (2016-2018), "
"'trailing 30 days' is computed relative to MAX(order_purchase_timestamp), not today's date."
"Percentage reduction applied to the unit price of an order item. "
"Sourced from the applied promotion or negotiated directly."
),
},
"reorder_threshold": {
"name": "Reorder Threshold",
"definition": (
"Minimum quantity_on_hand below which a restock should be triggered "
"for a given product at a warehouse."
),
},
"return_reason_code": {
"name": "Return Reason Code",
"definition": (
"Standardized code classifying why an item was returned. "
"Used for quality tracking and supplier performance analysis."
),
},
"shipment_state": {
"name": "Shipment State",
"definition": (
"Current status of a shipment in the fulfillment pipeline. "
"Values: pending, in_transit, delivered, failed."
),
},
}
Expand Down Expand Up @@ -298,28 +315,35 @@ def _dataset_urn(table: str) -> str:
# so GlobalTagsClass and GlossaryTermsClass are each written exactly once —
# a second emit of either aspect would replace the first (overwrite bug).
TAG_MAP: dict[str, list[str]] = {
"olist_customers": ["pii", "identifier"],
"olist_orders": ["identifier"],
"olist_order_items": ["identifier", "financial"],
"olist_order_payments": ["financial"],
"olist_order_reviews": ["pii"],
"olist_products": ["identifier"],
"olist_sellers": ["pii", "identifier"],
"customers": ["pii"],
"orders": ["financial", "transactional"],
"order_items": ["financial", "transactional"],
"shipments": ["transactional"],
"returns": ["financial", "transactional"],
"products": ["reference_data"],
"suppliers": ["reference_data"],
"warehouses": ["reference_data"],
"promotions": ["reference_data"],
}
TERM_MAP: dict[str, list[str]] = {
"olist_order_items": ["revenue"],
"olist_orders": ["revenue", "delivery_sla"],
"olist_sellers": ["delivery_sla", "active_seller"],
"orders": ["order_status"],
"customers": ["customer_segment"],
"order_items": ["discount_pct"],
"inventory": ["reorder_threshold"],
"returns": ["return_reason_code"],
"shipments": ["shipment_state"],
}
OWNER_MAP: dict[str, str] = {
"olist_sellers": "logistics_team",
"olist_order_items": "logistics_team",
"olist_order_payments": "finance_team",
"olist_order_reviews": "customer_experience_team",
"olist_customers": "customer_experience_team",
"olist_products": "product_team",
"product_category_name_translation": "product_team",
"olist_orders": "data_platform_team",
"customers": "customer_team",
"orders": "commerce_team",
"order_items": "commerce_team",
"products": "catalog_team",
"suppliers": "catalog_team",
"inventory": "logistics_team",
"warehouses": "logistics_team",
"shipments": "logistics_team",
"returns": "finance_team",
"promotions": "marketing_team",
}
for table in sorted(set(TAG_MAP) | set(TERM_MAP) | set(OWNER_MAP)):
aspects: list = []
Expand Down Expand Up @@ -367,7 +391,9 @@ def _dataset_urn(table: str) -> str:


def main() -> None:
parser = argparse.ArgumentParser(description="Ingest Olist metadata into DataHub via GraphQL")
parser = argparse.ArgumentParser(
description="Ingest Fiction Retail metadata into DataHub via GraphQL"
)
parser.add_argument("--gms-url", default="http://localhost:8080")
parser.add_argument("--token", default="")
parser.add_argument("--database", default="analytics_agent_demo")
Expand Down Expand Up @@ -414,7 +440,7 @@ def main() -> None:
print()
print("[→] Seeding demo context (tags, glossary terms, ownership)...")
_seed_demo_context(args.gms_url, args.token, args.database)
print("[✓] Demo context seeded — 3 tags, 3 glossary terms, 8 table owners")
print("[✓] Demo context seeded — 4 tags, 6 glossary terms, 10 table owners")

print()
print(f"[✓] Done — {len(TABLE_DESCRIPTIONS)} tables indexed and described in DataHub.")
Expand Down
40 changes: 16 additions & 24 deletions backend/src/analytics_agent/demo/load_sample_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Download the Olist e-commerce SQLite dataset and load it into MySQL.
Download the Fiction Retail SQLite dataset and load it into MySQL.

Usage (from repo root):
uv run python scripts/load_sample_data.py [options]
Expand All @@ -15,31 +15,23 @@
import urllib.request
from pathlib import Path

OLIST_URL = (
"https://github.com/datahub-project/static-assets/raw/main/datasets/olist-ecommerce/olist.db"
)
FICTION_RETAIL_URL = "https://github.com/datahub-project/static-assets/raw/main/datasets/fiction-retail/fiction-retail.db"

TABLES = [
"olist_customers",
"olist_orders",
"olist_order_items",
"olist_order_payments",
"olist_order_reviews",
"olist_products",
"olist_sellers",
"product_category_name_translation",
"customers",
"orders",
"order_items",
"products",
"suppliers",
"inventory",
"warehouses",
"shipments",
"returns",
"promotions",
]

# Fields whose content is long free-text — map to MySQL TEXT instead of VARCHAR(255).
# NOTE: product_category_name and product_category_name_english are kept as VARCHAR(255)
# because they are join keys between tables; TEXT columns cannot be indexed without a
# prefix length in MySQL.
LONG_TEXT_FIELDS = {
"review_comment_message",
"review_comment_title",
"product_description_lenght", # intentional Olist typo
"product_description_length",
}
LONG_TEXT_FIELDS: set[str] = set()

BATCH_SIZE = 500

Expand Down Expand Up @@ -105,7 +97,7 @@ def _build_create_table(table: str, columns: list[tuple]) -> str:


def main() -> None:
parser = argparse.ArgumentParser(description="Load Olist sample data into MySQL")
parser = argparse.ArgumentParser(description="Load Fiction Retail sample data into MySQL")
parser.add_argument("--host", default="localhost", help="MySQL host (default: localhost)")
parser.add_argument("--port", type=int, default=3306, help="MySQL port (default: 3306)")
parser.add_argument(
Expand Down Expand Up @@ -136,12 +128,12 @@ def main() -> None:
sys.exit(1)

# --- 1. Download SQLite file ---
print("[→] Downloading Olist dataset from GitHub...")
print("[→] Downloading Fiction Retail dataset from GitHub...")
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
tmp_path = Path(tmp.name)

try:
_download(OLIST_URL, tmp_path)
_download(FICTION_RETAIL_URL, tmp_path)
print(f"[✓] Downloaded to {tmp_path} ({tmp_path.stat().st_size:,} bytes)")

# --- 2. Open SQLite ---
Expand Down
18 changes: 9 additions & 9 deletions backend/src/analytics_agent/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def _bootstrap_and_launch(config_dir: Path, port: int, *, open_setup: bool = Fal
sys.exit(1)


# ── Demo mode (full DataHub + Olist sample data) ───────────────────────────────
# ── Demo mode (full DataHub + Fiction Retail sample data) ──────────────────────

# On Linux, host.docker.internal doesn't resolve — use Docker's default bridge gateway.
_HOST_INTERNAL = "host.docker.internal" if sys.platform == "darwin" else "172.17.0.1"
Expand Down Expand Up @@ -501,7 +501,7 @@ def _provision_datahub_token() -> str:
return ""


def _load_olist_data() -> None:
def _load_fiction_retail_data() -> None:
"""Run load_sample_data.py from the bundled demo package."""
from analytics_agent.demo import load_sample_data # noqa: F401

Expand All @@ -524,9 +524,9 @@ def _load_olist_data() -> None:
check=False,
)
if result.returncode != 0:
click.echo(" ✗ Olist data loading failed.", err=True)
click.echo(" ✗ Fiction Retail data loading failed.", err=True)
sys.exit(1)
click.echo(" ✓ Olist sample data loaded")
click.echo(" ✓ Fiction Retail sample data loaded")


def _ingest_metadata(gms_token: str) -> None:
Expand Down Expand Up @@ -556,11 +556,11 @@ def _ingest_metadata(gms_token: str) -> None:
if result.returncode != 0:
click.echo(" ✗ Metadata ingestion failed.", err=True)
sys.exit(1)
click.echo(" ✓ Olist metadata ingested into DataHub")
click.echo(" ✓ Fiction Retail metadata ingested into DataHub")


def _write_demo_config(config_dir: Path, gms_token: str, llm_env: dict[str, str]) -> None:
"""Write .env and config.yaml for the demo (DataHub + Olist MySQL)."""
"""Write .env and config.yaml for the demo (DataHub + Fiction Retail MySQL)."""
import shutil

# .env
Expand Down Expand Up @@ -589,7 +589,7 @@ def _write_demo_config(config_dir: Path, gms_token: str, llm_env: dict[str, str]


def run_demo(port: int = 8100) -> None:
"""Full demo: DataHub quickstart + Olist data + analytics agent."""
"""Full demo: DataHub quickstart + Fiction Retail data + analytics agent."""
click.echo(
textwrap.dedent("""
╔══════════════════════════════════════════╗
Expand Down Expand Up @@ -628,8 +628,8 @@ def run_demo(port: int = 8100) -> None:
click.echo("\n→ Starting DataHub…")
_start_datahub()

click.echo("\n→ Loading Olist sample data…")
_load_olist_data()
click.echo("\n→ Loading Fiction Retail sample data…")
_load_fiction_retail_data()

click.echo("\n→ Ingesting metadata into DataHub…")
gms_token = _provision_datahub_token()
Expand Down
2 changes: 1 addition & 1 deletion config.demo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ context_platforms:

engines:
- type: mysql
name: olist_ecommerce
name: fiction_retail
connection:
dialect: mysql+pymysql
host: "${MYSQL_HOST}"
Expand Down
2 changes: 1 addition & 1 deletion config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ engines: []

# MySQL example:
# - type: mysql
# name: olist_ecommerce
# name: fiction_retail
# connection:
# dialect: mysql+pymysql
# host: "${MYSQL_HOST}"
Expand Down
Loading
Loading