Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
import io.vlingo.xoom.wire.channel.ResponseSenderChannel;
import io.vlingo.xoom.wire.message.ConsumerByteBuffer;
import io.vlingo.xoom.wire.message.ConsumerByteBufferPool;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;

@SuppressWarnings("deprecation")
class RSocketChannelContext implements RequestResponseContext<FluxSink<ConsumerByteBuffer>> {
private final RequestChannelConsumer consumer;
private final Logger logger;
private final ConsumerByteBufferPool readBufferPool;
private final UnicastProcessor<Payload> processor;
private final Sinks.Many<Payload> sink;
private Object closingData;
private Object consumerData;

Expand All @@ -34,11 +34,11 @@ class RSocketChannelContext implements RequestResponseContext<FluxSink<ConsumerB
this.readBufferPool = new ConsumerByteBufferPool(
ElasticResourcePool.Config.of(maxBufferPoolSize), maxMessageSize);

processor = UnicastProcessor.create();
sink = Sinks.many().unicast().onBackpressureBuffer();
}

UnicastProcessor<Payload> processor() {
return processor;
Flux<Payload> flux() {
return sink.asFlux();
}

@Override
Expand Down Expand Up @@ -97,11 +97,10 @@ public void consume(Payload request) {
@Override
public void abandon() {
close();
processor.dispose();
}

@Override
public void respondWith(final ConsumerByteBuffer buffer) {
processor.onNext(ByteBufPayload.create(buffer.asByteBuffer()));
sink.emitNext(ByteBufPayload.create(buffer.asByteBuffer()), Sinks.EmitFailureHandler.FAIL_FAST);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import io.vlingo.xoom.wire.message.ConsumerByteBuffer;
import io.vlingo.xoom.wire.message.ConsumerByteBufferPool;
import io.vlingo.xoom.wire.node.Address;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;

@SuppressWarnings("deprecation")
public class RSocketClientChannel implements ClientRequestResponseChannel {
private final EmitterProcessor<Payload> publisher;
private final Sinks.Many<Payload> sink;
private final Logger logger;
private final ChannelResponseHandler responseHandler;
private final Address address;
Expand All @@ -44,7 +43,7 @@ public RSocketClientChannel(final ClientTransport clientTransport, final Address

public RSocketClientChannel(final ClientTransport clientTransport, final Address address, final ResponseChannelConsumer consumer, final int maxBufferPoolSize,
final int maxMessageSize, final Logger logger, final Duration connectionTimeout) {
this.publisher = EmitterProcessor.create();
this.sink = Sinks.many().multicast().onBackpressureBuffer();
this.logger = logger;
this.address = address;
this.connectionTimeout = connectionTimeout;
Expand Down Expand Up @@ -73,7 +72,7 @@ public void requestWith(final ByteBuffer buffer) {
data.put(buffer);
data.flip();

this.publisher.onNext(ByteBufPayload.create(data));
this.sink.emitNext(ByteBufPayload.create(data), Sinks.EmitFailureHandler.FAIL_FAST);
} else {
logger.debug("RSocket client channel for {} not ready. Message dropped", this.address);
}
Expand All @@ -92,23 +91,17 @@ private void prepareChannel() {
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(transport)
.timeout(this.connectionTimeout)
.doOnError(throwable -> {
logger.error("Failed to create RSocket client channel for address {}", this.address, throwable);
})
.doOnError(throwable -> logger.error("Failed to create RSocket client channel for address {}", this.address, throwable))
.block();

if (this.channelSocket != null) {
this.channelSocket.requestChannel(this.publisher)
this.channelSocket.requestChannel(this.sink.asFlux())
.retryWhen(Retry.indefinitely()
.filter(throwable -> throwable instanceof ApplicationErrorException)
.doBeforeRetry(retrySignal -> {
logger.debug("RSocket client channel for address {} received a retry-able error", this.address, retrySignal.failure());
})
.doBeforeRetry(retrySignal -> logger.debug("RSocket client channel for address {} received a retry-able error", this.address, retrySignal.failure()))
)
.subscribe(responseHandler::handle, //process server response
throwable -> {
logger.error("RSocket client channel for address {} received unrecoverable error", this.address, throwable);
});
throwable -> logger.error("RSocket client channel for address {} received unrecoverable error", this.address, throwable));

logger.info("RSocket client channel opened for address {}", this.address);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public RSocketServerChannelActor(final RequestChannelConsumerProvider provider,
.doOnError((throwable) -> logger().error("Unexpected error when consuming channel request", throwable))
.subscribe();

return Flux.from(context.processor());
return context.flux();
}))
.bind(serverTransport)
.block();
Expand Down