// Skip to content
// Chenhan Shen edited this page Aug 23, 2015 · 1 revision
'use strict';

var
	_ = require('lodash'),
	precond = require('precond'),
	rx = require('rx'),
	AWS = require('aws-sdk');

/**
 * Observable to receive message from a given sqs queue.
 * - create queue if it doesn't exist
 *
 * @param {Object} opts - options
 * @param {AWS.SQS} opts.sqs - the instance of new AWS.SQS()
 * @param {String} opts.queueName - name of the sqs queue
 * @param {Object} opts.createAttributes - attributes used to create the queue
 * @param {Object} opts.receiveParams - parameters used to receive message
 *
 * @return {Observable} a sequence of messages received
 */
module.exports = (opts) => {
	precond.checkArgument(opts);
	precond.checkArgument(opts.sqs instanceof AWS.SQS);
	precond.checkArgument(opts.queueName);

	let
		sqs = rx.Observable.fromNodeCallbackAll(opts.sqs);

	return sqs
		.createQueue({
			QueueName: opts.queueName,
			Attributes: _.assign(
				{
					MessageRetentionPeriod: '1209600',
					VisibilityTimeout: '60'
				},
				opts.createAttributes || {}
			)
		})
		.pluck('QueueUrl')
		.concatMap(
			(queueUrl) => rx
				.Observable
				.create((observer) => {
					let
						// receiver
						rcvr = () => {
							let msgSrc = sqs.receiveMessage(
								_.assign(
									opts.receiveParams || {},
									{
										QueueUrl: queueUrl,
										MaxNumberOfMessages: '1',
										WaitTimeSeconds: '20'
									}
								)
							);

							msgSrc.subscribe(
								(resp) => {
									resp.Messages
										&& resp.Messages.length
										&& _.map(
											resp.Messages,
											(msg) => {
												observer.onNext({
													value: msg,

													delete: () => sqs.deleteMessage({
														QueueUrl: queueUrl,
														ReceiptHandle: msg.ReceiptHandle
													})
												});
											}
										);
								},

								(err) => observer.onError(err),

								// recursive case
								() => rcvr()
							);
						};

					rcvr();
				})
		);
};

// Clone this wiki locally