diff --git a/pom.xml b/pom.xml index 7baaae7..354b438 100644 --- a/pom.xml +++ b/pom.xml @@ -1,343 +1,348 @@ - - 4.0.0 - - org.factor45.hotpotato - hotpotato - 0.8.0 - jar - - hotpotato - http://hotpotato.biasedbit.com - - Missing description - - 2010 - - - Bruno de Carvalho - bruno@biasedbit.com - http://bruno.biasedbit.com - - - - - - Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0 - - - - - UTF-8 - - - - - repository.jboss.org - http://repository.jboss.org/nexus/content/groups/public/ - - false - - - - - - - - repository.jboss.org - http://repository.jboss.org/nexus/content/groups/public/ - - false - - - - - - - junit - junit - 4.7 - test - - - org.jboss.netty - netty - 3.2.2.Final - compile - - - org.slf4j - slf4j-api - 1.6.1 - compile - - - - - true - org.slf4j - slf4j-log4j12 - 1.6.1 - runtime - - - true - log4j - log4j - 1.2.16 - runtime - - - - - - - - maven-compiler-plugin - - 1.5 - 1.5 - - - - - - org.apache.maven.plugins - maven-jar-plugin - 2.3.1 - - - **/log4j.properties - - - - - - - maven-surefire-plugin - - never - - **/Abstract* - **/*TestUtil* - - - - - - - maven-javadoc-plugin - 2.7 - - - generate-javadoc - package - - javadoc - - - - - org.jboss.apiviz.APIviz - - org.jboss.apiviz - apiviz - 1.3.1.GA - - false - public - ${basedir}/src/javadoc/stylesheet.css - ${basedir}/src/javadoc/ - true - true - ${project.build.directory}/api - ${project.build.directory}/api - api - UTF-8 - UTF-8 - true - true - true - true - ${basedir}/src/javadoc/overview.html - ${project.name} API Reference (${project.version}) - ${project.name} API Reference (${project.version) - - -link http://java.sun.com/javase/6/docs/api/ - -link http://docs.jboss.org/netty/3.2/api/ - - -group "Connection to server" ${project.groupId}.client.* - -group "Request/response lifecycle" ${project.groupId}.request*:${project.groupId}.response* - - -sourceclasspath ${project.build.outputDirectory} - -nopackagediagram - - UTF-8 - en_GB - - ${project.groupId}.util*:org.jboss* - - - - - - - maven-jxr-plugin - - - generate-xref - package - - jxr - - - - - UTF-8 - UTF-8 - true - ${project.build.directory}/xref - ${project.build.directory}/api - ${basedir}/src/xref/stylesheet.css - ${project.name} Source Xref (${project.version}) - ${project.name} Source Xref (${project.version}) - 120 - - - - - - org.jboss.maven.plugins - maven-jdocbook-plugin - 2.2.1 - - - generate-docbook - package - - resources - generate - - - - - - org.jboss - jbossorg-docbook-xslt - 1.1.0 - - - org.eclipse.wst.css - core - - - org.eclipse.wst.sse - core - - - - - org.jboss - jbossorg-fonts - 1.0.0 - jdocbook-style - - - - master.xml - en-US - ${basedir}/src/docbook - - ${basedir}/src/docbook - - css/**/* - - - - ${basedir}/src/docbook - - images/**/* - - - - - html - file:///${basedir}/src/docbook/xslt/xhtml.xsl - index.html - - - html_single - file:///${basedir}/src/docbook/xslt/xhtml-single.xsl - index.html - - - pdf - file:///${basedir}/src/docbook/xslt/pdf.xsl - hotpotato-userguide.pdf - - - - true - saxon - 1.72.0 - - - true - - - - - - - org.apache.maven.plugins - maven-source-plugin - 2.1.1 - - - attach-source - package - - jar - - - true - - - - - - - - maven-assembly-plugin - - - generate-distribution - package - - single - - - - - - ${basedir}/src/assembly/default.xml - - false - true - gnu - - - - - - org.apache.maven.plugins - maven-idea-plugin - 2.2 - - true - 1.5 - - - - - + + + InfoServ + com.digcy + 2.12 + + 4.0.0 + org.factor45.hotpotato + hotpotato + 1.0-SNAPSHOT + jar + + hotpotato + http://hotpotato.biasedbit.com + + Missing description + + 2010 + + + Bruno de Carvalho + bruno@biasedbit.com + http://bruno.biasedbit.com + + + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + + + + + UTF-8 + + + + + repository.jboss.org + http://repository.jboss.org/nexus/content/groups/public/ + + false + + + + + + + + repository.jboss.org + http://repository.jboss.org/nexus/content/groups/public/ + + false + + + + + + + junit + junit + 4.7 + test + + + org.jboss.netty + netty + 3.2.2.Final + compile + + + org.slf4j + slf4j-api + 1.6.1 + compile + + + + + true + org.slf4j + slf4j-log4j12 + 1.6.1 + runtime + + + true + log4j + log4j + 1.2.16 + runtime + + + + + + + + maven-compiler-plugin + + 1.5 + 1.5 + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.3.1 + + + **/log4j.properties + + + + + + + maven-surefire-plugin + + never + + **/Abstract* + **/*TestUtil* + + + + + + + maven-javadoc-plugin + 2.7 + + + generate-javadoc + package + + javadoc + + + + + org.jboss.apiviz.APIviz + + org.jboss.apiviz + apiviz + 1.3.1.GA + + false + public + ${basedir}/src/javadoc/stylesheet.css + ${basedir}/src/javadoc/ + true + true + ${project.build.directory}/api + ${project.build.directory}/api + api + UTF-8 + UTF-8 + true + true + true + true + ${basedir}/src/javadoc/overview.html + ${project.name} API Reference (${project.version}) + ${project.name} API Reference (${project.version) + + -link http://java.sun.com/javase/6/docs/api/ + -link http://docs.jboss.org/netty/3.2/api/ + + -group "Connection to server" ${project.groupId}.client.* + -group "Request/response lifecycle" ${project.groupId}.request*:${project.groupId}.response* + + -sourceclasspath ${project.build.outputDirectory} + -nopackagediagram + + UTF-8 + en_GB + + ${project.groupId}.util*:org.jboss* + + + + + + + maven-jxr-plugin + + + generate-xref + package + + jxr + + + + + UTF-8 + UTF-8 + true + ${project.build.directory}/xref + ${project.build.directory}/api + ${basedir}/src/xref/stylesheet.css + ${project.name} Source Xref (${project.version}) + ${project.name} Source Xref (${project.version}) + 120 + + + + + + org.jboss.maven.plugins + maven-jdocbook-plugin + 2.2.1 + + + generate-docbook + package + + resources + generate + + + + + + org.jboss + jbossorg-docbook-xslt + 1.1.0 + + + org.eclipse.wst.css + core + + + org.eclipse.wst.sse + core + + + + + org.jboss + jbossorg-fonts + 1.0.0 + jdocbook-style + + + + master.xml + en-US + ${basedir}/src/docbook + + ${basedir}/src/docbook + + css/**/* + + + + ${basedir}/src/docbook + + images/**/* + + + + + html + file:///${basedir}/src/docbook/xslt/xhtml.xsl + index.html + + + html_single + file:///${basedir}/src/docbook/xslt/xhtml-single.xsl + index.html + + + pdf + file:///${basedir}/src/docbook/xslt/pdf.xsl + hotpotato-userguide.pdf + + + + true + saxon + 1.72.0 + - + true + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.1.1 + + + attach-source + package + + jar + + + true + + + + + + + + maven-assembly-plugin + + + generate-distribution + package + + single + + + + + + ${basedir}/src/assembly/default.xml + + false + true + gnu + + + + + + org.apache.maven.plugins + maven-idea-plugin + 2.2 + + true + 1.5 + + + + + + diff --git a/src/main/java/com/biasedbit/hotpotato/client/AbstractHttpClient.java b/src/main/java/com/biasedbit/hotpotato/client/AbstractHttpClient.java index c828baf..568829b 100644 --- a/src/main/java/com/biasedbit/hotpotato/client/AbstractHttpClient.java +++ b/src/main/java/com/biasedbit/hotpotato/client/AbstractHttpClient.java @@ -1,1066 +1,1079 @@ -/* - * Copyright 2010 Bruno de Carvalho - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.biasedbit.hotpotato.client; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.net.ssl.SSLEngine; - -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory; -import org.jboss.netty.example.securechat.SecureChatSslContextFactory; -import org.jboss.netty.handler.codec.http.HttpChunkAggregator; -import org.jboss.netty.handler.codec.http.HttpClientCodec; -import org.jboss.netty.handler.codec.http.HttpContentCompressor; -import org.jboss.netty.handler.codec.http.HttpContentDecompressor; -import org.jboss.netty.handler.codec.http.HttpHeaders; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.util.internal.ExecutorUtil; - -import com.biasedbit.hotpotato.client.connection.HttpConnection; -import com.biasedbit.hotpotato.client.connection.HttpConnectionListener; -import com.biasedbit.hotpotato.client.connection.factory.DefaultHttpConnectionFactory; -import com.biasedbit.hotpotato.client.connection.factory.HttpConnectionFactory; -import com.biasedbit.hotpotato.client.event.ConnectionClosedEvent; -import com.biasedbit.hotpotato.client.event.ConnectionFailedEvent; -import com.biasedbit.hotpotato.client.event.ConnectionOpenEvent; -import com.biasedbit.hotpotato.client.event.EventType; -import com.biasedbit.hotpotato.client.event.ExecuteRequestEvent; -import com.biasedbit.hotpotato.client.event.HttpClientEvent; -import com.biasedbit.hotpotato.client.event.RequestCompleteEvent; -import com.biasedbit.hotpotato.client.host.HostContext; -import com.biasedbit.hotpotato.client.host.factory.DefaultHostContextFactory; -import com.biasedbit.hotpotato.client.host.factory.HostContextFactory; -import com.biasedbit.hotpotato.client.timeout.HashedWheelTimeoutManager; -import com.biasedbit.hotpotato.client.timeout.TimeoutManager; -import com.biasedbit.hotpotato.logging.Logger; -import com.biasedbit.hotpotato.request.HttpRequestFuture; -import com.biasedbit.hotpotato.request.factory.DefaultHttpRequestFutureFactory; -import com.biasedbit.hotpotato.request.factory.HttpRequestFutureFactory; -import com.biasedbit.hotpotato.response.DiscardProcessor; -import com.biasedbit.hotpotato.response.HttpResponseProcessor; -import com.biasedbit.hotpotato.util.CleanupChannelGroup; -import com.biasedbit.hotpotato.util.NamedThreadFactory; - -/** - * Abstract implementation of the {@link HttpClient} interface. Contains most of the boilerplate code that other - * {@link HttpClient} implementations would also need. - *

