Skip to content
This repository was archived by the owner on Apr 17, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,21 @@ new KinesisWritable({

### Events

* `error`: Emitted every time records are failed to be written.
| Name | Description | Parameters |
|---|---|---|
| `error`| Emitted every time records are failed to be written after all retries fail | `err` will contain the latest retry error. Failed encoded records will be in `err.records` |
| `error`| Emitted every time records succeed to push to the kinesis stream. | `records` will contain `records.FailedRecordCount:0` and an array of objects on `records.Records` containing each `SequenceNumber:` and `ShardId` |

```javascript
new KinesisWritable({
region: 'AWS_REGION',
streamName: 'MyKinesisStream'
}).on('error', (err) => {
console.log(`Failed to push ${err.records}`)
}).on('success', (records) => {
console.log(`Succesfully pushed ${err.length} records`)
})
```
**Note**: Amazon Credentials are not required. It will either use the environment variables, `~/.aws/credentials` or roles as every other aws sdk.

## Issue Reporting
Expand Down
9 changes: 8 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ KinesisStream.prototype.dispatch = function(records, cb) {
});

operation.attempt(() => {
this.putRecords(formattedRecords, (err) => {
this.putRecords(formattedRecords, (err, data) => {
if (!err) {
this.emitRecordSuccess(data);
}
if (operation.retry(err)) {
return;
}
Expand Down Expand Up @@ -170,4 +173,8 @@ KinesisStream.prototype.emitRecordError = function (err, records) {
this.emit('error', err);
};

KinesisStream.prototype.emitRecordSuccess = function (records) {
this.emit('success', records);
};

module.exports = KinesisStream;
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "aws-kinesis-writable",
"description": "A stream implementation for kinesis.",
"version": "4.3.0",
"version": "4.2.5",
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)",
"license": "MIT",
"repository": {
Expand Down
27 changes: 23 additions & 4 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ describe('KinesisStream', function() {
});
it('should return immediately if no messages are provided', function(done) {
kinesis.putRecords = sinon.spy();
ks.dispatch([], function() {
ks.dispatch([], function(err) {
expect(err).to.be.null;
expect(kinesis.putRecords.calledOnce).to.be.false;
done();
});
Expand All @@ -125,7 +126,6 @@ describe('KinesisStream', function() {

ks.on('error', function(err) {
expect(err).to.exist;
expect(err.records.length).to.equal(2);
expect(stub.calledThrice).to.be.true;
});

Expand All @@ -138,16 +138,35 @@ describe('KinesisStream', function() {
it('should putRecords without error when record contains circular references', function() {
const logMessage = (s) => { return s.getCall(0).args[0][0].Data; };
ks.putRecords = sinon.spy();

const message = {
hi: 'hello'
};
message.message = message;

ks.dispatch([message]);

expect(ks.putRecords.calledOnce).to.be.true;
expect(ks.putRecords.calledOnce).to.be.true
expect('{"hi":"hello","message":"[Circular]"}').to.equal(logMessage(ks.putRecords));

});

it('should invoke success when the records succeed to push', function(done) {
const stub = sinon.stub(ks, 'putRecords').callsFake(function(records, cb) {
return cb(null, records);
});

const message = {
hi: 'hello'
};
message.message = message;

ks.on('success', function(records) {
expect(records).to.exist;
expect(records.length).to.equal(1);
done();
});

ks.dispatch([message]);
});
});
});