diff --git a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java index fa98edf..3c223b1 100644 --- a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java +++ b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java @@ -438,6 +438,25 @@ Long getJsonLongOrDefault(JSONObject o, String key, Long defVal) { } } + AwsBasicCredentials getSourceAwsCreds() { + // Checking whether secondary credentials are declared. + // If yes, use those for the source files, other set it to the default credentials. + + AwsBasicCredentials awsCreds; + String srcAccessKeyId = System.getenv("SECONDARY_AWS_ACCESS_KEY_ID"); + String srcSecretKey = System.getenv("SECONDARY_AWS_SECRET_ACCESS_KEY"); + + if((srcAccessKeyId != null) && (srcSecretKey != null)) { + awsCreds = AwsBasicCredentials.create(srcAccessKeyId, srcSecretKey); + } else { + awsCreds = AwsBasicCredentials.create( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY")); + } + + return awsCreds; + } + public boolean commitTable(String dataFiles) throws Exception { if (iceberg_table == null) loadTable(); @@ -446,16 +465,29 @@ public boolean commitTable(String dataFiles) throws Exception { PartitionSpec ps = iceberg_table.spec(); - AwsBasicCredentials awsCreds = AwsBasicCredentials.create( - System.getenv("AWS_ACCESS_KEY_ID"), - System.getenv("AWS_SECRET_ACCESS_KEY")); + AwsBasicCredentials awsCreds = getSourceAwsCreds(); + + String srcRegion; + if (System.getenv("SECONDARY_AWS_REGION") != null) { + srcRegion = System.getenv("SECONDARY_AWS_REGION"); + } else { + srcRegion = System.getenv("AWS_REGION"); + } + // final String srcRegion_final = srcRegion; + + // We also need to set the source configuration accordingly, + // for hadoop FileSystem to use it correctly. + // The catalog and destination files (if any) should use the main credentials. + Configuration srcConfig = m_catalog.getConf(); + srcConfig.set("fs.s3a.access.key", awsCreds.accessKeyId()); + srcConfig.set("fs.s3a.secret.key", awsCreds.secretAccessKey()); SdkHttpClient client = ApacheHttpClient.builder() .maxConnections(100) .build(); SerializableSupplier supplier = () -> S3Client.builder() - .region(Region.of(System.getenv("AWS_REGION"))) + .region(Region.of(srcRegion)) .credentialsProvider(StaticCredentialsProvider.create(awsCreds)) .httpClient(client) .build();