Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/late-sharks-burn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix Wire user metric limiters to WASM ModuleConfig
304 changes: 171 additions & 133 deletions .github/integration-in-memory-tests.yml

Large diffs are not rendered by default.

76 changes: 36 additions & 40 deletions core/capabilities/remote/executable/request/client_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
stepRef1 = "stepRef1"

testDispatcherChanCap = 100
)

func Test_ClientRequest_MessageValidation(t *testing.T) {
Expand Down Expand Up @@ -82,7 +84,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 2, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
defer req.Cancel(errors.New("test end"))
Expand Down Expand Up @@ -133,7 +135,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 2, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
Expand Down Expand Up @@ -167,7 +169,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 2, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
Expand Down Expand Up @@ -198,15 +200,13 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 4, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
defer req.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)
drainInitialPeerSends(t, dispatcher, len(capabilityPeers))

msgWithError := &types.MessageBody{
CapabilityId: capInfo.ID,
Expand Down Expand Up @@ -242,15 +242,13 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 4, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
defer req.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)
drainInitialPeerSends(t, dispatcher, len(capabilityPeers))

serialized := caperrors.NewPublicUserError(errors.New("rpc error: EVM error invalid argument"), caperrors.FailedPrecondition).SerializeToRemoteString()
msgWithError := &types.MessageBody{
Expand Down Expand Up @@ -288,15 +286,13 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 4, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
defer req.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)
drainInitialPeerSends(t, dispatcher, len(capabilityPeers))

msgWithError := &types.MessageBody{
CapabilityId: capInfo.ID,
Expand Down Expand Up @@ -349,15 +345,13 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 4, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
defer req.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)
drainInitialPeerSends(t, dispatcher, len(capabilityPeers))

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
Expand Down Expand Up @@ -397,9 +391,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
// that the schedule is still executed entirely.
cancelFn()

// Buffered channel so the goroutines block
// when executing the schedule
dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(
ctxWithCancel,
lggr,
Expand All @@ -416,10 +408,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

// Despite the context being cancelled,
// we still send the full schedule.
<-dispatcher.msgs
<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)
drainInitialPeerSends(t, dispatcher, len(capPeers))

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
Expand Down Expand Up @@ -516,9 +505,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
ctx, cancelFn := context.WithTimeout(ctx, 15*time.Second)
defer cancelFn()

// Buffered channel so the goroutines block
// when executing the schedule
dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(
ctx,
lggr,
Expand All @@ -535,10 +522,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

// Despite the context being cancelled,
// we still send the full schedule.
<-dispatcher.msgs
<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)
drainInitialPeerSends(t, dispatcher, len(capPeers))

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
Expand Down Expand Up @@ -619,7 +603,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

ctx := t.Context()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
Expand Down Expand Up @@ -672,7 +656,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
beholderTester := beholdertest.NewObserver(t)
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
capPeers, capDonInfo, capInfo := capabilityDon(t, 3, 1)
dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody)}
dispatcher := newClientRequestTestDispatcher()
req, err := request.NewClientExecuteRequest(
t.Context(),
lggr,
Expand All @@ -690,11 +674,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
require.NoError(t, err)
defer req.Cancel(errors.New("test end"))

// Expect all 3 capability nodes to receive the request.
<-dispatcher.msgs
<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)
drainInitialPeerSends(t, dispatcher, len(capPeers))

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
Expand Down Expand Up @@ -754,6 +734,22 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
})
}

func newClientRequestTestDispatcher() *clientRequestTestDispatcher {
return &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, testDispatcherChanCap)}
}

func drainInitialPeerSends(t *testing.T, d *clientRequestTestDispatcher, numCapabilityPeers int) {
t.Helper()
require.Eventually(t, func() bool {
return len(d.msgs) == numCapabilityPeers
}, 2*time.Second, time.Millisecond, "timed out waiting for %d buffered outbound messages", numCapabilityPeers)
require.Len(t, d.msgs, numCapabilityPeers, "dispatcher outbound buffer before draining initial peer sends")
for range numCapabilityPeers {
<-d.msgs
}
require.Empty(t, d.msgs)
}

func capabilityDon(t *testing.T, numCapabilityPeers int, f uint8) ([]p2ptypes.PeerID, commoncap.DON, commoncap.CapabilityInfo) {
capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers)
for i := range numCapabilityPeers {
Expand Down
7 changes: 4 additions & 3 deletions core/internal/cltest/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -168,15 +169,15 @@ func NewHTTPMockServer(
response string,
callback ...func(http.Header, string),
) *httptest.Server {
called := false
var called atomic.Bool
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
assert.NoError(t, err)
assert.Equal(t, wantMethod, r.Method)
if len(callback) > 0 {
callback[0](r.Header, string(b))
}
called = true
called.Store(true)

w.WriteHeader(status)
_, _ = io.WriteString(w, response) // Assignment for errcheck. Only used in tests so we can ignore.
Expand All @@ -185,7 +186,7 @@ func NewHTTPMockServer(
server := httptest.NewServer(handler)
t.Cleanup(func() {
server.Close()
assert.True(t, called, "expected call Mock HTTP endpoint '%s'", server.URL)
assert.True(t, called.Load(), "expected call Mock HTTP endpoint '%s'", server.URL)
})
return server
}
Expand Down
4 changes: 3 additions & 1 deletion core/scripts/cre/environment/completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func buildCommandTree() *CompletionNode {
Suggestions: []prompt.Suggest{
{Text: "start", Description: "Spin up the development environment"},
{Text: "stop", Description: "Tear down the development environment"},
{Text: "status", Description: "Show status of local CRE services"},
{Text: "restart", Description: "Restart the development environment"},
{Text: "setup", Description: "Setup the CRE environment prerequisites"},
{Text: "build-caps", Description: "Build capabilities binaries"},
Expand Down Expand Up @@ -257,7 +258,7 @@ func buildCommandTree() *CompletionNode {
// ENV STOP - flags
envStopNode := &CompletionNode{
Flags: []prompt.Suggest{
{Text: "--all", Description: "Remove also all extra services (beholder, billing) (default: false)"},
{Text: "--all", Description: "Remove also all extra services (beholder, billing, observability) (default: false)"},
},
}

Expand All @@ -269,6 +270,7 @@ func buildCommandTree() *CompletionNode {

envNode.Children["start"] = envStartNode
envNode.Children["stop"] = envStopNode
envNode.Children["status"] = &CompletionNode{}
envNode.Children["restart"] = envRestartNode

// ENV SETUP - setup prerequisites
Expand Down
Loading
Loading