
Bottlenecks#277

Open
dhruvgupta-meesho wants to merge 11 commits into develop from bottlenecks

Conversation

@dhruvgupta-meesho
Contributor

@dhruvgupta-meesho dhruvgupta-meesho commented Feb 9, 2026

🔁 Pull Request Template – BharatMLStack

Please fill out the following sections to help us review your changes efficiently.


📌 Summary

e.g., Optimizes Redis fetch latency in online-feature-store, or improves search UI responsiveness in trufflebox-ui.


📂 Modules Affected

  • horizon (Real-time systems / networking)
  • online-feature-store (Feature serving infra)
  • trufflebox-ui (Admin panel / UI)
  • infra (Docker, CI/CD, GCP/AWS setup)
  • docs (Documentation updates)
  • Other: ___________

✅ Type of Change

  • Feature addition
  • Bug fix
  • Infra / build system change
  • Performance improvement
  • Refactor
  • Documentation
  • Other: ___________

📊 Benchmark / Metrics (if applicable)

Summary by CodeRabbit

  • New Features

    • Spark-based distributed decoding with partitioning/batching controls
    • Single-entity Arrow and Parquet feature decoding helpers
    • CLI: Spark master, input-format flags (base64/hex), decompress toggle, output modes (JSON/CSV/stream) and summary/preview
  • Utilities

    • Spark-aware float formatting for dataframes
    • Exposed cache-clearing utility
  • Documentation

    • Complete README overhaul with examples and API reference
  • Chores

    • Project version bumped and core dependency updated for Spark integration

@coderabbitai

coderabbitai bot commented Feb 9, 2026

Walkthrough

Core decoding moved from Pandas to Spark: APIs now require a SparkSession and return Spark DataFrames; distributed decoding implemented via batch map-style processing with per-row metadata/schema lookup and Spark-native feature decoding and formatting.
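The batch-decode loop described above can be sketched outside Spark as a plain per-batch function; in the real SDK the equivalent body would run inside a mapInPandas-style executor task. Every name below (decode_batch, fetch_schema, the metadata-byte layout) is an illustrative assumption, not the SDK's actual API.

```python
from typing import Callable, Dict, Iterable, List, Tuple

# Hypothetical sketch of the per-batch decode loop that an executor would run.
# fetch_schema stands in for the SDK's schema lookup; it is not the real API.
def decode_batch(
    rows: Iterable[dict],
    fetch_schema: Callable[[str, int], List[str]],
) -> List[dict]:
    schema_cache: Dict[Tuple[str, int], List[str]] = {}  # per-batch schema cache
    decoded = []
    for row in rows:
        version = row["metadata_byte"] & 0x0F          # illustrative version bits
        key = (row["mp_config_id"], version)
        if key not in schema_cache:                    # fetch each schema only once
            schema_cache[key] = fetch_schema(*key)
        feature_names = schema_cache[key]
        # Pair raw feature values with their schema-ordered names.
        decoded.append(dict(zip(feature_names, row["features"]), entity_id=row["entity_id"]))
    return decoded

calls = []
def fake_fetch(cfg, ver):
    calls.append((cfg, ver))
    return ["f1", "f2"]

out = decode_batch(
    [{"metadata_byte": 1, "mp_config_id": "m1", "entity_id": "e1", "features": [0.5, 2]},
     {"metadata_byte": 1, "mp_config_id": "m1", "entity_id": "e2", "features": [1.5, 3]}],
    fake_fetch,
)
```

The per-batch cache is what keeps the schema endpoint from being hit once per row on the executors.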

Changes

Cohort / File(s) Summary
Core API & Public Surface
py-sdk/inference_logging_client/inference_logging_client/__init__.py
Replaced Pandas with Spark types: decode_mplog now requires SparkSession and returns SparkDataFrame; decode_mplog_dataframe accepts SparkDataFrame+SparkSession and adds num_partitions/max_records_per_batch; added _extract_metadata_byte, TYPE_CHECKING Spark imports, expanded exports (Spark helpers, format_dataframe_floats).
Distributed Decoding Flow & Cache
py-sdk/inference_logging_client/inference_logging_client/__init__.py, .../io.py, .../types.py
Implemented distributed batch decoding (mapInPandas-like): per-row metadata extraction, per-row schema lookup with caching, Spark-friendly per-row decoding. DecodedMPLog dataclass extended with entities, parent_entity, metadata_byte, compression_enabled, version, format_type. Added clear_schema_cache().
CLI Integration
py-sdk/inference_logging_client/inference_logging_client/cli.py
CLI now builds/configures SparkSession (--spark-master), supports --no-decompress, --json, --output, --format; passes Spark to decode_mplog, formats floats via Spark helper, writes outputs via Spark (single-partition files or streamed JSON), and prints a lazy metadata summary; ensures Spark shutdown.
Format Readers / Feature Decoders
py-sdk/inference_logging_client/inference_logging_client/formats.py
Added decode_arrow_features and decode_parquet_features to decode single-entity Arrow IPC and Parquet payloads into feature-name→value dicts; integrate with decode_feature_value; handle empty/missing values.
Utilities & Float Formatting
py-sdk/inference_logging_client/inference_logging_client/utils.py
format_dataframe_floats refactored to operate on Spark DataFrames (uses Spark round() for FloatType/DoubleType); other type helpers preserved.
Decoder & Exceptions
py-sdk/inference_logging_client/inference_logging_client/decoder.py, .../exceptions.py
decoder.py whitespace/quote normalization only; exceptions now include explicit pass statements—no behavior changes.
Docs, Version & Dependencies
py-sdk/inference_logging_client/readme.md, py-sdk/inference_logging_client/pyproject.toml
README expanded into full documentation; project version bumped to 0.2.2 and dependency pandas replaced by pyspark>=3.3.0.
Pre-commit Hook
.pre-commit-config.yaml
Pre-commit local hook script path updated for trufflehog entry to pre-commit-scripts/runner.sh.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Spark as SparkSession
    participant Worker as Executor (batch worker)
    participant Cache as SchemaCache
    participant DF as SparkDataFrame

    Client->>Spark: call decode_mplog_dataframe(df, spark, ...)
    activate Spark
    Spark->>Spark: repartition/prepare batches
    Spark->>Worker: run batch decode (mapInPandas)
    activate Worker
    loop per row in batch
        Worker->>Worker: extract metadata (_extract_metadata_byte)
        Worker->>Cache: get_feature_schema(mp_config_id, version)
        Cache-->>Worker: schema (cached/fetched)
        Worker->>Worker: decode features (proto/arrow/parquet)
        Worker->>Worker: format floats
        Worker-->>DF: yield decoded row dict
    end
    deactivate Worker
    Spark->>DF: assemble decoded rows into SparkDataFrame
    deactivate Spark
    DF-->>Client: return decoded SparkDataFrame
🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
Dynamic Configuration Validation: ✅ Passed (no changes to application-dyn-*.yml files found; PR contains only Python source code, documentation, and configuration updates).


📝 Coding Plan
  • Generate coding plan for human review comments

Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
py-sdk/inference_logging_client/inference_logging_client/types.py (1)

44-49: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with type hints.

The list[str] syntax for type hints is only available at runtime starting from Python 3.9. Since pyproject.toml declares requires-python = ">=3.8", this will cause a TypeError on Python 3.8 when the module is imported.

🐛 Proposed fix for Python 3.8 compatibility

Add the annotations import at the top of the file:

 """Type definitions for inference-logging-client."""

+from __future__ import annotations
 from enum import Enum
 from dataclasses import dataclass, field

Alternatively, use typing.List:

+from typing import List
 from dataclasses import dataclass, field

 # ... later in DecodedMPLog ...
-    entities: list[str] = field(default_factory=list)
-    parent_entity: list[str] = field(default_factory=list)
+    entities: List[str] = field(default_factory=list)
+    parent_entity: List[str] = field(default_factory=list)
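A minimal self-contained check of the first option: with the `annotations` future import (PEP 563), `list[str]` in a dataclass is stored as a string at class-creation time instead of being evaluated, which is why the module also imports cleanly on Python 3.8.

```python
from __future__ import annotations  # all annotations become lazy strings (PEP 563)
from dataclasses import dataclass, field

@dataclass
class Example:
    # Not evaluated at runtime, so list[str] never calls list.__class_getitem__
    # on Python 3.8 when the class is created.
    entities: list[str] = field(default_factory=list)

e = Example()
e.entities.append("user")
```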
py-sdk/inference_logging_client/inference_logging_client/utils.py (1)

178-190: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with return type annotation.

The tuple[bool, int, int] return type syntax requires Python 3.9+. This will fail on Python 3.8 which is listed as a supported version.

🐛 Proposed fix

Add at the top of the file:

 """Utility functions for type normalization and formatting."""

+from __future__ import annotations
 from typing import Optional

Or use typing.Tuple:

-from typing import Optional
+from typing import Optional, Tuple

-def unpack_metadata_byte(metadata_byte: int) -> tuple[bool, int, int]:
+def unpack_metadata_byte(metadata_byte: int) -> Tuple[bool, int, int]:
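The diff does not show the actual bit layout behind unpack_metadata_byte, so the packing below (bit 7 for the compression flag, bits 4-6 for the version, low nibble for the format type) is purely illustrative; it only demonstrates the `Tuple[bool, int, int]` shape the fix annotates.

```python
from typing import Tuple

def unpack_metadata_byte(metadata_byte: int) -> Tuple[bool, int, int]:
    """Illustrative unpacking; the real SDK's bit layout may differ."""
    compression_enabled = bool((metadata_byte >> 7) & 0x1)
    version = (metadata_byte >> 4) & 0x7
    format_type = metadata_byte & 0xF
    return compression_enabled, version, format_type

def pack_metadata_byte(compression: bool, version: int, format_type: int) -> int:
    # Inverse of the unpacking above, for round-trip sanity checks.
    return (int(compression) << 7) | ((version & 0x7) << 4) | (format_type & 0xF)

b = pack_metadata_byte(True, 2, 1)
```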
py-sdk/inference_logging_client/inference_logging_client/io.py (1)

22-24: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with generic type annotations.

The OrderedDict[tuple[str, int], list[FeatureInfo]] syntax requires Python 3.9+. This will cause a TypeError on Python 3.8.

🐛 Proposed fix

Add at the top of the file:

 """I/O operations for schema fetching and protobuf parsing."""

+from __future__ import annotations
 import os

Or use typing generics:

-from collections import OrderedDict
+from collections import OrderedDict
+from typing import Dict, List, Tuple

-_schema_cache: OrderedDict[tuple[str, int], list[FeatureInfo]] = OrderedDict()
+_schema_cache: "OrderedDict[Tuple[str, int], List[FeatureInfo]]" = OrderedDict()
🤖 Fix all issues with AI agents
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 306-309: The empty-DataFrame branch currently returns
spark.createDataFrame([], StructType([])) which drops the expected output
schema; in the df.limit(1).count() == 0 block replace the empty StructType with
the actual output schema used by the transformation (e.g., reuse the computed
output_schema or derive it by applying the same schema-mapping logic used for
non-empty inputs) and return spark.createDataFrame([], output_schema) so the
returned DataFrame preserves entity_id, metadata and feature columns; update the
code around df.limit(1).count() to obtain or reference output_schema before
creating the empty DataFrame.

In `@py-sdk/inference_logging_client/inference_logging_client/cli.py`:
- Around line 171-176: The comment above the df.count() call is inaccurate
because the code performs a full count; either update the comment to reflect
that df.count() is executed, or change the implementation to the advertised
optimization by replacing row_count = df.count() with a cheap emptiness check
such as row_count = df.limit(1).count() (or similar) and only run a full count
when necessary; update the printed message using row_count and retain the
existing exception handling around the count to preserve current behavior.

In `@py-sdk/inference_logging_client/pyproject.toml`:
- Line 31: The package metadata lists Python 3.12 in classifiers but requires
"pyspark>=3.3.0" which doesn't officially support 3.12; update the dependency or
classifiers to be consistent: either change the pyspark requirement string in
pyproject.toml from "pyspark>=3.3.0" to "pyspark>=3.5.0" to enable Python 3.12
support, or remove the Python 3.12 classifier entries from the package
classifiers so they match pyspark>=3.3.0 compatibility.
🧹 Nitpick comments (2)
py-sdk/inference_logging_client/readme.md (1)

628-638: Add language specifiers to diagram code blocks.

Several code blocks containing ASCII art diagrams and text layouts are missing language specifiers, which triggers markdownlint warnings. Consider adding text or plaintext as the language identifier.

📝 Example fix for diagram blocks
-```
+```text
 ┌─────────────────────────────────────────────────────────────┐
 │ Byte 0: Generated Flag (1 = no generated values)           │

This applies to multiple diagram blocks throughout the document (lines 628, 647, 666, 682, 707, 818, 984, 999, 1058, 1074, 1084).

py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

488-489: High default partition count may be excessive for smaller DataFrames.

The default of 10,000 partitions is optimized for large rows (3-5 MB each per documentation), but could introduce unnecessary overhead for smaller DataFrames. Consider documenting this trade-off more prominently or adjusting the default based on estimated data size.
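One way to act on this nitpick is to derive the partition count from an estimated input size instead of a fixed 10,000. The sizing constants below are illustrative assumptions (the 3-5 MB row size comes from the PR's documentation; the 128 MB target is a common Spark partition-size heuristic).

```python
def choose_num_partitions(
    estimated_rows: int,
    avg_row_bytes: int = 4 * 1024 * 1024,             # assume ~4 MB rows per the PR docs
    target_partition_bytes: int = 128 * 1024 * 1024,  # common Spark target partition size
    max_partitions: int = 10_000,                     # keep the PR's default as a ceiling
) -> int:
    total = estimated_rows * avg_row_bytes
    # Ceiling division; at least one partition, capped to avoid
    # scheduler overhead on small inputs.
    return max(1, min(max_partitions, -(-total // target_partition_bytes)))
```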

Comment on lines +306 to +309
    # Check if DataFrame is empty (avoid full count: use limit(1))
    if df.limit(1).count() == 0:
        from pyspark.sql.types import StructType
        return spark.createDataFrame([], StructType([]))


⚠️ Potential issue | 🟡 Minor

Empty DataFrame returns empty schema instead of expected output schema.

When the input DataFrame is empty, this returns a DataFrame with an empty StructType(), losing the expected output schema (entity_id + metadata columns + feature columns). This may cause issues for downstream operations that depend on a consistent schema.

Suggested fix to preserve schema
     # Check if DataFrame is empty (avoid full count: use limit(1))
     if df.limit(1).count() == 0:
-        from pyspark.sql.types import StructType
-        return spark.createDataFrame([], StructType([]))
+        from pyspark.sql.types import StringType, StructField, StructType
+        # Return empty DataFrame with at least entity_id column
+        fields = [StructField("entity_id", StringType(), True)]
+        return spark.createDataFrame([], StructType(fields))

Comment on lines +171 to +176
        # Avoid full count() for huge DataFrames: use limit(1).count() for empty check only
        try:
            row_count = df.count()
            print(f"Rows: {row_count}", file=sys.stderr)
        except Exception:
            print("Rows: (count skipped - use --output to write without summary)", file=sys.stderr)


⚠️ Potential issue | 🟡 Minor

Misleading comment contradicts actual behavior.

The comment states "Avoid full count() for huge DataFrames" but the code immediately performs df.count(), which is a full count operation. Consider updating the comment to accurately reflect the behavior, or implementing the optimization mentioned.

Suggested comment fix
-        # Avoid full count() for huge DataFrames: use limit(1).count() for empty check only
+        # Note: count() can be slow for large DataFrames; errors are caught gracefully
         try:
             row_count = df.count()


dependencies = [
"pandas>=1.3.0",
"pyspark>=3.3.0",


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

PySpark Python 3.12 support version compatibility

💡 Result:

For Python 3.12, these are the key PySpark / Spark compatibility points:

  • Spark / PySpark 4.0.x: supports Python 3.9+ (so 3.12 is supported). The Spark 4.0.0 release notes also include Python-3.12-related dependency work (e.g., grpcio upgrade “for Python 3.12”). [1][2]
  • Spark / PySpark 4.1.x (latest docs): supports Python 3.10+ (so 3.12 is supported). [3]
  • Spark / PySpark 3.5.x: supports Python 3.8+ (so 3.12 is included by the stated support range). [4]

Practical recommendation: if you specifically care about smooth Python 3.12 operation (dependency wheels, fewer edge cases), prefer Spark 4.0+ (or newer). [1][2]

Sources
[1] Spark 4.0.0 PySpark install docs (Python 3.9+) (spark.apache.org)
[2] Spark 4.0.0 release notes (grpcio upgrade for Python 3.12) (spark.apache.org)
[3] Spark “latest” (4.1.0) PySpark install docs (Python 3.10+) (spark.apache.org)
[4] Spark 3.5.3 PySpark install docs (Python 3.8+) (spark.apache.org)


🏁 Script executed:

find . -path "*/py-sdk/inference_logging_client/pyproject.toml" -type f | head -1 | xargs cat

Repository: Meesho/BharatMLStack

Length of output: 1893


Fix PySpark and Python version compatibility mismatch.

Python 3.12 is listed in the classifiers, but the minimum PySpark version (3.3.0) does not officially support it. PySpark 3.5.0+ adds Python 3.12 support.

Either:

  1. Bump the minimum PySpark version to >=3.5.0 if Python 3.12 support is required
  2. Remove Python 3.12 from classifiers if maintaining PySpark 3.3.0 compatibility is more important
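If option 1 is taken, the relevant fragment of pyproject.toml would look roughly like this. This is a sketch of only the affected lines, not the file's full contents, and the surrounding keys are assumed from the snippets shown above:

```toml
[project]
requires-python = ">=3.8"
dependencies = [
    "pyspark>=3.5.0",  # Spark 3.5 is the first line whose docs cover Python 3.12
]
```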

@readytoreview

readytoreview bot commented Mar 12, 2026

PR Validation Failed

PR description validation failed

Issues found:

  • no PR type selected
  • no test selection made (yes/no/need help)
  • missing KRD section

Please fill out the form to fix this: Open Form


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.pre-commit-config.yaml:
- Line 7: The pre-commit hook entry "entry: 'pre-commit-scripts/runner.sh'"
points to a missing script which breaks TruffleHog scanning; update the entry in
.pre-commit-config.yaml to reference the existing hook script
"trufflehog/trufflehog-hook.sh" (or create the intended wrapper at
pre-commit-scripts/runner.sh), ensure the referenced script is executable, and
commit the change so the TruffleHog hook can run successfully.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 355-370: The code builds full_schema without first filtering out
reserved or duplicate column names, causing NameError for the undefined
_reserved_columns and allowing feature names to collide with existing columns
(e.g., tracking_id). Define and populate a _reserved_columns set (including
existing df_columns and any fixed output columns like "entity_id") and use it to
filter out collisions before assembling schema_fields: compute
metadata_cols_in_schema and the list of feature names to include by excluding
names in _reserved_columns, then build input_field_map and append StructField
entries only for the filtered metadata and feature names when creating
full_schema (update references around schema_cache, row_metadata_columns,
df_columns, input_field_map, and full_schema).
- Around line 340-352: The code hardcodes "mp_config_id" into
row_metadata_columns and other output-path places instead of using the function
parameter mp_config_id_column; update every occurrence (e.g., the
row_metadata_columns list, any preserved_metadata construction, output schema
generation, and final column ordering logic) to use the mp_config_id_column
variable so the chosen column name (mp_config_id_column) is carried through
decoding, preserved metadata, schema, and ordering; search for literal
"mp_config_id" in this module and replace those spots with mp_config_id_column
while keeping the rest of the metadata names unchanged.
- Around line 323-339: The current prefetch loop catches exceptions from
get_feature_schema and only warns, allowing full_schema to be built without that
config/version which causes silent data loss later; change the error handling in
the prefetch section (the loop over distinct_rows that writes into schema_cache
and calls get_feature_schema) to not swallow failures—either re-raise the
exception or abort building the prefetch (e.g., raise a RuntimeError including
the mp_config_id/version) so the function fails fast and forces the worker-path
to fetch the schema successfully; ensure this change applies to the same pattern
used at schema prefetch sites (the schema_cache population in the distinct_rows
loop and the similar blocks around where get_feature_schema is called).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3ad76abe-6a23-4baa-a164-22aa42e1c664

📥 Commits

Reviewing files that changed from the base of the PR and between d303727 and f4a6572.

📒 Files selected for processing (4)
  • .pre-commit-config.yaml
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/inference_logging_client/io.py
  • py-sdk/inference_logging_client/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (2)
  • py-sdk/inference_logging_client/pyproject.toml
  • py-sdk/inference_logging_client/inference_logging_client/io.py

name: TruffleHog
description: Detect secrets in your data.
-entry: "trufflehog/trufflehog-hook.sh"
+entry: "pre-commit-scripts/runner.sh"


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== pre-commit config =="
sed -n '1,40p' .pre-commit-config.yaml

echo
echo "== locate new runner =="
fd '^runner\.sh$' pre-commit-scripts -x ls -l {}

echo
echo "== inspect new runner contents =="
fd '^runner\.sh$' pre-commit-scripts -x sed -n '1,220p' {}

echo
echo "== inspect any existing trufflehog-specific hook =="
fd '^trufflehog-hook\.sh$' -x sed -n '1,220p' {} || true

echo
echo "== search for trufflehog wiring =="
rg -n -C3 'trufflehog|TruffleHog' .pre-commit-config.yaml pre-commit-scripts



🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== All files in repository root =="
git ls-files | head -30

echo
echo "== Search for pre-commit-scripts directory =="
find . -type d -name "pre-commit-scripts" 2>/dev/null || echo "No pre-commit-scripts directory found"

echo
echo "== Search for runner.sh anywhere =="
find . -name "runner.sh" -type f 2>/dev/null || echo "No runner.sh found"

echo
echo "== Search for any script files in repository =="
find . -name "*.sh" -type f 2>/dev/null | head -20

echo
echo "== Check directory structure =="
ls -la

echo
echo "== Full pre-commit config context =="
cat .pre-commit-config.yaml



🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Search for pre-commit-scripts directory or runner.sh =="
git ls-files | grep -E "pre-commit-scripts|runner\.sh" || echo "No matches found"

echo
echo "== List all directories in repo =="
git ls-files -o --exclude-standard | grep -v "^\." | cut -d/ -f1 | sort -u

echo
echo "== Check if pre-commit-scripts exists =="
if [ -d "pre-commit-scripts" ]; then
  echo "Directory exists. Contents:"
  ls -la pre-commit-scripts/
else
  echo "Directory does NOT exist"
fi

echo
echo "== Full .pre-commit-config.yaml =="
cat .pre-commit-config.yaml



🏁 Script executed:

#!/bin/bash
cat .pre-commit-config.yaml

echo
echo "== Search for any shell scripts in the entire repo =="
git ls-files | grep "\.sh$"



🏁 Script executed:

#!/bin/bash
cat trufflehog/trufflehog-hook.sh



Hook entry points to non-existent path—secret scanning is broken.

The configured entry pre-commit-scripts/runner.sh does not exist in the repository. This will cause the TruffleHog pre-commit hook to fail with a file-not-found error, completely disabling secret scanning.

The actual TruffleHog hook script exists at trufflehog/trufflehog-hook.sh. Verify that the entry should reference this path instead, or if pre-commit-scripts/runner.sh is intentionally being created as a wrapper.


Comment on lines +323 to +339
    for row in distinct_rows:
        metadata_data = row[metadata_column]
        metadata_byte = 0
        if not pd.isna(metadata_data):
            if isinstance(metadata_data, (int, float)):
                metadata_byte = int(metadata_data)
            elif isinstance(metadata_data, bytes) and len(metadata_data) > 0:
                metadata_byte = metadata_data[0]
            elif isinstance(metadata_data, (bytearray, memoryview)) and len(metadata_data) > 0:
                metadata_byte = metadata_data[0]
            elif isinstance(metadata_data, str):
                try:
                    metadata_byte = int(metadata_data)
                except ValueError:
                    pass

        metadata_byte = _extract_metadata_byte(metadata_data, json, base64)
        _, version, _ = unpack_metadata_byte(metadata_byte)

        # Skip invalid versions
        if not (0 <= version <= _MAX_SCHEMA_VERSION):
            continue

        # Extract mp_config_id
        mp_config_id = row[mp_config_id_column]
        if pd.isna(mp_config_id):
            if mp_config_id is None:
                continue
        mp_config_id = str(mp_config_id)

        cache_key = (mp_config_id, version)
        if cache_key not in schema_cache:
            # Pre-fetch schema and store in local cache
            try:
                schema_cache[cache_key] = get_feature_schema(mp_config_id, version, inference_host)
            except Exception as e:
                # Log warning but don't fail - will be caught again in main loop
                warnings.warn(f"Failed to pre-fetch schema for {cache_key}: {e}", UserWarning)

    all_decoded_rows = []

    for idx, row in df.iterrows():
        # Extract features bytes
        features_data = row[features_column]
        if pd.isna(features_data):
            continue

        # Convert features to bytes (handle base64, hex, or raw bytes)
        features_bytes = None
        if isinstance(features_data, bytes):
            features_bytes = features_data
        elif isinstance(features_data, str):
            # Try base64 first
            try:
                features_bytes = base64.b64decode(features_data)
            except Exception:
                # Try hex
                try:
                    features_bytes = bytes.fromhex(features_data)
                except Exception:
                    # Try UTF-8 encoding
                    features_bytes = features_data.encode('utf-8')
        elif isinstance(features_data, (bytearray, memoryview)):
            features_bytes = bytes(features_data)
        else:
            continue

        if features_bytes is None or len(features_bytes) == 0:
            continue

        # Extract metadata byte
        metadata_data = row[metadata_column]
        metadata_byte = 0
        if not pd.isna(metadata_data):
            if isinstance(metadata_data, (int, float)):
                metadata_byte = int(metadata_data)
            elif isinstance(metadata_data, bytes) and len(metadata_data) > 0:
                metadata_byte = metadata_data[0]
            elif isinstance(metadata_data, (bytearray, memoryview)) and len(metadata_data) > 0:
                metadata_byte = metadata_data[0]
            elif isinstance(metadata_data, str):
                try:
                    metadata_byte = int(metadata_data)
                except ValueError:
                    pass

        # Extract version from metadata byte
        _, version, _ = unpack_metadata_byte(metadata_byte)

        # Validate version range
        if not (0 <= version <= _MAX_SCHEMA_VERSION):
            warnings.warn(
                f"Row {idx}: Version {version} extracted from metadata is out of valid range (0-{_MAX_SCHEMA_VERSION}). "
                f"This may indicate corrupted metadata.",
                UserWarning
            )
            continue

        # Extract mp_config_id
        mp_config_id = row[mp_config_id_column]
        if pd.isna(mp_config_id):
            continue
        mp_config_id = str(mp_config_id)

        # Lookup cached schema
        cache_key = (mp_config_id, version)
        cached_schema = schema_cache.get(cache_key)

        # Decode this row's features using cached schema



⚠️ Potential issue | 🟠 Major

Don't continue with a partial schema cache.

If the prefetch at Lines 335-338 fails, full_schema is built without that config/version's feature columns. The worker path then retries at Lines 401-404, and if that succeeds, pd.DataFrame(..., columns=all_columns_ordered) drops the decoded feature keys because they were never added to the schema. That turns a transient schema-fetch failure into silent data loss.

Proposed fix
         if cache_key not in schema_cache:
             try:
                 schema_cache[cache_key] = get_feature_schema(mp_config_id, version, inference_host)
             except Exception as e:
-                warnings.warn(f"Failed to pre-fetch schema for {cache_key}: {e}", UserWarning)
+                raise SchemaFetchError(
+                    f"Failed to fetch schema for {cache_key} before building the output schema: {e}"
+                ) from e

Also applies to: 354-370, 400-405, 494-495


Comment on lines +340 to +352
    row_metadata_columns = [
        "prism_ingested_at",
        "prism_extracted_at",
        "created_at",
        "mp_config_id",
        "parent_entity",
        "tracking_id",
        "user_id",
        "year",
        "month",
        "day",
        "hour",
    ]


⚠️ Potential issue | 🟠 Major

Honor mp_config_id_column throughout the output path.

Lines 344 and 515 hardcode mp_config_id, even though the function accepts mp_config_id_column. A call with mp_config_id_column="foo" will decode rows using foo, but foo never makes it into the preserved metadata, output schema, or final ordering.

Proposed fix
     row_metadata_columns = [
         "prism_ingested_at",
         "prism_extracted_at",
         "created_at",
-        "mp_config_id",
+        mp_config_id_column,
         "parent_entity",
         "tracking_id",
         "user_id",
         "year",
@@
     for col in [
         "prism_ingested_at",
         "prism_extracted_at",
         "created_at",
-        "mp_config_id",
+        mp_config_id_column,
         "parent_entity",
         "tracking_id",
         "user_id",
         "year",

Also applies to: 481-485, 511-523

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 340 - 352, The code hardcodes "mp_config_id" into row_metadata_columns and
other output-path places instead of using the function parameter
mp_config_id_column; update every occurrence (e.g., the row_metadata_columns
list, any preserved_metadata construction, output schema generation, and final
column ordering logic) to use the mp_config_id_column variable so the chosen
column name (mp_config_id_column) is carried through decoding, preserved
metadata, schema, and ordering; search for literal "mp_config_id" in this module
and replace those spots with mp_config_id_column while keeping the rest of the
metadata names unchanged.

Comment on lines +355 to +370
all_feature_names = set()
for feat_list in schema_cache.values():
for f in feat_list:
all_feature_names.add(f.name)
metadata_cols_in_schema = [c for c in row_metadata_columns if c in df_columns]
from pyspark.sql.types import StringType, StructField, StructType
# Map input column names to their Spark types so we can preserve them in the output
input_field_map = {field.name: field.dataType for field in df.schema.fields}
schema_fields = [StructField("entity_id", StringType(), True)]
for c in metadata_cols_in_schema:
# Preserve the original type (LongType, TimestampType, etc.)
original_type = input_field_map.get(c, StringType())
schema_fields.append(StructField(c, original_type, True))
for c in sorted(all_feature_names):
schema_fields.append(StructField(c, StringType(), True))
full_schema = StructType(schema_fields)

⚠️ Potential issue | 🔴 Critical

Finish the duplicate-column guard before building full_schema.

Line 471 references _reserved_columns, but this file never defines it, so the first decoded row will fail with NameError. Also, because Lines 355-369 add every feature name to full_schema before any collision filtering, a feature named tracking_id or user_id still creates duplicate output columns.

Proposed fix
     metadata_cols_in_schema = [c for c in row_metadata_columns if c in df_columns]
+    reserved_columns = {"entity_id"} | set(metadata_cols_in_schema)
+    all_feature_names = {name for name in all_feature_names if name not in reserved_columns}
     from pyspark.sql.types import StringType, StructField, StructType
     # Map input column names to their Spark types so we can preserve them in the output
     input_field_map = {field.name: field.dataType for field in df.schema.fields}
     schema_fields = [StructField("entity_id", StringType(), True)]
@@
                     result_row = {"entity_id": entity_id}
                     # Convert all feature values to strings for schema compatibility
                     for k, v in decoded_features.items():
-                        if k in _reserved_columns:
+                        if k in reserved_columns:
                             continue

Also applies to: 469-472

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 355 - 370, The code builds full_schema without first filtering out
reserved or duplicate column names, causing NameError for the undefined
_reserved_columns and allowing feature names to collide with existing columns
(e.g., tracking_id). Define and populate a _reserved_columns set (including
existing df_columns and any fixed output columns like "entity_id") and use it to
filter out collisions before assembling schema_fields: compute
metadata_cols_in_schema and the list of feature names to include by excluding
names in _reserved_columns, then build input_field_map and append StructField
entries only for the filtered metadata and feature names when creating
full_schema (update references around schema_cache, row_metadata_columns,
df_columns, input_field_map, and full_schema).
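The collision filter can be isolated into a small pure function, exercised here without Spark (the function name is illustrative, not from the SDK):

```python
def filter_feature_columns(all_feature_names, metadata_cols_in_schema):
    # Drop any feature whose name collides with "entity_id" or a preserved
    # metadata column, so the output schema never has duplicate fields.
    reserved_columns = {"entity_id"} | set(metadata_cols_in_schema)
    return sorted(n for n in all_feature_names if n not in reserved_columns)
```

The returned list can then feed both the `StructField` loop and the final `all_columns_ordered`, so the schema and the row dicts stay in sync.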

Comment on lines +360 to +367
from pyspark.sql.types import StringType, StructField, StructType
# Map input column names to their Spark types so we can preserve them in the output
input_field_map = {field.name: field.dataType for field in df.schema.fields}
schema_fields = [StructField("entity_id", StringType(), True)]
for c in metadata_cols_in_schema:
# Preserve the original type (LongType, TimestampType, etc.)
original_type = input_field_map.get(c, StringType())
schema_fields.append(StructField(c, original_type, True))

⚠️ Potential issue | 🟠 Major

Keep parent_entity aligned with the copied Spark type.

Lines 364-367 preserve the input Spark type for parent_entity, but Lines 439-440 and 486-487 flatten list values to a scalar or stringify them. When the source column is ArrayType(...), the yielded pandas rows no longer match full_schema, which can break mapInPandas or corrupt the preserved field.

Also applies to: 430-487
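One resolution is to declare `parent_entity` as `StringType` in the output schema and normalize the values to match. A hedged sketch of such a normalizer (the flattening rules are assumptions inferred from the review, not the SDK's documented behaviour):

```python
def normalize_parent_entity(value):
    # Always emit a string (or None) for parent_entity: flatten a
    # single-element list and stringify anything longer, so the yielded
    # pandas rows match a StringType declaration in the output schema.
    if value is None:
        return None
    if isinstance(value, (list, tuple)):
        return str(value[0]) if len(value) == 1 else str(list(value))
    return str(value)
```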

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

369-371: ⚠️ Potential issue | 🟠 Major

Feature names not filtered against reserved columns when building schema.

While _reserved_columns is now properly defined (Line 353) and used at runtime (Line 472), feature names are still added to the schema without filtering. If a feature is named tracking_id or user_id, the schema will have duplicate column names.

Proposed fix - filter feature names before adding to schema
     for c in sorted(all_feature_names):
+        if c in _reserved_columns:
+            continue
         schema_fields.append(StructField(c, StringType(), True))
     full_schema = StructType(schema_fields)
-    all_columns_ordered = ["entity_id"] + metadata_cols_in_schema + sorted(all_feature_names)
+    filtered_feature_names = sorted(all_feature_names - _reserved_columns)
+    all_columns_ordered = ["entity_id"] + metadata_cols_in_schema + filtered_feature_names
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 369 - 371, When building full_schema, filter out reserved column names
from the feature list before creating StructField entries: instead of iterating
over sorted(all_feature_names) directly, exclude names present in
_reserved_columns (or self._reserved_columns if in a class) so you don't append
duplicate fields like "tracking_id" or "user_id"; update the loop that builds
schema_fields (the code referencing all_feature_names, StructField, and
full_schema) to skip reserved names prior to adding StructField entries.
🧹 Nitpick comments (1)
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

401-406: Silent skip on schema fetch failure in worker may lose data.

When the worker-side schema fetch fails (Lines 403-406), the row is silently skipped. Combined with the prefetch issue above, this means rows with schemas that failed prefetch and fail worker fetch will be silently dropped. Consider logging a warning for observability.

Suggested improvement for observability
                 if feature_schema is None:
                     try:
                         feature_schema = get_feature_schema(mp_config_id, version, inference_host)
                     except Exception:
+                        warnings.warn(
+                            f"Skipping row: failed to fetch schema for ({mp_config_id}, {version})",
+                            UserWarning
+                        )
                         continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 401 - 406, The code silently continues when
get_feature_schema(mp_config_id, version, inference_host) raises, causing
dropped rows; modify the except block around the feature_schema fetch (where
schema_cache.get(cache_key) and get_feature_schema are used) to log a warning
including mp_config_id, version, inference_host, cache_key and the caught
exception details (use your existing logger) before continuing so failures are
observable while preserving the current behavior.
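The skip-with-warning pattern suggested above can be wrapped as a small helper, shown here without Spark (the wrapper name is illustrative; the fetch signature is taken from the diff):

```python
import warnings

def fetch_schema_or_none(get_feature_schema, mp_config_id, version, inference_host):
    # Keep the skip-on-failure behaviour, but emit a warning first so
    # dropped rows are observable instead of silently vanishing.
    try:
        return get_feature_schema(mp_config_id, version, inference_host)
    except Exception as e:
        warnings.warn(
            f"Skipping rows: schema fetch failed for "
            f"({mp_config_id}, {version}) via {inference_host}: {e}",
            UserWarning,
        )
        return None
```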

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 40a933ab-c345-45ba-a15d-9dc4a8e0ff00

📥 Commits

Reviewing files that changed from the base of the PR and between f4a6572 and dfaf567.

📒 Files selected for processing (1)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py

Comment on lines +501 to +507
batch_limit = max_records_per_batch if max_records_per_batch is not None else 200
prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(batch_limit))
try:
result_df = df_repart.mapInPandas(_decode_batch, full_schema)
finally:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", prev_max_records or "10000")

⚠️ Potential issue | 🟡 Minor

Spark config restoration may not preserve actual default.

spark.conf.get() returns None if the config was never explicitly set, but Line 507 falls back to "10000". This may not match Spark's actual default for spark.sql.execution.arrow.maxRecordsPerBatch, potentially altering behavior after this function completes.

Suggested fix using unset
-    prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch")
+    try:
+        prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch")
+    except Exception:
+        prev_max_records = None
     spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(batch_limit))
     try:
         result_df = df_repart.mapInPandas(_decode_batch, full_schema)
     finally:
-        spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", prev_max_records or "10000")
+        if prev_max_records is not None:
+            spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", prev_max_records)
+        else:
+            spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 501 - 507, The code restores the Arrow batch config by always setting
"10000" when spark.conf.get returned None, which can change Spark's true
default; in the finally block of the section that sets
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch",
str(batch_limit)), change the restoration to call
spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch") when
prev_max_records is None and otherwise set it back to prev_max_records (i.e.,
use prev_max_records to decide between spark.conf.unset(...) and
spark.conf.set(...)), referencing the variables batch_limit, prev_max_records
and the mapInPandas call using _decode_batch to locate the surrounding code.
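The set-then-restore pattern with `unset` semantics can be demonstrated without a SparkSession using a minimal stand-in for `spark.conf` (the `FakeConf` class and the helper below are illustrative; the real `RuntimeConfig` object exposes `get`/`set`/`unset` but its `get` behaviour for missing keys may differ by Spark version):

```python
class FakeConf:
    # Minimal stand-in for spark.conf so the restore logic can be
    # exercised without a SparkSession.
    def __init__(self):
        self._values = {}
    def get(self, key, default=None):
        return self._values.get(key, default)
    def set(self, key, value):
        self._values[key] = value
    def unset(self, key):
        self._values.pop(key, None)

def run_with_batch_limit(conf, batch_limit, fn,
                         key="spark.sql.execution.arrow.maxRecordsPerBatch"):
    prev = conf.get(key)
    conf.set(key, str(batch_limit))
    try:
        return fn()
    finally:
        if prev is None:
            conf.unset(key)   # fall back to Spark's own default, whatever it is
        else:
            conf.set(key, prev)
```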

