Skip to content

Conversation

@Joseph-Matteo-Scorsone
Copy link
Contributor

Changed to comments, code conventions, written out readme, and got rid of unnecessary file.

{
Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Processing symbol {symbol}");

if (_client?.IsConnected == true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would silently fail to subscribe if not connected; you should either throw or try to reconnect

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Private comment was to invert the If to reduce nesting.


// Connect to live gateway
Log.Trace("DataBentoProvider.Initialize(): Attempting async connection to DataBento live gateway");
Task.Run(async () =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why async? Wouldn't this continue without successful connection? Test running this without an internet connection

if (!_potentialUnsupportedResolutionMessageLogged)
{
_potentialUnsupportedResolutionMessageLogged = true;
Log.Trace("DataBentoDataProvider.IsSupported(): " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You deleted a bunch of informative logging here, why?
Statements like this "_potentialUnsupportedResolutionMessageLogged = true;" are intended to show an informative error message once. Why have the boolean if deleting the log?

Private comment was to delete the debugging logging. If there is informative logging to inform user about failures in subscription/application those will be needed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing an exception is better than silently continuing as it will look like a code bug if not.

…ing. Changed all async to sync, only using Task to connect and get messages sync.
@jaredbroad jaredbroad merged commit a550fb4 into QuantConnect:master Nov 11, 2025
1 check failed
Copy link

@Romazes Romazes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, your PR needs additional improvements.

  • Please, fix the project with Tests.
  • Add more [TestCases]

@@ -16,4 +15,4 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" />
<PackageReference Include="Microsoft.TestPlatform.ObjectModel" Version="16.9.4" />
<PackageReference Include="QuantConnect.Algorithm" Version="2.5.*" />
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can not run test project.
You need remove it because the package QuantConnect.Lean.Engine already has it.

Image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi could you please clarify this more? The way I wrote it is inside a Lean directory I cloned. I made this directory and all the databento code inside it in Lean. In the Algorithm.CSharp directory I wrote a test algorithm and set it in the config. From that I can easily run it in VScode with F5. I run the tests by dotnet test "path to the .csproj in the QuantConnect.DataBento.Test directory" That works for me, I just had to change some paths slightly in the config for the MarketHoursDatabase.FromDataFolder(); to work.

I went through the rest of your comments, thank you for them, and implemented the changes. I believe we're all good, I just haven't had time to run everything with live data due to the early close today. If you'd like we can talk more before I do another PR. Happy holidays!

Copy link

@Romazes Romazes Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have run GetsHistory() Tests and got exception:

System.MissingMethodException : Method not found: 'Void CsvHelper.CsvReader..ctor(System.IO.TextReader, System.Globalization.CultureInfo)'.

That's a mean version conflict between dependencies of the CsvHelper NuGet package.
On the screen above, I showed you that some dependencies (in the Test project) have the CsvHelper package.

image

using QuantConnect.Util;
using QuantConnect.Configuration;
using QuantConnect.Interfaces;
using QuantConnect.Securities;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove unused or unnecessary usings. ⬆

/// <inheritdoc cref="HttpClient"/>
private readonly HttpClient _httpClient;

private readonly string _apiKey;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove.
This field is used only in .ctor.


private readonly string _apiKey;

private const decimal PriceScaleFactor = 1e-9m;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an xml description of the const, such as a link to the documentation.

@@ -57,4 +55,4 @@
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", credentials);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears to be an extra constructor, which we can remove.

/// <summary>
/// Maps a LEAN symbol to DataBento symbol format
/// </summary>
private string MapSymbolToDataBento(Symbol symbol)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need create new class SymbolMapper : ISymbolMapper with methods:

  • GetBrokerageSymbol()
  • GetLeanSymbol()

example - PolygonSymbolMapper


/// Class for parsing trade data from Databento
/// Really used as a map from the http request to then get it in QC data structures
private class DatabentoBar
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a folder called Models and place all classes there to make the code clearer, more readable, and less noisy.

[Name("ts_event")]
public long TimestampNanos { get; set; }

public DateTime Timestamp => DateTimeOffset.FromUnixTimeSeconds(TimestampNanos / 1_000_000_000)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Lean has a lot of extension use this one:

Time.UnixNanosecondTimeStampToDateTime()

{
yield return new Tick
{
Time = record.Timestamp,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return parameter Time in the Symbol timeZone parameter instead of UTC.

example: polygon

.AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime;

[Name("open")]
public decimal Open { get; set; }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, please ensure that when converting raw parameters into a DTO, you use the correct precision internally in the DTO.

Copy link

@Romazes Romazes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I’ve included several remarks.
Please add test cases:

  1. Connect
  2. Disconnect
  3. Subscribe with different resolution, tick type, symbols.
  4. Unsubscriberibe

using QuantConnect.Data.Market;
using QuantConnect.Util;
using QuantConnect.Interfaces;
using System.Collections.Generic;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove not used using ...s.

namespace QuantConnect.Lean.DataSource.DataBento
{
/// <summary>
/// Implementation of Custom Data Provider
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, write a great and associated description. 😎

/// <summary>
/// <inheritdoc cref="IDataAggregator"/>
/// </summary>
private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName<IDataAggregator>(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should init it in Initialize() by pattern:

_dataAggregator = Composer.Instance.GetPart<IDataAggregator>();
if (_dataAggregator == null)
{
  _dataAggregator = Composer.Instance.GetExportedValueByTypeName<IDataAggregator>(Config.Get("data-aggregator", "QuantConnect.Lean.Engine.DataFeeds.AggregationManager"), forceTypeNameOnExisting: false);
}

/// <summary>
/// <inheritdoc cref="EventBasedDataQueueHandlerSubscriptionManager"/>
/// </summary>
private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a weird code. (looks chat GPT style)
Better, you can create:

  1. instance in global scope _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager();
  2. Declare the variable in the global scope and initialize it in the Initialize() method.
  3. Please, avoid null! everywhere.

/// </summary>
private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!;

private readonly List<SubscriptionDataConfig> _activeSubscriptionConfigs = new();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the collection completely.
The Lean service EventBasedDataQueueHandlerSubscriptionManager already has helper method to get symbols.

_subscriptionManager.GetSubscribedSymbols()

spoiler: The subscription to live updates is incorrect. We should use another scheme for live updates.
https://databento.com/docs/schemas-and-data-formats/whats-a-schema#supported-schemas-and-their-fields?historical=python&live=python&reference=python

_dataAggregator.Update(tradeBar);

Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " +
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to remove it; it's really spamming the log and making the app slower.

{
tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time);
tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime);
_dataAggregator.Update(tradeBar);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use lock(...) { ... } when calling the Update() of dataAggregator.

example

{
if (data is Tick tick)
{
tick.Time = GetTickTime(tick.Symbol, tick.Time);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The greatest way to convert time internally when creating an instance of the Tick.

Task.Run(() =>

// Resubscribe to all active subscriptions
foreach (var config in _activeSubscriptionConfigs)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change subscription logic by using _subscriptionManager.GetSubscribedSymbols()

/// <summary>
/// Handles connection status changes from the live client
/// </summary>
private void OnConnectionStatusChanged(object? sender, bool isConnected)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method used incorrect logic and never called resubscription logic as expected.

  • How did you test it?

Copy link

@Romazes Romazes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool staff. Please refactor it with best practices.

/// </summary>
public partial class DataBentoHistoryProvider : SynchronizingHistoryProvider
{
private int _dataPointCount;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add xml description.

using QuantConnect.Lean.DataSource.DataBento;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Configuration;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove any unused usings.

Image

/// Initializes this history provider to work for the specified job
/// </summary>
/// <param name="parameters">The initialization parameters</param>
public override void Initialize(HistoryProviderInitializeParameters parameters)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, it is obsolete code.
Please, keep emtpy method - example
We have already initialized in DataBentoProvider.Initialize()

/// <summary>
/// DataBento implementation of <see cref="IHistoryProvider"/>
/// </summary>
public partial class DataBentoHistoryProvider : SynchronizingHistoryProvider
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not partial, but it needs to be partial.

  1. Rename to DataBentoProvider
  2. make partial main class DataBentoProvider

public partial class DataBentoHistoryProvider : SynchronizingHistoryProvider
{
private int _dataPointCount;
private DataBentoDataDownloader _dataDownloader;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it to reuse from the main class since it's partial.

return;
}

var messageCount = 0;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use it?
It is so spammy.

  • Recommendation: Remove it.

if (line == null)
{
Log.Trace("DatabentoRawClient.ProcessMessagesAsync(): Connection closed by server");
Log.Trace("DatabentoRawClient.ProcessMessages(): Connection closed by server");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a closed connection. The line is null for some reason.

break;
}

if (string.IsNullOrWhiteSpace(line))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have validated the null above.

  • Why do we need continue?
  • Use curly brackets.

continue;

messageCount++;
if (messageCount <= 50 || messageCount % 100 == 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove it.

{
var rtype = rtypeElement.GetInt32();

if (rtype == 23)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use switch/cases instead of if

@Joseph-Matteo-Scorsone
Copy link
Contributor Author

In the new code I committed earlier today I kept the use of Task.Run for a message loop in the live client. I kept it because the message loop is blocking and with this block the message sequence to DataBento could become out of order and we'd never escape that blocking loop. With Task.Run the message loop can start and the Connect function can finish and return.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants