Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ build/
bld/
[Bb]in/
[Oo]bj/

# Test artifacts
test-results/
test-config.json
46 changes: 46 additions & 0 deletions QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,51 @@ public void SchemaResolutionMappingWorksCorrectly()
}
});
}

[Test]
[TestCase("MYMH6", "MYM", 3, 2026)] // MYM March 2026 (bug case - month code H is in root)
[TestCase("ESZ25", "ES", 12, 2025)] // ES December 2025
[TestCase("NQM6", "NQ", 6, 2026)] // NQ June 2026
[TestCase("YMH26", "YM", 3, 2026)] // YM March 2026 (2-digit year)
[TestCase("MESF7", "MES", 1, 2027)] // Micro E-mini S&P January 2027
[TestCase("CLG26", "CL", 2, 2026)] // Crude Oil February 2026
public void ParseDatabentoContractSymbol_ParsesCorrectly(string databentoSymbol, string expectedRoot, int expectedMonth, int expectedYear)
{
// Create a canonical symbol for reference (market doesn't matter for parsing test)
var canonicalSymbol = Symbol.Create("ES", SecurityType.Future, Market.CME);

var result = _client.ParseDatabentoContractSymbol(canonicalSymbol, databentoSymbol);

Assert.IsNotNull(result, $"Should successfully parse {databentoSymbol}");
Assert.AreEqual(expectedRoot, result.ID.Symbol, $"Root symbol should be {expectedRoot}");
Assert.AreEqual(expectedMonth, result.ID.Date.Month, $"Month should be {expectedMonth}");
Assert.AreEqual(expectedYear, result.ID.Date.Year, $"Year should be {expectedYear}");

Log.Trace($"Successfully parsed {databentoSymbol} -> {result} (expiry: {result.ID.Date:yyyy-MM-dd})");
}

[Test]
[TestCase("MYMH6-MYMM6")] // Spread symbol
[TestCase("ESZ25-ESH26")] // Spread symbol
public void ParseDatabentoContractSymbol_SkipsSpreads(string spreadSymbol)
{
var canonicalSymbol = Symbol.Create("ES", SecurityType.Future, Market.CME);

var result = _client.ParseDatabentoContractSymbol(canonicalSymbol, spreadSymbol);

Assert.IsNull(result, $"Should return null for spread symbol {spreadSymbol}");
}

[Test]
[TestCase("")]
[TestCase(null)]
public void ParseDatabentoContractSymbol_HandlesEmptyInput(string input)
{
var canonicalSymbol = Symbol.Create("ES", SecurityType.Future, Market.CME);

var result = _client.ParseDatabentoContractSymbol(canonicalSymbol, input);

Assert.IsNull(result, "Should return null for empty/null input");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<TargetFramework>net10.0</TargetFramework>
<RootNamespace>QuantConnect.DataLibrary.Tests</RootNamespace>
</PropertyGroup>
<ItemGroup>
Expand Down
18 changes: 11 additions & 7 deletions QuantConnect.DataBento/DataBentoDataDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ private string MapSymbolToDataBento(Symbol symbol)
return symbol.Value;
}

/// Class for parsing trade data from Databento
/// Really used as a map from the http request to then get it in QC data structures
/// Class for parsing OHLCV bar data from Databento
/// Databento returns prices as fixed-point integers scaled by 10^9
private class DatabentoBar
{
[Name("ts_event")]
Expand All @@ -247,19 +247,23 @@ private class DatabentoBar
.AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime;

[Name("open")]
public decimal Open { get; set; }
public long OpenRaw { get; set; }
public decimal Open => OpenRaw * PriceScaleFactor;

[Name("high")]
public decimal High { get; set; }
public long HighRaw { get; set; }
public decimal High => HighRaw * PriceScaleFactor;

[Name("low")]
public decimal Low { get; set; }
public long LowRaw { get; set; }
public decimal Low => LowRaw * PriceScaleFactor;

[Name("close")]
public decimal Close { get; set; }
public long CloseRaw { get; set; }
public decimal Close => CloseRaw * PriceScaleFactor;

[Name("volume")]
public decimal Volume { get; set; }
public long Volume { get; set; }
}

