You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

723 lines
19 KiB

/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var ProducerStream = require('../lib/producer-stream');
var t = require('assert');
var Readable = require('stream').Readable;
var Emitter = require('events');
var fakeClient;
module.exports = {
'ProducerStream stream': {
'beforeEach': function() {
fakeClient = new Emitter();
fakeClient._isConnected = true;
fakeClient._isConnecting = false;
fakeClient.isConnected = function() {
return true;
};
fakeClient.connect = function(opts, cb) {
setImmediate(function() {
this.emit('ready');
}.bind(this));
return this;
};
fakeClient.disconnect = function(cb) {
setImmediate(function() {
this.emit('disconnected');
}.bind(this));
return this;
};
fakeClient.poll = function() {
return this;
};
fakeClient.setPollInterval = function() {
return this;
};
},
'exports a stream class': function() {
t.equal(typeof(ProducerStream), 'function');
},
'in buffer mode': {
'requires a topic be provided when running in buffer mode': function() {
t.throws(function() {
var x = new ProducerStream(fakeClient, {});
});
},
'can be instantiated': function() {
t.equal(typeof new ProducerStream(fakeClient, {
topic: 'topic'
}), 'object');
},
'does not run connect if the client is already connected': function(cb) {
fakeClient.connect = function() {
t.fail('Should not run connect if the client is already connected');
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
setTimeout(cb, 10);
},
'does run connect if the client is not already connected': function(cb) {
fakeClient._isConnected = false;
fakeClient.isConnected = function() {
return false;
};
fakeClient.once('ready', cb);
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
},
'forwards connectOptions to client options when provided': function(cb) {
var testClientOptions = { timeout: 3000 };
fakeClient._isConnected = false;
fakeClient.isConnected = function() {
return false;
};
var fakeConnect = fakeClient.connect;
fakeClient.connect = function(opts, callback) {
t.deepEqual(opts, testClientOptions);
cb();
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic',
connectOptions: testClientOptions
});
},
'automatically disconnects when autoclose is not provided': function(cb) {
fakeClient.once('disconnected', cb);
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
stream.end();
},
'does not automatically disconnect when autoclose is set to false': function(done) {
fakeClient.once('disconnected', function() {
t.fail('Should not run disconnect');
});
var stream = new ProducerStream(fakeClient, {
topic: 'topic',
autoClose: false
});
stream.end();
setTimeout(done, 10);
},
'properly reads off the fake client': function(done) {
var message;
fakeClient.produce = function(topic, partition, message, key) {
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome');
t.equal(Buffer.isBuffer(message), true);
done();
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
stream.on('error', function(err) {
t.fail(err);
});
stream.write(Buffer.from('Awesome'));
},
'passes a topic string if options are not provided': function(done) {
var message;
fakeClient.produce = function(topic, partition, message, key) {
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome');
t.equal(Buffer.isBuffer(message), true);
done();
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
stream.on('error', function(err) {
t.fail(err);
});
stream.write(Buffer.from('Awesome'));
},
'properly handles queue errors': function(done) {
var message;
var first = true;
fakeClient.produce = function(topic, partition, message, key) {
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome');
t.equal(Buffer.isBuffer(message), true);
if (first) {
first = false;
var err = new Error('Queue full');
err.code = -184;
throw err;
} else {
done();
}
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
stream.on('error', function(err) {
t.fail(err);
});
stream.write(Buffer.from('Awesome'));
},
'errors out when a non-queue related error occurs': function(done) {
  // Every produce() attempt is rejected with an error carrying code 10.
  function rejectProduce(topic, partition, message, key) {
    var failure = new Error('ERR_MSG_SIZE_TOO_LARGE ');
    failure.code = 10;
    throw failure;
  }
  fakeClient.produce = rejectProduce;

  // The test finishes once the client reports 'disconnected'.
  fakeClient.on('disconnected', function() {
    done();
  });

  var producerStream = new ProducerStream(fakeClient, { topic: 'topic' });

  // An error surfaced on the stream must carry the code thrown by produce().
  producerStream.on('error', function(streamError) {
    t.equal(streamError.code, 10, 'Error was unexpected');
  });
  producerStream.write(Buffer.from('Awesome'));
},
'errors out when a non-queue related error occurs but does not disconnect if autoclose is false': function(done) {
  // Every produce() attempt is rejected with an error carrying code 10.
  function rejectProduce(topic, partition, message, key) {
    var failure = new Error('ERR_MSG_SIZE_TOO_LARGE ');
    failure.code = 10;
    throw failure;
  }
  fakeClient.produce = rejectProduce;

  // With autoClose disabled, a 'disconnected' event fails the test.
  fakeClient.on('disconnected', function() {
    t.fail('Should not try to disconnect');
  });

  var producerStream = new ProducerStream(fakeClient, {
    autoClose: false,
    topic: 'topic'
  });

  // An error surfaced on the stream must carry the code thrown by produce().
  producerStream.on('error', function(streamError) {
    t.equal(streamError.code, 10, 'Error was unexpected');
  });
  producerStream.write(Buffer.from('Awesome'));

  // Give a deferred disconnect a short window to show up before passing.
  setTimeout(done, 10);
},
'properly reads more than one message in order': function(done) {
var message;
var currentMessage = 0;
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message), true);
if (currentMessage === 2) {
done();
}
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
stream.on('error', function(err) {
t.fail(err);
});
stream.write(Buffer.from('Awesome1'));
stream.write(Buffer.from('Awesome2'));
},
'can be piped into a readable': function(done) {
var message;
var currentMessage = 0;
var iteration = 0;
var readable = new Readable({
read: function(size) {
iteration++;
if (iteration > 1) {
} else {
this.push('Awesome1');
this.push('Awesome2');
}
}
});
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message), true);
if (currentMessage === 2) {
done();
}
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
stream.on('error', function(err) {
t.fail(err);
});
readable.pipe(stream);
},
'can drain buffered chunks': function(done) {
var message;
var currentMessage = 0;
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message), true);
if (currentMessage === 3) {
done();
}
};
var stream = new ProducerStream(fakeClient, {
topic: 'topic'
});
stream.on('error', function(err) {
t.fail(err);
});
fakeClient._isConnected = false;
fakeClient._isConnecting = true;
fakeClient.isConnected = function() {
return false;
};
stream.write(Buffer.from('Awesome1'));
stream.write(Buffer.from('Awesome2'));
stream.write(Buffer.from('Awesome3'));
fakeClient._isConnected = true;
fakeClient._isConnecting = false;
fakeClient.isConnected = function() {
return true;
};
fakeClient.connect();
},
},
'in objectMode': {
'can be instantiated': function() {
t.equal(typeof new ProducerStream(fakeClient, {
objectMode: true
}), 'object');
},
'properly produces message objects': function(done) {
var _timestamp = Date.now();
var _opaque = {
foo: 'bar'
};
var _headers = {
header: 'header value'
};
fakeClient.produce = function(topic, partition, message, key, timestamp, opaque, headers) {
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome');
t.equal(Buffer.isBuffer(message), true);
t.equal(partition, 10);
t.equal(key, 'key');
t.deepEqual(_opaque, opaque);
t.deepEqual(_timestamp, timestamp);
t.deepEqual(_headers, headers);
done();
};
var stream = new ProducerStream(fakeClient, {
objectMode: true
});
stream.on('error', function(err) {
t.fail(err);
});
stream.write({
topic: 'topic',
value: Buffer.from('Awesome'),
partition: 10,
key: 'key',
timestamp: _timestamp,
opaque: _opaque,
headers: _headers
});
},
'properly handles queue errors': function(done) {
var message;
var first = true;
fakeClient.produce = function(topic, partition, message, key) {
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome');
t.equal(Buffer.isBuffer(message), true);
t.equal(partition, 10);
t.equal(key, 'key');
if (first) {
first = false;
var err = new Error('Queue full');
err.code = -184;
throw err;
} else {
done();
}
};
var stream = new ProducerStream(fakeClient, {
objectMode: true
});
stream.on('error', function(err) {
t.fail(err);
});
stream.write({
topic: 'topic',
value: Buffer.from('Awesome'),
partition: 10,
key: 'key'
});
},
'errors out when a non-queue related error occurs': function(done) {
  // Every produce() attempt is rejected with an error carrying code 10.
  fakeClient.produce = function(topic, partition, message, key) {
    var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
    err.code = 10;
    throw err;
  };
  // The test finishes once the client reports 'disconnected'.
  fakeClient.on('disconnected', function() {
    done();
  });
  var stream = new ProducerStream(fakeClient, {
    objectMode: true
  });
  // An error surfaced on the stream must carry the code thrown by produce().
  stream.on('error', function(err) {
    t.equal(err.code, 10, 'Error was unexpected');
  });
  // Write a message object, as the other objectMode tests do, rather than a
  // bare Buffer, so the objectMode write path receives its usual input shape.
  stream.write({
    value: Buffer.from('Awesome'),
    topic: 'topic'
  });
},
'errors out when a non-queue related error occurs but does not disconnect if autoclose is false': function(done) {
  // Every produce() attempt is rejected with an error carrying code 10.
  function rejectProduce(topic, partition, message, key) {
    var failure = new Error('ERR_MSG_SIZE_TOO_LARGE ');
    failure.code = 10;
    throw failure;
  }
  fakeClient.produce = rejectProduce;

  // With autoClose disabled, a 'disconnected' event fails the test.
  fakeClient.on('disconnected', function() {
    t.fail('Should not try to disconnect');
  });

  var producerStream = new ProducerStream(fakeClient, {
    autoClose: false,
    objectMode: true
  });

  // An error surfaced on the stream must carry the code thrown by produce().
  producerStream.on('error', function(streamError) {
    t.equal(streamError.code, 10, 'Error was unexpected');
  });

  var payload = {
    value: Buffer.from('Awesome'),
    topic: 'topic'
  };
  producerStream.write(payload);

  // Give a deferred disconnect a short window to show up before passing.
  setTimeout(done, 10);
},
'properly reads more than one message in order': function(done) {
var message;
var currentMessage = 0;
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message), true);
if (currentMessage === 2) {
done();
}
};
var stream = new ProducerStream(fakeClient, {
objectMode: true
});
stream.on('error', function(err) {
t.fail(err);
});
stream.write({
value: Buffer.from('Awesome1'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome2'),
topic: 'topic'
});
},
'can be piped into a readable': function(done) {
var message;
var currentMessage = 0;
var iteration = 0;
var readable = new Readable({
objectMode: true,
read: function(size) {
iteration++;
if (iteration > 1) {
} else {
this.push({
topic: 'topic',
value: Buffer.from('Awesome1')
});
this.push({
topic: 'topic',
value: Buffer.from('Awesome2')
});
}
}
});
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message), true);
if (currentMessage === 2) {
done();
}
};
var stream = new ProducerStream(fakeClient, {
objectMode: true
});
stream.on('error', function(err) {
t.fail(err);
});
readable.pipe(stream);
},
'can drain buffered messages': function(done) {
var message;
var currentMessage = 0;
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
t.equal('topic', topic);
t.equal(message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message), true);
if (currentMessage === 3) {
done();
}
};
var stream = new ProducerStream(fakeClient, {
objectMode: true
});
stream.on('error', function(err) {
t.fail(err);
});
fakeClient._isConnected = false;
fakeClient._isConnecting = true;
fakeClient.isConnected = function() {
return false;
};
stream.write({
value: Buffer.from('Awesome1'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome2'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome3'),
topic: 'topic'
});
fakeClient._isConnected = true;
fakeClient._isConnecting = false;
fakeClient.isConnected = function() {
return true;
};
fakeClient.connect();
},
'properly handles queue errors while draining': function(done) {
var message;
var currentMessage = 0;
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
if (currentMessage === 3) {
var err = new Error('Queue full');
err.code = -184;
throw err;
} else if (currentMessage === 4) {
done();
}
};
var stream = new ProducerStream(fakeClient, {
objectMode: true
});
stream.on('error', function(err) {
t.fail(err);
});
fakeClient._isConnected = false;
fakeClient._isConnecting = true;
fakeClient.isConnected = function() {
return false;
};
stream.write({
value: Buffer.from('Awesome1'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome2'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome3'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome4'),
topic: 'topic'
});
fakeClient._isConnected = true;
fakeClient._isConnecting = false;
fakeClient.isConnected = function() {
return true;
};
fakeClient.connect();
},
'errors out for non-queue related errors while draining': function (done) {
var currentMessage = 0;
fakeClient.produce = function(topic, partition, message, key) {
currentMessage++;
if (currentMessage === 3) {
var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
err.code = 10;
throw err;
}
};
fakeClient.on('disconnected', function() {
done();
});
var stream = new ProducerStream(fakeClient, {
objectMode: true
});
stream.on('error', function(err) {
t.equal(err.code, 10, 'Error was unexpected');
// This is good
});
fakeClient._isConnected = false;
fakeClient._isConnecting = true;
fakeClient.isConnected = function() {
return false;
};
stream.write({
value: Buffer.from('Awesome1'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome2'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome3'),
topic: 'topic'
});
stream.write({
value: Buffer.from('Awesome4'),
topic: 'topic'
});
fakeClient._isConnected = true;
fakeClient._isConnecting = false;
fakeClient.isConnected = function() {
return true;
};
fakeClient.connect();
},
}
}
};