Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pykalshi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
A clean, modular interface for the Kalshi trading API.
"""

__version__ = "1.0.4"
__version__ = "1.0.5"

import logging

Expand Down
7 changes: 4 additions & 3 deletions pykalshi/afeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ._utils import normalize_tickers
from .feed import (
_parse_message,
_parse_ts,
_WS_SIGN_PATH,
DEFAULT_WS_BASE,
DEMO_WS_BASE,
Expand Down Expand Up @@ -205,9 +206,9 @@ async def __aiter__(self) -> AsyncIterator:
# Extract server timestamp
payload = data.get("msg", data)
if isinstance(payload, dict):
ts = payload.get("ts")
if ts is not None:
self._last_server_ts = int(ts)
parsed_ts = _parse_ts(payload.get("ts"))
if parsed_ts is not None:
self._last_server_ts = parsed_ts

# Call registered handlers
for handler in self._handlers.get(channel, []):
Expand Down
46 changes: 35 additions & 11 deletions pykalshi/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
import logging
import threading
import time
from typing import Any, Callable, Union, TYPE_CHECKING
from datetime import datetime
from typing import Annotated, Any, Callable, Union, TYPE_CHECKING

from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, BeforeValidator, ConfigDict

from ._utils import normalize_ticker, normalize_tickers

Expand All @@ -28,6 +29,29 @@
_WS_SIGN_PATH = "/trade-api/ws/v2"


# --- Timestamp parsing ---


def _parse_ts(value: Any) -> int | None:
"""Coerce a Kalshi WebSocket ``ts`` to int milliseconds since epoch.

Pre-April-2026 Kalshi sent int ms; since then it sends ISO 8601 strings
(e.g. ``'2026-04-22T18:31:59.043421Z'``). Accept both; unrecognized
values return None rather than raising.
"""
if isinstance(value, int):
return value
if isinstance(value, str):
try:
return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000)
except ValueError:
return None
return None


TsField = Annotated[int | None, BeforeValidator(_parse_ts)]


# --- WebSocket Message Models ---


Expand All @@ -46,7 +70,7 @@ class TickerMessage(BaseModel):
open_interest_fp: str | None = None
dollar_volume_dollars: str | None = None
dollar_open_interest_dollars: str | None = None
ts: int | None = None
ts: TsField = None

model_config = ConfigDict(extra="ignore")

Expand Down Expand Up @@ -95,7 +119,7 @@ class TradeMessage(BaseModel):
yes_price_dollars: str | None = None
no_price_dollars: str | None = None
taker_side: str | None = None
ts: int | None = None
ts: TsField = None

model_config = ConfigDict(extra="ignore")

Expand All @@ -116,7 +140,7 @@ class FillMessage(BaseModel):
yes_price_dollars: str | None = None
no_price_dollars: str | None = None
is_taker: bool | None = None
ts: int | None = None
ts: TsField = None

model_config = ConfigDict(extra="ignore")

Expand All @@ -135,7 +159,7 @@ class PositionMessage(BaseModel):
total_traded_dollars: str | None = None
resting_orders_count: int | None = None
fees_paid_dollars: str | None = None
ts: int | None = None
ts: TsField = None

model_config = ConfigDict(extra="ignore")

Expand All @@ -146,7 +170,7 @@ class MarketLifecycleMessage(BaseModel):
market_ticker: str
status: str | None = None
result: str | None = None # Settlement result ("yes" or "no")
ts: int | None = None
ts: TsField = None

model_config = ConfigDict(extra="ignore")

Expand All @@ -156,7 +180,7 @@ class OrderGroupUpdateMessage(BaseModel):

order_group_id: str
status: str | None = None # "active", "triggered", "canceled"
ts: int | None = None
ts: TsField = None

model_config = ConfigDict(extra="ignore")

Expand Down Expand Up @@ -552,10 +576,10 @@ def _dispatch(self, raw: str | bytes) -> None:

payload = data.get("msg", data)
if isinstance(payload, dict):
ts = payload.get("ts")
if ts is not None:
parsed_ts = _parse_ts(payload.get("ts"))
if parsed_ts is not None:
with self._metrics_lock:
self._last_server_ts = int(ts)
self._last_server_ts = parsed_ts

handlers = self._handlers.get(channel)
if not handlers:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "pykalshi"
version = "1.0.4"
version = "1.0.5"
description = "A typed Python client for the Kalshi prediction markets API with WebSocket streaming, automatic retries, and ergonomic interfaces"
readme = "README.md"
license = "MIT"
Expand Down
32 changes: 32 additions & 0 deletions tests/test_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,23 @@
PositionMessage,
DEFAULT_WS_BASE,
DEMO_WS_BASE,
_parse_ts,
)


class TestParseTs:
"""Issue #18: Kalshi switched ts from int ms to ISO 8601 in April 2026."""

def test_int_passes_through(self):
assert _parse_ts(1776882719000) == 1776882719000

def test_iso_string(self):
assert _parse_ts("2026-04-22T18:31:59.043421Z") == 1776882719043

def test_garbage_returns_none(self):
assert _parse_ts("not-a-timestamp") is None


class TestFeedCreation:
"""Tests for Feed initialization."""

Expand Down Expand Up @@ -549,6 +563,11 @@ def test_position_model(self):
assert msg.realized_pnl_dollars == "2.50"
assert msg.ts == 1704067200

def test_ticker_model_accepts_iso_ts(self):
"""Issue #18: TsField coerces ISO strings so typed models still validate."""
msg = TickerMessage(market_ticker="TEST", ts="2026-04-22T18:31:59Z")
assert msg.ts == 1776882719000

def test_models_ignore_extra_fields(self):
"""Models ignore unknown fields (forward compatibility)."""
msg = TickerMessage(
Expand Down Expand Up @@ -633,6 +652,19 @@ def test_server_timestamp_extracted(self, client):

assert feed._last_server_ts == server_ts

def test_server_timestamp_iso_string(self, client):
"""Issue #18: ISO 8601 ts must not raise (was crashing every message)."""
feed = Feed(client)
feed.on("ticker", lambda x: None)

raw = json.dumps({
"type": "ticker",
"msg": {"market_ticker": "TEST", "ts": "2026-04-22T18:31:59.043421Z"},
})
feed._dispatch(raw)

assert feed._last_server_ts == 1776882719043

def test_latency_calculated_from_timestamps(self, client):
"""Latency is calculated when server timestamp is available."""
import time
Expand Down
Loading