From 65ed9b4f4ead8f8c566c9d673fd9812fee7b7e99 Mon Sep 17 00:00:00 2001 From: Jason-Wanxt Date: Thu, 17 Jul 2025 20:07:07 +0800 Subject: [PATCH 1/5] fix post multiple same assertion creation tx --- assertions/manager.go | 6 ++++++ assertions/poster.go | 3 ++- assertions/sync.go | 29 +++++++++++++++++++++++++++-- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/assertions/manager.go b/assertions/manager.go index 6a4504d790..3e11f185f0 100644 --- a/assertions/manager.go +++ b/assertions/manager.go @@ -94,6 +94,9 @@ type Manager struct { autoDeposit bool autoAllowanceApproval bool maxGetLogBlocks uint64 + // New + confirming *threadsafe.LruSet[protocol.AssertionHash] + confirmQueueMutex sync.Mutex } type assertionChainData struct { @@ -248,6 +251,9 @@ func NewManager( autoDeposit: true, autoAllowanceApproval: true, maxGetLogBlocks: 1000, + + confirming: threadsafe.NewLruSet[protocol.AssertionHash](maxAssertions), + confirmQueueMutex: sync.Mutex{}, } for _, o := range opts { o(m) diff --git a/assertions/poster.go b/assertions/poster.go index 0148b8e6aa..2a14cf99c3 100644 --- a/assertions/poster.go +++ b/assertions/poster.go @@ -196,7 +196,8 @@ func (m *Manager) PostAssertionBasedOnParent( "postedExecutionState", fmt.Sprintf("%+v", newState), "assertionHash", assertion.Id(), ) - m.observedCanonicalAssertions <- assertion.Id() + // m.observedCanonicalAssertions <- assertion.Id() + m.sendToConfirmationQueue(assertion.Id(), "PostAssertionBasedOnParent") return option.Some(assertion), nil } diff --git a/assertions/sync.go b/assertions/sync.go index ee5c8b2696..a91a2b7de6 100644 --- a/assertions/sync.go +++ b/assertions/sync.go @@ -302,7 +302,8 @@ func (m *Manager) findCanonicalAssertionBranch( cursor = assertion.AssertionHash m.assertionChainData.latestAgreedAssertion = cursor m.assertionChainData.canonicalAssertions[cursor] = assertion - m.observedCanonicalAssertions <- cursor + // m.observedCanonicalAssertions <- cursor + m.sendToConfirmationQueue(cursor, "findCanonicalAssertionBranch") } } } @@ -359,7 +360,8 @@ func (m *Manager) respondToAnyInvalidAssertions( m.assertionChainData.canonicalAssertions[postedAssertionHash] = postedRival m.submittedAssertions.Insert(postedAssertionHash) m.submittedRivalsCount++ - m.observedCanonicalAssertions <- postedAssertionHash + // m.observedCanonicalAssertions <- postedAssertionHash + m.sendToConfirmationQueue(postedAssertionHash, "respondToAnyInvalidAssertions") } } } @@ -553,3 +555,26 @@ func (m *Manager) saveAssertionToDB(ctx context.Context, creationInfo *protocol. Status: status.String(), }) } + +// Send assertion to confirmation queue +func (m *Manager) sendToConfirmationQueue(assertionHash protocol.AssertionHash, addedBy string) { + m.confirmQueueMutex.Lock() + defer m.confirmQueueMutex.Unlock() + + // Check if assertion is already in confirmation queue + if m.confirming.Has(assertionHash) { + log.Info("Assertion already in confirmation queue", "assertionHash", assertionHash, "addedBy", addedBy) + return // Already in confirmation queue, skip + } + log.Info("Sending assertion to confirmation queue", "assertionHash", assertionHash, "addedBy", addedBy) + // Mark as confirming + m.confirming.Insert(assertionHash) + + // Send to confirmation queue + select { + case m.observedCanonicalAssertions <- assertionHash: + case <-m.GetContext().Done(): + // If sending fails, remove from set + m.confirming.Delete(assertionHash) + } +} From 0a747fb0f9909616396a2d9a25b4070bf0ef2584 Mon Sep 17 00:00:00 2001 From: Jason-Wanxt Date: Thu, 17 Jul 2025 22:18:14 +0800 Subject: [PATCH 2/5] format and change confirmation queue existed log to debug type --- assertions/manager.go | 5 ++--- assertions/poster.go | 2 +- assertions/sync.go | 4 +--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/assertions/manager.go b/assertions/manager.go index 3e11f185f0..02246ae94d 100644 --- a/assertions/manager.go +++ b/assertions/manager.go @@ -94,9 +94,8 @@ type Manager struct { autoDeposit bool autoAllowanceApproval bool maxGetLogBlocks uint64 - // New - confirming *threadsafe.LruSet[protocol.AssertionHash] - confirmQueueMutex sync.Mutex + confirming *threadsafe.LruSet[protocol.AssertionHash] + confirmQueueMutex sync.Mutex } type assertionChainData struct { diff --git a/assertions/poster.go b/assertions/poster.go index 2a14cf99c3..fcb3c26563 100644 --- a/assertions/poster.go +++ b/assertions/poster.go @@ -196,7 +196,7 @@ func (m *Manager) PostAssertionBasedOnParent( "postedExecutionState", fmt.Sprintf("%+v", newState), "assertionHash", assertion.Id(), ) - // m.observedCanonicalAssertions <- assertion.Id() + m.sendToConfirmationQueue(assertion.Id(), "PostAssertionBasedOnParent") return option.Some(assertion), nil } diff --git a/assertions/sync.go b/assertions/sync.go index a91a2b7de6..146a6e9e94 100644 --- a/assertions/sync.go +++ b/assertions/sync.go @@ -302,7 +302,6 @@ func (m *Manager) findCanonicalAssertionBranch( cursor = assertion.AssertionHash m.assertionChainData.latestAgreedAssertion = cursor m.assertionChainData.canonicalAssertions[cursor] = assertion - // m.observedCanonicalAssertions <- cursor m.sendToConfirmationQueue(cursor, "findCanonicalAssertionBranch") } } @@ -360,7 +359,6 @@ func (m *Manager) respondToAnyInvalidAssertions( m.assertionChainData.canonicalAssertions[postedAssertionHash] = postedRival m.submittedAssertions.Insert(postedAssertionHash) m.submittedRivalsCount++ - // m.observedCanonicalAssertions <- postedAssertionHash m.sendToConfirmationQueue(postedAssertionHash, "respondToAnyInvalidAssertions") } } @@ -563,7 +561,7 @@ func (m *Manager) sendToConfirmationQueue(assertionHash protocol.AssertionHash, // Check if assertion is already in confirmation queue if m.confirming.Has(assertionHash) { - log.Info("Assertion already in confirmation queue", "assertionHash", assertionHash, "addedBy", addedBy) + log.Debug("Assertion already in confirmation queue", "assertionHash", assertionHash, "addedBy", addedBy) return // Already in confirmation queue, skip } log.Info("Sending assertion to confirmation queue", "assertionHash", assertionHash, "addedBy", addedBy) From 0095e9083cc68fedfba010d14873162cac27dda2 Mon Sep 17 00:00:00 2001 From: Jason-Wanxt Date: Thu, 17 Jul 2025 22:20:53 +0800 Subject: [PATCH 3/5] remove unused line --- assertions/manager.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/assertions/manager.go b/assertions/manager.go index 02246ae94d..7578f8c5c0 100644 --- a/assertions/manager.go +++ b/assertions/manager.go @@ -250,9 +250,8 @@ func NewManager( autoDeposit: true, autoAllowanceApproval: true, maxGetLogBlocks: 1000, - - confirming: threadsafe.NewLruSet[protocol.AssertionHash](maxAssertions), - confirmQueueMutex: sync.Mutex{}, + confirming: threadsafe.NewLruSet[protocol.AssertionHash](maxAssertions), + confirmQueueMutex: sync.Mutex{}, } for _, o := range opts { o(m) From d8b4719618160c2eb0b2e91b4f8dfc9a7a386c3e Mon Sep 17 00:00:00 2001 From: Jason-Wanxt Date: Fri, 18 Jul 2025 02:50:02 +0800 Subject: [PATCH 4/5] add confirming init to test --- assertions/sync_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/assertions/sync_test.go b/assertions/sync_test.go index 53aace50d3..034426fa95 100644 --- a/assertions/sync_test.go +++ b/assertions/sync_test.go @@ -136,6 +136,7 @@ func Test_findCanonicalAssertionBranch(t *testing.T) { execProvider: provider, chain: setup.Chains[0], observedCanonicalAssertions: make(chan protocol.AssertionHash), + confirming: threadsafe.NewLruSet[protocol.AssertionHash](1000), assertionChainData: &assertionChainData{ latestAgreedAssertion: numToAssertionHash(1), canonicalAssertions: make(map[protocol.AssertionHash]*protocol.AssertionCreatedInfo), @@ -267,6 +268,7 @@ func Test_respondToAnyInvalidAssertions(t *testing.T) { manager := &Manager{ observedCanonicalAssertions: make(chan protocol.AssertionHash), submittedAssertions: threadsafe.NewLruSet(1000, threadsafe.LruSetWithMetric[protocol.AssertionHash]("submittedAssertions")), + confirming: threadsafe.NewLruSet[protocol.AssertionHash](1000), assertionChainData: &assertionChainData{ latestAgreedAssertion: numToAssertionHash(1), canonicalAssertions: make(map[protocol.AssertionHash]*protocol.AssertionCreatedInfo), From 616e55d1735a90542d3844fb8333ddc63f0f0f13 Mon Sep 17 00:00:00 2001 From: Jason-Wanxt Date: Fri, 18 Jul 2025 03:28:34 +0800 Subject: [PATCH 5/5] use default instead of depends on StopWaiter to avoid panic in testing --- assertions/sync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/assertions/sync.go b/assertions/sync.go index 146a6e9e94..e35561282c 100644 --- a/assertions/sync.go +++ b/assertions/sync.go @@ -571,8 +571,8 @@ func (m *Manager) sendToConfirmationQueue(assertionHash protocol.AssertionHash, // Send to confirmation queue select { case m.observedCanonicalAssertions <- assertionHash: - case <-m.GetContext().Done(): - // If sending fails, remove from set + default: m.confirming.Delete(assertionHash) + log.Warn("Failed to send assertion to confirmation queue: channel full", "assertionHash", assertionHash, "addedBy", addedBy) } }