From e2c146dc72036d9ce3dbb23756fa41f4c267a50e Mon Sep 17 00:00:00 2001 From: Jakub Zalas Date: Fri, 3 Sep 2021 13:02:15 +0100 Subject: [PATCH] Replace deprecated reactor processors See https://projectreactor.io/docs/core/release/reference/#processors "Since 3.4.0, sinks become the first class citizen and Processor are being phased out entirely" Signed-off-by: Jakub Zalas --- .../rsocket/RSocketChannelContext.java | 15 ++++++------ .../rsocket/RSocketClientChannel.java | 23 +++++++------------ .../rsocket/RSocketServerChannelActor.java | 2 +- 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketChannelContext.java b/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketChannelContext.java index 3fa94be..e9c36ed 100644 --- a/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketChannelContext.java +++ b/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketChannelContext.java @@ -16,15 +16,15 @@ import io.vlingo.xoom.wire.channel.ResponseSenderChannel; import io.vlingo.xoom.wire.message.ConsumerByteBuffer; import io.vlingo.xoom.wire.message.ConsumerByteBufferPool; +import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; -import reactor.core.publisher.UnicastProcessor; +import reactor.core.publisher.Sinks; -@SuppressWarnings("deprecation") class RSocketChannelContext implements RequestResponseContext> { private final RequestChannelConsumer consumer; private final Logger logger; private final ConsumerByteBufferPool readBufferPool; - private final UnicastProcessor processor; + private final Sinks.Many sink; private Object closingData; private Object consumerData; @@ -34,11 +34,11 @@ class RSocketChannelContext implements RequestResponseContext processor() { - return processor; + Flux flux() { + return sink.asFlux(); } @Override @@ -97,11 +97,10 @@ public void consume(Payload request) { @Override public void abandon() { close(); - processor.dispose(); } @Override public void respondWith(final ConsumerByteBuffer buffer) { - processor.onNext(ByteBufPayload.create(buffer.asByteBuffer())); + sink.emitNext(ByteBufPayload.create(buffer.asByteBuffer()), Sinks.EmitFailureHandler.FAIL_FAST); } } diff --git a/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketClientChannel.java b/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketClientChannel.java index 71b5f26..2d146ea 100644 --- a/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketClientChannel.java +++ b/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketClientChannel.java @@ -24,12 +24,11 @@ import io.vlingo.xoom.wire.message.ConsumerByteBuffer; import io.vlingo.xoom.wire.message.ConsumerByteBufferPool; import io.vlingo.xoom.wire.node.Address; -import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Sinks; import reactor.util.retry.Retry; -@SuppressWarnings("deprecation") public class RSocketClientChannel implements ClientRequestResponseChannel { - private final EmitterProcessor publisher; + private final Sinks.Many sink; private final Logger logger; private final ChannelResponseHandler responseHandler; private final Address address; @@ -44,7 +43,7 @@ public RSocketClientChannel(final ClientTransport clientTransport, final Address public RSocketClientChannel(final ClientTransport clientTransport, final Address address, final ResponseChannelConsumer consumer, final int maxBufferPoolSize, final int maxMessageSize, final Logger logger, final Duration connectionTimeout) { - this.publisher = EmitterProcessor.create(); + this.sink = Sinks.many().multicast().onBackpressureBuffer(); this.logger = logger; this.address = address; this.connectionTimeout = connectionTimeout; @@ -73,7 +72,7 @@ public void requestWith(final ByteBuffer buffer) { data.put(buffer); data.flip(); - this.publisher.onNext(ByteBufPayload.create(data)); + this.sink.emitNext(ByteBufPayload.create(data), Sinks.EmitFailureHandler.FAIL_FAST); } else { logger.debug("RSocket client channel for {} not ready. Message dropped", this.address); } @@ -92,23 +91,17 @@ private void prepareChannel() { .payloadDecoder(PayloadDecoder.ZERO_COPY) .connect(transport) .timeout(this.connectionTimeout) - .doOnError(throwable -> { - logger.error("Failed to create RSocket client channel for address {}", this.address, throwable); - }) + .doOnError(throwable -> logger.error("Failed to create RSocket client channel for address {}", this.address, throwable)) .block(); if (this.channelSocket != null) { - this.channelSocket.requestChannel(this.publisher) + this.channelSocket.requestChannel(this.sink.asFlux()) .retryWhen(Retry.indefinitely() .filter(throwable -> throwable instanceof ApplicationErrorException) - .doBeforeRetry(retrySignal -> { - logger.debug("RSocket client channel for address {} received a retry-able error", this.address, retrySignal.failure()); - }) + .doBeforeRetry(retrySignal -> logger.debug("RSocket client channel for address {} received a retry-able error", this.address, retrySignal.failure())) ) .subscribe(responseHandler::handle, //process server response - throwable -> { - logger.error("RSocket client channel for address {} received unrecoverable error", this.address, throwable); - }); + throwable -> logger.error("RSocket client channel for address {} received unrecoverable error", this.address, throwable)); logger.info("RSocket client channel opened for address {}", this.address); diff --git a/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketServerChannelActor.java b/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketServerChannelActor.java index cc74b99..159e7d3 100644 --- a/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketServerChannelActor.java +++ b/src/main/java/io/vlingo/xoom/wire/fdx/bidirectional/rsocket/RSocketServerChannelActor.java @@ -42,7 +42,7 @@ public RSocketServerChannelActor(final RequestChannelConsumerProvider provider, .doOnError((throwable) -> logger().error("Unexpected error when consuming channel request", throwable)) .subscribe(); - return Flux.from(context.processor()); + return context.flux(); })) .bind(serverTransport) .block();