From e1855005d39ec9c21bca41339848e896bc2ccc59 Mon Sep 17 00:00:00 2001 From: "Jalpreet Singh Nanda (:imjalpreet)" Date: Tue, 28 Apr 2026 01:22:38 +0530 Subject: [PATCH] Revert "Upgrade to Cassandra Java Driver 4.x (#298)" This reverts commit 0da8dcfffe038434613e49a45814abadf5570a56. --- build.gradle | 4 +- .../table/cassandra/CassandraBatchLoader.java | 34 ++--- .../query/CassandraQueryExecutor.java | 133 ++++++++---------- tempto-examples/docker/docker-compose.yml | 2 +- .../main/resources/tempto-configuration.yaml | 3 - 5 files changed, 77 insertions(+), 99 deletions(-) diff --git a/build.gradle b/build.gradle index 00ef19d6..bab25db4 100644 --- a/build.gradle +++ b/build.gradle @@ -58,7 +58,7 @@ ext.versions = [ freemarker : '2.3.22', objenesis : '1.4', jackson : '2.4.4', - cassandra : '4.19.2', + cassandra : '3.4.0', commons_cli : '1.3.1', thrift : '0.16.0', kafka : '3.9.0', @@ -102,7 +102,7 @@ ext.libraries = [ objenesis : "org.objenesis:objenesis:${versions.objenesis}", jackson_databind : "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}", jackson_datatype_jdk8: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${versions.jackson}", - cassandra_driver : "org.apache.cassandra:java-driver-core:${versions.cassandra}", + cassandra_driver : "com.datastax.cassandra:cassandra-driver-core:${versions.cassandra}", commons_cli : "commons-cli:commons-cli:${versions.commons_cli}", thrift : "org.apache.thrift:libthrift:${versions.thrift}", kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}", diff --git a/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java b/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java index 2772d23f..6141d123 100644 --- a/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java +++ b/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java @@ -13,11 +13,9 @@ */ package io.prestodb.tempto.internal.fulfillment.table.cassandra; -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.BatchStatement; -import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; -import com.datastax.oss.driver.api.core.cql.DefaultBatchType; -import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; import java.util.Iterator; import java.util.List; @@ -30,12 +28,12 @@ public class CassandraBatchLoader { - private final CqlSession session; + private final Session session; private final String insertQuery; private final int columnsCount; private final int batchRowsCount; - public CassandraBatchLoader(CqlSession session, String tableName, List columnNames, int batchRowsCount) + public CassandraBatchLoader(Session session, String tableName, List columnNames, int batchRowsCount) { this.session = requireNonNull(session, "session is null"); requireNonNull(tableName, "tableName is null"); @@ -69,28 +67,24 @@ public void load(Iterator> rows) { PreparedStatement statement = session.prepare(insertQuery); - BatchStatementBuilder batchBuilder = createBatchStatementBuilder(); - int currentBatchSize = 0; - + BatchStatement batch = createBatchStatement(); while (rows.hasNext()) { - if (currentBatchSize >= batchRowsCount) { - session.execute(batchBuilder.build()); - batchBuilder = createBatchStatementBuilder(); - currentBatchSize = 0; + if (batch.size() >= batchRowsCount) { + session.execute(batch); + batch = createBatchStatement(); } List row = rows.next(); checkState(row.size() == columnsCount, "values count in a row is expected to be %d, but found: %d", columnsCount, row.size()); - batchBuilder.addStatement(statement.bind(row.toArray())); - currentBatchSize++; + batch.add(statement.bind(row.toArray())); } - if (currentBatchSize > 0) { - session.execute(batchBuilder.build()); + if (batch.size() > 0) { + session.execute(batch); } } - private static BatchStatementBuilder createBatchStatementBuilder() + private static BatchStatement createBatchStatement() { - return BatchStatement.builder(DefaultBatchType.UNLOGGED); + return new BatchStatement(BatchStatement.Type.UNLOGGED); } } diff --git a/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java b/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java index e3351866..cc6b6f8b 100644 --- a/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java +++ b/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java @@ -13,27 +13,25 @@ */ package io.prestodb.tempto.internal.query; -import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.ColumnDefinition; -import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.Row; -import com.datastax.oss.driver.api.core.metadata.Metadata; -import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; -import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; -import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; -import com.datastax.oss.driver.api.core.type.DataType; -import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.prestodb.tempto.configuration.Configuration; import io.prestodb.tempto.query.QueryExecutionException; import io.prestodb.tempto.query.QueryResult; -import java.net.InetSocketAddress; import java.sql.JDBCType; import java.util.List; import java.util.Map; -import java.util.Optional; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayList; @@ -44,25 +42,27 @@ public class CassandraQueryExecutor implements AutoCloseable { private static final Map typeMapping; - private final CqlSession session; + private final Cluster cluster; + private Session session; static { typeMapping = ImmutableMap.builder() - .put(DataTypes.ASCII, JDBCType.VARCHAR) - .put(DataTypes.BIGINT, JDBCType.BIGINT) - .put(DataTypes.BLOB, JDBCType.BLOB) - .put(DataTypes.BOOLEAN, JDBCType.BOOLEAN) - .put(DataTypes.COUNTER, JDBCType.BIGINT) - .put(DataTypes.DATE, JDBCType.DATE) - .put(DataTypes.DECIMAL, JDBCType.DECIMAL) - .put(DataTypes.DOUBLE, JDBCType.DOUBLE) - .put(DataTypes.FLOAT, JDBCType.REAL) - .put(DataTypes.INT, JDBCType.INTEGER) - .put(DataTypes.SMALLINT, JDBCType.SMALLINT) - .put(DataTypes.TEXT, JDBCType.VARCHAR) - .put(DataTypes.TIME, JDBCType.TIME) - .put(DataTypes.TIMESTAMP, JDBCType.TIMESTAMP) - .put(DataTypes.TINYINT, JDBCType.TINYINT) + .put(DataType.ascii(), JDBCType.VARCHAR) + .put(DataType.bigint(), JDBCType.BIGINT) + .put(DataType.blob(), JDBCType.BLOB) + .put(DataType.cboolean(), JDBCType.BOOLEAN) + .put(DataType.counter(), JDBCType.BIGINT) + .put(DataType.date(), JDBCType.DATE) + .put(DataType.decimal(), JDBCType.DECIMAL) + .put(DataType.cdouble(), JDBCType.DOUBLE) + .put(DataType.cfloat(), JDBCType.REAL) + .put(DataType.cint(), JDBCType.INTEGER) + .put(DataType.smallint(), JDBCType.SMALLINT) + //.put(DataType.text(), JDBCType.NVARCHAR) + .put(DataType.time(), JDBCType.TIME) + .put(DataType.timestamp(), JDBCType.TIMESTAMP) + .put(DataType.tinyint(), JDBCType.TINYINT) + .put(DataType.varchar(), JDBCType.VARCHAR) .build(); } @@ -77,36 +77,25 @@ public static class TypeNotSupportedException public CassandraQueryExecutor(Configuration configuration) { - String host = configuration.getStringMandatory("databases.cassandra.host"); - int port = configuration.getIntMandatory("databases.cassandra.port"); - String dc = configuration.getString("databases.cassandra.datacenter").orElse("datacenter1"); - - // Driver 4.x requires a local datacenter to be specified - // Using "datacenter1" as the default, which is the standard for single-datacenter deployments - session = CqlSession.builder() - .addContactPoint(new InetSocketAddress(host, port)) - .withLocalDatacenter(dc) + cluster = Cluster.builder() + .addContactPoint(configuration.getStringMandatory("databases.cassandra.host")) + .withPort(configuration.getIntMandatory("databases.cassandra.port")) .build(); } public QueryResult executeQuery(String sql) throws QueryExecutionException { - checkState(!session.isClosed(), "Trying to execute query using closed Session"); + ensureConnected(); ResultSet rs = session.execute(sql); - List definitions = newArrayList(); - for (ColumnDefinition def : rs.getColumnDefinitions()) { - definitions.add(def); - } - + List definitions = rs.getColumnDefinitions().asList(); List types = definitions.stream() .map(definition -> getJDBCType(definition.getType())) .collect(toList()); List columnNames = definitions.stream() - .map(ColumnDefinition::getName) - .map(Object::toString) + .map(ColumnDefinitions.Definition::getName) .collect(toList()); QueryResult.QueryResultBuilder resultBuilder = new QueryResult.QueryResultBuilder(types, columnNames); @@ -114,7 +103,7 @@ public QueryResult executeQuery(String sql) for (Row row : rs) { List builderRow = newArrayList(); for (int i = 0; i < types.size(); ++i) { - builderRow.add(row.getObject(i)); + builderRow.add(row.getToken(i).getValue()); } resultBuilder.addRow(builderRow); } @@ -122,61 +111,59 @@ public QueryResult executeQuery(String sql) return resultBuilder.build(); } - public CqlSession getSession() + public Session getSession() { return session; } - public List getColumnNames(String keyspaceName, String tableName) + public List getColumnNames(String keySpace, String tableName) { - Optional keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName); - if (!keyspaceMetadata.isPresent()) { - throw new IllegalStateException(format("Keyspace %s does not exist", keyspaceName)); - } - Optional tableMetadata = keyspaceMetadata.get().getTable(tableName); - if (!tableMetadata.isPresent()) { - throw new IllegalStateException(format("Table %s.%s does not exist", keyspaceName, tableName)); - } - return tableMetadata.get().getColumns().values().stream() - .map(ColumnMetadata::getName) - .map(Object::toString) - .collect(toList()); + checkState(tableExists(keySpace, tableName), "table %s.%s does not exist", keySpace, tableName); + KeyspaceMetadata keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(keySpace); + TableMetadata tableMetadata = keyspaceMetadata.getTable(tableName); + return tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toList()); } - public boolean tableExists(String keyspaceName, String tableName) + public boolean tableExists(String keySpace, String tableName) { - Optional keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName); - if (!keyspaceMetadata.isPresent()) { + KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keySpace); + if (keyspaceMetadata == null) { return false; } - return keyspaceMetadata.get().getTable(tableName).isPresent(); + return keyspaceMetadata.getTable(tableName) != null; } - public List getTableNames(String keyspaceName) + public List getTableNames(String keySpace) { - Metadata clusterMetadata = session.getMetadata(); - Optional keyspaceMetadata = clusterMetadata.getKeyspace(keyspaceName); - if (!keyspaceMetadata.isPresent()) { + Metadata clusterMetadata = cluster.getMetadata(); + KeyspaceMetadata keyspaceMetadata = clusterMetadata.getKeyspace(keySpace); + if (keyspaceMetadata == null) { return ImmutableList.of(); } - return keyspaceMetadata.get().getTables().values().stream() + return keyspaceMetadata.getTables().stream() .map(TableMetadata::getName) - .map(Object::toString) .collect(toList()); } @Override public void close() { - if (session != null && !session.isClosed()) { - session.close(); + cluster.close(); + } + + private void ensureConnected() + { + checkState(!cluster.isClosed(), "Trying to connect using closed Cluster"); + + if (session == null || session.isClosed()) { + session = cluster.connect(); } } private static JDBCType getJDBCType(DataType type) { JDBCType jdbcType = typeMapping.get(type); - if (jdbcType == null) { + if (type == null) { throw new TypeNotSupportedException(type); } diff --git a/tempto-examples/docker/docker-compose.yml b/tempto-examples/docker/docker-compose.yml index d6c25f9a..be8810dc 100644 --- a/tempto-examples/docker/docker-compose.yml +++ b/tempto-examples/docker/docker-compose.yml @@ -32,7 +32,7 @@ services: cassandra: hostname: cassandra - image: 'cassandra:3.11.19' + image: 'cassandra:2.1.15' ports: - '9042:9042' - '9160:9160' diff --git a/tempto-examples/src/main/resources/tempto-configuration.yaml b/tempto-examples/src/main/resources/tempto-configuration.yaml index 9ea80b30..63f1449f 100644 --- a/tempto-examples/src/main/resources/tempto-configuration.yaml +++ b/tempto-examples/src/main/resources/tempto-configuration.yaml @@ -71,9 +71,6 @@ databases: cassandra: host: ${cluster.cassandra} port: 9042 - datacenter: datacenter1 - keyspace: tempto - table_manager_type: cassandra default_schema: test skip_create_schema: false table_manager_type: cassandra