@@ -429,4 +429,15 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
  Long getMaxCommitDelay();

  void setMaxCommitDelay(Long value);

  @TemplateParameter.GcsWriteFolder(
      order = 34,
      optional = true,
      description = "GCS directory for AVRO files",
      helpText = "This directory is used to write the AVRO files of the records read from the source.",
      example = "gs://your-bucket/your-path")
  @Default.String("")
  String getGcsOutputDirectory();

  void setGcsOutputDirectory(String value);
}
@@ -0,0 +1,55 @@
/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import java.util.Objects;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/** Destination key for dynamic Avro writes: a table name plus its Avro JSON schema. */
@DefaultCoder(AvroCoder.class)
public class AvroDestination {
  public String name;
  public String jsonSchema;

  // Needed for serialization
  public AvroDestination() {}

  public AvroDestination(String name, String jsonSchema) {
    this.name = name;
    this.jsonSchema = jsonSchema;
  }

  public static AvroDestination of(String name, String jsonSchema) {
    return new AvroDestination(name, jsonSchema);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof AvroDestination)) {
      return false;
    }
    AvroDestination that = (AvroDestination) o;
    return Objects.equals(name, that.name) && Objects.equals(jsonSchema, that.jsonSchema);
  }

  @Override
  public int hashCode() {
    return Objects.hash(name, jsonSchema);
  }
}
@@ -31,22 +31,38 @@
import java.util.Map;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.Write.FileNaming;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MigrateTableTransform extends PTransform<PBegin, PCollection<Void>> {

  private static final Logger LOG = LoggerFactory.getLogger(MigrateTableTransform.class);
  public static final String GCS_RECORDS_WRITTEN = "gcs_records_written";

  private transient SourceDbToSpannerOptions options;
  private SpannerConfig spannerConfig;
  private Ddl ddl;
  private ISchemaMapper schemaMapper;
@@ -81,6 +97,19 @@ public PCollection<Void> expand(PBegin input) {
    PCollectionTuple rowsAndTables = input.apply("Read_rows", readerTransform.readTransform());
    PCollection<SourceRow> sourceRows = rowsAndTables.get(readerTransform.sourceRowTag());

    if (options.getGcsOutputDirectory() != null && !options.getGcsOutputDirectory().isEmpty()) {

Reviewer: Thinking out loud: should this be behind an "enable data validation" flag instead of the GCS directory flag?

Author: I am open to it, although the GCS sink functionality is a more general one, with one usage being validation. We could rename it to something like gcsValidationOutputDirectory, but I would advise against that.

      String avroDirectory;
      if (shardId.isEmpty()) {
        avroDirectory = options.getGcsOutputDirectory();
      } else {
        avroDirectory =
            FileSystems.matchNewResource(options.getGcsOutputDirectory(), true)
                .resolve(shardId, StandardResolveOptions.RESOLVE_DIRECTORY)
Comment on lines +106 to +107

Reviewer: What are the pros and cons of choosing this over gcs_dir/table/shardId? Were other folder structures considered?

Author (@manitgupta, Dec 30, 2025): I don't think there is a large advantage of one over the other, and this is changeable if we see most customers asking for the other. gcs_dir/shardId/table felt natural to me since today we shard jobs by shards (if we have to) instead of by table. See the layout sketch after this hunk.
                .toString();
      }
      writeToGCS(sourceRows, avroDirectory);
    }
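
A sketch of the resulting object layout under the chosen gcs_dir/shardId/table structure (the bucket, shard, and table names below are hypothetical; per AvroFileNaming, the file prefix is the MD5 of the table's Avro schema and the shard suffix comes from FileIO's default naming):

gs://your-bucket/your-path/shard-01/Singers/<md5-of-schema>-00000-of-00004.avro
gs://your-bucket/your-path/shard-01/Albums/<md5-of-schema>-00000-of-00004.avro

When shardId is empty, the table directories sit directly under gcsOutputDirectory.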

    CustomTransformation customTransformation =
        CustomTransformation.builder(
                options.getTransformationJarPath(), options.getTransformationClassName())
@@ -143,4 +172,56 @@ public PCollection<Void> expand(PBegin input) {
            .setCoder(SerializableCoder.of(RowContext.class)));
    return spannerWriteResult.getOutput();
  }

  public WriteFilesResult<AvroDestination> writeToGCS(
      PCollection<SourceRow> sourceRows, String gcsOutputDirectory) {
    String shardIdForMetric = this.shardId;
    String metricName =
        StringUtils.isEmpty(shardIdForMetric)
            ? GCS_RECORDS_WRITTEN
            : String.join("_", GCS_RECORDS_WRITTEN, shardIdForMetric);
    return sourceRows.apply(
        "WriteAvroToGCS",
        FileIO.<AvroDestination, SourceRow>writeDynamic()
            // Group records by destination table and its Avro schema.
            .by(
                (record) ->
                    AvroDestination.of(
                        record.tableName(), record.getPayload().getSchema().toString()))
            .via(
                Contextful.fn(
                    record -> {
                      Metrics.counter(MigrateTableTransform.class, metricName).inc();
                      return record.getPayload();
                    }),
                Contextful.fn(destination -> AvroIO.sink(destination.jsonSchema)))
            .withDestinationCoder(AvroCoder.of(AvroDestination.class))
            .to(gcsOutputDirectory)
            // A shard count of 0 lets the runner decide the sharding.
            .withNumShards(0)
            .withNaming((SerializableFunction<AvroDestination, FileNaming>) AvroFileNaming::new));
  }

  static class AvroFileNaming implements FileIO.Write.FileNaming {

    private final FileIO.Write.FileNaming defaultNaming;
    private final AvroDestination avroDestination;

    public AvroFileNaming(AvroDestination avroDestination) {
      // Prefix file names with an MD5 of the schema so that records written with
      // different schemas do not collide within the same table directory.
      defaultNaming =
          FileIO.Write.defaultNaming(DigestUtils.md5Hex(avroDestination.jsonSchema), ".avro");
      this.avroDestination = avroDestination;
    }

    @Override
    public String getFilename(
        BoundedWindow window,
        PaneInfo pane,
        int numShards,
        int shardIndex,
        Compression compression) {
      // Nest each file under a subdirectory named after the destination table.
      String subDir = avroDestination.name;
      return subDir
          + "/"
          + defaultNaming.getFilename(window, pane, numShards, shardIndex, compression);
    }
  }
}
@@ -0,0 +1,49 @@
/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit tests for {@link AvroDestination}. */
@RunWith(JUnit4.class)
public class AvroDestinationTest {

  @Test
  public void testEqualsAndHashCode() {
    AvroDestination dest1 = AvroDestination.of("table1", "schema1");
    AvroDestination dest2 = AvroDestination.of("table1", "schema1");
    AvroDestination dest3 = AvroDestination.of("table2", "schema1");
    AvroDestination dest4 = AvroDestination.of("table1", "schema2");

    assertEquals(dest1, dest2);
    assertEquals(dest1.hashCode(), dest2.hashCode());
    assertNotEquals(dest1, dest3);
    assertNotEquals(dest1, dest4);
    assertNotEquals(dest3, dest4);
  }

  @Test
  public void testOf() {
    AvroDestination destination = AvroDestination.of("tableName", "jsonSchema");
    assertEquals("tableName", destination.name);
    assertEquals("jsonSchema", destination.jsonSchema);
  }
}
@@ -91,20 +91,23 @@ resource "google_dataflow_flex_template_job" "generated" {
    transformationClassName        = var.common_params.transformation_class_name
    transformationCustomParameters = var.common_params.transformation_custom_parameters
    defaultSdkHarnessLogLevel      = var.common_params.default_log_level
    fetchSize                      = var.common_params.fetch_size
    gcsOutputDirectory             = var.common_params.gcs_output_directory
  }

  service_account_email       = var.common_params.service_account_email
  additional_experiments      = var.common_params.additional_experiments
  additional_pipeline_options = var.common_params.additional_pipeline_options
  launcher_machine_type       = var.common_params.launcher_machine_type
  machine_type                = var.common_params.machine_type
  max_workers                 = var.common_params.max_workers
  name                        = "${random_pet.job_prefixes[count.index].id}-${var.common_params.run_id}"
  ip_configuration            = var.common_params.ip_configuration
  network                     = var.common_params.network != null ? var.common_params.host_project != null ? "projects/${var.common_params.host_project}/global/networks/${var.common_params.network}" : "projects/${var.common_params.project}/global/networks/${var.common_params.network}" : null
  subnetwork                  = var.common_params.subnetwork != null ? var.common_params.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.common_params.host_project}/regions/${var.common_params.region}/subnetworks/${var.common_params.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.common_params.project}/regions/${var.common_params.region}/subnetworks/${var.common_params.subnetwork}" : null
  num_workers                 = var.common_params.num_workers
  project                     = var.common_params.project
  region                      = var.common_params.region

  labels = {
    "migration_id" = "${random_pet.job_prefixes[count.index].id}-${var.common_params.run_id}"
@@ -20,14 +20,17 @@ variable "common_params" {
    transformation_jar_path          = optional(string)
    transformation_custom_parameters = optional(string)
    transformation_class_name        = optional(string)
    fetch_size                       = optional(number)
    gcs_output_directory             = optional(string)

    # Dataflow runtime parameters
    additional_experiments = optional(list(string), [
      "disable_runner_v2", "use_network_tags=allow-dataflow", "use_network_tags_for_flex_templates=allow-dataflow"
    ])
    additional_pipeline_options = optional(list(string))
    network                     = optional(string)
    subnetwork                  = optional(string)
    service_account_email       = optional(string)
    # Recommend using larger launcher VMs. A machine with >= 16 vCPUs should be safe.
    launcher_machine_type = optional(string, "n1-highmem-32")
    machine_type          = optional(string, "n1-highmem-4")
Expand Down
@@ -66,20 +66,23 @@ resource "google_dataflow_flex_template_job" "generated" {
    transformationClassName        = var.transformation_class_name
    transformationCustomParameters = var.transformation_custom_parameters
    defaultSdkHarnessLogLevel      = var.default_log_level
    fetchSize                      = var.fetch_size
    gcsOutputDirectory             = var.gcs_output_directory
  }

  service_account_email       = var.service_account_email
  additional_experiments      = var.additional_experiments
  additional_pipeline_options = var.additional_pipeline_options
  launcher_machine_type       = var.launcher_machine_type
  machine_type                = var.machine_type
  max_workers                 = var.max_workers
  name                        = var.job_name
  ip_configuration            = var.ip_configuration
  network                     = var.network != null ? var.host_project != null ? "projects/${var.host_project}/global/networks/${var.network}" : "projects/${var.project}/global/networks/${var.network}" : null
  subnetwork                  = var.subnetwork != null ? var.host_project != null ? "https://www.googleapis.com/compute/v1/projects/${var.host_project}/regions/${var.region}/subnetworks/${var.subnetwork}" : "https://www.googleapis.com/compute/v1/projects/${var.project}/regions/${var.region}/subnetworks/${var.subnetwork}" : null
  num_workers                 = var.num_workers
  project                     = var.project
  region                      = var.region

  labels = {
    "migration_id" = var.job_name
@@ -180,4 +180,22 @@ variable "default_log_level" {
  type        = string
  description = "Default log level for Dataflow jobs (e.g., 'INFO', 'DEBUG')."
  default     = null
}

variable "fetch_size" {
  type        = number
  description = "Fetch size for the JDBC connection."
  default     = null
}

variable "gcs_output_directory" {
  type        = string
  description = "GCS directory where Avro files of the records read from the source are written."
  default     = null
}

variable "additional_pipeline_options" {
  type        = list(string)
  description = "Additional Dataflow pipeline options."
  default     = []
}
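
A minimal sketch of wiring the new inputs in a tfvars file for this module (the values below are placeholders, not recommendations):

fetch_size                  = 50000
gcs_output_directory        = "gs://your-bucket/your-path"
additional_pipeline_options = []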