Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,25 @@ Long getJsonLongOrDefault(JSONObject o, String key, Long defVal) {
}
}

AwsBasicCredentials getSourceAwsCreds() {
// Checking whether secondary credentials are declared.
// If yes, use those for the source files, other set it to the default credentials.

AwsBasicCredentials awsCreds;
String srcAccessKeyId = System.getenv("SECONDARY_AWS_ACCESS_KEY_ID");
String srcSecretKey = System.getenv("SECONDARY_AWS_SECRET_ACCESS_KEY");

if((srcAccessKeyId != null) && (srcSecretKey != null)) {
awsCreds = AwsBasicCredentials.create(srcAccessKeyId, srcSecretKey);
} else {
awsCreds = AwsBasicCredentials.create(
System.getenv("AWS_ACCESS_KEY_ID"),
System.getenv("AWS_SECRET_ACCESS_KEY"));
}

return awsCreds;
}

public boolean commitTable(String dataFiles) throws Exception {
if (iceberg_table == null)
loadTable();
Expand All @@ -446,16 +465,29 @@ public boolean commitTable(String dataFiles) throws Exception {

PartitionSpec ps = iceberg_table.spec();

AwsBasicCredentials awsCreds = AwsBasicCredentials.create(
System.getenv("AWS_ACCESS_KEY_ID"),
System.getenv("AWS_SECRET_ACCESS_KEY"));
AwsBasicCredentials awsCreds = getSourceAwsCreds();

String srcRegion;
if (System.getenv("SECONDARY_AWS_REGION") != null) {
srcRegion = System.getenv("SECONDARY_AWS_REGION");
} else {
srcRegion = System.getenv("AWS_REGION");
}
// final String srcRegion_final = srcRegion;

// We also need to set the source configuration accordingly,
// for hadoop FileSystem to use it correctly.
// The catalog and destination files (if any) should use the main credentials.
Configuration srcConfig = m_catalog.getConf();
srcConfig.set("fs.s3a.access.key", awsCreds.accessKeyId());
srcConfig.set("fs.s3a.secret.key", awsCreds.secretAccessKey());

SdkHttpClient client = ApacheHttpClient.builder()
.maxConnections(100)
.build();

SerializableSupplier<S3Client> supplier = () -> S3Client.builder()
.region(Region.of(System.getenv("AWS_REGION")))
.region(Region.of(srcRegion))
.credentialsProvider(StaticCredentialsProvider.create(awsCreds))
.httpClient(client)
.build();
Expand Down