Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions java/flight/flight-jdbc-driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.61</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.10.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
import org.apache.arrow.driver.jdbc.client.utils.KeyedFlightSqlClientObjectPool;
import org.apache.arrow.driver.jdbc.client.utils.KeyedFlightSqlClientObjectPoolFactory;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightClientMiddleware;
Expand All @@ -49,31 +52,39 @@
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.Meta.StatementType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link FlightSqlClient} handler.
*/
public final class ArrowFlightSqlClientHandler implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);
private final FlightSqlClient sqlClient;
private final Set<CallOption> options = new HashSet<>();
private final KeyedFlightSqlClientObjectPool flightSqlClientPool;

ArrowFlightSqlClientHandler(final FlightSqlClient sqlClient,
final Collection<CallOption> options) {
BufferAllocator allocator, final Collection<CallOption> options) {
this.options.addAll(options);
this.flightSqlClientPool = new KeyedFlightSqlClientObjectPool(
new KeyedFlightSqlClientObjectPoolFactory(allocator));
this.sqlClient = Preconditions.checkNotNull(sqlClient);
}

/**
* Creates a new {@link ArrowFlightSqlClientHandler} from the provided {@code client} and {@code options}.
*
* @param client the {@link FlightClient} to manage under a {@link FlightSqlClient} wrapper.
* @param allocator the {@link BufferAllocator} used to create the client.
* @param options the {@link CallOption}s to persist in between subsequent client calls.
* @return a new {@link ArrowFlightSqlClientHandler}.
*/
public static ArrowFlightSqlClientHandler createNewHandler(final FlightClient client,
final BufferAllocator allocator,
final Collection<CallOption> options) {
return new ArrowFlightSqlClientHandler(new FlightSqlClient(client), options);
return new ArrowFlightSqlClientHandler(new FlightSqlClient(client), allocator, options);
}

/**
Expand All @@ -92,11 +103,36 @@ private CallOption[] getOptions() {
* @param flightInfo The {@link FlightInfo} instance from which to fetch results.
* @return a {@code FlightStream} of results.
*/
public List<FlightStream> getStreams(final FlightInfo flightInfo) {
return flightInfo.getEndpoints().stream()
.map(FlightEndpoint::getTicket)
.map(ticket -> sqlClient.getStream(ticket, getOptions()))
.collect(Collectors.toList());
public List<FlightStream> getStreams(final FlightInfo flightInfo) throws SQLException {
final List<FlightStream> allStreams = new ArrayList<>();

for (final FlightEndpoint endpoint : flightInfo.getEndpoints()) {
List<Location> locations = endpoint.getLocations();
if (locations.isEmpty()) {
allStreams.add(sqlClient.getStream(endpoint.getTicket(), getOptions()));
continue;
}
final Location location = locations.get(0); // purposefully discard other locations

logger.info(String.format("Getting a client for location %s.", location));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use String.format() with loggers. This executes the work to generate a string, even if logging is off.
Use the logger directly. Note that placeholders are curly braces in SLF4J eg:
logger.info("Getting client for location {}.", location);

please fix this throughout the PR.

FlightSqlClient flightSqlClient;
try {
flightSqlClient = flightSqlClientPool.borrowObject(location);
} catch (final NoSuchElementException e) {
try {
flightSqlClientPool.addObject(location);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think this happens automatically rather than needing try/catch.

flightSqlClient = flightSqlClientPool.borrowObject(location);
} catch (final Exception addObjectEx) {
throw new SQLException("Failed to create and get a new FlightSqlClient in the ObjectPool.", addObjectEx);
}
} catch (final Exception borrowObjectEx) {
throw new SQLException("Failed to borrow a FlightSqlClient from the ObjectPool", borrowObjectEx);
}

allStreams.add(flightSqlClient.getStream(endpoint.getTicket(), getOptions()));
flightSqlClientPool.returnObject(location, flightSqlClient);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot return the client to the pool until we are done getting the stream. Otherwise it allows another query being executed to re-use the client while it's still being used.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can write an auto-closeable wrapper on top of FlightStream that returns to the pool on closure.

}
return allStreams;
}

/**
Expand All @@ -114,6 +150,7 @@ public FlightInfo getInfo(final String query) {
public void close() throws SQLException {
try {
AutoCloseables.close(sqlClient);
flightSqlClientPool.close();
} catch (final Exception e) {
throw new SQLException("Failed to clean up client resources.", e);
}
Expand Down Expand Up @@ -529,7 +566,7 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
ClientAuthenticationUtils.getAuthenticate(
client, new CredentialCallOption(new BearerCredentialWriter(token))));
}
return ArrowFlightSqlClientHandler.createNewHandler(client, options);
return ArrowFlightSqlClientHandler.createNewHandler(client, allocator, options);

} catch (final IllegalArgumentException | GeneralSecurityException | IOException | FlightRuntimeException e) {
final SQLException originalException = new SQLException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.driver.jdbc.client.utils;

import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

/**
* Draft.
*/
public class KeyedFlightSqlClientObjectPool extends GenericKeyedObjectPool<Location, FlightSqlClient> {

public KeyedFlightSqlClientObjectPool(KeyedPooledObjectFactory<Location, FlightSqlClient> factory) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The casting of the factory to call a close method on it is weird.
Instead, you can make this class own the allocator:

  • Have the allocator be the parameter to the constructor.
  • call super(new KeyedFlightSql...()) and have that take in the allocator
  • override close() in this class, and move all the allocator cleanup there.

super(factory);
}

@Override
public synchronized void close() {
((KeyedFlightSqlClientObjectPoolFactory) getFactory()).closeAllocator();
super.close();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should actually call super.close() first to clean up the clients which are depending on the allocator, then clean up the allocator. (Normally the right thing to do is do child class clean-up, then base class clean-up but I think the logic needs to be different in this scenario).

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.driver.jdbc.client.utils;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Draft.
*/
public class KeyedFlightSqlClientObjectPoolFactory extends BaseKeyedPooledObjectFactory<Location, FlightSqlClient> {
private static final Logger logger = LoggerFactory.getLogger(KeyedFlightSqlClientObjectPoolFactory.class);
private final BufferAllocator parentAllocator;
private final AtomicInteger clientCounter = new AtomicInteger();

/**
* Draft.
* @param parentAllocator allocator.
*/
public KeyedFlightSqlClientObjectPoolFactory(final BufferAllocator parentAllocator) {
super();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: let's avoid default super calls.

this.parentAllocator = parentAllocator
.newChildAllocator("KeyedFlightSqlClientObjectPoolFactory", 0, parentAllocator.getLimit());
}

@Override
public FlightSqlClient create(Location key) throws Exception {
logger.info("Trying to create a new FlightSqlClient.");
return new FlightSqlClient(
FlightClient.builder(
parentAllocator.newChildAllocator(
"flight-sql-client-pool_id-" + clientCounter.getAndIncrement(),
0,
parentAllocator.getLimit()),
key).build());
}

public void closeAllocator() {
parentAllocator.getChildAllocators().forEach(BufferAllocator::close);
parentAllocator.close();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about closing the parent allocator here... I don't see a call to close() being removed which means prior to this patch we were either leaking an allocator or we are double-closing this.

}

@Override
public PooledObject<FlightSqlClient> wrap(FlightSqlClient value) {
logger.info("Wrapping an existing FlightSqlClient.");
return new DefaultPooledObject<>(value);
}

@Override
public void destroyObject(Location key, PooledObject<FlightSqlClient> p) throws Exception {
logger.info("Closing a client.");
p.getObject().close();
}
}