Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ private ImmutableMap<String, SourceColumnType> getTableCols(
.put("TINYTEXT", IndexType.STRING)
.put("DATETIME", IndexType.TIME_STAMP)
.put("TIMESTAMP", IndexType.TIME_STAMP)
.put("DECIMAL", IndexType.DECIMAL)
.put("YEAR", IndexType.NUMERIC)
.build();

Expand Down Expand Up @@ -452,6 +453,16 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
stringMaxLength = null;
}

BigDecimal decimalStepSize = null;
if (indexType.equals(IndexType.FLOAT) || indexType.equals(IndexType.DECIMAL)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we have FLOAT type checked in yet.
Or should I hold on this reivew till FLOAT is merged in

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR has some overlap with the float PR. It would likely be simpler to start with the float PR, and then we will rebase this PR on top of that one.

if (numericScale > 0) {
decimalStepSize = BigDecimal.ONE.scaleByPowerOfTen(-numericScale);
Copy link
Contributor

@VardhanThigle VardhanThigle Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to couple this with stepSize or could we handle the scale as is?

My worry here is any rounding error in this exponentiation.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scale is explicitly required to calculate the StepSize, as the StepSize is used to calculate the smallest possible difference between two values (later on in the Splitter) and then check for inequality of two values by comparing the difference to the smallest possible difference between two values.

I don't see a way to decouple these two concepts, but I also don't expect to have rounding issues given we are using a BigDecimal and a Java built-in method to perform the exponentiation. As I understand the BigDecimal type, it is explicitly designed to handle such calculations without loss of precision.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VardhanThigle I think I see what you mean, and I expect working with the scale directly would actually simplify things a bit (I'm thinking it should allow us to use setScale and then unscaledValue to get a corresponding BigInteger, and then split with that). I'm going to work on swapping out the decimal step size in favour of just the scale itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also say, I personally think it makes more sense for the SourceColumnIndexInfo class to have just the numeric scale, and then this gets used in various ways depending on the column type (since at the end of the day, the idea of "scale" is the same for decimal, double, and float, we just make use of it in different ways). But that can be a later improvement/refactoring.

} else {
decimalStepSize = BigDecimal.ONE;
}
}


indexesBuilder.add(
SourceColumnIndexInfo.builder()
.setColumnName(colName)
Expand All @@ -463,6 +474,7 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
.setIndexType(indexType)
.setCollationReference(collationReference)
.setStringMaxLength(stringMaxLength)
.setDecimalStepSize(decimalStepSize)
.build());
}
} catch (java.sql.SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,29 @@ public class BoundarySplitterFactory {
private static final BigInteger SECONDS_TO_NANOS =
BigInteger.valueOf(Duration.ofSeconds(1).toNanos());

private static final BoundarySplitter<BigDecimal> BIG_DECIMAL_SPLITTER =
new BoundarySplitter<BigDecimal>() {
@Override
public BigDecimal split(
BigDecimal start,
BigDecimal end,
PartitionColumn partitionColumn,
BoundaryTypeMapper typeMapper,
DoFn.ProcessContext c) {
return splitBigDecimal(start, end);
}

@Override
public boolean isSplittable(BigDecimal start, BigDecimal end) {
return BoundarySplitterFactory.this.isSplittable(start, end);
}

@Override
public boolean areValuesEqual(Object valueA, Object valueB) {
return BoundarySplitterFactory.this.areValuesEqual(valueA, valueB);
}
};

private static final ImmutableMap<Class, BoundarySplitter<?>> splittermap =
ImmutableMap.<Class, BoundarySplitter<?>>builder()
.put(
Expand All @@ -51,11 +74,7 @@ public class BoundarySplitterFactory {
(BoundarySplitter<BigInteger>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigIntegers(start, end))
.put(
BigDecimal.class,
(BoundarySplitter<BigDecimal>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigDecimal(start, end))
.put(BigDecimal.class, BIG_DECIMAL_SPLITTER)
.put(String.class, (BoundarySplitter<String>) BoundarySplitterFactory::splitStrings)
.put(
BYTE_ARRAY_CLASS,
Expand Down Expand Up @@ -163,13 +182,44 @@ private static Long splitLongs(Long start, Long end) {
}

private static BigDecimal splitBigDecimal(BigDecimal start, BigDecimal end) {
BigInteger startBigInt = (start == null) ? null : start.toBigInteger();
BigInteger endBigInt = (end == null) ? null : end.toBigInteger();
BigInteger split = splitBigIntegers(startBigInt, endBigInt);
if (split == null) {
if (start == null && end == null) {
return null;
}
return new BigDecimal(split);
if (start == null) {
start = BigDecimal.ZERO;
}
if (end == null) {
end = BigDecimal.ZERO;
}

int scale = Math.max(start.scale(), end.scale());

BigInteger startInt = start.movePointRight(scale).toBigInteger();
BigInteger endInt = end.movePointRight(scale).toBigInteger();

BigInteger splitInt = splitBigIntegers(startInt, endInt);
if (splitInt == null) {
return null;
}
return new BigDecimal(splitInt, scale);
}

public boolean areValuesEqual(Object valueA, Object valueB) {
if (valueA instanceof BigDecimal b1 && valueB instanceof BigDecimal b2) {
BigDecimal diff = b1.subtract(b2).abs();
return diff.compareTo(decimalStepSize())
< 0; // provided by SourceColumnIndexInfo after Float PR
}
return Objects.equals(valueA, valueB);
}

public boolean isSplittable(BigDecimal start, BigDecimal end) {
BigDecimal mid = splitBigDecimal(start, end);
if (mid == null) {
return false;
}
// Only splittable if mid is distinct from start and end at schema precision
return !(areValuesEqual(mid, start) || areValuesEqual(mid, end));
}

private static byte[] splitBytes(byte[] start, byte[] end) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import java.io.Serializable;
import javax.annotation.Nullable;
import java.math.BigDecimal;

/** Details about a partition column. */
@AutoValue
Expand All @@ -35,6 +36,9 @@ public abstract class PartitionColumn implements Serializable {
*/
public abstract Class columnClass();

@Nullable
public abstract BigDecimal decimalStepSize();

/**
* String Collation. Must be set for if {@link PartitionColumn#columnClass()} is {@link String}
* and must not be set otherwise. Defaults to null.
Expand All @@ -51,7 +55,8 @@ public abstract class PartitionColumn implements Serializable {
public static Builder builder() {
return new AutoValue_PartitionColumn.Builder()
.setStringCollation(null)
.setStringMaxLength(null);
.setStringMaxLength(null)
.setDecimalStepSize(null);
}

public abstract Builder toBuilder();
Expand All @@ -67,6 +72,8 @@ public abstract static class Builder {

public abstract Builder setStringMaxLength(Integer value);

public abstract Builder setDecimalStepSize(BigDecimal value);

abstract PartitionColumn autoBuild();

public PartitionColumn build() {
Expand All @@ -80,6 +87,11 @@ public PartitionColumn build() {
&& partitionColumn.stringMaxLength() == null),
"String columns must specify collation, and non string columns must not specify colaltion. PartitionColum = "
+ partitionColumn);
Preconditions.checkState(
(partitionColumn.columnClass() == BigDecimal.class
&& partitionColumn.decimalStepSize() != null)
|| (partitionColumn.columnClass() != BigDecimal.class),
"Decimal columns must specify decimalStepSize. PartitionColumn = " + partitionColumn);
return partitionColumn;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Timestamp;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -143,6 +144,7 @@ public enum IndexType {
BINARY,
STRING,
TIME_STAMP,
DECIMAL,
OTHER
};

Expand All @@ -153,5 +155,6 @@ public enum IndexType {
IndexType.STRING, String.class,
IndexType.BIG_INT_UNSIGNED, BigDecimal.class,
IndexType.BINARY, BoundaryExtractorFactory.BYTE_ARRAY_CLASS,
IndexType.TIME_STAMP, Timestamp.class);
IndexType.TIME_STAMP, Timestamp.class,
IndexType.DECIMAL, BigDecimal.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ public void testDiscoverTablesRsException() throws SQLException {
public void testDiscoverIndexesBasic() throws SQLException, RetriableSchemaDiscoveryException {
ImmutableList<String> testTables = ImmutableList.of("testTable1");
ImmutableList<String> colTypes =
ImmutableList.of("float", "integer", "bit", "char", "varbinary", "binary", "year");
ImmutableList.of(
"float", "integer", "bit", "char", "varbinary", "binary", "decimal", "year");
ImmutableList<SourceColumnIndexInfo> expectedSourceColumnIndexInfos =
ImmutableList.of(
SourceColumnIndexInfo.builder()
Expand Down Expand Up @@ -343,16 +344,28 @@ public void testDiscoverIndexesBasic() throws SQLException, RetriableSchemaDisco
.setCardinality(42L)
.setIndexType(IndexType.BINARY)
.setOrdinalPosition(4)
.build(),
SourceColumnIndexInfo.builder()
.setColumnName("testColDecimal")
.setIndexName("primary")
.setIsUnique(true)
.setIsPrimary(true)
.setCardinality(42L)
.setIndexType(IndexType.DECIMAL)
.setOrdinalPosition(5)
.setPrecision(10)
.setScale(2)
.setDecimalStepSize(BigDecimal.ONE.scaleByPowerOfTen(-2))
.build(),
SourceColumnIndexInfo.builder()
.setColumnName("testColYear")
.setIndexName("primary")
.setIsUnique(true)
.setIsPrimary(true)
.setCardinality(100L)
.setIndexType(IndexType.NUMERIC)
.setOrdinalPosition(6)
.build());
SourceColumnIndexInfo.builder()
.setColumnName("testColYear")
.setIndexName("primary")
.setIsUnique(true)
.setIsPrimary(true)
.setCardinality(100L)
.setIndexType(IndexType.NUMERIC)
.setOrdinalPosition(6)
.build();

final JdbcSchemaReference sourceSchemaReference =
JdbcSchemaReference.builder().setDbName("testDB").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,25 @@ public void testBigDecimalBoundarySplitter() {
assertThat(splitter.getSplitPoint(null, fortyTwo, null, null, null)).isEqualTo(twentyOne);
}

@Test
public void testBigDecimalBoundarySplitterWithScale() {
BoundarySplitter<BigDecimal> splitter = BoundarySplitterFactory.create(BigDecimal.class);
BigDecimal start = new BigDecimal("1.23");
BigDecimal end = new BigDecimal("1.2345");
BigDecimal expectedMid = new BigDecimal("1.23225");

assertThat(splitter.getSplitPoint(start, end, null, null, null)).isEqualTo(expectedMid);

BigDecimal same = new BigDecimal("42.42");

assertThat(splitter.getSplitPoint(same, same, null, null, null)).isEqualTo(same);
assertThat(splitter.getSplitPoint(null, null, null, null, null)).isNull();
assertThat(splitter.getSplitPoint(null, end, null, null, null))
.isEqualTo(new BigDecimal("0.61725"));
assertThat(splitter.getSplitPoint(start, null, null, null, null))
.isEqualTo(new BigDecimal("0.615"));
}

@Test
public void testBytesIntegerBoundarySplitter() {
BoundarySplitter<byte[]> splitter = BoundarySplitterFactory.create(BYTE_ARRAY_CLASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ private Map<String, List<Map<String, Object>>> getExpectedData() {
"2005-01-01T00:01:54.123456000Z",
"2037-12-30T23:59:59Z",
"2038-01-18T23:59:59Z"));
expectedData.put("decimal_pk", createRows("decimal_pk", "12345.67", "0.01", "99999999.99"));
expectedData.put("year_pk", createRows("year_pk", "1901", "2000"));
return expectedData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ CREATE TABLE year_pk_table (
year_pk_col YEAR NOT NULL
);

CREATE TABLE decimal_pk_table (
id DECIMAL(10,2) PRIMARY KEY,
decimal_pk_col DECIMAL(10,2) NOT NULL
);

ALTER TABLE `bigint_table` MODIFY `id` INT AUTO_INCREMENT;
ALTER TABLE `bigint_unsigned_table` MODIFY `id` INT AUTO_INCREMENT;
ALTER TABLE `binary_table` MODIFY `id` INT AUTO_INCREMENT;
Expand Down Expand Up @@ -454,6 +459,8 @@ INSERT INTO `timestamp_pk_table` (`id`, `timestamp_pk_col`) VALUES ('2005-01-01
SET time_zone = SYSTEM;
INSERT INTO `year_pk_table` (`id`, `year_pk_col`) VALUES (1901, 1901), (2000, 2000);

INSERT INTO `decimal_pk_table` (`id`, `decimal_pk_col`) VALUES (12345.67, 12345.67), (0.01, 0.01), (99999999.99, 99999999.99);

INSERT INTO `bigint_table` (`bigint_col`) VALUES (NULL);
INSERT INTO `bigint_unsigned_table` (`bigint_unsigned_col`) VALUES (NULL);
INSERT INTO `binary_table` (`binary_col`) VALUES (NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ CREATE TABLE IF NOT EXISTS timestamp_pk_table (
timestamp_pk_col TIMESTAMP NOT NULL,
) PRIMARY KEY(id);

CREATE TABLE IF NOT EXISTS decimal_pk_table (
id NUMERIC(10,2) NOT NULL,
decimal_pk_col NUMERIC(10,2) NOT NULL,
) PRIMARY KEY(id);

CREATE TABLE IF NOT EXISTS year_pk_table (
id INT64 NOT NULL,
year_pk_col INT64 NOT NULL,
Expand Down
Loading