StarTower Orb is a Go library that provides automatic OpenTelemetry instrumentation for RabbitMQ using the amqp091-go client. It enables distributed tracing across your RabbitMQ-based microservices with minimal code changes.
- Automatic Tracing: Instruments RabbitMQ publish and consume operations
- Context Propagation: Propagates trace context via W3C headers in message headers
- Semantic Conventions: Follows OpenTelemetry semantic conventions for messaging
- Configurable: Customizable tracers, propagators, and span attributes
- Drop-in Replacement: Minimal changes to existing
amqp091-gocode - Production Ready: Comprehensive error handling and graceful degradation
go get github.com/startower-observability/orbpackage main
import (
"context"
"log"
"github.com/rabbitmq/amqp091-go"
orb "github.com/startower-observability/orb/instrumentation"
"go.opentelemetry.io/otel"
)
func main() {
// Initialize OpenTelemetry (tracer provider, etc.)
// ... your OTel setup code ...
// Connect to RabbitMQ with instrumentation
conn, err := orb.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Create instrumented channel
ch, err := conn.ChannelWithTracing()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
ctx := context.Background()
// Publish with tracing
err = ch.PublishWithTracing(ctx,
"my-exchange", "routing.key", false, false,
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte("Hello, World!"),
})
if err != nil {
log.Fatal(err)
}
}func consumeMessages(ch *orb.Channel) {
ctx := context.Background()
// Define message handler
handler := func(ctx context.Context, delivery amqp091.Delivery) error {
// Process message with trace context
log.Printf("Received message: %s", delivery.Body)
// Your business logic here
return nil
}
// Start consuming with tracing
err := ch.ConsumeWithTracing(ctx,
"my-queue", "consumer-tag", false, false, false, false, nil, handler)
if err != nil {
log.Fatal(err)
}
}// Custom publisher configuration
publisherConfig := orb.PublisherConfig{
Tracer: otel.Tracer("my-service"),
SpanNameFormatter: func(exchange, routingKey string) string {
return fmt.Sprintf("publish to %s", exchange)
},
AttributeEnricher: func(ctx context.Context, exchange, routingKey string, msg *amqp091.Publishing) []trace.SpanStartOption {
return []trace.SpanStartOption{
trace.WithAttributes(
attribute.String("custom.attribute", "value"),
),
}
},
}
// Custom consumer configuration
consumerConfig := orb.ConsumerConfig{
Tracer: otel.Tracer("my-service"),
SpanNameFormatter: func(queueName string, delivery *amqp091.Delivery) string {
return fmt.Sprintf("process from %s", queueName)
},
}
// Create connection with custom config
conn, err := orb.DialWithConfig("amqp://localhost:5672/", orb.ConnectionConfig{
ChannelConfig: orb.ChannelConfig{
PublisherConfig: publisherConfig,
ConsumerConfig: consumerConfig,
},
})// Get raw deliveries channel
deliveries, err := ch.ConsumeWithContext(ctx, "my-queue", "", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
for delivery := range deliveries {
// Wrap delivery with tracing
ctx, span := ch.WrapDeliveryWithTracing(context.Background(), "my-queue", &delivery)
// Process message
err := processMessage(ctx, delivery)
// Handle acknowledgment
if err != nil {
delivery.Nack(false, true)
span.RecordError(err)
} else {
delivery.Ack(false)
}
span.End()
}// Use publisher directly
publisher := orb.NewDefaultPublisher()
err := publisher.Publish(ctx, channel, "exchange", "key", false, false, msg)
// Use consumer directly
consumer := orb.NewDefaultConsumer()
err := consumer.ProcessDelivery(ctx, "queue", delivery, handler)
// Use propagation directly
orb.InjectToPublishing(ctx, &publishing)
ctx = orb.ExtractFromDelivery(ctx, &delivery)The library follows OpenTelemetry semantic conventions for messaging:
| Attribute | Description | Example |
|---|---|---|
messaging.system |
Messaging system | rabbitmq |
messaging.destination |
Queue or exchange name | user.events |
messaging.destination_kind |
Destination type | queue, topic |
messaging.rabbitmq.routing_key |
Routing key | user.created |
messaging.operation |
Operation type | publish, receive |
messaging.message_id |
Message ID | msg-123 |
messaging.conversation_id |
Correlation ID | conv-456 |
- Producer spans: Created for publish operations (
SpanKindProducer) - Consumer spans: Created for consume operations (
SpanKindConsumer)
The library automatically:
- Injects trace context into message headers when publishing
- Extracts trace context from message headers when consuming
- Links producer and consumer spans across service boundaries
Headers are injected using the W3C Trace Context format in the amqp091.Publishing.Headers field.
The library provides graceful error handling:
- Spans are marked with error status when operations fail
- Missing headers or context don't cause failures
- Original
amqp091-goerrors are preserved and returned - Instrumentation errors are recorded but don't interrupt message flow
- Minimal overhead: ~1-2μs per operation
- Headers are only modified when necessary
- Lazy initialization of OpenTelemetry components
- No goroutine leaks or memory retention
Run tests with RabbitMQ:
# Start RabbitMQ container
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Run tests
go test ./...
# Run integration tests
go test -tags=integration ./...See the examples directory for complete working examples:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
This project is licensed under the MIT License - see the LICENSE file for details.
