
Live Social Analyst | Real-Time Pathway ETL Intelligence

Pathway · Python 3.10+ · FastAPI · Groq · Gemini

The Problem with Stale Knowledge

In the current landscape of artificial intelligence, many applications powered by Large Language Models (LLMs) are constrained by a fundamental limitation: their knowledge is static. Retrieval-Augmented Generation (RAG) systems, while powerful, often rely on a knowledge base that is a mere snapshot in time.

This creates a "knowledge cutoff," where an AI assistant can become obsolete in an instant. Imagine a financial chatbot unaware of a market-moving announcement made minutes ago, or a customer service bot citing documentation that was superseded yesterday. In a world that operates in real time, such delays are critical failures.

The Paradigm Shift to "Live AI"

A new paradigm is emerging to address this challenge: "Live AI." This represents a fundamental shift from static, retrospective intelligence to dynamic systems that perceive, learn, and reason in real time.

Live Social Analyst is perpetually synchronized with the latest version of reality, processing information as it is created, modified, or deleted. This project puts us at the forefront of this transformation.

Powered by Pathway: The Engine for Live AI

The core technology driving this application is Pathway, a data processing framework designed specifically for building AI pipelines over live data streams.

It allows us to define complex AI workflows that process information incrementally, enabling extremely low-latency updates. Its unique architecture unifies batch and streaming data, meaning we can ingest thousands of global sources and instantly reflect them in our RAG pipeline without manual restarts or batch re-indexing.


How It Works: The "Live AI" Pipeline

This application connects to a dynamic, continuously updating array of data sources and reflects the latest state of reality in real time.

  1. Massive Real-Time Ingestion:

    • 1000+ RSS Feeds (OPML): Continuously scanning global news.
    • NewsData.io & GNews: Integrating external news APIs.
    • HackerNews & Social Streams: Monitoring tech & social discussions.
  2. Zero-Latency Processing:

    • As soon as a news item is detected, it is instantly streamed into the Pathway engine.
    • The engine deduplicates, normalizes, and embeds the text on-the-fly.
    • New information is immediately indexable by the RAG system, with no waiting for nightly batches.
  3. Dynamic Context Retrieval:

    • When you ask a question ("What just happened in Tech?"), the RAG pipeline queries the live index.
    • It retrieves context that may have been created seconds ago.
    • The LLM generates an answer based on what is happening right now, not what happened yesterday.

πŸ“Έ App Previews

Live Dashboard RAG Search Architecture

πŸ› οΈ Integrated Pathway Technologies

This project strictly adheres to the Pathway Live Data Framework, utilizing specific APIs to achieve millisecond-latency streaming.

1. Unified Data Ingestion (pw.io Connectors)

We leverage standard Pathway connectors to ingest data from the web and file system:

  • RSS/OPML Stream: We use pw.io.fs.read to watch the OPML file for changes and pw.io.python.read to stream the actual RSS content as a standard table.
  • HTTP API Connectors: For Twitter/X and NewsData.io, we implement pw.io.http.read (via custom wrappers) to treat REST endpoints as infinite, appending tables.
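As a rough, dependency-free sketch of what such a connector does (the function and feed names below are illustrative, not the Pathway API), a polling source that turns a feed into an append-only stream of unique rows might look like:

```python
import hashlib
from typing import Callable, Iterable

def poll_feed(fetch: Callable[[], Iterable[dict]],
              emit: Callable[[dict], None],
              rounds: int = 3) -> None:
    """Poll a source repeatedly; emit each item once, like an append-only table."""
    seen: set[str] = set()
    for _ in range(rounds):
        for item in fetch():
            key = hashlib.sha256(item["url"].encode()).hexdigest()
            if key not in seen:  # only never-seen rows become new table rows
                seen.add(key)
                emit(item)
        # a real connector would sleep between polls here

# Simulated feed: the second poll returns one old item and one new one.
batches = iter([
    [{"url": "https://example.com/a", "title": "A"}],
    [{"url": "https://example.com/a", "title": "A"},
     {"url": "https://example.com/b", "title": "B"}],
    [],
])
rows: list[dict] = []
poll_feed(lambda: next(batches), rows.append)
print([r["title"] for r in rows])  # -> ['A', 'B']
```

In the real pipeline this loop lives inside a connector subject, and the emitted rows become a Pathway table that downstream transformations react to incrementally.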

2. Live Transformations (pw.temporal & pw.state)

The engine processes data incrementally using Pathway's Table API:

  • Incremental Deduplication: We use pw.io.deduplicate(pathway.table, col=[url]) to strictly enforce uniqueness on the input stream.
  • Sliding Window Aggregation: pw.temporal.sliding(duration=5, step=1) is used to group news items by 5-minute windows, allowing us to compute "velocity" and "cluster" metrics.
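The incremental logic behind these two steps can be sketched in plain Python (helper names and window parameters are illustrative; the real pipeline expresses this through Pathway's Table API):

```python
# Incremental dedup state: accept a row only the first time its URL appears.
seen_urls: set[str] = set()

def accept(row: dict) -> bool:
    if row["url"] in seen_urls:
        return False
    seen_urls.add(row["url"])
    return True

stream = [
    {"url": "u1", "t": 0}, {"url": "u1", "t": 30},   # duplicate is dropped
    {"url": "u2", "t": 90}, {"url": "u3", "t": 400},
]
unique = [r for r in stream if accept(r)]

# Sliding windows: 300 s wide, advancing every 60 s, counting items per window.
def sliding_counts(times: list[int], duration: int = 300, step: int = 60) -> list[int]:
    starts = range(min(times), max(times) + 1, step)
    return [sum(1 for t in times if s <= t < s + duration) for s in starts]

velocity = sliding_counts([r["t"] for r in unique])
print(len(unique), velocity)  # -> 3 [2, 1, 1, 1, 1, 1, 1]
```

The per-window counts are exactly the "velocity" signal described above: how many unique stories landed in each 5-minute span.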

3. Real-Time RAG (pw.xpacks.llm)

  • Vector Indexing: Incoming text is embedded using pw.xpacks.llm.embedders.GeminiEmbedder and indexed in a live KNN index.
  • Context Retrieval: Queries are executed against this live index using pw.xpacks.llm.retrievers.knn, ensuring the context includes data from milliseconds ago.
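A toy sketch of the retrieval step, with made-up three-dimensional vectors standing in for Gemini embeddings (the real index holds high-dimensional embeddings and is updated live as rows arrive):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

# Tiny stand-in for the live KNN index: headline -> embedding.
index = {
    "Fed cuts rates": [0.9, 0.1, 0.0],
    "New GPU launched": [0.1, 0.9, 0.2],
    "Team wins final": [0.0, 0.2, 0.9],
}

def knn(query_vec: list[float], k: int = 2) -> list[str]:
    """Return the k headlines closest to the query embedding."""
    ranked = sorted(index, key=lambda title: cosine(query_vec, index[title]),
                    reverse=True)
    return ranked[:k]

print(knn([0.2, 0.8, 0.1]))  # -> ['New GPU launched', 'Fed cuts rates']
```

The retrieved headlines (plus their full text) become the context handed to the LLM for generation.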

Architecture Diagram

graph LR
    User[User Frontend]
    API[FastAPI Backend]

    subgraph Inputs ["Pathway Connectors (pw.io)"]
        direction TB
        OPML["OPML Stream<br/>(pw.io.fs.read)"]
        HTTP["HTTP API Stream<br/>(pw.io.http.read)"]
        Custom["Custom Feed<br/>(pw.io.python.read)"]
    end

    subgraph Core ["Pathway ETL Engine"]
        direction TB
        Dedup["pw.io.deduplicate"]
        Window["pw.temporal.sliding"]
        Vector["KNN Index"]
    end

    subgraph AI ["LLM xPack (pw.xpacks.llm)"]
        direction TB
        RAG["pw.xpacks.llm.retrievers"]
        Gen[Gemini 1.5]
    end

    Inputs -->|Stream| Core
    Core -->|Transform| Vector
    Vector -->|Retrieve| AI
    User --> API
    API --> AI
    AI --> API
    API --> User

🧠 Deep Dive: The Life of a Data Point

This detailed sequence diagram illustrates exactly how a news item travels from a source to the LLM in milliseconds, highlighting the Pathway Engine's internal mechanics.

sequenceDiagram
    autonumber
    participant Internet as "Refreshed Data Source"
    participant Connector as "Pathway Connector"
    participant InputTable as "pw.Table (Input)"
    participant Transform as "Transformation Engine"
    participant State as "Global Deduplication State"
    participant Vector as "KNN Vector Index"
    participant User as "User Query"
    participant RAG as "RAG Retriever"

    Note over Internet, Connector: T+0ms: News Published
    Internet->>Connector: Poll/Stream New Item
    Connector->>InputTable: Append Row (JSON)
    
    Note over InputTable, Transform: STREAMING MODE
    InputTable->>Transform: Propagate Update (New Row)
    
    Transform->>State: Check Existence (URL/Title Hash)
    alt Is Duplicate?
        State-->>Transform: TRUE (Ignore)
    else Is New?
        State-->>Transform: FALSE (Process)
        Transform->>Transform: Text Normalization & Cleaning
        Transform->>Transform: Compute Embeddings (Gemini)
        Transform->>Vector: UPSERT Vector
    end
    
    Note over Vector, User: T+200ms: Ready for Query
    
    User->>RAG: "What just happened?"
    RAG->>Vector: KNN Search (k=5)
    Vector-->>RAG: Return Top Matches
    RAG->>User: Generate Answer with Context

Installation & Usage

Follow these steps to deploy the system locally.

1. Prerequisites

  • Python: Version 3.10 or higher.
  • API Keys: You need keys for:
    • Gemini (Google AI)
    • Groq (Llama 3 Inference)
    • GNews (Historical Data)

2. Configuration

  1. Clone the repository:
    git clone https://github.com/your-repo/LiveSocialAnalyst.git
    cd LiveSocialAnalyst
  2. Create a .env file (or update config.yaml) with your credentials:
    GEMINI_API_KEY=your_key_here
    GROQ_API_KEY=your_key_here
    GNEWS_API_KEY=your_key_here

3. Install Dependencies

Install the required Python packages:

pip install -r requirements.txt

4. Execution

Run the main application script. This initializes the FastAPI server and spawns the background Pathway daemon threads.

python3 app_pathway.py

Expected Output:

INFO: Uvicorn running on http://0.0.0.0:8000
OPML: Starting to parse 2000+ RSS feeds...
Injected 10 High-Frequency Firehose Feeds.
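The orchestration pattern described here (web server on the main thread, ingestion loops on daemon threads) can be sketched with the standard library; the worker names and item counts below are illustrative, not the contents of app_pathway.py:

```python
import queue
import threading
import time

events: "queue.Queue[str]" = queue.Queue()

def ingest_worker(source: str) -> None:
    """Stand-in for one background ingestion loop running beside the server."""
    for i in range(3):
        events.put(f"{source}:item{i}")
        time.sleep(0.01)  # simulate polling latency

# daemon=True lets the process exit even if a feed loop is still blocking.
workers = [threading.Thread(target=ingest_worker, args=(s,), daemon=True)
           for s in ("opml", "hackernews")]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(events.qsize())  # -> 6
```

In the actual application the main thread runs Uvicorn/FastAPI instead of joining the workers, so the ingestion threads keep streaming for the life of the server.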


API Endpoints Reference

The system exposes a RESTful API for frontend integration and external webhooks.

Method  Endpoint        Description                                                 Payload / Params
GET     /               Landing Page                                                None
GET     /app            Main Dashboard Application                                  None
GET     /data           Fetch current engine stats and real-time buffer (no-cache)  None
POST    /fetch_news     Get categorical news (Business, Tech, etc.)                 {"category": "business"}
POST    /query          Perform RAG Analysis (search)                               {"query": "Trump"}
POST    /refresh_opml   Burst signal: triggers "Firehose" instant ingestion         None
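A minimal Python client for the /query endpoint might look like the following; the response schema is not documented above, so this sketch only builds the request and leaves the actual call commented out:

```python
import json
import urllib.request

BASE = "http://localhost:8000"  # default address from the Expected Output above

def build_query_request(question: str) -> urllib.request.Request:
    """Build the POST /query request; pass it to urlopen once the server is up."""
    payload = json.dumps({"query": question}).encode()
    return urllib.request.Request(
        f"{BASE}/query",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_query_request("What just happened in Tech?")
print(req.full_url, req.data)
# To actually send it (server must be running):
#   with urllib.request.urlopen(req) as resp:
#       answer = json.load(resp)
```

The same pattern applies to /fetch_news with a {"category": ...} payload.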

Project Structure

A clean, modular architecture designed for scalability.

LiveSocialAnalyst/
β”œβ”€β”€ app_pathway.py         # MAIN ENTRY POINT: Server & Thread Orchestrator
β”œβ”€β”€ config.yaml            # Global Configuration
β”œβ”€β”€ requirements.txt       # Dependency List
β”œβ”€β”€ .env                   # Secrets (GitIgnored)
β”‚
β”œβ”€β”€ ingest/                # PATHWAY CONNECTORS (Data Ingestion)
β”‚   β”œβ”€β”€ opml_loader.py     # High-Throughput Burst Ingestor
β”‚   β”œβ”€β”€ gnews_connector.py
β”‚   β”œβ”€β”€ firecrawl_connector.py
β”‚   β”œβ”€β”€ reddit_stream.py
β”‚   └── hackernews_stream.py
β”‚
β”œβ”€β”€ pipeline/              # INTELLIGENCE LAYER
β”‚   └── gemini_rag.py      # Hybrid RAG & LLM Logic
β”‚
β”œβ”€β”€ frontend/              # PRESENTATION LAYER
β”‚   β”œβ”€β”€ index.html         # Main SPA Real-Time Dashboard
β”‚   └── assets/
β”‚
└── data/                  # PERSISTENCE LAYER
    β”œβ”€β”€ database.py        # SQLite Interface
    └── storage/           # Local vector stores

License

MIT License. Built for High-Performance Data Engineering.

About

Built for IIT-KGP's Megalith 2026. Live Social Analyst is a real-time Retrieval-Augmented Generation (RAG) system powered by Pathway. It ingests live data from thousands of sources, processes it incrementally, and enables an LLM to answer questions about events happening right now.