private class DatabentoTrade
Expand Down
188 changes: 176 additions & 12 deletions QuantConnect.DataBento/DataBentoDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,23 @@
using QuantConnect.Packets;
using QuantConnect.Securities;
using System.Collections.Concurrent;
using System.ComponentModel.Composition;

namespace QuantConnect.Lean.DataSource.DataBento
{
/// <summary>
/// Implementation of Custom Data Provider
/// Implementation of Custom Data Provider with Universe support for futures contract resolution
/// </summary>
public class DataBentoProvider : IDataQueueHandler
[Export(typeof(IDataQueueHandler))]
public class DataBentoProvider : IDataQueueHandler, IDataQueueUniverseProvider
{
/// <summary>
/// Track resolved contracts per canonical symbol.
/// When Databento sends symbol mappings (MYM.FUT -> MYMH6), we parse them
/// into proper LEAN Symbols and store here so LookupSymbols() can return them.
/// </summary>
private readonly ConcurrentDictionary<Symbol, HashSet<Symbol>> _resolvedContracts = new();

private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName<IDataAggregator>(
Config.Get("data-aggregator", "QuantConnect.Lean.Engine.DataFeeds.AggregationManager"), forceTypeNameOnExisting: false);
private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!;
Expand All @@ -49,6 +58,15 @@ public class DataBentoProvider : IDataQueueHandler
private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
private readonly ConcurrentDictionary<Symbol, DateTimeZone> _symbolExchangeTimeZones = new();

/// <summary>
/// Replay start time for intraday historical data.
/// - DateTime.MinValue = full replay (start=0, up to 24 hours)
/// - Specific DateTime = replay from that time
/// - null = live only (no replay) - but we default to 15 min ago if not configured
/// Configure via "databento-replay-start" in config.json: "0" for full, ISO datetime for specific, absent for 15-min default
/// </summary>
private readonly DateTime? _replayStart;

/// <summary>
/// Returns true if we're currently connected to the Data Provider
/// </summary>
Expand All @@ -65,6 +83,11 @@ public DataBentoProvider()
throw new ArgumentException("DataBento API key is required. Set 'databento-api-key' in configuration.");
}

// Parse replay start config: "0" for full replay, ISO datetime for specific, absent for 15-min default
var replayStartConfig = Config.Get("databento-replay-start", "");
_replayStart = ParseReplayStart(replayStartConfig);
Log.Trace($"DataBentoProvider: Replay start configured as: {FormatReplayStart(_replayStart)}");

_dataDownloader = new DataBentoDataDownloader(_apiKey);
Initialize();
}
Expand Down Expand Up @@ -104,8 +127,10 @@ private void Initialize()
return false;
}

var resolution = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution;
if (!_client.Subscribe(config.Symbol, resolution, config.TickType))
// Use intraday replay to get historical data at startup
var replayStart = GetEffectiveReplayStart();
Log.Trace($"DataBentoProvider.SubscribeImpl(): Using intraday replay: {FormatReplayStart(replayStart)}");
if (!_client.Subscribe(config.Symbol, config.Resolution, config.TickType, replayStart))
{
Log.Error($"Failed to subscribe to {config.Symbol}");
return false;
Expand Down Expand Up @@ -142,6 +167,7 @@ private void Initialize()
_client = new DatabentoRawClient(_apiKey);
_client.DataReceived += OnDataReceived;
_client.ConnectionStatusChanged += OnConnectionStatusChanged;
_client.SymbolMappingReceived += OnSymbolMappingReceived;

// Connect to live gateway
Log.Trace("DataBentoProvider.Initialize(): Attempting connection to DataBento live gateway");
Expand Down Expand Up @@ -258,15 +284,68 @@ public void Dispose()
}
}

