Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
fd11bbd
refactor: update RabbitMQ integration
luisgomez29 Mar 9, 2026
4dd307c
Merge remote-tracking branch 'origin/master' into feature/eclipse-vertex
luisgomez29 Mar 23, 2026
591002d
feat: add Reactor RabbitMQ integration and related tests
luisgomez29 Mar 24, 2026
e69f58c
refactor: replace volatile FluxSink with AtomicReference in ReactiveM…
luisgomez29 Mar 24, 2026
2be7014
refactor: replace volatile fields with AtomicReference in message lis…
luisgomez29 Mar 24, 2026
b50ea6c
refactor: replace MyOutboundMessage with OutboundMessage in message h…
luisgomez29 Mar 24, 2026
cc1a098
refactor: update CloudEventContextWriter to handle numeric values acc…
luisgomez29 Mar 24, 2026
dae4588
refactor: simplify CloudEventDeserializer by consolidating data readi…
luisgomez29 Mar 24, 2026
5ce58fb
docs: update README to include Reactor RabbitMQ module and original l…
luisgomez29 Mar 24, 2026
016eee8
chore: delete starter async-rabbit-standalone
luisgomez29 Mar 25, 2026
a95581d
refactor: update resource specifications to use reactor.rabbitmq package
luisgomez29 Mar 25, 2026
8ff147f
Revert "refactor: replace volatile FluxSink with AtomicReference in R…
luisgomez29 Mar 25, 2026
e177c3d
Revert "refactor: replace volatile fields with AtomicReference in mes…
luisgomez29 Mar 25, 2026
76b8709
refactor: replace AtomicBoolean with volatile boolean for published s…
luisgomez29 Mar 25, 2026
b0a1d58
docs: update README to reflect removal of volatile fields and other e…
luisgomez29 Mar 25, 2026
fccaac5
refactor: update imports to use reactor.rabbitmq specifications
luisgomez29 Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
![](https://github.com/reactive-commons/reactive-commons-java/workflows/reactive-commons-ci-cd/badge.svg)
[![Reactor RabbitMQ](https://maven-badges.herokuapp.com/maven-central/org.reactivecommons/async-commons-rabbit-starter/badge.svg)](https://mvnrepository.com/artifact/org.reactivecommons/async-commons-rabbit-starter)
![CI/CD](https://github.com/reactive-commons/reactive-commons-java/workflows/reactive-commons-ci-cd/badge.svg)
[![Maven Central](https://img.shields.io/maven-central/v/org.reactivecommons/async-commons-rabbit-starter)](https://central.sonatype.com/artifact/org.reactivecommons/async-commons-rabbit-starter)

# reactive-commons-java

The purpose of reactive-commons is to provide a set of abstractions and implementations over different patterns and practices that make the foundation of a reactive microservices architecture.

Docs: [https://bancolombia.github.io/reactive-commons-java/](https://bancolombia.github.io/reactive-commons-java)
Even though the main purpose is to provide such abstractions in a mostly generic way, they would be of little use without a concrete implementation. So we provide implementations in a best-effort manner that aim to be easy to change, personalize, and extend.

The first approach to this work was to release simple abstractions and a corresponding implementation over asynchronous message-driven communication between microservices, built on top of Project Reactor and Spring Boot.

---

## Documentation

**Full documentation is available at:**

> ### 👉 [https://bancolombia.github.io/reactive-commons-java/](https://bancolombia.github.io/reactive-commons-java)

---

## Related

> - **Other projects:** [https://github.com/bancolombia](https://github.com/bancolombia)
> - **Sponsored by:** [Bancolombia Tech](https://medium.com/bancolombia-tech)

---

Other projects: https://github.com/bancolombia
## Third-Party Code Credits

Sponsor by: https://medium.com/bancolombia-tech
This project includes source code internalized from the following open-source libraries:

Even though the main purpose is to provide such abstractions in a mostly generic way such abstractions would be of little use without a concrete implementation so we provide some implementations in a best effors maner that aim to be easy to change, personalize and extend.
### reactor-rabbitmq
- **Repository:** [https://github.com/spring-attic/reactor-rabbitmq](https://github.com/spring-attic/reactor-rabbitmq)
- **License:** [Apache License 2.0](https://github.com/spring-attic/reactor-rabbitmq/blob/main/LICENSE)

The first approach to this work was to release a very simple abstractions and a corresponding implementation over asyncronous message driven communication between microservices build on top of project-reactor and spring boot.
### CloudEvents JSON Jackson
- **Repository:** [https://github.com/cloudevents/sdk-java](https://github.com/cloudevents/sdk-java)
- **License:** [Apache License 2.0](https://github.com/cloudevents/sdk-java/blob/main/LICENSE)
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,110 @@ void serveQueryDelegateWithLambda() {
.hasSize(1);
}

@Test
void handleCommandWithDomain() {
SomeDomainCommandHandler<SomeDataClass> handler = new SomeDomainCommandHandler<>();
registry.handleCommand(domain, name, handler, SomeDataClass.class);
assertThat(registry.getCommandHandlers().get(domain))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass)
.containsExactly(name, SomeDataClass.class)).hasSize(1);
}

@Test
void handleCloudEventCommandWithDomain() {
SomeCloudCommandHandler handler = new SomeCloudCommandHandler();
registry.handleCloudEventCommand(domain, name, handler);
assertThat(registry.getCommandHandlers().get(domain))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass)
.containsExactly(name, CloudEvent.class)).hasSize(1);
}

@Test
void handleRawCommandWithDomain() {
SomeRawCommandEventHandler handler = new SomeRawCommandEventHandler();
registry.handleRawCommand(domain, handler);
assertThat(registry.getCommandHandlers().get(domain))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass)
.containsExactly("", RawMessage.class)).hasSize(1);
}

@Test
void listenDomainRawEvent() {
SomeRawEventHandler handler = new SomeRawEventHandler();
registry.listenDomainRawEvent(domain, name, handler);
assertThat(registry.getDomainEventListeners().get(domain))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredEventListener::path, RegisteredEventListener::inputClass)
.containsExactly(name, RawMessage.class)).hasSize(1);
}

@Test
void listenNotificationEventWithDomain() {
registry.listenNotificationEvent(domain, name, evt -> Mono.empty(), SomeDataClass.class);
assertThat(registry.getEventNotificationListener().get(domain))
.anySatisfy(listener -> assertThat(listener.path()).isEqualTo(name)).hasSize(1);
}

@Test
void listenNotificationCloudEventWithDomain() {
registry.listenNotificationCloudEvent(domain, name, ce -> Mono.empty());
assertThat(registry.getEventNotificationListener().get(domain))
.anySatisfy(listener -> assertThat(listener)
.extracting(RegisteredEventListener::path, RegisteredEventListener::inputClass)
.containsExactly(name, CloudEvent.class)).hasSize(1);
}

@Test
void listenNotificationRawEventWithDomain() {
SomeRawEventHandler handler = new SomeRawEventHandler();
registry.listenNotificationRawEvent(domain, name, handler);
assertThat(registry.getEventNotificationListener().get(domain))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredEventListener::path, RegisteredEventListener::inputClass)
.containsExactly(name, RawMessage.class)).hasSize(1);
}

@Test
void serveCloudEventQueryDelegate() {
QueryHandlerDelegate<Void, CloudEvent> delegate = (from, ce) -> Mono.empty();
registry.serveCloudEventQuery(name, delegate);
assertThat(registry.getHandlers().get(DEFAULT_DOMAIN))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredQueryHandler::path, RegisteredQueryHandler::queryClass)
.containsExactly(name, CloudEvent.class)).hasSize(1);
}

@Test
void listenQueueSimple() {
registry.listenQueue("myQueue", msg -> Mono.empty());
assertThat(registry.getQueueHandlers().get(DEFAULT_DOMAIN))
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
}

@Test
void listenQueueWithDomain() {
registry.listenQueue(domain, "myQueue", msg -> Mono.empty());
assertThat(registry.getQueueHandlers().get(domain))
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
}

@Test
void listenQueueWithTopology() {
registry.listenQueue("myQueue", msg -> Mono.empty(), creator -> Mono.empty());
assertThat(registry.getQueueHandlers().get(DEFAULT_DOMAIN))
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
}

@Test
void listenQueueWithDomainAndTopology() {
registry.listenQueue(domain, "myQueue", msg -> Mono.empty(), creator -> Mono.empty());
assertThat(registry.getQueueHandlers().get(domain))
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
}

private static class SomeQueryHandlerDelegate implements QueryHandlerDelegate<Void, SomeDataClass> {
@Override
public Mono<Void> handle(From from, SomeDataClass message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.reactivecommons.async.commons;

import org.junit.jupiter.api.Test;
import org.reactivecommons.async.api.handlers.CommandHandler;
import org.reactivecommons.async.commons.communications.Message;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.util.Map;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class CommandExecutorTest {

@SuppressWarnings("unchecked")
@Test
void executeCallsHandlerWithConvertedMessage() {
CommandHandler<String> handler = mock(CommandHandler.class);
when(handler.handle(any())).thenReturn(Mono.empty());

CommandExecutor<String> executor = new CommandExecutor<>(handler, msg -> "converted");
StepVerifier.create(executor.execute(createMessage()))
.verifyComplete();
}

private Message createMessage() {
return new Message() {
@Override
public String getType() { return "test"; }
@Override
public byte[] getBody() { return new byte[0]; }
@Override
public Properties getProperties() {
return new Properties() {
@Override
public String getContentType() { return "application/json"; }
@Override
public long getContentLength() { return 0; }
@Override
public Map<String, Object> getHeaders() { return Map.of(); }
};
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package org.reactivecommons.async.commons;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.exceptions.MessageConversionException;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

@SuppressWarnings("unchecked")
class DLQDiscardNotifierTest {

DomainEventBus eventBus;
MessageConverter messageConverter;
DLQDiscardNotifier notifier;

@BeforeEach
void setUp() {
eventBus = mock(DomainEventBus.class);
messageConverter = mock(MessageConverter.class);
notifier = new DLQDiscardNotifier(eventBus, messageConverter);
when(eventBus.emit(any(DomainEvent.class))).thenReturn(Mono.empty());
when(eventBus.emit(any(CloudEvent.class))).thenReturn(Mono.empty());
}

@Test
void notifyDiscardWithCloudEvent() {
var message = createMessage("body".getBytes(), "application/cloudevents+json");
var cloudEvent = CloudEventBuilder.v1()
.withId("123")
.withType("test.type")
.withSource(URI.create("/test"))
.build();
when(messageConverter.readCloudEvent(message)).thenReturn(cloudEvent);

notifier.notifyDiscard(message).block();

verify(eventBus).emit(argThat((CloudEvent ce) -> ce.getType().equals("test.type.dlq")));
}

@Test
void notifyDiscardWithCommandJson() {
var message = createMessage(
"{\"name\":\"myCmd\",\"commandId\":\"cid\",\"data\":{}}".getBytes(),
"application/json");

// The notifier reads the message as JsonSkeleton via messageConverter.readValue
// We mock readValue to throw for CloudEvent check (not cloud event), then succeed for skeleton
when(messageConverter.readCloudEvent(message)).thenThrow(new RuntimeException("not a CE"));

// Since message content-type is not cloud event, it reads JsonSkeleton
// We need to mock readValue for the private JsonSkeleton class
// Instead, use doAnswer to return a command-like object
doAnswer(inv -> {
Class<?> cls = inv.getArgument(1);
var mapper = new tools.jackson.databind.json.JsonMapper();
return mapper.readValue(message.getBody(), cls);
}).when(messageConverter).readValue(eq(message), any(Class.class));

notifier.notifyDiscard(message).block();

verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("myCmd.dlq")));
}

@Test
void notifyDiscardWithEventJson() {
var message = createMessage(
"{\"name\":\"myEvt\",\"eventId\":\"eid\",\"data\":{}}".getBytes(),
"application/json");

doAnswer(inv -> {
Class<?> cls = inv.getArgument(1);
var mapper = new tools.jackson.databind.json.JsonMapper();
return mapper.readValue(message.getBody(), cls);
}).when(messageConverter).readValue(eq(message), any(Class.class));

notifier.notifyDiscard(message).block();

verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("myEvt.dlq")));
}

@Test
void notifyDiscardWithQueryJson() {
var message = createMessage(
"{\"resource\":\"query.test\",\"queryData\":{}}".getBytes(),
"application/json");

doAnswer(inv -> {
Class<?> cls = inv.getArgument(1);
var mapper = new tools.jackson.databind.json.JsonMapper();
return mapper.readValue(message.getBody(), cls);
}).when(messageConverter).readValue(eq(message), any(Class.class));

notifier.notifyDiscard(message).block();

verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("query.test.dlq")));
}

@Test
void notifyDiscardWithUnreadableMessage() {
var message = createMessage("garbage".getBytes(), "application/json");

when(messageConverter.readValue(eq(message), any(Class.class)))
.thenThrow(new MessageConversionException("cannot read"));

notifier.notifyDiscard(message).block();

verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("corruptData.dlq")));
}

@Test
void notifyDiscardHandlesEmitError() {
var message = createMessage("body".getBytes(), "application/json");
when(messageConverter.readValue(eq(message), any(Class.class)))
.thenThrow(new MessageConversionException("bad"));
when(eventBus.emit(any(DomainEvent.class))).thenReturn(Mono.error(new RuntimeException("bus error")));

// Should not throw, returns empty
var result = notifier.notifyDiscard(message).block();
assertThat(result).isNull();
}

private Message createMessage(byte[] body, String contentType) {
return new Message() {
@Override
public String getType() {
return "test";
}

@Override
public byte[] getBody() {
return body;
}

@Override
public Properties getProperties() {
return new Properties() {
@Override
public String getContentType() {
return contentType;
}

@Override
public long getContentLength() {
return body.length;
}

@Override
public Map<String, Object> getHeaders() {
return Map.of();
}
};
}
};
}
}
Loading
Loading