diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java index 10af6c30d2f7e..290a38f8661ab 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java @@ -193,6 +193,7 @@ private LockUpsertResult handleUpsertS3Exception(S3Exception e) { logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFilePath); } else if (status == RATE_LIMIT_ERROR_CODE) { logger.warn("OwnerId: {}, Rate limit exceeded for: {}", ownerId, lockFilePath); + return LockUpsertResult.THROTTLED; } else if (status >= INTERNAL_SERVER_ERROR_CODE_MIN) { logger.warn("OwnerId: {}, internal server error for: {}", ownerId, lockFilePath, e); } else { diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java index 0358cfa281aed..487268a6b4a24 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java @@ -49,8 +49,10 @@ import java.io.IOException; import java.util.Properties; +import static org.apache.hudi.client.transaction.lock.models.LockUpsertResult.THROTTLED; import static org.apache.hudi.client.transaction.lock.models.LockUpsertResult.UNKNOWN_ERROR; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -222,8 +224,8 @@ void testTryCreateOrUpdateLockFile_rateLimitExceeded() { Pair> result = 
lockService.tryUpsertLockFile(lockData, Option.empty()); - assertEquals(UNKNOWN_ERROR, result.getLeft()); - assertTrue(result.getRight().isEmpty()); + assertEquals(THROTTLED, result.getLeft()); + assertFalse(result.getRight().isPresent()); verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_PATH)); } diff --git a/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java index d344cfc341bdb..d62a0f0184304 100644 --- a/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java +++ b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java @@ -317,10 +317,11 @@ private LockUpsertResult handleUpsertBlobStorageException(BlobStorageException e logger.info("OwnerId: {}, Unable to write new lock file. Another process has modified this lockfile {} already.", ownerId, lockFileUri); return LockUpsertResult.ACQUIRED_BY_OTHERS; - } else if (code == CONFLICT_ERROR_CODE) { - logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFileUri); } else if (code == RATE_LIMIT_ERROR_CODE) { logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + return LockUpsertResult.THROTTLED; + } else if (code == CONFLICT_ERROR_CODE) { + logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFileUri); } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); } else { diff --git a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java index d4a1ff822d7c0..da6eb7446e07f 100644 --- 
a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java +++ b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java @@ -196,7 +196,7 @@ void testTryUpsertLockFile_preconditionFailed_returnsAcquiredByOthers() { } @Test - void testTryUpsertLockFile_rateLimit_returnsUnknownError() { + void testTryUpsertLockFile_rateLimit_returnsThrottled() { StorageLockData lockData = new StorageLockData(false, 999L, "owner"); BlobStorageException ex = mock(BlobStorageException.class); when(ex.getStatusCode()).thenReturn(429); @@ -204,7 +204,7 @@ void testTryUpsertLockFile_rateLimit_returnsUnknownError() { Pair> result = lockClient.tryUpsertLockFile(lockData, Option.empty()); - assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft()); + assertEquals(LockUpsertResult.THROTTLED, result.getLeft()); assertTrue(result.getRight().isEmpty()); verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_URI)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java index 69ed1c6f572e6..dc99975414b1a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java @@ -86,6 +86,12 @@ public class StorageBasedLockProvider implements LockProvider { // However, since our lock leases are pretty long, we can use a high buffer. private static final long CLOCK_DRIFT_BUFFER_MS = 500; + // How long to wait before retrying the lock-expire write after a THROTTLED response. 
Tuned to be + at least as long as typical cloud-storage rate-limit windows (e.g., GCS's 1-write/sec per object) so + a single retry has a reasonable chance of succeeding. + @VisibleForTesting + static final long THROTTLE_RETRY_DELAY_SECONDS = 1; + // Use for testing private final Logger logger; @@ -346,9 +352,24 @@ public synchronized boolean tryLock() { newLockData, latestLock.getRight()); if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) { - // failed to acquire the lock, indicates concurrent contention logInfoLockState(FAILED_TO_ACQUIRE); - hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric); + switch (lockUpdateStatus.getLeft()) { + case ACQUIRED_BY_OTHERS: + // failed to acquire the lock, indicates concurrent contention + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric); + break; + case THROTTLED: + // The write was rejected; we did not acquire. Transient — caller may retry. Reuses the + // unknown-state metric since no throttle-specific metric exists yet. + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric); + break; + case UNKNOWN_ERROR: + // Lock state is unknown after the upsert attempt; surface it as such. + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric); + break; + default: + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric); + break; + } return false; } this.setLock(lockUpdateStatus.getRight().get()); @@ -424,22 +445,60 @@ private boolean believesLockMightBeHeld() { * Unlock by marking our current lock file "expired": true. */ @Override - public synchronized void unlock() { - assertHeartbeatManagerExists(); - if (!believesLockMightBeHeld()) { - return; + public void unlock() { + ExpireLockResult expireResult; + // Snapshot the exact StorageLockFile we intend to expire.
Identity comparison in the retry + // path guards against a concurrent tryLock() that, if our original lock had expired during + // the 1s sleep, could have installed a fresh lock that the retry must NOT mark expired. + StorageLockFile lockToExpire; + synchronized (this) { + assertHeartbeatManagerExists(); + if (!believesLockMightBeHeld()) { + return; + } + + // Try to stop the heartbeat first + if (heartbeatManager.hasActiveHeartbeat()) { + logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId); + if (!heartbeatManager.stopHeartbeat(true)) { + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric); + throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE)); + } + } + + // Then expire the current lock. + lockToExpire = getLock(); + expireResult = tryExpireCurrentLock(false); } - boolean believesNoLongerHoldsLock = true; - // Try to stop the heartbeat first - if (heartbeatManager.hasActiveHeartbeat()) { - logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId); - believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true); + // If throttled, retry once after sleeping outside the monitor to avoid blocking other threads. + // Note: when unlock() is called via close() -> shutdown(), the outer synchronized caller still + // holds the provider monitor through reentrant locking, so other threads remain blocked in + // that scenario. This is acceptable since close() is a shutdown path, not the hot path. + if (expireResult == ExpireLockResult.THROTTLED) { + logger.warn("Owner {}: Lock expiration was throttled, retrying after {} seconds.", + ownerId, THROTTLE_RETRY_DELAY_SECONDS); + try { + TimeUnit.SECONDS.sleep(THROTTLE_RETRY_DELAY_SECONDS); + } catch (InterruptedException ie) { + // Re-set the interrupt flag and abandon the retry — an interrupted thread shouldn't keep + // doing work. The caller will see FAILED_TO_RELEASE below. 
+ Thread.currentThread().interrupt(); + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric); + throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE)); + } + synchronized (this) { + // Bail out if the lock was either cleared by another path (e.g. shutdown hook, + // concurrent unlock) OR replaced by a concurrent tryLock(): operating on a fresh + // lock would mark a lock that another caller now holds as expired. + if (!believesLockMightBeHeld() || getLock() != lockToExpire) { + return; + } + expireResult = tryExpireCurrentLock(false); + } } - // Then expire the current lock. - believesNoLongerHoldsLock &= tryExpireCurrentLock(false); - if (!believesNoLongerHoldsLock) { + if (expireResult != ExpireLockResult.SUCCESS) { hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric); throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE)); } @@ -461,15 +520,26 @@ private void assertUnclosed() { } /** - * Tries to expire the currently held lock. + * Result of a single attempt to expire the currently held lock. + */ + @VisibleForTesting + enum ExpireLockResult { + SUCCESS, + THROTTLED, + FAILED + } + + /** + * Tries to expire the currently held lock. This is a single-attempt primitive; + * callers are responsible for retry policy. * * @param fromShutdownHook Whether we are attempting best effort quick unlock from shutdown hook. - * @return True if we were successfully able to upload an expired lock. + * @return The result of the expire attempt. */ - private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) { + @VisibleForTesting + synchronized ExpireLockResult tryExpireCurrentLock(boolean fromShutdownHook) { // It does not make sense to have heartbeat alive extending the lock lease while - // here we are trying - // to expire the lock. + // here we are trying to expire the lock. 
if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) { // broken function precondition. throw new HoodieLockException("Must stop heartbeat before expire lock file"); @@ -477,26 +547,30 @@ private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) { logDebugLockState(RELEASING); // Upload metadata that will unlock this lock. StorageLockData expiredLockData = new StorageLockData(true, this.getLock().getValidUntilMs(), ownerId); - Pair> result; long lockExpirationTimeMs = System.currentTimeMillis(); - result = this.storageLockClient.tryUpsertLockFile(expiredLockData, Option.of(this.getLock())); + Pair> result = + this.storageLockClient.tryUpsertLockFile(expiredLockData, Option.of(this.getLock())); switch (result.getLeft()) { case UNKNOWN_ERROR: // Here we do not know the state of the lock. logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown."); hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric); - return false; + return ExpireLockResult.FAILED; + case THROTTLED: + logWarnLockState(FAILED_TO_RELEASE, "Lock expiration write was throttled."); + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric); + return ExpireLockResult.THROTTLED; case SUCCESS: logInfoLockState(RELEASED); recordAuditOperation(AuditOperationState.END, lockExpirationTimeMs); setLock(null); - return true; + return ExpireLockResult.SUCCESS; case ACQUIRED_BY_OTHERS: // Lock was acquired by others, indicating heartbeat failure during lock hold period. 
logErrorLockState(FAILED_TO_RELEASE, "lock was acquired by others, indicating heartbeat failure."); setLock(null); hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquiredByOthersErrorMetric); - return false; + return ExpireLockResult.FAILED; default: hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric); throw new HoodieLockException("Unexpected lock update result: " + result.getLeft()); @@ -551,6 +625,12 @@ protected synchronized boolean renewLock() { hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric); // Let heartbeat retry later. return true; + case THROTTLED: + // Throttling is transient, let the heartbeat retry on its next cycle. + logger.warn("Owner {}: Unable to renew lock due to throttling, will retry on next heartbeat.", ownerId); + hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric); + // Let heartbeat retry later. + return true; case SUCCESS: // Only positive outcome this.setLock(currentLock.getRight().get()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java index 27f06f59f58b5..558a1bf11dd87 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java @@ -29,7 +29,9 @@ public enum LockUpsertResult { // Another process has modified the lock file (precondition failure) with code 1 ACQUIRED_BY_OTHERS(1), // Unable to determine lock state due to transient errors with code 2 - UNKNOWN_ERROR(2); + UNKNOWN_ERROR(2), + // Request was throttled by the storage backend (e.g. 
HTTP 429) with code 3 + THROTTLED(3); private final int code; } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java index f2f967a082379..a51a1af06eca2 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java @@ -411,6 +411,88 @@ void testUnlockThrowsExceptionWhenLockAcquiredByOthers() { when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); } + @Test + void testUnlockSucceedsAfterThrottledRetry() { + // Simulates the GCP 1-write/sec limit: the first expire attempt gets THROTTLED, + // the retry (after sleeping outside the monitor) succeeds. + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + assertTrue(lockProvider.tryLock()); + + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false); + StorageLockFile expiredLockFile = new StorageLockFile(new StorageLockData(true, data.getValidUntil(), ownerId), "v2"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile)))) + .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty())) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(expiredLockFile))); + + lockProvider.unlock(); + + assertNull(lockProvider.getLock(), "Lock should be null after successful unlock on retry"); + verify(mockLockService, times(2)).tryUpsertLockFile(any(), eq(Option.of(realLockFile))); + } + + @Test + void testUnlockBailsOutWhenLockReplacedDuringRetrySleep() { + // Race: the first expire attempt returns THROTTLED. While unlock() is sleeping outside + // the monitor, the original lock expires and a concurrent tryLock() acquires a fresh + // lock (currentLockObj is replaced from lock1 -> lock2). The retry MUST detect this + // and bail out — operating on lock2 would mark a lock that another caller now holds + // as expired. + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lock1 = new StorageLockFile(data, "v1"); + StorageLockFile lock2 = new StorageLockFile(data, "v2"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lock1))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + assertTrue(lockProvider.tryLock()); + + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false); + // Simulate the race in the same answer that returns THROTTLED: by the time the sleep + // ends, getLock() returns lock2 (the lock a concurrent tryLock() would have acquired). + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lock1)))) + .thenAnswer(inv -> { + doReturn(lock2).when(lockProvider).getLock(); + return Pair.of(LockUpsertResult.THROTTLED, Option.empty()); + }); + + // unlock() should return normally — the original lock is no longer the caller's + // responsibility, and we must not touch lock2. 
+ lockProvider.unlock(); + + verify(mockLockService, times(1)).tryUpsertLockFile(any(), eq(Option.of(lock1))); + verify(mockLockService, never()).tryUpsertLockFile(any(), eq(Option.of(lock2))); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + } + + @Test + void testUnlockThrowsExceptionWhenStillThrottledAfterRetry() { + // Both the initial attempt and the retry get THROTTLED — unlock should fail. + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + assertTrue(lockProvider.tryLock()); + + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile)))) + .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty())); + + HoodieLockException exception = assertThrows(HoodieLockException.class, () -> lockProvider.unlock()); + assertTrue(exception.getMessage().contains("FAILED_TO_RELEASE")); + verify(mockLockService, times(2)).tryUpsertLockFile(any(), eq(Option.of(realLockFile))); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + } + @Test void testCloseFailsToStopHeartbeat() { when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); @@ -466,6 +548,18 @@ void testRenewLockUnableToUpsertLockFileButNotFatal() { assertTrue(lockProvider.renewLock()); } + @Test + void testRenewLockThrottledReturnsTrue() { + StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile)))) + .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty())); + // Throttling is transient, renewLock should return true so the heartbeat retries later. + assertTrue(lockProvider.renewLock()); + verify(mockLogger).warn("Owner {}: Unable to renew lock due to throttling, will retry on next heartbeat.", this.ownerId); + } + @Test void testRenewLockUnableToUpsertLockFileFatal() { StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java index 123b76bb69269..503d2f07c8893 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java @@ -150,6 +150,7 @@ public Pair> tryUpsertLockFile( return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty()); } else if (e.getCode() == RATE_LIMIT_ERROR_CODE) { logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFilePath); + return Pair.of(LockUpsertResult.THROTTLED, Option.empty()); } else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) { logger.warn("OwnerId: {}, GCS returned internal server error code for lock file: {}", ownerId, lockFilePath, e); diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java index 9f57077bc1c8e..2033d8bc50739 100644 --- a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java +++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java @@ -49,6 +49,7 @@ import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -187,8 +188,8 @@ void testTryCreateOrUpdateLockFile_rateLimitExceeded() { Pair> result = lockService.tryUpsertLockFile(lockData, Option.empty()); - assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft()); - assertTrue(result.getRight().isEmpty(), "Should return empty when a 429 occurs"); + assertEquals(LockUpsertResult.THROTTLED, result.getLeft()); + assertFalse(result.getRight().isPresent(), "Should return empty when a 429 occurs"); verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_PATH)); }