@@ -193,6 +193,7 @@ private LockUpsertResult handleUpsertS3Exception(S3Exception e) {
logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFilePath);
} else if (status == RATE_LIMIT_ERROR_CODE) {
logger.warn("OwnerId: {}, Rate limit exceeded for: {}", ownerId, lockFilePath);
+ return LockUpsertResult.THROTTLED;
} else if (status >= INTERNAL_SERVER_ERROR_CODE_MIN) {
logger.warn("OwnerId: {}, internal server error for: {}", ownerId, lockFilePath, e);
} else {
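Side note for reviewers: the S3, Azure, and GCS handlers in this PR all follow the same classification shape, with the new THROTTLED branch taken before the generic 5xx handling. A minimal sketch of that shape (the numeric values are assumptions from standard HTTP semantics; the real classes define their own RATE_LIMIT_ERROR_CODE and INTERNAL_SERVER_ERROR_CODE_MIN constants, and classifyUpsertFailure is a hypothetical name, not actual Hudi code):

// Sketch only; mirrors the branch structure of the cloud-client handlers.
// Assumes LockUpsertResult is imported from org.apache.hudi.client.transaction.lock.models.
static LockUpsertResult classifyUpsertFailure(int httpStatus) {
  if (httpStatus == 412) {
    // Conditional write lost the CAS race: another process holds or modified the lock.
    return LockUpsertResult.ACQUIRED_BY_OTHERS;
  } else if (httpStatus == 429) {
    // New in this PR: throttling is reported as its own outcome so callers
    // can retry deliberately instead of treating the lock state as unknown.
    return LockUpsertResult.THROTTLED;
  } else {
    // 5xx and anything unexpected: the lock state cannot be determined.
    return LockUpsertResult.UNKNOWN_ERROR;
  }
}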
@@ -49,8 +49,10 @@
import java.io.IOException;
import java.util.Properties;

+ import static org.apache.hudi.client.transaction.lock.models.LockUpsertResult.THROTTLED;
import static org.apache.hudi.client.transaction.lock.models.LockUpsertResult.UNKNOWN_ERROR;
import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -222,8 +224,8 @@ void testTryCreateOrUpdateLockFile_rateLimitExceeded() {
Pair<LockUpsertResult, Option<StorageLockFile>> result =
lockService.tryUpsertLockFile(lockData, Option.empty());

- assertEquals(UNKNOWN_ERROR, result.getLeft());
- assertTrue(result.getRight().isEmpty());
+ assertEquals(THROTTLED, result.getLeft());
+ assertFalse(result.getRight().isPresent());
verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_PATH));
}

@@ -317,10 +317,11 @@ private LockUpsertResult handleUpsertBlobStorageException(BlobStorageException e
logger.info("OwnerId: {}, Unable to write new lock file. Another process has modified this lockfile {} already.",
ownerId, lockFileUri);
return LockUpsertResult.ACQUIRED_BY_OTHERS;
- } else if (code == CONFLICT_ERROR_CODE) {
- logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFileUri);
} else if (code == RATE_LIMIT_ERROR_CODE) {
logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri);
+ return LockUpsertResult.THROTTLED;
+ } else if (code == CONFLICT_ERROR_CODE) {
+ logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFileUri);
} else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) {
logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e);
} else {
@@ -196,15 +196,15 @@ void testTryUpsertLockFile_preconditionFailed_returnsAcquiredByOthers() {
}

@Test
- void testTryUpsertLockFile_rateLimit_returnsUnknownError() {
+ void testTryUpsertLockFile_rateLimit_returnsThrottled() {
StorageLockData lockData = new StorageLockData(false, 999L, "owner");
BlobStorageException ex = mock(BlobStorageException.class);
when(ex.getStatusCode()).thenReturn(429);
when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))).thenThrow(ex);

Pair<LockUpsertResult, Option<StorageLockFile>> result = lockClient.tryUpsertLockFile(lockData, Option.empty());

- assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
+ assertEquals(LockUpsertResult.THROTTLED, result.getLeft());
assertTrue(result.getRight().isEmpty());
verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_URI));
}
@@ -86,6 +86,12 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
// However, since our lock leases are pretty long, we can use a high buffer.
private static final long CLOCK_DRIFT_BUFFER_MS = 500;

+ // How long to wait before retrying the lock-expire write after a THROTTLED response. Tuned to be
+ // longer than typical cloud-storage rate-limit windows (e.g., GCS's 1-write/sec per object) so
+ // a single retry has a reasonable chance of succeeding.
+ @VisibleForTesting
+ static final long THROTTLE_RETRY_DELAY_SECONDS = 1;
+
// Use for testing
private final Logger logger;

@@ -346,9 +352,24 @@ public synchronized boolean tryLock() {
newLockData,
latestLock.getRight());
if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) {
- // failed to acquire the lock, indicates concurrent contention
logInfoLockState(FAILED_TO_ACQUIRE);
- hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric);
+ switch (lockUpdateStatus.getLeft()) {
+ case ACQUIRED_BY_OTHERS:
+ // failed to acquire the lock, indicates concurrent contention
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric);
+ break;
+ case THROTTLED:
+ // The write was rejected; we did not acquire. Transient, so the caller may retry.
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
+ break;
+ case UNKNOWN_ERROR:
+ // Lock state is unknown after the upsert attempt; surface it as such.
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
+ break;
+ default:
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
+ break;
+ }
return false;
}
this.setLock(lockUpdateStatus.getRight().get());
@@ -424,22 +445,60 @@ private boolean believesLockMightBeHeld() {
* Unlock by marking our current lock file "expired": true.
*/
@Override
- public synchronized void unlock() {
- assertHeartbeatManagerExists();
- if (!believesLockMightBeHeld()) {
- return;
+ public void unlock() {
+ ExpireLockResult expireResult;
+ // Snapshot the exact StorageLockFile we intend to expire. Identity comparison in the retry
+ // path guards against a concurrent tryLock() that, if our original lock had expired during
+ // the 1s sleep, could have installed a fresh lock that the retry must NOT mark expired.
+ StorageLockFile lockToExpire;
+ synchronized (this) {
+ assertHeartbeatManagerExists();
+ if (!believesLockMightBeHeld()) {
+ return;
+ }
+
+ // Try to stop the heartbeat first
+ if (heartbeatManager.hasActiveHeartbeat()) {
+ logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
+ if (!heartbeatManager.stopHeartbeat(true)) {
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
+ throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+ }
+ }
+
+ // Then expire the current lock.
+ lockToExpire = getLock();
+ expireResult = tryExpireCurrentLock(false);
+ }
- boolean believesNoLongerHoldsLock = true;
-
- // Try to stop the heartbeat first
- if (heartbeatManager.hasActiveHeartbeat()) {
- logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
- believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true);
+ // If throttled, retry once after sleeping outside the monitor to avoid blocking other threads.
+ // Note: when unlock() is called via close() -> shutdown(), the outer synchronized caller still
+ // holds the provider monitor through reentrant locking, so other threads remain blocked in
+ // that scenario. This is acceptable since close() is a shutdown path, not the hot path.
+ if (expireResult == ExpireLockResult.THROTTLED) {
+ logger.warn("Owner {}: Lock expiration was throttled, retrying after {} seconds.",
+ ownerId, THROTTLE_RETRY_DELAY_SECONDS);
+ try {
+ TimeUnit.SECONDS.sleep(THROTTLE_RETRY_DELAY_SECONDS);
+ } catch (InterruptedException ie) {
+ // Re-set the interrupt flag and abandon the retry; an interrupted thread shouldn't keep
+ // doing work. The caller will see FAILED_TO_RELEASE below.
+ Thread.currentThread().interrupt();
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
+ throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+ }
+ synchronized (this) {
+ // Bail out if the lock was either cleared by another path (e.g. shutdown hook,
+ // concurrent unlock) OR replaced by a concurrent tryLock(): operating on a fresh
+ // lock would mark a lock that another caller now holds as expired.
+ if (!believesLockMightBeHeld() || getLock() != lockToExpire) {
+ return;
+ }
+ expireResult = tryExpireCurrentLock(false);
+ }
+ }

- // Then expire the current lock.
- believesNoLongerHoldsLock &= tryExpireCurrentLock(false);
- if (!believesNoLongerHoldsLock) {
+ if (expireResult != ExpireLockResult.SUCCESS) {
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
}
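Editor's note: the essential concurrency pattern in the new unlock() is easy to lose in the full hunk. Stripped to its core, it looks roughly like this (a simplified sketch, not the actual Hudi method; it assumes the surrounding class members getLock(), tryExpireCurrentLock(), and THROTTLE_RETRY_DELAY_SECONDS, plus java.util.concurrent.TimeUnit, and omits heartbeat shutdown and metrics):

// Simplified sketch of the snapshot-and-identity-check retry in unlock().
public void unlock() {
  StorageLockFile lockToExpire;
  ExpireLockResult result;
  synchronized (this) {
    lockToExpire = getLock();                 // snapshot the exact object we own
    result = tryExpireCurrentLock(false);
  }
  if (result != ExpireLockResult.THROTTLED) {
    return;                                   // success or failure; no retry needed
  }
  try {
    TimeUnit.SECONDS.sleep(THROTTLE_RETRY_DELAY_SECONDS);  // sleep outside the monitor
  } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();       // abandon the retry when interrupted
    return;
  }
  synchronized (this) {
    // Identity, not equals(): a concurrent tryLock() may have installed a fresh
    // StorageLockFile that is NOT ours to expire, even if its contents look similar.
    if (getLock() == lockToExpire) {
      tryExpireCurrentLock(false);
    }
  }
}

The identity comparison (==) rather than equals() is the load-bearing detail here: two lock files with identical data can still represent different ownership epochs.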
@@ -461,42 +520,57 @@ private void assertUnclosed() {
}

/**
- * Tries to expire the currently held lock.
+ * Result of a single attempt to expire the currently held lock.
*/
+ @VisibleForTesting
+ enum ExpireLockResult {
+ SUCCESS,
+ THROTTLED,
+ FAILED
+ }
+
+ /**
+ * Tries to expire the currently held lock. This is a single-attempt primitive;
+ * callers are responsible for retry policy.
*
* @param fromShutdownHook Whether we are attempting best effort quick unlock from shutdown hook.
- * @return True if we were successfully able to upload an expired lock.
+ * @return The result of the expire attempt.
*/
- private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
+ @VisibleForTesting
+ synchronized ExpireLockResult tryExpireCurrentLock(boolean fromShutdownHook) {
// It does not make sense to have heartbeat alive extending the lock lease while
- // here we are trying
- // to expire the lock.
+ // here we are trying to expire the lock.
if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
// broken function precondition.
throw new HoodieLockException("Must stop heartbeat before expire lock file");
}
logDebugLockState(RELEASING);
// Upload metadata that will unlock this lock.
StorageLockData expiredLockData = new StorageLockData(true, this.getLock().getValidUntilMs(), ownerId);
- Pair<LockUpsertResult, Option<StorageLockFile>> result;
long lockExpirationTimeMs = System.currentTimeMillis();
- result = this.storageLockClient.tryUpsertLockFile(expiredLockData, Option.of(this.getLock()));
+ Pair<LockUpsertResult, Option<StorageLockFile>> result =
+ this.storageLockClient.tryUpsertLockFile(expiredLockData, Option.of(this.getLock()));
switch (result.getLeft()) {
case UNKNOWN_ERROR:
// Here we do not know the state of the lock.
logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown.");
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
- return false;
+ return ExpireLockResult.FAILED;
+ case THROTTLED:
+ logWarnLockState(FAILED_TO_RELEASE, "Lock expiration write was throttled.");
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
+ return ExpireLockResult.THROTTLED;
case SUCCESS:
logInfoLockState(RELEASED);
recordAuditOperation(AuditOperationState.END, lockExpirationTimeMs);
setLock(null);
- return true;
+ return ExpireLockResult.SUCCESS;
case ACQUIRED_BY_OTHERS:
// Lock was acquired by others, indicating heartbeat failure during lock hold period.
logErrorLockState(FAILED_TO_RELEASE, "lock was acquired by others, indicating heartbeat failure.");
setLock(null);
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquiredByOthersErrorMetric);
- return false;
+ return ExpireLockResult.FAILED;
default:
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
throw new HoodieLockException("Unexpected lock update result: " + result.getLeft());
@@ -551,6 +625,12 @@ protected synchronized boolean renewLock() {
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
// Let heartbeat retry later.
return true;
+ case THROTTLED:
+ // Throttling is transient; let the heartbeat retry on its next cycle.
+ logger.warn("Owner {}: Unable to renew lock due to throttling, will retry on next heartbeat.", ownerId);
+ hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
+ return true;
case SUCCESS:
// Only positive outcome
this.setLock(currentLock.getRight().get());
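Worth spelling out: renewLock() returning true on THROTTLED means "keep the heartbeat alive", not "renewal succeeded". HeartbeatManager's implementation is not part of this diff, so the loop below is only an illustrative assumption about how that boolean might be consumed (HeartbeatLoopSketch and its members are hypothetical names):

// Illustrative consumer of a renewLock()-style callback; not code from this PR.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class HeartbeatLoopSketch {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  void start(BooleanSupplier renewLock, long intervalSeconds) {
    scheduler.scheduleAtFixedRate(() -> {
      // A THROTTLED renewal returns true, so the task keeps running and simply
      // retries on the next tick; only a fatal outcome (false) stops the heartbeat.
      if (!renewLock.getAsBoolean()) {
        scheduler.shutdown();
      }
    }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
  }
}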
@@ -29,7 +29,9 @@ public enum LockUpsertResult {
// Another process has modified the lock file (precondition failure) with code 1
ACQUIRED_BY_OTHERS(1),
// Unable to determine lock state due to transient errors with code 2
- UNKNOWN_ERROR(2);
+ UNKNOWN_ERROR(2),
+ // Request was throttled by the storage backend (e.g. HTTP 429) with code 3
+ THROTTLED(3);

private final int code;
}
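One review consideration with widening this enum: every existing switch over LockUpsertResult silently gains a reachable case. The provider changes above cover Hudi's own call sites; external callers should handle the new constant explicitly, along the lines of the following (illustrative caller-side code, not from this PR; isRetryableUpsertResult is a hypothetical helper):

// Illustrative caller-side classification of LockUpsertResult.
static boolean isRetryableUpsertResult(LockUpsertResult result) {
  switch (result) {
    case SUCCESS:
      return false;              // done, nothing to retry
    case ACQUIRED_BY_OTHERS:
      return false;              // contention: back off at a higher level, don't hammer
    case THROTTLED:              // new in this PR: transient by definition
    case UNKNOWN_ERROR:          // indeterminate: retry cautiously
      return true;
    default:
      throw new IllegalStateException("Unhandled LockUpsertResult: " + result);
  }
}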
@@ -411,6 +411,88 @@ void testUnlockThrowsExceptionWhenLockAcquiredByOthers() {
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
}

+ @Test
+ void testUnlockSucceedsAfterThrottledRetry() {
+ // Simulates the GCP 1-write/sec limit: the first expire attempt gets THROTTLED,
+ // the retry (after sleeping outside the monitor) succeeds.
+ when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+ StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+ StorageLockFile expiredLockFile = new StorageLockFile(new StorageLockData(true, data.getValidUntil(), ownerId), "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(expiredLockFile)));
+
+ lockProvider.unlock();
+
+ assertNull(lockProvider.getLock(), "Lock should be null after successful unlock on retry");
+ verify(mockLockService, times(2)).tryUpsertLockFile(any(), eq(Option.of(realLockFile)));
+ }
+
+ @Test
+ void testUnlockBailsOutWhenLockReplacedDuringRetrySleep() {
+ // Race: the first expire attempt returns THROTTLED. While unlock() is sleeping outside
+ // the monitor, the original lock expires and a concurrent tryLock() acquires a fresh
+ // lock (currentLockObj is replaced from lock1 -> lock2). The retry MUST detect this
+ // and bail out, because operating on lock2 would mark a lock that another caller now
+ // holds as expired.
+ when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+ StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lock1 = new StorageLockFile(data, "v1");
+ StorageLockFile lock2 = new StorageLockFile(data, "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lock1)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+ // Simulate the race in the same answer that returns THROTTLED: by the time the sleep
+ // ends, getLock() returns lock2 (the lock a concurrent tryLock() would have acquired).
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lock1))))
+ .thenAnswer(inv -> {
+ doReturn(lock2).when(lockProvider).getLock();
+ return Pair.of(LockUpsertResult.THROTTLED, Option.empty());
+ });
+
+ // unlock() should return normally: the original lock is no longer the caller's
+ // responsibility, and we must not touch lock2.
+ lockProvider.unlock();
+
+ verify(mockLockService, times(1)).tryUpsertLockFile(any(), eq(Option.of(lock1)));
+ verify(mockLockService, never()).tryUpsertLockFile(any(), eq(Option.of(lock2)));
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
+ @Test
+ void testUnlockThrowsExceptionWhenStillThrottledAfterRetry() {
+ // Both the initial attempt and the retry get THROTTLED; unlock should fail.
+ when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+ StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()));
+
+ HoodieLockException exception = assertThrows(HoodieLockException.class, () -> lockProvider.unlock());
+ assertTrue(exception.getMessage().contains("FAILED_TO_RELEASE"));
+ verify(mockLockService, times(2)).tryUpsertLockFile(any(), eq(Option.of(realLockFile)));
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
@Test
void testCloseFailsToStopHeartbeat() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
@@ -466,6 +548,18 @@ void testRenewLockUnableToUpsertLockFileButNotFatal() {
assertTrue(lockProvider.renewLock());
}

+ @Test
+ void testRenewLockThrottledReturnsTrue() {
+ StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()));
+ // Throttling is transient, so renewLock should return true and let the heartbeat retry later.
+ assertTrue(lockProvider.renewLock());
+ verify(mockLogger).warn("Owner {}: Unable to renew lock due to throttling, will retry on next heartbeat.", this.ownerId);
+ }
+
@Test
void testRenewLockUnableToUpsertLockFileFatal() {
StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
@@ -150,6 +150,7 @@ public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty());
} else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFilePath);
+ return Pair.of(LockUpsertResult.THROTTLED, Option.empty());
} else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
logger.warn("OwnerId: {}, GCS returned internal server error code for lock file: {}",
ownerId, lockFilePath, e);