Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import Marconi.Cardano.Core.Types (
)
import Marconi.Cardano.Indexers.SyncHelper qualified as Core
import Marconi.Core qualified as Core
import Marconi.Core.Indexer.SQLiteIndexer (SQLiteDBLocation, inMemoryDB)
import Marconi.Core.Indexer.SQLiteIndexer (SQLiteDBLocation, defaultInsertPlan, inMemoryDB)
import Marconi.Core.JsonRpc qualified as Core
import Network.JsonRpc.Types (JsonRpc, RawJsonRpc)

Expand Down Expand Up @@ -332,22 +332,26 @@ mkBlockInfoSqliteIndexer dbPath = do
-- write the events in the SQLite database
[
[ Core.SQLInsertPlan
-- Translate the event into a list of rows. In this specific indexer, each event is a
-- single row.
List.singleton
-- The query that is called for each row that is the output of the previous parameter.
blockInfoInsertQuery
( defaultInsertPlan
-- Translate the event into a list of rows. In this specific indexer, each event is a
-- single row.
List.singleton
-- The query that is called for each row that is the output of the previous parameter.
blockInfoInsertQuery
)
]
]
-- Requests launched when a rollback occurs
[ Core.SQLRollbackPlan
-- Name of the SQLite table
"block_info_table"
-- Field name of the SQLite table which we will use for handling
-- rollbacks (which deletes any database after that point)
"slotNo"
-- Translate the point of the 'Core.Timed point event' to a 'SlotNo'
C.chainPointToSlotNo
( Core.defaultRollbackPlan
-- Name of the SQLite table
"block_info_table"
-- Field name of the SQLite table which we will use for handling
-- rollbacks (which deletes any database after that point)
"slotNo"
-- Translate the point of the 'Core.Timed point event' to a 'SlotNo'
C.chainPointToSlotNo
)
]
where
dbCreation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ViewPatterns #-}

Expand Down Expand Up @@ -111,7 +112,7 @@ and expose a single coordinator to operate them
-}
buildIndexers
:: SecurityParam
-> Core.CatchupConfig
-> (forall indexer event. Core.CatchupConfig indexer event) -- (Core.WithTransform Core.SQLiteIndexer (NonEmpty Datum.DatumInfo)) (WithDistance (Maybe (NonEmpty Datum.DatumInfo)))
-> Utxo.UtxoIndexerConfig
-> MintTokenEvent.MintTokenEventConfig
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig EpochEvent (WithDistance BlockEvent)
Expand Down Expand Up @@ -249,6 +250,8 @@ epochNonceBuilder
:: (MonadIO n, MonadError Core.IndexerError n, MonadIO m)
=> SecurityParam
-> Core.CatchupConfig
(Core.WithTransform indexer Nonce.EpochNonce)
(WithDistance (Maybe Nonce.EpochNonce))
-> BM.Trace m Text
-> FilePath
-> n
Expand Down Expand Up @@ -278,6 +281,8 @@ epochSDDBuilder
:: (MonadIO n, MonadError Core.IndexerError n, MonadIO m)
=> SecurityParam
-> Core.CatchupConfig
(Core.WithTransform indexer (NonEmpty SDD.EpochSDD))
(WithDistance (Maybe (NonEmpty SDD.EpochSDD)))
-> BM.Trace m Text
-> FilePath
-> n
Expand Down Expand Up @@ -307,6 +312,8 @@ snapshotBlockEventBuilder
:: (MonadIO n, MonadError Core.IndexerError n, MonadIO m)
=> SecurityParam
-> Core.CatchupConfig
(Core.WithTransform indexer SnapshotBlockEvent)
(WithDistance (Maybe SnapshotBlockEvent))
-> BM.Trace m Text
-> FilePath
-> BlockRange
Expand Down Expand Up @@ -346,7 +353,7 @@ extractSnapshotBlockEvent =
-- | Builds the coordinators for each sub-chain serializers.
buildIndexersForSnapshot
:: SecurityParam
-> Core.CatchupConfig
-> (forall indexer event. Core.CatchupConfig indexer event)
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig
(ExtLedgerStateEvent, WithDistance BlockEvent)
(WithDistance BlockEvent)
Expand Down Expand Up @@ -419,6 +426,8 @@ snapshotExtLedgerStateEventBuilder
:: (MonadIO n, MonadError Core.IndexerError n, MonadIO m)
=> SecurityParam
-> Core.CatchupConfig
(Core.WithTransform indexer ExtLedgerStateEvent)
(WithDistance (Maybe ExtLedgerStateEvent))
-> BM.Trace m Text
-> FilePath
-> BlockRange
Expand Down Expand Up @@ -449,9 +458,9 @@ snapshotExtLedgerStateEventBuilder securityParam catchupConfig textLogger path b
path

