diff --git a/.gitignore b/.gitignore index bd0f10e..ebb4bf2 100644 --- a/.gitignore +++ b/.gitignore @@ -63,3 +63,7 @@ build/ bld/ [Bb]in/ [Oo]bj/ + +# Test artifacts +test-results/ +test-config.json diff --git a/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs b/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs index 7df71a4..8402c85 100644 --- a/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs +++ b/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs @@ -249,5 +249,51 @@ public void SchemaResolutionMappingWorksCorrectly() } }); } + + [Test] + [TestCase("MYMH6", "MYM", 3, 2026)] // MYM March 2026 (bug case - month code H is in root) + [TestCase("ESZ25", "ES", 12, 2025)] // ES December 2025 + [TestCase("NQM6", "NQ", 6, 2026)] // NQ June 2026 + [TestCase("YMH26", "YM", 3, 2026)] // YM March 2026 (2-digit year) + [TestCase("MESF7", "MES", 1, 2027)] // Micro E-mini S&P January 2027 + [TestCase("CLG26", "CL", 2, 2026)] // Crude Oil February 2026 + public void ParseDatabentoContractSymbol_ParsesCorrectly(string databentoSymbol, string expectedRoot, int expectedMonth, int expectedYear) + { + // Create a canonical symbol for reference (market doesn't matter for parsing test) + var canonicalSymbol = Symbol.Create("ES", SecurityType.Future, Market.CME); + + var result = _client.ParseDatabentoContractSymbol(canonicalSymbol, databentoSymbol); + + Assert.IsNotNull(result, $"Should successfully parse {databentoSymbol}"); + Assert.AreEqual(expectedRoot, result.ID.Symbol, $"Root symbol should be {expectedRoot}"); + Assert.AreEqual(expectedMonth, result.ID.Date.Month, $"Month should be {expectedMonth}"); + Assert.AreEqual(expectedYear, result.ID.Date.Year, $"Year should be {expectedYear}"); + + Log.Trace($"Successfully parsed {databentoSymbol} -> {result} (expiry: {result.ID.Date:yyyy-MM-dd})"); + } + + [Test] + [TestCase("MYMH6-MYMM6")] // Spread symbol + [TestCase("ESZ25-ESH26")] // Spread symbol + public void ParseDatabentoContractSymbol_SkipsSpreads(string spreadSymbol) + { + var canonicalSymbol = Symbol.Create("ES", SecurityType.Future, Market.CME); + + var result = _client.ParseDatabentoContractSymbol(canonicalSymbol, spreadSymbol); + + Assert.IsNull(result, $"Should return null for spread symbol {spreadSymbol}"); + } + + [Test] + [TestCase("")] + [TestCase(null)] + public void ParseDatabentoContractSymbol_HandlesEmptyInput(string input) + { + var canonicalSymbol = Symbol.Create("ES", SecurityType.Future, Market.CME); + + var result = _client.ParseDatabentoContractSymbol(canonicalSymbol, input); + + Assert.IsNull(result, "Should return null for empty/null input"); + } } } diff --git a/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj index d2d7ef1..4f0ae1a 100644 --- a/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj +++ b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj @@ -1,6 +1,6 @@ - net9.0 + net10.0 QuantConnect.DataLibrary.Tests diff --git a/QuantConnect.DataBento/DataBentoDataDownloader.cs b/QuantConnect.DataBento/DataBentoDataDownloader.cs index 2a44e55..6e70d1f 100644 --- a/QuantConnect.DataBento/DataBentoDataDownloader.cs +++ b/QuantConnect.DataBento/DataBentoDataDownloader.cs @@ -236,8 +236,8 @@ private string MapSymbolToDataBento(Symbol symbol) return symbol.Value; } - /// Class for parsing trade data from Databento - /// Really used as a map from the http request to then get it in QC data structures + /// Class for parsing OHLCV bar data from Databento + /// Databento returns prices as fixed-point integers scaled by 10^9 private class DatabentoBar { [Name("ts_event")] @@ -247,19 +247,23 @@ private class DatabentoBar .AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime; [Name("open")] - public decimal Open { get; set; } + public long OpenRaw { get; set; } + public decimal Open => OpenRaw * PriceScaleFactor; [Name("high")] - public decimal High { get; set; } + public long HighRaw { get; set; } + public decimal High => HighRaw * PriceScaleFactor; [Name("low")] - public decimal Low { get; set; } + public long LowRaw { get; set; } + public decimal Low => LowRaw * PriceScaleFactor; [Name("close")] - public decimal Close { get; set; } + public long CloseRaw { get; set; } + public decimal Close => CloseRaw * PriceScaleFactor; [Name("volume")] - public decimal Volume { get; set; } + public long Volume { get; set; } } private class DatabentoTrade diff --git a/QuantConnect.DataBento/DataBentoDataProvider.cs b/QuantConnect.DataBento/DataBentoDataProvider.cs index 8cf6015..f810688 100644 --- a/QuantConnect.DataBento/DataBentoDataProvider.cs +++ b/QuantConnect.DataBento/DataBentoDataProvider.cs @@ -27,14 +27,23 @@ using QuantConnect.Packets; using QuantConnect.Securities; using System.Collections.Concurrent; +using System.ComponentModel.Composition; namespace QuantConnect.Lean.DataSource.DataBento { /// - /// Implementation of Custom Data Provider + /// Implementation of Custom Data Provider with Universe support for futures contract resolution /// - public class DataBentoProvider : IDataQueueHandler + [Export(typeof(IDataQueueHandler))] + public class DataBentoProvider : IDataQueueHandler, IDataQueueUniverseProvider { + /// + /// Track resolved contracts per canonical symbol. + /// When Databento sends symbol mappings (MYM.FUT -> MYMH6), we parse them + /// into proper LEAN Symbols and store here so LookupSymbols() can return them. + /// + private readonly ConcurrentDictionary> _resolvedContracts = new(); + private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName( Config.Get("data-aggregator", "QuantConnect.Lean.Engine.DataFeeds.AggregationManager"), forceTypeNameOnExisting: false); private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!; @@ -49,6 +58,15 @@ public class DataBentoProvider : IDataQueueHandler private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); private readonly ConcurrentDictionary _symbolExchangeTimeZones = new(); + /// + /// Replay start time for intraday historical data. + /// - DateTime.MinValue = full replay (start=0, up to 24 hours) + /// - Specific DateTime = replay from that time + /// - null = live only (no replay) - but we default to 15 min ago if not configured + /// Configure via "databento-replay-start" in config.json: "0" for full, ISO datetime for specific, absent for 15-min default + /// + private readonly DateTime? _replayStart; + /// /// Returns true if we're currently connected to the Data Provider /// @@ -65,6 +83,11 @@ public DataBentoProvider() throw new ArgumentException("DataBento API key is required. Set 'databento-api-key' in configuration."); } + // Parse replay start config: "0" for full replay, ISO datetime for specific, absent for 15-min default + var replayStartConfig = Config.Get("databento-replay-start", ""); + _replayStart = ParseReplayStart(replayStartConfig); + Log.Trace($"DataBentoProvider: Replay start configured as: {FormatReplayStart(_replayStart)}"); + _dataDownloader = new DataBentoDataDownloader(_apiKey); Initialize(); } @@ -104,8 +127,10 @@ private void Initialize() return false; } - var resolution = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution; - if (!_client.Subscribe(config.Symbol, resolution, config.TickType)) + // Use intraday replay to get historical data at startup + var replayStart = GetEffectiveReplayStart(); + Log.Trace($"DataBentoProvider.SubscribeImpl(): Using intraday replay: {FormatReplayStart(replayStart)}"); + if (!_client.Subscribe(config.Symbol, config.Resolution, config.TickType, replayStart)) { Log.Error($"Failed to subscribe to {config.Symbol}"); return false; @@ -142,6 +167,7 @@ private void Initialize() _client = new DatabentoRawClient(_apiKey); _client.DataReceived += OnDataReceived; _client.ConnectionStatusChanged += OnConnectionStatusChanged; + _client.SymbolMappingReceived += OnSymbolMappingReceived; // Connect to live gateway Log.Trace("DataBentoProvider.Initialize(): Attempting connection to DataBento live gateway"); @@ -258,6 +284,59 @@ public void Dispose() } } + /// + /// Parses the databento-replay-start config value. + /// + /// Config value: "0" for full, ISO datetime for specific, empty for default + /// DateTime.MinValue for full replay, specific DateTime, or null for default (15-min lookback) + private static DateTime? ParseReplayStart(string configValue) + { + if (string.IsNullOrWhiteSpace(configValue)) + { + // Empty/absent = use default (will be calculated as 15 min ago at subscribe time) + return null; + } + + if (configValue == "0") + { + // "0" = full intraday replay (sentinel value) + return DateTime.MinValue; + } + + // Try to parse as ISO datetime + if (DateTime.TryParse(configValue, out var parsed)) + { + return parsed.ToUniversalTime(); + } + + Log.Error($"DataBentoProvider: Invalid databento-replay-start value '{configValue}'. Use '0' for full replay, ISO datetime, or omit for 15-min default."); + return null; + } + + /// + /// Formats replay start for logging. + /// + private static string FormatReplayStart(DateTime? replayStart) + { + if (!replayStart.HasValue) + return "15-minute lookback (default)"; + if (replayStart.Value == DateTime.MinValue) + return "FULL intraday replay (start=0)"; + return $"from {replayStart.Value:yyyy-MM-ddTHH:mm:ss} UTC"; + } + + /// + /// Gets the effective replay start time for subscription. + /// If _replayStart is null (default), calculates 15 minutes ago. + /// + private DateTime? GetEffectiveReplayStart() + { + if (_replayStart.HasValue) + return _replayStart.Value; + // Default: 15 minutes ago + return DateTime.UtcNow.AddMinutes(-15); + } + /// /// Checks if this Data provider supports the specified symbol /// @@ -265,8 +344,8 @@ public void Dispose() /// returns true if Data Provider supports the specified symbol; otherwise false private bool CanSubscribe(Symbol symbol) { + // Reject universe symbols but allow canonical (continuous) futures return !symbol.Value.Contains("universe", StringComparison.InvariantCultureIgnoreCase) && - !symbol.IsCanonical() && IsSecurityTypeSupported(symbol.SecurityType); } @@ -352,18 +431,26 @@ private void OnDataReceived(object? sender, BaseData data) { tick.Time = GetTickTime(tick.Symbol, tick.Time); _dataAggregator.Update(tick); - - Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " + - $"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}"); + // Log.Trace removed - too spammy (millions of ticks) } else if (data is TradeBar tradeBar) { tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time); tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime); - _dataAggregator.Update(tradeBar); - Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " + - $"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}"); + // Classify as replay vs live data based on age + // Replay data (from start= parameter) is historical, live data is recent + var now = DateTime.UtcNow; + var dataAgeSeconds = (now - tradeBar.EndTime.ConvertToUtc(TimeZones.Chicago)).TotalSeconds; + var dataType = dataAgeSeconds > 60 ? "REPLAY" : "LIVE"; + + // Log first 10 bars and every 50th bar thereafter + _tradeBarCount++; + if (_tradeBarCount <= 10 || _tradeBarCount % 50 == 0) + { + Log.Trace($"DataBentoProvider.OnDataReceived(): [{dataType}] TradeBar #{_tradeBarCount} for {tradeBar.Symbol} at {tradeBar.Time} (age: {dataAgeSeconds:F0}s)"); + } + _dataAggregator.Update(tradeBar); } else { @@ -377,6 +464,8 @@ private void OnDataReceived(object? sender, BaseData data) } } + private int _tradeBarCount = 0; + /// /// Handles connection status changes from the live client /// @@ -392,7 +481,7 @@ private void OnConnectionStatusChanged(object? sender, bool isConnected) _sessionStarted = false; } - // Resubscribe to all active subscriptions + // Resubscribe to all active subscriptions foreach (var config in _activeSubscriptionConfigs) { _client.Subscribe(config.Symbol, config.Resolution, config.TickType); @@ -412,5 +501,80 @@ private void OnConnectionStatusChanged(object? sender, bool isConnected) } } } + + /// + /// Handles symbol mapping events from Databento. + /// When Databento resolves a continuous symbol (MYM.FUT) to a specific contract (MYMH6), + /// we store the mapping so LEAN's universe selection can find the contract. + /// + private void OnSymbolMappingReceived(object? sender, SymbolMappingEventArgs e) + { + if (e.ContractSymbol == null) + { + Log.Trace($"DataBentoProvider.OnSymbolMappingReceived(): No contract symbol for {e.DatabentoSymbol} (canonical: {e.CanonicalSymbol})"); + return; + } + + // Add the resolved contract to our tracking dictionary + var contracts = _resolvedContracts.GetOrAdd(e.CanonicalSymbol, _ => new HashSet()); + lock (contracts) + { + if (contracts.Add(e.ContractSymbol)) + { + Log.Trace($"DataBentoProvider.OnSymbolMappingReceived(): Resolved {e.CanonicalSymbol} -> {e.ContractSymbol} (from {e.DatabentoSymbol})"); + } + } + } + + #region IDataQueueUniverseProvider Implementation + + /// + /// Returns the symbols available for the specified canonical symbol. + /// For futures, returns the resolved contracts (e.g., MYMH6 for canonical /MYM). + /// + /// The canonical symbol to lookup + /// Whether to include expired contracts + /// Expected security currency (not used) + /// Enumerable of resolved contract Symbols + public IEnumerable LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null) + { + Log.Trace($"DataBentoProvider.LookupSymbols(): Looking up symbols for {symbol}, includeExpired={includeExpired}"); + + if (_resolvedContracts.TryGetValue(symbol, out var contracts)) + { + lock (contracts) + { + var contractList = contracts.ToList(); + Log.Trace($"DataBentoProvider.LookupSymbols(): Found {contractList.Count} contracts for {symbol}"); + + // If not including expired, filter by expiry date + if (!includeExpired) + { + var now = DateTime.UtcNow.Date; + contractList = contractList.Where(c => c.ID.Date >= now).ToList(); + Log.Trace($"DataBentoProvider.LookupSymbols(): After expiry filter: {contractList.Count} contracts"); + } + + return contractList; + } + } + + Log.Trace($"DataBentoProvider.LookupSymbols(): No contracts found for {symbol}"); + return Enumerable.Empty(); + } + + /// + /// Returns whether selection can take place. + /// Selection is allowed when we're connected and have resolved at least one contract. + /// + /// True if universe selection can proceed + public bool CanPerformSelection() + { + var canPerform = IsConnected && _resolvedContracts.Any(); + Log.Trace($"DataBentoProvider.CanPerformSelection(): {canPerform} (IsConnected={IsConnected}, ResolvedContracts={_resolvedContracts.Count})"); + return canPerform; + } + + #endregion } } diff --git a/QuantConnect.DataBento/DataBentoHistoryProivder.cs b/QuantConnect.DataBento/DataBentoHistoryProivder.cs index a93b50e..5202fa9 100644 --- a/QuantConnect.DataBento/DataBentoHistoryProivder.cs +++ b/QuantConnect.DataBento/DataBentoHistoryProivder.cs @@ -101,10 +101,13 @@ public override void Initialize(HistoryProviderInitializeParameters parameters) /// An enumerable of BaseData points public IEnumerable? GetHistory(HistoryRequest request) { + // DIAGNOSTIC: Log that we're being called + Log.Trace($"DataBentoHistoryProvider.GetHistory(): Symbol={request.Symbol}, Type={request.DataType}, TickType={request.TickType}, Resolution={request.Resolution}, IsCanonical={request.Symbol.IsCanonical()}, Start={request.StartTimeUtc}, End={request.EndTimeUtc}"); + if (request.Symbol.IsCanonical() || !IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution)) { - // It is Logged in IsSupported(...) + Log.Trace($"DataBentoHistoryProvider.GetHistory(): Rejecting request - IsCanonical={request.Symbol.IsCanonical()}, IsSupported={(request.Symbol.IsCanonical() ? "N/A" : IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution).ToString())}"); return null; } diff --git a/QuantConnect.DataBento/DataBentoRawLiveClient.cs b/QuantConnect.DataBento/DataBentoRawLiveClient.cs index 2e7bb97..c7fb082 100644 --- a/QuantConnect.DataBento/DataBentoRawLiveClient.cs +++ b/QuantConnect.DataBento/DataBentoRawLiveClient.cs @@ -59,6 +59,12 @@ public class DatabentoRawClient : IDisposable /// public event EventHandler? ConnectionStatusChanged; + /// + /// Event fired when a symbol mapping is received from Databento. + /// This maps canonical futures symbols to specific contract symbols. + /// + public event EventHandler? SymbolMappingReceived; + /// /// Gets whether the client is currently connected /// @@ -202,9 +208,23 @@ private static string ComputeSHA256(string input) } /// - /// Subscribes to live data for a symbol + /// Sentinel value for full intraday replay (up to 24 hours). + /// Pass this as the start parameter to get all available intraday data. + /// Maps to Databento's start=0 convention. /// - public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) + public static readonly DateTime FullReplay = DateTime.MinValue; + + /// + /// Subscribes to live data for a symbol with optional intraday replay + /// + /// The symbol to subscribe to + /// Data resolution + /// Type of tick data + /// Optional start time for intraday historical replay. + /// Use DateTime.MinValue (or FullReplay constant) for full intraday replay (start=0). + /// Use a specific DateTime for replay from that time. + /// Use null for live-only (no replay). + public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType, DateTime? start = null) { if (!IsConnected || _writer == null) { @@ -219,8 +239,29 @@ public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) var databentoSymbol = MapSymbolToDataBento(symbol); var schema = GetSchema(resolution, tickType); - // subscribe + // Build subscription message with optional start time for intraday replay var subscribeMessage = $"schema={schema}|stype_in=parent|symbols={databentoSymbol}"; + + // Append start parameter based on value: + // - DateTime.MinValue (sentinel) -> start=0 (full intraday replay) + // - Specific DateTime -> start= + // - null -> no start parameter (live only) + if (start.HasValue) + { + if (start.Value == DateTime.MinValue) + { + // Sentinel: DateTime.MinValue -> start=0 (full intraday replay, up to 24 hours) + subscribeMessage += "|start=0"; + Log.Trace($"DatabentoRawClient.Subscribe(): Using FULL intraday replay (start=0)"); + } + else + { + // Specific timestamp for partial replay + var startStr = start.Value.ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss"); + subscribeMessage += $"|start={startStr}"; + Log.Trace($"DatabentoRawClient.Subscribe(): Using intraday replay from {startStr}"); + } + } Log.Trace($"DatabentoRawClient.Subscribe(): Subscribing with message: {subscribeMessage}"); // Send subscribe message @@ -230,11 +271,24 @@ public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) _subscriptions.TryAdd(symbol, (resolution, tickType)); Log.Trace($"DatabentoRawClient.Subscribe(): Subscribed to {symbol} ({databentoSymbol}) at {resolution} resolution for {tickType}"); - // If subscribing to quote ticks, also subscribe to trade ticks - if (tickType == TickType.Quote && resolution == Resolution.Tick) + // Also subscribe to trade data for OHLCV bars (for bar generation) + // This is needed because Quote data (mbp-1) doesn't produce TradeBars + if (tickType == TickType.Quote) { var tradeSchema = GetSchema(resolution, TickType.Trade); var tradeSubscribeMessage = $"schema={tradeSchema}|stype_in=parent|symbols={databentoSymbol}"; + if (start.HasValue) + { + if (start.Value == DateTime.MinValue) + { + tradeSubscribeMessage += "|start=0"; + } + else + { + var startStr = start.Value.ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss"); + tradeSubscribeMessage += $"|start={startStr}"; + } + } Log.Trace($"DatabentoRawClient.Subscribe(): Also subscribing to trades with message: {tradeSubscribeMessage}"); _writer.WriteLine(tradeSubscribeMessage); } @@ -375,26 +429,45 @@ private void ProcessSingleMessage(string message) } else if (rtype == 22) { - // Symbol mapping message + // Symbol mapping message - maps continuous (MYM.FUT) to specific contract (MYMH6) if (root.TryGetProperty("stype_in_symbol", out var inSymbol) && root.TryGetProperty("stype_out_symbol", out var outSymbol) && headerElement.TryGetProperty("instrument_id", out var instId)) { var instrumentId = instId.GetInt64(); var outSymbolStr = outSymbol.GetString(); + var inSymbolStr = inSymbol.GetString(); - Log.Trace($"DatabentoRawClient: Symbol mapping: {inSymbol.GetString()} -> {outSymbolStr} (instrument_id: {instrumentId})"); + Log.Trace($"DatabentoRawClient: Symbol mapping: {inSymbolStr} -> {outSymbolStr} (instrument_id: {instrumentId})"); - // Find the subscription that matches this symbol + // Find the canonical subscription that matches this symbol + Symbol? canonicalSymbol = null; foreach (var kvp in _subscriptions) { - var leanSymbol = kvp.Key; - if (outSymbolStr != null) - { - _instrumentIdToSymbol[instrumentId] = leanSymbol; - Log.Trace($"DatabentoRawClient: Mapped instrument_id {instrumentId} to {leanSymbol}"); - break; - } + canonicalSymbol = kvp.Key; + break; // Take the first subscription as canonical + } + + if (canonicalSymbol != null && outSymbolStr != null) + { + // Parse the Databento symbol to get a proper LEAN contract Symbol + var contractSymbol = ParseDatabentoContractSymbol(canonicalSymbol, outSymbolStr); + + // Store the contract symbol (if parsed) or canonical (as fallback) + // CRITICAL: Route data to contract symbol, not canonical, so algorithm + // receives data on tradable contracts (e.g., MYMH6 not just /MYM) + var symbolToStore = contractSymbol ?? canonicalSymbol; + _instrumentIdToSymbol[instrumentId] = symbolToStore; + + Log.Trace($"DatabentoRawClient: Mapped instrument_id {instrumentId} to {symbolToStore}" + + (contractSymbol != null ? $" (parsed from {outSymbolStr})" : " (canonical fallback)")); + + // Raise event so DataBentoProvider can track resolved contracts + SymbolMappingReceived?.Invoke(this, new SymbolMappingEventArgs( + canonicalSymbol, + outSymbolStr, + instrumentId, + contractSymbol)); } } return; @@ -477,12 +550,12 @@ private void HandleOHLCVMessage(JsonElement root, JsonElement header) root.TryGetProperty("close", out var closeElement) && root.TryGetProperty("volume", out var volumeElement)) { - // Parse prices + // Parse prices and volume (all are strings in Databento JSON) var openRaw = long.Parse(openElement.GetString()!); var highRaw = long.Parse(highElement.GetString()!); var lowRaw = long.Parse(lowElement.GetString()!); var closeRaw = long.Parse(closeElement.GetString()!); - var volume = volumeElement.GetInt64(); + var volume = long.Parse(volumeElement.GetString()!); var open = openRaw * PriceScaleFactor; var high = highRaw * PriceScaleFactor; @@ -511,7 +584,9 @@ private void HandleOHLCVMessage(JsonElement root, JsonElement header) period ); - Log.Trace($"DatabentoRawClient: OHLCV bar: {matchedSymbol} O={open} H={high} L={low} C={close} V={volume} at {timestamp}"); + // OHLCV bars are now only used for historical replay, not live data + // Live data uses trades schema which gets consolidated by LEAN's aggregator + Log.Trace($"DatabentoRawClient: OHLCV bar (historical replay): {matchedSymbol} O={open} H={high} L={low} C={close} V={volume} at {timestamp}"); DataReceived?.Invoke(this, tradeBar); } } @@ -582,7 +657,7 @@ private void HandleMBPMessage(JsonElement root, JsonElement header) // QuantConnect convention: Quote ticks should have zero Price and Quantity quoteTick.Quantity = 0; - Log.Trace($"DatabentoRawClient: Quote tick: {matchedSymbol} Bid={quoteTick.BidPrice}x{quoteTick.BidSize} Ask={quoteTick.AskPrice}x{quoteTick.AskSize}"); + // Log.Trace removed - too spammy (millions of ticks) DataReceived?.Invoke(this, quoteTick); } } @@ -639,7 +714,7 @@ private void HandleTradeTickMessage(JsonElement root, JsonElement header) AskSize = 0 }; - Log.Trace($"DatabentoRawClient: Trade tick: {matchedSymbol} Price={price} Quantity={size}"); + // Log.Trace removed - too spammy (millions of ticks) DataReceived?.Invoke(this, tradeTick); } } @@ -656,42 +731,208 @@ private string MapSymbolToDataBento(Symbol symbol) { if (symbol.SecurityType == SecurityType.Future) { - // For DataBento, use the root symbol with .FUT suffix for parent subscription - // ES19Z25 -> ES.FUT + string root; + + // Check if this is a canonical (continuous) symbol like "/YM" + if (symbol.IsCanonical()) + { + // Canonical symbol - use ROOT.FUT format for parent subscription + // Databento will resolve to front month automatically + root = symbol.ID.Symbol; // e.g., "YM" from canonical "/YM" + Log.Trace($"DatabentoRawClient.MapSymbolToDataBento(): Canonical symbol {symbol} mapped to {root}.FUT"); + return $"{root}.FUT"; + } + + // Specific contract symbol like "ES19Z25" - extract root var value = symbol.Value; - - // Extract root by removing digits and month codes - var root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray()); - + root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray()); + + // Use parent subscription format for specific contracts too return $"{root}.FUT"; } return symbol.Value; } + /// + /// CME month codes: F=Jan, G=Feb, H=Mar, J=Apr, K=May, M=Jun, N=Jul, Q=Aug, U=Sep, V=Oct, X=Nov, Z=Dec + /// + private static readonly Dictionary MonthCodeToMonth = new() + { + { 'F', 1 }, { 'G', 2 }, { 'H', 3 }, { 'J', 4 }, { 'K', 5 }, { 'M', 6 }, + { 'N', 7 }, { 'Q', 8 }, { 'U', 9 }, { 'V', 10 }, { 'X', 11 }, { 'Z', 12 } + }; + + /// + /// CME quarterly month codes: H=Mar, M=Jun, U=Sep, Z=Dec + /// + private static readonly char[] QuarterlyMonthCodes = { 'H', 'M', 'U', 'Z' }; + private static readonly int[] QuarterlyMonths = { 3, 6, 9, 12 }; + + /// + /// Parses a Databento contract symbol (e.g., "MYMH6") into a LEAN Symbol. + /// Format: ROOT + MONTH_CODE + YEAR (1 or 2 digits) + /// Examples: MYMH6 = MYM March 2026, ESZ25 = ES December 2025 + /// + /// The canonical LEAN symbol for market reference + /// The Databento symbol (e.g., "MYMH6") + /// A LEAN Symbol for the specific contract, or null if parsing fails + public Symbol? ParseDatabentoContractSymbol(Symbol canonicalSymbol, string databentoSymbol) + { + if (string.IsNullOrEmpty(databentoSymbol)) + return null; + + // Skip spread symbols (contain "-") + if (databentoSymbol.Contains('-')) + { + Log.Trace($"DatabentoRawClient.ParseDatabentoContractSymbol(): Skipping spread symbol: {databentoSymbol}"); + return null; + } + + try + { + // Parse format: ROOT + MONTH_CODE + YEAR (e.g., MYMH6, ESZ25) + // The month code is a single letter from the set: F,G,H,J,K,M,N,Q,U,V,X,Z + // We need to find the month code by searching from the end, since the root + // symbol can contain letters that match month codes (e.g., MYM contains M) + + // Find where the digits start from the end + int digitStartIndex = databentoSymbol.Length; + while (digitStartIndex > 0 && char.IsDigit(databentoSymbol[digitStartIndex - 1])) + { + digitStartIndex--; + } + + if (digitStartIndex == databentoSymbol.Length || digitStartIndex < 2) + { + Log.Trace($"DatabentoRawClient.ParseDatabentoContractSymbol(): Invalid format (no year digits or too short): {databentoSymbol}"); + return null; + } + + // Month code is the character just before the digits + var monthCodeIndex = digitStartIndex - 1; + var monthCode = char.ToUpper(databentoSymbol[monthCodeIndex]); + if (!MonthCodeToMonth.TryGetValue(monthCode, out var month)) + { + Log.Error($"DatabentoRawClient.ParseDatabentoContractSymbol(): Unknown month code '{monthCode}' in {databentoSymbol}"); + return null; + } + + // Root is everything before the month code + var root = databentoSymbol.Substring(0, monthCodeIndex); + if (string.IsNullOrEmpty(root)) + { + Log.Trace($"DatabentoRawClient.ParseDatabentoContractSymbol(): Empty root in {databentoSymbol}"); + return null; + } + + // Year is the digits at the end + var yearStr = databentoSymbol.Substring(digitStartIndex); + if (!int.TryParse(yearStr, out var yearDigits)) + { + Log.Error($"DatabentoRawClient.ParseDatabentoContractSymbol(): Invalid year '{yearStr}' in {databentoSymbol}"); + return null; + } + + // Convert to full year (6 -> 2026, 25 -> 2025) + int year; + if (yearDigits < 100) + { + // Assume 20xx for 2-digit years, handle century boundary + year = yearDigits < 50 ? 2000 + yearDigits : 1900 + yearDigits; + if (yearDigits < 10) + { + // Single digit like "6" means 2026 + year = 2020 + yearDigits; + } + } + else + { + year = yearDigits; + } + + // Calculate expiry date (3rd Friday of contract month for index futures) + var firstDay = new DateTime(year, month, 1); + var firstFriday = firstDay.AddDays((DayOfWeek.Friday - firstDay.DayOfWeek + 7) % 7); + var thirdFriday = firstFriday.AddDays(14); + + // Create the LEAN Symbol + var market = canonicalSymbol.ID.Market; + var contractSymbol = Symbol.CreateFuture(root, market, thirdFriday); + + Log.Trace($"DatabentoRawClient.ParseDatabentoContractSymbol(): Parsed {databentoSymbol} -> {contractSymbol} (expiry: {thirdFriday:yyyy-MM-dd})"); + return contractSymbol; + } + catch (Exception ex) + { + Log.Error($"DatabentoRawClient.ParseDatabentoContractSymbol(): Error parsing {databentoSymbol}: {ex.Message}"); + return null; + } + } + + /// + /// Gets the front-month contract symbol for quarterly futures (ES, YM, NQ, etc.) + /// CME index futures expire on 3rd Friday of contract month + /// + private string GetFrontMonthContract(string root, DateTime now) + { + // Find the next quarterly expiry that hasn't passed + // Roll to next contract ~1 week before expiry for safety + var rollBuffer = TimeSpan.FromDays(7); + + for (int yearOffset = 0; yearOffset <= 1; yearOffset++) + { + var year = now.Year + yearOffset; + var yearCode = (year % 100).ToString("D2"); // "25" for 2025 + + foreach (var i in Enumerable.Range(0, 4)) + { + var month = QuarterlyMonths[i]; + var monthCode = QuarterlyMonthCodes[i]; + + // Skip months in previous year iterations + if (yearOffset == 0 && month < now.Month) + continue; + + // Calculate 3rd Friday of the contract month (approximate expiry) + var firstDay = new DateTime(year, month, 1); + var firstFriday = firstDay.AddDays((DayOfWeek.Friday - firstDay.DayOfWeek + 7) % 7); + var thirdFriday = firstFriday.AddDays(14); + var rollDate = thirdFriday - rollBuffer; + + // If we're past the roll date, skip to next contract + if (now > rollDate) + continue; + + // Found the front month contract + // Format: YMH25 (root + month code + 2-digit year) + return $"{root}{monthCode}{yearCode}"; + } + } + + // Fallback - shouldn't happen + var fallbackYear = (now.Year % 100).ToString("D2"); + return $"{root}H{fallbackYear}"; + } + /// /// Pick Databento schema from Lean resolution/ticktype + /// For live streaming, we always use tick-level data (trades, mbp-1) because + /// the LEAN aggregator expects to consolidate tick data into bars. + /// OHLCV schemas are only appropriate for historical data requests. /// private string GetSchema(Resolution resolution, TickType tickType) { if (tickType == TickType.Trade) { - if (resolution == Resolution.Tick) - return "trades"; - if (resolution == Resolution.Second) - return "ohlcv-1s"; - if (resolution == Resolution.Minute) - return "ohlcv-1m"; - if (resolution == Resolution.Hour) - return "ohlcv-1h"; - if (resolution == Resolution.Daily) - return "ohlcv-1d"; + // Always use trades schema for live data - the aggregator will consolidate + // This ensures proper data flow through LEAN's consolidation pipeline + return "trades"; } else if (tickType == TickType.Quote) { - // top of book - if (resolution == Resolution.Tick || resolution == Resolution.Second || resolution == Resolution.Minute || resolution == Resolution.Hour || resolution == Resolution.Daily) - return "mbp-1"; + // top of book - mbp-1 provides tick-level quote data + return "mbp-1"; } else if (tickType == TickType.OpenInterest) { @@ -749,4 +990,39 @@ public void Dispose() _tcpClient?.Dispose(); } } + + /// + /// Event args for symbol mapping messages from Databento. + /// Contains the mapping from canonical symbol to resolved contract. + /// + public class SymbolMappingEventArgs : EventArgs + { + /// + /// The canonical LEAN symbol that was subscribed (e.g., /MYM) + /// + public Symbol CanonicalSymbol { get; } + + /// + /// The resolved contract symbol from Databento (e.g., MYMH6) + /// + public string DatabentoSymbol { get; } + + /// + /// The Databento instrument ID for this contract + /// + public long InstrumentId { get; } + + /// + /// The resolved LEAN contract Symbol with proper expiry + /// + public Symbol? ContractSymbol { get; } + + public SymbolMappingEventArgs(Symbol canonicalSymbol, string databentoSymbol, long instrumentId, Symbol? contractSymbol) + { + CanonicalSymbol = canonicalSymbol; + DatabentoSymbol = databentoSymbol; + InstrumentId = instrumentId; + ContractSymbol = contractSymbol; + } + } } diff --git a/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj b/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj index 718002c..39130b8 100644 --- a/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj +++ b/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj @@ -3,7 +3,7 @@ Release AnyCPU - net9.0 + net10.0 QuantConnect.Lean.DataSource.DataBento QuantConnect.Lean.DataSource.DataBento QuantConnect.Lean.DataSource.DataBento