Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f140f71
Add IcebergResourceManager
tarun-google Nov 13, 2025
df42009
Add IcebergResource Manager for IT
tarun-google Nov 14, 2025
4d0b2b8
Fix Testcase
tarun-google Nov 14, 2025
5a481f1
Order fields
tarun-google Nov 14, 2025
a6f1bc9
Apply spotless
tarun-google Nov 14, 2025
2290ce6
Add Unit tests
tarun-google Nov 15, 2025
25233f2
Apply Spotless
tarun-google Nov 15, 2025
b5ab434
Add Postgres to Iceberg
tarun-google Nov 18, 2025
7fbf4e3
Add Pipeline Options
tarun-google Nov 24, 2025
bdbdc1d
Merge branch 'GoogleCloudPlatform:main' into iceberg_yaml
tarun-google Nov 24, 2025
35d4a4f
fix options
tarun-google Nov 24, 2025
0d25345
fix options
tarun-google Nov 24, 2025
cfe6f04
Merge branch 'iceberg_yaml' of github.com:tarun-google/DataflowTempla…
tarun-google Nov 24, 2025
b1f70f3
Add Postgres to Iceberg Template
tarun-google Nov 25, 2025
b347d08
Add test case for framework
tarun-google Nov 26, 2025
1461815
Fix Tests
tarun-google Nov 26, 2025
7698e22
spotless
tarun-google Nov 26, 2025
5bedfa6
Fix dependency
tarun-google Nov 27, 2025
72691a6
Add PostgresToIceberg Template
tarun-google Dec 1, 2025
3a5e082
Fix failing test
tarun-google Dec 1, 2025
afc72f4
Fix names
tarun-google Dec 1, 2025
8a58236
Fix data type
tarun-google Dec 3, 2025
f378296
Add Iceberg setup beforehand
tarun-google Dec 3, 2025
4cc74c3
use GCS Bucket for warehouse
tarun-google Dec 3, 2025
2f1a0b4
Use base path
tarun-google Dec 4, 2025
98aafb0
test2
tarun-google Dec 4, 2025
f0539f1
test
tarun-google Dec 4, 2025
0493cd5
test
tarun-google Dec 4, 2025
84afec9
Merge branch 'main' into iceberg_test2
tarun-google Dec 4, 2025
6113a73
test
tarun-google Dec 4, 2025
771692f
test
tarun-google Dec 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cicd/internal/flags/common-flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
SPANNER = "SPANNER"
BIGTABLE = "BIGTABLE"
DATASTREAM = "DATASTREAM"
YAML = "YAML"
)