- * This abstract implementation is in itself complete. If you extend this class, you needn't implement a single method - * (just like {@link DefaultHttpClient} does). However you may want to override the specific behavior of one or other - * method, rather than reimplement the whole class all over again (just like {@link StatsGatheringHttpClient} does - - * only overrides the {@code eventHandlingLoop()} method to gather execution statistics). - *

- *

Thread safety and performance

This default implementation is thread-safe and, unlike Apache HttpClient, the performance does not - * degrade when the instance is shared by multiple threads accessing it at the same time. - *

- *

Event queue (producer/consumer)

When this implementation is initialised, it fires up an auxilliary thread, - * the consumer. - *

- * Every time one of the variants of the method {@code execute()} is called, a new {@link HttpClientEvent} is generated - * and introduced in a blocking event queue (on the caller's thread execution time). The consumer then grabs that - * request and acts accordingly - it can either queue the request so that it is later executed in an available - * connection, request a new connection in case no connections are available, directly execute this request, etc. - *

- *

Order

By using an event queue, absolute order is guaranteed. If thread A calls {@code execute()} with a - * request to host H prior to thread B (which also places a request for the same host), then the request provided by - * thread A is guaranteed to be placed (i.e. written to the network) before the request placed by - * thread B. - *

- * This doesn't mean that request A will hit the server before request B or that the response for request A will arrive - * before B. The reasons are obvious: - * - *

- * If you need to guarantee that a request B can only hit the server after a request A, you can either manually manage - * that in your code through the {@link HttpRequestFuture} API or configure the concrete instance of this class to allow - * at most 1 connection per host - although this last option will hurt performance globally. - * - *
- *
Note:
- * Calling {@linkplain #execute(String, int, HttpRequest, HttpResponseProcessor) one of the variants of {@code execute}} - * with the client configured with {@linkplain #setAutoInflate(boolean) auto-inflation} turned on will cause a - * 'ACCEPT_ENCODING' header to be added with value 'GZIP'. - *
- * - * @author Bruno de Carvalho - */ -public abstract class AbstractHttpClient implements HttpClient, HttpConnectionListener { - - // constants ------------------------------------------------------------------------------------------------------ - - protected static final Logger LOG = Logger.getLogger(AbstractHttpClient.class); - protected static final HttpClientEvent POISON = new HttpClientEvent() { - - public EventType getEventType() { - return null; - } - - - @Override - public String toString() { - return "POISON"; - } - }; - - // configuration defaults ----------------------------------------------------------------------------------------- - - protected static final boolean USE_SSL = false; - protected static final int REQUEST_COMPRESSION_LEVEL = 0; - protected static final boolean AUTO_INFLATE = false; - protected static final int REQUEST_CHUNK_SIZE = 8192; - protected static final boolean AGGREGATE_RESPONSE_CHUNKS = false; - protected static final int CONNECTION_TIMEOUT_IN_MILLIS = 2000; - protected static final int REQUEST_TIMEOUT_IN_MILLIS = 2000; - protected static final int MAX_CONNECTIONS_PER_HOST = 3; - protected static final int MAX_QUEUED_REQUESTS = Short.MAX_VALUE; - protected static final boolean USE_NIO = false; - protected static final int MAX_IO_WORKER_THREADS = 50; - protected static final int MAX_EVENT_PROCESSOR_HELPER_THREADS = 20; - protected static final boolean CLEANUP_INACTIVE_HOST_CONTEXTS = true; - - // configuration -------------------------------------------------------------------------------------------------- - - protected boolean useSsl; - protected int requestCompressionLevel; - protected boolean autoInflate; - protected int requestChunkSize; - protected boolean aggregateResponseChunks; - protected int maxConnectionsPerHost; - protected int maxQueuedRequests; - protected int connectionTimeoutInMillis; - protected int requestTimeoutInMillis; - protected boolean useNio; - protected int maxIoWorkerThreads; - protected int maxEventProcessorHelperThreads; - protected HttpConnectionFactory connectionFactory; - protected HostContextFactory hostContextFactory; - protected HttpRequestFutureFactory futureFactory; - protected TimeoutManager timeoutManager; - protected boolean cleanupInactiveHostContexts; - - // internal vars -------------------------------------------------------------------------------------------------- - - protected Executor executor; - protected ChannelFactory channelFactory; - protected ChannelPipelineFactory pipelineFactory; - protected ChannelGroup channelGroup; - protected BlockingQueue eventQueue; - protected final Map contextMap; - protected final AtomicInteger queuedRequests; - protected int connectionCounter; - protected CountDownLatch eventConsumerLatch; - protected volatile boolean terminate; - protected boolean internalTimeoutManager; - - // constructors --------------------------------------------------------------------------------------------------- - - public AbstractHttpClient() { - this.useSsl = USE_SSL; - this.requestCompressionLevel = REQUEST_COMPRESSION_LEVEL; - this.autoInflate = AUTO_INFLATE; - this.requestChunkSize = REQUEST_CHUNK_SIZE; - this.aggregateResponseChunks = AGGREGATE_RESPONSE_CHUNKS; - this.connectionTimeoutInMillis = CONNECTION_TIMEOUT_IN_MILLIS; - this.requestTimeoutInMillis = REQUEST_TIMEOUT_IN_MILLIS; - this.maxConnectionsPerHost = MAX_CONNECTIONS_PER_HOST; - this.maxQueuedRequests = MAX_QUEUED_REQUESTS; - this.useNio = USE_NIO; - this.maxIoWorkerThreads = MAX_IO_WORKER_THREADS; - this.maxEventProcessorHelperThreads = MAX_EVENT_PROCESSOR_HELPER_THREADS; - this.cleanupInactiveHostContexts = CLEANUP_INACTIVE_HOST_CONTEXTS; - - this.queuedRequests = new AtomicInteger(0); - - // No need for synchronized structures here, as they'll be accessed by a single thread - this.contextMap = new HashMap(); - } - - // HttpClient ----------------------------------------------------------------------------------------------------- - - - public boolean init() { - if (this.timeoutManager == null) { - // Consumes less resources, puts less emphasis on precision. - this.timeoutManager = new HashedWheelTimeoutManager(); - //this.timeoutManager = new BasicTimeoutManager(10); - this.timeoutManager.init(); - this.internalTimeoutManager = true; - } - - if (this.hostContextFactory == null) { - this.hostContextFactory = new DefaultHostContextFactory(); - } - if (this.connectionFactory == null) { - this.connectionFactory = new DefaultHttpConnectionFactory(); - } - if (this.futureFactory == null) { - this.futureFactory = new DefaultHttpRequestFutureFactory(); - } - - this.eventConsumerLatch = new CountDownLatch(1); - this.eventQueue = new LinkedBlockingQueue(); - - // TODO instead of fixed size thread pool, use a cached thread pool with size limit (limited growth cached pool) - this.executor = Executors.newFixedThreadPool(this.maxEventProcessorHelperThreads, - new NamedThreadFactory("hpttHandyman")); - Executor workerPool = Executors.newFixedThreadPool(this.maxIoWorkerThreads); - - if (this.useNio) { - // It's only going to create 1 thread, so no harm done here. - Executor bossPool = Executors.newCachedThreadPool(); - this.channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool); - } else { - this.channelFactory = new OioClientSocketChannelFactory(workerPool); - } - - this.channelGroup = new CleanupChannelGroup(this.toString()); - // Create a pipeline without the last handler (it will be added right before connecting). - this.pipelineFactory = new ChannelPipelineFactory() { - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - if (useSsl) { - SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); - engine.setUseClientMode(true); - pipeline.addLast("ssl", new SslHandler(engine)); - } - - if (requestCompressionLevel > 0) { - pipeline.addLast("deflater", new HttpContentCompressor(requestCompressionLevel)); - } - - pipeline.addLast("codec", new HttpClientCodec(4096, 8192, requestChunkSize)); - if (autoInflate) { - pipeline.addLast("inflater", new HttpContentDecompressor()); - } - if (aggregateResponseChunks) { - pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); - } - return pipeline; - } - }; - - this.executor.execute(new Runnable() { - - public void run() { - eventHandlingLoop(); - } - }); - return true; - } - - - public void terminate() { - if (this.terminate || (this.eventQueue == null)) { - return; - } - - // Stop accepting requests. - this.terminate = true; - // Copy any pending operations in order to signal execution request failures. - Collection pendingEvents = new ArrayList(this.eventQueue); - // Clear the queue and kill the consumer thread by "poisoning" the event queue. - this.eventQueue.clear(); - this.eventQueue.add(POISON); - try { - this.eventConsumerLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - // Fail all requests that were still in the event queue. - for (HttpClientEvent event : pendingEvents) { - switch (event.getEventType()) { - case EXECUTE_REQUEST: - ((ExecuteRequestEvent) event).getContext().getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); - case CONNECTION_CLOSED: - ConnectionClosedEvent closedEvent = (ConnectionClosedEvent) event; - if ((closedEvent.getRetryRequests() != null) && !closedEvent.getRetryRequests().isEmpty()) { - for (HttpRequestContext context : closedEvent.getRetryRequests()) { - context.getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); - } - } - } - } - - // Kill all connections (will cause failure on requests executing in those connections) and fail context-queued - // requests. - for (HostContext hostContext : this.contextMap.values()) { - for (HttpRequestContext context : hostContext.getQueue()) { - context.getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); - } - for (HttpConnection connection : hostContext.getConnectionPool().getConnections()) { - connection.terminate(); - } - } - this.contextMap.clear(); - - try { - this.channelGroup.close().await(1000); - } catch (InterruptedException e) { - Thread.interrupted(); - } - - this.channelFactory.releaseExternalResources(); - if (this.executor != null) { - ExecutorUtil.terminate(this.executor); - } - - if (this.internalTimeoutManager) { - this.timeoutManager.terminate(); - } - } - - - public HttpRequestFuture execute(final String host, final int port, final HttpRequest request, - final HttpResponseProcessor processor) - throws CannotExecuteRequestException { - return this.execute(host, port, this.requestTimeoutInMillis, request, processor); - } - - - public HttpRequestFuture execute(final String host, final int port, final HttpRequest request) throws CannotExecuteRequestException { - return this.execute(host, port, request, DiscardProcessor.getInstance()); - } - - - public HttpRequestFuture execute(final String host, final int port, final int timeout, final HttpRequest request, - final HttpResponseProcessor processor) - throws CannotExecuteRequestException { - if (this.eventQueue == null) { - throw new CannotExecuteRequestException(this.getClass().getSimpleName() + " was not initialised"); - } - - if (this.queuedRequests.incrementAndGet() > this.maxQueuedRequests) { - this.queuedRequests.decrementAndGet(); - throw new CannotExecuteRequestException("Request queue is full"); - } - - // Perform these checks on the caller thread's time rather than the event dispatcher's. - if (this.autoInflate) { - request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); - } - - HttpRequestFuture future = this.futureFactory.getFuture(true); - HttpRequestContext context = new HttpRequestContext(host, port, timeout, request, processor, future); - if (this.terminate || !this.eventQueue.offer(new ExecuteRequestEvent(context))) { - throw new CannotExecuteRequestException("Failed to add request to queue"); - } - - return future; - } - - - public boolean isHttps() { - return this.useSsl; - } - - // HttpConnectionListener ----------------------------------------------------------------------------------------- - - - public void connectionOpened(final HttpConnection connection) { - if (this.terminate) { - return; - } - this.eventQueue.offer(new ConnectionOpenEvent(connection)); - } - - - public void connectionTerminated(final HttpConnection connection, final Collection retryRequests) { - if (this.terminate) { - if ((retryRequests != null) && !retryRequests.isEmpty()) { - for (HttpRequestContext request : retryRequests) { - request.getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); - } - } - return; - } - this.eventQueue.offer(new ConnectionClosedEvent(connection, retryRequests)); - } - - - public void connectionTerminated(final HttpConnection connection) { - if (this.terminate) { - return; - } - this.eventQueue.offer(new ConnectionClosedEvent(connection, null)); - } - - - public void connectionFailed(final HttpConnection connection) { - if (this.terminate) { - return; - } - this.eventQueue.offer(new ConnectionFailedEvent(connection)); - } - - - public void requestFinished(final HttpConnection connection, final HttpRequestContext context) { - if (this.terminate) { - return; - } - this.eventQueue.offer(new RequestCompleteEvent(context)); - } - - // protected helpers ---------------------------------------------------------------------------------------------- - - protected void eventHandlingLoop() { - for (;;) { - // Manual synchronization here because before removing an element, we first need to check whether an - // active available connection exists to satisfy the request. - try { - HttpClientEvent event = this.eventQueue.take(); - if (event == POISON) { - this.eventConsumerLatch.countDown(); - return; - } - - switch (event.getEventType()) { - case EXECUTE_REQUEST: - this.handleExecuteRequest((ExecuteRequestEvent) event); - break; - case REQUEST_COMPLETE: - this.handleRequestComplete((RequestCompleteEvent) event); - break; - case CONNECTION_OPEN: - this.handleConnectionOpen((ConnectionOpenEvent) event); - break; - case CONNECTION_CLOSED: - this.handleConnectionClosed((ConnectionClosedEvent) event); - break; - case CONNECTION_FAILED: - this.handleConnectionFailed((ConnectionFailedEvent) event); - break; - default: - // Consume and do nothing, unknown event. - } - } catch (InterruptedException e) { - // ignore, poisoning the queue is the only way to stop - } - } - } - - // private helpers -------------------------------------------------------------------------------------------- - - protected void handleExecuteRequest(final ExecuteRequestEvent event) { - // First, add it to the queue (or create a queue for given host if one does not exist) - String id = this.hostId(event.getContext()); - HostContext context = this.contextMap.get(id); - if (context == null) { - context = this.hostContextFactory - .createHostContext(event.getContext().getHost(), event.getContext().getPort(), - this.maxConnectionsPerHost); - this.contextMap.put(id, context); - } - - context.addToQueue(event.getContext()); - this.drainQueueAndProcessResult(context); - } - - protected void handleRequestComplete(final RequestCompleteEvent event) { - this.queuedRequests.decrementAndGet(); - - HostContext context = this.contextMap.get(this.hostId(event.getContext())); - if (context == null) { - // Can only happen if context is cleaned meanwhile... ignore and bail out. - return; - } - - this.drainQueueAndProcessResult(context); - } - - protected void handleConnectionOpen(final ConnectionOpenEvent event) { - String id = this.hostId(event.getConnection()); - HostContext context = this.contextMap.get(id); - if (context == null) { - throw new IllegalStateException("Context for id '" + id + - "' does not exist (it may have been incorrectly cleaned up)"); - } - - context.getConnectionPool().connectionOpen(event.getConnection()); - // Rather than go through the whole process of drainQueue(), simplyp poll a single element from the head of - // the queue into this connection (a newly opened connection is ALWAYS available). - HttpRequestContext nextRequest = context.pollQueue(); - if (nextRequest != null) { - event.getConnection().execute(nextRequest); - } - } - - protected void handleConnectionClosed(final ConnectionClosedEvent event) { - // Update the list of available connections for the same host:port. - String id = this.hostId(event.getConnection()); - HostContext context = this.contextMap.get(id); - if (context == null) { - throw new IllegalStateException("Context for id '" + id + - "' does not exist (it may have been incorrectly cleaned up)"); - } - - context.getConnectionPool().connectionClosed(event.getConnection()); - - if (event.getRetryRequests() != null) { - context.restoreRequestsToQueue(event.getRetryRequests()); - } - - if ((context.getConnectionPool().getTotalConnections() == 0) && context.getQueue().isEmpty() && - this.cleanupInactiveHostContexts) { - // No requests in queue, no connections open or opening... Cleanup resources. - this.contextMap.remove(id); - } - - this.drainQueueAndProcessResult(context); - } - - protected void handleConnectionFailed(final ConnectionFailedEvent event) { - // Update the list of available connections for the same host:port. - String id = this.hostId(event.getConnection()); - HostContext context = this.contextMap.get(id); - if (context == null) { - throw new IllegalStateException("Context for id '" + id + - "' does not exist (it may have been incorrectly cleaned up)"); - } - - context.getConnectionPool().connectionFailed(); - if ((context.getConnectionPool().hasConnectionFailures() && - (context.getConnectionPool().getTotalConnections() == 0))) { - // Connection failures occured and there are no more connections active or establishing, so its time to - // fail all queued requests. - context.failAllRequests(HttpRequestFuture.CANNOT_CONNECT); - } - } - - protected void drainQueueAndProcessResult(final HostContext context) { - HostContext.DrainQueueResult result = context.drainQueue(); - switch (result) { - case OPEN_CONNECTION: - this.openConnection(context); - break; - case QUEUE_EMPTY: - case NOT_DRAINED: - case DRAINED: - default: - } - } - - protected String hostId(final HttpConnection connection) { - return this.hostId(connection.getHost(), connection.getPort()); - } - - protected String hostId(final HttpRequestContext context) { - return this.hostId(context.getHost(), context.getPort()); - } - - protected String hostId(final HostContext context) { - return this.hostId(context.getHost(), context.getPort()); - } - - protected String hostId(final String host, final int port) { - return new StringBuilder().append(host).append(":").append(port).toString(); - } - - protected void openConnection(final HostContext context) { - // No need to recheck whether a connection can be opened or not, that was done already inside the HttpContext. - - // Try to create a pipeline before signalling a new connection is being open. - // This should never throw exceptions but who knows... - final ChannelPipeline pipeline; - try { - pipeline = this.pipelineFactory.getPipeline(); - } catch (Exception e) { - LOG.error("Failed to create pipeline.", e); - // bail out before marking a connection as opening. - return; - } - - // Signal that a new connection is opening. - context.getConnectionPool().connectionOpening(); - - // server:port-X - String id = new StringBuilder().append(this.hostId(context)).append("-") - .append(this.connectionCounter++).toString(); - - // If not using NIO, then delegate the blocking write() call to the executor. - Executor writeDelegator = this.useNio ? null : this.executor; - - final HttpConnection connection = this.connectionFactory - .createConnection(id, context.getHost(), context.getPort(), this, this.timeoutManager, writeDelegator); - - pipeline.addLast("handler", connection); - - // Delegate actual connection to other thread, since calling connect is a blocking call. - this.executor.execute(new Runnable() { - - public void run() { - ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - bootstrap.setOption("reuseAddress", true); - bootstrap.setOption("connectTimeoutMillis", connectionTimeoutInMillis); - bootstrap.setPipeline(pipeline); - bootstrap.connect(new InetSocketAddress(context.getHost(), context.getPort())) - .addListener(new ChannelFutureListener() { - - public void operationComplete(final ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // Don't even bother checking if client was already instructed to terminate since - // CleanupChannelGroup takes care of that. - channelGroup.add(future.getChannel()); - } - } - }); - } - }); - } - - // getters & setters ---------------------------------------------------------------------------------------------- - - public boolean isUseSsl() { - return useSsl; - } - - /** - * Whether this client should create SSL or non-SSL connections. - *

- * All connections are affected by this flag. - *

- * Defaults to {@code false}. - * - * @param useSsl {@code true} if all connections will have SSL support, {@code false} otherwise. - */ - public void setUseSsl(final boolean useSsl) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.useSsl = useSsl; - } - - public int getRequestCompressionLevel() { - return requestCompressionLevel; - } - - /** - * Level of compression when sending requests. - *

- * Defaults to 0. - * - * @param requestCompressionLevel Level of compression between 0 and 9; 0 = off and 9 = max. - */ - public void setRequestCompressionLevel(final int requestCompressionLevel) { - if ((requestCompressionLevel < 0) || (requestCompressionLevel > 9)) { - throw new IllegalArgumentException("RequestCompressionLevel must be in range [0;9] (0 = none, 9 = max)"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.requestCompressionLevel = requestCompressionLevel; - } - - public boolean isAutoInflate() { - return autoInflate; - } - - /** - * Whether responses should be auto inflated (decompressed) or not. - *

- * Setting this flag to true will cause a 'Accept-Encoding' header with value 'gzip' to be added to the requests - * submitted. - *

- * Defaults to {@code true}. - * - * @param autoInflate {@code true} if the connections should automatically decompress gzip content, {@code false} - * otherwise. - */ - public void setAutoInflate(final boolean autoInflate) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.autoInflate = autoInflate; - } - - public int getRequestChunkSize() { - return requestChunkSize; - } - - /** - * Maximum size for HTTP request chunks. - * If the contents of the requests exceed this value, the request will be chunked and a 'Transfer-Encoding' header - * will be added with value 'chunked'. - *

- * Defaults to 8192. - * - * @param requestChunkSize If request or response body exceeds this value - */ - public void setRequestChunkSize(final int requestChunkSize) { - if (requestChunkSize < 128) { - throw new IllegalArgumentException("Minimum accepted chunk size is 128b"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.requestChunkSize = requestChunkSize; - } - - public boolean isAggregateResponseChunks() { - return aggregateResponseChunks; - } - - /** - * If the response is transferred in chunks, whether they should be automatically grouped or not. - *

- * Defaults to {@code true}. - * - * @param aggregateResponseChunks {@code true} to aggregate http response chunks automatically, {@code false} - * otherwise. - */ - public void setAggregateResponseChunks(final boolean aggregateResponseChunks) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.aggregateResponseChunks = aggregateResponseChunks; - } - - public int getMaxConnectionsPerHost() { - return maxConnectionsPerHost; - } - - /** - * Sets the maximum number of active connections per host. - *

- * This number also limits the number of connections being established so that - * {@code connectionsOpen + connectionsOpening <= maxConnectionsPerHost} is always true. - *

- * Defaults to 3. - * - * @param maxConnectionsPerHost Maximum number of total active connections (open + opening) per host at a given - * time. Minimum value is 1. - */ - public void setMaxConnectionsPerHost(final int maxConnectionsPerHost) { - if (maxConnectionsPerHost < 1) { - throw new IllegalArgumentException("MaxConnectionsPerHost must be > 1"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.maxConnectionsPerHost = maxConnectionsPerHost; - } - - public int getMaxQueuedRequests() { - return this.maxQueuedRequests; - } - - /** - * Sets the maximum number of queued requests for this client. - *

- * If the number of queued requests is exceeded, calling - * {@linkplain #execute(String, int, HttpRequest, HttpResponseProcessor) one of the variants of {@code execute()}} - * will throw a {@link CannotExecuteRequestException}. - *

- * Defaults to {@link Short#MAX_VALUE}. - * - * @param maxQueuedRequests Maximum number of queued requests at any given moment. - */ - public void setMaxQueuedRequests(final int maxQueuedRequests) { - if (maxQueuedRequests < 1) { - throw new IllegalArgumentException("MaxQueuedRequests must be > 1"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.maxQueuedRequests = maxQueuedRequests; - } - - public int getConnectionTimeoutInMillis() { - return connectionTimeoutInMillis; - } - - /** - * Sets the connection to host timeout, in milliseconds. - *

- * Defaults to 2000. - * - * @param connectionTimeoutInMillis Connection to host timeout, in milliseconds. - */ - public void setConnectionTimeoutInMillis(final int connectionTimeoutInMillis) { - if (connectionTimeoutInMillis < 0) { - throw new IllegalArgumentException("ConnectionTimeoutInMillis must be >= 0 (0 means infinite)"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.connectionTimeoutInMillis = connectionTimeoutInMillis; - } - - public int getRequestTimeoutInMillis() { - return requestTimeoutInMillis; - } - - /** - * Sets the default request timeout, in milliseconds. - *

- * When {@link #execute(String, int, HttpRequest, HttpResponseProcessor)} is called (i.e. the variant without - * explicit request timeout) then this value is applied as the request timeout. - *

- * Requests whose execution time exceeds (precision depends on the {@link TimeoutManager} chosen) this value will be - * considered failed and their {@link HttpRequestFuture} will be released with cause - * {@link HttpRequestFuture#TIMED_OUT}. - *

- * Defaults to 2000. - * - * @param requestTimeoutInMillis Default request timeout, in milliseconds. - */ - public void setRequestTimeoutInMillis(final int requestTimeoutInMillis) { - if (requestTimeoutInMillis <= 0) { - throw new IllegalArgumentException("RequestTimeoutInMillis must be >= 0 (0 means infinite)"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.requestTimeoutInMillis = requestTimeoutInMillis; - } - - public boolean isUseNio() { - return useNio; - } - - /** - * Whether this client should use non-blocking IO (New I/O or NIO) or blocking IO (Plain Socket Old IO or OIO). - *

- * NIO is generally better for higher throughput (scenarios with an elevated number of open connections) while OIO - * is always better for latency (and scenarios where a low number of connections is open). - *

- * If the number of connections open is not supposed to exceed 10~20, then use OIO as it typically presents better - * results. - *

- * Since the writes in OIO are blocking, the HTTP connections will delegate the call to - * {@link org.jboss.netty.channel.Channel#write(Object)} to an executor (provided by this {@link HttpClient}). - *

- * Defaults to {@code true}. - * - * @param useNio {@code true} if this client should use NIO, {@code false} if it should use OIO. - */ - public void setUseNio(final boolean useNio) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.useNio = useNio; - } - - public int getMaxIoWorkerThreads() { - return maxIoWorkerThreads; - } - - /** - * Maximum number of worker threads for the executor provided to Netty's {@link ChannelFactory}. - *

- * Defaults to 50. - * - * @param maxIoWorkerThreads Maximum number of IO worker threads. - */ - public void setMaxIoWorkerThreads(final int maxIoWorkerThreads) { - if (maxIoWorkerThreads <= 1) { - throw new IllegalArgumentException("Minimum value for maxIoWorkerThreads is 1"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.maxIoWorkerThreads = maxIoWorkerThreads; - } - - public int getMaxEventProcessorHelperThreads() { - return maxEventProcessorHelperThreads; - } - - /** - * Maximum number of helper threads for the event processor. - *

- * There are tasks performed by the internal event processor that are blocking and/or slow and need not be executed - * in serial mode. Therefore the event processor delegates them to helper threads in order to keep doing what it's - * supposed to do: consume events from the event queue. - *

- * Defaults to 20. - * - * @param maxEventProcessorHelperThreads Maximum number of IO worker threads. - */ - public void setMaxEventProcessorHelperThreads(final int maxEventProcessorHelperThreads) { - if (maxEventProcessorHelperThreads <= 3) { - throw new IllegalArgumentException("Minimum value for maxEventProcessorHelperThreads is 3"); - } - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.maxEventProcessorHelperThreads = maxEventProcessorHelperThreads; - } - - public HostContextFactory getHostContextFactory() { - return hostContextFactory; - } - - /** - * The {@link HostContextFactory} that will be used to create new {@link HostContext} instances. - *

- * Defaults to {@link DefaultHostContextFactory} if none is provided. - * - * @param hostContextFactory The {@link HostContextFactory} to be used. - * - * @see com.biasedbit.hotpotato.client.host.factory.HostContextFactory - * @see com.biasedbit.hotpotato.client.host.HostContext - */ - public void setHostContextFactory(final HostContextFactory hostContextFactory) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.hostContextFactory = hostContextFactory; - } - - public HttpConnectionFactory getConnectionFactory() { - return connectionFactory; - } - - /** - * The {@link HttpConnectionFactory} that will be used to create new {@link HttpConnection}. - *

- * Defaults to {@link DefaultHttpConnectionFactory} if none is provided. - * - * @param connectionFactory The {@link HttpConnectionFactory} to be used. - * - * @see com.biasedbit.hotpotato.client.connection.factory.HttpConnectionFactory - * @see com.biasedbit.hotpotato.client.connection.HttpConnection - */ - public void setConnectionFactory(final HttpConnectionFactory connectionFactory) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.connectionFactory = connectionFactory; - } - - public HttpRequestFutureFactory getFutureFactory() { - return futureFactory; - } - - /** - * The {@link HttpRequestFutureFactory} that will be used to create new {@link HttpRequestFuture}. - *

- * Defaults to {@link DefaultHttpRequestFutureFactory} if none is provided. - * - * @param futureFactory The {@link HttpRequestFutureFactory} to be used. - * - * @see com.biasedbit.hotpotato.request.factory.HttpRequestFutureFactory - * @see com.biasedbit.hotpotato.request.HttpRequestFuture - */ - public void setFutureFactory(final HttpRequestFutureFactory futureFactory) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - this.futureFactory = futureFactory; - } - - public TimeoutManager getTimeoutManager() { - return timeoutManager; - } - - /** - * The {@link TimeoutManager} that will be used to check request timeouts. - *

- * If no instance is provided, a new instance is created upon calling {@link #init()}. This instance will be - * automatically terminated when {@link #terminate()} is called. - *

- * If an external {@link TimeoutManager} is provided, then it must be pre-initialised (i.e. its - * {@link TimeoutManager#init()} must be called and return {@code true}) and it must be post-terminated (i.e. its - * {@link TimeoutManager#terminate()} must be called after this instance of {@link HttpClient} is disposed). - *

- * Defaults to a new instance of {@link HashedWheelTimeoutManager}. - * - * @param timeoutManager The {@link TimeoutManager} instance to use. - * - * @see com.biasedbit.hotpotato.client.timeout.TimeoutManager - */ - public void setTimeoutManager(final TimeoutManager timeoutManager) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - - this.timeoutManager = timeoutManager; - } - - public boolean isCleanupInactiveHostContexts() { - return cleanupInactiveHostContexts; - } - - /** - * Whether empty {@link HostContext}s should be immediately cleaned up. - *

- * When a {@linkplain HostContext host context} has no more queued requests nor active connections nor connections - * opening, it is eligible for cleanup. Setting this flag to {@code true} will cause them to be instantly reaped - * when such conditions are met. - *

- * Unless your client will be performing requests to many different host/port combinations, you should set this flag - * to {@code false}. While the overhead of creating/cleaning these contexts is minimal, it can be avoided in these - * scenarios. - *

- * Defaults to {@code true}. - * - * @param cleanupInactiveHostContexts {@code true} if inactive host contexts should be cleaned up, {@code false} - * otherwise. - * - * @see com.biasedbit.hotpotato.client.host.HostContext - */ - public void setCleanupInactiveHostContexts(final boolean cleanupInactiveHostContexts) { - if (this.eventQueue != null) { - throw new IllegalStateException("Cannot modify property after initialization"); - } - - this.cleanupInactiveHostContexts = cleanupInactiveHostContexts; - } - - public Map getContextMap() { - // Purely for unit testing purposes... - return Collections.unmodifiableMap(this.contextMap); - } - - // low level overrides -------------------------------------------------------------------------------------------- - - - - @Override - public String toString() { - return this.getClass().getSimpleName() + '@' + Integer.toHexString(this.hashCode()); - } -} +/* + * Copyright 2010 Bruno de Carvalho + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.biasedbit.hotpotato.client; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLContext; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory; +import org.jboss.netty.handler.codec.http.HttpChunkAggregator; +import org.jboss.netty.handler.codec.http.HttpClientCodec; +import org.jboss.netty.handler.codec.http.HttpContentCompressor; +import org.jboss.netty.handler.codec.http.HttpContentDecompressor; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.util.internal.ExecutorUtil; + +import com.biasedbit.hotpotato.client.connection.HttpConnection; +import com.biasedbit.hotpotato.client.connection.HttpConnectionListener; +import com.biasedbit.hotpotato.client.connection.factory.DefaultHttpConnectionFactory; +import com.biasedbit.hotpotato.client.connection.factory.HttpConnectionFactory; +import com.biasedbit.hotpotato.client.event.ConnectionClosedEvent; +import com.biasedbit.hotpotato.client.event.ConnectionFailedEvent; +import com.biasedbit.hotpotato.client.event.ConnectionOpenEvent; +import com.biasedbit.hotpotato.client.event.EventType; +import com.biasedbit.hotpotato.client.event.ExecuteRequestEvent; +import com.biasedbit.hotpotato.client.event.HttpClientEvent; +import com.biasedbit.hotpotato.client.event.RequestCompleteEvent; +import com.biasedbit.hotpotato.client.host.HostContext; +import com.biasedbit.hotpotato.client.host.factory.DefaultHostContextFactory; +import com.biasedbit.hotpotato.client.host.factory.HostContextFactory; +import com.biasedbit.hotpotato.client.timeout.HashedWheelTimeoutManager; +import com.biasedbit.hotpotato.client.timeout.TimeoutManager; +import com.biasedbit.hotpotato.logging.Logger; +import com.biasedbit.hotpotato.request.HttpRequestFuture; +import com.biasedbit.hotpotato.request.factory.DefaultHttpRequestFutureFactory; +import com.biasedbit.hotpotato.request.factory.HttpRequestFutureFactory; +import com.biasedbit.hotpotato.response.DiscardProcessor; +import com.biasedbit.hotpotato.response.HttpResponseProcessor; +import com.biasedbit.hotpotato.util.CleanupChannelGroup; +import com.biasedbit.hotpotato.util.NamedThreadFactory; + +/** + * Abstract implementation of the {@link HttpClient} interface. Contains most of the boilerplate code that other + * {@link HttpClient} implementations would also need. + *

+ * This abstract implementation is in itself complete. If you extend this class, you needn't implement a single method + * (just like {@link DefaultHttpClient} does). However you may want to override the specific behavior of one or other + * method, rather than reimplement the whole class all over again (just like {@link StatsGatheringHttpClient} does - + * only overrides the {@code eventHandlingLoop()} method to gather execution statistics). + *

+ *

Thread safety and performance

This default implementation is thread-safe and, unlike Apache HttpClient, the performance does not + * degrade when the instance is shared by multiple threads accessing it at the same time. + *

+ *

Event queue (producer/consumer)

When this implementation is initialised, it fires up an auxilliary thread, + * the consumer. + *

+ * Every time one of the variants of the method {@code execute()} is called, a new {@link HttpClientEvent} is generated + * and introduced in a blocking event queue (on the caller's thread execution time). The consumer then grabs that + * request and acts accordingly - it can either queue the request so that it is later executed in an available + * connection, request a new connection in case no connections are available, directly execute this request, etc. + *

+ *

Order

By using an event queue, absolute order is guaranteed. If thread A calls {@code execute()} with a + * request to host H prior to thread B (which also places a request for the same host), then the request provided by + * thread A is guaranteed to be placed (i.e. written to the network) before the request placed by + * thread B. + *

+ * This doesn't mean that request A will hit the server before request B or that the response for request A will arrive + * before B. The reasons are obvious: + * + *

    + *
  • A can end up in a connection slower than B's
  • + *
  • Server can respond faster on one socket than on the other
  • + *
  • Response for request B can have 10b and for request A 10bKb
  • + *
  • etc
  • + *
+ * If you need to guarantee that a request B can only hit the server after a request A, you can either manually manage + * that in your code through the {@link HttpRequestFuture} API or configure the concrete instance of this class to allow + * at most 1 connection per host - although this last option will hurt performance globally. + * + *
+ *
Note:
+ * Calling {@linkplain #execute(String, int, HttpRequest, HttpResponseProcessor) one of the variants of {@code execute}} + * with the client configured with {@linkplain #setAutoInflate(boolean) auto-inflation} turned on will cause a + * 'ACCEPT_ENCODING' header to be added with value 'GZIP'. + *
+ * + * @author Bruno de Carvalho + */ +public abstract class AbstractHttpClient implements HttpClient, HttpConnectionListener { + + // constants ------------------------------------------------------------------------------------------------------ + + protected static final Logger LOG = Logger.getLogger(AbstractHttpClient.class); + protected static final HttpClientEvent POISON = new HttpClientEvent() { + + public EventType getEventType() { + return null; + } + + + @Override + public String toString() { + return "POISON"; + } + }; + + // configuration defaults ----------------------------------------------------------------------------------------- + + protected static final boolean USE_SSL = false; + protected static final int REQUEST_COMPRESSION_LEVEL = 0; + protected static final boolean AUTO_INFLATE = false; + protected static final int REQUEST_CHUNK_SIZE = 8192; + protected static final boolean AGGREGATE_RESPONSE_CHUNKS = false; + protected static final int CONNECTION_TIMEOUT_IN_MILLIS = 2000; + protected static final int REQUEST_TIMEOUT_IN_MILLIS = 2000; + protected static final int MAX_CONNECTIONS_PER_HOST = 3; + protected static final int MAX_QUEUED_REQUESTS = Short.MAX_VALUE; + protected static final boolean USE_NIO = false; + protected static final int MAX_IO_WORKER_THREADS = 50; + protected static final int MAX_EVENT_PROCESSOR_HELPER_THREADS = 20; + protected static final boolean CLEANUP_INACTIVE_HOST_CONTEXTS = true; + + // configuration -------------------------------------------------------------------------------------------------- + + protected boolean useSsl; + protected int requestCompressionLevel; + protected boolean autoInflate; + protected int requestChunkSize; + protected boolean aggregateResponseChunks; + protected int maxConnectionsPerHost; + protected int maxQueuedRequests; + protected int connectionTimeoutInMillis; + protected int requestTimeoutInMillis; + protected boolean useNio; + protected int maxIoWorkerThreads; + protected int maxEventProcessorHelperThreads; + protected HttpConnectionFactory connectionFactory; + protected HostContextFactory hostContextFactory; + protected HttpRequestFutureFactory futureFactory; + protected TimeoutManager timeoutManager; + protected boolean cleanupInactiveHostContexts; + + // internal vars -------------------------------------------------------------------------------------------------- + + protected Executor executor; + protected ChannelFactory channelFactory; + protected ChannelPipelineFactory pipelineFactory; + protected ChannelGroup channelGroup; + protected BlockingQueue eventQueue; + protected final Map contextMap; + protected final AtomicInteger queuedRequests; + protected int connectionCounter; + protected CountDownLatch eventConsumerLatch; + protected volatile boolean terminate; + protected boolean internalTimeoutManager; + protected SSLContext sslContext = null; + + // constructors --------------------------------------------------------------------------------------------------- + + public AbstractHttpClient() { + this.useSsl = USE_SSL; + this.requestCompressionLevel = REQUEST_COMPRESSION_LEVEL; + this.autoInflate = AUTO_INFLATE; + this.requestChunkSize = REQUEST_CHUNK_SIZE; + this.aggregateResponseChunks = AGGREGATE_RESPONSE_CHUNKS; + this.connectionTimeoutInMillis = CONNECTION_TIMEOUT_IN_MILLIS; + this.requestTimeoutInMillis = REQUEST_TIMEOUT_IN_MILLIS; + this.maxConnectionsPerHost = MAX_CONNECTIONS_PER_HOST; + this.maxQueuedRequests = MAX_QUEUED_REQUESTS; + this.useNio = USE_NIO; + this.maxIoWorkerThreads = MAX_IO_WORKER_THREADS; + this.maxEventProcessorHelperThreads = MAX_EVENT_PROCESSOR_HELPER_THREADS; + this.cleanupInactiveHostContexts = CLEANUP_INACTIVE_HOST_CONTEXTS; + + this.queuedRequests = new AtomicInteger(0); + + // No need for synchronized structures here, as they'll be accessed by a single thread + this.contextMap = new HashMap(); + } + + // HttpClient ----------------------------------------------------------------------------------------------------- + + + public boolean init() { + if (this.timeoutManager == null) { + // Consumes less resources, puts less emphasis on precision. + this.timeoutManager = new HashedWheelTimeoutManager(); + //this.timeoutManager = new BasicTimeoutManager(10); + this.timeoutManager.init(); + this.internalTimeoutManager = true; + } + + if (this.hostContextFactory == null) { + this.hostContextFactory = new DefaultHostContextFactory(); + } + if (this.connectionFactory == null) { + this.connectionFactory = new DefaultHttpConnectionFactory(); + } + if (this.futureFactory == null) { + this.futureFactory = new DefaultHttpRequestFutureFactory(); + } + + this.eventConsumerLatch = new CountDownLatch(1); + this.eventQueue = new LinkedBlockingQueue(); + + // TODO instead of fixed size thread pool, use a cached thread pool with size limit (limited growth cached pool) + this.executor = Executors.newFixedThreadPool(this.maxEventProcessorHelperThreads, + new NamedThreadFactory("hpttHandyman")); + Executor workerPool = Executors.newFixedThreadPool(this.maxIoWorkerThreads); + + if (this.useNio) { + // It's only going to create 1 thread, so no harm done here. + Executor bossPool = Executors.newCachedThreadPool(); + this.channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool); + } else { + this.channelFactory = new OioClientSocketChannelFactory(workerPool); + } + + this.channelGroup = new CleanupChannelGroup(this.toString()); + // Create a pipeline without the last handler (it will be added right before connecting). + this.pipelineFactory = new ChannelPipelineFactory() { + + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + if (useSsl) { + if (null == sslContext) { + throw new IllegalStateException( + "Cannot establish ssl connection because no SSLContext was provided" + ); + } + SSLEngine engine = sslContext.createSSLEngine(); + engine.setUseClientMode(true); + pipeline.addLast("ssl", new SslHandler(engine)); + } + if (requestCompressionLevel > 0) { + pipeline.addLast("deflater", new HttpContentCompressor(requestCompressionLevel)); + } + pipeline.addLast("codec", new HttpClientCodec(4096, 8192, requestChunkSize)); + if (autoInflate) { + pipeline.addLast("inflater", new HttpContentDecompressor()); + } + if (aggregateResponseChunks) { + pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); + } + return pipeline; + } + }; + + this.executor.execute(new Runnable() { + + public void run() { + eventHandlingLoop(); + } + }); + return true; + } + + + public void terminate() { + if (this.terminate || (this.eventQueue == null)) { + return; + } + + // Stop accepting requests. + this.terminate = true; + // Copy any pending operations in order to signal execution request failures. + Collection pendingEvents = new ArrayList(this.eventQueue); + // Clear the queue and kill the consumer thread by "poisoning" the event queue. + this.eventQueue.clear(); + this.eventQueue.add(POISON); + try { + this.eventConsumerLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Fail all requests that were still in the event queue. + for (HttpClientEvent event : pendingEvents) { + switch (event.getEventType()) { + case EXECUTE_REQUEST: + ((ExecuteRequestEvent) event).getContext().getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); + case CONNECTION_CLOSED: + ConnectionClosedEvent closedEvent = (ConnectionClosedEvent) event; + if ((closedEvent.getRetryRequests() != null) && !closedEvent.getRetryRequests().isEmpty()) { + for (HttpRequestContext context : closedEvent.getRetryRequests()) { + context.getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); + } + } + } + } + + // Kill all connections (will cause failure on requests executing in those connections) and fail context-queued + // requests. + for (HostContext hostContext : this.contextMap.values()) { + for (HttpRequestContext context : hostContext.getQueue()) { + context.getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); + } + for (HttpConnection connection : hostContext.getConnectionPool().getConnections()) { + connection.terminate(); + } + } + this.contextMap.clear(); + + try { + this.channelGroup.close().await(1000); + } catch (InterruptedException e) { + Thread.interrupted(); + } + + this.channelFactory.releaseExternalResources(); + if (this.executor != null) { + ExecutorUtil.terminate(this.executor); + } + + if (this.internalTimeoutManager) { + this.timeoutManager.terminate(); + } + } + + + public HttpRequestFuture execute(final String host, final int port, final HttpRequest request, + final HttpResponseProcessor processor) + throws CannotExecuteRequestException { + return this.execute(host, port, this.requestTimeoutInMillis, request, processor); + } + + + public HttpRequestFuture execute(final String host, final int port, final HttpRequest request) throws CannotExecuteRequestException { + return this.execute(host, port, request, DiscardProcessor.getInstance()); + } + + + public HttpRequestFuture execute(final String host, final int port, final int timeout, final HttpRequest request, + final HttpResponseProcessor processor) + throws CannotExecuteRequestException { + if (this.eventQueue == null) { + throw new CannotExecuteRequestException(this.getClass().getSimpleName() + " was not initialised"); + } + + if (this.queuedRequests.incrementAndGet() > this.maxQueuedRequests) { + this.queuedRequests.decrementAndGet(); + throw new CannotExecuteRequestException("Request queue is full"); + } + + // Perform these checks on the caller thread's time rather than the event dispatcher's. + if (this.autoInflate) { + request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); + } + + HttpRequestFuture future = this.futureFactory.getFuture(true); + HttpRequestContext context = new HttpRequestContext(host, port, timeout, request, processor, future); + if (this.terminate || !this.eventQueue.offer(new ExecuteRequestEvent(context))) { + throw new CannotExecuteRequestException("Failed to add request to queue"); + } + + return future; + } + + + public boolean isHttps() { + return this.useSsl; + } + + // HttpConnectionListener ----------------------------------------------------------------------------------------- + + + public void connectionOpened(final HttpConnection connection) { + if (this.terminate) { + return; + } + this.eventQueue.offer(new ConnectionOpenEvent(connection)); + } + + + public void connectionTerminated(final HttpConnection connection, final Collection retryRequests) { + if (this.terminate) { + if ((retryRequests != null) && !retryRequests.isEmpty()) { + for (HttpRequestContext request : retryRequests) { + request.getFuture().setFailure(HttpRequestFuture.SHUTTING_DOWN); + } + } + return; + } + this.eventQueue.offer(new ConnectionClosedEvent(connection, retryRequests)); + } + + + public void connectionTerminated(final HttpConnection connection) { + if (this.terminate) { + return; + } + this.eventQueue.offer(new ConnectionClosedEvent(connection, null)); + } + + + public void connectionFailed(final HttpConnection connection) { + if (this.terminate) { + return; + } + this.eventQueue.offer(new ConnectionFailedEvent(connection)); + } + + + public void requestFinished(final HttpConnection connection, final HttpRequestContext context) { + if (this.terminate) { + return; + } + this.eventQueue.offer(new RequestCompleteEvent(context)); + } + + // protected helpers ---------------------------------------------------------------------------------------------- + + protected void eventHandlingLoop() { + for (;;) { + // Manual synchronization here because before removing an element, we first need to check whether an + // active available connection exists to satisfy the request. + try { + HttpClientEvent event = this.eventQueue.take(); + if (event == POISON) { + this.eventConsumerLatch.countDown(); + return; + } + + switch (event.getEventType()) { + case EXECUTE_REQUEST: + this.handleExecuteRequest((ExecuteRequestEvent) event); + break; + case REQUEST_COMPLETE: + this.handleRequestComplete((RequestCompleteEvent) event); + break; + case CONNECTION_OPEN: + this.handleConnectionOpen((ConnectionOpenEvent) event); + break; + case CONNECTION_CLOSED: + this.handleConnectionClosed((ConnectionClosedEvent) event); + break; + case CONNECTION_FAILED: + this.handleConnectionFailed((ConnectionFailedEvent) event); + break; + default: + // Consume and do nothing, unknown event. + } + } catch (InterruptedException e) { + // ignore, poisoning the queue is the only way to stop + } + } + } + + // private helpers -------------------------------------------------------------------------------------------- + + protected void handleExecuteRequest(final ExecuteRequestEvent event) { + // First, add it to the queue (or create a queue for given host if one does not exist) + String id = this.hostId(event.getContext()); + HostContext context = this.contextMap.get(id); + if (context == null) { + context = this.hostContextFactory + .createHostContext(event.getContext().getHost(), event.getContext().getPort(), + this.maxConnectionsPerHost); + this.contextMap.put(id, context); + } + + context.addToQueue(event.getContext()); + this.drainQueueAndProcessResult(context); + } + + protected void handleRequestComplete(final RequestCompleteEvent event) { + this.queuedRequests.decrementAndGet(); + + HostContext context = this.contextMap.get(this.hostId(event.getContext())); + if (context == null) { + // Can only happen if context is cleaned meanwhile... ignore and bail out. + return; + } + + this.drainQueueAndProcessResult(context); + } + + protected void handleConnectionOpen(final ConnectionOpenEvent event) { + String id = this.hostId(event.getConnection()); + HostContext context = this.contextMap.get(id); + if (context == null) { + throw new IllegalStateException("Context for id '" + id + + "' does not exist (it may have been incorrectly cleaned up)"); + } + + context.getConnectionPool().connectionOpen(event.getConnection()); + // Rather than go through the whole process of drainQueue(), simplyp poll a single element from the head of + // the queue into this connection (a newly opened connection is ALWAYS available). + HttpRequestContext nextRequest = context.pollQueue(); + if (nextRequest != null) { + event.getConnection().execute(nextRequest); + } + } + + protected void handleConnectionClosed(final ConnectionClosedEvent event) { + // Update the list of available connections for the same host:port. + String id = this.hostId(event.getConnection()); + HostContext context = this.contextMap.get(id); + if (context == null) { + throw new IllegalStateException("Context for id '" + id + + "' does not exist (it may have been incorrectly cleaned up)"); + } + + context.getConnectionPool().connectionClosed(event.getConnection()); + + if (event.getRetryRequests() != null) { + context.restoreRequestsToQueue(event.getRetryRequests()); + } + + if ((context.getConnectionPool().getTotalConnections() == 0) && context.getQueue().isEmpty() && + this.cleanupInactiveHostContexts) { + // No requests in queue, no connections open or opening... Cleanup resources. + this.contextMap.remove(id); + } + + this.drainQueueAndProcessResult(context); + } + + protected void handleConnectionFailed(final ConnectionFailedEvent event) { + // Update the list of available connections for the same host:port. + String id = this.hostId(event.getConnection()); + HostContext context = this.contextMap.get(id); + if (context == null) { + throw new IllegalStateException("Context for id '" + id + + "' does not exist (it may have been incorrectly cleaned up)"); + } + + context.getConnectionPool().connectionFailed(); + if ((context.getConnectionPool().hasConnectionFailures() && + (context.getConnectionPool().getTotalConnections() == 0))) { + // Connection failures occured and there are no more connections active or establishing, so its time to + // fail all queued requests. + context.failAllRequests(HttpRequestFuture.CANNOT_CONNECT); + } + } + + protected void drainQueueAndProcessResult(final HostContext context) { + HostContext.DrainQueueResult result = context.drainQueue(); + switch (result) { + case OPEN_CONNECTION: + this.openConnection(context); + break; + case QUEUE_EMPTY: + case NOT_DRAINED: + case DRAINED: + default: + } + } + + protected String hostId(final HttpConnection connection) { + return this.hostId(connection.getHost(), connection.getPort()); + } + + protected String hostId(final HttpRequestContext context) { + return this.hostId(context.getHost(), context.getPort()); + } + + protected String hostId(final HostContext context) { + return this.hostId(context.getHost(), context.getPort()); + } + + protected String hostId(final String host, final int port) { + return new StringBuilder().append(host).append(":").append(port).toString(); + } + + protected void openConnection(final HostContext context) { + // No need to recheck whether a connection can be opened or not, that was done already inside the HttpContext. + + // Try to create a pipeline before signalling a new connection is being open. + // This should never throw exceptions but who knows... + final ChannelPipeline pipeline; + try { + pipeline = this.pipelineFactory.getPipeline(); + } catch (Exception e) { + LOG.error("Failed to create pipeline.", e); + // bail out before marking a connection as opening. + return; + } + + // Signal that a new connection is opening. + context.getConnectionPool().connectionOpening(); + + // server:port-X + String id = new StringBuilder().append(this.hostId(context)).append("-") + .append(this.connectionCounter++).toString(); + + // If not using NIO, then delegate the blocking write() call to the executor. + Executor writeDelegator = this.useNio ? null : this.executor; + + final HttpConnection connection = this.connectionFactory + .createConnection(id, context.getHost(), context.getPort(), this, this.timeoutManager, writeDelegator); + + pipeline.addLast("handler", connection); + + // Delegate actual connection to other thread, since calling connect is a blocking call. + this.executor.execute(new Runnable() { + + public void run() { + ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); + bootstrap.setOption("reuseAddress", true); + bootstrap.setOption("connectTimeoutMillis", connectionTimeoutInMillis); + bootstrap.setPipeline(pipeline); + bootstrap.connect(new InetSocketAddress(context.getHost(), context.getPort())).addListener(new ChannelFutureListener() { + public void operationComplete(final ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // Don't even bother checking if client was already instructed to terminate since + // CleanupChannelGroup takes care of that. + channelGroup.add(future.getChannel()); + } + } + }); + } + }); + } + + // getters & setters ---------------------------------------------------------------------------------------------- + + public boolean isUseSsl() { + return useSsl; + } + + /** + * Whether this client should create SSL or non-SSL connections. + *

+ * All connections are affected by this flag. + *

+ * Defaults to {@code false}. + * + * @param useSsl {@code true} if all connections will have SSL support, {@code false} otherwise. + */ + public void setUseSsl(final boolean useSsl) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.useSsl = useSsl; + } + + public SSLContext getSSLContext() + { + return sslContext; + } + + public void setSSLContext(SSLContext sslContext) + { + this.sslContext = sslContext; + } + + public int getRequestCompressionLevel() { + return requestCompressionLevel; + } + + /** + * Level of compression when sending requests. + *

+ * Defaults to 0. + * + * @param requestCompressionLevel Level of compression between 0 and 9; 0 = off and 9 = max. + */ + public void setRequestCompressionLevel(final int requestCompressionLevel) { + if ((requestCompressionLevel < 0) || (requestCompressionLevel > 9)) { + throw new IllegalArgumentException("RequestCompressionLevel must be in range [0;9] (0 = none, 9 = max)"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.requestCompressionLevel = requestCompressionLevel; + } + + public boolean isAutoInflate() { + return autoInflate; + } + + /** + * Whether responses should be auto inflated (decompressed) or not. + *

+ * Setting this flag to true will cause a 'Accept-Encoding' header with value 'gzip' to be added to the requests + * submitted. + *

+ * Defaults to {@code true}. + * + * @param autoInflate {@code true} if the connections should automatically decompress gzip content, {@code false} + * otherwise. + */ + public void setAutoInflate(final boolean autoInflate) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.autoInflate = autoInflate; + } + + public int getRequestChunkSize() { + return requestChunkSize; + } + + /** + * Maximum size for HTTP request chunks. + * If the contents of the requests exceed this value, the request will be chunked and a 'Transfer-Encoding' header + * will be added with value 'chunked'. + *

+ * Defaults to 8192. + * + * @param requestChunkSize If request or response body exceeds this value + */ + public void setRequestChunkSize(final int requestChunkSize) { + if (requestChunkSize < 128) { + throw new IllegalArgumentException("Minimum accepted chunk size is 128b"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.requestChunkSize = requestChunkSize; + } + + public boolean isAggregateResponseChunks() { + return aggregateResponseChunks; + } + + /** + * If the response is transferred in chunks, whether they should be automatically grouped or not. + *

+ * Defaults to {@code true}. + * + * @param aggregateResponseChunks {@code true} to aggregate http response chunks automatically, {@code false} + * otherwise. + */ + public void setAggregateResponseChunks(final boolean aggregateResponseChunks) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.aggregateResponseChunks = aggregateResponseChunks; + } + + public int getMaxConnectionsPerHost() { + return maxConnectionsPerHost; + } + + /** + * Sets the maximum number of active connections per host. + *

+ * This number also limits the number of connections being established so that + * {@code connectionsOpen + connectionsOpening <= maxConnectionsPerHost} is always true. + *

+ * Defaults to 3. + * + * @param maxConnectionsPerHost Maximum number of total active connections (open + opening) per host at a given + * time. Minimum value is 1. + */ + public void setMaxConnectionsPerHost(final int maxConnectionsPerHost) { + if (maxConnectionsPerHost < 1) { + throw new IllegalArgumentException("MaxConnectionsPerHost must be > 1"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.maxConnectionsPerHost = maxConnectionsPerHost; + } + + public int getMaxQueuedRequests() { + return this.maxQueuedRequests; + } + + /** + * Sets the maximum number of queued requests for this client. + *

+ * If the number of queued requests is exceeded, calling + * {@linkplain #execute(String, int, HttpRequest, HttpResponseProcessor) one of the variants of {@code execute()}} + * will throw a {@link CannotExecuteRequestException}. + *

+ * Defaults to {@link Short#MAX_VALUE}. + * + * @param maxQueuedRequests Maximum number of queued requests at any given moment. + */ + public void setMaxQueuedRequests(final int maxQueuedRequests) { + if (maxQueuedRequests < 1) { + throw new IllegalArgumentException("MaxQueuedRequests must be > 1"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.maxQueuedRequests = maxQueuedRequests; + } + + public int getConnectionTimeoutInMillis() { + return connectionTimeoutInMillis; + } + + /** + * Sets the connection to host timeout, in milliseconds. + *

+ * Defaults to 2000. + * + * @param connectionTimeoutInMillis Connection to host timeout, in milliseconds. + */ + public void setConnectionTimeoutInMillis(final int connectionTimeoutInMillis) { + if (connectionTimeoutInMillis < 0) { + throw new IllegalArgumentException("ConnectionTimeoutInMillis must be >= 0 (0 means infinite)"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.connectionTimeoutInMillis = connectionTimeoutInMillis; + } + + public int getRequestTimeoutInMillis() { + return requestTimeoutInMillis; + } + + /** + * Sets the default request timeout, in milliseconds. + *

+ * When {@link #execute(String, int, HttpRequest, HttpResponseProcessor)} is called (i.e. the variant without + * explicit request timeout) then this value is applied as the request timeout. + *

+ * Requests whose execution time exceeds (precision depends on the {@link TimeoutManager} chosen) this value will be + * considered failed and their {@link HttpRequestFuture} will be released with cause + * {@link HttpRequestFuture#TIMED_OUT}. + *

+ * Defaults to 2000. + * + * @param requestTimeoutInMillis Default request timeout, in milliseconds. + */ + public void setRequestTimeoutInMillis(final int requestTimeoutInMillis) { + if (requestTimeoutInMillis <= 0) { + throw new IllegalArgumentException("RequestTimeoutInMillis must be >= 0 (0 means infinite)"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.requestTimeoutInMillis = requestTimeoutInMillis; + } + + public boolean isUseNio() { + return useNio; + } + + /** + * Whether this client should use non-blocking IO (New I/O or NIO) or blocking IO (Plain Socket Old IO or OIO). + *

+ * NIO is generally better for higher throughput (scenarios with an elevated number of open connections) while OIO + * is always better for latency (and scenarios where a low number of connections is open). + *

+ * If the number of connections open is not supposed to exceed 10~20, then use OIO as it typically presents better + * results. + *

+ * Since the writes in OIO are blocking, the HTTP connections will delegate the call to + * {@link org.jboss.netty.channel.Channel#write(Object)} to an executor (provided by this {@link HttpClient}). + *

+ * Defaults to {@code true}. + * + * @param useNio {@code true} if this client should use NIO, {@code false} if it should use OIO. + */ + public void setUseNio(final boolean useNio) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.useNio = useNio; + } + + public int getMaxIoWorkerThreads() { + return maxIoWorkerThreads; + } + + /** + * Maximum number of worker threads for the executor provided to Netty's {@link ChannelFactory}. + *

+ * Defaults to 50. + * + * @param maxIoWorkerThreads Maximum number of IO worker threads. + */ + public void setMaxIoWorkerThreads(final int maxIoWorkerThreads) { + if (maxIoWorkerThreads <= 1) { + throw new IllegalArgumentException("Minimum value for maxIoWorkerThreads is 1"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.maxIoWorkerThreads = maxIoWorkerThreads; + } + + public int getMaxEventProcessorHelperThreads() { + return maxEventProcessorHelperThreads; + } + + /** + * Maximum number of helper threads for the event processor. + *

+ * There are tasks performed by the internal event processor that are blocking and/or slow and need not be executed + * in serial mode. Therefore the event processor delegates them to helper threads in order to keep doing what it's + * supposed to do: consume events from the event queue. + *

+ * Defaults to 20. + * + * @param maxEventProcessorHelperThreads Maximum number of IO worker threads. + */ + public void setMaxEventProcessorHelperThreads(final int maxEventProcessorHelperThreads) { + if (maxEventProcessorHelperThreads <= 3) { + throw new IllegalArgumentException("Minimum value for maxEventProcessorHelperThreads is 3"); + } + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.maxEventProcessorHelperThreads = maxEventProcessorHelperThreads; + } + + public HostContextFactory getHostContextFactory() { + return hostContextFactory; + } + + /** + * The {@link HostContextFactory} that will be used to create new {@link HostContext} instances. + *

+ * Defaults to {@link DefaultHostContextFactory} if none is provided. + * + * @param hostContextFactory The {@link HostContextFactory} to be used. + * + * @see com.biasedbit.hotpotato.client.host.factory.HostContextFactory + * @see com.biasedbit.hotpotato.client.host.HostContext + */ + public void setHostContextFactory(final HostContextFactory hostContextFactory) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.hostContextFactory = hostContextFactory; + } + + public HttpConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + /** + * The {@link HttpConnectionFactory} that will be used to create new {@link HttpConnection}. + *

+ * Defaults to {@link DefaultHttpConnectionFactory} if none is provided. + * + * @param connectionFactory The {@link HttpConnectionFactory} to be used. + * + * @see com.biasedbit.hotpotato.client.connection.factory.HttpConnectionFactory + * @see com.biasedbit.hotpotato.client.connection.HttpConnection + */ + public void setConnectionFactory(final HttpConnectionFactory connectionFactory) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.connectionFactory = connectionFactory; + } + + public HttpRequestFutureFactory getFutureFactory() { + return futureFactory; + } + + /** + * The {@link HttpRequestFutureFactory} that will be used to create new {@link HttpRequestFuture}. + *

+ * Defaults to {@link DefaultHttpRequestFutureFactory} if none is provided. + * + * @param futureFactory The {@link HttpRequestFutureFactory} to be used. + * + * @see com.biasedbit.hotpotato.request.factory.HttpRequestFutureFactory + * @see com.biasedbit.hotpotato.request.HttpRequestFuture + */ + public void setFutureFactory(final HttpRequestFutureFactory futureFactory) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + this.futureFactory = futureFactory; + } + + public TimeoutManager getTimeoutManager() { + return timeoutManager; + } + + /** + * The {@link TimeoutManager} that will be used to check request timeouts. + *

+ * If no instance is provided, a new instance is created upon calling {@link #init()}. This instance will be + * automatically terminated when {@link #terminate()} is called. + *

+ * If an external {@link TimeoutManager} is provided, then it must be pre-initialised (i.e. its + * {@link TimeoutManager#init()} must be called and return {@code true}) and it must be post-terminated (i.e. its + * {@link TimeoutManager#terminate()} must be called after this instance of {@link HttpClient} is disposed). + *

+ * Defaults to a new instance of {@link HashedWheelTimeoutManager}. + * + * @param timeoutManager The {@link TimeoutManager} instance to use. + * + * @see com.biasedbit.hotpotato.client.timeout.TimeoutManager + */ + public void setTimeoutManager(final TimeoutManager timeoutManager) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + + this.timeoutManager = timeoutManager; + } + + public boolean isCleanupInactiveHostContexts() { + return cleanupInactiveHostContexts; + } + + /** + * Whether empty {@link HostContext}s should be immediately cleaned up. + *

+ * When a {@linkplain HostContext host context} has no more queued requests nor active connections nor connections + * opening, it is eligible for cleanup. Setting this flag to {@code true} will cause them to be instantly reaped + * when such conditions are met. + *

+ * Unless your client will be performing requests to many different host/port combinations, you should set this flag + * to {@code false}. While the overhead of creating/cleaning these contexts is minimal, it can be avoided in these + * scenarios. + *

+ * Defaults to {@code true}. + * + * @param cleanupInactiveHostContexts {@code true} if inactive host contexts should be cleaned up, {@code false} + * otherwise. + * + * @see com.biasedbit.hotpotato.client.host.HostContext + */ + public void setCleanupInactiveHostContexts(final boolean cleanupInactiveHostContexts) { + if (this.eventQueue != null) { + throw new IllegalStateException("Cannot modify property after initialization"); + } + + this.cleanupInactiveHostContexts = cleanupInactiveHostContexts; + } + + public Map getContextMap() { + // Purely for unit testing purposes... + return Collections.unmodifiableMap(this.contextMap); + } + + // low level overrides -------------------------------------------------------------------------------------------- + + + + @Override + public String toString() { + return this.getClass().getSimpleName() + '@' + Integer.toHexString(this.hashCode()); + } +} + diff --git a/src/main/java/com/biasedbit/hotpotato/client/connection/DefaultHttpConnection.java b/src/main/java/com/biasedbit/hotpotato/client/connection/DefaultHttpConnection.java index 578b374..5382a69 100644 --- a/src/main/java/com/biasedbit/hotpotato/client/connection/DefaultHttpConnection.java +++ b/src/main/java/com/biasedbit/hotpotato/client/connection/DefaultHttpConnection.java @@ -33,6 +33,7 @@ import com.biasedbit.hotpotato.client.HttpRequestContext; import com.biasedbit.hotpotato.client.timeout.TimeoutManager; import com.biasedbit.hotpotato.request.HttpRequestFuture; +import com.biasedbit.hotpotato.request.HttpRequestFutureListener; /** * Non-pipelining implementation of {@link HttpConnection} interface. @@ -51,7 +52,7 @@ * * @author Bruno de Carvalho */ -public class DefaultHttpConnection extends SimpleChannelUpstreamHandler implements HttpConnection { +public class DefaultHttpConnection extends SimpleChannelUpstreamHandler implements HttpConnection, HttpRequestFutureListener { // configuration defaults ----------------------------------------------------------------------------------------- @@ -282,6 +283,8 @@ public boolean execute(final HttpRequestContext context) { this.listener.requestFinished(this, context); return true; } + + context.getFuture().addListener(this); synchronized (this.mutex) { // This implementation only allows one execution at a time. If requests are performed during the period in @@ -495,4 +498,11 @@ public String toString() { .append('(').append(this.host).append(':').append(this.port) .append(")}").toString(); } + + public void operationComplete(HttpRequestFuture future) throws Exception + { + if(future.isCancelled()) { + terminate(); + } + } } diff --git a/src/main/java/com/biasedbit/hotpotato/util/DummyHttpServer.java b/src/main/java/com/biasedbit/hotpotato/util/DummyHttpServer.java index ac95f80..d4ffb40 100644 --- a/src/main/java/com/biasedbit/hotpotato/util/DummyHttpServer.java +++ b/src/main/java/com/biasedbit/hotpotato/util/DummyHttpServer.java @@ -154,7 +154,7 @@ public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); if (useSsl) { - SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); + SSLEngine engine = SecureChatSslContextFactory.getInstance().getServerContext().createSSLEngine(); engine.setUseClientMode(false); pipeline.addLast("ssl", new SslHandler(engine)); } @@ -342,3 +342,4 @@ public void run() { }); } } + diff --git a/src/main/java/org/jboss/netty/example/securechat/SecureChatSslContextFactory.java b/src/main/java/org/jboss/netty/example/securechat/SecureChatSslContextFactory.java index 387e2b5..2a1a281 100644 --- a/src/main/java/org/jboss/netty/example/securechat/SecureChatSslContextFactory.java +++ b/src/main/java/org/jboss/netty/example/securechat/SecureChatSslContextFactory.java @@ -35,17 +35,26 @@ public class SecureChatSslContextFactory { private static final String PROTOCOL = "TLS"; - private static final SSLContext SERVER_CONTEXT; - private static final SSLContext CLIENT_CONTEXT; - static { + private final SSLContext serverContext; + private final SSLContext clientContext; + + private static final SecureChatSslContextFactory INSTANCE = new SecureChatSslContextFactory(); + + public static final SecureChatSslContextFactory getInstance() + { + return INSTANCE; + } + + public SecureChatSslContextFactory() + { String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm"); if (algorithm == null) { algorithm = "SunX509"; } - SSLContext serverContext; - SSLContext clientContext; + SSLContext tmpServerContext; + SSLContext tmpClientContext; try { KeyStore ks = KeyStore.getInstance("JKS"); ks.load(SecureChatKeyStore.asInputStream(), SecureChatKeyStore.getKeyStorePassword()); @@ -55,28 +64,29 @@ public class SecureChatSslContextFactory { kmf.init(ks, SecureChatKeyStore.getCertificatePassword()); // Initialize the SSLContext to work with our key managers. - serverContext = SSLContext.getInstance(PROTOCOL); - serverContext.init(kmf.getKeyManagers(), SecureChatTrustManagerFactory.getTrustManagers(), null); + tmpServerContext = SSLContext.getInstance(PROTOCOL); + tmpServerContext.init(kmf.getKeyManagers(), SecureChatTrustManagerFactory.getTrustManagers(), null); } catch (Exception e) { throw new Error("Failed to initialize the server-side SSLContext", e); } try { - clientContext = SSLContext.getInstance(PROTOCOL); - clientContext.init(null, SecureChatTrustManagerFactory.getTrustManagers(), null); + tmpClientContext = SSLContext.getInstance(PROTOCOL); + tmpClientContext.init(null, SecureChatTrustManagerFactory.getTrustManagers(), null); } catch (Exception e) { throw new Error("Failed to initialize the client-side SSLContext", e); } - SERVER_CONTEXT = serverContext; - CLIENT_CONTEXT = clientContext; + serverContext = tmpServerContext; + clientContext = tmpClientContext; } - public static SSLContext getServerContext() { - return SERVER_CONTEXT; + public SSLContext getServerContext() { + return serverContext; } - public static SSLContext getClientContext() { - return CLIENT_CONTEXT; + public SSLContext getClientContext() { + return clientContext; } } + diff --git a/src/main/java/org/jboss/netty/example/securechat/SecureChatTrustManagerFactory.java b/src/main/java/org/jboss/netty/example/securechat/SecureChatTrustManagerFactory.java index 02bd1c7..e7b4ec9 100644 --- a/src/main/java/org/jboss/netty/example/securechat/SecureChatTrustManagerFactory.java +++ b/src/main/java/org/jboss/netty/example/securechat/SecureChatTrustManagerFactory.java @@ -72,3 +72,4 @@ protected void engineInit(ManagerFactoryParameters managerFactoryParameters) // Unused } } + diff --git a/src/test/java/com/biasedbit/hotpotato/client/HttpsTest.java b/src/test/java/com/biasedbit/hotpotato/client/HttpsTest.java new file mode 100644 index 0000000..a11b2b4 --- /dev/null +++ b/src/test/java/com/biasedbit/hotpotato/client/HttpsTest.java @@ -0,0 +1,127 @@ +package com.biasedbit.hotpotato.client; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import java.io.Closeable; +import java.io.FileInputStream; + +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import org.junit.Before; +import org.junit.Test; + +import com.biasedbit.hotpotato.response.BodyAsStringProcessor; +import com.biasedbit.hotpotato.request.HttpRequestFuture; +import com.biasedbit.hotpotato.client.DefaultHttpClient; + +import org.jboss.netty.handler.codec.http.DefaultHttpRequest; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpMethod; + +/** + * + */ +public class HttpsTest { + @Test + public void testHttpsRequestSuccess() throws Exception + { + executeHttpsRequest("encrypted.google.com", "/", loadGoogleSslContext(), false); + } + + @Test + public void testHttpsRequestFailure() throws Exception + { + // This should fail since we don't have a cert for wellsfargo in our test trustStore... + executeHttpsRequest("www.wellsfargo.com", "/", loadGoogleSslContext(), true); + } + + private static SSLContext loadGoogleSslContext() throws Exception + { + String algorithm = "SunX509"; + String password = "ez24get"; + + KeyStore keyStore = KeyStore.getInstance("JKS"); + FileInputStream keyAsStream = null; + + try + { + keyAsStream = new FileInputStream("src/test/resources/certs/encrypted.google.com.jks"); + keyStore.load(keyAsStream, password.toCharArray()); + } + finally + { + closeQuietly(keyAsStream); + } + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(algorithm); + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(algorithm); + + keyManagerFactory.init(keyStore, password.toCharArray()); + trustManagerFactory.init(keyStore); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom()); + + return sslContext; + } + + private static void closeQuietly(Closeable... closeables) + { + if (null != closeables) + { + for (Closeable closeable : closeables) + { + if (null != closeable) + { + try + { + closeable.close(); + } + catch (Exception e) + { + // Shhh... + } + } + } + } + } + + private static void executeHttpsRequest(final String host, final String resource, final SSLContext sslContext, final boolean shouldFail) + throws Exception + { + DefaultHttpClient client = new DefaultHttpClient(); + client.setSSLContext(sslContext); + client.setUseSsl(true); + client.setUseNio(true); + client.init(); + + DefaultHttpRequest request = + new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, resource); + + request.setHeader(HttpHeaders.Names.HOST, host); + + HttpRequestFuture future = + client.execute(host, 443, request, new BodyAsStringProcessor()).await(); + + if (shouldFail) + { + assertThat(future.getResponseStatusCode(), is(not(200))); + } + else + { + assertEquals(200, future.getResponseStatusCode()); + } + + assertTrue(true); + } +} + diff --git a/src/test/resources/certs/encrypted.google.com.jks b/src/test/resources/certs/encrypted.google.com.jks new file mode 100644 index 0000000..2a25a4d Binary files /dev/null and b/src/test/resources/certs/encrypted.google.com.jks differ