From 79ce7f5d0cff2a1d15cefb8ef11e581028f43825 Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Fri, 18 Apr 2025 18:33:27 -0700 Subject: [PATCH] fix: don't start a retry timer if connection was closed due to being idle --- .../bigquery/storage/v1/ConnectionWorker.java | 20 ++++++--- .../storage/v1/ConnectionWorkerTest.java | 44 +++++++++++++++++++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index dbb7e6ce02..6e8b24de6a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -1273,23 +1273,28 @@ private void doneCallback(Throwable finalStatus) { + writerId + " Final status: " + finalStatus.toString()); + boolean closedIdleConnection = + finalStatus.toString().contains("Closing the stream because it has been inactive"); this.lock.lock(); try { this.streamConnectionIsConnected = false; this.telemetryMetrics.recordConnectionEnd( Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString()); if (connectionFinalStatus == null) { - if (connectionRetryStartTime == 0) { + if (!closedIdleConnection && connectionRetryStartTime == 0) { connectionRetryStartTime = System.currentTimeMillis(); } // If the error can be retried, don't set it here, let it try to retry later on. if (isConnectionErrorRetriable(Status.fromThrowable(finalStatus).getCode()) && !userClosed && (maxRetryDuration.toMillis() == 0f + || closedIdleConnection || System.currentTimeMillis() - connectionRetryStartTime <= maxRetryDuration.toMillis())) { - this.conectionRetryCountWithoutCallback++; - this.telemetryMetrics.recordConnectionStartWithRetry(); + if (!closedIdleConnection) { + this.conectionRetryCountWithoutCallback++; + this.telemetryMetrics.recordConnectionStartWithRetry(); + } log.info( "Connection is going to be reestablished with the next request. Retriable error " + finalStatus.toString() @@ -1297,7 +1302,9 @@ private void doneCallback(Throwable finalStatus) { + conectionRetryCountWithoutCallback + ", millis left to retry " + (maxRetryDuration.toMillis() - - (System.currentTimeMillis() - connectionRetryStartTime)) + - (connectionRetryStartTime > 0 + ? System.currentTimeMillis() - connectionRetryStartTime + : 0)) + ", for stream " + streamName + " id:" @@ -1311,7 +1318,10 @@ private void doneCallback(Throwable finalStatus) { + " for stream " + streamName + " with write id: " - + writerId); + + writerId + + ", millis left to retry was " + + (maxRetryDuration.toMillis() + - (System.currentTimeMillis() - connectionRetryStartTime))); } } } finally { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 961c46f0da..8ee6437e77 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -32,6 +32,7 @@ import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; +import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.opentelemetry.api.common.Attributes; import java.io.IOException; @@ -898,6 +899,49 @@ public void testOpenTelemetryAttributesWithTraceId() throws Exception { exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow:c:d", "c", "d", null); } + @Test + public void testDoubleDisconnectWithShorterRetryDuration() throws Exception { + // simulate server disconnect due to idle stream + testBigQueryWrite.setFailedStatus( + Status.ABORTED.withDescription( + "Closing the stream because it has been inactive for 600 seconds.")); + testBigQueryWrite.setCloseEveryNAppends(1); + testBigQueryWrite.setTimesToClose( + 2); // Total of 2 connection failures. The time interval between the processing of these + // failures will exceed the configured maxRetryDuration. + testBigQueryWrite.addResponse(createAppendResponse(0)); + + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + "us", + schema1, + 100000, + 100000, + Duration.ofMillis(1), // very small maxRetryDuration + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + null, + client.getSettings(), + retrySettings, + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false, + /*isMultiplexing*/ false); + + List> futures = new ArrayList<>(); + futures.add( + sendTestMessage( + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(0)}), 0)); + + assertEquals(0, futures.get(0).get().getAppendResult().getOffset().getValue()); + } + @Test public void testLocationName() throws Exception { assertEquals(