From b1f65736b21e93a3f64140958d519e8d719e486d Mon Sep 17 00:00:00 2001 From: Saransh Gupta Date: Wed, 10 May 2023 20:52:29 +0000 Subject: [PATCH 1/3] added capability to use source s3 credentials different from those for catalog/target --- .../main/java/iceberg/IcebergConnector.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java index fa98edf..36e365f 100644 --- a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java +++ b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java @@ -446,16 +446,33 @@ public boolean commitTable(String dataFiles) throws Exception { PartitionSpec ps = iceberg_table.spec(); - AwsBasicCredentials awsCreds = AwsBasicCredentials.create( - System.getenv("AWS_ACCESS_KEY_ID"), - System.getenv("AWS_SECRET_ACCESS_KEY")); + AwsBasicCredentials awsCreds; + String srcAccessKeyId = System.getenv("SOURCE_AWS_ACCESS_KEY_ID"); + String srcSecretKey = System.getenv("SOURCE_AWS_SECRET_ACCESS_KEY"); + String srcRegion = System.getenv("SOURCE_AWS_REGION"); + Configuration srcConfig = m_catalog.getConf(); + + if((srcAccessKeyId != null) && (srcSecretKey != null)) { + awsCreds = AwsBasicCredentials.create(srcAccessKeyId, srcSecretKey); + srcConfig.set("fs.s3a.access.key", srcAccessKeyId); + srcConfig.set("fs.s3a.secret.key", srcSecretKey); + } else { + awsCreds = AwsBasicCredentials.create( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY")); + } + + if (srcRegion == null) { + srcRegion = System.getenv("AWS_REGION"); + } + final String srcRegion_final = srcRegion; SdkHttpClient client = ApacheHttpClient.builder() .maxConnections(100) .build(); SerializableSupplier supplier = () -> S3Client.builder() - .region(Region.of(System.getenv("AWS_REGION"))) + .region(Region.of(srcRegion_final)) 
.credentialsProvider(StaticCredentialsProvider.create(awsCreds)) .httpClient(client) .build(); From a47ef9989e23373ad5c7d5d56a859d77ad95273d Mon Sep 17 00:00:00 2001 From: Saransh Gupta Date: Wed, 10 May 2023 21:07:19 +0000 Subject: [PATCH 2/3] added capability to use source s3 credentials different from those for catalog/target. added minor comments --- .../src/main/java/iceberg/IcebergConnector.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java index 36e365f..d38a9c5 100644 --- a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java +++ b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java @@ -446,6 +446,11 @@ public boolean commitTable(String dataFiles) throws Exception { PartitionSpec ps = iceberg_table.spec(); + // Checking whether separate source credentials are declared. + // If yes, use those, other set it to the default credentials. + // We also need to set the source configuration accordingly, + // for hadoop FileSystem to use it correctly. + AwsBasicCredentials awsCreds; String srcAccessKeyId = System.getenv("SOURCE_AWS_ACCESS_KEY_ID"); String srcSecretKey = System.getenv("SOURCE_AWS_SECRET_ACCESS_KEY"); From 09342f5c26629a26dba264ff75a466e83f262628 Mon Sep 17 00:00:00 2001 From: Saransh Gupta Date: Fri, 12 May 2023 01:02:58 +0000 Subject: [PATCH 3/3] Function to get src creds. creds prefix changed from SOURCE to SECONDARY. 
--- .../main/java/iceberg/IcebergConnector.java | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java index d38a9c5..3c223b1 100644 --- a/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java +++ b/tools/java-iceberg-cli/src/main/java/iceberg/IcebergConnector.java @@ -438,46 +438,56 @@ Long getJsonLongOrDefault(JSONObject o, String key, Long defVal) { } } - public boolean commitTable(String dataFiles) throws Exception { - if (iceberg_table == null) - loadTable(); + AwsBasicCredentials getSourceAwsCreds() { + // Checking whether secondary credentials are declared. + // If yes, use those for the source files; otherwise fall back to the default credentials. - System.out.println("Commiting to the table " + m_tableIdentifier); - - PartitionSpec ps = iceberg_table.spec(); - - // Checking whether separate source credentials are declared. - // If yes, use those, other set it to the default credentials. - // We also need to set the source configuration accordingly, - // for hadoop FileSystem to use it correctly. 
- AwsBasicCredentials awsCreds; - String srcAccessKeyId = System.getenv("SOURCE_AWS_ACCESS_KEY_ID"); - String srcSecretKey = System.getenv("SOURCE_AWS_SECRET_ACCESS_KEY"); - String srcRegion = System.getenv("SOURCE_AWS_REGION"); - Configuration srcConfig = m_catalog.getConf(); + AwsBasicCredentials awsCreds; String srcAccessKeyId = System.getenv("SECONDARY_AWS_ACCESS_KEY_ID"); + String srcSecretKey = System.getenv("SECONDARY_AWS_SECRET_ACCESS_KEY"); if((srcAccessKeyId != null) && (srcSecretKey != null)) { awsCreds = AwsBasicCredentials.create(srcAccessKeyId, srcSecretKey); - srcConfig.set("fs.s3a.access.key", srcAccessKeyId); - srcConfig.set("fs.s3a.secret.key", srcSecretKey); } else { awsCreds = AwsBasicCredentials.create( System.getenv("AWS_ACCESS_KEY_ID"), System.getenv("AWS_SECRET_ACCESS_KEY")); } - if (srcRegion == null) { + return awsCreds; + } + + public boolean commitTable(String dataFiles) throws Exception { + if (iceberg_table == null) + loadTable(); + + System.out.println("Committing to the table " + m_tableIdentifier); + + PartitionSpec ps = iceberg_table.spec(); + + AwsBasicCredentials awsCreds = getSourceAwsCreds(); + + String srcRegion; + if (System.getenv("SECONDARY_AWS_REGION") != null) { + srcRegion = System.getenv("SECONDARY_AWS_REGION"); + } else { srcRegion = System.getenv("AWS_REGION"); } - final String srcRegion_final = srcRegion; + + + // We also need to set the source configuration accordingly, + // for hadoop FileSystem to use it correctly. + // The catalog and destination files (if any) should use the main credentials. 
+ Configuration srcConfig = m_catalog.getConf(); + srcConfig.set("fs.s3a.access.key", awsCreds.accessKeyId()); + srcConfig.set("fs.s3a.secret.key", awsCreds.secretAccessKey()); SdkHttpClient client = ApacheHttpClient.builder() .maxConnections(100) .build(); SerializableSupplier supplier = () -> S3Client.builder() - .region(Region.of(srcRegion_final)) + .region(Region.of(srcRegion)) .credentialsProvider(StaticCredentialsProvider.create(awsCreds)) .httpClient(client) .build();