snapshotExtLedgerStateEventWorker
:: forall input m n
:: forall indexer input m n
. (MonadIO m, MonadError Core.IndexerError m, MonadIO n)
=> StandardWorkerConfig n input ExtLedgerStateEvent
=> StandardWorkerConfig n indexer input ExtLedgerStateEvent
-> SnapshotWorkerConfig input
-> FilePath
-> m
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}

{- | Generators and helpers for testing @Marconi.Cardano.Indexers@, namely the
Expand Down Expand Up @@ -142,7 +143,7 @@ we cannot generate explicitly.
-}
buildIndexers
:: SecurityParam
-> Core.CatchupConfig
-> (forall indexer event. Core.CatchupConfig indexer event)
-> Utxo.UtxoIndexerConfig
-> MintTokenEvent.MintTokenEventConfig
-> ExtLedgerStateCoordinator.ExtLedgerStateWorkerConfig EpochEvent (WithDistance BlockEvent)
Expand Down
16 changes: 10 additions & 6 deletions marconi-cardano-core/src/Marconi/Cardano/Core/Indexer/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import Marconi.Cardano.Core.Extract.WithDistance (WithDistance)
import Marconi.Cardano.Core.Extract.WithDistance qualified as Distance
import Marconi.Cardano.Core.Orphans ()
import Marconi.Cardano.Core.Types (SecurityParam)
import Marconi.Core (WithTransform)
import Marconi.Core qualified as Core

-- | An alias for an indexer with catchup and transformation to perform filtering
Expand All @@ -38,10 +39,13 @@ type StandardIndexer m indexer event =
-- | An alias for an SQLiteWorker with catchup and transformation to perform filtering
type StandardSQLiteIndexer m event = StandardIndexer m Core.SQLiteIndexer event

data StandardWorkerConfig m input event = StandardWorkerConfig
type StandardCatchupConfig indexer event =
Core.CatchupConfig (WithTransform indexer event) (WithDistance (Maybe event))

data StandardWorkerConfig m indexer input event = StandardWorkerConfig
{ workerName :: Text
, securityParamConfig :: !SecurityParam
, catchupConfig :: Core.CatchupConfig
, catchupConfig :: StandardCatchupConfig indexer event
, eventExtractor :: input -> m (Maybe event)
, logger :: Trace m (Core.IndexerEvent C.ChainPoint)
}
Expand All @@ -57,7 +61,7 @@ mkStandardIndexer
:: ( MonadIO m
, Core.Point event ~ C.ChainPoint
)
=> StandardWorkerConfig m a b
=> StandardWorkerConfig m indexer a event
-> indexer event
-> StandardIndexer m indexer event
mkStandardIndexer config indexer =
Expand All @@ -75,7 +79,7 @@ mkStandardWorker
, Core.IsSync n event indexer
, Core.Point event ~ C.ChainPoint
)
=> StandardWorkerConfig m input event
=> StandardWorkerConfig m indexer input event
-> indexer event
-> n (StandardWorker m input event indexer)
mkStandardWorker config = mkStandardWorkerWithFilter config Just
Expand All @@ -85,7 +89,7 @@ mkStandardIndexerWithFilter
:: ( MonadIO m
, Core.Point event ~ C.ChainPoint
)
=> StandardWorkerConfig m a b
=> StandardWorkerConfig m indexer a event
-> (event -> Maybe event)
-> indexer event
-> StandardIndexer m indexer event
Expand All @@ -105,7 +109,7 @@ mkStandardWorkerWithFilter
, Ord (Core.Point event)
, Core.Point event ~ C.ChainPoint
)
=> StandardWorkerConfig m input event
=> StandardWorkerConfig m indexer input event
-> (event -> Maybe event)
-> indexer event
-> n (StandardWorker m input event indexer)
Expand Down
1 change: 1 addition & 0 deletions marconi-cardano-indexers/marconi-cardano-indexers.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ library
Marconi.Cardano.Indexers.SyncHelper
Marconi.Cardano.Indexers.Utxo
Marconi.Cardano.Indexers.UtxoQuery
Marconi.Cardano.Indexers.UtxoWithSpent

