diff --git a/hudi-azure/pom.xml b/hudi-azure/pom.xml new file mode 100644 index 0000000000000..b0b6e9151b71e --- /dev/null +++ b/hudi-azure/pom.xml @@ -0,0 +1,240 @@ + + + + + hudi + org.apache.hudi + 1.2.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + hudi-azure + jar + + + + 12.26.0 + 1.12.2 + + 2.13.5 + + + + + + org.apache.logging.log4j + log4j-1.2-api + + + + + org.projectlombok + lombok + + + + + org.apache.hudi + hudi-client-common + ${project.version} + + + org.apache.hudi + hudi-common + ${project.version} + + + + + org.apache.hadoop + hadoop-common + test + + + + + com.azure + azure-storage-blob + ${azure.storage.blob.version} + + + com.azure + azure-identity + ${azure.identity.version} + + + + + org.apache.hudi + hudi-tests-common + ${project.version} + test + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test + + + org.apache.hudi + hudi-hadoop-common + ${project.version} + test + test-jar + + + org.apache.hudi + hudi-common + ${project.version} + tests + test + + + org.mockito + mockito-core + test + + + + com.fasterxml.jackson.core + jackson-core + ${azure.jackson.version} + test + + + com.fasterxml.jackson.core + jackson-annotations + ${azure.jackson.version} + test + + + com.fasterxml.jackson.core + jackson-databind + ${azure.jackson.version} + test + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${azure.jackson.version} + test + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + ${azure.jackson.version} + test + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + com.esotericsoftware + kryo-shaded + test + + + + + + azure-integration-tests + + + + + org.apache.maven.plugins + maven-surefire-plugin + + true + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven-failsafe-plugin.version} + + + **/IT*.java + + + + + integration-test + + integration-test + + + + verify-integration-test + verify + + verify + + + + + + + + + + + + + src/main/resources + + + + + org.apache.rat + apache-rat-plugin + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + + test-jar + + + + + + org.jacoco + jacoco-maven-plugin + + + + + diff --git a/hudi-azure/src/main/java/org/apache/hudi/azure/credentials/AzureCredentialFactory.java b/hudi-azure/src/main/java/org/apache/hudi/azure/credentials/AzureCredentialFactory.java new file mode 100644 index 0000000000000..147eae1fbc5af --- /dev/null +++ b/hudi-azure/src/main/java/org/apache/hudi/azure/credentials/AzureCredentialFactory.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.credentials; + +import org.apache.hudi.config.AzureStorageLockConfig; + +import com.azure.core.credential.TokenCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.identity.ManagedIdentityCredentialBuilder; + +import java.util.Properties; + +/** + * Factory for resolving an Azure {@link TokenCredential} from Hudi properties. + * + *

Credential precedence: + *

    + *
  1. User-assigned managed identity ({@link AzureStorageLockConfig#AZURE_MANAGED_IDENTITY_CLIENT_ID}) + * — uses {@code ManagedIdentityCredential}
  2. + *
  3. Service principal ({@link AzureStorageLockConfig#AZURE_CLIENT_TENANT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_SECRET}) + * — uses {@code ClientSecretCredential}
  4. + *
  5. {@code DefaultAzureCredential} — (system-assigned MI, + * workload identity, env-var SP, Azure CLI, etc.); suitable for dev and environments + * where auth is controlled externally
  6. + *
+ * + *

Note: connection string and SAS token auth are not {@link TokenCredential}-based and are + * handled directly by the caller before this factory is consulted. + */ +public class AzureCredentialFactory { + + /** + * Lazily initializes {@code DefaultAzureCredential} on first use of the default chain only. + */ + private static final class DefaultAzureCredentialHolder { + static final TokenCredential INSTANCE = new DefaultAzureCredentialBuilder().build(); + } + + private AzureCredentialFactory() { + } + + /** + * Returns a {@link TokenCredential} resolved from the supplied properties. + * + * @param props Hudi lock properties + * @return resolved credential, never {@code null} + */ + public static TokenCredential getAzureCredential(Properties props) { + if (props != null) { + String miClientId = props.getProperty(AzureStorageLockConfig.AZURE_MANAGED_IDENTITY_CLIENT_ID.key()); + if (miClientId != null && !miClientId.trim().isEmpty()) { + return new ManagedIdentityCredentialBuilder() + .clientId(miClientId) + .build(); + } + + String tenantId = props.getProperty(AzureStorageLockConfig.AZURE_CLIENT_TENANT_ID.key()); + String clientId = props.getProperty(AzureStorageLockConfig.AZURE_CLIENT_ID.key()); + String clientSecret = props.getProperty(AzureStorageLockConfig.AZURE_CLIENT_SECRET.key()); + if (tenantId != null && !tenantId.trim().isEmpty() + && clientId != null && !clientId.trim().isEmpty() + && clientSecret != null && !clientSecret.trim().isEmpty()) { + return new ClientSecretCredentialBuilder() + .tenantId(tenantId) + .clientId(clientId) + .clientSecret(clientSecret) + .build(); + } + } + + return DefaultAzureCredentialHolder.INSTANCE; + } +} diff --git a/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java new file mode 100644 index 0000000000000..09cedf6b1faaa --- /dev/null +++ b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.azure.credentials.AzureCredentialFactory; +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.exception.HoodieLockException; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. + * + *

Supports the following URI schemes: + *

+ * + * + * + * + *

Expected lock URI formats: + *

+ * + *

