Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5ea40aa
Update tests for GetTables -- start refactor to use proper schemas
Jul 16, 2021
596205b
Flight SQL Ratification Based On Community Feedback #7 (#98)
rafael-telles Aug 26, 2021
7e0fc85
Fix maven build from different directories (#114)
rafael-telles Sep 3, 2021
fc5a221
Add CrossReference methods to SqlProducer
jcralmeida Oct 18, 2021
ddac189
Create a module for keeping all Arrow Flight-related submodules
Jun 4, 2021
a996080
Fix checkstyle violations in Arrow Flight JDBC Driver
Jun 4, 2021
4650555
Create a module for keeping all Arrow Flight-related submodules
Jun 4, 2021
99cfa1b
Fix checkstyle violations in Arrow Flight JDBC Driver
Jun 4, 2021
695e92f
Create a module for keeping all Arrow Flight-related submodules
Jun 4, 2021
2a723e4
Fix checkstyle violations in Arrow Flight JDBC Driver
Jun 4, 2021
ec56507
Create a module for keeping all Arrow Flight-related submodules
Jun 4, 2021
ec5cd1b
Fix checkstyle violations in Arrow Flight JDBC Driver
Jun 4, 2021
3de26e5
Fix rebase issues
vfraga Oct 28, 2021
20be27f
Add missing metadata on Arrow schemas returned by Flight SQL's GetTab…
rafael-telles Dec 9, 2021
a1caf7e
Change unsupported operation exception to SQL exception
escobargabriel Mar 9, 2022
b65103e
Make getOperationSupported return a SQL Exception
escobargabriel Mar 9, 2022
6a314b2
Throw the exception and ensure that client will be closed
escobargabriel Mar 28, 2022
6d1c811
Map Runtime exceptions to SQL exceptions
escobargabriel Mar 28, 2022
427270a
Test mapping passing error codes
escobargabriel Mar 28, 2022
281f49a
Remove withDescription from the test
escobargabriel Mar 28, 2022
50d9682
Change variable name from map to exceptionsMap
escobargabriel Mar 28, 2022
62d4b07
Add a prefix to variable names
escobargabriel Mar 28, 2022
4f5f07b
Make methods use the mapped runtime exceptions to SQL exceptions
escobargabriel Mar 28, 2022
546c9fe
Change messages return
escobargabriel Mar 28, 2022
3a38fb8
Change to return appropriate exceptions messages
escobargabriel Mar 28, 2022
19d1fde
Fix rebase automerge
vfraga Mar 29, 2022
57985ee
Fix rebase automerge 2
vfraga Mar 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import java.util.concurrent.TimeUnit;

import org.apache.arrow.driver.jdbc.utils.FlightStreamQueue;
import org.apache.arrow.driver.jdbc.utils.FlightToJDBCExceptionMapper;
import org.apache.arrow.driver.jdbc.utils.VectorSchemaRootTransformer;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand Down Expand Up @@ -162,45 +164,49 @@ private void executeForCurrentFlightStream() throws SQLException {

@Override
public boolean next() throws SQLException {
if (currentVectorSchemaRoot == null) {
return false;
}
while (true) {
final boolean hasNext = super.next();
final int maxRows = statement != null ? statement.getMaxRows() : 0;
if (maxRows != 0 && this.getRow() > maxRows) {
if (statement.isCloseOnCompletion()) {
statement.close();
}
try {
if (currentVectorSchemaRoot == null) {
return false;
}
while (true) {
final boolean hasNext = super.next();
final int maxRows = statement != null ? statement.getMaxRows() : 0;
if (maxRows != 0 && this.getRow() > maxRows) {
if (statement.isCloseOnCompletion()) {
statement.close();
}
return false;
}

if (hasNext) {
return true;
}
if (hasNext) {
return true;
}

if (currentFlightStream != null) {
currentFlightStream.getRoot().clear();
if (currentFlightStream.next()) {
executeForCurrentFlightStream();
continue;
if (currentFlightStream != null) {
currentFlightStream.getRoot().clear();
if (currentFlightStream.next()) {
executeForCurrentFlightStream();
continue;
}

flightStreamQueue.enqueue(currentFlightStream);
}

flightStreamQueue.enqueue(currentFlightStream);
}
currentFlightStream = getNextFlightStream(false);

currentFlightStream = getNextFlightStream(false);
if (currentFlightStream != null) {
executeForCurrentFlightStream();
continue;
}

if (currentFlightStream != null) {
executeForCurrentFlightStream();
continue;
}
if (statement != null && statement.isCloseOnCompletion()) {
statement.close();
}

if (statement != null && statement.isCloseOnCompletion()) {
statement.close();
return false;
}

return false;
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to retrieve next row on ResultSet.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.Collectors;

import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
import org.apache.arrow.driver.jdbc.utils.FlightToJDBCExceptionMapper;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightClientMiddleware;
Expand Down Expand Up @@ -92,11 +93,15 @@ private CallOption[] getOptions() {
* @param flightInfo The {@link FlightInfo} instance from which to fetch results.
* @return a {@code FlightStream} of results.
*/
public List<FlightStream> getStreams(final FlightInfo flightInfo) {
return flightInfo.getEndpoints().stream()
.map(FlightEndpoint::getTicket)
.map(ticket -> sqlClient.getStream(ticket, getOptions()))
.collect(Collectors.toList());
public List<FlightStream> getStreams(final FlightInfo flightInfo) throws SQLException {
try {
return flightInfo.getEndpoints().stream()
.map(FlightEndpoint::getTicket)
.map(ticket -> sqlClient.getStream(ticket, getOptions()))
.collect(Collectors.toList());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get streams.");
}
}

/**
Expand Down Expand Up @@ -162,7 +167,8 @@ public interface PreparedStatement extends AutoCloseable {
* @param query the SQL query.
* @return a new prepared statement.
*/
public PreparedStatement prepare(final String query) {
public PreparedStatement prepare(final String query) throws SQLException {

final FlightSqlClient.PreparedStatement preparedStatement =
sqlClient.prepare(query, getOptions());
return new PreparedStatement() {
Expand Down Expand Up @@ -199,8 +205,12 @@ public void close() {
*
* @return a {@code FlightStream} of results.
*/
public FlightInfo getCatalogs() {
return sqlClient.getCatalogs(getOptions());
public FlightInfo getCatalogs() throws SQLException {
try {
return sqlClient.getCatalogs(getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get catalogs.");
}
}

/**
Expand All @@ -215,8 +225,12 @@ public FlightInfo getCatalogs() {
* @param table The table name. Must match the table name as it is stored in the database.
* @return a {@code FlightStream} of results.
*/
public FlightInfo getImportedKeys(final String catalog, final String schema, final String table) {
return sqlClient.getImportedKeys(TableRef.of(catalog, schema, table), getOptions());
public FlightInfo getImportedKeys(final String catalog, final String schema, final String table) throws SQLException {
try {
return sqlClient.getImportedKeys(TableRef.of(catalog, schema, table), getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get imported keys.");
}
}

/**
Expand All @@ -231,8 +245,12 @@ public FlightInfo getImportedKeys(final String catalog, final String schema, fin
* @param table The table name. Must match the table name as it is stored in the database.
* @return a {@code FlightStream} of results.
*/
public FlightInfo getExportedKeys(final String catalog, final String schema, final String table) {
return sqlClient.getExportedKeys(TableRef.of(catalog, schema, table), getOptions());
public FlightInfo getExportedKeys(final String catalog, final String schema, final String table) throws SQLException {
try {
return sqlClient.getExportedKeys(TableRef.of(catalog, schema, table), getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get exported keys.");
}
}

/**
Expand All @@ -245,17 +263,25 @@ public FlightInfo getExportedKeys(final String catalog, final String schema, fin
* Null means that schema name should not be used to narrow down the search.
* @return a {@code FlightStream} of results.
*/
public FlightInfo getSchemas(final String catalog, final String schemaPattern) {
return sqlClient.getSchemas(catalog, schemaPattern, getOptions());
public FlightInfo getSchemas(final String catalog, final String schemaPattern) throws SQLException {
try {
return sqlClient.getSchemas(catalog, schemaPattern, getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get schemas.");
}
}

/**
* Makes an RPC "getTableTypes" request.
*
* @return a {@code FlightStream} of results.
*/
public FlightInfo getTableTypes() {
return sqlClient.getTableTypes(getOptions());
public FlightInfo getTableTypes() throws SQLException {
try {
return sqlClient.getTableTypes(getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get table types.");
}
}

/**
Expand All @@ -275,19 +301,26 @@ public FlightInfo getTableTypes() {
*/
public FlightInfo getTables(final String catalog, final String schemaPattern,
final String tableNamePattern,
final List<String> types, final boolean includeSchema) {

return sqlClient.getTables(catalog, schemaPattern, tableNamePattern, types, includeSchema,
getOptions());
final List<String> types, final boolean includeSchema) throws SQLException {
try {
return sqlClient.getTables(catalog, schemaPattern, tableNamePattern, types, includeSchema,
getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get tables.");
}
}

/**
* Gets SQL info.
*
* @return the SQL info.
*/
public FlightInfo getSqlInfo(SqlInfo... info) {
return sqlClient.getSqlInfo(info, getOptions());
public FlightInfo getSqlInfo(SqlInfo... info) throws SQLException {
try {
return sqlClient.getSqlInfo(info, getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get SQL info.");
}
}

/**
Expand All @@ -302,8 +335,12 @@ public FlightInfo getSqlInfo(SqlInfo... info) {
* @param table The table name. Must match the table name as it is stored in the database.
* @return a {@code FlightStream} of results.
*/
public FlightInfo getPrimaryKeys(final String catalog, final String schema, final String table) {
return sqlClient.getPrimaryKeys(TableRef.of(catalog, schema, table), getOptions());
public FlightInfo getPrimaryKeys(final String catalog, final String schema, final String table) throws SQLException {
try {
return sqlClient.getPrimaryKeys(TableRef.of(catalog, schema, table), getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get primary keys.");
}
}

/**
Expand All @@ -326,10 +363,14 @@ public FlightInfo getPrimaryKeys(final String catalog, final String schema, fina
* @return a {@code FlightStream} of results.
*/
public FlightInfo getCrossReference(String pkCatalog, String pkSchema, String pkTable,
String fkCatalog, String fkSchema, String fkTable) {
return sqlClient.getCrossReference(TableRef.of(pkCatalog, pkSchema, pkTable),
TableRef.of(fkCatalog, fkSchema, fkTable),
getOptions());
String fkCatalog, String fkSchema, String fkTable) throws SQLException {
try {
return sqlClient.getCrossReference(TableRef.of(pkCatalog, pkSchema, pkTable),
TableRef.of(fkCatalog, fkSchema, fkTable),
getOptions());
} catch (final FlightRuntimeException e) {
throw FlightToJDBCExceptionMapper.map(e, "Failed to get cross reference.");
}
}

/**
Expand Down Expand Up @@ -427,8 +468,9 @@ public Builder withTlsEncryption(final boolean useTls) {

/**
* Sets the token used in the token authetication.
*
* @param token the token value.
* @return this builder instance.
* @return this builder instance.
*/
public Builder withToken(final String token) {
this.token = token;
Expand Down Expand Up @@ -531,8 +573,8 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
}
return ArrowFlightSqlClientHandler.createNewHandler(client, options);

} catch (final IllegalArgumentException | GeneralSecurityException | IOException | FlightRuntimeException e) {
final SQLException originalException = new SQLException(e);
} catch (final IllegalArgumentException | GeneralSecurityException | IOException e) {
SQLException originalException = new SQLException(e);
if (client != null) {
try {
client.close();
Expand All @@ -541,6 +583,17 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
}
}
throw originalException;
} catch (final FlightRuntimeException e) {
SQLException runtimeToSQLException = FlightToJDBCExceptionMapper
.map(e, "Failed to connect.");
if (client != null) {
try {
client.close();
} catch (final InterruptedException interruptedException) {
runtimeToSQLException.addSuppressed(interruptedException);
}
}
throw runtimeToSQLException;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.driver.jdbc.utils;

import java.sql.SQLException;
import java.util.Map;

import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;

import com.google.common.collect.ImmutableMap;

/**
* Parent class for rpc exceptions.
*/
public class FlightToJDBCExceptionMapper {

private static final String SQL_STATE_UNAUTHENTICATED = "28000";
private static final String SQL_STATE_UNAUTHORIZED = "42000";
private static final String SQL_STATE_UNAVAILABLE = "08001";
private static final String SQL_STATE_UNIMPLEMENTED = "0A000";
private static final String SQL_STATE_CANCELLED = "HY008";
private static final String SQL_STATE_ALREADY_EXISTS = "21000";
private static final String SQL_STATE_NOT_FOUND = "42000";
private static final String SQL_STATE_TIMED_OUT = "HYT01";
private static final String SQL_STATE_INVALID_ARGUMENT = "2200T";
private static final String SQL_STATE_INTERNAL = "01000";
private static final String SQL_STATE_UNKNOWN = "01000";

private static final Map<FlightStatusCode, String> errorStatusSQLStateMap =
new ImmutableMap.Builder<FlightStatusCode, String>()
.put(FlightStatusCode.UNAUTHENTICATED, SQL_STATE_UNAUTHENTICATED)
.put(FlightStatusCode.UNAUTHORIZED, SQL_STATE_UNAUTHORIZED)
.put(FlightStatusCode.UNAVAILABLE, SQL_STATE_UNAVAILABLE)
.put(FlightStatusCode.UNIMPLEMENTED, SQL_STATE_UNIMPLEMENTED)
.put(FlightStatusCode.CANCELLED, SQL_STATE_CANCELLED)
.put(FlightStatusCode.ALREADY_EXISTS, SQL_STATE_ALREADY_EXISTS)
.put(FlightStatusCode.NOT_FOUND, SQL_STATE_NOT_FOUND)
.put(FlightStatusCode.TIMED_OUT, SQL_STATE_TIMED_OUT)
.put(FlightStatusCode.INVALID_ARGUMENT, SQL_STATE_INVALID_ARGUMENT)
.put(FlightStatusCode.INTERNAL, SQL_STATE_INTERNAL)
.put(FlightStatusCode.UNKNOWN, SQL_STATE_UNKNOWN).build();

private FlightToJDBCExceptionMapper() {}

public static SQLException map(FlightRuntimeException flightRuntimeException) {
return map(flightRuntimeException, flightRuntimeException.getMessage());
}

/**
* Map the given RpcException into an equivalent SQLException.
* <p>
* An appropriate SQLState will be chosen for the RpcException, if one is available.
*
* @param flightRuntimeException The remote exception to map.
* @param message The message format string to use for the SQLException.
* @return The equivalently mapped SQLException.
*/
public static SQLException map(FlightRuntimeException flightRuntimeException, String message) {
return new SQLException(message, getSqlCodeFromRpcExceptionType(flightRuntimeException), flightRuntimeException);
}

private static String getSqlCodeFromRpcExceptionType(FlightRuntimeException flightRuntimeException) {
return errorStatusSQLStateMap.get(flightRuntimeException.status().code());
}
}
Loading