diff --git a/QuantConnect.DataBento/DataBentoDataProvider.cs b/QuantConnect.DataBento/DataBentoDataProvider.cs index 8cf6015..2269d24 100644 --- a/QuantConnect.DataBento/DataBentoDataProvider.cs +++ b/QuantConnect.DataBento/DataBentoDataProvider.cs @@ -27,12 +27,14 @@ using QuantConnect.Packets; using QuantConnect.Securities; using System.Collections.Concurrent; +using System.ComponentModel.Composition; namespace QuantConnect.Lean.DataSource.DataBento { /// /// Implementation of Custom Data Provider /// + [Export(typeof(IDataQueueHandler))] public class DataBentoProvider : IDataQueueHandler { private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName( @@ -49,6 +51,15 @@ public class DataBentoProvider : IDataQueueHandler private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); private readonly ConcurrentDictionary _symbolExchangeTimeZones = new(); + /// + /// Replay start time for intraday historical data. + /// - DateTime.MinValue = full replay (start=0, up to 24 hours) + /// - Specific DateTime = replay from that time + /// - null = live only (no replay) - but we default to 15 min ago if not configured + /// Configure via "databento-replay-start" in config.json: "0" for full, ISO datetime for specific, absent for 15-min default + /// + private readonly DateTime? _replayStart; + /// /// Returns true if we're currently connected to the Data Provider /// @@ -65,6 +76,11 @@ public DataBentoProvider() throw new ArgumentException("DataBento API key is required. Set 'databento-api-key' in configuration."); } + // Parse replay start config: "0" for full replay, ISO datetime for specific, absent for 15-min default + var replayStartConfig = Config.Get("databento-replay-start", ""); + _replayStart = ParseReplayStart(replayStartConfig); + Log.Trace($"DataBentoProvider: Replay start configured as: {FormatReplayStart(_replayStart)}"); + _dataDownloader = new DataBentoDataDownloader(_apiKey); Initialize(); } @@ -104,8 +120,10 @@ private void Initialize() return false; } - var resolution = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution; - if (!_client.Subscribe(config.Symbol, resolution, config.TickType)) + // Use intraday replay to get historical data at startup + var replayStart = GetEffectiveReplayStart(); + Log.Trace($"DataBentoProvider.SubscribeImpl(): Using intraday replay: {FormatReplayStart(replayStart)}"); + if (!_client.Subscribe(config.Symbol, config.Resolution, config.TickType, replayStart)) { Log.Error($"Failed to subscribe to {config.Symbol}"); return false; @@ -258,6 +276,59 @@ public void Dispose() } } + /// + /// Parses the databento-replay-start config value. + /// + /// Config value: "0" for full, ISO datetime for specific, empty for default + /// DateTime.MinValue for full replay, specific DateTime, or null for default (15-min lookback) + private static DateTime? ParseReplayStart(string configValue) + { + if (string.IsNullOrWhiteSpace(configValue)) + { + // Empty/absent = use default (will be calculated as 15 min ago at subscribe time) + return null; + } + + if (configValue == "0") + { + // "0" = full intraday replay (sentinel value) + return DateTime.MinValue; + } + + // Try to parse as ISO datetime + if (DateTime.TryParse(configValue, out var parsed)) + { + return parsed.ToUniversalTime(); + } + + Log.Error($"DataBentoProvider: Invalid databento-replay-start value '{configValue}'. Use '0' for full replay, ISO datetime, or omit for 15-min default."); + return null; + } + + /// + /// Formats replay start for logging. + /// + private static string FormatReplayStart(DateTime? replayStart) + { + if (!replayStart.HasValue) + return "15-minute lookback (default)"; + if (replayStart.Value == DateTime.MinValue) + return "FULL intraday replay (start=0)"; + return $"from {replayStart.Value:yyyy-MM-ddTHH:mm:ss} UTC"; + } + + /// + /// Gets the effective replay start time for subscription. + /// If _replayStart is null (default), calculates 15 minutes ago. + /// + private DateTime? GetEffectiveReplayStart() + { + if (_replayStart.HasValue) + return _replayStart.Value; + // Default: 15 minutes ago + return DateTime.UtcNow.AddMinutes(-15); + } + /// /// Checks if this Data provider supports the specified symbol /// @@ -265,8 +336,8 @@ public void Dispose() /// returns true if Data Provider supports the specified symbol; otherwise false private bool CanSubscribe(Symbol symbol) { + // Reject universe symbols but allow canonical (continuous) futures return !symbol.Value.Contains("universe", StringComparison.InvariantCultureIgnoreCase) && - !symbol.IsCanonical() && IsSecurityTypeSupported(symbol.SecurityType); } @@ -352,18 +423,26 @@ private void OnDataReceived(object? sender, BaseData data) { tick.Time = GetTickTime(tick.Symbol, tick.Time); _dataAggregator.Update(tick); - - Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " + - $"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}"); + // Log.Trace removed - too spammy (millions of ticks) } else if (data is TradeBar tradeBar) { tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time); tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime); - _dataAggregator.Update(tradeBar); - Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " + - $"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}"); + // Classify as replay vs live data based on age + // Replay data (from start= parameter) is historical, live data is recent + var now = DateTime.UtcNow; + var dataAgeSeconds = (now - tradeBar.EndTime.ConvertToUtc(TimeZones.Chicago)).TotalSeconds; + var dataType = dataAgeSeconds > 60 ? "REPLAY" : "LIVE"; + + // Log first 10 bars and every 50th bar thereafter + _tradeBarCount++; + if (_tradeBarCount <= 10 || _tradeBarCount % 50 == 0) + { + Log.Trace($"DataBentoProvider.OnDataReceived(): [{dataType}] TradeBar #{_tradeBarCount} for {tradeBar.Symbol} at {tradeBar.Time} (age: {dataAgeSeconds:F0}s)"); + } + _dataAggregator.Update(tradeBar); } else { @@ -377,6 +456,8 @@ private void OnDataReceived(object? sender, BaseData data) } } + private int _tradeBarCount = 0; + /// /// Handles connection status changes from the live client /// diff --git a/QuantConnect.DataBento/DataBentoHistoryProivder.cs b/QuantConnect.DataBento/DataBentoHistoryProivder.cs index a93b50e..5202fa9 100644 --- a/QuantConnect.DataBento/DataBentoHistoryProivder.cs +++ b/QuantConnect.DataBento/DataBentoHistoryProivder.cs @@ -101,10 +101,13 @@ public override void Initialize(HistoryProviderInitializeParameters parameters) /// An enumerable of BaseData points public IEnumerable? GetHistory(HistoryRequest request) { + // DIAGNOSTIC: Log that we're being called + Log.Trace($"DataBentoHistoryProvider.GetHistory(): Symbol={request.Symbol}, Type={request.DataType}, TickType={request.TickType}, Resolution={request.Resolution}, IsCanonical={request.Symbol.IsCanonical()}, Start={request.StartTimeUtc}, End={request.EndTimeUtc}"); + if (request.Symbol.IsCanonical() || !IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution)) { - // It is Logged in IsSupported(...) + Log.Trace($"DataBentoHistoryProvider.GetHistory(): Rejecting request - IsCanonical={request.Symbol.IsCanonical()}, IsSupported={(request.Symbol.IsCanonical() ? "N/A" : IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution).ToString())}"); return null; } diff --git a/QuantConnect.DataBento/DataBentoRawLiveClient.cs b/QuantConnect.DataBento/DataBentoRawLiveClient.cs index 2e7bb97..989f8fa 100644 --- a/QuantConnect.DataBento/DataBentoRawLiveClient.cs +++ b/QuantConnect.DataBento/DataBentoRawLiveClient.cs @@ -202,9 +202,23 @@ private static string ComputeSHA256(string input) } /// - /// Subscribes to live data for a symbol + /// Sentinel value for full intraday replay (up to 24 hours). + /// Pass this as the start parameter to get all available intraday data. + /// Maps to Databento's start=0 convention. /// - public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) + public static readonly DateTime FullReplay = DateTime.MinValue; + + /// + /// Subscribes to live data for a symbol with optional intraday replay + /// + /// The symbol to subscribe to + /// Data resolution + /// Type of tick data + /// Optional start time for intraday historical replay. + /// Use DateTime.MinValue (or FullReplay constant) for full intraday replay (start=0). + /// Use a specific DateTime for replay from that time. + /// Use null for live-only (no replay). + public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType, DateTime? start = null) { if (!IsConnected || _writer == null) { @@ -219,8 +233,29 @@ public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) var databentoSymbol = MapSymbolToDataBento(symbol); var schema = GetSchema(resolution, tickType); - // subscribe + // Build subscription message with optional start time for intraday replay var subscribeMessage = $"schema={schema}|stype_in=parent|symbols={databentoSymbol}"; + + // Append start parameter based on value: + // - DateTime.MinValue (sentinel) -> start=0 (full intraday replay) + // - Specific DateTime -> start= + // - null -> no start parameter (live only) + if (start.HasValue) + { + if (start.Value == DateTime.MinValue) + { + // Sentinel: DateTime.MinValue -> start=0 (full intraday replay, up to 24 hours) + subscribeMessage += "|start=0"; + Log.Trace($"DatabentoRawClient.Subscribe(): Using FULL intraday replay (start=0)"); + } + else + { + // Specific timestamp for partial replay + var startStr = start.Value.ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss"); + subscribeMessage += $"|start={startStr}"; + Log.Trace($"DatabentoRawClient.Subscribe(): Using intraday replay from {startStr}"); + } + } Log.Trace($"DatabentoRawClient.Subscribe(): Subscribing with message: {subscribeMessage}"); // Send subscribe message @@ -230,11 +265,24 @@ public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) _subscriptions.TryAdd(symbol, (resolution, tickType)); Log.Trace($"DatabentoRawClient.Subscribe(): Subscribed to {symbol} ({databentoSymbol}) at {resolution} resolution for {tickType}"); - // If subscribing to quote ticks, also subscribe to trade ticks - if (tickType == TickType.Quote && resolution == Resolution.Tick) + // Also subscribe to trade data for OHLCV bars (for bar generation) + // This is needed because Quote data (mbp-1) doesn't produce TradeBars + if (tickType == TickType.Quote) { var tradeSchema = GetSchema(resolution, TickType.Trade); var tradeSubscribeMessage = $"schema={tradeSchema}|stype_in=parent|symbols={databentoSymbol}"; + if (start.HasValue) + { + if (start.Value == DateTime.MinValue) + { + tradeSubscribeMessage += "|start=0"; + } + else + { + var startStr = start.Value.ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss"); + tradeSubscribeMessage += $"|start={startStr}"; + } + } Log.Trace($"DatabentoRawClient.Subscribe(): Also subscribing to trades with message: {tradeSubscribeMessage}"); _writer.WriteLine(tradeSubscribeMessage); } @@ -477,12 +525,12 @@ private void HandleOHLCVMessage(JsonElement root, JsonElement header) root.TryGetProperty("close", out var closeElement) && root.TryGetProperty("volume", out var volumeElement)) { - // Parse prices + // Parse prices and volume (all are strings in Databento JSON) var openRaw = long.Parse(openElement.GetString()!); var highRaw = long.Parse(highElement.GetString()!); var lowRaw = long.Parse(lowElement.GetString()!); var closeRaw = long.Parse(closeElement.GetString()!); - var volume = volumeElement.GetInt64(); + var volume = long.Parse(volumeElement.GetString()!); var open = openRaw * PriceScaleFactor; var high = highRaw * PriceScaleFactor; @@ -511,7 +559,9 @@ private void HandleOHLCVMessage(JsonElement root, JsonElement header) period ); - Log.Trace($"DatabentoRawClient: OHLCV bar: {matchedSymbol} O={open} H={high} L={low} C={close} V={volume} at {timestamp}"); + // OHLCV bars are now only used for historical replay, not live data + // Live data uses trades schema which gets consolidated by LEAN's aggregator + Log.Trace($"DatabentoRawClient: OHLCV bar (historical replay): {matchedSymbol} O={open} H={high} L={low} C={close} V={volume} at {timestamp}"); DataReceived?.Invoke(this, tradeBar); } } @@ -582,7 +632,7 @@ private void HandleMBPMessage(JsonElement root, JsonElement header) // QuantConnect convention: Quote ticks should have zero Price and Quantity quoteTick.Quantity = 0; - Log.Trace($"DatabentoRawClient: Quote tick: {matchedSymbol} Bid={quoteTick.BidPrice}x{quoteTick.BidSize} Ask={quoteTick.AskPrice}x{quoteTick.AskSize}"); + // Log.Trace removed - too spammy (millions of ticks) DataReceived?.Invoke(this, quoteTick); } } @@ -639,7 +689,7 @@ private void HandleTradeTickMessage(JsonElement root, JsonElement header) AskSize = 0 }; - Log.Trace($"DatabentoRawClient: Trade tick: {matchedSymbol} Price={price} Quantity={size}"); + // Log.Trace removed - too spammy (millions of ticks) DataReceived?.Invoke(this, tradeTick); } } @@ -656,42 +706,98 @@ private string MapSymbolToDataBento(Symbol symbol) { if (symbol.SecurityType == SecurityType.Future) { - // For DataBento, use the root symbol with .FUT suffix for parent subscription - // ES19Z25 -> ES.FUT + string root; + + // Check if this is a canonical (continuous) symbol like "/YM" + if (symbol.IsCanonical()) + { + // Canonical symbol - use ROOT.FUT format for parent subscription + // Databento will resolve to front month automatically + root = symbol.ID.Symbol; // e.g., "YM" from canonical "/YM" + Log.Trace($"DatabentoRawClient.MapSymbolToDataBento(): Canonical symbol {symbol} mapped to {root}.FUT"); + return $"{root}.FUT"; + } + + // Specific contract symbol like "ES19Z25" - extract root var value = symbol.Value; - - // Extract root by removing digits and month codes - var root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray()); - + root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray()); + + // Use parent subscription format for specific contracts too return $"{root}.FUT"; } return symbol.Value; } + /// + /// CME quarterly month codes: H=Mar, M=Jun, U=Sep, Z=Dec + /// + private static readonly char[] QuarterlyMonthCodes = { 'H', 'M', 'U', 'Z' }; + private static readonly int[] QuarterlyMonths = { 3, 6, 9, 12 }; + + /// + /// Gets the front-month contract symbol for quarterly futures (ES, YM, NQ, etc.) + /// CME index futures expire on 3rd Friday of contract month + /// + private string GetFrontMonthContract(string root, DateTime now) + { + // Find the next quarterly expiry that hasn't passed + // Roll to next contract ~1 week before expiry for safety + var rollBuffer = TimeSpan.FromDays(7); + + for (int yearOffset = 0; yearOffset <= 1; yearOffset++) + { + var year = now.Year + yearOffset; + var yearCode = (year % 100).ToString("D2"); // "25" for 2025 + + foreach (var i in Enumerable.Range(0, 4)) + { + var month = QuarterlyMonths[i]; + var monthCode = QuarterlyMonthCodes[i]; + + // Skip months in previous year iterations + if (yearOffset == 0 && month < now.Month) + continue; + + // Calculate 3rd Friday of the contract month (approximate expiry) + var firstDay = new DateTime(year, month, 1); + var firstFriday = firstDay.AddDays((DayOfWeek.Friday - firstDay.DayOfWeek + 7) % 7); + var thirdFriday = firstFriday.AddDays(14); + var rollDate = thirdFriday - rollBuffer; + + // If we're past the roll date, skip to next contract + if (now > rollDate) + continue; + + // Found the front month contract + // Format: YMH25 (root + month code + 2-digit year) + return $"{root}{monthCode}{yearCode}"; + } + } + + // Fallback - shouldn't happen + var fallbackYear = (now.Year % 100).ToString("D2"); + return $"{root}H{fallbackYear}"; + } + /// /// Pick Databento schema from Lean resolution/ticktype + /// For live streaming, we always use tick-level data (trades, mbp-1) because + /// the LEAN aggregator expects to consolidate tick data into bars. + /// OHLCV schemas are only appropriate for historical data requests. /// private string GetSchema(Resolution resolution, TickType tickType) { if (tickType == TickType.Trade) { - if (resolution == Resolution.Tick) - return "trades"; - if (resolution == Resolution.Second) - return "ohlcv-1s"; - if (resolution == Resolution.Minute) - return "ohlcv-1m"; - if (resolution == Resolution.Hour) - return "ohlcv-1h"; - if (resolution == Resolution.Daily) - return "ohlcv-1d"; + // Always use trades schema for live data - the aggregator will consolidate + // This ensures proper data flow through LEAN's consolidation pipeline + return "trades"; } else if (tickType == TickType.Quote) { - // top of book - if (resolution == Resolution.Tick || resolution == Resolution.Second || resolution == Resolution.Minute || resolution == Resolution.Hour || resolution == Resolution.Daily) - return "mbp-1"; + // top of book - mbp-1 provides tick-level quote data + return "mbp-1"; } else if (tickType == TickType.OpenInterest) {