/// <summary>
/// Parses the databento-replay-start config value.
/// </summary>
/// <param name="configValue">Config value: "0" for full, ISO datetime for specific, empty for default</param>
/// <returns>DateTime.MinValue for full replay, specific DateTime, or null for default (15-min lookback)</returns>
private static DateTime? ParseReplayStart(string configValue)
{
if (string.IsNullOrWhiteSpace(configValue))
{
// Empty/absent = use default (will be calculated as 15 min ago at subscribe time)
return null;
}

if (configValue == "0")
{
// "0" = full intraday replay (sentinel value)
return DateTime.MinValue;
}

// Try to parse as ISO datetime
if (DateTime.TryParse(configValue, out var parsed))
{
return parsed.ToUniversalTime();
}

Log.Error($"DataBentoProvider: Invalid databento-replay-start value '{configValue}'. Use '0' for full replay, ISO datetime, or omit for 15-min default.");
return null;
}

/// <summary>
/// Formats replay start for logging.
/// </summary>
private static string FormatReplayStart(DateTime? replayStart)
{
if (!replayStart.HasValue)
return "15-minute lookback (default)";
if (replayStart.Value == DateTime.MinValue)
return "FULL intraday replay (start=0)";
return $"from {replayStart.Value:yyyy-MM-ddTHH:mm:ss} UTC";
}

/// <summary>
/// Gets the effective replay start time for subscription.
/// If _replayStart is null (default), calculates 15 minutes ago.
/// </summary>
private DateTime? GetEffectiveReplayStart()
{
if (_replayStart.HasValue)
return _replayStart.Value;
// Default: 15 minutes ago
return DateTime.UtcNow.AddMinutes(-15);
}

/// <summary>
/// Checks if this Data provider supports the specified symbol
/// </summary>
/// <param name="symbol">The symbol</param>
/// <returns>returns true if Data Provider supports the specified symbol; otherwise false</returns>
private bool CanSubscribe(Symbol symbol)
{
// Reject universe symbols but allow canonical (continuous) futures
return !symbol.Value.Contains("universe", StringComparison.InvariantCultureIgnoreCase) &&
!symbol.IsCanonical() &&
IsSecurityTypeSupported(symbol.SecurityType);
}

Expand Down Expand Up @@ -352,18 +431,26 @@ private void OnDataReceived(object? sender, BaseData data)
{
tick.Time = GetTickTime(tick.Symbol, tick.Time);
_dataAggregator.Update(tick);

Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " +
$"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}");
// Log.Trace removed - too spammy (millions of ticks)
}
else if (data is TradeBar tradeBar)
{
tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time);
tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime);
_dataAggregator.Update(tradeBar);

Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " +
$"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}");
// Classify as replay vs live data based on age
// Replay data (from start= parameter) is historical, live data is recent
var now = DateTime.UtcNow;
var dataAgeSeconds = (now - tradeBar.EndTime.ConvertToUtc(TimeZones.Chicago)).TotalSeconds;
var dataType = dataAgeSeconds > 60 ? "REPLAY" : "LIVE";

// Log first 10 bars and every 50th bar thereafter
_tradeBarCount++;
if (_tradeBarCount <= 10 || _tradeBarCount % 50 == 0)
{
Log.Trace($"DataBentoProvider.OnDataReceived(): [{dataType}] TradeBar #{_tradeBarCount} for {tradeBar.Symbol} at {tradeBar.Time} (age: {dataAgeSeconds:F0}s)");
}
_dataAggregator.Update(tradeBar);
}
else
{
Expand All @@ -377,6 +464,8 @@ private void OnDataReceived(object? sender, BaseData data)
}
}

private int _tradeBarCount = 0;

/// <summary>
/// Handles connection status changes from the live client
/// </summary>
Expand All @@ -392,7 +481,7 @@ private void OnConnectionStatusChanged(object? sender, bool isConnected)
_sessionStarted = false;
}

// Resubscribe to all active subscriptions
// Resubscribe to all active subscriptions
foreach (var config in _activeSubscriptionConfigs)
{
_client.Subscribe(config.Symbol, config.Resolution, config.TickType);
Expand All @@ -412,5 +501,80 @@ private void OnConnectionStatusChanged(object? sender, bool isConnected)
}
}
}

