Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/broker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,5 +241,21 @@ describe('NodeAmqpBroker', () => {
expect((broker as any)._channelConsumers.get(newChannel)).to.include(consumer);
expect((broker as any)._channelConsumers.get(mockChannel)).to.be.undefined;
});

it('should skip rewiring if the channel does not start a consumer', async () => {
await broker.connect();
const consumer = await broker.consume('test-queue');
const newChannel = createMockChannel(6);
const oldChannel = consumer.channel;

// Simulate a channel that does not start a consumer
newChannel.consume.rejects(new Error('Consumer not started'));

broker.pool!.emit('channelReplaced', oldChannel, newChannel);
await new Promise(r => setImmediate(r)); // give dispatch loop a tick

sinon.assert.calledOnce(newChannel.consume);
expect((broker as any)._channelConsumers.get(newChannel)).to.be.empty;
});
});
});
66 changes: 38 additions & 28 deletions src/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,26 @@ export default class NodeAmqpBroker {
}
}

private async startConsumer(ch: Channel, consumer: Consumer) {
const res = await ch.consume(consumer.queue, (msg: ConsumeMessage | null) => {
if (msg) {
consumer.handleDelivery(msg);
} else {
consumer.handleCancel(true);
}
});

consumer.tag = res.consumerTag;
consumer.channel = ch;

// clear any previous cancel event handler, since this consumer might have
// been rewired to a new channel
consumer.removeAllListeners('cancel');
consumer.once('cancel', () => {
this._channelConsumers.get(ch)?.delete(consumer);
});
}

/**
* Recreates consumers on a new channel in the same way as the old one
* @todo properly handle errors, e.g. if the channel is closed when we try to consume (#67)
Expand All @@ -233,47 +253,37 @@ export default class NodeAmqpBroker {
if (!set || set.size < 1) return;

for (const consumer of set) {
// FIXME (#67): handle potential errors
const res = await newCh.consume(consumer.queue, (msg: ConsumeMessage | null) => {
if (msg && consumer) {
consumer.handleDelivery(msg);
}
});

consumer.tag = res.consumerTag;
consumer.channel = newCh;
consumer.resume();
try {
await this.startConsumer(newCh, consumer);
consumer.resume();
} catch (ex: any) {
set.delete(consumer);
debug('Failed to rewire consumer', { queue: consumer.queue, reason: ex });
}
}

this._channelConsumers.delete(ch);
this._channelConsumers.set(newCh, set);
}

private async _startConsumer(ch: Channel, queue: string): Promise<Consumer> {
private async createConsumer(ch: Channel, queue: string): Promise<Consumer> {
let consumer = new Consumer(ch, queue);
await this.startConsumer(ch, consumer);

return await ch.consume(queue, (msg: ConsumeMessage | null) => {
if (msg) {
consumer.handleDelivery(msg);
}
}).then((res: Replies.Consume) => {
consumer.tag = res.consumerTag;

if (!this._channelConsumers.has(ch)) {
this._channelConsumers.set(ch, new Set([consumer]));
} else {
const set = this._channelConsumers.get(ch);
set!.add(consumer);
}
if (!this._channelConsumers.has(ch)) {
this._channelConsumers.set(ch, new Set([consumer]));
} else {
const set = this._channelConsumers.get(ch);
set!.add(consumer);
}

return consumer;
});
return consumer;
}

public consume(queue: string): PromiseLike<Consumer> {
if (!this.pool) throw new PeanarAdapterError('Not connected!');

return this.pool.acquireAndRun(async ch => this._startConsumer(ch, queue));
return this.pool.acquireAndRun(async ch => this.createConsumer(ch, queue));
}

public consumeOver(queues: string[]) {
Expand All @@ -283,7 +293,7 @@ export default class NodeAmqpBroker {
return {
queue,
channel: ch,
consumer: await this._startConsumer(ch, queue)
consumer: await this.createConsumer(ch, queue)
};
});
}
Expand Down
30 changes: 29 additions & 1 deletion test/broker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { rejects } from 'assert';

import sinon from 'sinon';
import { expect } from 'chai';
import amqplib, { ChannelModel } from 'amqplib';
import amqplib, { Channel, ChannelModel } from 'amqplib';
import { IConnectionParams, IMessage } from '../src/types';

import { brokerOptions } from './config';
Expand Down Expand Up @@ -403,6 +403,34 @@ describe('Broker', () => {
expect(consumerCount).to.be.eq(3);
await Promise.all(consumers.map(c => c.consumer.cancel()));
});

it('handles client cancellation', async function() {
const consumer = await broker.consume(q1);
const canceledPromise = once(consumer, 'cancel');

// Cancel the consumer
await consumer.cancel();

const [{ server }] = await canceledPromise;
expect(server).to.be.false;

expect(broker.channelConsumers.size).to.eq(1);
expect(broker.channelConsumers.get(consumer.channel as Channel)).to.be.empty;
});

it('handles server cancellation', async function() {
const consumer = await broker.consume(q1);
const canceledPromise = once(consumer, 'cancel');

// Cause a server-initiated cancellation
await broker.pool!.acquireAndRun(ch => ch.deleteQueue(q1));

const [{ server }] = await canceledPromise;
expect(server).to.be.true;

expect(broker.channelConsumers.size).to.eq(1);
expect(broker.channelConsumers.get(consumer.channel as Channel)).to.be.empty;
});
});

describe('Error handling', function() {
Expand Down