Authentication precedence (via {@link Properties}): + *

    + *
  1. {@link AzureStorageLockConfig#AZURE_CONNECTION_STRING} — connection string (includes shared key)
  2. + *
  3. {@link AzureStorageLockConfig#AZURE_SAS_TOKEN} — shared access signature
  4. + *
  5. {@link AzureStorageLockConfig#AZURE_MANAGED_IDENTITY_CLIENT_ID} — user-assigned managed identity + * via {@code ManagedIdentityCredential}
  6. + *
  7. {@link AzureStorageLockConfig#AZURE_CLIENT_TENANT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_SECRET} — service principal + * via {@code ClientSecretCredential}
  8. + *
  9. {@code DefaultAzureCredential} — probing chain; see {@link org.apache.hudi.azure.credentials.AzureCredentialFactory}
  10. + *
+ */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1 blobServiceClientSupplier; + private final ConcurrentMap secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1 blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static Functions.Function1 createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + // 1. Connection string (includes shared-key auth). + String connectionString = getStringWithAltKeys(props, AzureStorageLockConfig.AZURE_CONNECTION_STRING, true); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + + // 2. SAS token. + String sasToken = getStringWithAltKeys(props, AzureStorageLockConfig.AZURE_SAS_TOKEN, true); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + // 3. TokenCredential — MI, service principal, or DefaultAzureCredential fallback. + return builder.credential(AzureCredentialFactory.getAzureCredential(props)).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + TypedProperties typedProps = new TypedProperties(); + if (props != null) { + typedProps.putAll(props); + } + long validityTimeoutSecs; + try { + validityTimeoutSecs = getLongWithAltKeys(typedProps, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + } catch (NumberFormatException e) { + validityTimeoutSecs = StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue(); + } + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. + ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + @Override + public Pair> tryUpsertLockFile( + StorageLockData newLockData, + Option previousLockFile) { + String expectedEtag = previousLockFile.isPresent() + ? previousLockFile.get().getVersionId() + : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. + throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } + + @Override + public Pair> readCurrentLockFile() { + try { + Response response = lockBlobClient.downloadContentWithResponse(null, null, null, Context.NONE); + //Check for null or empty ETag and inconsistent quotes + String eTag = canonicalizeEtag( + response.getHeaders() != null ? response.getHeaders().getValue("ETag") : null, + "download"); + StorageLockFile lockFile = StorageLockFile.createFromStream(response.getValue().toStream(), eTag); + return Pair.of(LockGetResult.SUCCESS, Option.of(lockFile)); + } catch (BlobStorageException e) { + return Pair.of(handleGetStorageException(e), Option.empty()); + } + } + + private LockGetResult handleGetStorageException(BlobStorageException e) { + int code = e.getStatusCode(); + if (code == NOT_FOUND_ERROR_CODE || e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + logger.info("OwnerId: {}, Object not found in the path: {}", ownerId, lockFileUri); + return LockGetResult.NOT_EXISTS; + } else if (code == RATE_LIMIT_ERROR_CODE) { + logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { + logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); + } else { + throw e; + } + return LockGetResult.UNKNOWN_ERROR; + } + + private StorageLockFile createOrUpdateLockFileInternal(StorageLockData lockData, String expectedEtag) { + byte[] bytes = StorageLockFile.toByteArray(lockData); + BlobRequestConditions conditions = new BlobRequestConditions(); + if (expectedEtag == null) { + conditions.setIfNoneMatch("*"); + } else { + conditions.setIfMatch(expectedEtag); + } + + BlobParallelUploadOptions options = new BlobParallelUploadOptions(BinaryData.fromBytes(bytes)) + .setRequestConditions(conditions); + Response response = lockBlobClient.uploadWithResponse(options, null, Context.NONE); + String newEtag = response.getHeaders() != null ? response.getHeaders().getValue("ETag") : null; + if (newEtag == null && response.getValue() != null) { + newEtag = response.getValue().getETag(); + } + //Check for null or empty ETag and inconsistent quotes + newEtag = canonicalizeEtag(newEtag, "upload"); + return new StorageLockFile(lockData, newEtag); + } + + private String canonicalizeEtag(String eTag, String operation) { + if (eTag == null) { + throw new HoodieLockException("Missing ETag in Azure " + operation + " response for lock file: " + lockFileUri); + } + + String normalized = eTag.trim(); + if (normalized.isEmpty()) { + throw new HoodieLockException("Missing ETag in Azure " + operation + " response for lock file: " + lockFileUri); + } + + boolean startsWithQuote = normalized.startsWith("\""); + boolean endsWithQuote = normalized.endsWith("\""); + if (startsWithQuote && endsWithQuote) { + return normalized; + } + if (!startsWithQuote && !endsWithQuote) { + return "\"" + normalized + "\""; + } + + throw new HoodieLockException("Malformed ETag in Azure " + operation + " response for lock file: " + lockFileUri); + } + + private LockUpsertResult handleUpsertBlobStorageException(BlobStorageException e) { + int code = e.getStatusCode(); + if (code == PRECONDITION_FAILURE_ERROR_CODE || e.getErrorCode() == BlobErrorCode.CONDITION_NOT_MET) { + logger.info("OwnerId: {}, Unable to write new lock file. Another process has modified this lockfile {} already.", + ownerId, lockFileUri); + return LockUpsertResult.ACQUIRED_BY_OTHERS; + } else if (code == CONFLICT_ERROR_CODE) { + logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFileUri); + } else if (code == RATE_LIMIT_ERROR_CODE) { + logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { + logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); + } else { + logger.warn("OwnerId: {}, Error writing lock file: {}", ownerId, lockFileUri, e); + } + return LockUpsertResult.UNKNOWN_ERROR; + } + + @Override + public Option readObject(String filePath, boolean checkExistsFirst) { + try { + AzureLocation location = parseAzureLocation(filePath); + BlobServiceClient svc = getBlobServiceClient(location); + BlobClient blobClient = svc.getBlobContainerClient(location.container).getBlobClient(location.blobPath); + + if (checkExistsFirst && !blobClient.exists()) { + logger.debug("JSON config file not found: {}", filePath); + return Option.empty(); + } + byte[] bytes = blobClient.downloadContent().toBytes(); + return Option.of(new String(bytes, UTF_8)); + } catch (BlobStorageException e) { + if (e.getStatusCode() == NOT_FOUND_ERROR_CODE) { + logger.debug("JSON config file not found: {}", filePath); + } else { + logger.warn("Error reading JSON config file: {}", filePath, e); + } + return Option.empty(); + } catch (Exception e) { + logger.warn("Error reading JSON config file: {}", filePath, e); + return Option.empty(); + } + } + + @Override + public boolean writeObject(String filePath, String content) { + try { + AzureLocation location = parseAzureLocation(filePath); + BlobServiceClient svc = getBlobServiceClient(location); + BlobClient blobClient = svc.getBlobContainerClient(location.container).getBlobClient(location.blobPath); + blobClient.upload(BinaryData.fromString(content), true); + logger.debug("Successfully wrote object to: {}", filePath); + return true; + } catch (Exception e) { + logger.error("Error writing object to: {}", filePath, e); + return false; + } + } + + private BlobServiceClient getBlobServiceClient(AzureLocation location) { + if (location.blobEndpoint.equals(lockBlobEndpoint)) { + return blobServiceClient; + } + return secondaryBlobServiceClients.computeIfAbsent( + location.blobEndpoint, + endpoint -> blobServiceClientSupplier.apply(location.withProperties(clientProperties))); + } + + @Override + public void close() { + // BlobServiceClient does not require explicit close. No-op. + } + + @VisibleForTesting + static AzureLocation parseAzureLocation(String uriString) { + try { + URI uri = new URI(uriString); + String scheme = uri.getScheme(); + if (scheme == null) { + throw new IllegalArgumentException("URI does not contain a valid scheme."); + } + + String authority = uri.getAuthority(); + String path = uri.getPath() == null ? "" : uri.getPath().replaceFirst("/", ""); + + // ADLS Gen2: abfs[s]://@.dfs.core.windows.net/ + if ("abfs".equalsIgnoreCase(scheme) || "abfss".equalsIgnoreCase(scheme)) { + if (authority == null || !authority.contains("@")) { + throw new IllegalArgumentException("ABFS URI authority must be in the form '@': " + uriString); + } + String[] parts = authority.split("@", 2); + String container = parts[0]; + String host = parts[1]; + String endpointHost = dfsHostToBlobHost(host); + String endpoint = "https://" + endpointHost; + if (container.isEmpty() || path.isEmpty()) { + throw new IllegalArgumentException("ABFS URI must contain container and path: " + uriString); + } + return new AzureLocation(endpoint, container, path, null); + } + + // Azure Blob Storage: wasb[s]://@.blob.core.windows.net/ + if ("wasb".equalsIgnoreCase(scheme) || "wasbs".equalsIgnoreCase(scheme)) { + if (authority == null || !authority.contains("@")) { + throw new IllegalArgumentException("WASB URI authority must be in the form '@': " + uriString); + } + String[] parts = authority.split("@", 2); + String container = parts[0]; + String host = parts[1]; + String endpoint = "https://" + host; + if (container.isEmpty() || path.isEmpty()) { + throw new IllegalArgumentException("WASB URI must contain container and path: " + uriString); + } + return new AzureLocation(endpoint, container, path, null); + } + + // Direct HTTP(S) blob URL. + if ("https".equalsIgnoreCase(scheme) || "http".equalsIgnoreCase(scheme)) { + BlobUrlParts parts = BlobUrlParts.parse(uriString); + String container = parts.getBlobContainerName(); + String blobPath = parts.getBlobName(); + if (container == null || container.isEmpty() || blobPath == null || blobPath.isEmpty()) { + throw new IllegalArgumentException("HTTP(S) URI must contain container and path: " + uriString); + } + return new AzureLocation(parts.getScheme() + "://" + parts.getHost(), container, blobPath, null); + } + + throw new IllegalArgumentException("Unsupported scheme for Azure storage lock: " + scheme + + ". Supported schemes: abfs, abfss, wasb, wasbs, https, http"); + } catch (URISyntaxException e) { + throw new HoodieLockException("Failed to parse Azure URI: " + uriString, e); + } + } + + private static String dfsHostToBlobHost(String host) { + if (host == null) { + return null; + } + if (host.endsWith(".dfs.core.windows.net")) { + return host.replace(".dfs.core.windows.net", ".blob.core.windows.net"); + } + return host; + } + + @VisibleForTesting + @Getter + @AllArgsConstructor + static final class AzureLocation { + final String blobEndpoint; + final String container; + final String blobPath; + final Properties props; + + AzureLocation withProperties(Properties props) { + return new AzureLocation(blobEndpoint, container, blobPath, props); + } + } +} diff --git a/hudi-azure/src/main/java/org/apache/hudi/config/AzureStorageLockConfig.java b/hudi-azure/src/main/java/org/apache/hudi/config/AzureStorageLockConfig.java new file mode 100644 index 0000000000000..4ab42d3d22b66 --- /dev/null +++ b/hudi-azure/src/main/java/org/apache/hudi/config/AzureStorageLockConfig.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; + +/** + * Hoodie Configs for Azure based storage locks. + */ +@ConfigClassProperty(name = "Azure based Locks Configurations", + groupName = ConfigGroups.Names.WRITE_CLIENT, + subGroupName = ConfigGroups.SubGroupNames.LOCK, + description = "Configs that control Azure Blob/ADLS based locking mechanisms " + + "required for concurrency control between writers to a Hudi table.") +public class AzureStorageLockConfig extends HoodieConfig { + + private static final String AZURE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + "azure."; + + public static final ConfigProperty AZURE_CONNECTION_STRING = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "connection.string") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Azure based lock provider, optional Azure Storage connection string used " + + "for authenticating BlobServiceClient."); + + public static final ConfigProperty AZURE_SAS_TOKEN = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "sas.token") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Azure based lock provider, optional SAS token used for " + + "authenticating BlobServiceClient when connection string is not provided. SAS token is not recommended for production use by Azure."); + + public static final ConfigProperty AZURE_MANAGED_IDENTITY_CLIENT_ID = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "managed.identity.client.id") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Azure based lock provider, client ID of a user-assigned managed identity to authenticate " + + "BlobServiceClient with ManagedIdentityCredential."); + + public static final ConfigProperty AZURE_CLIENT_TENANT_ID = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.tenant.id") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Azure based lock provider, Azure AD tenant ID used together with " + + "'" + AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.id' and " + + "'" + AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.secret' to authenticate via service principal " + + "(ClientSecretCredential). All three must be set for this auth mode to activate."); + + public static final ConfigProperty AZURE_CLIENT_ID = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.id") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Azure based lock provider, Azure AD application (client) ID used together with " + + "'" + AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.tenant.id' and " + + "'" + AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.secret' to authenticate via service principal " + + "(ClientSecretCredential). All three must be set for this auth mode to activate."); + + public static final ConfigProperty AZURE_CLIENT_SECRET = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.secret") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Azure based lock provider, Azure AD client secret used together with " + + "'" + AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.tenant.id' and " + + "'" + AZURE_BASED_LOCK_PROPERTY_PREFIX + "client.id' to authenticate via service principal " + + "(ClientSecretCredential). All three must be set for this auth mode to activate."); +} diff --git a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/ITAzureStorageLockClientAzurite.java b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/ITAzureStorageLockClientAzurite.java new file mode 100644 index 0000000000000..a7897a6ac3d7d --- /dev/null +++ b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/ITAzureStorageLockClientAzurite.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.AzureStorageLockConfig; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for {@link AzureStorageLockClient} using Azurite (Azure Storage emulator). + * + *

Run with: {@code mvn -Pazure-integration-tests -pl hudi-azure verify} + */ +@Testcontainers(disabledWithoutDocker = true) +@DisabledIfEnvironmentVariable(named = "SKIP_AZURITE_IT", matches = "true") +public class ITAzureStorageLockClientAzurite { + + private static final DockerImageName AZURITE_IMAGE = + DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite"); + + // Standard Azurite defaults (documented by Microsoft) + private static final String ACCOUNT_NAME = "devstoreaccount1"; + // Standard Azurite dev account key (NOT a secret; used by the emulator by default) + // See: https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio#connection-strings + private static final String ACCOUNT_KEY = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + + @Container + public static final GenericContainer AZURITE = + new GenericContainer<>(AZURITE_IMAGE) + .withExposedPorts(10000) + .withCommand("azurite-blob", "--blobHost", "0.0.0.0", "--blobPort", "10000", "--loose") + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(60))); + + private static String blobEndpoint() { + // Azurite expects / in the endpoint URL path + return "http://" + AZURITE.getHost() + ":" + AZURITE.getMappedPort(10000) + "/" + ACCOUNT_NAME; + } + + private static String connectionString() { + String key = System.getenv("AZURITE_ACCOUNT_KEY"); + if (key == null || key.trim().isEmpty()) { + key = ACCOUNT_KEY; + } + return "DefaultEndpointsProtocol=http;" + + "AccountName=" + ACCOUNT_NAME + ";" + + "AccountKey=" + key + ";" + + "BlobEndpoint=" + blobEndpoint() + ";"; + } + + @Test + void testCreateUpdateAndReadLockFileWithAzurite() { + String container = "container"; + String blobPath = "locks/table_lock.json"; + + BlobServiceClient svc = new BlobServiceClientBuilder() + .connectionString(connectionString()) + .buildClient(); + BlobContainerClient containerClient = svc.getBlobContainerClient(container); + containerClient.createIfNotExists(); + + // NOTE: lockFileUri only needs to be parseable for container/blobPath extraction. + // The actual endpoint comes from the connection string. + String lockFileUri = "https://localhost:10000/" + ACCOUNT_NAME + "/" + container + "/" + blobPath; + Properties props = new Properties(); + props.setProperty(AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(), connectionString()); + + AzureStorageLockClient owner1 = new AzureStorageLockClient("owner1", lockFileUri, props); + StorageLockData lockData1 = new StorageLockData(false, System.currentTimeMillis() + 60_000, "owner1"); + Pair> upsert1 = owner1.tryUpsertLockFile(lockData1, Option.empty()); + assertEquals(LockUpsertResult.SUCCESS, upsert1.getLeft()); + assertTrue(upsert1.getRight().isPresent()); + + // Read back + Pair> read = owner1.readCurrentLockFile(); + assertEquals(LockGetResult.SUCCESS, read.getLeft()); + assertTrue(read.getRight().isPresent()); + + // Wrong ETag should fail with precondition and be mapped to ACQUIRED_BY_OTHERS + AzureStorageLockClient owner2 = new AzureStorageLockClient("owner2", lockFileUri, props); + StorageLockFile wrongPrev = new StorageLockFile(lockData1, "\"etag-does-not-match\""); + StorageLockData lockData2 = new StorageLockData(false, System.currentTimeMillis() + 60_000, "owner2"); + Pair> upsert2 = + owner2.tryUpsertLockFile(lockData2, Option.of(wrongPrev)); + assertEquals(LockUpsertResult.ACQUIRED_BY_OTHERS, upsert2.getLeft()); + } +} diff --git a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageBasedLockProvider.java b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageBasedLockProvider.java new file mode 100644 index 0000000000000..ecc1ee6c7c022 --- /dev/null +++ b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageBasedLockProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageBasedLockProvider; +import org.apache.hudi.client.transaction.lock.StorageBasedLockProviderTestBase; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.testutils.HoodieTestUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; + +import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH; + +@Disabled("Requires Azurite/Testcontainers-based integration environment (not enabled by default).") +public class TestAzureStorageBasedLockProvider extends StorageBasedLockProviderTestBase { + + @BeforeEach + void setupLockProvider() { + providerProperties.put(BASE_PATH.key(), + "abfs://container@account.dfs.core.windows.net/lake/db/tbl-default"); + lockProvider = createLockProvider(); + } + + @Override + protected StorageBasedLockProvider createLockProvider() { + LockConfiguration lockConf = new LockConfiguration(providerProperties); + return new StorageBasedLockProvider(lockConf, HoodieTestUtils.getDefaultStorageConf()); + } +} diff --git a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java new file mode 100644 index 0000000000000..801d20b91872b --- /dev/null +++ b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobDownloadContentResponse; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestAzureStorageLockClient { + + private static final String OWNER_ID = "ownerId"; + private static final String LOCK_FILE_URI = + "abfs://container@account.dfs.core.windows.net/lockFilePath"; + private static final String LOCK_FILE_URI_WITH_NESTED_PATH = + "abfs://container@account.dfs.core.windows.net/lake/db/tbl-default/.hoodie/.locks/table_lock.json"; + + @Mock + private BlobServiceClient mockBlobServiceClient; + + @Mock + private BlobContainerClient mockContainerClient; + + @Mock + private BlobClient mockBlobClient; + + @Mock + private Logger mockLogger; + + private AzureStorageLockClient lockClient; + + @BeforeEach + void setUp() { + setUp(LOCK_FILE_URI); + } + + private void setUp(String lockFileUri) { + when(mockBlobServiceClient.getBlobContainerClient(eq("container"))).thenReturn(mockContainerClient); + String expectedBlobPath = lockFileUri.replaceFirst("^abfss?://[^/]+/", ""); + when(mockContainerClient.getBlobClient(eq(expectedBlobPath))).thenReturn(mockBlobClient); + + lockClient = new AzureStorageLockClient( + OWNER_ID, + lockFileUri, + new Properties(), + (location) -> mockBlobServiceClient, + mockLogger); + } + + @Test + void testTryUpsertLockFile_noPreviousLock_success_setsIfNoneMatchStar() throws Exception { + StorageLockData lockData = new StorageLockData(false, 123L, "test-owner"); + @SuppressWarnings("unchecked") + Response response = (Response) mock(Response.class); + when(response.getHeaders()).thenReturn(new HttpHeaders().set("ETag", "\"etag-1\"")); + when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))) + .thenReturn(response); + + ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(BlobParallelUploadOptions.class); + + Pair> result = lockClient.tryUpsertLockFile(lockData, Option.empty()); + + assertEquals(LockUpsertResult.SUCCESS, result.getLeft()); + assertTrue(result.getRight().isPresent()); + assertEquals("\"etag-1\"", result.getRight().get().getVersionId()); + + verify(mockBlobClient).uploadWithResponse(optionsCaptor.capture(), isNull(), eq(Context.NONE)); + BlobParallelUploadOptions options = optionsCaptor.getValue(); + assertRequestCondition(options, "ifNoneMatch", "*"); + verifyNoMoreInteractions(mockLogger); + } + + @Test + void testTryUpsertLockFile_withPreviousLock_success_setsIfMatch() throws Exception { + StorageLockData lockData = new StorageLockData(false, 999L, "existing-owner"); + StorageLockFile previousLockFile = new StorageLockFile(lockData, "\"etag-prev\""); + + @SuppressWarnings("unchecked") + Response response = (Response) mock(Response.class); + when(response.getHeaders()).thenReturn(new HttpHeaders().set("ETag", "\"etag-new\"")); + when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))) + .thenReturn(response); + + ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(BlobParallelUploadOptions.class); + + Pair> result = + lockClient.tryUpsertLockFile(lockData, Option.of(previousLockFile)); + + assertEquals(LockUpsertResult.SUCCESS, result.getLeft()); + assertTrue(result.getRight().isPresent()); + assertEquals("\"etag-new\"", result.getRight().get().getVersionId()); + + verify(mockBlobClient).uploadWithResponse(optionsCaptor.capture(), isNull(), eq(Context.NONE)); + BlobParallelUploadOptions options = optionsCaptor.getValue(); + assertRequestCondition(options, "ifMatch", "\"etag-prev\""); + } + + @Test + void testTryUpsertLockFile_fallsBackToBlockBlobItemEtag() { + StorageLockData lockData = new StorageLockData(false, 123L, "test-owner"); + @SuppressWarnings("unchecked") + Response response = (Response) mock(Response.class); + BlockBlobItem blockBlobItem = mock(BlockBlobItem.class); + when(response.getHeaders()).thenReturn(null); + when(response.getValue()).thenReturn(blockBlobItem); + when(blockBlobItem.getETag()).thenReturn("0x8DABC123"); + when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))) + .thenReturn(response); + + Pair> result = lockClient.tryUpsertLockFile(lockData, Option.empty()); + + assertEquals(LockUpsertResult.SUCCESS, result.getLeft()); + assertTrue(result.getRight().isPresent()); + assertEquals("\"0x8DABC123\"", result.getRight().get().getVersionId()); + } + + @Test + void testTryUpsertLockFile_preconditionFailed_returnsAcquiredByOthers() { + StorageLockData lockData = new StorageLockData(false, 999L, "owner"); + BlobStorageException ex = mock(BlobStorageException.class); + when(ex.getStatusCode()).thenReturn(412); + when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))).thenThrow(ex); + + Pair> result = lockClient.tryUpsertLockFile(lockData, Option.empty()); + + assertEquals(LockUpsertResult.ACQUIRED_BY_OTHERS, result.getLeft()); + assertTrue(result.getRight().isEmpty()); + verify(mockLogger).info( + contains("Unable to write new lock file. Another process has modified this lockfile"), + eq(OWNER_ID), + eq(LOCK_FILE_URI)); + } + + @Test + void testTryUpsertLockFile_rateLimit_returnsUnknownError() { + StorageLockData lockData = new StorageLockData(false, 999L, "owner"); + BlobStorageException ex = mock(BlobStorageException.class); + when(ex.getStatusCode()).thenReturn(429); + when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))).thenThrow(ex); + + Pair> result = lockClient.tryUpsertLockFile(lockData, Option.empty()); + + assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft()); + assertTrue(result.getRight().isEmpty()); + verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_URI)); + } + + @Test + void testTryUpsertLockFile_serverError_returnsUnknownError() { + StorageLockData lockData = new StorageLockData(false, 999L, "owner"); + BlobStorageException ex = mock(BlobStorageException.class); + when(ex.getStatusCode()).thenReturn(503); + when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))).thenThrow(ex); + + Pair> result = lockClient.tryUpsertLockFile(lockData, Option.empty()); + + assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft()); + assertTrue(result.getRight().isEmpty()); + verify(mockLogger).warn(contains("Azure returned internal server error code"), eq(OWNER_ID), eq(LOCK_FILE_URI), eq(ex)); + } + + @Test + void testTryUpsertLockFile_unexpectedError_returnsUnknownError() { + StorageLockData lockData = new StorageLockData(false, 999L, "owner"); + BlobStorageException ex = mock(BlobStorageException.class); + when(ex.getStatusCode()).thenReturn(400); + when(mockBlobClient.uploadWithResponse(any(BlobParallelUploadOptions.class), isNull(), eq(Context.NONE))).thenThrow(ex); + + Pair> result = lockClient.tryUpsertLockFile(lockData, Option.empty()); + + assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft()); + assertTrue(result.getRight().isEmpty()); + } + + @Test + void testReadCurrentLockFile_notFound_returnsNotExists() { + BlobStorageException ex = mock(BlobStorageException.class); + when(ex.getStatusCode()).thenReturn(404); + when(mockBlobClient.downloadContentWithResponse(isNull(), isNull(), isNull(), eq(Context.NONE))).thenThrow(ex); + + Pair> result = lockClient.readCurrentLockFile(); + assertEquals(LockGetResult.NOT_EXISTS, result.getLeft()); + assertTrue(result.getRight().isEmpty()); + verify(mockLogger).info(contains("Object not found"), eq(OWNER_ID), eq(LOCK_FILE_URI)); + } + + @Test + void testReadCurrentLockFile_blobFound_success() { + setUp(LOCK_FILE_URI_WITH_NESTED_PATH); + + StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner"); + byte[] json = StorageLockFile.toByteArray(data); + BlobDownloadContentResponse response = mock(BlobDownloadContentResponse.class); + when(response.getValue()).thenReturn(BinaryData.fromBytes(json)); + when(response.getHeaders()).thenReturn(new HttpHeaders().set("ETag", "\"etag-123\"")); + when(mockBlobClient.downloadContentWithResponse(isNull(), isNull(), isNull(), eq(Context.NONE))).thenReturn(response); + + Pair> result = lockClient.readCurrentLockFile(); + + assertEquals(LockGetResult.SUCCESS, result.getLeft()); + assertTrue(result.getRight().isPresent()); + assertEquals("\"etag-123\"", result.getRight().get().getVersionId()); + assertEquals("testOwner", result.getRight().get().getOwner()); + } + + @Test + void testReadCurrentLockFile_missingEtag_throwsHoodieLockException() { + StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner"); + byte[] json = StorageLockFile.toByteArray(data); + BlobDownloadContentResponse response = mock(BlobDownloadContentResponse.class); + when(response.getHeaders()).thenReturn(null); + when(mockBlobClient.downloadContentWithResponse(isNull(), isNull(), isNull(), eq(Context.NONE))).thenReturn(response); + + HoodieLockException exception = + assertThrows(HoodieLockException.class, () -> lockClient.readCurrentLockFile()); + + assertTrue(exception.getMessage().contains("Missing ETag in Azure download response for lock file")); + } + + @Test + void testReadCurrentLockFile_emptyEtag_throwsHoodieLockException() { + StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner"); + byte[] json = StorageLockFile.toByteArray(data); + BlobDownloadContentResponse response = mock(BlobDownloadContentResponse.class); + when(response.getHeaders()).thenReturn(new HttpHeaders().set("ETag", "")); + when(mockBlobClient.downloadContentWithResponse(isNull(), isNull(), isNull(), eq(Context.NONE))).thenReturn(response); + + HoodieLockException exception = + assertThrows(HoodieLockException.class, () -> lockClient.readCurrentLockFile()); + + assertTrue(exception.getMessage().contains("Missing ETag in Azure download response for lock file")); + } + + @Test + void testReadCurrentLockFile_malformedQuotedEtag_throwsHoodieLockException() { + BlobDownloadContentResponse response = mock(BlobDownloadContentResponse.class); + when(response.getHeaders()).thenReturn(new HttpHeaders().set("ETag", "\"etag-123")); + when(mockBlobClient.downloadContentWithResponse(isNull(), isNull(), isNull(), eq(Context.NONE))).thenReturn(response); + + HoodieLockException exception = + assertThrows(HoodieLockException.class, () -> lockClient.readCurrentLockFile()); + + assertTrue(exception.getMessage().contains("Malformed ETag in Azure download response for lock file")); + } + + @Test + void testReadCurrentLockFile_download404_returnsNotExists() { + BlobStorageException ex404 = mock(BlobStorageException.class); + when(ex404.getStatusCode()).thenReturn(404); + when(mockBlobClient.downloadContentWithResponse(isNull(), isNull(), isNull(), eq(Context.NONE))).thenThrow(ex404); + + Pair> result = lockClient.readCurrentLockFile(); + assertEquals(LockGetResult.NOT_EXISTS, result.getLeft()); + assertTrue(result.getRight().isEmpty()); + } + + @Test + void testClose_noop() { + lockClient.close(); + } + + @Test + void testReadObject_reusesSecondaryBlobServiceClientForSameEndpoint() { + BlobServiceClient primaryServiceClient = mock(BlobServiceClient.class); + BlobContainerClient primaryContainerClient = mock(BlobContainerClient.class); + BlobClient primaryBlobClient = mock(BlobClient.class); + when(primaryServiceClient.getBlobContainerClient(any(String.class))).thenReturn(primaryContainerClient); + when(primaryContainerClient.getBlobClient(any(String.class))).thenReturn(primaryBlobClient); + + BlobServiceClient secondaryServiceClient = mock(BlobServiceClient.class); + BlobContainerClient secondaryContainerClient = mock(BlobContainerClient.class); + BlobClient secondaryBlobClient = mock(BlobClient.class); + when(secondaryServiceClient.getBlobContainerClient(any(String.class))).thenReturn(secondaryContainerClient); + when(secondaryContainerClient.getBlobClient(any(String.class))).thenReturn(secondaryBlobClient); + when(secondaryBlobClient.exists()).thenReturn(false); + + AtomicInteger secondarySupplierInvocations = new AtomicInteger(0); + AzureStorageLockClient client = new AzureStorageLockClient( + OWNER_ID, + LOCK_FILE_URI, + new Properties(), + location -> { + if ("https://secondary.blob.core.windows.net".equals(location.getBlobEndpoint())) { + secondarySupplierInvocations.incrementAndGet(); + return secondaryServiceClient; + } + return primaryServiceClient; + }, + mockLogger); + + client.readObject("abfs://container@secondary.dfs.core.windows.net/path1", true); + client.readObject("abfs://container@secondary.dfs.core.windows.net/path2", true); + + assertEquals(1, secondarySupplierInvocations.get()); + } + + private static void assertRequestCondition(Object blobParallelUploadOptions, String expectedField, String expectedValue) throws Exception { + Object requestConditions = tryInvoke(blobParallelUploadOptions, "getRequestConditions"); + if (requestConditions == null) { + Field f = blobParallelUploadOptions.getClass().getDeclaredField("requestConditions"); + f.setAccessible(true); + requestConditions = f.get(blobParallelUploadOptions); + } + assertNotNull(requestConditions, "requestConditions should be set on upload options"); + + Object actualIfMatch = tryInvoke(requestConditions, "getIfMatch"); + if (actualIfMatch == null) { + actualIfMatch = getFieldIfExists(requestConditions, "ifMatch"); + } + Object actualIfNoneMatch = tryInvoke(requestConditions, "getIfNoneMatch"); + if (actualIfNoneMatch == null) { + actualIfNoneMatch = getFieldIfExists(requestConditions, "ifNoneMatch"); + } + + if ("ifMatch".equals(expectedField)) { + assertEquals(expectedValue, actualIfMatch, "Expected If-Match to be set"); + } else if ("ifNoneMatch".equals(expectedField)) { + assertEquals(expectedValue, actualIfNoneMatch, "Expected If-None-Match to be set"); + } else { + throw new IllegalArgumentException("Unexpected expectedField: " + expectedField); + } + } + + private static Object tryInvoke(Object target, String methodName) { + try { + Method m = target.getClass().getMethod(methodName); + return m.invoke(target); + } catch (Exception ignored) { + return null; + } + } + + private static Object getFieldIfExists(Object target, String fieldName) { + try { + Field f = target.getClass().getDeclaredField(fieldName); + f.setAccessible(true); + return f.get(target); + } catch (Exception ignored) { + return null; + } + } +} diff --git a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClientUriParsing.java b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClientUriParsing.java new file mode 100644 index 0000000000000..ba1cde8047f71 --- /dev/null +++ b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClientUriParsing.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestAzureStorageLockClientUriParsing { + + @Test + public void testParseAbfsUri() { + String uri = "abfs://container@account.dfs.core.windows.net/table/.hoodie/.locks/table_lock.json"; + + AzureStorageLockClient.AzureLocation l = AzureStorageLockClient.parseAzureLocation(uri); + + assertEquals("https://account.blob.core.windows.net", l.getBlobEndpoint()); + assertEquals("container", l.getContainer()); + assertEquals("table/.hoodie/.locks/table_lock.json", l.getBlobPath()); + } + + @Test + public void testParseAbfssUri() { + String uri = "abfss://container@account.dfs.core.windows.net/lake/db/tbl/.hoodie/.locks/table_lock.json"; + + AzureStorageLockClient.AzureLocation l = AzureStorageLockClient.parseAzureLocation(uri); + + assertEquals("https://account.blob.core.windows.net", l.getBlobEndpoint()); + assertEquals("container", l.getContainer()); + assertEquals("lake/db/tbl/.hoodie/.locks/table_lock.json", l.getBlobPath()); + } + + @Test + public void testParseWasbUri() { + String uri = "wasb://container@account.blob.core.windows.net/table/.hoodie/.locks/table_lock.json"; + + AzureStorageLockClient.AzureLocation l = AzureStorageLockClient.parseAzureLocation(uri); + + assertEquals("https://account.blob.core.windows.net", l.getBlobEndpoint()); + assertEquals("container", l.getContainer()); + assertEquals("table/.hoodie/.locks/table_lock.json", l.getBlobPath()); + } + + @Test + public void testParseWasbsUri() { + String uri = "wasbs://container@account.blob.core.windows.net/lake/db/tbl/.hoodie/.locks/table_lock.json"; + + AzureStorageLockClient.AzureLocation l = AzureStorageLockClient.parseAzureLocation(uri); + + assertEquals("https://account.blob.core.windows.net", l.getBlobEndpoint()); + assertEquals("container", l.getContainer()); + assertEquals("lake/db/tbl/.hoodie/.locks/table_lock.json", l.getBlobPath()); + } + + @Test + public void testParseHttpsUri() { + String uri = "https://account.blob.core.windows.net/container/table/.hoodie/.locks/table_lock.json"; + + AzureStorageLockClient.AzureLocation l = AzureStorageLockClient.parseAzureLocation(uri); + + assertEquals("https://account.blob.core.windows.net", l.getBlobEndpoint()); + assertEquals("container", l.getContainer()); + assertEquals("table/.hoodie/.locks/table_lock.json", l.getBlobPath()); + } + + @Test + public void testParseHttpUri() { + String uri = "http://127.0.0.1:10000/container/table/.hoodie/.locks/table_lock.json"; + + AzureStorageLockClient.AzureLocation l = AzureStorageLockClient.parseAzureLocation(uri); + + assertEquals("http://127.0.0.1:10000", l.getBlobEndpoint()); + assertEquals("table", l.getContainer()); + assertEquals(".hoodie/.locks/table_lock.json", l.getBlobPath()); + } +} diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java index ab9994aa36b5c..faa7f52e12d68 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java @@ -41,7 +41,10 @@ public void testStorageSchemes() { for (StorageSchemes scheme : StorageSchemes.values()) { String schemeName = scheme.getScheme(); - if (scheme.getScheme().startsWith("s3") || scheme.getScheme().startsWith("gs")) { + if (scheme.getScheme().startsWith("s3") + || scheme.getScheme().startsWith("gs") + || scheme.getScheme().startsWith("abfs") + || scheme.getScheme().startsWith("wasb")) { assertTrue(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent()); } else { assertFalse(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent()); diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java index 77ecb536cf3ea..a052a84f743a6 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java @@ -47,14 +47,14 @@ public enum StorageSchemes { S3("s3", true, null, "org.apache.hudi.aws.transaction.lock.S3StorageLockClient"), // Google Cloud Storage GCS("gs", true, null, "org.apache.hudi.gcp.transaction.lock.GCSStorageLockClient"), - // Azure WASB - WASB("wasb", null, null, null), - WASBS("wasbs", null, null, null), + // Azure WASB (Azure Blob Storage) + WASB("wasb", null, null, "org.apache.hudi.azure.transaction.lock.AzureStorageLockClient"), + WASBS("wasbs", null, null, "org.apache.hudi.azure.transaction.lock.AzureStorageLockClient"), // Azure ADLS ADL("adl", null, null, null), // Azure ADLS Gen2 - ABFS("abfs", null, null, null), - ABFSS("abfss", null, null, null), + ABFS("abfs", null, null, "org.apache.hudi.azure.transaction.lock.AzureStorageLockClient"), + ABFSS("abfss", null, null, "org.apache.hudi.azure.transaction.lock.AzureStorageLockClient"), // Aliyun OSS OSS("oss", null, null, null), // View FS for federated setups. If federating across cloud stores, then append diff --git a/packaging/hudi-azure-bundle/pom.xml b/packaging/hudi-azure-bundle/pom.xml new file mode 100644 index 0000000000000..479a4bb6c93c0 --- /dev/null +++ b/packaging/hudi-azure-bundle/pom.xml @@ -0,0 +1,179 @@ + + + + + hudi + org.apache.hudi + 1.2.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + hudi-azure-bundle + jar + + + true + ${project.parent.basedir} + true + + + + + + org.apache.rat + apache-rat-plugin + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + package + + shade + + + ${shadeSources} + ${project.build.directory}/dependency-reduced-pom.xml + + + + + true + + + META-INF/LICENSE + target/classes/META-INF/LICENSE + + + + + + + org.apache.hudi:hudi-hadoop-mr + org.apache.hudi:hudi-sync-common + org.apache.hudi:hudi-hive-sync + org.apache.hudi:hudi-azure + + + com.azure:* + com.microsoft.azure:* + io.projectreactor:* + io.netty:* + + + io.dropwizard.metrics:metrics-core + com.beust:jcommander + commons-io:commons-io + org.openjdk.jol:jol-core + org.apache.parquet:parquet-avro + + + + + com.beust.jcommander. + org.apache.hudi.com.beust.jcommander. + + + org.apache.commons.io. + org.apache.hudi.org.apache.commons.io. + + + com.codahale.metrics. + org.apache.hudi.com.codahale.metrics. + + + org.openjdk.jol. + org.apache.hudi.org.openjdk.jol. + + + false + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/services/javax.* + **/*.proto + + + + ${project.artifactId}-${project.version} + + + + + + + + src/main/resources + + + + + + + + org.apache.hudi + hudi-common + ${project.version} + + + + org.apache.hadoop + * + + + com.fasterxml.jackson.module + jackson-module-afterburner + + + + + org.apache.hudi + hudi-hive-sync + ${project.version} + + + javax.servlet + servlet-api + + + + + org.apache.hudi + hudi-azure + ${project.version} + + + + + org.apache.parquet + parquet-avro + ${parquet.version} + compile + + + + diff --git a/pom.xml b/pom.xml index 1dbfc302754fb..bc63c732f74ef 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ hudi-client hudi-aws hudi-gcp + hudi-azure hudi-hadoop-common hudi-hadoop-mr hudi-io @@ -52,6 +53,7 @@ packaging/hudi-hive-sync-bundle packaging/hudi-aws-bundle packaging/hudi-gcp-bundle + packaging/hudi-azure-bundle packaging/hudi-spark-bundle packaging/hudi-presto-bundle packaging/hudi-utilities-bundle