/// <summary>
/// Handles symbol mapping events from Databento.
/// When Databento resolves a continuous symbol (MYM.FUT) to a specific contract (MYMH6),
/// we store the mapping so LEAN's universe selection can find the contract.
/// </summary>
private void OnSymbolMappingReceived(object? sender, SymbolMappingEventArgs e)
{
if (e.ContractSymbol == null)
{
Log.Trace($"DataBentoProvider.OnSymbolMappingReceived(): No contract symbol for {e.DatabentoSymbol} (canonical: {e.CanonicalSymbol})");
return;
}

// Add the resolved contract to our tracking dictionary
var contracts = _resolvedContracts.GetOrAdd(e.CanonicalSymbol, _ => new HashSet<Symbol>());
lock (contracts)
{
if (contracts.Add(e.ContractSymbol))
{
Log.Trace($"DataBentoProvider.OnSymbolMappingReceived(): Resolved {e.CanonicalSymbol} -> {e.ContractSymbol} (from {e.DatabentoSymbol})");
}
}
}

#region IDataQueueUniverseProvider Implementation

/// <summary>
/// Returns the symbols available for the specified canonical symbol.
/// For futures, returns the resolved contracts (e.g., MYMH6 for canonical /MYM).
/// </summary>
/// <param name="symbol">The canonical symbol to lookup</param>
/// <param name="includeExpired">Whether to include expired contracts</param>
/// <param name="securityCurrency">Expected security currency (not used)</param>
/// <returns>Enumerable of resolved contract Symbols</returns>
public IEnumerable<Symbol> LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
{
Log.Trace($"DataBentoProvider.LookupSymbols(): Looking up symbols for {symbol}, includeExpired={includeExpired}");

if (_resolvedContracts.TryGetValue(symbol, out var contracts))
{
lock (contracts)
{
var contractList = contracts.ToList();
Log.Trace($"DataBentoProvider.LookupSymbols(): Found {contractList.Count} contracts for {symbol}");

// If not including expired, filter by expiry date
if (!includeExpired)
{
var now = DateTime.UtcNow.Date;
contractList = contractList.Where(c => c.ID.Date >= now).ToList();
Log.Trace($"DataBentoProvider.LookupSymbols(): After expiry filter: {contractList.Count} contracts");
}

return contractList;
}
}

Log.Trace($"DataBentoProvider.LookupSymbols(): No contracts found for {symbol}");
return Enumerable.Empty<Symbol>();
}

/// <summary>
/// Returns whether selection can take place.
/// Selection is allowed when we're connected and have resolved at least one contract.
/// </summary>
/// <returns>True if universe selection can proceed</returns>
public bool CanPerformSelection()
{
var canPerform = IsConnected && _resolvedContracts.Any();
Log.Trace($"DataBentoProvider.CanPerformSelection(): {canPerform} (IsConnected={IsConnected}, ResolvedContracts={_resolvedContracts.Count})");
return canPerform;
}

#endregion
}
}
5 changes: 4 additions & 1 deletion QuantConnect.DataBento/DataBentoHistoryProivder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ public override void Initialize(HistoryProviderInitializeParameters parameters)
/// <returns>An enumerable of BaseData points</returns>
public IEnumerable<BaseData>? GetHistory(HistoryRequest request)
{
// DIAGNOSTIC: Log that we're being called
Log.Trace($"DataBentoHistoryProvider.GetHistory(): Symbol={request.Symbol}, Type={request.DataType}, TickType={request.TickType}, Resolution={request.Resolution}, IsCanonical={request.Symbol.IsCanonical()}, Start={request.StartTimeUtc}, End={request.EndTimeUtc}");

if (request.Symbol.IsCanonical() ||
!IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution))
{
// It is Logged in IsSupported(...)
Log.Trace($"DataBentoHistoryProvider.GetHistory(): Rejecting request - IsCanonical={request.Symbol.IsCanonical()}, IsSupported={(request.Symbol.IsCanonical() ? "N/A" : IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution).ToString())}");
return null;
}

Expand Down
Loading