// Avoid making these vars public.
Expand All @@ -36,6 +37,10 @@ var (
moduleMap = map[string][]string{
ALL: {},
DEFAULT: {},
YAML: {
"it/iceberg/",
"yaml/",
},
KAFKA: {"v2/kafka-common/",
"v2/kafka-to-bigquery/",
"v2/kafka-to-gcs/",
Expand Down
48 changes: 48 additions & 0 deletions yaml/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<postgresql.version>42.6.1</postgresql.version>
<iceberg.version>1.6.1</iceberg.version>
<bigdataoss.version>3.1.10</bigdataoss.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -65,6 +68,51 @@
<artifactId>beam-it-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util-hadoop</artifactId>
<version>${bigdataoss.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>${bigdataoss.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-gcp</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-iceberg</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.teleport</groupId>
<artifactId>it-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.teleport</groupId>
<artifactId>iceberg</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcsio</artifactId>
<version>${bigdataoss.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates.yaml;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Validation;

@Template(
name = "Postgres_To_Iceberg_Yaml",
category = TemplateCategory.BATCH,
type = Template.TemplateType.YAML,
displayName = "Postgres to Iceberg (YAML)",
description =
"A YAML template for Postgres to Iceberg is a batch pipeline with executes the user provided SQL query to read data from Postgres table"
+ "and outputs the records to Iceberg table.",
flexContainerName = "postgres-to-iceberg-yaml",
yamlTemplateFile = "PostgresToIceberg.yaml",
filesToCopy = {
"template.yaml",
"main.py",
"requirements.txt",
"options/postgres_options.yaml",
"options/iceberg_options.yaml"
},
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Input Postgres instance and table must exist.",
"The Output Iceberg table need not exist, but the storage must exist and passed through catalog_properties."
},
hidden = false)
public interface PostgresToIcebergYaml {

// Common JDBC Parameters
@TemplateParameter.Text(
order = 1,
name = "jdbcUrl",
description = "Connection URL for the JDBC source/sink.",
helpText = "The JDBC connection URL.",
example = "jdbc:postgresql://your-host:5432/your-db")
@Validation.Required
String getJdbcUrl();

@TemplateParameter.Text(
order = 2,
name = "username",
optional = true,
description = "Username for the JDBC connection.",
helpText = "The database username.",
example = "my_user")
String getUsername();

@TemplateParameter.Password(
order = 3,
name = "password",
optional = true,
description = "Password for the JDBC connection.",
helpText = "The database password.",
example = "my_secret_password")
String getPassword();

@TemplateParameter.Text(
order = 4,
name = "driverClassName",
optional = true,
description =
"The fully-qualified class name of the JDBC driver. Default: org.postgresql.Driver",
helpText = "The fully-qualified class name of the JDBC driver to use.",
example = "org.postgresql.Driver")
@Default.String("org.postgresql.Driver")
String getDriverClassName();

@TemplateParameter.Text(
order = 5,
name = "driverJars",
optional = true,
description = "Comma-separated GCS paths of the JDBC driver jars.",
helpText = "A comma-separated list of GCS paths to the JDBC driver JAR files.",
example = "gs://your-bucket/postgresql-42.2.23.jar")
String getDriverJars();

@TemplateParameter.Text(
order = 6,
name = "connectionProperties",
optional = true,
description = "JDBC connection properties.",
helpText = "A semicolon-separated list of key-value pairs for the JDBC connection.",
example = "key1=value1;key2=value2")
String getConnectionProperties();

@TemplateParameter.Text(
order = 7,
name = "connectionInitSql",
optional = true,
description = "A list of SQL statements to execute upon connection initialization.",
helpText = "A list of SQL statements to execute when a new connection is established.",
example = "[\"SET TIME ZONE UTC\"]")
String getConnectionInitSql();

@TemplateParameter.Text(
order = 8,
name = "jdbcType",
optional = true,
description = "Type of JDBC source. Default: postgres.",
helpText =
"Specifies the type of JDBC source. An appropriate default driver will be packaged.",
example = "postgres")
@Default.String("postgres")
String getJdbcType();

// JDBC Read Parameters
@TemplateParameter.Text(
order = 9,
name = "location",
optional = true,
description = "The name of the table to read from.",
helpText = "The name of the database table to read data from.",
example = "public.my_table")
String getReadLocation();

@TemplateParameter.Text(
order = 10,
name = "readQuery",
optional = true,
description = "The SQL query to execute for reading data.",
helpText = "The SQL query to execute on the source to extract data.",
example = "SELECT * FROM my_table WHERE status = 'active'")
String getReadQuery();

@TemplateParameter.Text(
order = 11,
name = "partitionColumn",
optional = true,
description = "The name of a numeric column to be used for partitioning.",
helpText = "The name of a numeric column that will be used for partitioning the data.",
example = "id")
String getPartitionColumn();

@TemplateParameter.Integer(
order = 12,
name = "numPartitions",
optional = true,
description = "The number of partitions to divide the data into.",
helpText = "The number of partitions to create for parallel reading.",
example = "10")
Integer getNumPartitions();

@TemplateParameter.Integer(
order = 13,
name = "fetchSize",
optional = true,
description = "The number of rows to fetch from the database at a time.",
helpText =
"The number of rows to fetch per database call. It should ONLY be used if the default value throws memory errors.",
example = "50000")
Integer getFetchSize();

@TemplateParameter.Boolean(
order = 14,
name = "disableAutoCommit",
optional = true,
description = "Whether to disable auto-commit on read.",
helpText =
"Whether to disable auto-commit on read. Required for some databases like Postgres.",
example = "true")
Boolean getDisableAutoCommit();

@TemplateParameter.Boolean(
order = 15,
name = "outputParallelization",
optional = true,
description = "Whether to reshuffle the PCollection to distribute results to all workers.",
helpText = "If true, the resulting PCollection will be reshuffled.",
example = "true")
Boolean getOutputParallelization();

// Iceberg Common Parameters
@TemplateParameter.Text(
order = 16,
name = "table",
description = "A fully-qualified table identifier.",
helpText = "A fully-qualified table identifier, e.g., my_dataset.my_table.",
example = "my_dataset.my_table")
@Validation.Required
String getTable();

@TemplateParameter.Text(
order = 17,
name = "catalogName",
description = "Name of the catalog containing the table.",
helpText = "The name of the Iceberg catalog that contains the table.",
example = "my_hadoop_catalog")
@Validation.Required
String getCatalogName();

@TemplateParameter.Text(
order = 18,
name = "catalogProperties",
description = "Properties used to set up the Iceberg catalog.",
helpText = "A map of properties for setting up the Iceberg catalog.",
example = "{\"type\": \"hadoop\", \"warehouse\": \"gs://your-bucket/warehouse\"}")
@Validation.Required
String getCatalogProperties();

@TemplateParameter.Text(
order = 19,
name = "configProperties",
optional = true,
description = "Properties passed to the Hadoop Configuration.",
helpText = "A map of properties to pass to the Hadoop Configuration.",
example = "{\"fs.gs.impl\": \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\"}")
String getConfigProperties();

@TemplateParameter.Text(
order = 20,
name = "drop",
optional = true,
description = "A list of field names to drop from the input record before writing.",
helpText = "A list of field names to drop. Mutually exclusive with 'keep' and 'only'.",
example = "[\"field_to_drop_1\", \"field_to_drop_2\"]")
String getDrop();

@TemplateParameter.Text(
order = 21,
name = "keep",
optional = true,
description = "A list of field names to keep in the input record.",
helpText = "A list of field names to keep. Mutually exclusive with 'drop' and 'only'.",
example = "[\"field_to_keep_1\", \"field_to_keep_2\"]")
String getKeep();

// Iceberg Write Parameters
@TemplateParameter.Text(
order = 22,
name = "only",
optional = true,
description = "The name of a single record field that should be written.",
helpText = "The name of a single field to write. Mutually exclusive with 'keep' and 'drop'.",
example = "my_record_field")
String getOnly();

@TemplateParameter.Text(
order = 23,
name = "partitionFields",
optional = true,
description = "Fields used to create a partition spec for new tables.",
helpText = "A list of fields and transforms for partitioning, e.g., ['day(ts)', 'category'].",
example = "[\"day(ts)\", \"bucket(id, 4)\"]")
String getPartitionFields();

@TemplateParameter.Text(
order = 24,
name = "tableProperties",
optional = true,
description = "Iceberg table properties to be set on table creation.",
helpText = "A map of Iceberg table properties to set when the table is created.",
example = "{\"commit.retry.num-retries\": \"2\"}")
String getTableProperties();
}
Loading
Loading