Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions QuantConnect.DataBento/DataBentoDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
using QuantConnect.Packets;
using QuantConnect.Securities;
using System.Collections.Concurrent;
using System.ComponentModel.Composition;

namespace QuantConnect.Lean.DataSource.DataBento
{
/// <summary>
/// Implementation of Custom Data Provider
/// </summary>
[Export(typeof(IDataQueueHandler))]
public class DataBentoProvider : IDataQueueHandler
{
private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName<IDataAggregator>(
Expand All @@ -49,6 +51,15 @@ public class DataBentoProvider : IDataQueueHandler
private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
private readonly ConcurrentDictionary<Symbol, DateTimeZone> _symbolExchangeTimeZones = new();

/// <summary>
/// Replay start time for intraday historical data.
/// - DateTime.MinValue = full replay (start=0, up to 24 hours)
/// - Specific DateTime = replay from that time
/// - null = live only (no replay) - but we default to 15 min ago if not configured
/// Configure via "databento-replay-start" in config.json: "0" for full, ISO datetime for specific, absent for 15-min default
/// </summary>
private readonly DateTime? _replayStart;

/// <summary>
/// Returns true if we're currently connected to the Data Provider
/// </summary>
Expand All @@ -65,6 +76,11 @@ public DataBentoProvider()
throw new ArgumentException("DataBento API key is required. Set 'databento-api-key' in configuration.");
}

// Parse replay start config: "0" for full replay, ISO datetime for specific, absent for 15-min default
var replayStartConfig = Config.Get("databento-replay-start", "");
_replayStart = ParseReplayStart(replayStartConfig);
Log.Trace($"DataBentoProvider: Replay start configured as: {FormatReplayStart(_replayStart)}");

_dataDownloader = new DataBentoDataDownloader(_apiKey);
Initialize();
}
Expand Down Expand Up @@ -104,8 +120,10 @@ private void Initialize()
return false;
}

var resolution = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution;
if (!_client.Subscribe(config.Symbol, resolution, config.TickType))
// Use intraday replay to get historical data at startup
var replayStart = GetEffectiveReplayStart();
Log.Trace($"DataBentoProvider.SubscribeImpl(): Using intraday replay: {FormatReplayStart(replayStart)}");
if (!_client.Subscribe(config.Symbol, config.Resolution, config.TickType, replayStart))
{
Log.Error($"Failed to subscribe to {config.Symbol}");
return false;
Expand Down Expand Up @@ -258,15 +276,68 @@ public void Dispose()
}
}

/// <summary>
/// Parses the databento-replay-start config value.
/// </summary>
/// <param name="configValue">Config value: "0" for full, ISO datetime for specific, empty for default</param>
/// <returns>DateTime.MinValue for full replay, specific DateTime, or null for default (15-min lookback)</returns>
private static DateTime? ParseReplayStart(string configValue)
{
if (string.IsNullOrWhiteSpace(configValue))
{
// Empty/absent = use default (will be calculated as 15 min ago at subscribe time)
return null;
}

if (configValue == "0")
{
// "0" = full intraday replay (sentinel value)
return DateTime.MinValue;
}

// Try to parse as ISO datetime
if (DateTime.TryParse(configValue, out var parsed))
{
return parsed.ToUniversalTime();
}

Log.Error($"DataBentoProvider: Invalid databento-replay-start value '{configValue}'. Use '0' for full replay, ISO datetime, or omit for 15-min default.");
return null;
}

/// <summary>
/// Formats replay start for logging.
/// </summary>
private static string FormatReplayStart(DateTime? replayStart)
{
if (!replayStart.HasValue)
return "15-minute lookback (default)";
if (replayStart.Value == DateTime.MinValue)
return "FULL intraday replay (start=0)";
return $"from {replayStart.Value:yyyy-MM-ddTHH:mm:ss} UTC";
}

/// <summary>
/// Gets the effective replay start time for subscription.
/// If _replayStart is null (default), calculates 15 minutes ago.
/// </summary>
private DateTime? GetEffectiveReplayStart()
{
if (_replayStart.HasValue)
return _replayStart.Value;
// Default: 15 minutes ago
return DateTime.UtcNow.AddMinutes(-15);
}

/// <summary>
/// Checks if this Data provider supports the specified symbol
/// </summary>
/// <param name="symbol">The symbol</param>
/// <returns>returns true if Data Provider supports the specified symbol; otherwise false</returns>
private bool CanSubscribe(Symbol symbol)
{
// Reject universe symbols but allow canonical (continuous) futures
return !symbol.Value.Contains("universe", StringComparison.InvariantCultureIgnoreCase) &&
!symbol.IsCanonical() &&
IsSecurityTypeSupported(symbol.SecurityType);
}

Expand Down Expand Up @@ -352,18 +423,26 @@ private void OnDataReceived(object? sender, BaseData data)
{
tick.Time = GetTickTime(tick.Symbol, tick.Time);
_dataAggregator.Update(tick);

Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " +
$"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}");
// Log.Trace removed - too spammy (millions of ticks)
}
else if (data is TradeBar tradeBar)
{
tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time);
tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime);
_dataAggregator.Update(tradeBar);

Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " +
$"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}");
// Classify as replay vs live data based on age
// Replay data (from start= parameter) is historical, live data is recent
var now = DateTime.UtcNow;
var dataAgeSeconds = (now - tradeBar.EndTime.ConvertToUtc(TimeZones.Chicago)).TotalSeconds;
var dataType = dataAgeSeconds > 60 ? "REPLAY" : "LIVE";

// Log first 10 bars and every 50th bar thereafter
_tradeBarCount++;
if (_tradeBarCount <= 10 || _tradeBarCount % 50 == 0)
{
Log.Trace($"DataBentoProvider.OnDataReceived(): [{dataType}] TradeBar #{_tradeBarCount} for {tradeBar.Symbol} at {tradeBar.Time} (age: {dataAgeSeconds:F0}s)");
}
_dataAggregator.Update(tradeBar);
}
else
{
Expand All @@ -377,6 +456,8 @@ private void OnDataReceived(object? sender, BaseData data)
}
}

private int _tradeBarCount = 0;

/// <summary>
/// Handles connection status changes from the live client
/// </summary>
Expand Down
5 changes: 4 additions & 1 deletion QuantConnect.DataBento/DataBentoHistoryProivder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ public override void Initialize(HistoryProviderInitializeParameters parameters)
/// <returns>An enumerable of BaseData points</returns>
public IEnumerable<BaseData>? GetHistory(HistoryRequest request)
{
// DIAGNOSTIC: Log that we're being called
Log.Trace($"DataBentoHistoryProvider.GetHistory(): Symbol={request.Symbol}, Type={request.DataType}, TickType={request.TickType}, Resolution={request.Resolution}, IsCanonical={request.Symbol.IsCanonical()}, Start={request.StartTimeUtc}, End={request.EndTimeUtc}");

if (request.Symbol.IsCanonical() ||
!IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution))
{
// It is Logged in IsSupported(...)
Log.Trace($"DataBentoHistoryProvider.GetHistory(): Rejecting request - IsCanonical={request.Symbol.IsCanonical()}, IsSupported={(request.Symbol.IsCanonical() ? "N/A" : IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution).ToString())}");
return null;
}

Expand Down
Loading