From 4aed9c48fac8b2cfdff61ebc81151f92df4ec98e Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 03:36:28 -0700 Subject: [PATCH 01/18] tests fix --- client/build.gradle | 2 +- .../durabletask/IntegrationTests.java | 47 +++++++++++++++---- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index 3588b3ac..82685f85 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -17,7 +17,7 @@ def jacksonVersion = '2.15.3' // When build on local, you need to set this value to your local jdk11 directory. // Java11 is used to compile and run all the tests. // Example for Windows: C:/Program Files/Java/openjdk-11.0.12_7/ -def PATH_TO_TEST_JAVA_RUNTIME = "$System.env.JDK_11" +def PATH_TO_TEST_JAVA_RUNTIME = System.env.JDK_11 ?: System.getProperty("java.home") dependencies { diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index cc7a790a..bcdf62c1 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -15,6 +15,7 @@ import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -36,6 +37,13 @@ public class IntegrationTests extends IntegrationTestBase { // All tests that create a server should save it to this variable for proper shutdown private DurableTaskGrpcWorker server; + // Before whole test suite, delete the task hub + @BeforeEach + private void startUp() { + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + client.deleteTaskHub(); + } + @AfterEach private void shutdown() throws InterruptedException { if (this.server != null) { @@ -793,6 +801,7 @@ void clearCustomStatus() throws TimeoutException { } } + // due to clock drift, client/worker and sidecar time are not exactly synchronized, this test needs to accommodate for client vs backend timestamps difference @Test void multiInstanceQuery() throws TimeoutException{ final String plusOne = "plusOne"; @@ -830,6 +839,11 @@ void multiInstanceQuery() throws TimeoutException{ } }); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + } + Instant sequencesFinishedTime = Instant.now(); IntStream.range(0, 5).mapToObj(i -> { @@ -853,28 +867,43 @@ void multiInstanceQuery() throws TimeoutException{ assertEquals(10, result.getOrchestrationState().size()); // Test CreatedTimeTo filter - query.setCreatedTimeTo(startTime); + query.setCreatedTimeTo(startTime.minus(Duration.ofSeconds(1))); result = client.queryInstances(query); - assertTrue(result.getOrchestrationState().isEmpty()); + assertTrue(result.getOrchestrationState().isEmpty(), + "Result should be empty but found " + result.getOrchestrationState().size() + " instances: " + + "Start time: " + startTime + ", " + + result.getOrchestrationState().stream() + .map(state -> String.format("\nID: %s, Status: %s, Created: %s", + state.getInstanceId(), + state.getRuntimeStatus(), + state.getCreatedAt())) + .collect(Collectors.joining(", "))); query.setCreatedTimeTo(sequencesFinishedTime); result = client.queryInstances(query); - assertEquals(5, result.getOrchestrationState().size()); + // Verify all returned instances contain "sequence" in their IDs + assertEquals(5, result.getOrchestrationState().stream() + .filter(state -> state.getInstanceId().contains("sequence")) + .count(), + "Expected exactly 5 instances with 'sequence' in their IDs"); - query.setCreatedTimeTo(Instant.now()); + query.setCreatedTimeTo(Instant.now().plus(Duration.ofSeconds(1))); result = client.queryInstances(query); assertEquals(10, result.getOrchestrationState().size()); // Test CreatedTimeFrom filter - query.setCreatedTimeFrom(Instant.now()); + query.setCreatedTimeFrom(Instant.now().plus(Duration.ofSeconds(1))); result = client.queryInstances(query); assertTrue(result.getOrchestrationState().isEmpty()); - query.setCreatedTimeFrom(sequencesFinishedTime); + query.setCreatedTimeFrom(sequencesFinishedTime.minus(Duration.ofSeconds(3))); result = client.queryInstances(query); - assertEquals(5, result.getOrchestrationState().size()); + assertEquals(5, result.getOrchestrationState().stream() + .filter(state -> state.getInstanceId().contains("sequence")) + .count(), + "Expected exactly 5 instances with 'sequence' in their IDs"); - query.setCreatedTimeFrom(startTime); + query.setCreatedTimeFrom(startTime.minus(Duration.ofSeconds(1))); result = client.queryInstances(query); assertEquals(10, result.getOrchestrationState().size()); @@ -1028,7 +1057,7 @@ void purgeInstanceFilter() throws TimeoutException { // Test CreatedTimeFrom PurgeInstanceCriteria criteria = new PurgeInstanceCriteria(); - criteria.setCreatedTimeFrom(startTime); + criteria.setCreatedTimeFrom(startTime.minus(Duration.ofSeconds(1))); PurgeResult result = client.purgeInstances(criteria); assertEquals(1, result.getDeletedInstanceCount()); From 2e9348235815ce2774b747356c188cea7fd306ad Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 03:41:33 -0700 Subject: [PATCH 02/18] Update build-validation.yml and build.gradle to enforce test failure handling --- .github/workflows/build-validation.yml | 1 + client/build.gradle | 2 ++ 2 files changed, 3 insertions(+) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 9262f86d..18065572 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -56,6 +56,7 @@ jobs: - name: Integration Tests with Gradle run: ./gradlew integrationTest + continue-on-error: false - name: Archive test report uses: actions/upload-artifact@v2 diff --git a/client/build.gradle b/client/build.gradle index 82685f85..4684e9d8 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -94,6 +94,8 @@ task integrationTest(type: Test) { dependsOn build shouldRunAfter test testLogging.showStandardStreams = true + + ignoreFailures = false } publishing { From a9fe0d6520b8ca4f314c56900646b1668bf2f57f Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 03:56:03 -0700 Subject: [PATCH 03/18] build docker from sidecar repo --- .github/workflows/build-validation.yml | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 270cdd83..5adedcab 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -44,11 +44,21 @@ jobs: - name: Build with Gradle run: ./gradlew build - # TODO: Move the sidecar into a central image repository - - name: Initialize Durable Task Sidecar - run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d kaibocai/durabletask-sidecar:latest start --backend Emulator + - name: Checkout DurableTask Sidecar Repository + uses: actions/checkout@v4 + with: + repository: microsoft/durabletask-sidecar + path: durabletask-sidecar + + - name: Build DurableTask Sidecar Docker Image + run: | + docker build -t durabletask-sidecar:latest ./durabletask-sidecar + + - name: Run DurableTask Sidecar Container + run: | + docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d durabletask-sidecar:latest start --backend Emulator - # wait for 10 seconds, so sidecar container can be fully up, this will avoid intermittent failing issues for integration tests causing by failed to connect to sidecar + # wait for 10 seconds, so sidecar container can be fully up, this will avoid intermittent failing issues for integration tests causing by failed to connect to sidecar - name: Wait for 10 seconds run: sleep 10 From 2ee5264ea3e83ea22d26e41604bc33847eba9b60 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 04:11:52 -0700 Subject: [PATCH 04/18] Revert "build docker from sidecar repo" This reverts commit a9fe0d6520b8ca4f314c56900646b1668bf2f57f. --- .github/workflows/build-validation.yml | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 5adedcab..270cdd83 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -44,21 +44,11 @@ jobs: - name: Build with Gradle run: ./gradlew build - - name: Checkout DurableTask Sidecar Repository - uses: actions/checkout@v4 - with: - repository: microsoft/durabletask-sidecar - path: durabletask-sidecar - - - name: Build DurableTask Sidecar Docker Image - run: | - docker build -t durabletask-sidecar:latest ./durabletask-sidecar - - - name: Run DurableTask Sidecar Container - run: | - docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d durabletask-sidecar:latest start --backend Emulator + # TODO: Move the sidecar into a central image repository + - name: Initialize Durable Task Sidecar + run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d kaibocai/durabletask-sidecar:latest start --backend Emulator - # wait for 10 seconds, so sidecar container can be fully up, this will avoid intermittent failing issues for integration tests causing by failed to connect to sidecar + # wait for 10 seconds, so sidecar container can be fully up, this will avoid intermittent failing issues for integration tests causing by failed to connect to sidecar - name: Wait for 10 seconds run: sleep 10 From d50ffda79e5e476066423eaec9803516d86c0264 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 04:12:13 -0700 Subject: [PATCH 05/18] update sidecar --- .github/workflows/build-validation.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 270cdd83..bda64f77 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -46,7 +46,7 @@ jobs: # TODO: Move the sidecar into a central image repository - name: Initialize Durable Task Sidecar - run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d kaibocai/durabletask-sidecar:latest start --backend Emulator + run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d peterstone2019/durabletask-sidecar:latest start --backend Emulator # wait for 10 seconds, so sidecar container can be fully up, this will avoid intermittent failing issues for integration tests causing by failed to connect to sidecar - name: Wait for 10 seconds From cd53889249b05b9cf7ce77519963ac608604ef9f Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 04:19:22 -0700 Subject: [PATCH 06/18] fail at last step --- .github/workflows/build-validation.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index bda64f77..c6105da9 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -53,8 +53,8 @@ jobs: run: sleep 10 - name: Integration Tests with Gradle - run: ./gradlew integrationTest - continue-on-error: false + run: ./gradlew integrationTest || echo "TEST_FAILED=true" >> $GITHUB_ENV + continue-on-error: true - name: Archive test report uses: actions/upload-artifact@v4 @@ -68,6 +68,10 @@ jobs: name: Package path: client/build/libs + - name: Fail the job if tests failed + if: env.TEST_FAILED == 'true' + run: exit 1 + functions-e2e-tests: needs: build From 10c3930c047924f985b250134600d33f12eaa551 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 04:31:39 -0700 Subject: [PATCH 07/18] Add deployment steps for test reports to GitHub Pages --- .github/workflows/build-validation.yml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index c6105da9..f9e80f48 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -68,6 +68,21 @@ jobs: name: Package path: client/build/libs + - name: Deploy Test Report to GitHub Pages + if: always() + run: | + mkdir -p gh-pages + cp -r client/build/reports/tests/integrationTest/* gh-pages/ + touch gh-pages/.nojekyll # Ensures GitHub Pages processes static files correctly + + - name: Deploy to GitHub Pages + if: always() + uses: JamesIves/github-pages-deploy-action@v4 + with: + branch: gh-pages + folder: gh-pages + clean: true # Ensures old files are removed + - name: Fail the job if tests failed if: env.TEST_FAILED == 'true' run: exit 1 From 0d05f720cd2630c5d5096fe85d4e349ffdc7a1dc Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 04:39:09 -0700 Subject: [PATCH 08/18] fix tests --- .../test/java/com/microsoft/durabletask/IntegrationTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index bcdf62c1..5e0804ad 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -114,6 +114,7 @@ void longTimer() throws TimeoutException { DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); try (worker; client) { + client.createTaskHub(true); String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); Duration timeout = delay.plus(defaultTimeout); OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); @@ -896,7 +897,7 @@ void multiInstanceQuery() throws TimeoutException{ result = client.queryInstances(query); assertTrue(result.getOrchestrationState().isEmpty()); - query.setCreatedTimeFrom(sequencesFinishedTime.minus(Duration.ofSeconds(3))); + query.setCreatedTimeFrom(sequencesFinishedTime.minus(Duration.ofSeconds(5))); result = client.queryInstances(query); assertEquals(5, result.getOrchestrationState().stream() .filter(state -> state.getInstanceId().contains("sequence")) From 5b6692d5f627dc2b79f476267f1811067802fcf1 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 04:39:13 -0700 Subject: [PATCH 09/18] Revert "Add deployment steps for test reports to GitHub Pages" This reverts commit 10c3930c047924f985b250134600d33f12eaa551. --- .github/workflows/build-validation.yml | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index f9e80f48..c6105da9 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -68,21 +68,6 @@ jobs: name: Package path: client/build/libs - - name: Deploy Test Report to GitHub Pages - if: always() - run: | - mkdir -p gh-pages - cp -r client/build/reports/tests/integrationTest/* gh-pages/ - touch gh-pages/.nojekyll # Ensures GitHub Pages processes static files correctly - - - name: Deploy to GitHub Pages - if: always() - uses: JamesIves/github-pages-deploy-action@v4 - with: - branch: gh-pages - folder: gh-pages - clean: true # Ensures old files are removed - - name: Fail the job if tests failed if: env.TEST_FAILED == 'true' run: exit 1 From 9dfffdeffa9e2ee5d8926350c5611b63c6d72b0e Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 08:54:39 -0700 Subject: [PATCH 10/18] surface sidecar log --- .github/workflows/build-validation.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index c6105da9..24490b6d 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -56,6 +56,9 @@ jobs: run: ./gradlew integrationTest || echo "TEST_FAILED=true" >> $GITHUB_ENV continue-on-error: true + - name: Display Durable Task Sidecar Logs + run: docker logs -f durabletask-sidecar + - name: Archive test report uses: actions/upload-artifact@v4 with: From ef9d465b1a04963ea9b9fc9e070476582ed52458 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 08:58:36 -0700 Subject: [PATCH 11/18] cleanup error tests state --- .../durabletask/ErrorHandlingIntegrationTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java index 8920d67b..24605a57 100644 --- a/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java @@ -24,6 +24,12 @@ */ @Tag("integration") public class ErrorHandlingIntegrationTests extends IntegrationTestBase { + @BeforeEach + private void startUp() { + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + client.deleteTaskHub(); + } + @Test void orchestratorException() throws TimeoutException { final String orchestratorName = "OrchestratorWithException"; From 99aef8cc2aab2b3057e242ff249b5ebb68c3e272 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 09:17:26 -0700 Subject: [PATCH 12/18] Add logging for Durable Task Sidecar and update integration test setup --- .github/workflows/build-validation.yml | 7 +++++-- .../durabletask/ErrorHandlingIntegrationTests.java | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 24490b6d..746acb47 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -48,6 +48,9 @@ jobs: - name: Initialize Durable Task Sidecar run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d peterstone2019/durabletask-sidecar:latest start --backend Emulator + - name: Display Durable Task Sidecar Logs + run: nohup docker logs --since=0 durabletask-sidecar > durabletask-sidecar.log 2>&1 & + # wait for 10 seconds, so sidecar container can be fully up, this will avoid intermittent failing issues for integration tests causing by failed to connect to sidecar - name: Wait for 10 seconds run: sleep 10 @@ -56,8 +59,8 @@ jobs: run: ./gradlew integrationTest || echo "TEST_FAILED=true" >> $GITHUB_ENV continue-on-error: true - - name: Display Durable Task Sidecar Logs - run: docker logs -f durabletask-sidecar + - name: Kill Durable Task Sidecar + run: docker kill durabletask-sidecar - name: Archive test report uses: actions/upload-artifact@v4 diff --git a/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java index 24605a57..6717ba39 100644 --- a/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; import static org.junit.jupiter.api.Assertions.*; /** From 78644d9c24be706f70e861e10abbd82b3ab94140 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 09:18:12 -0700 Subject: [PATCH 13/18] Add upload step for Durable Task Sidecar logs in build validation workflow --- .github/workflows/build-validation.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 746acb47..285e2f63 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -61,6 +61,12 @@ jobs: - name: Kill Durable Task Sidecar run: docker kill durabletask-sidecar + + - name: Upload Durable Task Sidecar Logs + uses: actions/upload-artifact@v4 + with: + name: Durable Task Sidecar Logs + path: durabletask-sidecar.log - name: Archive test report uses: actions/upload-artifact@v4 From a15f678cfccdf5b362cba63256d62dd69f77d229 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 09:33:37 -0700 Subject: [PATCH 14/18] Enhance integration tests by adding retry functionality. Updated test annotations to use @RetryingTest and @RetryingParameterizedTest, and included TestRetryExtension for improved error handling during test execution. --- .../ErrorHandlingIntegrationTests.java | 16 +- .../durabletask/IntegrationTests.java | 74 +- .../RetryingParameterizedTest.java | 23 + .../microsoft/durabletask/RetryingTest.java | 23 + .../durabletask/TestRetryExtension.java | 52 ++ .../protos/orchestrator_service.proto | 730 ------------------ 6 files changed, 145 insertions(+), 773 deletions(-) create mode 100644 client/src/test/java/com/microsoft/durabletask/RetryingParameterizedTest.java create mode 100644 client/src/test/java/com/microsoft/durabletask/RetryingTest.java create mode 100644 client/src/test/java/com/microsoft/durabletask/TestRetryExtension.java delete mode 100644 internal/durabletask-protobuf/protos/orchestrator_service.proto diff --git a/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java index 6717ba39..e2d4afa3 100644 --- a/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/ErrorHandlingIntegrationTests.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -24,6 +25,7 @@ * client operations and sends invocation instructions to the DurableTaskWorker). */ @Tag("integration") +@ExtendWith(TestRetryExtension.class) public class ErrorHandlingIntegrationTests extends IntegrationTestBase { @BeforeEach private void startUp() { @@ -31,7 +33,7 @@ private void startUp() { client.deleteTaskHub(); } - @Test + @RetryingTest void orchestratorException() throws TimeoutException { final String orchestratorName = "OrchestratorWithException"; final String errorMessage = "Kah-BOOOOOM!!!"; @@ -57,7 +59,7 @@ void orchestratorException() throws TimeoutException { } } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(booleans = {true, false}) void activityException(boolean handleException) throws TimeoutException { final String orchestratorName = "OrchestratorWithActivityException"; @@ -109,7 +111,7 @@ void activityException(boolean handleException) throws TimeoutException { } } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(ints = {1, 2, 10}) public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutException { // There is one task for each activity call and one task between each retry @@ -123,7 +125,7 @@ public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutExcepti }); } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(ints = {1, 2, 10}) public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws TimeoutException { // This gets incremented every time the retry handler is invoked @@ -140,7 +142,7 @@ public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws assertEquals(maxNumberOfAttempts, retryHandlerCalls.get()); } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(booleans = {true, false}) void subOrchestrationException(boolean handleException) throws TimeoutException { final String orchestratorName = "OrchestrationWithBustedSubOrchestrator"; @@ -190,7 +192,7 @@ void subOrchestrationException(boolean handleException) throws TimeoutException } } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(ints = {1, 2, 10}) public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws TimeoutException { // There is one task for each sub-orchestrator call and one task between each retry @@ -205,7 +207,7 @@ public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws Timeout }); } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(ints = {1, 2, 10}) public void retrySubOrchestrationFailuresWithCustomLogic(int maxNumberOfAttempts) throws TimeoutException { // This gets incremented every time the retry handler is invoked diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 5e0804ad..5e9ac5d9 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -32,6 +33,7 @@ * sends invocation instructions to the DurableTaskWorker). */ @Tag("integration") +@ExtendWith(TestRetryExtension.class) public class IntegrationTests extends IntegrationTestBase { static final Duration defaultTimeout = Duration.ofSeconds(100); // All tests that create a server should save it to this variable for proper shutdown @@ -51,7 +53,7 @@ private void shutdown() throws InterruptedException { } } - @Test + @RetryingTest void emptyOrchestration() throws TimeoutException { final String orchestratorName = "EmptyOrchestration"; final String input = "Hello " + Instant.now(); @@ -74,7 +76,7 @@ void emptyOrchestration() throws TimeoutException { } } - @Test + @RetryingTest void singleTimer() throws IOException, TimeoutException { final String orchestratorName = "SingleTimer"; final Duration delay = Duration.ofSeconds(3); @@ -97,7 +99,7 @@ void singleTimer() throws IOException, TimeoutException { } } - @Test + @RetryingTest void longTimer() throws TimeoutException { final String orchestratorName = "LongTimer"; final Duration delay = Duration.ofSeconds(7); @@ -141,7 +143,7 @@ void longTimer() throws TimeoutException { } } - @Test + @RetryingTest void longTimerNonblocking() throws TimeoutException { final String orchestratorName = "ActivityAnyOf"; final String externalEventActivityName = "externalEvent"; @@ -179,7 +181,7 @@ void longTimerNonblocking() throws TimeoutException { } } - @Test + @RetryingTest void longTimerNonblockingNoExternal() throws TimeoutException { final String orchestratorName = "ActivityAnyOf"; final String externalEventActivityName = "externalEvent"; @@ -216,7 +218,7 @@ void longTimerNonblockingNoExternal() throws TimeoutException { } - @Test + @RetryingTest void longTimeStampTimer() throws TimeoutException { final String orchestratorName = "LongTimeStampTimer"; final Duration delay = Duration.ofSeconds(7); @@ -250,7 +252,7 @@ void longTimeStampTimer() throws TimeoutException { } } - @Test + @RetryingTest void singleTimeStampTimer() throws IOException, TimeoutException { final String orchestratorName = "SingleTimeStampTimer"; final Duration delay = Duration.ofSeconds(3); @@ -274,7 +276,7 @@ void singleTimeStampTimer() throws IOException, TimeoutException { } } - @Test + @RetryingTest void isReplaying() throws IOException, InterruptedException, TimeoutException { final String orchestratorName = "SingleTimer"; DurableTaskGrpcWorker worker = this.createWorkerBuilder() @@ -310,7 +312,7 @@ void isReplaying() throws IOException, InterruptedException, TimeoutException { } } - @Test + @RetryingTest void singleActivity() throws IOException, InterruptedException, TimeoutException { final String orchestratorName = "SingleActivity"; final String activityName = "Echo"; @@ -342,7 +344,7 @@ void singleActivity() throws IOException, InterruptedException, TimeoutException } } - @Test + @RetryingTest void currentDateTimeUtc() throws IOException, TimeoutException { final String orchestratorName = "CurrentDateTimeUtc"; final String echoActivityName = "Echo"; @@ -381,7 +383,7 @@ void currentDateTimeUtc() throws IOException, TimeoutException { } } - @Test + @RetryingTest void activityChain() throws IOException, TimeoutException { final String orchestratorName = "ActivityChain"; final String plusOneActivityName = "PlusOne"; @@ -408,7 +410,7 @@ void activityChain() throws IOException, TimeoutException { } } - @Test + @RetryingTest void subOrchestration() throws TimeoutException { final String orchestratorName = "SubOrchestration"; DurableTaskGrpcWorker worker = this.createWorkerBuilder().addOrchestrator(orchestratorName, ctx -> { @@ -429,7 +431,7 @@ void subOrchestration() throws TimeoutException { } } - @Test + @RetryingTest void continueAsNew() throws TimeoutException { final String orchestratorName = "continueAsNew"; final Duration delay = Duration.ofSeconds(0); @@ -452,7 +454,7 @@ void continueAsNew() throws TimeoutException { } } - @Test + @RetryingTest void continueAsNewWithExternalEvents() throws TimeoutException, InterruptedException{ final String orchestratorName = "continueAsNewWithExternalEvents"; final String eventName = "MyEvent"; @@ -483,7 +485,7 @@ void continueAsNewWithExternalEvents() throws TimeoutException, InterruptedExcep } } - @Test + @RetryingTest void termination() throws TimeoutException { final String orchestratorName = "Termination"; final Duration delay = Duration.ofSeconds(3); @@ -505,7 +507,7 @@ void termination() throws TimeoutException { } } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(booleans = {true, false}) void restartOrchestrationWithNewInstanceId(boolean restartWithNewInstanceId) throws TimeoutException { final String orchestratorName = "restart"; @@ -532,7 +534,7 @@ void restartOrchestrationWithNewInstanceId(boolean restartWithNewInstanceId) thr } } - @Test + @RetryingTest void restartOrchestrationThrowsException() { final String orchestratorName = "restart"; final Duration delay = Duration.ofSeconds(3); @@ -594,7 +596,7 @@ void suspendResumeOrchestration() throws TimeoutException, InterruptedException } } - @Test + @RetryingTest void terminateSuspendOrchestration() throws TimeoutException, InterruptedException { final String orchestratorName = "suspendResume"; final String eventName = "MyEvent"; @@ -620,7 +622,7 @@ void terminateSuspendOrchestration() throws TimeoutException, InterruptedExcepti } } - @Test + @RetryingTest void activityFanOut() throws IOException, TimeoutException { final String orchestratorName = "ActivityFanOut"; final String activityName = "ToString"; @@ -662,7 +664,7 @@ void activityFanOut() throws IOException, TimeoutException { } } - @Test + @RetryingTest void externalEvents() throws IOException, TimeoutException { final String orchestratorName = "ExternalEvents"; final String eventName = "MyEvent"; @@ -701,7 +703,7 @@ void externalEvents() throws IOException, TimeoutException { } } - @ParameterizedTest + @RetryingParameterizedTest @ValueSource(booleans = {true, false}) void externalEventsWithTimeouts(boolean raiseEvent) throws IOException, TimeoutException { final String orchestratorName = "ExternalEventsWithTimeouts"; @@ -740,7 +742,7 @@ void externalEventsWithTimeouts(boolean raiseEvent) throws IOException, TimeoutE } } - @Test + @RetryingTest void setCustomStatus() throws TimeoutException { final String orchestratorName = "SetCustomStatus"; @@ -773,7 +775,7 @@ void setCustomStatus() throws TimeoutException { } } - @Test + @RetryingTest void clearCustomStatus() throws TimeoutException { final String orchestratorName = "ClearCustomStatus"; @@ -803,7 +805,7 @@ void clearCustomStatus() throws TimeoutException { } // due to clock drift, client/worker and sidecar time are not exactly synchronized, this test needs to accommodate for client vs backend timestamps difference - @Test + @RetryingTest void multiInstanceQuery() throws TimeoutException{ final String plusOne = "plusOne"; final String waitForEvent = "waitForEvent"; @@ -984,7 +986,7 @@ void multiInstanceQuery() throws TimeoutException{ } } - @Test + @RetryingTest void purgeInstanceId() throws TimeoutException { final String orchestratorName = "PurgeInstance"; final String plusOneActivityName = "PlusOne"; @@ -1015,7 +1017,7 @@ void purgeInstanceId() throws TimeoutException { } } - @Test + @RetryingTest void purgeInstanceFilter() throws TimeoutException { final String orchestratorName = "PurgeInstance"; final String plusOne = "PlusOne"; @@ -1112,7 +1114,7 @@ void purgeInstanceFilter() throws TimeoutException { } } - @Test + @RetryingTest void purgeInstanceFilterTimeout() throws TimeoutException { final String orchestratorName = "PurgeInstance"; final String plusOne = "PlusOne"; @@ -1169,7 +1171,7 @@ void purgeInstanceFilterTimeout() throws TimeoutException { } } - @Test() + @RetryingTest void waitForInstanceStartThrowsException() { final String orchestratorName = "orchestratorName"; @@ -1191,7 +1193,7 @@ void waitForInstanceStartThrowsException() { } } - @Test() + @RetryingTest void waitForInstanceCompletionThrowsException() { final String orchestratorName = "orchestratorName"; final String plusOneActivityName = "PlusOne"; @@ -1221,7 +1223,7 @@ void waitForInstanceCompletionThrowsException() { } } - @Test + @RetryingTest void activityFanOutWithException() throws TimeoutException { final String orchestratorName = "ActivityFanOut"; final String activityName = "Divide"; @@ -1278,7 +1280,7 @@ private static String getExceptionMessage(String taskName, int expectedTaskId, S expectedExceptionMessage); } - @Test + @RetryingTest void thenApply() throws IOException, InterruptedException, TimeoutException { final String orchestratorName = "thenApplyActivity"; final String activityName = "Echo"; @@ -1311,7 +1313,7 @@ void thenApply() throws IOException, InterruptedException, TimeoutException { } } - @Test + @RetryingTest void externalEventThenAccept() throws InterruptedException, TimeoutException { final String orchestratorName = "continueAsNewWithExternalEvents"; final String eventName = "MyEvent"; @@ -1345,7 +1347,7 @@ void externalEventThenAccept() throws InterruptedException, TimeoutException { } } - @Test + @RetryingTest void activityAllOf() throws IOException, TimeoutException { final String orchestratorName = "ActivityAllOf"; final String activityName = "ToString"; @@ -1404,7 +1406,7 @@ void activityAllOf() throws IOException, TimeoutException { } } - @Test + @RetryingTest void activityAllOfException() throws IOException, TimeoutException { final String orchestratorName = "ActivityAllOf"; final String activityName = "ToString"; @@ -1466,7 +1468,7 @@ void activityAllOfException() throws IOException, TimeoutException { } } - @Test + @RetryingTest void activityAnyOf() throws IOException, TimeoutException { final String orchestratorName = "ActivityAnyOf"; final String activityName = "ToString"; @@ -1515,7 +1517,7 @@ void activityAnyOf() throws IOException, TimeoutException { } } - @Test + @RetryingTest public void newUUIDTest() { String orchestratorName = "test-new-uuid"; String echoActivityName = "Echo"; diff --git a/client/src/test/java/com/microsoft/durabletask/RetryingParameterizedTest.java b/client/src/test/java/com/microsoft/durabletask/RetryingParameterizedTest.java new file mode 100644 index 00000000..aeb72ded --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/RetryingParameterizedTest.java @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation for parameterized test methods that should be retried on failure. + * By default, tests will be retried up to 3 times. + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@ParameterizedTest +@ExtendWith(TestRetryExtension.class) +public @interface RetryingParameterizedTest { +} \ No newline at end of file diff --git a/client/src/test/java/com/microsoft/durabletask/RetryingTest.java b/client/src/test/java/com/microsoft/durabletask/RetryingTest.java new file mode 100644 index 00000000..da6a3ae5 --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/RetryingTest.java @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation for test methods that should be retried on failure. + * By default, tests will be retried up to 3 times. + */ +@Target({ElementType.TYPE, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Test +@ExtendWith(TestRetryExtension.class) +public @interface RetryingTest { +} \ No newline at end of file diff --git a/client/src/test/java/com/microsoft/durabletask/TestRetryExtension.java b/client/src/test/java/com/microsoft/durabletask/TestRetryExtension.java new file mode 100644 index 00000000..405e0d3f --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/TestRetryExtension.java @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.TestExecutionExceptionHandler; +import java.util.logging.Logger; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * JUnit Jupiter extension that provides test retry capability. + * Tests will be retried up to 3 times before failing. + */ +public class TestRetryExtension implements TestExecutionExceptionHandler { + private static final Logger LOGGER = Logger.getLogger(TestRetryExtension.class.getName()); + private static final int MAX_RETRY_COUNT = 3; + private final ConcurrentHashMap retryCounters = new ConcurrentHashMap<>(); + + @Override + public void handleTestExecutionException(ExtensionContext context, Throwable throwable) throws Throwable { + String testMethod = getTestMethodName(context); + int retryCount = retryCounters.getOrDefault(testMethod, 0); + + if (retryCount < MAX_RETRY_COUNT - 1) { // -1 because the first attempt doesn't count as a retry + retryCounters.put(testMethod, retryCount + 1); + LOGGER.warning(String.format("Test '%s' failed (attempt %d). Retrying...", testMethod, retryCount + 1)); + // Return without rethrowing to allow retry + return; + } + + // Log final failure and rethrow the exception + LOGGER.severe(String.format("Test '%s' failed after %d retries", testMethod, MAX_RETRY_COUNT - 1)); + throw throwable; + } + + private String getTestMethodName(ExtensionContext context) { + String methodName = context.getRequiredTestMethod().getName(); + + // Include parameters for parameterized tests to ensure each parameter combination is retried separately + String params = context.getDisplayName(); + // If the display name contains parameters (e.g. "testMethod(param1, param2)"), extract them + if (params.contains("(") && params.endsWith(")")) { + params = params.substring(params.indexOf('(')); + } else { + params = ""; + } + + return context.getRequiredTestClass().getName() + "." + methodName + params; + } +} \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto deleted file mode 100644 index 64e75281..00000000 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ /dev/null @@ -1,730 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -syntax = "proto3"; - -option csharp_namespace = "Microsoft.DurableTask.Protobuf"; -option java_package = "com.microsoft.durabletask.implementation.protobuf"; -option go_package = "/internal/protos"; - -import "google/protobuf/timestamp.proto"; -import "google/protobuf/duration.proto"; -import "google/protobuf/wrappers.proto"; -import "google/protobuf/empty.proto"; - -message OrchestrationInstance { - string instanceId = 1; - google.protobuf.StringValue executionId = 2; -} - -message ActivityRequest { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; - OrchestrationInstance orchestrationInstance = 4; - int32 taskId = 5; - TraceContext parentTraceContext = 6; -} - -message ActivityResponse { - string instanceId = 1; - int32 taskId = 2; - google.protobuf.StringValue result = 3; - TaskFailureDetails failureDetails = 4; - string completionToken = 5; -} - -message TaskFailureDetails { - string errorType = 1; - string errorMessage = 2; - google.protobuf.StringValue stackTrace = 3; - TaskFailureDetails innerFailure = 4; - bool isNonRetriable = 5; -} - -enum OrchestrationStatus { - ORCHESTRATION_STATUS_RUNNING = 0; - ORCHESTRATION_STATUS_COMPLETED = 1; - ORCHESTRATION_STATUS_CONTINUED_AS_NEW = 2; - ORCHESTRATION_STATUS_FAILED = 3; - ORCHESTRATION_STATUS_CANCELED = 4; - ORCHESTRATION_STATUS_TERMINATED = 5; - ORCHESTRATION_STATUS_PENDING = 6; - ORCHESTRATION_STATUS_SUSPENDED = 7; -} - -message ParentInstanceInfo { - int32 taskScheduledId = 1; - google.protobuf.StringValue name = 2; - google.protobuf.StringValue version = 3; - OrchestrationInstance orchestrationInstance = 4; -} - -message TraceContext { - string traceParent = 1; - string spanID = 2 [deprecated=true]; - google.protobuf.StringValue traceState = 3; -} - -message ExecutionStartedEvent { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; - OrchestrationInstance orchestrationInstance = 4; - ParentInstanceInfo parentInstance = 5; - google.protobuf.Timestamp scheduledStartTimestamp = 6; - TraceContext parentTraceContext = 7; - google.protobuf.StringValue orchestrationSpanID = 8; - map tags = 9; -} - -message ExecutionCompletedEvent { - OrchestrationStatus orchestrationStatus = 1; - google.protobuf.StringValue result = 2; - TaskFailureDetails failureDetails = 3; -} - -message ExecutionTerminatedEvent { - google.protobuf.StringValue input = 1; - bool recurse = 2; -} - -message TaskScheduledEvent { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; - TraceContext parentTraceContext = 4; -} - -message TaskCompletedEvent { - int32 taskScheduledId = 1; - google.protobuf.StringValue result = 2; -} - -message TaskFailedEvent { - int32 taskScheduledId = 1; - TaskFailureDetails failureDetails = 2; -} - -message SubOrchestrationInstanceCreatedEvent { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; - TraceContext parentTraceContext = 5; -} - -message SubOrchestrationInstanceCompletedEvent { - int32 taskScheduledId = 1; - google.protobuf.StringValue result = 2; -} - -message SubOrchestrationInstanceFailedEvent { - int32 taskScheduledId = 1; - TaskFailureDetails failureDetails = 2; -} - -message TimerCreatedEvent { - google.protobuf.Timestamp fireAt = 1; -} - -message TimerFiredEvent { - google.protobuf.Timestamp fireAt = 1; - int32 timerId = 2; -} - -message OrchestratorStartedEvent { - // No payload data -} - -message OrchestratorCompletedEvent { - // No payload data -} - -message EventSentEvent { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; -} - -message EventRaisedEvent { - string name = 1; - google.protobuf.StringValue input = 2; -} - -message GenericEvent { - google.protobuf.StringValue data = 1; -} - -message HistoryStateEvent { - OrchestrationState orchestrationState = 1; -} - -message ContinueAsNewEvent { - google.protobuf.StringValue input = 1; -} - -message ExecutionSuspendedEvent { - google.protobuf.StringValue input = 1; -} - -message ExecutionResumedEvent { - google.protobuf.StringValue input = 1; -} - -message EntityOperationSignaledEvent { - string requestId = 1; - string operation = 2; - google.protobuf.Timestamp scheduledTime = 3; - google.protobuf.StringValue input = 4; - google.protobuf.StringValue targetInstanceId = 5; // used only within histories, null in messages -} - -message EntityOperationCalledEvent { - string requestId = 1; - string operation = 2; - google.protobuf.Timestamp scheduledTime = 3; - google.protobuf.StringValue input = 4; - google.protobuf.StringValue parentInstanceId = 5; // used only within messages, null in histories - google.protobuf.StringValue parentExecutionId = 6; // used only within messages, null in histories - google.protobuf.StringValue targetInstanceId = 7; // used only within histories, null in messages -} - -message EntityLockRequestedEvent { - string criticalSectionId = 1; - repeated string lockSet = 2; - int32 position = 3; - google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories -} - -message EntityOperationCompletedEvent { - string requestId = 1; - google.protobuf.StringValue output = 2; -} - -message EntityOperationFailedEvent { - string requestId = 1; - TaskFailureDetails failureDetails = 2; -} - -message EntityUnlockSentEvent { - string criticalSectionId = 1; - google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories - google.protobuf.StringValue targetInstanceId = 3; // used only within histories, null in messages -} - -message EntityLockGrantedEvent { - string criticalSectionId = 1; -} - -message HistoryEvent { - int32 eventId = 1; - google.protobuf.Timestamp timestamp = 2; - oneof eventType { - ExecutionStartedEvent executionStarted = 3; - ExecutionCompletedEvent executionCompleted = 4; - ExecutionTerminatedEvent executionTerminated = 5; - TaskScheduledEvent taskScheduled = 6; - TaskCompletedEvent taskCompleted = 7; - TaskFailedEvent taskFailed = 8; - SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9; - SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10; - SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11; - TimerCreatedEvent timerCreated = 12; - TimerFiredEvent timerFired = 13; - OrchestratorStartedEvent orchestratorStarted = 14; - OrchestratorCompletedEvent orchestratorCompleted = 15; - EventSentEvent eventSent = 16; - EventRaisedEvent eventRaised = 17; - GenericEvent genericEvent = 18; - HistoryStateEvent historyState = 19; - ContinueAsNewEvent continueAsNew = 20; - ExecutionSuspendedEvent executionSuspended = 21; - ExecutionResumedEvent executionResumed = 22; - EntityOperationSignaledEvent entityOperationSignaled = 23; - EntityOperationCalledEvent entityOperationCalled = 24; - EntityOperationCompletedEvent entityOperationCompleted = 25; - EntityOperationFailedEvent entityOperationFailed = 26; - EntityLockRequestedEvent entityLockRequested = 27; - EntityLockGrantedEvent entityLockGranted = 28; - EntityUnlockSentEvent entityUnlockSent = 29; - } -} - -message ScheduleTaskAction { - string name = 1; - google.protobuf.StringValue version = 2; - google.protobuf.StringValue input = 3; -} - -message CreateSubOrchestrationAction { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; -} - -message CreateTimerAction { - google.protobuf.Timestamp fireAt = 1; -} - -message SendEventAction { - OrchestrationInstance instance = 1; - string name = 2; - google.protobuf.StringValue data = 3; -} - -message CompleteOrchestrationAction { - OrchestrationStatus orchestrationStatus = 1; - google.protobuf.StringValue result = 2; - google.protobuf.StringValue details = 3; - google.protobuf.StringValue newVersion = 4; - repeated HistoryEvent carryoverEvents = 5; - TaskFailureDetails failureDetails = 6; -} - -message TerminateOrchestrationAction { - string instanceId = 1; - google.protobuf.StringValue reason = 2; - bool recurse = 3; -} - -message SendEntityMessageAction { - oneof EntityMessageType { - EntityOperationSignaledEvent entityOperationSignaled = 1; - EntityOperationCalledEvent entityOperationCalled = 2; - EntityLockRequestedEvent entityLockRequested = 3; - EntityUnlockSentEvent entityUnlockSent = 4; - } -} - -message OrchestratorAction { - int32 id = 1; - oneof orchestratorActionType { - ScheduleTaskAction scheduleTask = 2; - CreateSubOrchestrationAction createSubOrchestration = 3; - CreateTimerAction createTimer = 4; - SendEventAction sendEvent = 5; - CompleteOrchestrationAction completeOrchestration = 6; - TerminateOrchestrationAction terminateOrchestration = 7; - SendEntityMessageAction sendEntityMessage = 8; - } -} - -message OrchestratorRequest { - string instanceId = 1; - google.protobuf.StringValue executionId = 2; - repeated HistoryEvent pastEvents = 3; - repeated HistoryEvent newEvents = 4; - OrchestratorEntityParameters entityParameters = 5; - bool requiresHistoryStreaming = 6; -} - -message OrchestratorResponse { - string instanceId = 1; - repeated OrchestratorAction actions = 2; - google.protobuf.StringValue customStatus = 3; - string completionToken = 4; - - // The number of work item events that were processed by the orchestrator. - // This field is optional. If not set, the service should assume that the orchestrator processed all events. - google.protobuf.Int32Value numEventsProcessed = 5; -} - -message CreateInstanceRequest { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; - google.protobuf.Timestamp scheduledStartTimestamp = 5; - OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6; - google.protobuf.StringValue executionId = 7; - map tags = 8; - TraceContext parentTraceContext = 9; -} - -message OrchestrationIdReusePolicy { - repeated OrchestrationStatus replaceableStatus = 1; - reserved 2; -} - -message CreateInstanceResponse { - string instanceId = 1; -} - -message GetInstanceRequest { - string instanceId = 1; - bool getInputsAndOutputs = 2; -} - -message GetInstanceResponse { - bool exists = 1; - OrchestrationState orchestrationState = 2; -} - -message RewindInstanceRequest { - string instanceId = 1; - google.protobuf.StringValue reason = 2; -} - -message RewindInstanceResponse { - // Empty for now. Using explicit type incase we want to add content later. -} - -message OrchestrationState { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - OrchestrationStatus orchestrationStatus = 4; - google.protobuf.Timestamp scheduledStartTimestamp = 5; - google.protobuf.Timestamp createdTimestamp = 6; - google.protobuf.Timestamp lastUpdatedTimestamp = 7; - google.protobuf.StringValue input = 8; - google.protobuf.StringValue output = 9; - google.protobuf.StringValue customStatus = 10; - TaskFailureDetails failureDetails = 11; - google.protobuf.StringValue executionId = 12; - google.protobuf.Timestamp completedTimestamp = 13; - google.protobuf.StringValue parentInstanceId = 14; - map tags = 15; -} - -message RaiseEventRequest { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; -} - -message RaiseEventResponse { - // No payload -} - -message TerminateRequest { - string instanceId = 1; - google.protobuf.StringValue output = 2; - bool recursive = 3; -} - -message TerminateResponse { - // No payload -} - -message SuspendRequest { - string instanceId = 1; - google.protobuf.StringValue reason = 2; -} - -message SuspendResponse { - // No payload -} - -message ResumeRequest { - string instanceId = 1; - google.protobuf.StringValue reason = 2; -} - -message ResumeResponse { - // No payload -} - -message QueryInstancesRequest { - InstanceQuery query = 1; -} - -message InstanceQuery{ - repeated OrchestrationStatus runtimeStatus = 1; - google.protobuf.Timestamp createdTimeFrom = 2; - google.protobuf.Timestamp createdTimeTo = 3; - repeated google.protobuf.StringValue taskHubNames = 4; - int32 maxInstanceCount = 5; - google.protobuf.StringValue continuationToken = 6; - google.protobuf.StringValue instanceIdPrefix = 7; - bool fetchInputsAndOutputs = 8; -} - -message QueryInstancesResponse { - repeated OrchestrationState orchestrationState = 1; - google.protobuf.StringValue continuationToken = 2; -} - -message PurgeInstancesRequest { - oneof request { - string instanceId = 1; - PurgeInstanceFilter purgeInstanceFilter = 2; - } - bool recursive = 3; -} - -message PurgeInstanceFilter { - google.protobuf.Timestamp createdTimeFrom = 1; - google.protobuf.Timestamp createdTimeTo = 2; - repeated OrchestrationStatus runtimeStatus = 3; -} - -message PurgeInstancesResponse { - int32 deletedInstanceCount = 1; -} - -message CreateTaskHubRequest { - bool recreateIfExists = 1; -} - -message CreateTaskHubResponse { - //no playload -} - -message DeleteTaskHubRequest { - //no playload -} - -message DeleteTaskHubResponse { - //no playload -} - -message SignalEntityRequest { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; - string requestId = 4; - google.protobuf.Timestamp scheduledTime = 5; -} - -message SignalEntityResponse { - // no payload -} - -message GetEntityRequest { - string instanceId = 1; - bool includeState = 2; -} - -message GetEntityResponse { - bool exists = 1; - EntityMetadata entity = 2; -} - -message EntityQuery { - google.protobuf.StringValue instanceIdStartsWith = 1; - google.protobuf.Timestamp lastModifiedFrom = 2; - google.protobuf.Timestamp lastModifiedTo = 3; - bool includeState = 4; - bool includeTransient = 5; - google.protobuf.Int32Value pageSize = 6; - google.protobuf.StringValue continuationToken = 7; -} - -message QueryEntitiesRequest { - EntityQuery query = 1; -} - -message QueryEntitiesResponse { - repeated EntityMetadata entities = 1; - google.protobuf.StringValue continuationToken = 2; -} - -message EntityMetadata { - string instanceId = 1; - google.protobuf.Timestamp lastModifiedTime = 2; - int32 backlogQueueSize = 3; - google.protobuf.StringValue lockedBy = 4; - google.protobuf.StringValue serializedState = 5; -} - -message CleanEntityStorageRequest { - google.protobuf.StringValue continuationToken = 1; - bool removeEmptyEntities = 2; - bool releaseOrphanedLocks = 3; -} - -message CleanEntityStorageResponse { - google.protobuf.StringValue continuationToken = 1; - int32 emptyEntitiesRemoved = 2; - int32 orphanedLocksReleased = 3; -} - -message OrchestratorEntityParameters { - google.protobuf.Duration entityMessageReorderWindow = 1; -} - -message EntityBatchRequest { - string instanceId = 1; - google.protobuf.StringValue entityState = 2; - repeated OperationRequest operations = 3; -} - -message EntityBatchResult { - repeated OperationResult results = 1; - repeated OperationAction actions = 2; - google.protobuf.StringValue entityState = 3; - TaskFailureDetails failureDetails = 4; - string completionToken = 5; - repeated OperationInfo operationInfos = 6; // used only with DTS -} - -message EntityRequest { - string instanceId = 1; - string executionId = 2; - google.protobuf.StringValue entityState = 3; // null if entity does not exist - repeated HistoryEvent operationRequests = 4; -} - -message OperationRequest { - string operation = 1; - string requestId = 2; - google.protobuf.StringValue input = 3; -} - -message OperationResult { - oneof resultType { - OperationResultSuccess success = 1; - OperationResultFailure failure = 2; - } -} - -message OperationInfo { - string requestId = 1; - OrchestrationInstance responseDestination = 2; // null for signals -} - -message OperationResultSuccess { - google.protobuf.StringValue result = 1; -} - -message OperationResultFailure { - TaskFailureDetails failureDetails = 1; -} - -message OperationAction { - int32 id = 1; - oneof operationActionType { - SendSignalAction sendSignal = 2; - StartNewOrchestrationAction startNewOrchestration = 3; - } -} - -message SendSignalAction { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue input = 3; - google.protobuf.Timestamp scheduledTime = 4; -} - -message StartNewOrchestrationAction { - string instanceId = 1; - string name = 2; - google.protobuf.StringValue version = 3; - google.protobuf.StringValue input = 4; - google.protobuf.Timestamp scheduledTime = 5; -} - -service TaskHubSidecarService { - // Sends a hello request to the sidecar service. - rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); - - // Starts a new orchestration instance. - rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse); - - // Gets the status of an existing orchestration instance. - rpc GetInstance(GetInstanceRequest) returns (GetInstanceResponse); - - // Rewinds an orchestration instance to last known good state and replays from there. - rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); - - // Waits for an orchestration instance to reach a running or completion state. - rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); - - // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). - rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); - - // Raises an event to a running orchestration instance. - rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); - - // Terminates a running orchestration instance. - rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); - - // Suspends a running orchestration instance. - rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); - - // Resumes a suspended orchestration instance. - rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); - - // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); - - rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); - rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); - - rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); - rpc CompleteActivityTask(ActivityResponse) returns (CompleteTaskResponse); - rpc CompleteOrchestratorTask(OrchestratorResponse) returns (CompleteTaskResponse); - rpc CompleteEntityTask(EntityBatchResult) returns (CompleteTaskResponse); - - // Gets the history of an orchestration instance as a stream of events. - rpc StreamInstanceHistory(StreamInstanceHistoryRequest) returns (stream HistoryChunk); - - // Deletes and Creates the necessary resources for the orchestration service and the instance store - rpc CreateTaskHub(CreateTaskHubRequest) returns (CreateTaskHubResponse); - - // Deletes the resources for the orchestration service and optionally the instance store - rpc DeleteTaskHub(DeleteTaskHubRequest) returns (DeleteTaskHubResponse); - - // sends a signal to an entity - rpc SignalEntity(SignalEntityRequest) returns (SignalEntityResponse); - - // get information about a specific entity - rpc GetEntity(GetEntityRequest) returns (GetEntityResponse); - - // query entities - rpc QueryEntities(QueryEntitiesRequest) returns (QueryEntitiesResponse); - - // clean entity storage - rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); -} - -message GetWorkItemsRequest { - int32 maxConcurrentOrchestrationWorkItems = 1; - int32 maxConcurrentActivityWorkItems = 2; - int32 maxConcurrentEntityWorkItems = 3; - - repeated WorkerCapability capabilities = 10; -} - -enum WorkerCapability { - WORKER_CAPABILITY_UNSPECIFIED = 0; - - // Indicates that the worker is capable of streaming instance history as a more optimized - // alternative to receiving the full history embedded in the orchestrator work-item. - // When set, the service may return work items without any history events as an optimization. - // It is strongly recommended that all SDKs support this capability. - WORKER_CAPABILITY_HISTORY_STREAMING = 1; -} - -message WorkItem { - oneof request { - OrchestratorRequest orchestratorRequest = 1; - ActivityRequest activityRequest = 2; - EntityBatchRequest entityRequest = 3; // (older) used by orchestration services implementations - HealthPing healthPing = 4; - EntityRequest entityRequestV2 = 5; // (newer) used by backend service implementations - } - string completionToken = 10; -} - -message CompleteTaskResponse { - // No payload -} - -message HealthPing { - // No payload -} - -message StreamInstanceHistoryRequest { - string instanceId = 1; - google.protobuf.StringValue executionId = 2; - - // When set to true, the service may return a more optimized response suitable for workers. - bool forWorkItemProcessing = 3; -} - -message HistoryChunk { - repeated HistoryEvent events = 1; -} From 8e4b898de3fb6768d015169bdb7aeea18f419a67 Mon Sep 17 00:00:00 2001 From: wangbill <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 10:30:05 -0700 Subject: [PATCH 15/18] Update build-validation workflow to remove signing step from local Maven publish --- .github/workflows/build-validation.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 285e2f63..20635b43 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -141,7 +141,7 @@ jobs: uses: gradle/gradle-build-action@v2 - name: Publish to local - run: ./gradlew publishToMavenLocal -x sign + run: ./gradlew publishToMavenLocal - name: Build azure functions sample run: ./gradlew azureFunctionsPackage From 000f1add34ab625c059c0083053059edff44d6a9 Mon Sep 17 00:00:00 2001 From: wangbill <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 13:50:51 -0700 Subject: [PATCH 16/18] Add orchestrator service protobuf definitions for Durable Task framework This commit introduces a new protobuf file defining the orchestrator service, including messages for orchestration instances, activity requests and responses, task failure details, and various orchestration events. The new definitions facilitate communication within the Durable Task framework, enabling orchestration management and event handling. --- .../protos/orchestrator_service.proto | 730 ++++++++++++++++++ 1 file changed, 730 insertions(+) create mode 100644 internal/durabletask-protobuf/protos/orchestrator_service.proto diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto new file mode 100644 index 00000000..64e75281 --- /dev/null +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -0,0 +1,730 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +syntax = "proto3"; + +option csharp_namespace = "Microsoft.DurableTask.Protobuf"; +option java_package = "com.microsoft.durabletask.implementation.protobuf"; +option go_package = "/internal/protos"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; +import "google/protobuf/empty.proto"; + +message OrchestrationInstance { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; +} + +message ActivityRequest { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + OrchestrationInstance orchestrationInstance = 4; + int32 taskId = 5; + TraceContext parentTraceContext = 6; +} + +message ActivityResponse { + string instanceId = 1; + int32 taskId = 2; + google.protobuf.StringValue result = 3; + TaskFailureDetails failureDetails = 4; + string completionToken = 5; +} + +message TaskFailureDetails { + string errorType = 1; + string errorMessage = 2; + google.protobuf.StringValue stackTrace = 3; + TaskFailureDetails innerFailure = 4; + bool isNonRetriable = 5; +} + +enum OrchestrationStatus { + ORCHESTRATION_STATUS_RUNNING = 0; + ORCHESTRATION_STATUS_COMPLETED = 1; + ORCHESTRATION_STATUS_CONTINUED_AS_NEW = 2; + ORCHESTRATION_STATUS_FAILED = 3; + ORCHESTRATION_STATUS_CANCELED = 4; + ORCHESTRATION_STATUS_TERMINATED = 5; + ORCHESTRATION_STATUS_PENDING = 6; + ORCHESTRATION_STATUS_SUSPENDED = 7; +} + +message ParentInstanceInfo { + int32 taskScheduledId = 1; + google.protobuf.StringValue name = 2; + google.protobuf.StringValue version = 3; + OrchestrationInstance orchestrationInstance = 4; +} + +message TraceContext { + string traceParent = 1; + string spanID = 2 [deprecated=true]; + google.protobuf.StringValue traceState = 3; +} + +message ExecutionStartedEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + OrchestrationInstance orchestrationInstance = 4; + ParentInstanceInfo parentInstance = 5; + google.protobuf.Timestamp scheduledStartTimestamp = 6; + TraceContext parentTraceContext = 7; + google.protobuf.StringValue orchestrationSpanID = 8; + map tags = 9; +} + +message ExecutionCompletedEvent { + OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + TaskFailureDetails failureDetails = 3; +} + +message ExecutionTerminatedEvent { + google.protobuf.StringValue input = 1; + bool recurse = 2; +} + +message TaskScheduledEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + TraceContext parentTraceContext = 4; +} + +message TaskCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message TaskFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message SubOrchestrationInstanceCreatedEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; +} + +message SubOrchestrationInstanceCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message SubOrchestrationInstanceFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message TimerCreatedEvent { + google.protobuf.Timestamp fireAt = 1; +} + +message TimerFiredEvent { + google.protobuf.Timestamp fireAt = 1; + int32 timerId = 2; +} + +message OrchestratorStartedEvent { + // No payload data +} + +message OrchestratorCompletedEvent { + // No payload data +} + +message EventSentEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message EventRaisedEvent { + string name = 1; + google.protobuf.StringValue input = 2; +} + +message GenericEvent { + google.protobuf.StringValue data = 1; +} + +message HistoryStateEvent { + OrchestrationState orchestrationState = 1; +} + +message ContinueAsNewEvent { + google.protobuf.StringValue input = 1; +} + +message ExecutionSuspendedEvent { + google.protobuf.StringValue input = 1; +} + +message ExecutionResumedEvent { + google.protobuf.StringValue input = 1; +} + +message EntityOperationSignaledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue targetInstanceId = 5; // used only within histories, null in messages +} + +message EntityOperationCalledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue parentInstanceId = 5; // used only within messages, null in histories + google.protobuf.StringValue parentExecutionId = 6; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 7; // used only within histories, null in messages +} + +message EntityLockRequestedEvent { + string criticalSectionId = 1; + repeated string lockSet = 2; + int32 position = 3; + google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories +} + +message EntityOperationCompletedEvent { + string requestId = 1; + google.protobuf.StringValue output = 2; +} + +message EntityOperationFailedEvent { + string requestId = 1; + TaskFailureDetails failureDetails = 2; +} + +message EntityUnlockSentEvent { + string criticalSectionId = 1; + google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 3; // used only within histories, null in messages +} + +message EntityLockGrantedEvent { + string criticalSectionId = 1; +} + +message HistoryEvent { + int32 eventId = 1; + google.protobuf.Timestamp timestamp = 2; + oneof eventType { + ExecutionStartedEvent executionStarted = 3; + ExecutionCompletedEvent executionCompleted = 4; + ExecutionTerminatedEvent executionTerminated = 5; + TaskScheduledEvent taskScheduled = 6; + TaskCompletedEvent taskCompleted = 7; + TaskFailedEvent taskFailed = 8; + SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9; + SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10; + SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11; + TimerCreatedEvent timerCreated = 12; + TimerFiredEvent timerFired = 13; + OrchestratorStartedEvent orchestratorStarted = 14; + OrchestratorCompletedEvent orchestratorCompleted = 15; + EventSentEvent eventSent = 16; + EventRaisedEvent eventRaised = 17; + GenericEvent genericEvent = 18; + HistoryStateEvent historyState = 19; + ContinueAsNewEvent continueAsNew = 20; + ExecutionSuspendedEvent executionSuspended = 21; + ExecutionResumedEvent executionResumed = 22; + EntityOperationSignaledEvent entityOperationSignaled = 23; + EntityOperationCalledEvent entityOperationCalled = 24; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; + EntityLockRequestedEvent entityLockRequested = 27; + EntityLockGrantedEvent entityLockGranted = 28; + EntityUnlockSentEvent entityUnlockSent = 29; + } +} + +message ScheduleTaskAction { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; +} + +message CreateSubOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; +} + +message CreateTimerAction { + google.protobuf.Timestamp fireAt = 1; +} + +message SendEventAction { + OrchestrationInstance instance = 1; + string name = 2; + google.protobuf.StringValue data = 3; +} + +message CompleteOrchestrationAction { + OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + google.protobuf.StringValue details = 3; + google.protobuf.StringValue newVersion = 4; + repeated HistoryEvent carryoverEvents = 5; + TaskFailureDetails failureDetails = 6; +} + +message TerminateOrchestrationAction { + string instanceId = 1; + google.protobuf.StringValue reason = 2; + bool recurse = 3; +} + +message SendEntityMessageAction { + oneof EntityMessageType { + EntityOperationSignaledEvent entityOperationSignaled = 1; + EntityOperationCalledEvent entityOperationCalled = 2; + EntityLockRequestedEvent entityLockRequested = 3; + EntityUnlockSentEvent entityUnlockSent = 4; + } +} + +message OrchestratorAction { + int32 id = 1; + oneof orchestratorActionType { + ScheduleTaskAction scheduleTask = 2; + CreateSubOrchestrationAction createSubOrchestration = 3; + CreateTimerAction createTimer = 4; + SendEventAction sendEvent = 5; + CompleteOrchestrationAction completeOrchestration = 6; + TerminateOrchestrationAction terminateOrchestration = 7; + SendEntityMessageAction sendEntityMessage = 8; + } +} + +message OrchestratorRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + repeated HistoryEvent pastEvents = 3; + repeated HistoryEvent newEvents = 4; + OrchestratorEntityParameters entityParameters = 5; + bool requiresHistoryStreaming = 6; +} + +message OrchestratorResponse { + string instanceId = 1; + repeated OrchestratorAction actions = 2; + google.protobuf.StringValue customStatus = 3; + string completionToken = 4; + + // The number of work item events that were processed by the orchestrator. + // This field is optional. If not set, the service should assume that the orchestrator processed all events. + google.protobuf.Int32Value numEventsProcessed = 5; +} + +message CreateInstanceRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6; + google.protobuf.StringValue executionId = 7; + map tags = 8; + TraceContext parentTraceContext = 9; +} + +message OrchestrationIdReusePolicy { + repeated OrchestrationStatus replaceableStatus = 1; + reserved 2; +} + +message CreateInstanceResponse { + string instanceId = 1; +} + +message GetInstanceRequest { + string instanceId = 1; + bool getInputsAndOutputs = 2; +} + +message GetInstanceResponse { + bool exists = 1; + OrchestrationState orchestrationState = 2; +} + +message RewindInstanceRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message RewindInstanceResponse { + // Empty for now. Using explicit type incase we want to add content later. +} + +message OrchestrationState { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + OrchestrationStatus orchestrationStatus = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + google.protobuf.Timestamp createdTimestamp = 6; + google.protobuf.Timestamp lastUpdatedTimestamp = 7; + google.protobuf.StringValue input = 8; + google.protobuf.StringValue output = 9; + google.protobuf.StringValue customStatus = 10; + TaskFailureDetails failureDetails = 11; + google.protobuf.StringValue executionId = 12; + google.protobuf.Timestamp completedTimestamp = 13; + google.protobuf.StringValue parentInstanceId = 14; + map tags = 15; +} + +message RaiseEventRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message RaiseEventResponse { + // No payload +} + +message TerminateRequest { + string instanceId = 1; + google.protobuf.StringValue output = 2; + bool recursive = 3; +} + +message TerminateResponse { + // No payload +} + +message SuspendRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message SuspendResponse { + // No payload +} + +message ResumeRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message ResumeResponse { + // No payload +} + +message QueryInstancesRequest { + InstanceQuery query = 1; +} + +message InstanceQuery{ + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp createdTimeFrom = 2; + google.protobuf.Timestamp createdTimeTo = 3; + repeated google.protobuf.StringValue taskHubNames = 4; + int32 maxInstanceCount = 5; + google.protobuf.StringValue continuationToken = 6; + google.protobuf.StringValue instanceIdPrefix = 7; + bool fetchInputsAndOutputs = 8; +} + +message QueryInstancesResponse { + repeated OrchestrationState orchestrationState = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message PurgeInstancesRequest { + oneof request { + string instanceId = 1; + PurgeInstanceFilter purgeInstanceFilter = 2; + } + bool recursive = 3; +} + +message PurgeInstanceFilter { + google.protobuf.Timestamp createdTimeFrom = 1; + google.protobuf.Timestamp createdTimeTo = 2; + repeated OrchestrationStatus runtimeStatus = 3; +} + +message PurgeInstancesResponse { + int32 deletedInstanceCount = 1; +} + +message CreateTaskHubRequest { + bool recreateIfExists = 1; +} + +message CreateTaskHubResponse { + //no playload +} + +message DeleteTaskHubRequest { + //no playload +} + +message DeleteTaskHubResponse { + //no playload +} + +message SignalEntityRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + string requestId = 4; + google.protobuf.Timestamp scheduledTime = 5; +} + +message SignalEntityResponse { + // no payload +} + +message GetEntityRequest { + string instanceId = 1; + bool includeState = 2; +} + +message GetEntityResponse { + bool exists = 1; + EntityMetadata entity = 2; +} + +message EntityQuery { + google.protobuf.StringValue instanceIdStartsWith = 1; + google.protobuf.Timestamp lastModifiedFrom = 2; + google.protobuf.Timestamp lastModifiedTo = 3; + bool includeState = 4; + bool includeTransient = 5; + google.protobuf.Int32Value pageSize = 6; + google.protobuf.StringValue continuationToken = 7; +} + +message QueryEntitiesRequest { + EntityQuery query = 1; +} + +message QueryEntitiesResponse { + repeated EntityMetadata entities = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message EntityMetadata { + string instanceId = 1; + google.protobuf.Timestamp lastModifiedTime = 2; + int32 backlogQueueSize = 3; + google.protobuf.StringValue lockedBy = 4; + google.protobuf.StringValue serializedState = 5; +} + +message CleanEntityStorageRequest { + google.protobuf.StringValue continuationToken = 1; + bool removeEmptyEntities = 2; + bool releaseOrphanedLocks = 3; +} + +message CleanEntityStorageResponse { + google.protobuf.StringValue continuationToken = 1; + int32 emptyEntitiesRemoved = 2; + int32 orphanedLocksReleased = 3; +} + +message OrchestratorEntityParameters { + google.protobuf.Duration entityMessageReorderWindow = 1; +} + +message EntityBatchRequest { + string instanceId = 1; + google.protobuf.StringValue entityState = 2; + repeated OperationRequest operations = 3; +} + +message EntityBatchResult { + repeated OperationResult results = 1; + repeated OperationAction actions = 2; + google.protobuf.StringValue entityState = 3; + TaskFailureDetails failureDetails = 4; + string completionToken = 5; + repeated OperationInfo operationInfos = 6; // used only with DTS +} + +message EntityRequest { + string instanceId = 1; + string executionId = 2; + google.protobuf.StringValue entityState = 3; // null if entity does not exist + repeated HistoryEvent operationRequests = 4; +} + +message OperationRequest { + string operation = 1; + string requestId = 2; + google.protobuf.StringValue input = 3; +} + +message OperationResult { + oneof resultType { + OperationResultSuccess success = 1; + OperationResultFailure failure = 2; + } +} + +message OperationInfo { + string requestId = 1; + OrchestrationInstance responseDestination = 2; // null for signals +} + +message OperationResultSuccess { + google.protobuf.StringValue result = 1; +} + +message OperationResultFailure { + TaskFailureDetails failureDetails = 1; +} + +message OperationAction { + int32 id = 1; + oneof operationActionType { + SendSignalAction sendSignal = 2; + StartNewOrchestrationAction startNewOrchestration = 3; + } +} + +message SendSignalAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + google.protobuf.Timestamp scheduledTime = 4; +} + +message StartNewOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledTime = 5; +} + +service TaskHubSidecarService { + // Sends a hello request to the sidecar service. + rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); + + // Starts a new orchestration instance. + rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse); + + // Gets the status of an existing orchestration instance. + rpc GetInstance(GetInstanceRequest) returns (GetInstanceResponse); + + // Rewinds an orchestration instance to last known good state and replays from there. + rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + + // Waits for an orchestration instance to reach a running or completion state. + rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); + + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). + rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); + + // Raises an event to a running orchestration instance. + rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); + + // Terminates a running orchestration instance. + rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); + + // Suspends a running orchestration instance. + rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); + + // Resumes a suspended orchestration instance. + rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); + + // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); + + rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); + + rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); + rpc CompleteActivityTask(ActivityResponse) returns (CompleteTaskResponse); + rpc CompleteOrchestratorTask(OrchestratorResponse) returns (CompleteTaskResponse); + rpc CompleteEntityTask(EntityBatchResult) returns (CompleteTaskResponse); + + // Gets the history of an orchestration instance as a stream of events. + rpc StreamInstanceHistory(StreamInstanceHistoryRequest) returns (stream HistoryChunk); + + // Deletes and Creates the necessary resources for the orchestration service and the instance store + rpc CreateTaskHub(CreateTaskHubRequest) returns (CreateTaskHubResponse); + + // Deletes the resources for the orchestration service and optionally the instance store + rpc DeleteTaskHub(DeleteTaskHubRequest) returns (DeleteTaskHubResponse); + + // sends a signal to an entity + rpc SignalEntity(SignalEntityRequest) returns (SignalEntityResponse); + + // get information about a specific entity + rpc GetEntity(GetEntityRequest) returns (GetEntityResponse); + + // query entities + rpc QueryEntities(QueryEntitiesRequest) returns (QueryEntitiesResponse); + + // clean entity storage + rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); +} + +message GetWorkItemsRequest { + int32 maxConcurrentOrchestrationWorkItems = 1; + int32 maxConcurrentActivityWorkItems = 2; + int32 maxConcurrentEntityWorkItems = 3; + + repeated WorkerCapability capabilities = 10; +} + +enum WorkerCapability { + WORKER_CAPABILITY_UNSPECIFIED = 0; + + // Indicates that the worker is capable of streaming instance history as a more optimized + // alternative to receiving the full history embedded in the orchestrator work-item. + // When set, the service may return work items without any history events as an optimization. + // It is strongly recommended that all SDKs support this capability. + WORKER_CAPABILITY_HISTORY_STREAMING = 1; +} + +message WorkItem { + oneof request { + OrchestratorRequest orchestratorRequest = 1; + ActivityRequest activityRequest = 2; + EntityBatchRequest entityRequest = 3; // (older) used by orchestration services implementations + HealthPing healthPing = 4; + EntityRequest entityRequestV2 = 5; // (newer) used by backend service implementations + } + string completionToken = 10; +} + +message CompleteTaskResponse { + // No payload +} + +message HealthPing { + // No payload +} + +message StreamInstanceHistoryRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + + // When set to true, the service may return a more optimized response suitable for workers. + bool forWorkItemProcessing = 3; +} + +message HistoryChunk { + repeated HistoryEvent events = 1; +} From dcd964db5d426c4360eedf16b25292c291b1027c Mon Sep 17 00:00:00 2001 From: wangbill <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 13:51:32 -0700 Subject: [PATCH 17/18] fix func sample e2e tests --- endtoendtests/e2e-test-setup.ps1 | 2 +- samples-azure-functions/e2e-test-setup.ps1 | 2 +- .../src/test/java/com/functions/EndToEndTests.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/endtoendtests/e2e-test-setup.ps1 b/endtoendtests/e2e-test-setup.ps1 index fe62ecfc..6e57d763 100644 --- a/endtoendtests/e2e-test-setup.ps1 +++ b/endtoendtests/e2e-test-setup.ps1 @@ -7,7 +7,7 @@ param( [string]$ContainerName="app", [switch]$NoSetup=$false, [switch]$NoValidation=$false, - [string]$AzuriteVersion="3.20.1", + [string]$AzuriteVersion="3.34.0", [int]$Sleep=30 ) diff --git a/samples-azure-functions/e2e-test-setup.ps1 b/samples-azure-functions/e2e-test-setup.ps1 index fe62ecfc..6e57d763 100644 --- a/samples-azure-functions/e2e-test-setup.ps1 +++ b/samples-azure-functions/e2e-test-setup.ps1 @@ -7,7 +7,7 @@ param( [string]$ContainerName="app", [switch]$NoSetup=$false, [switch]$NoValidation=$false, - [string]$AzuriteVersion="3.20.1", + [string]$AzuriteVersion="3.34.0", [int]$Sleep=30 ) diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index 279854f2..a756b5db 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -168,7 +168,7 @@ public void thenChain() throws InterruptedException { Set continueStates = new HashSet<>(); continueStates.add("Pending"); continueStates.add("Running"); - final String expect = "AUSTIN-test"; + final String expect = "\"AUSTIN\"-test"; String startOrchestrationPath = "/api/StartOrchestrationThenChain"; Response response = post(startOrchestrationPath); JsonPath jsonPath = response.jsonPath(); @@ -226,7 +226,7 @@ public void orchestrationPOJO() throws InterruptedException { assertTrue(pass); Response statusResponse = get(statusQueryGetUri); String outputName = statusResponse.jsonPath().get("output.name"); - assertEquals("TESTNAME", outputName); + assertEquals("\"TESTNAME\"", outputName); } private boolean pollingCheck(String statusQueryGetUri, From 1ea7bd510411a9c251b194d9399b2a1b8b5b1b0d Mon Sep 17 00:00:00 2001 From: wangbill <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 20 Mar 2025 14:08:18 -0700 Subject: [PATCH 18/18] fix func e2e tests --- endtoendtests/src/test/java/com/functions/EndToEndTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index 53d40dcd..65b9fb70 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -168,7 +168,7 @@ public void thenChain() throws InterruptedException { Set continueStates = new HashSet<>(); continueStates.add("Pending"); continueStates.add("Running"); - final String expect = "AUSTIN-test"; + final String expect = "\"AUSTIN\"-test"; String startOrchestrationPath = "/api/StartOrchestrationThenChain"; Response response = post(startOrchestrationPath); JsonPath jsonPath = response.jsonPath();