- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
- var ProducerStream = require('../lib/producer-stream');
- var t = require('assert');
- var Readable = require('stream').Readable;
- var Emitter = require('events');
- var fakeClient;
- module.exports = {
- 'ProducerStream stream': {
- 'beforeEach': function() {
- fakeClient = new Emitter();
- fakeClient._isConnected = true;
- fakeClient._isConnecting = false;
- fakeClient.isConnected = function() {
- return true;
- };
- fakeClient.connect = function(opts, cb) {
- setImmediate(function() {
- this.emit('ready');
- }.bind(this));
- return this;
- };
- fakeClient.disconnect = function(cb) {
- setImmediate(function() {
- this.emit('disconnected');
- }.bind(this));
- return this;
- };
- fakeClient.poll = function() {
- return this;
- };
- fakeClient.setPollInterval = function() {
- return this;
- };
- },
- 'exports a stream class': function() {
- t.equal(typeof(ProducerStream), 'function');
- },
- 'in buffer mode': {
- 'requires a topic be provided when running in buffer mode': function() {
- t.throws(function() {
- var x = new ProducerStream(fakeClient, {});
- });
- },
- 'can be instantiated': function() {
- t.equal(typeof new ProducerStream(fakeClient, {
- topic: 'topic'
- }), 'object');
- },
- 'does not run connect if the client is already connected': function(cb) {
- fakeClient.connect = function() {
- t.fail('Should not run connect if the client is already connected');
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- setTimeout(cb, 10);
- },
- 'does run connect if the client is not already connected': function(cb) {
- fakeClient._isConnected = false;
- fakeClient.isConnected = function() {
- return false;
- };
- fakeClient.once('ready', cb);
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- },
- 'forwards connectOptions to client options when provided': function(cb) {
- var testClientOptions = { timeout: 3000 };
- fakeClient._isConnected = false;
- fakeClient.isConnected = function() {
- return false;
- };
- var fakeConnect = fakeClient.connect;
- fakeClient.connect = function(opts, callback) {
- t.deepEqual(opts, testClientOptions);
- cb();
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic',
- connectOptions: testClientOptions
- });
- },
- 'automatically disconnects when autoclose is not provided': function(cb) {
- fakeClient.once('disconnected', cb);
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.end();
- },
- 'does not automatically disconnect when autoclose is set to false': function(done) {
- fakeClient.once('disconnected', function() {
- t.fail('Should not run disconnect');
- });
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic',
- autoClose: false
- });
- stream.end();
- setTimeout(done, 10);
- },
- 'properly reads off the fake client': function(done) {
- var message;
- fakeClient.produce = function(topic, partition, message, key) {
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome');
- t.equal(Buffer.isBuffer(message), true);
- done();
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- stream.write(Buffer.from('Awesome'));
- },
- 'passes a topic string if options are not provided': function(done) {
- var message;
- fakeClient.produce = function(topic, partition, message, key) {
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome');
- t.equal(Buffer.isBuffer(message), true);
- done();
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- stream.write(Buffer.from('Awesome'));
- },
- 'properly handles queue errors': function(done) {
- var message;
- var first = true;
- fakeClient.produce = function(topic, partition, message, key) {
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome');
- t.equal(Buffer.isBuffer(message), true);
- if (first) {
- first = false;
- var err = new Error('Queue full');
- err.code = -184;
- throw err;
- } else {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- stream.write(Buffer.from('Awesome'));
- },
- 'errors out when a non-queue related error occurs': function(done) {
- fakeClient.produce = function(topic, partition, message, key) {
- var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
- err.code = 10;
- throw err;
- };
- fakeClient.on('disconnected', function() {
- done();
- });
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.on('error', function(err) {
- t.equal(err.code, 10, 'Error was unexpected');
- // This is good
- });
- stream.write(Buffer.from('Awesome'));
- },
- 'errors out when a non-queue related error occurs but does not disconnect if autoclose is false': function(done) {
- fakeClient.produce = function(topic, partition, message, key) {
- var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
- err.code = 10;
- throw err;
- };
- fakeClient.on('disconnected', function() {
- t.fail('Should not try to disconnect');
- });
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic',
- autoClose: false
- });
- stream.on('error', function(err) {
- t.equal(err.code, 10, 'Error was unexpected');
- // This is good
- });
- stream.write(Buffer.from('Awesome'));
- setTimeout(done, 10);
- },
- 'properly reads more than one message in order': function(done) {
- var message;
- var currentMessage = 0;
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome' + currentMessage);
- t.equal(Buffer.isBuffer(message), true);
- if (currentMessage === 2) {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- stream.write(Buffer.from('Awesome1'));
- stream.write(Buffer.from('Awesome2'));
- },
- 'can be piped into a readable': function(done) {
- var message;
- var currentMessage = 0;
- var iteration = 0;
- var readable = new Readable({
- read: function(size) {
- iteration++;
- if (iteration > 1) {
- } else {
- this.push('Awesome1');
- this.push('Awesome2');
- }
- }
- });
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome' + currentMessage);
- t.equal(Buffer.isBuffer(message), true);
- if (currentMessage === 2) {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- readable.pipe(stream);
- },
- 'can drain buffered chunks': function(done) {
- var message;
- var currentMessage = 0;
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome' + currentMessage);
- t.equal(Buffer.isBuffer(message), true);
- if (currentMessage === 3) {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- topic: 'topic'
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- fakeClient._isConnected = false;
- fakeClient._isConnecting = true;
- fakeClient.isConnected = function() {
- return false;
- };
- stream.write(Buffer.from('Awesome1'));
- stream.write(Buffer.from('Awesome2'));
- stream.write(Buffer.from('Awesome3'));
- fakeClient._isConnected = true;
- fakeClient._isConnecting = false;
- fakeClient.isConnected = function() {
- return true;
- };
- fakeClient.connect();
- },
- },
- 'in objectMode': {
- 'can be instantiated': function() {
- t.equal(typeof new ProducerStream(fakeClient, {
- objectMode: true
- }), 'object');
- },
- 'properly produces message objects': function(done) {
- var _timestamp = Date.now();
- var _opaque = {
- foo: 'bar'
- };
- var _headers = {
- header: 'header value'
- };
- fakeClient.produce = function(topic, partition, message, key, timestamp, opaque, headers) {
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome');
- t.equal(Buffer.isBuffer(message), true);
- t.equal(partition, 10);
- t.equal(key, 'key');
- t.deepEqual(_opaque, opaque);
- t.deepEqual(_timestamp, timestamp);
- t.deepEqual(_headers, headers);
- done();
- };
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- stream.write({
- topic: 'topic',
- value: Buffer.from('Awesome'),
- partition: 10,
- key: 'key',
- timestamp: _timestamp,
- opaque: _opaque,
- headers: _headers
- });
- },
- 'properly handles queue errors': function(done) {
- var message;
- var first = true;
- fakeClient.produce = function(topic, partition, message, key) {
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome');
- t.equal(Buffer.isBuffer(message), true);
- t.equal(partition, 10);
- t.equal(key, 'key');
- if (first) {
- first = false;
- var err = new Error('Queue full');
- err.code = -184;
- throw err;
- } else {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- stream.write({
- topic: 'topic',
- value: Buffer.from('Awesome'),
- partition: 10,
- key: 'key'
- });
- },
- 'errors out when a non-queue related error occurs': function(done) {
- fakeClient.produce = function(topic, partition, message, key) {
- var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
- err.code = 10;
- throw err;
- };
- fakeClient.on('disconnected', function() {
- done();
- });
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.equal(err.code, 10, 'Error was unexpected');
- // This is good
- });
- stream.write(Buffer.from('Awesome'));
- },
- 'errors out when a non-queue related error occurs but does not disconnect if autoclose is false': function(done) {
- fakeClient.produce = function(topic, partition, message, key) {
- var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
- err.code = 10;
- throw err;
- };
- fakeClient.on('disconnected', function() {
- t.fail('Should not try to disconnect');
- });
- var stream = new ProducerStream(fakeClient, {
- objectMode: true,
- autoClose: false
- });
- stream.on('error', function(err) {
- t.equal(err.code, 10, 'Error was unexpected');
- // This is good
- });
- stream.write({
- value: Buffer.from('Awesome'),
- topic: 'topic'
- });
- setTimeout(done, 10);
- },
- 'properly reads more than one message in order': function(done) {
- var message;
- var currentMessage = 0;
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome' + currentMessage);
- t.equal(Buffer.isBuffer(message), true);
- if (currentMessage === 2) {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- stream.write({
- value: Buffer.from('Awesome1'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome2'),
- topic: 'topic'
- });
- },
- 'can be piped into a readable': function(done) {
- var message;
- var currentMessage = 0;
- var iteration = 0;
- var readable = new Readable({
- objectMode: true,
- read: function(size) {
- iteration++;
- if (iteration > 1) {
- } else {
- this.push({
- topic: 'topic',
- value: Buffer.from('Awesome1')
- });
- this.push({
- topic: 'topic',
- value: Buffer.from('Awesome2')
- });
- }
- }
- });
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome' + currentMessage);
- t.equal(Buffer.isBuffer(message), true);
- if (currentMessage === 2) {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- readable.pipe(stream);
- },
- 'can drain buffered messages': function(done) {
- var message;
- var currentMessage = 0;
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- t.equal('topic', topic);
- t.equal(message.toString(), 'Awesome' + currentMessage);
- t.equal(Buffer.isBuffer(message), true);
- if (currentMessage === 3) {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- fakeClient._isConnected = false;
- fakeClient._isConnecting = true;
- fakeClient.isConnected = function() {
- return false;
- };
- stream.write({
- value: Buffer.from('Awesome1'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome2'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome3'),
- topic: 'topic'
- });
- fakeClient._isConnected = true;
- fakeClient._isConnecting = false;
- fakeClient.isConnected = function() {
- return true;
- };
- fakeClient.connect();
- },
- 'properly handles queue errors while draining': function(done) {
- var message;
- var currentMessage = 0;
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- if (currentMessage === 3) {
- var err = new Error('Queue full');
- err.code = -184;
- throw err;
- } else if (currentMessage === 4) {
- done();
- }
- };
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.fail(err);
- });
- fakeClient._isConnected = false;
- fakeClient._isConnecting = true;
- fakeClient.isConnected = function() {
- return false;
- };
- stream.write({
- value: Buffer.from('Awesome1'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome2'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome3'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome4'),
- topic: 'topic'
- });
- fakeClient._isConnected = true;
- fakeClient._isConnecting = false;
- fakeClient.isConnected = function() {
- return true;
- };
- fakeClient.connect();
- },
- 'errors out for non-queue related errors while draining': function (done) {
- var currentMessage = 0;
- fakeClient.produce = function(topic, partition, message, key) {
- currentMessage++;
- if (currentMessage === 3) {
- var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
- err.code = 10;
- throw err;
- }
- };
- fakeClient.on('disconnected', function() {
- done();
- });
- var stream = new ProducerStream(fakeClient, {
- objectMode: true
- });
- stream.on('error', function(err) {
- t.equal(err.code, 10, 'Error was unexpected');
- // This is good
- });
- fakeClient._isConnected = false;
- fakeClient._isConnecting = true;
- fakeClient.isConnected = function() {
- return false;
- };
- stream.write({
- value: Buffer.from('Awesome1'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome2'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome3'),
- topic: 'topic'
- });
- stream.write({
- value: Buffer.from('Awesome4'),
- topic: 'topic'
- });
- fakeClient._isConnected = true;
- fakeClient._isConnecting = false;
- fakeClient.isConnected = function() {
- return true;
- };
- fakeClient.connect();
- },
- }
- }
- };