feat: Add Azure-based storage lock#17951
@chrevanthreddy can we fix the PR description.. it'll fail validation. I approved the workflows to run now
Changed title, added description here too.
@vinothchandar @bhasudha I merged this PR and Alex's PR together. Testing looks good to me. Let me know the next steps.
@chrevanthreddy I will queue this up for review..
yihua
left a comment
Thanks for contributing! I left a few inline comments around correctness and naming.
public Option<String> readObject(String filePath, boolean checkExistsFirst) {
  try {
    AzureLocation location = parseAzureLocation(filePath);
    AzureLocation lockLocation = parseAzureLocation(lockFileUri);
    BlobServiceClient svc = location.blobEndpoint.equals(lockLocation.blobEndpoint)
Both readObject and writeObject call parseAzureLocation and potentially createDefaultBlobServiceClient() on every invocation. If these are called frequently (e.g., audit logging), this creates a new BlobServiceClient each time for non-matching endpoints. Have you considered caching the secondary service client?
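A minimal sketch of the caching idea raised in this comment, assuming one client per blob endpoint is enough. `EndpointClientCache` and the factory function are hypothetical names, and the type parameter `C` stands in for `BlobServiceClient` so the example runs without the Azure SDK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical per-endpoint client cache; C stands in for BlobServiceClient. */
final class EndpointClientCache<C> {
  private final Map<String, C> clients = new ConcurrentHashMap<>();
  private final Function<String, C> factory;

  EndpointClientCache(Function<String, C> factory) {
    this.factory = factory;
  }

  /** Returns the cached client for the endpoint, building it on first use only. */
  C get(String blobEndpoint) {
    return clients.computeIfAbsent(blobEndpoint, factory);
  }
}

final class EndpointClientCacheDemo {
  public static void main(String[] args) {
    AtomicInteger builds = new AtomicInteger();
    EndpointClientCache<String> cache = new EndpointClientCache<>(endpoint -> {
      builds.incrementAndGet(); // counts how often the expensive factory runs
      return "client-for-" + endpoint;
    });
    cache.get("https://audit.blob.core.windows.net");
    cache.get("https://audit.blob.core.windows.net");
    System.out.println("clients built: " + builds.get());
  }
}
```

With this shape, repeated `readObject`/`writeObject` calls against the same secondary endpoint would reuse one client instead of constructing a new one per call.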
public Option<String> readObject(String filePath, boolean checkExistsFirst) {
  try {
    AzureLocation location = parseAzureLocation(filePath);
    AzureLocation lockLocation = parseAzureLocation(lockFileUri);
The lock file URI has already been parsed in the constructor. Could we reuse that instead of reparsing?
logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e);
return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty());
For other exceptions, we should let the exception propagate instead of returning a result, as the S3- and GCS-based implementations do.
public boolean writeObject(String filePath, String content) {
  try {
    AzureLocation location = parseAzureLocation(filePath);
    AzureLocation lockLocation = parseAzureLocation(lockFileUri);
Similarly, could we avoid this?
…h calls and retries
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #17951 +/- ##
============================================
+ Coverage 61.43% 68.84% +7.41%
- Complexity 23082 28333 +5251
============================================
Files 2108 2467 +359
Lines 127636 135839 +8203
Branches 14534 16480 +1946
============================================
+ Hits 78409 93521 +15112
+ Misses 42873 34914 -7959
- Partials 6354 7404 +1050
vinothchandar
left a comment
A few questions on the code and the security..
I am relying on @yihua to review the logic. Ethan - please lmk if you'd like me to take over
private static Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() {
  return (location) -> {
    Properties props = location.props;
    BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
    configureAzureClientOptions(builder, props);

    String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING);
    if (connectionString != null && !connectionString.trim().isEmpty()) {
      return builder.connectionString(connectionString).buildClient();
    }

    builder.endpoint(location.blobEndpoint);
    String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN);
    if (sasToken != null && !sasToken.trim().isEmpty()) {
      String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken;
      return builder.credential(new AzureSasCredential(cleaned)).buildClient();
    }

    return builder.credential(new DefaultAzureCredentialBuilder().build()).buildClient();
Should we use a different credential builder here for typical Azure production setups? (I'm not very familiar with that myself.)
Asking since IIRC HoodieAWSConfig supports role ARN, access key, secret key, session token, etc.
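For context, the PR description further down lists the precedence the credential factory ended up with: connection string, then SAS token, then user-assigned managed identity, then service principal, then `DefaultAzureCredential`. The sketch below only models that ordering over plain `Properties`. `AuthModeResolver` and `AuthMode` are hypothetical names, and requiring all three `client.*` keys for the service principal mode is an assumption, not something the thread confirms.

```java
import java.util.Properties;

/** Sketch of the credential precedence from the PR description; no Azure SDK involved. */
final class AuthModeResolver {
  enum AuthMode { CONNECTION_STRING, SAS_TOKEN, MANAGED_IDENTITY, SERVICE_PRINCIPAL, DEFAULT_CREDENTIAL }

  static final String PREFIX = "hoodie.write.lock.azure.";

  /** Picks the first auth mode whose keys are set, in the documented order. */
  static AuthMode resolve(Properties props) {
    if (has(props, "connection.string")) {
      return AuthMode.CONNECTION_STRING;
    }
    if (has(props, "sas.token")) {
      return AuthMode.SAS_TOKEN;
    }
    if (has(props, "managed.identity.client.id")) {
      return AuthMode.MANAGED_IDENTITY;
    }
    // Assumption: a service principal needs tenant id, client id, and secret together.
    if (has(props, "client.tenant.id") && has(props, "client.id") && has(props, "client.secret")) {
      return AuthMode.SERVICE_PRINCIPAL;
    }
    return AuthMode.DEFAULT_CREDENTIAL;
  }

  /** Mirrors the snippet under review: drop a leading '?' from a SAS token. */
  static String cleanSasToken(String sasToken) {
    return sasToken.startsWith("?") ? sasToken.substring(1) : sasToken;
  }

  private static boolean has(Properties props, String suffix) {
    if (props == null) {
      return false;
    }
    String value = props.getProperty(PREFIX + suffix);
    return value != null && !value.trim().isEmpty();
  }
}
```

Keeping the ordering in one place makes it easier to document which setting wins when several are present, which is the concern behind the question above.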
.key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "sas.token")
.noDefaultValue()
.markAdvanced()
.withDocumentation("For Azure based lock provider, optional SAS token used for "
nit: document the minimum permissions required on the token. Read, Write, Create?
Hi @chrevanthreddy any update on addressing the comments in this PR?
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback! All three prior issues from my review are resolved: null ETag in readCurrentLockFile is now validated via the new canonicalizeEtag() helper, the TOCTOU-prone ADLSStorageLockClient is deleted, and DefaultAzureCredential uses lazy initialization. The ETag canonicalization logic correctly handles null, empty, quoted, unquoted, and malformed values — nice touch adding the quote normalization. The switch to BlobUrlParts.parse() for HTTP(S) URIs and getStringWithAltKeys for config lookups are clean improvements. One note: @vinothchandar's question about the parquet version downgrade in pom.xml is still open — please address that in the thread.
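A rough sketch of what ETag canonicalization along these lines could look like. This is not the PR's `canonicalizeEtag()` implementation: `EtagSketch` is a hypothetical class, `IllegalArgumentException` stands in for `HoodieLockException`, and treating any stray or unbalanced quote as malformed is an assumption.

```java
/** Illustrative ETag normalization: always return the value wrapped in double quotes. */
final class EtagSketch {
  static String canonicalizeEtag(String etag) {
    if (etag == null) {
      throw new IllegalArgumentException("ETag is null");
    }
    String trimmed = etag.trim();
    if (trimmed.isEmpty()) {
      throw new IllegalArgumentException("ETag is empty");
    }
    // A value counts as quoted only if it has both an opening and a closing quote.
    boolean quoted = trimmed.length() >= 2 && trimmed.startsWith("\"") && trimmed.endsWith("\"");
    String inner = quoted ? trimmed.substring(1, trimmed.length() - 1) : trimmed;
    // Assumption: an empty body or any remaining quote character means the value is malformed.
    if (inner.isEmpty() || inner.indexOf('"') >= 0) {
      throw new IllegalArgumentException("Malformed ETag: " + etag);
    }
    return "\"" + inner + "\"";
  }
}
```

Under these assumptions, quoted and unquoted forms of the same value normalize to one string, so an ETag returned by a write can be compared directly with one returned by a later read.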
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This pull request introduces Azure Blob Storage and ADLS support for Hudi's distributed locking mechanism. It adds a new hudi-azure module with credential management, lock file operations via conditional writes, configuration properties, comprehensive tests, and a shaded bundle for packaging. Azure storage schemes are registered with the new lock implementation.
Greptile Summary: This PR adds Azure Blob Storage / ADLS Gen2 based distributed locking for Hudi tables, mirroring the approach taken for S3 and GCS. It introduces AzureStorageLockClient (using Azure Blob conditional writes with ETag-based If-Match / If-None-Match), AzureStorageLockConfig for auth/connection configuration, AzureCredentialFactory for credential resolution, and wires the abfs, abfss, wasb, wasbs URI schemes into StorageSchemes. The implementation is well-structured with comprehensive unit tests and an optional Azurite integration test.
Key changes:
- New AzureStorageLockClient implementing StorageLockClient with conditional-write semantics (412 → ACQUIRED_BY_OTHERS, 409/429/5xx → UNKNOWN_ERROR)
- Supports five auth modes: connection string, SAS token, managed identity, service principal, DefaultAzureCredential
- URI parsing for abfs[s]://, wasb[s]://, and https:// / http:// (Azurite) schemes
- StorageSchemes enum updated to register Azure lock client class for wasb, wasbs, abfs, abfss
- One logic concern: readCurrentLockFile can throw HoodieLockException for null/malformed ETags, but StorageBasedLockProvider doesn't catch that exception around its call; callers should be aware
- One unused public constant: AZURE_SAS_TOKEN in AzureStorageLockClient duplicates AzureStorageLockConfig.AZURE_SAS_TOKEN.key() and is not used internally
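To make the URI handling in the list above concrete, here is a small sketch that turns an `abfs[s]://` or `wasb[s]://` URI of the form `scheme://container@account-host/path` into a Blob endpoint, container, and blob path. `AzureUriSketch` and `Location` are hypothetical names, the HTTP(S)/Azurite case is left out, and always emitting an `https://` endpoint is an assumption rather than the PR's confirmed behavior.

```java
import java.net.URI;

/** Illustrative parser for abfs[s]:// and wasb[s]:// lock file URIs. */
final class AzureUriSketch {
  static final class Location {
    final String blobEndpoint;
    final String container;
    final String blobPath;

    Location(String blobEndpoint, String container, String blobPath) {
      this.blobEndpoint = blobEndpoint;
      this.container = container;
      this.blobPath = blobPath;
    }
  }

  private static final String DFS_SUFFIX = ".dfs.core.windows.net";
  private static final String BLOB_SUFFIX = ".blob.core.windows.net";

  static Location parse(String uri) {
    URI parsed = URI.create(uri);
    String scheme = parsed.getScheme();
    boolean supported = "abfs".equals(scheme) || "abfss".equals(scheme)
        || "wasb".equals(scheme) || "wasbs".equals(scheme);
    if (!supported) {
      throw new IllegalArgumentException("Unsupported scheme in: " + uri);
    }
    String container = parsed.getUserInfo(); // the part before '@'
    String host = parsed.getHost();
    String path = parsed.getPath();
    if (container == null || container.isEmpty() || host == null
        || path == null || path.length() <= 1) {
      throw new IllegalArgumentException("Expected scheme://container@host/path, got: " + uri);
    }
    // Lock operations go through the Blob REST API, so map the DFS host to the Blob host.
    String blobHost = host.endsWith(DFS_SUFFIX)
        ? host.substring(0, host.length() - DFS_SUFFIX.length()) + BLOB_SUFFIX
        : host;
    return new Location("https://" + blobHost, container, path.substring(1));
  }
}
```

For example, `parse("abfss://locks@myaccount.dfs.core.windows.net/table/.hoodie/lock")` would yield the endpoint for `myaccount.blob.core.windows.net`, container `locks`, and blob path `table/.hoodie/lock` (account and path names here are made up).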
Greptile Confidence Score: 4/5
Safe to merge with one targeted fix recommended: readCurrentLockFile should handle HoodieLockException consistently
The implementation is solid and well-tested. The main concern is the exception-handling inconsistency in readCurrentLockFile — throwing HoodieLockException for null/malformed ETags instead of returning UNKNOWN_ERROR (as tryUpsertLockFile does), while the StorageBasedLockProvider caller has no catch around that call. In normal Azure operation this path is never hit, so it does not affect the happy path. The fix is straightforward and low-risk. The rest of the code (auth factory, URI parsing, conditional writes, caching) is correct and comprehensively tested.
hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java — specifically the readCurrentLockFile exception handling and the redundant AZURE_SAS_TOKEN constant
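The status-code mapping discussed in this summary (412 becomes ACQUIRED_BY_OTHERS, 409/429/5xx become UNKNOWN_ERROR, anything else is rethrown) can be sketched as follows. `StatusMappingSketch`, its nested `StorageException`, and the local `LockUpsertResult` enum are stand-ins defined only for this example; the real client catches the Azure SDK's `BlobStorageException` instead.

```java
/** Illustrative mapping from HTTP status codes to lock upsert results. */
final class StatusMappingSketch {
  enum LockUpsertResult { SUCCESS, ACQUIRED_BY_OTHERS, UNKNOWN_ERROR }

  /** Hypothetical stand-in for the SDK's storage exception, carrying an HTTP status. */
  static final class StorageException extends RuntimeException {
    final int statusCode;

    StorageException(int statusCode) {
      super("storage error, status " + statusCode);
      this.statusCode = statusCode;
    }
  }

  /** Runs the conditional write and translates expected failures into results. */
  static LockUpsertResult upsert(Runnable conditionalPut) {
    try {
      conditionalPut.run();
      return LockUpsertResult.SUCCESS;
    } catch (StorageException e) {
      int status = e.statusCode;
      if (status == 412) {
        return LockUpsertResult.ACQUIRED_BY_OTHERS; // precondition failed: another writer won
      }
      if (status == 409 || status == 429 || status >= 500) {
        return LockUpsertResult.UNKNOWN_ERROR; // conflict, throttling, or server error
      }
      throw e; // e.g. 400: surface it rather than hide it behind a result code
    }
  }
}
```

Rethrowing unexpected errors keeps the behavior in line with the earlier review comment about not swallowing exceptions that the S3 and GCS clients let propagate.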
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Client
participant AzureStorageLockClient
participant CredentialFactory
participant BlobServiceClient
participant AzureStorage["Azure Blob Storage"]
Client->>AzureStorageLockClient: tryUpsertLockFile(newLockData, previousLockFile)
AzureStorageLockClient->>CredentialFactory: getAzureCredential(props)
CredentialFactory->>CredentialFactory: resolve identity or service principal
CredentialFactory-->>AzureStorageLockClient: TokenCredential
AzureStorageLockClient->>BlobServiceClient: uploadBlockBlob(content, condition)
alt No prior lock
BlobServiceClient->>AzureStorage: PUT with If-None-Match: *
else Prior lock exists
BlobServiceClient->>AzureStorage: PUT with If-Match: <ETag>
end
alt Success
AzureStorage-->>BlobServiceClient: 201/200 + ETag
BlobServiceClient-->>AzureStorageLockClient: BlockBlobItem
AzureStorageLockClient-->>Client: (ACQUIRED, StorageLockFile)
else Precondition failed
AzureStorage-->>BlobServiceClient: 412
BlobServiceClient-->>AzureStorageLockClient: BlobStorageException
AzureStorageLockClient-->>Client: (ACQUIRED_BY_OTHERS, None)
end
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Client
participant AzureStorageLockClient
participant BlobServiceClient
participant AzureStorage["Azure Blob Storage"]
Client->>AzureStorageLockClient: readCurrentLockFile()
AzureStorageLockClient->>BlobServiceClient: downloadStream(blobPath)
BlobServiceClient->>AzureStorage: GET blob
alt Blob exists
AzureStorage-->>BlobServiceClient: 200 + stream + ETag header
BlobServiceClient-->>AzureStorageLockClient: stream + headers
AzureStorageLockClient->>AzureStorageLockClient: normalize ETag
AzureStorageLockClient-->>Client: (SUCCESS, StorageLockFile)
else Blob not found
AzureStorage-->>BlobServiceClient: 404
BlobServiceClient-->>AzureStorageLockClient: BlobStorageException
AzureStorageLockClient-->>Client: (NOT_EXISTS, None)
end
Sequence Diagram (Greptile):
sequenceDiagram
participant P as StorageBasedLockProvider
participant C as AzureStorageLockClient
participant B as BlobClient (Azure SDK)
participant Z as Azure Blob Storage
Note over P,Z: Lock Acquisition (tryUpsertLockFile)
P->>C: tryUpsertLockFile(newLockData, previousLockFile)
C->>B: uploadWithResponse(BlobParallelUploadOptions, ...)
Note over C,B: Conditional headers:<br/>If-None-Match:* (create)<br/>If-Match: etag (update)
B->>Z: PUT blob with conditional header
alt 200 OK
Z-->>B: ETag in response
B-->>C: Response BlockBlobItem
C->>C: canonicalizeEtag(etag)
C-->>P: (SUCCESS, StorageLockFile)
else 412 Precondition Failed
Z-->>B: 412
B-->>C: BlobStorageException(412)
C-->>P: (ACQUIRED_BY_OTHERS, empty)
else 409/429/5xx
Z-->>B: error
B-->>C: BlobStorageException
C-->>P: (UNKNOWN_ERROR, empty)
end
Note over P,Z: Lock Read (readCurrentLockFile)
P->>C: readCurrentLockFile()
C->>B: downloadContentWithResponse(...)
B->>Z: GET blob
alt 200 OK
Z-->>B: blob content + ETag header
B-->>C: Response BinaryData
C->>C: canonicalizeEtag(etag)
Note over C: Throws HoodieLockException<br/>if ETag null/malformed
C-->>P: (SUCCESS, StorageLockFile)
else 404 Not Found
Z-->>B: 404
B-->>C: BlobStorageException(404)
C-->>P: (NOT_EXISTS, empty)
else 429/5xx
Z-->>B: error
B-->>C: BlobStorageException
C-->>P: (UNKNOWN_ERROR, empty)
end
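The conditional-write flow in the diagrams above can be emulated in memory to show why it behaves like a lock. The class below is a toy model, not Azure code: `ConditionalBlobSketch` is a hypothetical name, the ETag is just a version counter, and an empty `Optional` plays the role of a 412 response.

```java
import java.util.Optional;

/** Toy single-blob store that mimics If-None-Match: * and If-Match semantics. */
final class ConditionalBlobSketch {
  private String content;
  private String etag; // null means the blob does not exist yet
  private long version;

  /** Like PUT with If-None-Match: *; succeeds only if no blob exists. */
  synchronized Optional<String> putIfAbsent(String newContent) {
    if (etag != null) {
      return Optional.empty(); // would be HTTP 412
    }
    return Optional.of(store(newContent));
  }

  /** Like PUT with If-Match: etag; succeeds only if nobody wrote in between. */
  synchronized Optional<String> putIfMatch(String expectedEtag, String newContent) {
    if (etag == null || !etag.equals(expectedEtag)) {
      return Optional.empty(); // would be HTTP 412
    }
    return Optional.of(store(newContent));
  }

  synchronized String read() {
    return content;
  }

  private String store(String newContent) {
    content = newContent;
    version++;
    etag = "\"v" + version + "\""; // every successful write yields a fresh ETag
    return etag;
  }
}
```

A writer that creates the lock with `putIfAbsent` and later renews it with `putIfMatch` using the ETag it last saw will fail cleanly if another writer has replaced the lock file in the meantime, which is the optimistic concurrency control the PR relies on.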
CodeRabbit: yihua#45 (review)
Greptile: yihua#45 (review)
yihua
left a comment
LGTM! Minor nits are non-blocking.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice cleanup — the only changes since last review are import reorganization and removal of the unused AZURE_SAS_TOKEN public constant in AzureStorageLockClient. No functional changes. All prior findings from my earlier review (null ETag handling, removal of ADLSStorageLockClient, lazy DefaultAzureCredential init) remain resolved. Open items from other reviewers (Vinoth's questions about parquet version bump, DefaultAzureCredential vs. production credential setup, throwing on UNKNOWN case at line 322, documenting minimum SAS permissions) are still pending responses in the thread — worth addressing before merge, but no new issues introduced by this patch.
Describe the issue this Pull Request addresses
Adds Azure Blob Storage-based distributed lock provider for Hudi tables on ADLS Gen2 and Azure Blob Storage, extending the existing S3 and GCS storage-based lock implementations (RFC-91).
Summary and Changelog
Adds an Azure storage lock provider for ADLS, as an extension of the S3/GCS object storage lock providers.
New module: hudi-azure
- AzureStorageLockClient: StorageLockClient implementation using Azure Blob conditional requests (ETag-based optimistic concurrency control)
  - If-None-Match: * for lock creation (fail if blob already exists)
  - If-Match: <etag> for lock renewal/expiry (fail if modified by another writer)
- AzureCredentialFactory: credential resolution with precedence: connection string → SAS token → user-assigned managed identity → service principal → DefaultAzureCredential (lazy singleton via holder pattern)
- AzureStorageLockConfig: ConfigProperty-based configuration extending HoodieConfig, with sinceVersion("1.2.0") on all new keys under hoodie.write.lock.azure.*
- URI parsing for abfs://, abfss://, wasb://, wasbs://, https://, http:// schemes, with DFS-to-Blob host conversion (dfs.core.windows.net → blob.core.windows.net) since lock operations use the Blob Storage REST API
- BlobServiceClient caching via ConcurrentHashMap for secondary endpoints (audit file operations)
- StorageBasedLockProvider level
- ETag canonicalization (canonicalizeEtag) ensures consistent double-quoted format across read/write paths, with fail-fast on null/empty/malformed ETags
New module: packaging/hudi-azure-bundle
- hudi-azure, Azure SDK, Reactor, and Netty with relocations to avoid classpath conflicts
New dependencies:
- azure-storage-blob
- azure-identity
Modified: StorageSchemes
- AzureStorageLockClient for wasb, wasbs, abfs, abfss schemes
New tests are added:
- TestAzureStorageLockClient: lock create/update with If-None-Match/If-Match precondition verification via ArgumentCaptor, ETag fallback from BlockBlobItem, ETag canonicalization (null/empty/malformed), HTTP error code mapping (412 → ACQUIRED_BY_OTHERS, 409/429/5xx → UNKNOWN_ERROR, 400 → rethrown), BlobServiceClient caching for secondary endpoints, readObject/writeObject success and failure paths, constructor validation for multiple URI schemes
- TestAzureStorageLockClientUriParsing: abfs://, abfss://, wasb://, wasbs://, https://, http:// schemes with DFS-to-Blob host conversion, plus negative tests for missing scheme/authority/path, invalid formats, unsupported schemes, empty containers, and edge cases (single-segment paths, deep paths, special characters, hyphenated containers)
- ITAzureStorageLockClientAzurite: end-to-end create → read → wrong-ETag-update flow against Azurite (Azure Storage emulator) via Testcontainers. Run with -Pazure-integration-tests
- TestAzureStorageBasedLockProvider: extends StorageBasedLockProviderTestBase (disabled by default, requires Azurite)
- TestStorageSchemes: updated assertions for abfs/wasb lock provider registration
Impact
Adds new feature for Azure to existing LP providers
StorageSchemes registration and root pom.xml module additions
Risk Level
low: additive feature in a new module. Core lock protocol (StorageBasedLockProvider) is unchanged.
Documentation Update
New config keys documented via ConfigProperty.withDocumentation():
- hoodie.write.lock.azure.connection.string
- hoodie.write.lock.azure.sas.token
- hoodie.write.lock.azure.managed.identity.client.id
- hoodie.write.lock.azure.client.tenant.id
- hoodie.write.lock.azure.client.id
- hoodie.write.lock.azure.client.secret
Contributor's checklist