diff --git a/assertions/manager.go b/assertions/manager.go index 6a4504d790..7578f8c5c0 100644 --- a/assertions/manager.go +++ b/assertions/manager.go @@ -94,6 +94,8 @@ type Manager struct { autoDeposit bool autoAllowanceApproval bool maxGetLogBlocks uint64 + confirming *threadsafe.LruSet[protocol.AssertionHash] + confirmQueueMutex sync.Mutex } type assertionChainData struct { @@ -248,6 +250,8 @@ func NewManager( autoDeposit: true, autoAllowanceApproval: true, maxGetLogBlocks: 1000, + confirming: threadsafe.NewLruSet[protocol.AssertionHash](maxAssertions), + confirmQueueMutex: sync.Mutex{}, } for _, o := range opts { o(m) diff --git a/assertions/poster.go b/assertions/poster.go index 0148b8e6aa..fcb3c26563 100644 --- a/assertions/poster.go +++ b/assertions/poster.go @@ -196,7 +196,8 @@ func (m *Manager) PostAssertionBasedOnParent( "postedExecutionState", fmt.Sprintf("%+v", newState), "assertionHash", assertion.Id(), ) - m.observedCanonicalAssertions <- assertion.Id() + + m.sendToConfirmationQueue(assertion.Id(), "PostAssertionBasedOnParent") return option.Some(assertion), nil } diff --git a/assertions/sync.go b/assertions/sync.go index da27f05640..7193579d0a 100644 --- a/assertions/sync.go +++ b/assertions/sync.go @@ -308,7 +308,7 @@ func (m *Manager) findCanonicalAssertionBranch( cursor = assertion.AssertionHash m.assertionChainData.latestAgreedAssertion = cursor m.assertionChainData.canonicalAssertions[cursor] = assertion - m.observedCanonicalAssertions <- cursor + m.sendToConfirmationQueue(cursor, "findCanonicalAssertionBranch") } } } @@ -365,7 +365,7 @@ func (m *Manager) respondToAnyInvalidAssertions( m.assertionChainData.canonicalAssertions[postedAssertionHash] = postedRival m.submittedAssertions.Insert(postedAssertionHash) m.submittedRivalsCount++ - m.observedCanonicalAssertions <- postedAssertionHash + m.sendToConfirmationQueue(postedAssertionHash, "respondToAnyInvalidAssertions") } } } @@ -559,3 +559,26 @@ func (m *Manager) saveAssertionToDB(ctx context.Context, creationInfo *protocol. Status: status.String(), }) } + +// Send assertion to confirmation queue +func (m *Manager) sendToConfirmationQueue(assertionHash protocol.AssertionHash, addedBy string) { + m.confirmQueueMutex.Lock() + defer m.confirmQueueMutex.Unlock() + + // Check if assertion is already in confirmation queue + if m.confirming.Has(assertionHash) { + log.Debug("Assertion already in confirmation queue", "assertionHash", assertionHash, "addedBy", addedBy) + return // Already in confirmation queue, skip + } + log.Info("Sending assertion to confirmation queue", "assertionHash", assertionHash, "addedBy", addedBy) + // Mark as confirming + m.confirming.Insert(assertionHash) + + // Send to confirmation queue + select { + case m.observedCanonicalAssertions <- assertionHash: + default: + m.confirming.Delete(assertionHash) + log.Warn("Failed to send assertion to confirmation queue: channel full", "assertionHash", assertionHash, "addedBy", addedBy) + } +} diff --git a/assertions/sync_test.go b/assertions/sync_test.go index 53aace50d3..034426fa95 100644 --- a/assertions/sync_test.go +++ b/assertions/sync_test.go @@ -136,6 +136,7 @@ func Test_findCanonicalAssertionBranch(t *testing.T) { execProvider: provider, chain: setup.Chains[0], observedCanonicalAssertions: make(chan protocol.AssertionHash), + confirming: threadsafe.NewLruSet[protocol.AssertionHash](1000), assertionChainData: &assertionChainData{ latestAgreedAssertion: numToAssertionHash(1), canonicalAssertions: make(map[protocol.AssertionHash]*protocol.AssertionCreatedInfo), @@ -267,6 +268,7 @@ func Test_respondToAnyInvalidAssertions(t *testing.T) { manager := &Manager{ observedCanonicalAssertions: make(chan protocol.AssertionHash), submittedAssertions: threadsafe.NewLruSet(1000, threadsafe.LruSetWithMetric[protocol.AssertionHash]("submittedAssertions")), + confirming: threadsafe.NewLruSet[protocol.AssertionHash](1000), assertionChainData: &assertionChainData{ latestAgreedAssertion: numToAssertionHash(1), canonicalAssertions: make(map[protocol.AssertionHash]*protocol.AssertionCreatedInfo),