--------------------
-- Local components
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module Marconi.Cardano.Indexers.BlockInfo (
import Cardano.Api qualified as C
import Cardano.BM.Data.Trace (Trace)
import Cardano.BM.Tracing qualified as BM
import Control.Lens ((&), (?~), (^.))
import Control.Lens ((&), (.~), (^.))
import Control.Lens qualified as Lens
import Control.Monad (when)
import Control.Monad.Except (MonadError (throwError))
Expand Down Expand Up @@ -133,25 +133,26 @@ mkBlockInfoIndexer path = do
id
createBlockInfoTable
blockInfoInsertQuery
(Core.SQLRollbackPlan "blockInfo" "slotNo" C.chainPointToSlotNo)
(Core.SQLRollbackPlan (Core.defaultRollbackPlan "blockInfo" "slotNo" C.chainPointToSlotNo))

catchupConfigEventHook :: Text -> Trace IO Text -> FilePath -> Core.CatchupEvent -> IO ()
catchupConfigEventHook indexerName stdoutTrace dbPath Core.Synced = do
catchupConfigEventHook :: Text -> Trace IO Text -> FilePath -> indexer -> IO indexer
catchupConfigEventHook indexerName stdoutTrace dbPath indexer = do
SQL.withConnection dbPath $ \c -> do
let slotNoIndexName = "blockInfo__slotNo"
createSlotNoIndexStatement =
"CREATE INDEX IF NOT EXISTS "
<> fromString slotNoIndexName
<> " ON blockInfo (slotNo)"
Core.createIndexTable indexerName stdoutTrace c slotNoIndexName createSlotNoIndexStatement
pure indexer

-- | Create a worker for 'BlockInfoIndexer' with catchup
blockInfoWorker
:: ( MonadIO n
, MonadError Core.IndexerError n
, MonadIO m
)
=> StandardWorkerConfig m input BlockInfo
=> StandardWorkerConfig m Core.SQLiteIndexer input BlockInfo
-- ^ General indexer configuration
-> SQLiteDBLocation
-- ^ SQLite database location
Expand All @@ -166,7 +167,7 @@ creating 'StandardWorkerConfig', including a preprocessor.
blockInfoBuilder
:: (MonadIO n, MonadError Core.IndexerError n)
=> SecurityParam
-> Core.CatchupConfig
-> Core.CatchupConfig indexer event
-> BM.Trace IO Text
-> FilePath
-> n (StandardWorker IO BlockEvent BlockInfo Core.SQLiteIndexer)
Expand All @@ -177,7 +178,7 @@ blockInfoBuilder securityParam catchupConfig textLogger path =
catchupConfigWithTracer =
catchupConfig
& Core.configCatchupEventHook
?~ catchupConfigEventHook indexerName textLogger blockInfoDbPath
.~ catchupConfigEventHook indexerName textLogger blockInfoDbPath
blockInfoWorkerConfig =
StandardWorkerConfig
indexerName
Expand Down
10 changes: 7 additions & 3 deletions marconi-cardano-indexers/src/Marconi/Cardano/Indexers/Datum.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import Database.SQLite.Simple (NamedParam ((:=)))
import Database.SQLite.Simple qualified as SQL
import Database.SQLite.Simple.QQ (sql)
import GHC.Generics (Generic)
import Marconi.Cardano.Core.Extract.WithDistance (WithDistance)
import Marconi.Cardano.Core.Indexer.Worker (
StandardSQLiteIndexer,
StandardWorker,
Expand All @@ -66,6 +67,7 @@ import Marconi.Cardano.Core.Types (
import Marconi.Cardano.Indexers.SyncHelper qualified as Sync
import Marconi.Core (SQLiteDBLocation)
import Marconi.Core qualified as Core
import Marconi.Core.Indexer.SQLiteIndexer (defaultInsertPlan)
import System.FilePath ((</>))

data DatumInfo = DatumInfo
Expand Down Expand Up @@ -124,13 +126,13 @@ mkDatumIndexer path = do
Sync.mkSyncedSqliteIndexer
path
createDatumTables
[[Core.SQLInsertPlan (traverse NonEmpty.toList) datumInsertQuery]]
[Core.SQLRollbackPlan "datum" "slotNo" C.chainPointToSlotNo]
[[Core.SQLInsertPlan (defaultInsertPlan (traverse NonEmpty.toList) datumInsertQuery)]]
[Core.SQLRollbackPlan (Core.defaultRollbackPlan "datum" "slotNo" C.chainPointToSlotNo)]

-- | A worker with catchup for a 'DatumIndexer'
datumWorker
:: (MonadIO m, MonadIO n, MonadError Core.IndexerError n)
=> StandardWorkerConfig m input DatumEvent
=> StandardWorkerConfig m Core.SQLiteIndexer input DatumEvent
-- ^ General configuration of the indexer (mostly for logging purpose)
-> SQLiteDBLocation
-- ^ SQLite database location
Expand All @@ -146,6 +148,8 @@ datumBuilder
:: (MonadIO n, MonadError Core.IndexerError n, MonadIO m)
=> SecurityParam
-> Core.CatchupConfig
(Core.WithTransform Core.SQLiteIndexer (NonEmpty DatumInfo))
(WithDistance (Maybe (NonEmpty DatumInfo)))
-> Trace m Text
-> FilePath
-> n (StandardWorker m [AnyTxBody] DatumEvent Core.SQLiteIndexer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import Marconi.Cardano.Indexers.ExtLedgerStateCoordinator (
)
import Marconi.Cardano.Indexers.SyncHelper qualified as Sync
import Marconi.Core qualified as Core
import Marconi.Core.Indexer.SQLiteIndexer (SQLiteDBLocation)
import Marconi.Core.Indexer.SQLiteIndexer (SQLiteDBLocation, defaultInsertPlan)
import Ouroboros.Consensus.Cardano.Block qualified as O
import Ouroboros.Consensus.HeaderValidation qualified as O
import Ouroboros.Consensus.Ledger.Extended qualified as O
Expand Down Expand Up @@ -107,21 +107,21 @@ mkEpochNonceIndexer path = do
, slotNo
, blockHeaderHash
) VALUES (?, ?, ?, ?, ?)|]
insertEvent = [Core.SQLInsertPlan pure nonceInsertQuery]
insertEvent = [Core.SQLInsertPlan (defaultInsertPlan pure nonceInsertQuery)]
Sync.mkSyncedSqliteIndexer
path
[createNonce]
[insertEvent]
[Core.SQLRollbackPlan "epoch_nonce" "slotNo" C.chainPointToSlotNo]
[Core.SQLRollbackPlan (Core.defaultRollbackPlan "epoch_nonce" "slotNo" C.chainPointToSlotNo)]

newtype EpochNonceWorkerConfig input = EpochNonceWorkerConfig
{ epochNonceWorkerConfigExtractor :: input -> C.EpochNo
}

epochNonceWorker
:: forall input m n
:: forall indexer input m n
. (MonadIO m, MonadError Core.IndexerError m, MonadIO n)
=> StandardWorkerConfig n input EpochNonce
=> StandardWorkerConfig n indexer input EpochNonce
-> EpochNonceWorkerConfig input
-> SQLiteDBLocation
-> m (Core.WorkerIndexer n input EpochNonce (Core.WithTrace n Core.SQLiteIndexer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import Marconi.Cardano.Indexers.ExtLedgerStateCoordinator (
)
import Marconi.Cardano.Indexers.SyncHelper qualified as Sync
import Marconi.Core qualified as Core
import Marconi.Core.Indexer.SQLiteIndexer (SQLiteDBLocation)
import Marconi.Core.Indexer.SQLiteIndexer (SQLiteDBLocation, defaultInsertPlan)
import Ouroboros.Consensus.Cardano.Block qualified as O
import Ouroboros.Consensus.Ledger.Extended qualified as O
import Ouroboros.Consensus.Shelley.Ledger qualified as O
Expand Down Expand Up @@ -114,21 +114,21 @@ mkEpochSDDIndexer path = do
, slotNo
, blockHeaderHash
) VALUES (?, ?, ?, ?, ?, ?)|]
insertEvent = [Core.SQLInsertPlan (traverse NonEmpty.toList) sddInsertQuery]
insertEvent = [Core.SQLInsertPlan (defaultInsertPlan (traverse NonEmpty.toList) sddInsertQuery)]
Sync.mkSyncedSqliteIndexer
path
[createSDD]
[insertEvent]
[Core.SQLRollbackPlan "epoch_sdd" "slotNo" C.chainPointToSlotNo]
[Core.SQLRollbackPlan (Core.defaultRollbackPlan "epoch_sdd" "slotNo" C.chainPointToSlotNo)]

newtype EpochSDDWorkerConfig input = EpochSDDWorkerConfig
{ epochSDDWorkerConfigExtractor :: input -> C.EpochNo
}

epochSDDWorker
:: forall input m n
:: forall indexer input m n
. (MonadIO m, MonadError Core.IndexerError m, MonadIO n)
=> StandardWorkerConfig n input (NonEmpty EpochSDD)
=> StandardWorkerConfig n indexer input (NonEmpty EpochSDD)
-> EpochSDDWorkerConfig input
-> SQLiteDBLocation
-> m (Core.WorkerIndexer n input (NonEmpty EpochSDD) (Core.WithTrace n Core.SQLiteIndexer))
Expand Down
Loading