diff --git a/README.md b/README.md index 2cfce9d2..b567918a 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ The notifier service is a component that receives block events synchronously from multiversx observer nodes, and it forwards them to a subscribing component -(via message queuing service or websockets) +(via message queuing service or websockets). ## Prerequisites @@ -11,15 +11,12 @@ has to setup one or multiple observers. For running an observing squad, these [docs](https://docs.multiversx.com/integrators/observing-squad/) cover the whole process. -The observer node can be connected using WebSocket integration. Please check observer -node config for setting up the connector. Enable `HostDriversConfig` in order to use -WebSocket integration. +There are two ways the observer nodes can push data to the notifier service: -The required configs for launching an observer/s with a driver attached, -can be found [here](https://github.com/multiversx/mx-chain-go/blob/master/cmd/node/config/external.toml). +1. **WebSocket Integration (Recommended)**: The observer node pushes events to the notifier over a persistent WebSocket connection. This is the newer, more efficient method. To use this, check the observer node config and enable `HostDriversConfig`. + The required configs for launching an observer/s with a WS driver attached can be found [here](https://github.com/multiversx/mx-chain-go/blob/master/cmd/node/config/external.toml). -The HTTP integration is still available for backwards compatibility, but it will be -deprecated in the future. +2. **HTTP POST Integration (Deprecated)**: The observer node makes separate HTTP POST requests to the notifier's API endpoints (`/events/push`, etc.) for every event. This integration is still available for backwards compatibility, but it will be deprecated in the future. ## How to run @@ -64,6 +61,8 @@ The supported config variables are: as the one specified in the `ProxyUrl` described above. - `Username`: the username used to authorize an observer. Can be left empty for `UseAuthorization = false` on observer connector. - `Password`: the password used to authorize an observer. Can be left empty for `UseAuthorization = false` on observer connector. +- `CheckDuplicates`: signals if the events received from observers have already been pushed to clients. Requires a Redis instance/cluster. +- `WithReadStateChanges`: signals if read state changes operations will be handled by the notifier. Note: this requires read state changes to also be enabled on the observer nodes. If observer connector is set to use BasicAuth with `UseAuthorization = true`, `Username` and `Password` has to be set here on events notifier, and `Auth` flag has to be enabled in @@ -131,16 +130,22 @@ make docker-new publisher_type=rabbitmq ### API Endpoints -Notifier service will expose several events routes, the observer nodes will -push events to these routes: +Depending on the observer integration method chosen, the notifier service will expose the following routes. + +**For WebSocket Integration (Observer -> Notifier):** +When using the WebSocket integration, the observer pushes all events through a single persistent WebSocket connection rather than relying on the HTTP routes below. + +**For HTTP POST Integration (Observer -> Notifier):** +The observer nodes will push events to these routes via consecutive HTTP requests: - `/events/push` (POST) -> it will handle all events for each round - `/events/revert` (POST) -> if there is a reverted block, the event will be pushed on this route - `/events/finalized` (POST) -> when the block has been finalized, the events will be pushed on this route -If the service will be in "notifier" mode, it will expose a additional route: -- `/hub/ws` (GET) - this route can be used to manage the websocket connection (check [websocket subscribing](#websockets) section for more details on this) +**For Client Subscriptions (Notifier -> Consumer):** +If the service will be in "notifier" mode (using the `ws` publisher), it will expose an additional route for end consumers to receive data: +- `/hub/ws` (GET) - this route can be used to manage the WebSocket connection (check [websocket subscribing](#websockets) section for more details on this) ## Redis @@ -166,10 +171,110 @@ in code in `data/outport.go` file. Once the proxy is launched together with the observer/s, the driver's methods will be called. +### Event types + +There are multiple event types available, they can be found as constants in common package, +[constants](https://github.com/multiversx/mx-chain-notifier-go/blob/main/common/constants.go). +Below is the list of available event types together with their associated JSON payload structures. + +- `all_events`: Pushes all logs and events for a block. +```json +{ + "hash": "blockHash1", + "events": [ + { + "address": "addr1", + "identifier": "identifier", + "topics": ["<< base64 encoded topic1 >>", "<< base64 encoded topic2 >>"], + "data": "<< base64 encoded data >> ", + ... + } + ] +} +``` + +- `block_events`: Pushes block info alongside its logs and events. +```json +{ + "hash": "blockHash1", + "shardId": 1, + "timestamp": 12345678, + "timestampMs": 12345678000, + "events": [ + { + "address": "addr1", + "identifier": "identifier", + ... + } + ] +} +``` + +- `revert_events`: Pushes information relating to a reverted block. +```json +{ + "hash": "blockHash1", + "nonce": 11, + "round": 2, + "epoch": 1, + "shardId": 1, + "timestamp": 12345678, + "timestampMs": 12345678000 +} +``` + +- `finalized_events`: Pushes the hash of a finalized block. +```json +{ + "hash": "blockHash" +} +``` + +- `block_txs`: Pushes all transactions contained within a block. +```json +{ + "hash": "blockHash1", + "txs": { + "txHash1": { + "Nonce": 123, + "Round": 1, + "Epoch": 1, + ... + } + } +} +``` + +- `block_scrs`: Pushes all smart contract results contained within a block. +```json +{ + "hash": "blockHash1", + "scrs": { + "scrHash1": { + "Nonce": 123, + ... + } + } +} +``` + +- `block_state_accesses`: Pushes information regarding the state accesses (reads/writes) occurring within a block. *(Requires `WithReadStateChanges` config enabled)* +```json +{ + "hash": "blockHash1", + "shardID": 1, + "timestampMs": 12345678000, + "nonce": 123, + "stateAccessesPerAccounts": { + ... + } +} +``` + ### RabbitMQ When using a setup with `RabbitMQ` you have to subscribe to each exchange -separately. +separately. This can be handled via RabbitMQ Management UI platform. ### WebSockets @@ -295,63 +400,3 @@ inner marshalled data like: } ``` -There are multiple event types available, they can be found as constants in common package, -[constants](https://github.com/multiversx/mx-chain-notifier-go/blob/main/common/constants.go). Below there is the event type together with the associated marshalled data type. -- `all_events` -```json -{ - "hash": "blockHash1", - "events": [ - { - "address": "addr1", - "identifier": "identifier", - ... - } - ] -} -``` - -- `revert_events` -```json -{ - "hash": "blockHash1", - "nonce": 11, - "round": 2, - "epoch": 1, -} -``` - -- `finalized_events` -```json -{ - "hash": "blockHash" -} -``` - -- `block_txs`: -```json -{ - "hash": "blockHash1", - "txs": { - "txHash1": { - "Nonce": 123, - "Round": 1, - "Epoch": 1, - ... - } - } -} -``` - -- `block_scrs` -```json -{ - "hash": "blockHash1", - "scrs": { - "scrHash1": { - "Nonce": 123, - ... - } - } -} -``` diff --git a/api/groups/eventsGroup.go b/api/groups/eventsGroup.go index a394c7ed..0fd83f94 100644 --- a/api/groups/eventsGroup.go +++ b/api/groups/eventsGroup.go @@ -98,6 +98,7 @@ func getPayloadVersion(c *gin.Context) uint32 { func (h *eventsGroup) pushEvents(c *gin.Context) { pushEventsRawData, err := c.GetRawData() if err != nil { + log.Error("pushEvents: failed to get raw data", "error", err) shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error()) return } @@ -106,6 +107,7 @@ func (h *eventsGroup) pushEvents(c *gin.Context) { err = h.payloadHandler.ProcessPayload(pushEventsRawData, outport.TopicSaveBlock, payloadVersion) if err != nil { + log.Error("pushEvents: failed to process payload", "error", err) shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error()) return } @@ -116,6 +118,7 @@ func (h *eventsGroup) pushEvents(c *gin.Context) { func (h *eventsGroup) revertEvents(c *gin.Context) { revertEventsRawData, err := c.GetRawData() if err != nil { + log.Error("revertEvents: failed to get raw data", "error", err) shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error()) return } @@ -124,6 +127,7 @@ func (h *eventsGroup) revertEvents(c *gin.Context) { err = h.payloadHandler.ProcessPayload(revertEventsRawData, outport.TopicRevertIndexedBlock, payloadVersion) if err != nil { + log.Error("revertEvents: failed to process payload", "error", err) shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error()) return } @@ -134,6 +138,7 @@ func (h *eventsGroup) revertEvents(c *gin.Context) { func (h *eventsGroup) finalizedEvents(c *gin.Context) { finalizedRawData, err := c.GetRawData() if err != nil { + log.Error("finalizedEvents: failed to get raw data", "error", err) shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error()) return } @@ -142,6 +147,7 @@ func (h *eventsGroup) finalizedEvents(c *gin.Context) { err = h.payloadHandler.ProcessPayload(finalizedRawData, outport.TopicFinalizedBlock, payloadVersion) if err != nil { + log.Error("finalizedEvents: failed to process payload", "error", err) shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error()) return } diff --git a/api/groups/eventsGroup_test.go b/api/groups/eventsGroup_test.go index 32c17a44..5cfc5c45 100644 --- a/api/groups/eventsGroup_test.go +++ b/api/groups/eventsGroup_test.go @@ -190,7 +190,7 @@ func TestEventsGroup_PushEvents(t *testing.T) { ExecutionOrder: 1, }, }, - Logs: []*outport.LogData{ + Logs: []*transaction.LogData{ { Log: &transaction.Log{ Address: []byte("logaddr1"), diff --git a/common/common.go b/common/common.go new file mode 100644 index 00000000..889986fe --- /dev/null +++ b/common/common.go @@ -0,0 +1,6 @@ +package common + +// ConvertTimeStampMsToSec will convert unix timestamp from milliseconds to seconds +func ConvertTimeStampMsToSec(timeStamp uint64) uint64 { + return timeStamp / 1000 +} diff --git a/data/block.go b/data/block.go index cf26ebe6..6d645a89 100644 --- a/data/block.go +++ b/data/block.go @@ -32,6 +32,8 @@ type InterceptorBlockData struct { ScrsWithOrder map[string]*outport.SCRInfo LogEvents []Event StateAccessesPerAccounts map[string]*stateChange.StateAccesses + Nonce uint64 + TimeStampMs uint64 } // ArgsSaveBlockData holds the block data that will be received on push events @@ -47,6 +49,8 @@ type ArgsSaveBlockData struct { NumberOfShards uint32 HeaderTimeStampMs uint64 StateAccesses map[string]*stateChange.StateAccesses + StateAccessesForBlock map[string]*outport.StateAccessesForBlock + Results map[string]*outport.ExecutionResultData } // OutportBlockDataOld holds the block data that will be received on push events @@ -62,6 +66,7 @@ type OutportBlockDataOld struct { AlteredAccounts map[string]*alteredAccount.AlteredAccount NumberOfShards uint32 IsImportDB bool + StateAccesses map[string]*stateChange.StateAccesses } // ArgsSaveBlock holds block data with header type diff --git a/go.mod b/go.mod index e3e11a4a..83980251 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,8 @@ require ( github.com/go-redis/redis/v8 v8.11.3 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 - github.com/multiversx/mx-chain-communication-go v1.2.0 - github.com/multiversx/mx-chain-core-go v1.4.1 + github.com/multiversx/mx-chain-communication-go v1.3.0 + github.com/multiversx/mx-chain-core-go v1.4.2-0.20260219122727-014ae9f9311f github.com/multiversx/mx-chain-logger-go v1.1.0 github.com/pelletier/go-toml v1.9.3 github.com/prometheus/client_model v0.6.1 @@ -48,7 +48,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mr-tron/base58 v1.2.0 // indirect - github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 // indirect + github.com/multiversx/mx-chain-crypto-go v1.3.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index f63f3c00..1e6e71d3 100644 --- a/go.sum +++ b/go.sum @@ -128,12 +128,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/multiversx/mx-chain-communication-go v1.2.0 h1:0wOoLldiRbvaOPxwICbnRCqCpLqPewg8M/FMbC/0OXY= -github.com/multiversx/mx-chain-communication-go v1.2.0/go.mod h1:wS3aAwkmHbC9mlzQdvL6p7l8Rqw3vmzhj7WZW1dTveA= -github.com/multiversx/mx-chain-core-go v1.4.1 h1:ljs53jpdjtCohpaqm2n/dvTGrFlSgIpoZYH8RVt5cWo= -github.com/multiversx/mx-chain-core-go v1.4.1/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= -github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234 h1:NNI7kYxzsq+4mTPSUJo0cK1+iPxjUX+gRJDaBRwEQ7M= -github.com/multiversx/mx-chain-crypto-go v1.2.13-0.20250218161752-9482d9a22234/go.mod h1:QZAw2bZcOxGQRgYACTrmP8pfTa3NyxENIL+00G6nM5E= +github.com/multiversx/mx-chain-communication-go v1.3.0 h1:ziNM1dRuiR/7al2L/jGEA/a/hjurtJ/HEqgazHNt9P8= +github.com/multiversx/mx-chain-communication-go v1.3.0/go.mod h1:gDVWn6zUW6aCN1YOm/FbbT5MUmhgn/L1Rmpl8EoH3Yg= +github.com/multiversx/mx-chain-core-go v1.4.2-0.20260219122727-014ae9f9311f h1:kngckbX3TbZpU0LQpetUM8xNvUu/FtmZQtM66yoGcz0= +github.com/multiversx/mx-chain-core-go v1.4.2-0.20260219122727-014ae9f9311f/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-crypto-go v1.3.0 h1:0eK2bkDOMi8VbSPrB1/vGJSYT81IBtfL4zw+C4sWe/k= +github.com/multiversx/mx-chain-crypto-go v1.3.0/go.mod h1:nPIkxxzyTP8IquWKds+22Q2OJ9W7LtusC7cAosz7ojM= github.com/multiversx/mx-chain-logger-go v1.1.0 h1:97x84A6L4RfCa6YOx1HpAFxZp1cf/WI0Qh112whgZNM= github.com/multiversx/mx-chain-logger-go v1.1.0/go.mod h1:K9XgiohLwOsNACETMNL0LItJMREuEvTH6NsoXWXWg7g= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 6be251f8..4c0a7124 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -13,13 +13,26 @@ import ( "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/multiversx/mx-chain-core-go/marshal" logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/integrationTests" + "github.com/multiversx/mx-chain-notifier-go/testdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +const ( + // number of expected redis events + // one event for each outport driver method: Save, Revert, Finalized + numExpRedisEvents = 3 + + // number of exected rabbitmq events + // 5 events (logs & events, txs, scrs, full blocks events, state accesses) for Save method + // + one event for each other outport driver method: Revert, Finalized + numExpRabbitMQEvents = 7 +) + var log = logger.GetOrCreate("integrationTests/rabbitmq") func TestNotifierWithRabbitMQ(t *testing.T) { @@ -32,6 +45,16 @@ func TestNotifierWithRabbitMQ(t *testing.T) { }) } +func TestNotifierWithRabbitMQV3(t *testing.T) { + t.Run("with http observer connnector", func(t *testing.T) { + testNotifierWithRabbitMQV3(t, common.HTTPConnectorType, common.PayloadV1) + }) + + t.Run("with ws observer connnector", func(t *testing.T) { + testNotifierWithRabbitMQV3(t, common.WSObsConnectorType, common.PayloadV1) + }) +} + func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion uint32) { cfg := integrationTests.GetDefaultConfigs() cfg.MainConfig.General.CheckDuplicates = true @@ -59,10 +82,42 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion go pushEventsRequest(wg, client) go pushRevertRequest(wg, client) - integrationTests.WaitTimeout(t, wg, time.Second*2) + integrationTests.WaitTimeout(t, wg, time.Second*5) - assert.Equal(t, 3, len(notifier.RedisClient.GetEntries())) - assert.Equal(t, 7, len(notifier.RabbitMQClient.GetEntries())) + assert.Equal(t, numExpRedisEvents, len(notifier.RedisClient.GetEntries())) + assert.Equal(t, numExpRabbitMQEvents, len(notifier.RabbitMQClient.GetEntries())) +} + +func testNotifierWithRabbitMQV3(t *testing.T, observerType string, payloadVersion uint32) { + cfg := integrationTests.GetDefaultConfigs() + cfg.MainConfig.General.CheckDuplicates = true + notifier, err := integrationTests.NewTestNotifierWithRabbitMq(cfg.MainConfig) + require.Nil(t, err) + + client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, payloadVersion) + require.Nil(t, err) + + // wait for components to start + time.Sleep(time.Second * 5) + + _ = notifier.Publisher.Run() + defer notifier.Publisher.Close() + + wg := &sync.WaitGroup{} + wg.Add(5) + + go pushEventsRequestV3(wg, client) + go pushRevertRequestV3(wg, client) + go pushFinalizedRequest(wg, client) + + // send requests again + go pushEventsRequestV3(wg, client) + go pushRevertRequestV3(wg, client) + + integrationTests.WaitTimeout(t, wg, time.Second*5) + + assert.Equal(t, numExpRedisEvents, len(notifier.RedisClient.GetEntries())) + assert.Equal(t, numExpRabbitMQEvents, len(notifier.RabbitMQClient.GetEntries())) } func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverConnector) { @@ -96,7 +151,7 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo ExecutionOrder: 3, }, }, - Logs: []*outport.LogData{ + Logs: []*transaction.LogData{ { Log: &transaction.Log{ Address: []byte("logaddr1"), @@ -110,11 +165,11 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo stateAccesses := make(map[string]*stateChange.StateAccesses) stateAccesses["txHash1"] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ + { MainTrieKey: []byte("mainTrieKey1"), MainTrieVal: []byte("mainTrieVal1"), }, - &stateChange.StateAccess{ + { MainTrieKey: []byte("mainTrieKey2"), MainTrieVal: []byte("mainTrieVal2"), }, @@ -135,7 +190,11 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo }, TransactionPool: txPool, HeaderGasConsumption: &outport.HeaderGasConsumption{}, - StateAccesses: stateAccesses, + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString([]byte("headerHash1")): { + StateAccesses: stateAccesses, + }, + }, } err := webServer.PushEventsRequest(saveBlockData) @@ -146,6 +205,24 @@ func pushEventsRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo } } +func pushEventsRequestV3( + wg *sync.WaitGroup, + webServer integrationTests.ObserverConnector, +) { + marshaller := &marshal.JsonMarshalizer{} + blockData, err := testdata.NewBlockData(marshaller) + log.LogIfError(err) + + saveBlockData := blockData.OutportBlockV2() + + err = webServer.PushEventsRequest(saveBlockData) + log.LogIfError(err) + + if err == nil { + wg.Done() + } +} + func pushRevertRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverConnector) { header := &block.HeaderV2{ Header: &block.Header{ @@ -166,6 +243,24 @@ func pushRevertRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverCo } } +func pushRevertRequestV3(wg *sync.WaitGroup, webServer integrationTests.ObserverConnector) { + header := &block.HeaderV3{ + Nonce: 1, + } + headerBytes, _ := json.Marshal(header) + blockData := &outport.BlockData{ + HeaderBytes: headerBytes, + HeaderType: string(core.ShardHeaderV3), + HeaderHash: []byte("headerHash3"), + } + err := webServer.RevertEventsRequest(blockData) + log.LogIfError(err) + + if err == nil { + wg.Done() + } +} + func pushFinalizedRequest(wg *sync.WaitGroup, webServer integrationTests.ObserverConnector) { blockEvents := &outport.FinalizedBlock{ HeaderHash: []byte("headerHash3"), diff --git a/integrationTests/websocket/testNotifierWithWebsockets_test.go b/integrationTests/websocket/testNotifierWithWebsockets_test.go index 3c558792..b51ead00 100644 --- a/integrationTests/websocket/testNotifierWithWebsockets_test.go +++ b/integrationTests/websocket/testNotifierWithWebsockets_test.go @@ -13,9 +13,11 @@ import ( "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/stateChange" "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/integrationTests" + "github.com/multiversx/mx-chain-notifier-go/testdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -66,11 +68,11 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { stateAccesses := make(map[string]*stateChange.StateAccesses) stateAccesses["txHash1"] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ + { MainTrieKey: []byte("mainTrieKey1"), MainTrieVal: []byte("mainTrieVal1"), }, - &stateChange.StateAccess{ + { MainTrieKey: []byte("mainTrieKey2"), MainTrieVal: []byte("mainTrieVal2"), }, @@ -80,7 +82,7 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ - Logs: []*outport.LogData{ + Logs: []*transaction.LogData{ { Log: &transaction.Log{ Events: []*transaction.Event{ @@ -102,9 +104,73 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, - StateAccesses: stateAccesses, + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString([]byte("headerHash")): { + StateAccesses: stateAccesses, + }, + }, + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + reply, err := ws.ReceiveEvents() + require.Nil(t, err) + + require.Equal(t, events, reply) + wg.Done() + }() + + time.Sleep(time.Second) + + err = webServer.PushEventsRequest(saveBlockData) + require.Nil(t, err) + + integrationTests.WaitTimeout(t, wg, time.Second*2) +} + +func TestNotifierWithWebsockets_PushEventsV3(t *testing.T) { + cfg := integrationTests.GetDefaultConfigs() + notifier, err := integrationTests.NewTestNotifierWithWS(cfg.MainConfig) + require.Nil(t, err) + + webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) + require.Nil(t, err) + + _ = notifier.Publisher.Run() + defer notifier.Publisher.Close() + + ws, err := integrationTests.NewWSClient(notifier.WSHandler) + require.Nil(t, err) + defer ws.Close() + + subscribeEvent := &data.SubscribeEvent{ + SubscriptionEntries: []data.SubscriptionEntry{ + { + EventType: common.PushLogsAndEvents, + }, + }, } + ws.SendSubscribeMessage(subscribeEvent) + + addr := []byte("logaddr1") + events := []data.Event{ + { + Address: hex.EncodeToString(addr), + TxHash: "txHash1", + Data: make([]byte, 0), + Topics: make([][]byte, 0), + }, + } + + marshaller := &marshal.JsonMarshalizer{} + blockData, err := testdata.NewBlockData(marshaller) + require.Nil(t, err) + + saveBlockData := blockData.OutportBlockV2() + wg := &sync.WaitGroup{} wg.Add(1) @@ -180,7 +246,7 @@ func TestNotifierWithWebsockets_BlockEvents(t *testing.T) { saveBlockData := &outport.OutportBlock{ TransactionPool: &outport.TransactionPool{ - Logs: []*outport.LogData{ + Logs: []*transaction.LogData{ { Log: &transaction.Log{ Events: []*transaction.Event{ @@ -203,7 +269,11 @@ func TestNotifierWithWebsockets_BlockEvents(t *testing.T) { TimestampMs: 1234000, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, - StateAccesses: stateAccesses, + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(headerHash): { + StateAccesses: stateAccesses, + }, + }, } wg := &sync.WaitGroup{} @@ -286,6 +356,65 @@ func TestNotifierWithWebsockets_RevertEvents(t *testing.T) { integrationTests.WaitTimeout(t, wg, time.Second*2) } +func TestNotifierWithWebsockets_RevertEventsV3(t *testing.T) { + cfg := integrationTests.GetDefaultConfigs() + notifier, err := integrationTests.NewTestNotifierWithWS(cfg.MainConfig) + require.Nil(t, err) + + webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) + require.Nil(t, err) + + _ = notifier.Publisher.Run() + defer notifier.Publisher.Close() + + ws, err := integrationTests.NewWSClient(notifier.WSHandler) + require.Nil(t, err) + defer ws.Close() + + subscribeEvent := &data.SubscribeEvent{ + SubscriptionEntries: []data.SubscriptionEntry{ + { + EventType: common.RevertBlockEvents, + }, + }, + } + + ws.SendSubscribeMessage(subscribeEvent) + + header := &block.HeaderV3{ + Nonce: 1, + } + headerBytes, _ := json.Marshal(header) + blockEvents := &outport.BlockData{ + HeaderBytes: headerBytes, + HeaderType: string(core.ShardHeaderV3), + HeaderHash: []byte("hash1"), + } + + expReply := &data.RevertBlock{ + Hash: hex.EncodeToString([]byte("hash1")), + Nonce: 1, + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + reply, err := ws.ReceiveRevertBlock() + require.Nil(t, err) + + require.Equal(t, expReply, reply) + wg.Done() + }() + + time.Sleep(time.Second) + + err = webServer.RevertEventsRequest(blockEvents) + require.Nil(t, err) + + integrationTests.WaitTimeout(t, wg, time.Second*2) +} + func TestNotifierWithWebsockets_FinalizedEvents(t *testing.T) { cfg := integrationTests.GetDefaultConfigs() notifier, err := integrationTests.NewTestNotifierWithWS(cfg.MainConfig) @@ -395,7 +524,11 @@ func TestNotifierWithWebsockets_TxsEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, - StateAccesses: stateAccesses, + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(blockHash): { + StateAccesses: stateAccesses, + }, + }, } expTxs := map[string]*transaction.Transaction{ @@ -483,7 +616,11 @@ func TestNotifierWithWebsockets_ScrsEvents(t *testing.T) { }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, - StateAccesses: stateAccesses, + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(blockHash): { + StateAccesses: stateAccesses, + }, + }, } expScrs := map[string]*smartContractResult.SmartContractResult{ @@ -678,7 +815,7 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { TransactionPool: &outport.TransactionPool{ Transactions: txs, SmartContractResults: scrs, - Logs: []*outport.LogData{ + Logs: []*transaction.LogData{ { Log: &transaction.Log{ Events: []*transaction.Event{ @@ -702,7 +839,11 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { TimestampMs: 1234000, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, - StateAccesses: stateAccesses, + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(blockHash): { + StateAccesses: stateAccesses, + }, + }, } numEvents := 6 diff --git a/mocks/eventsInterceptorStub.go b/mocks/eventsInterceptorStub.go index 6c7fb8d6..897c75b8 100644 --- a/mocks/eventsInterceptorStub.go +++ b/mocks/eventsInterceptorStub.go @@ -4,7 +4,8 @@ import "github.com/multiversx/mx-chain-notifier-go/data" // EventsInterceptorStub - type EventsInterceptorStub struct { - ProcessBlockEventsCalled func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) + ProcessBlockEventsCalled func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) + ProcessBlockEventsV3Called func(eventsData *data.ArgsSaveBlockData) ([]*data.InterceptorBlockData, error) } // ProcessBlockEvents - @@ -16,6 +17,15 @@ func (stub *EventsInterceptorStub) ProcessBlockEvents(eventsData *data.ArgsSaveB return nil, nil } +// ProcessBlockEventsV3 - +func (stub *EventsInterceptorStub) ProcessBlockEventsV3(eventsData *data.ArgsSaveBlockData) ([]*data.InterceptorBlockData, error) { + if stub.ProcessBlockEventsV3Called != nil { + return stub.ProcessBlockEventsV3Called(eventsData) + } + + return nil, nil +} + // IsInterfaceNil returns true if there is not value under the interface func (stub *EventsInterceptorStub) IsInterfaceNil() bool { return stub == nil diff --git a/process/errors.go b/process/errors.go index 1c1739dd..d3e8243b 100644 --- a/process/errors.go +++ b/process/errors.go @@ -34,3 +34,6 @@ var ErrNilEventsInterceptor = errors.New("nil events interceptor") // ErrNilStateAccesses signals that a nil state accesses has been provided var ErrNilStateAccesses = errors.New("nil state accesses provided") + +// ErrNilExecutionResults signals that a nil execution results map has been provided +var ErrNilExecutionResults = errors.New("nil execution results provided") diff --git a/process/eventsHandler.go b/process/eventsHandler.go index 0c8856bb..06529831 100644 --- a/process/eventsHandler.go +++ b/process/eventsHandler.go @@ -83,26 +83,59 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) return nil } + if check.IfNil(allEvents.Header) { + return ErrNilBlockHeader + } + + if allEvents.Header.IsHeaderV3() { + return eh.handleSaveBlockEventsV3(allEvents) + } + + return eh.handleSaveBlockEventsLegacy(allEvents) +} + +func (eh *eventsHandler) handleSaveBlockEventsLegacy(allEvents data.ArgsSaveBlockData) error { eventsData, err := eh.eventsInterceptor.ProcessBlockEvents(&allEvents) if err != nil { return err } + headerTimeStamp := eventsData.Header.GetTimeStamp() + headerTimeStampMs := allEvents.HeaderTimeStampMs + shardID := eventsData.Header.GetShardID() + nonce := eventsData.Header.GetNonce() + + return eh.handleSaveBlockEvents( + eventsData, + headerTimeStamp, + headerTimeStampMs, + shardID, + nonce, + ) +} + +func (eh *eventsHandler) handleSaveBlockEvents( + eventsData *data.InterceptorBlockData, + headerTimeStamp uint64, + headerTimeStampMs uint64, + shardID uint32, + nonce uint64, +) error { + if eventsData == nil { + return ErrNilEventsInterceptor + } if check.IfNil(eventsData.Header) { return ErrNilBlockHeader } - headerTimeStamp := eventsData.Header.GetTimeStamp() - headerTimeStampMs := allEvents.HeaderTimeStampMs - pushEvents := data.BlockEvents{ Hash: eventsData.Hash, - ShardID: eventsData.Header.GetShardID(), + ShardID: shardID, TimeStamp: headerTimeStamp, TimeStampMs: headerTimeStampMs, Events: eventsData.LogEvents, } - err = eh.handlePushEvents(pushEvents) + err := eh.handlePushEvents(pushEvents) if err != nil { return err } @@ -121,7 +154,7 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) txsWithOrder := data.BlockEventsWithOrder{ Hash: eventsData.Hash, - ShardID: eventsData.Header.GetShardID(), + ShardID: shardID, TimeStamp: headerTimeStamp, TimeStampMs: headerTimeStampMs, Txs: eventsData.TxsWithOrder, @@ -132,9 +165,9 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) stateAccesses := data.BlockStateAccesses{ Hash: eventsData.Hash, - ShardID: eventsData.Header.GetShardID(), + ShardID: shardID, TimeStampMs: headerTimeStampMs, - Nonce: eventsData.Header.GetNonce(), + Nonce: nonce, StateAccessesPerAccounts: eventsData.StateAccessesPerAccounts, } eh.handleStateAccesses(stateAccesses) @@ -142,6 +175,31 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) return nil } +func (eh *eventsHandler) handleSaveBlockEventsV3(allEvents data.ArgsSaveBlockData) error { + executionResultsData, err := eh.eventsInterceptor.ProcessBlockEventsV3(&allEvents) + if err != nil { + return err + } + + shardID := allEvents.Header.GetShardID() + + for _, executionResultData := range executionResultsData { + timeStampSec := common.ConvertTimeStampMsToSec(executionResultData.TimeStampMs) // this is used for backwards compatibility + err = eh.handleSaveBlockEvents( + executionResultData, + timeStampSec, + executionResultData.TimeStampMs, + shardID, + executionResultData.Nonce, + ) + if err != nil { + return err + } + } + + return nil +} + // HandlePushEvents will handle push events received from observer func (eh *eventsHandler) handlePushEvents(events data.BlockEvents) error { if events.Hash == "" { @@ -152,7 +210,7 @@ func (eh *eventsHandler) handlePushEvents(events data.BlockEvents) error { } if len(events.Events) == 0 { - log.Warn("received empty events", "event", common.PushLogsAndEvents, + log.Debug("received empty events", "event", common.PushLogsAndEvents, "block hash", events.Hash, ) events.Events = make([]data.Event, 0) @@ -259,7 +317,7 @@ func (eh *eventsHandler) handleBlockTxs(blockTxs data.BlockTxs) { } if len(blockTxs.Txs) == 0 { - log.Warn("received empty events", "event", common.BlockTxs, + log.Debug("received empty events", "event", common.BlockTxs, "block hash", blockTxs.Hash, ) } else { @@ -283,7 +341,7 @@ func (eh *eventsHandler) handleBlockScrs(blockScrs data.BlockScrs) { } if len(blockScrs.Scrs) == 0 { - log.Warn("received empty events", "event", common.BlockScrs, + log.Debug("received empty events", "event", common.BlockScrs, "block hash", blockScrs.Hash, ) } else { @@ -325,6 +383,8 @@ func (eh *eventsHandler) handleStateAccesses(stateAccesses data.BlockStateAccess log.Info("received state accesses", "block hash", stateAccesses.Hash, + "nonce", stateAccesses.Nonce, + "stateAccesesPerAccounts num", len(stateAccesses.StateAccessesPerAccounts), ) t := time.Now() diff --git a/process/eventsHandler_test.go b/process/eventsHandler_test.go index 6e46d955..73f0ffcc 100644 --- a/process/eventsHandler_test.go +++ b/process/eventsHandler_test.go @@ -91,7 +91,7 @@ func TestNewEventsHandler(t *testing.T) { }) } -func TestHandleSaveBlockEvents(t *testing.T) { +func TestHandleSaveBlockEvents_ShouldFail(t *testing.T) { t.Parallel() t.Run("duplicated events, should return early", func(t *testing.T) { @@ -120,7 +120,7 @@ func TestHandleSaveBlockEvents(t *testing.T) { require.Nil(t, err) }) - t.Run("failed to pre-process events, should fail", func(t *testing.T) { + t.Run("nil events header, should fail", func(t *testing.T) { t.Parallel() args := createMockEventsHandlerArgs() @@ -142,106 +142,135 @@ func TestHandleSaveBlockEvents(t *testing.T) { eventsHandler, err := process.NewEventsHandler(args) require.Nil(t, err) - err = eventsHandler.HandleSaveBlockEvents(data.ArgsSaveBlockData{}) - require.Equal(t, expectedErr, err) + blockData := data.ArgsSaveBlockData{ + Header: nil, + } + + err = eventsHandler.HandleSaveBlockEvents(blockData) + require.Equal(t, process.ErrNilBlockHeader, err) }) - t.Run("should work", func(t *testing.T) { + t.Run("failed to pre-process events, should fail", func(t *testing.T) { t.Parallel() - blockHash := "blockHash1" - txs := map[string]*outport.TxInfo{ - "hash1": { - Transaction: &transaction.Transaction{ - Nonce: 1, - }, - ExecutionOrder: 1, - }, - } - scrs := map[string]*outport.SCRInfo{ - "hash2": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - }, - }, - } - logData := []*outport.LogData{ - { - Log: &transaction.Log{ - Address: []byte("logaddr1"), - Events: []*transaction.Event{}, - }, - TxHash: "logHash1", - }, - } + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true - logEvents := []data.Event{ - { - Address: "addr1", + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return true, nil }, } - header := &block.HeaderV2{ - Header: &block.Header{ - ShardID: 2, + expectedErr := errors.New("expected err") + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + return nil, expectedErr }, } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + blockData := data.ArgsSaveBlockData{ - HeaderHash: []byte(blockHash), - TransactionsPool: &outport.TransactionPool{ - Transactions: txs, - SmartContractResults: scrs, - Logs: logData, - }, Header: &block.HeaderV2{}, } - expTxs := map[string]*transaction.Transaction{ - "hash1": { + err = eventsHandler.HandleSaveBlockEvents(blockData) + require.Equal(t, expectedErr, err) + }) +} + +func TestHandleSaveBlockEvents_ShouldWork(t *testing.T) { + t.Parallel() + + blockHash := "blockHash1" + txs := map[string]*outport.TxInfo{ + "hash1": { + Transaction: &transaction.Transaction{ Nonce: 1, }, - } - expScrs := map[string]*smartContractResult.SmartContractResult{ - "hash2": { + ExecutionOrder: 1, + }, + } + scrs := map[string]*outport.SCRInfo{ + "hash2": { + SmartContractResult: &smartContractResult.SmartContractResult{ Nonce: 2, }, - } + }, + } + logData := []*transaction.LogData{ + { + Log: &transaction.Log{ + Address: []byte("logaddr1"), + Events: []*transaction.Event{}, + }, + TxHash: "logHash1", + }, + } - expTxsData := data.BlockTxs{ - Hash: blockHash, - Txs: expTxs, - } - expScrsData := data.BlockScrs{ - Hash: blockHash, - Scrs: expScrs, - } - expLogEvents := data.BlockEvents{ - Hash: blockHash, - Events: logEvents, - ShardID: 2, - } + logEvents := []data.Event{ + { + Address: "addr1", + }, + } - expTxsWithOrder := map[string]*outport.TxInfo{ - "hash1": { - Transaction: &transaction.Transaction{ - Nonce: 1, - }, - ExecutionOrder: 1, + expTxs := map[string]*transaction.Transaction{ + "hash1": { + Nonce: 1, + }, + } + expScrs := map[string]*smartContractResult.SmartContractResult{ + "hash2": { + Nonce: 2, + }, + } + + expTxsData := data.BlockTxs{ + Hash: blockHash, + Txs: expTxs, + } + expScrsData := data.BlockScrs{ + Hash: blockHash, + Scrs: expScrs, + } + expLogEvents := data.BlockEvents{ + Hash: blockHash, + Events: logEvents, + ShardID: 2, + } + + expTxsWithOrder := map[string]*outport.TxInfo{ + "hash1": { + Transaction: &transaction.Transaction{ + Nonce: 1, }, - } - expScrsWithOrder := map[string]*outport.SCRInfo{ - "hash2": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - }, + ExecutionOrder: 1, + }, + } + expScrsWithOrder := map[string]*outport.SCRInfo{ + "hash2": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 2, + }, + }, + } + expTxsWithOrderData := data.BlockEventsWithOrder{ + Hash: blockHash, + ShardID: 2, + Txs: expTxsWithOrder, + Scrs: expScrsWithOrder, + Events: logEvents, + } + + t.Run("should work before header v3", func(t *testing.T) { + t.Parallel() + + header := &block.HeaderV2{ + Header: &block.Header{ + ShardID: 2, }, - } - expTxsWithOrderData := data.BlockEventsWithOrder{ - Hash: blockHash, - ShardID: 2, - Txs: expTxsWithOrder, - Scrs: expScrsWithOrder, - Events: logEvents, } pushWasCalled := false @@ -287,6 +316,91 @@ func TestHandleSaveBlockEvents(t *testing.T) { eventsHandler, err := process.NewEventsHandler(args) require.Nil(t, err) + blockData := data.ArgsSaveBlockData{ + HeaderHash: []byte(blockHash), + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + Logs: logData, + }, + Header: header, + } + + err = eventsHandler.HandleSaveBlockEvents(blockData) + require.Nil(t, err) + + assert.True(t, pushWasCalled) + assert.True(t, txsWasCalled) + assert.True(t, scrsWasCalled) + assert.True(t, blockEventsWithOrderWasCalled) + }) + + t.Run("should work with header v3", func(t *testing.T) { + t.Parallel() + + header := &block.HeaderV3{ + ShardID: 2, + } + + pushWasCalled := false + txsWasCalled := false + scrsWasCalled := false + blockEventsWithOrderWasCalled := false + + args := createMockEventsHandlerArgs() + + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + assert.Fail(t, "should have not been called") + return &data.InterceptorBlockData{}, nil + }, + ProcessBlockEventsV3Called: func(eventsData *data.ArgsSaveBlockData) ([]*data.InterceptorBlockData, error) { + return []*data.InterceptorBlockData{ + { + Hash: blockHash, + Header: header, + Txs: expTxs, + Scrs: expScrs, + LogEvents: logEvents, + TxsWithOrder: expTxsWithOrder, + ScrsWithOrder: expScrsWithOrder, + }, + }, nil + }, + } + + args.Publisher = &mocks.PublisherStub{ + BroadcastCalled: func(events data.BlockEvents) { + pushWasCalled = true + assert.Equal(t, expLogEvents, events) + }, + BroadcastTxsCalled: func(event data.BlockTxs) { + txsWasCalled = true + assert.Equal(t, expTxsData, event) + }, + BroadcastScrsCalled: func(event data.BlockScrs) { + scrsWasCalled = true + assert.Equal(t, expScrsData, event) + }, + BroadcastBlockEventsWithOrderCalled: func(event data.BlockEventsWithOrder) { + blockEventsWithOrderWasCalled = true + assert.Equal(t, expTxsWithOrderData, event) + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + blockData := data.ArgsSaveBlockData{ + HeaderHash: []byte(blockHash), + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + Logs: logData, + }, + Header: header, + } + err = eventsHandler.HandleSaveBlockEvents(blockData) require.Nil(t, err) diff --git a/process/eventsInterceptor.go b/process/eventsInterceptor.go index 778b2bc5..5d62ca74 100644 --- a/process/eventsInterceptor.go +++ b/process/eventsInterceptor.go @@ -2,11 +2,12 @@ package process import ( "encoding/hex" + "fmt" "sort" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" - nodeData "github.com/multiversx/mx-chain-core-go/data" + coreData "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/stateChange" @@ -15,14 +16,24 @@ import ( "github.com/multiversx/mx-chain-notifier-go/data" ) +type txType int + +const ( + normalTx txType = iota + scr + rewardTx + invalidTx +) + type txWithOrder struct { - hash string - index uint32 + Hash string + Index uint32 + TxType txType } // logEvent defines a log event associated with corresponding tx hash type logEvent struct { - EventHandler nodeData.EventHandler + EventHandler coreData.EventHandler TxHash string } @@ -49,109 +60,251 @@ func NewEventsInterceptor(args ArgsEventsInterceptor) (*eventsInterceptor, error }, nil } -// ProcessBlockEvents will process block events data -func (ei *eventsInterceptor) ProcessBlockEvents(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { +func baseNilEventsDataChecks(eventsData *data.ArgsSaveBlockData) error { if eventsData == nil { - return nil, ErrNilBlockEvents - } - if eventsData.TransactionsPool == nil { - return nil, ErrNilTransactionsPool + return ErrNilBlockEvents } if eventsData.Body == nil { - return nil, ErrNilBlockBody + return ErrNilBlockBody } if eventsData.Header == nil { - return nil, ErrNilBlockHeader + return ErrNilBlockHeader } - events := ei.getLogEventsFromTransactionsPool(eventsData.TransactionsPool.Logs) + return nil +} - txs := make(map[string]*transaction.Transaction) - for hash, tx := range eventsData.TransactionsPool.Transactions { - txs[hash] = tx.Transaction +// ProcessBlockEvents will process block events data +func (ei *eventsInterceptor) ProcessBlockEvents(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + err := baseNilEventsDataChecks(eventsData) + if err != nil { + return nil, err } - txsWithOrder := eventsData.TransactionsPool.Transactions - - scrs := make(map[string]*smartContractResult.SmartContractResult) - for hash, scr := range eventsData.TransactionsPool.SmartContractResults { - scrs[hash] = scr.SmartContractResult + if eventsData.TransactionsPool == nil { + return nil, ErrNilTransactionsPool } - scrsWithOrder := eventsData.TransactionsPool.SmartContractResults - stateAccessesPerAccounts := ei.getStateAccessesPerAccounts(eventsData) + transactionsPool := eventsData.TransactionsPool + + events := ei.getLogEventsFromTransactionsPool(transactionsPool.Logs) + + stateAccessesPerAccounts := ei.getStateAccessesPerAccounts(eventsData, hex.EncodeToString(eventsData.HeaderHash), transactionsPool) return &data.InterceptorBlockData{ Hash: hex.EncodeToString(eventsData.HeaderHash), Body: eventsData.Body, Header: eventsData.Header, - Txs: txs, - TxsWithOrder: txsWithOrder, - Scrs: scrs, - ScrsWithOrder: scrsWithOrder, + Txs: getTxsFromPool(transactionsPool), + TxsWithOrder: transactionsPool.GetTransactions(), + Scrs: getScrsFromPool(transactionsPool), + ScrsWithOrder: transactionsPool.GetSmartContractResults(), LogEvents: events, StateAccessesPerAccounts: stateAccessesPerAccounts, }, nil } +// ProcessBlockEventsV3 will process block events data for async execution model +func (ei *eventsInterceptor) ProcessBlockEventsV3(eventsData *data.ArgsSaveBlockData) ([]*data.InterceptorBlockData, error) { + err := baseNilEventsDataChecks(eventsData) + if err != nil { + return nil, err + } + + if !eventsData.Header.IsHeaderV3() { + return nil, coreData.ErrInvalidHeaderType + } + + if eventsData.Results == nil { + return nil, ErrNilExecutionResults + } + + execBlocksData := make([]*data.InterceptorBlockData, 0) + if len(eventsData.Results) == 0 { + return execBlocksData, nil + } + + for headerHash, execBlockData := range eventsData.Results { + transactionsPool := execBlockData.GetTransactionPool() + if transactionsPool == nil { + return nil, fmt.Errorf("%w: for execution results block data", ErrNilTransactionsPool) + } + + body := execBlockData.Body + + events := ei.getLogEventsFromTransactionsPool(transactionsPool.GetLogs()) + + stateAccessesPerAccounts := ei.getStateAccessesPerAccountsV3(eventsData, headerHash, transactionsPool) + + blockData := &data.InterceptorBlockData{ + Hash: headerHash, + Body: body, + Header: eventsData.Header, // this holds current proposed header, not executed header + Txs: getTxsFromPool(transactionsPool), + TxsWithOrder: transactionsPool.GetTransactions(), + Scrs: getScrsFromPool(transactionsPool), + ScrsWithOrder: transactionsPool.GetSmartContractResults(), + LogEvents: events, + StateAccessesPerAccounts: stateAccessesPerAccounts, + Nonce: execBlockData.GetHeaderNonce(), + TimeStampMs: execBlockData.GetTimestampMs(), + } + + execBlocksData = append(execBlocksData, blockData) + } + + return execBlocksData, nil +} + +func getScrsFromPool(transactionsPool *outport.TransactionPool) map[string]*smartContractResult.SmartContractResult { + scrs := make(map[string]*smartContractResult.SmartContractResult) + + for hash, scr := range transactionsPool.GetSmartContractResults() { + scrs[hash] = scr.SmartContractResult + } + + return scrs +} + +func getTxsFromPool(transactionsPool *outport.TransactionPool) map[string]*transaction.Transaction { + txs := make(map[string]*transaction.Transaction) + + for hash, tx := range transactionsPool.GetTransactions() { + txs[hash] = tx.Transaction + } + + return txs +} + func getTxsWithOrder(transactionsPool *outport.TransactionPool) []txWithOrder { - txsWithOrderMap := make(map[string]uint32) + // This map is needed because of duplicated transactions. + // There can be a case when a transaction is included in the block, but also marked as invalid, so it will be present + // in both transactions and invalidTxs maps from transactions pool, with the same execution order. In that case, + // we want to make sure that we process that transaction only as invalid. + numTxs := len(transactionsPool.Transactions) + + len(transactionsPool.SmartContractResults) + + len(transactionsPool.Rewards) + + len(transactionsPool.InvalidTxs) + txsWithOrderMap := make(map[string]txWithOrder, numTxs) for txHash, txInfo := range transactionsPool.Transactions { - txsWithOrderMap[txHash] = txInfo.ExecutionOrder + txsWithOrderMap[txHash] = txWithOrder{ + Hash: txHash, + Index: txInfo.ExecutionOrder, + TxType: normalTx, + } } for txHash, txInfo := range transactionsPool.SmartContractResults { - txsWithOrderMap[txHash] = txInfo.ExecutionOrder + txsWithOrderMap[txHash] = txWithOrder{ + Hash: txHash, + Index: txInfo.ExecutionOrder, + TxType: scr, + } } for txHash, txInfo := range transactionsPool.Rewards { - txsWithOrderMap[txHash] = txInfo.ExecutionOrder + txsWithOrderMap[txHash] = txWithOrder{ + Hash: txHash, + Index: txInfo.ExecutionOrder, + TxType: rewardTx, + } } for txHash, txInfo := range transactionsPool.InvalidTxs { - txsWithOrderMap[txHash] = txInfo.ExecutionOrder + txsWithOrderMap[txHash] = txWithOrder{ + Hash: txHash, + Index: txInfo.ExecutionOrder, + TxType: invalidTx, + } } txsWithOrder := make([]txWithOrder, 0, len(txsWithOrderMap)) - for txHash, index := range txsWithOrderMap { - txsWithOrder = append(txsWithOrder, txWithOrder{ - hash: txHash, - index: index, - }) + for _, txWithData := range txsWithOrderMap { + txsWithOrder = append(txsWithOrder, txWithData) } sort.Slice(txsWithOrder, func(i, j int) bool { - return txsWithOrder[i].index < txsWithOrder[j].index + return txsWithOrder[i].Index < txsWithOrder[j].Index }) return txsWithOrder } -func (ei *eventsInterceptor) getStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { +func (ei *eventsInterceptor) getStateAccessesPerAccounts( + eventsData *data.ArgsSaveBlockData, + headerHash string, + transactionPool *outport.TransactionPool, +) map[string]*stateChange.StateAccesses { if eventsData.StateAccesses == nil { - log.Warn("getStateAccessesPerAccounts failed: will return empty state accesses per accounts", - "block hash", eventsData.HeaderHash, + log.Debug("getStateAccessesPerAccounts failed: will return empty state accesses per accounts", + "block hash", headerHash, "error", ErrNilStateAccesses, ) return make(map[string]*stateChange.StateAccesses) } - stateAccessesPerTxs := eventsData.StateAccesses + stateAccesses := eventsData.StateAccesses - logStateAccessesPerTxs(stateAccessesPerTxs) + return ei.fetchStateAccessesPerAccounts(stateAccesses, transactionPool) +} - // txs hashes with order - txsWithOrder := getTxsWithOrder(eventsData.TransactionsPool) +func (ei *eventsInterceptor) getStateAccessesPerAccountsV3( + eventsData *data.ArgsSaveBlockData, + headerHash string, + transactionPool *outport.TransactionPool, +) map[string]*stateChange.StateAccesses { + stateAccessesPerBlock, ok := eventsData.StateAccessesForBlock[headerHash] + if !ok { + log.Debug("stateAccessesPerBlock failed: will return empty state accesses per accounts", + "block hash", headerHash, + ) + + return make(map[string]*stateChange.StateAccesses) + } + + if stateAccessesPerBlock == nil { + log.Debug("stateAccessesPerBlock failed: will return empty state accesses per accounts", + "block hash", headerHash, + "num state accesses for block", len(eventsData.StateAccessesForBlock), + ) + + return make(map[string]*stateChange.StateAccesses) + } + + stateAccesses := stateAccessesPerBlock.StateAccesses + + return ei.fetchStateAccessesPerAccounts(stateAccesses, transactionPool) +} + +func (ei *eventsInterceptor) fetchStateAccessesPerAccounts( + stateAccesses map[string]*stateChange.StateAccesses, + transactionPool *outport.TransactionPool, +) map[string]*stateChange.StateAccesses { + if stateAccesses == nil { + return make(map[string]*stateChange.StateAccesses) + } stateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + + logStateAccessesPerTxs(stateAccesses) + + // txs hashes with order + txsWithOrder := getTxsWithOrder(transactionPool) + for _, txInfo := range txsWithOrder { - txHash, err := hex.DecodeString(txInfo.hash) + txHash, err := hex.DecodeString(txInfo.Hash) if err != nil { - log.Error("failed to decode tx hash", "txHash", txInfo.hash) + log.Error("failed to decode tx hash", "txHash", txInfo.Hash) continue } - stateAccessesPerTx, ok := stateAccessesPerTxs[string(txHash)] + stateAccessesPerTx, ok := stateAccesses[string(txHash)] if !ok { - log.Warn("did not find state accesses for tx", "txHash", txInfo.hash) + if txInfo.TxType == scr { + // there are cases when SCRs are generated but no state accesses are produced, so we will not log a warning in those cases + log.Trace("SCR with no state accesses", "txHash", txInfo.Hash) + continue + } + + log.Warn("did not find state accesses for tx", "txHash", txInfo.Hash, "txType", txInfo.TxType) continue } @@ -202,7 +355,7 @@ func logStateAccessesPerTxs(stateAccesses map[string]*stateChange.StateAccesses) } } -func (ei *eventsInterceptor) getLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { +func (ei *eventsInterceptor) getLogEventsFromTransactionsPool(logs []*transaction.LogData) []data.Event { var logEvents []*logEvent for _, logData := range logs { if logData == nil { diff --git a/process/eventsInterceptor_test.go b/process/eventsInterceptor_test.go index 986fb089..5e9f9375 100644 --- a/process/eventsInterceptor_test.go +++ b/process/eventsInterceptor_test.go @@ -46,62 +46,44 @@ func TestNewEventsInterceptor(t *testing.T) { }) } -func TestProcessBlockEvents(t *testing.T) { +func TestEventsInterceptor_baseNilChecks(t *testing.T) { t.Parallel() t.Run("nil block events data", func(t *testing.T) { t.Parallel() - eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) - events, err := eventsInterceptor.ProcessBlockEvents(nil) - require.Nil(t, events) + err := process.BaseNilEventsDataCheks(nil) require.Equal(t, process.ErrNilBlockEvents, err) }) - t.Run("nil transactions pool", func(t *testing.T) { - t.Parallel() - - eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) - - eventsData := &data.ArgsSaveBlockData{ - HeaderHash: []byte("headerHash"), - TransactionsPool: nil, - } - events, err := eventsInterceptor.ProcessBlockEvents(eventsData) - require.Nil(t, events) - require.Equal(t, process.ErrNilTransactionsPool, err) - }) - t.Run("nil block body", func(t *testing.T) { t.Parallel() - eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) - eventsData := &data.ArgsSaveBlockData{ HeaderHash: []byte("headerHash"), TransactionsPool: &outport.TransactionPool{}, Body: nil, } - events, err := eventsInterceptor.ProcessBlockEvents(eventsData) - require.Nil(t, events) + err := process.BaseNilEventsDataCheks(eventsData) require.Equal(t, process.ErrNilBlockBody, err) }) t.Run("nil block header", func(t *testing.T) { t.Parallel() - eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) - eventsData := &data.ArgsSaveBlockData{ HeaderHash: []byte("headerHash"), TransactionsPool: &outport.TransactionPool{}, Body: &block.Body{}, Header: nil, } - events, err := eventsInterceptor.ProcessBlockEvents(eventsData) - require.Nil(t, events) + err := process.BaseNilEventsDataCheks(eventsData) require.Equal(t, process.ErrNilBlockHeader, err) }) +} + +func TestProcessBlockEvents_WithoutExecutionResults(t *testing.T) { + t.Parallel() t.Run("nil state accesses, should return empty map", func(t *testing.T) { t.Parallel() @@ -166,7 +148,7 @@ func TestProcessBlockEvents(t *testing.T) { }, } - logs := []*outport.LogData{ + logs := []*transaction.LogData{ { Log: &transaction.Log{ Address: addr, @@ -189,7 +171,8 @@ func TestProcessBlockEvents(t *testing.T) { SmartContractResults: scrs, Logs: logs, }, - StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: make(map[string]*outport.StateAccessesForBlock), } expTxs := map[string]*transaction.Transaction{ @@ -260,7 +243,7 @@ func TestProcessBlockEvents(t *testing.T) { }, } - logs := []*outport.LogData{ + logs := []*transaction.LogData{ { Log: &transaction.Log{ Address: addr, @@ -284,7 +267,8 @@ func TestProcessBlockEvents(t *testing.T) { TransactionsPool: &outport.TransactionPool{ Logs: logs, }, - StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: make(map[string]*outport.StateAccessesForBlock), } expEvents := &data.InterceptorBlockData{ @@ -308,7 +292,222 @@ func TestProcessBlockEvents(t *testing.T) { require.Nil(t, err) require.Equal(t, expEvents, events) }) +} + +func TestProcessBlockEvents_WithExecutionResults(t *testing.T) { + t.Parallel() + + // TODO: add test for state accesses when implemented on the node + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) + + txs := map[string]*outport.TxInfo{ + "hash2": { + Transaction: &transaction.Transaction{ + Nonce: 2, + }, + ExecutionOrder: 1, + }, + } + scrs := map[string]*outport.SCRInfo{ + "hash3": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 3, + }, + ExecutionOrder: 1, + }, + } + addr := []byte("addr1") + + blockBody := &block.Body{ + MiniBlocks: make([]*block.MiniBlock, 1), + } + blockHeader := &block.HeaderV3{ + ShardID: 1, + TimestampMs: 1234, + } + + logs := []*transaction.LogData{ + { + Log: &transaction.Log{ + Address: addr, + Events: []*transaction.Event{ + { + Address: addr, + }, + }, + }, + }, + } + + proposedTxPool := &outport.TransactionPool{ + Transactions: nil, + SmartContractResults: nil, + Logs: nil, + } + + execTxPool := &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + Logs: logs, + } + + blockHash := []byte("blockHash") + + execResults := map[string]*outport.ExecutionResultData{ + hex.EncodeToString(blockHash): { + Body: blockBody, + TransactionPool: execTxPool, + }, + } + + blockEvents := data.ArgsSaveBlockData{ + HeaderHash: blockHash, + Body: blockBody, + Header: blockHeader, + TransactionsPool: proposedTxPool, + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: make(map[string]*outport.StateAccessesForBlock), + Results: execResults, + } + + expTxs := map[string]*transaction.Transaction{ + "hash2": { + Nonce: 2, + }, + } + expTxsWithOrder := map[string]*outport.TxInfo{ + "hash2": { + Transaction: &transaction.Transaction{ + Nonce: 2, + }, + ExecutionOrder: 1, + }, + } + expScrs := map[string]*smartContractResult.SmartContractResult{ + "hash3": { + Nonce: 3, + }, + } + expScrsWithOrder := map[string]*outport.SCRInfo{ + "hash3": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 3, + }, + ExecutionOrder: 1, + }, + } + + expEvents := []*data.InterceptorBlockData{ + { + Hash: hex.EncodeToString(blockHash), + Body: blockBody, + Header: blockHeader, + Txs: expTxs, + TxsWithOrder: expTxsWithOrder, + Scrs: expScrs, + ScrsWithOrder: expScrsWithOrder, + LogEvents: []data.Event{ + { + Address: hex.EncodeToString(addr), + Identifier: "", + Data: make([]byte, 0), + Topics: make([][]byte, 0), + }, + }, + StateAccessesPerAccounts: make(map[string]*stateChange.StateAccesses), + }, + } + + events, err := eventsInterceptor.ProcessBlockEventsV3(&blockEvents) + require.Nil(t, err) + require.Equal(t, expEvents, events) + }) + + t.Run("nil event fields should be returned as empty", func(t *testing.T) { + t.Parallel() + + eventsInterceptor, _ := process.NewEventsInterceptor(createMockEventsInterceptorArgs()) + + addr := []byte("addr1") + + blockBody := &block.Body{ + MiniBlocks: make([]*block.MiniBlock, 1), + } + blockHeader := &block.HeaderV3{ + ShardID: 1, + TimestampMs: 1234, + } + + logs := []*transaction.LogData{ + { + Log: &transaction.Log{ + Address: addr, + Events: []*transaction.Event{ + { + Address: addr, + Topics: nil, + Data: nil, + Identifier: nil, + }, + }, + }, + }, + } + + proposedTxPool := &outport.TransactionPool{ + Logs: nil, + } + + execTxPool := &outport.TransactionPool{ + Logs: logs, + } + + blockHash := []byte("blockHash") + + execResults := map[string]*outport.ExecutionResultData{ + hex.EncodeToString(blockHash): { + Body: blockBody, + TransactionPool: execTxPool, + }, + } + + blockEvents := data.ArgsSaveBlockData{ + HeaderHash: blockHash, + Body: blockBody, + Header: blockHeader, + TransactionsPool: proposedTxPool, + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: make(map[string]*outport.StateAccessesForBlock), + Results: execResults, + } + + expEvents := []*data.InterceptorBlockData{ + { + Hash: hex.EncodeToString(blockHash), + Body: blockBody, + Header: blockHeader, + Txs: make(map[string]*transaction.Transaction), + Scrs: make(map[string]*smartContractResult.SmartContractResult), + LogEvents: []data.Event{ + { + Address: hex.EncodeToString(addr), + Identifier: "", + Data: make([]byte, 0), + Topics: make([][]byte, 0), + }, + }, + StateAccessesPerAccounts: make(map[string]*stateChange.StateAccesses), + }, + } + events, err := eventsInterceptor.ProcessBlockEventsV3(&blockEvents) + require.Nil(t, err) + require.Equal(t, expEvents, events) + }) } func TestGetLogEventsFromTransactionsPool(t *testing.T) { @@ -332,7 +531,7 @@ func TestGetLogEventsFromTransactionsPool(t *testing.T) { }, } - logs := []*outport.LogData{ + logs := []*transaction.LogData{ { Log: &transaction.Log{ Events: []*transaction.Event{ @@ -491,9 +690,42 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { blockHash := []byte("blockHash") + // before header V3 + t.Run("with write operations", func(t *testing.T) { t.Parallel() + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + blockEvents := &data.ArgsSaveBlockData{ HeaderHash: blockHash, TransactionsPool: &outport.TransactionPool{ @@ -501,13 +733,13 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { SmartContractResults: scrs, InvalidTxs: invalidTxs, }, - StateAccesses: stateAccessesWrite, + StateAccesses: stateAccesses, } expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey1"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ + { Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey1"), MainTrieVal: []byte("mainTrieVal1"), @@ -516,12 +748,12 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { } expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ + { Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey2"), MainTrieVal: []byte("mainTrieVal4"), }, - &stateChange.StateAccess{ + { Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey2"), MainTrieVal: []byte("mainTrieVal2"), @@ -530,7 +762,7 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { } expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ + { Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey3"), MainTrieVal: []byte("mainTrieVal3"), @@ -549,6 +781,37 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { t.Run("with read operations, but not enabled from config", func(t *testing.T) { t.Parallel() + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + blockEvents := &data.ArgsSaveBlockData{ HeaderHash: blockHash, TransactionsPool: &outport.TransactionPool{ @@ -556,7 +819,7 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { SmartContractResults: scrs, InvalidTxs: invalidTxs, }, - StateAccesses: stateAccessesRead, + StateAccesses: stateAccesses, } args := createMockEventsInterceptorArgs() @@ -572,20 +835,52 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { t.Run("with read (not enabled from config) and write operations", func(t *testing.T) { t.Parallel() + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + blockEvents := &data.ArgsSaveBlockData{ HeaderHash: blockHash, + Header: &block.HeaderV3{}, TransactionsPool: &outport.TransactionPool{ Transactions: txs, SmartContractResults: scrs, InvalidTxs: invalidTxs, }, - StateAccesses: stateAccessesReadWrite, + StateAccesses: stateAccesses, } expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ + { Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey2"), MainTrieVal: []byte("mainTrieVal2"), @@ -594,7 +889,7 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { } expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ + { Type: stateChange.Write, MainTrieKey: []byte("mainTrieKey3"), MainTrieVal: []byte("mainTrieVal3"), @@ -665,4 +960,342 @@ func TestEventsInterceptor_GetStateAccessesPerAccounts(t *testing.T) { require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) }) + + // with header V3 + + t.Run("with write operations", func(t *testing.T) { + t.Parallel() + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(blockHash): {stateAccesses}, + }, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey1"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccountsV3(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read operations, but not enabled from config", func(t *testing.T) { + t.Parallel() + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(blockHash): {stateAccesses}, + }, + } + + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccountsV3(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read (not enabled from config) and write operations", func(t *testing.T) { + t.Parallel() + + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccesses["txHash0"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + { + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + }, + } + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + Header: &block.HeaderV3{}, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(blockHash): {stateAccesses}, + }, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + { + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + args := createMockEventsInterceptorArgs() + en, _ := process.NewEventsInterceptor(args) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccountsV3(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) + + t.Run("with read and write operations", func(t *testing.T) { + t.Parallel() + + blockEvents := &data.ArgsSaveBlockData{ + HeaderHash: blockHash, + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + InvalidTxs: invalidTxs, + }, + StateAccesses: make(map[string]*stateChange.StateAccesses), + StateAccessesForBlock: map[string]*outport.StateAccessesForBlock{ + hex.EncodeToString(blockHash): {stateAccessesReadWrite}, + }, + } + + expStateAccessesPerAccounts := make(map[string]*stateChange.StateAccesses) + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey1"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey2"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Read, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal4"), + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + }, + }, + } + expStateAccessesPerAccounts[hex.EncodeToString([]byte("mainTrieKey3"))] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey3"), + MainTrieVal: []byte("mainTrieVal3"), + }, + }, + } + + args := createMockEventsInterceptorArgs() + args.WithReadStateChanges = true + en, _ := process.NewEventsInterceptor(args) + + stateAccessesPerAccounts := en.GetStateAccessesPerAccountsV3(blockEvents) + + require.Equal(t, expStateAccessesPerAccounts, stateAccessesPerAccounts) + }) +} + +func TestEventsInterceptor_GetTxsWithOrder(t *testing.T) { + t.Parallel() + + transactionPool := &outport.TransactionPool{ + Transactions: map[string]*outport.TxInfo{ + "hash1": { + ExecutionOrder: 0, + }, + "hash2": { + ExecutionOrder: 1, + }, + }, + SmartContractResults: map[string]*outport.SCRInfo{ + "hash3": { + ExecutionOrder: 2, + }, + }, + Rewards: map[string]*outport.RewardInfo{ + "hash4": { + ExecutionOrder: 3, + }, + }, + InvalidTxs: map[string]*outport.TxInfo{ + "hash1": { + ExecutionOrder: 0, + }, + }, + } + + txsWithOrder := process.GetTxsWithOrder(transactionPool) + + // we expect one entry for each execution order 0..3, + // with the duplicate "hash1" treated as invalid + require.Len(t, txsWithOrder, 4) + var hashes []string + invalidCount := 0 + for _, tx := range txsWithOrder { + hashes = append(hashes, tx.Hash) + if tx.TxType == 3 { //invalid tx + invalidCount++ + } + } + // execution order should be preserved: 0,1,2,3 -> hash1,hash2,hash3,hash4 + require.Equal(t, []string{"hash1", "hash2", "hash3", "hash4"}, hashes) + // "hash1" should appear exactly once and be marked invalid + hash1Count := 0 + for _, h := range hashes { + if h == "hash1" { + hash1Count++ + } + } + require.Equal(t, 1, hash1Count) + require.Equal(t, invalidCount, 1) } diff --git a/process/export_test.go b/process/export_test.go index fa032b33..bef35e66 100644 --- a/process/export_test.go +++ b/process/export_test.go @@ -1,8 +1,11 @@ package process import ( + "encoding/hex" + "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/stateChange" + "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/data" ) @@ -37,11 +40,26 @@ func (eh *eventsHandler) ShouldProcessSaveBlockEvents(blockHash string) bool { } // GetLogEventsFromTransactionsPool exports internal method for testing -func (ei *eventsInterceptor) GetLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { +func (ei *eventsInterceptor) GetLogEventsFromTransactionsPool(logs []*transaction.LogData) []data.Event { return ei.getLogEventsFromTransactionsPool(logs) } // GetStateAccessesPerAccounts - func (ei *eventsInterceptor) GetStateAccessesPerAccounts(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { - return ei.getStateAccessesPerAccounts(eventsData) + return ei.getStateAccessesPerAccounts(eventsData, hex.EncodeToString(eventsData.HeaderHash), eventsData.TransactionsPool) +} + +// GetStateAccessesPerAccountsV3 - +func (ei *eventsInterceptor) GetStateAccessesPerAccountsV3(eventsData *data.ArgsSaveBlockData) map[string]*stateChange.StateAccesses { + return ei.getStateAccessesPerAccountsV3(eventsData, hex.EncodeToString(eventsData.HeaderHash), eventsData.TransactionsPool) +} + +// BaseNilEventsDataCheks - +func BaseNilEventsDataCheks(eventsData *data.ArgsSaveBlockData) error { + return baseNilEventsDataChecks(eventsData) +} + +// GetTxsWithOrder exports internal method for testing +func GetTxsWithOrder(transactionsPool *outport.TransactionPool) []txWithOrder { + return getTxsWithOrder(transactionsPool) } diff --git a/process/interface.go b/process/interface.go index 5e9c8636..95378474 100644 --- a/process/interface.go +++ b/process/interface.go @@ -40,6 +40,7 @@ type EventsHandler interface { // EventsInterceptor defines the behaviour of an events interceptor component type EventsInterceptor interface { ProcessBlockEvents(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) + ProcessBlockEventsV3(eventsData *data.ArgsSaveBlockData) ([]*data.InterceptorBlockData, error) IsInterfaceNil() bool } diff --git a/process/payloadHandler.go b/process/payloadHandler.go index 310c472d..49c813b9 100644 --- a/process/payloadHandler.go +++ b/process/payloadHandler.go @@ -2,6 +2,7 @@ package process import ( "errors" + "time" "github.com/multiversx/mx-chain-core-go/data/outport" ) @@ -65,6 +66,11 @@ func (ph *payloadHandler) saveBlock(marshalledData []byte, version uint32) error return ErrInvalidPayloadType } + t := time.Now() + defer func() { + log.Debug("saveBlock done", "duration", time.Since(t)) + }() + return dataProcessor.SaveBlock(marshalledData) } @@ -75,6 +81,11 @@ func (ph *payloadHandler) revertIndexedBlock(marshalledData []byte, version uint return ErrInvalidPayloadType } + t := time.Now() + defer func() { + log.Debug("revertIndexedBlock done", "duration", time.Since(t)) + }() + return dataProcessor.RevertIndexedBlock(marshalledData) } @@ -85,6 +96,11 @@ func (ph *payloadHandler) finalizedBlock(marshalledData []byte, version uint32) return ErrInvalidPayloadType } + t := time.Now() + defer func() { + log.Debug("finalizedBlock done", "duration", time.Since(t)) + }() + return dataProcessor.FinalizedBlock(marshalledData) } diff --git a/process/preprocess/basePreProcessor.go b/process/preprocess/basePreProcessor.go index 63a9bad8..fbed27bf 100644 --- a/process/preprocess/basePreProcessor.go +++ b/process/preprocess/basePreProcessor.go @@ -80,10 +80,20 @@ func createEmptyBlockCreatorContainer() (EmptyBlockCreatorContainer, error) { return nil, err } + err = container.Add(core.ShardHeaderV3, block.NewEmptyHeaderV3Creator()) + if err != nil { + return nil, err + } + err = container.Add(core.MetaHeader, block.NewEmptyMetaBlockCreator()) if err != nil { return nil, err } + err = container.Add(core.MetaHeaderV3, block.NewEmptyMetaBlockV3Creator()) + if err != nil { + return nil, err + } + return container, nil } diff --git a/process/preprocess/basePreProcessor_test.go b/process/preprocess/basePreProcessor_test.go index b944f8e0..19a1bc45 100644 --- a/process/preprocess/basePreProcessor_test.go +++ b/process/preprocess/basePreProcessor_test.go @@ -3,6 +3,7 @@ package preprocess_test import ( "testing" + "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/mock" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/mocks" @@ -51,3 +52,27 @@ func TestNewBaseEventsPreProcessor(t *testing.T) { require.NotNil(t, dp) }) } + +func TestCreateEmptyBlockCreatorContainer(t *testing.T) { + t.Parallel() + + cont, err := preprocess.CreateEmptyBlockCreatorContainer() + require.Nil(t, err) + + // shard + _, err = cont.Get(core.ShardHeaderV1) + require.Nil(t, err) + + _, err = cont.Get(core.ShardHeaderV2) + require.Nil(t, err) + + _, err = cont.Get(core.ShardHeaderV3) + require.Nil(t, err) + + // meta + _, err = cont.Get(core.MetaHeader) + require.Nil(t, err) + + _, err = cont.Get(core.MetaHeaderV3) + require.Nil(t, err) +} diff --git a/process/preprocess/eventsPreProcessorV0.go b/process/preprocess/eventsPreProcessorV0.go index 8d2398b1..74571a04 100644 --- a/process/preprocess/eventsPreProcessorV0.go +++ b/process/preprocess/eventsPreProcessorV0.go @@ -8,6 +8,7 @@ import ( nodeData "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/process" ) @@ -57,6 +58,7 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { NumberOfShards: blockData.NumberOfShards, TransactionsPool: txsPool, Header: header, + StateAccesses: blockData.StateAccesses, } err = d.facade.HandlePushEvents(*saveBlockData) @@ -113,14 +115,14 @@ func (d *eventsPreProcessorV0) parseScrs(scrs map[string]*data.NodeSmartContract return newScrs } -func (d *eventsPreProcessorV0) parseLogs(logs []*data.LogData) []*outport.LogData { - newLogs := make([]*outport.LogData, len(logs)) +func (d *eventsPreProcessorV0) parseLogs(logs []*data.LogData) []*transaction.LogData { + newLogs := make([]*transaction.LogData, len(logs)) for _, logHandler := range logs { if logHandler == nil { continue } - newLogs = append(newLogs, &outport.LogData{ + newLogs = append(newLogs, &transaction.LogData{ TxHash: logHandler.TxHash, Log: logHandler.LogHandler, }) diff --git a/process/preprocess/eventsPreProcessorV1.go b/process/preprocess/eventsPreProcessorV1.go index 9a4add06..060ff323 100644 --- a/process/preprocess/eventsPreProcessorV1.go +++ b/process/preprocess/eventsPreProcessorV1.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/multiversx/mx-chain-core-go/core" + coreData "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-notifier-go/data" ) @@ -44,16 +45,27 @@ func (d *eventsPreProcessorV1) SaveBlock(marshalledData []byte) error { return err } - err = checkBlockDataValid(outportBlock) + if outportBlock.BlockData == nil { + return ErrNilBlockData + } + + headerType := core.HeaderType(outportBlock.BlockData.HeaderType) + + header, err := d.getHeaderFromBytes(headerType, outportBlock.BlockData.HeaderBytes) if err != nil { return err } - header, err := d.getHeaderFromBytes(core.HeaderType(outportBlock.BlockData.HeaderType), outportBlock.BlockData.HeaderBytes) + err = checkHeaderGasConsumption(header, outportBlock) if err != nil { return err } + var executionResults map[string]*outport.ExecutionResultData + if header.IsHeaderV3() { + executionResults = outportBlock.BlockData.Results + } + saveBlockData := &data.ArgsSaveBlockData{ HeaderHash: outportBlock.BlockData.HeaderHash, Body: outportBlock.BlockData.Body, @@ -66,6 +78,8 @@ func (d *eventsPreProcessorV1) SaveBlock(marshalledData []byte) error { Header: header, HeaderTimeStampMs: outportBlock.BlockData.GetTimestampMs(), StateAccesses: outportBlock.GetStateAccesses(), + StateAccessesForBlock: outportBlock.GetStateAccessesForBlock(), + Results: executionResults, } err = d.facade.HandlePushEvents(*saveBlockData) @@ -76,15 +90,22 @@ func (d *eventsPreProcessorV1) SaveBlock(marshalledData []byte) error { return nil } -func checkBlockDataValid(block *outport.OutportBlock) error { - if block.BlockData == nil { - return ErrNilBlockData - } - if block.TransactionPool == nil { - return ErrNilTransactionPool +func checkHeaderGasConsumption(header coreData.HeaderHandler, block *outport.OutportBlock) error { + if !header.IsHeaderV3() { + if block.HeaderGasConsumption == nil { + return ErrNilHeaderGasConsumption + } + + return nil } - if block.HeaderGasConsumption == nil { - return ErrNilHeaderGasConsumption + + for _, execRes := range block.BlockData.Results { + if execRes == nil { + continue + } + if execRes.HeaderGasConsumption == nil { + return ErrNilHeaderGasConsumption + } } return nil diff --git a/process/preprocess/eventsPreProcessorV1_test.go b/process/preprocess/eventsPreProcessorV1_test.go index 0042ca77..84b6bea5 100644 --- a/process/preprocess/eventsPreProcessorV1_test.go +++ b/process/preprocess/eventsPreProcessorV1_test.go @@ -34,20 +34,6 @@ func TestPreProcessorV1_SaveBlock(t *testing.T) { require.Equal(t, preprocess.ErrNilBlockData, err) }) - t.Run("nil transaction pool", func(t *testing.T) { - t.Parallel() - - outportBlock := createDefaultOutportBlock() - outportBlock.TransactionPool = nil - - dp, err := preprocess.NewEventsPreProcessorV1(createMockEventsDataPreProcessorArgs()) - require.Nil(t, err) - - marshalledBlock, _ := json.Marshal(outportBlock) - err = dp.SaveBlock(marshalledBlock) - require.Equal(t, preprocess.ErrNilTransactionPool, err) - }) - t.Run("nil header gas consumption", func(t *testing.T) { t.Parallel() @@ -242,7 +228,7 @@ func createDefaultOutportBlock() *outport.OutportBlock { ExecutionOrder: 2, }, }, - Logs: []*outport.LogData{}, + Logs: []*transaction.LogData{}, }, HeaderGasConsumption: &outport.HeaderGasConsumption{ GasProvided: 3, diff --git a/process/preprocess/export_test.go b/process/preprocess/export_test.go index 24227833..2b790af0 100644 --- a/process/preprocess/export_test.go +++ b/process/preprocess/export_test.go @@ -4,3 +4,8 @@ package preprocess func NewBaseEventsPreProcessor(args ArgsEventsPreProcessor) (*baseEventsPreProcessor, error) { return newBaseEventsPreProcessor(args) } + +// CreateEmptyBlockCreatorContainer - +func CreateEmptyBlockCreatorContainer() (EmptyBlockCreatorContainer, error) { + return createEmptyBlockCreatorContainer() +} diff --git a/testdata/testData.go b/testdata/testData.go index 9962d4d3..a090cf92 100644 --- a/testdata/testData.go +++ b/testdata/testData.go @@ -11,7 +11,6 @@ import ( "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/common" - "github.com/multiversx/mx-chain-notifier-go/data" notifierData "github.com/multiversx/mx-chain-notifier-go/data" ) @@ -52,7 +51,28 @@ func (bd *blockData) OldSaveBlockData() *notifierData.SaveBlockData { // OutportBlockV0 - func (bd *blockData) OutportBlockV0() *notifierData.ArgsSaveBlock { - saveBlockData := data.OutportBlockDataOld{ + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + TxHash: []byte("txHash1"), + AccountChanges: 8, + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + TxHash: []byte("txHash1"), + AccountChanges: 4, + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + + saveBlockData := notifierData.OutportBlockDataOld{ HeaderHash: []byte("headerHash3"), Body: &block.Body{ MiniBlocks: []*block.MiniBlock{ @@ -102,9 +122,10 @@ func (bd *blockData) OutportBlockV0() *notifierData.ArgsSaveBlock { }, }, NumberOfShards: 2, + StateAccesses: stateAccesses, } - return &data.ArgsSaveBlock{ + return ¬ifierData.ArgsSaveBlock{ HeaderType: "Header", OutportBlockDataOld: saveBlockData, } @@ -117,33 +138,14 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { TimeStamp: 1234, } headerBytes, _ := bd.marshaller.Marshal(header) - - stateAccesses := make(map[string]*stateChange.StateAccesses) - stateAccesses["txHash1"] = &stateChange.StateAccesses{ - StateAccess: []*stateChange.StateAccess{ - &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey1"), - MainTrieVal: []byte("mainTrieVal1"), - TxHash: []byte("txHash1"), - AccountChanges: 8, - }, - &stateChange.StateAccess{ - Type: stateChange.Write, - MainTrieKey: []byte("mainTrieKey2"), - MainTrieVal: []byte("mainTrieVal2"), - TxHash: []byte("txHash1"), - AccountChanges: 4, - }, - }, - } - stateAccesses["txHash2"] = &stateChange.StateAccesses{} + headerHash := []byte("headerHash1") + stateAccessesForBlock := getStateAccessesForBlock(headerHash) return &outport.OutportBlock{ BlockData: &outport.BlockData{ HeaderBytes: headerBytes, HeaderType: "Header", - HeaderHash: []byte("headerHash1"), + HeaderHash: headerHash, Body: &block.Body{ MiniBlocks: []*block.MiniBlock{ { @@ -183,7 +185,7 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { ExecutionOrder: 0, }, }, - Logs: []*outport.LogData{ + Logs: []*transaction.LogData{ { Log: &transaction.Log{ Address: []byte("logaddr1"), @@ -193,8 +195,135 @@ func (bd *blockData) OutportBlockV1() *outport.OutportBlock { }, }, }, - StateAccesses: stateAccesses, - NumberOfShards: 2, + StateAccessesForBlock: stateAccessesForBlock, + NumberOfShards: 2, + } +} + +func getStateAccessesForBlock(headerHash []byte) map[string]*outport.StateAccessesForBlock { + stateAccesses := make(map[string]*stateChange.StateAccesses) + stateAccesses["txHash1"] = &stateChange.StateAccesses{ + StateAccess: []*stateChange.StateAccess{ + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey1"), + MainTrieVal: []byte("mainTrieVal1"), + TxHash: []byte("txHash1"), + AccountChanges: 8, + }, + &stateChange.StateAccess{ + Type: stateChange.Write, + MainTrieKey: []byte("mainTrieKey2"), + MainTrieVal: []byte("mainTrieVal2"), + TxHash: []byte("txHash1"), + AccountChanges: 4, + }, + }, + } + stateAccesses["txHash2"] = &stateChange.StateAccesses{} + stateAccessesForBlock := map[string]*outport.StateAccessesForBlock{} + stateAccessesForBlock[hex.EncodeToString(headerHash)] = &outport.StateAccessesForBlock{StateAccesses: stateAccesses} + return stateAccessesForBlock +} + +// OutportBlockV2 - +func (bd *blockData) OutportBlockV2() *outport.OutportBlock { + header := &block.HeaderV3{ + ShardID: 1, + TimestampMs: 1234, + } + headerBytes, _ := bd.marshaller.Marshal(header) + + execBlockHash := []byte("execBlockHash1") + stateAccessesForBlock := getStateAccessesForBlock(execBlockHash) + + blockBody := &block.Body{ + MiniBlocks: []*block.MiniBlock{ + { + TxHashes: [][]byte{}, + ReceiverShardID: 1, + SenderShardID: 1, + }, + }, + } + + execResTxPool := &outport.TransactionPool{ + Transactions: map[string]*outport.TxInfo{ + hex.EncodeToString([]byte("txHash1")): { + Transaction: &transaction.Transaction{ + Nonce: 1, + GasPrice: 1, + GasLimit: 1, + }, + FeeInfo: &outport.FeeInfo{ + GasUsed: 1, + }, + ExecutionOrder: 2, + }, + }, + SmartContractResults: map[string]*outport.SCRInfo{ + hex.EncodeToString([]byte("scrHash1")): { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 2, + GasLimit: 2, + GasPrice: 2, + CallType: 2, + }, + FeeInfo: &outport.FeeInfo{ + GasUsed: 2, + }, + ExecutionOrder: 0, + }, + }, + Logs: []*transaction.LogData{ + { + Log: &transaction.Log{ + Address: []byte("logaddr1"), + Events: []*transaction.Event{ + { + Address: []byte("logaddr1"), + }, + }, + }, + TxHash: "txHash1", + }, + }, + } + + execBlockHash2 := []byte("execBlockHash2") + + execResults := map[string]*outport.ExecutionResultData{ + hex.EncodeToString(execBlockHash): { + Body: blockBody, + TransactionPool: execResTxPool, + HeaderGasConsumption: &outport.HeaderGasConsumption{}, + }, + hex.EncodeToString(execBlockHash2): { + Body: blockBody, + TransactionPool: execResTxPool, + HeaderGasConsumption: &outport.HeaderGasConsumption{}, + }, + } + + return &outport.OutportBlock{ + BlockData: &outport.BlockData{ + HeaderBytes: headerBytes, + HeaderType: "HeaderV3", + HeaderHash: []byte("headerHash1"), + Body: &block.Body{ + MiniBlocks: []*block.MiniBlock{ + { + TxHashes: [][]byte{}, + ReceiverShardID: 1, + SenderShardID: 1, + }, + }, + }, + Results: execResults, + }, + HeaderGasConsumption: &outport.HeaderGasConsumption{}, + NumberOfShards: 2, + StateAccessesForBlock: stateAccessesForBlock, } }