/*
|
|
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
|
|
*
|
|
* Copyright (c) 2016 Blizzard Entertainment
|
|
*
|
|
* This software may be modified and distributed under the terms
|
|
* of the MIT license. See the LICENSE.txt file for details.
|
|
*/
|
|
|
|
var HighLevelProducer = require('../../lib/producer/high-level-producer');
|
|
var t = require('assert');
|
|
var Promise = require('bluebird');
|
|
// var Mock = require('./mock');
|
|
|
|
var client;
|
|
var defaultConfig = {
|
|
'client.id': 'kafka-mocha',
|
|
'metadata.broker.list': 'localhost:9092',
|
|
'socket.timeout.ms': 250
|
|
};
|
|
var topicConfig = {};
|
|
|
|
var server;
|
|
|
|
module.exports = {
|
|
'High Level Producer client': {
|
|
'beforeEach': function() {
|
|
client = new HighLevelProducer(defaultConfig, topicConfig);
|
|
},
|
|
'afterEach': function() {
|
|
client = null;
|
|
},
|
|
'is an object': function() {
|
|
t.equal(typeof(client), 'object');
|
|
},
|
|
'requires configuration': function() {
|
|
t.throws(function() {
|
|
return new HighLevelProducer();
|
|
});
|
|
},
|
|
'has necessary methods from superclass': function() {
|
|
var methods = ['_oldProduce'];
|
|
methods.forEach(function(m) {
|
|
t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method');
|
|
});
|
|
},
|
|
'does not modify config and clones it': function () {
|
|
t.deepStrictEqual(defaultConfig, {
|
|
'client.id': 'kafka-mocha',
|
|
'metadata.broker.list': 'localhost:9092',
|
|
'socket.timeout.ms': 250
|
|
});
|
|
t.deepStrictEqual(client.globalConfig, {
|
|
'client.id': 'kafka-mocha',
|
|
'metadata.broker.list': 'localhost:9092',
|
|
'socket.timeout.ms': 250
|
|
});
|
|
t.notEqual(defaultConfig, client.globalConfig);
|
|
},
|
|
'does not modify topic config and clones it': function () {
|
|
t.deepStrictEqual(topicConfig, {});
|
|
t.deepStrictEqual(client.topicConfig, {});
|
|
t.notEqual(topicConfig, client.topicConfig);
|
|
},
|
|
'produce method': {
|
|
'headers support': function(next) {
|
|
var v = 'foo';
|
|
var k = 'key';
|
|
var h = [
|
|
{ key1: "value1A" },
|
|
{ key1: "value1B" },
|
|
{ key2: "value2" },
|
|
{ key1: "value1C" },
|
|
];
|
|
var jsonH = JSON.stringify(h);
|
|
|
|
client._oldProduce = function(topic, partition, value, key, timestamp, opaque, headers) {
|
|
t.equal(value, 'foo');
|
|
t.equal(key, 'key');
|
|
t.equal(JSON.stringify(headers), jsonH);
|
|
next();
|
|
};
|
|
|
|
client.produce('tawpic', 0, v, k, null, h, function() {
|
|
|
|
});
|
|
},
|
|
|
|
'can use a custom serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
var valueSerializerCalled = false;
|
|
var keySerializerCalled = false;
|
|
|
|
client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
|
|
t.equal(valueSerializerCalled, true);
|
|
t.equal(keySerializerCalled, true);
|
|
t.deepEqual(v, Buffer.from('foo'));
|
|
t.equal(k, 'key');
|
|
next();
|
|
};
|
|
|
|
client.setValueSerializer(function(_) {
|
|
valueSerializerCalled = true;
|
|
t.deepEqual(_, v);
|
|
return Buffer.from('foo');
|
|
});
|
|
|
|
client.setKeySerializer(function(_) {
|
|
keySerializerCalled = true;
|
|
t.deepEqual(_, k);
|
|
return 'key';
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function() {
|
|
|
|
});
|
|
},
|
|
|
|
'can use a value asynchronous custom serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
var valueSerializerCalled = false;
|
|
var keySerializerCalled = false;
|
|
|
|
client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
|
|
t.equal(valueSerializerCalled, true);
|
|
t.equal(keySerializerCalled, true);
|
|
t.deepEqual(v, Buffer.from('foo'));
|
|
t.equal(k, 'key');
|
|
next();
|
|
};
|
|
|
|
client.setValueSerializer(function(_, cb) {
|
|
valueSerializerCalled = true;
|
|
t.deepEqual(_, v);
|
|
setImmediate(function() {
|
|
cb(null, Buffer.from('foo'));
|
|
});
|
|
});
|
|
|
|
client.setKeySerializer(function(_) {
|
|
keySerializerCalled = true;
|
|
t.deepEqual(_, k);
|
|
return 'key';
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function() {
|
|
|
|
});
|
|
},
|
|
|
|
'can use a key asynchronous custom serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
var valueSerializerCalled = false;
|
|
var keySerializerCalled = false;
|
|
|
|
client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
|
|
t.equal(valueSerializerCalled, true);
|
|
t.equal(keySerializerCalled, true);
|
|
t.deepEqual(v, Buffer.from('foo'));
|
|
t.equal(k, 'key');
|
|
next();
|
|
};
|
|
|
|
client.setValueSerializer(function(_) {
|
|
valueSerializerCalled = true;
|
|
t.deepEqual(_, v);
|
|
return Buffer.from('foo');
|
|
});
|
|
|
|
client.setKeySerializer(function(_, cb) {
|
|
keySerializerCalled = true;
|
|
t.deepEqual(_, k);
|
|
setImmediate(function() {
|
|
cb(null, 'key');
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function() {
|
|
|
|
});
|
|
},
|
|
|
|
'can use two asynchronous custom serializers': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
var valueSerializerCalled = false;
|
|
var keySerializerCalled = false;
|
|
|
|
client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
|
|
t.equal(valueSerializerCalled, true);
|
|
t.equal(keySerializerCalled, true);
|
|
t.deepEqual(v, Buffer.from('foo'));
|
|
t.equal(k, 'key');
|
|
next();
|
|
};
|
|
|
|
client.setValueSerializer(function(_, cb) {
|
|
valueSerializerCalled = true;
|
|
t.deepEqual(_, v);
|
|
setImmediate(function() {
|
|
cb(null, Buffer.from('foo'));
|
|
});
|
|
});
|
|
|
|
client.setKeySerializer(function(_, cb) {
|
|
keySerializerCalled = true;
|
|
t.deepEqual(_, k);
|
|
setImmediate(function() {
|
|
cb(null, 'key');
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function() {
|
|
|
|
});
|
|
},
|
|
|
|
// Promise API
|
|
'can use a value promise-based custom serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
var valueSerializerCalled = false;
|
|
var keySerializerCalled = false;
|
|
|
|
client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
|
|
t.equal(valueSerializerCalled, true);
|
|
t.equal(keySerializerCalled, true);
|
|
next();
|
|
};
|
|
|
|
client.setValueSerializer(function(_) {
|
|
valueSerializerCalled = true;
|
|
t.deepEqual(_, v);
|
|
return new Promise(function(resolve) {
|
|
resolve(Buffer.from(''));
|
|
});
|
|
});
|
|
|
|
client.setKeySerializer(function(_) {
|
|
keySerializerCalled = true;
|
|
t.deepEqual(_, k);
|
|
return null;
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function() {
|
|
|
|
});
|
|
},
|
|
|
|
'can use a key promise-based custom serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
var valueSerializerCalled = false;
|
|
var keySerializerCalled = false;
|
|
|
|
client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
|
|
t.equal(valueSerializerCalled, true);
|
|
t.equal(keySerializerCalled, true);
|
|
t.deepEqual(v, Buffer.from('foo'));
|
|
t.equal(k, 'key');
|
|
next();
|
|
};
|
|
|
|
client.setValueSerializer(function(_) {
|
|
valueSerializerCalled = true;
|
|
t.deepEqual(_, v);
|
|
return Buffer.from('foo');
|
|
});
|
|
|
|
client.setKeySerializer(function(_) {
|
|
keySerializerCalled = true;
|
|
t.deepEqual(_, k);
|
|
return new Promise(function(resolve) {
|
|
resolve('key');
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function() {
|
|
|
|
});
|
|
},
|
|
|
|
'can use two promise-based custom serializers': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
var valueSerializerCalled = false;
|
|
var keySerializerCalled = false;
|
|
|
|
client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
|
|
t.equal(valueSerializerCalled, true);
|
|
t.equal(keySerializerCalled, true);
|
|
t.deepEqual(v, Buffer.from('foo'));
|
|
t.equal(k, 'key');
|
|
next();
|
|
};
|
|
|
|
client.setValueSerializer(function(_) {
|
|
valueSerializerCalled = true;
|
|
t.deepEqual(_, v);
|
|
return new Promise(function(resolve) {
|
|
resolve(Buffer.from('foo'));
|
|
});
|
|
});
|
|
|
|
client.setKeySerializer(function(_) {
|
|
keySerializerCalled = true;
|
|
t.deepEqual(_, k);
|
|
return new Promise(function(resolve) {
|
|
resolve('key');
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function() {
|
|
|
|
});
|
|
},
|
|
|
|
'bubbles up serializer errors in an async value serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
client.setValueSerializer(function(_, cb) {
|
|
t.deepEqual(_, v);
|
|
setImmediate(function() {
|
|
cb(new Error('even together we failed'));
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function(err) {
|
|
t.equal(typeof err, 'object', 'an error should be returned');
|
|
next();
|
|
});
|
|
},
|
|
|
|
'bubbles up serializer errors in an async key serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
client.setKeySerializer(function(_, cb) {
|
|
t.deepEqual(_, v);
|
|
setImmediate(function() {
|
|
cb(new Error('even together we failed'));
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function(err) {
|
|
t.equal(typeof err, 'object', 'an error should be returned');
|
|
next();
|
|
});
|
|
},
|
|
|
|
'bubbles up serializer errors in a sync value serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
client.setValueSerializer(function(_, cb) {
|
|
t.deepEqual(_, v);
|
|
throw new Error('even together we failed');
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function(err) {
|
|
t.equal(typeof err, 'object', 'an error should be returned');
|
|
next();
|
|
});
|
|
},
|
|
|
|
'bubbles up serializer errors in a sync key serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
client.setKeySerializer(function(_, cb) {
|
|
t.deepEqual(_, v);
|
|
throw new Error('even together we failed');
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function(err) {
|
|
t.equal(typeof err, 'object', 'an error should be returned');
|
|
next();
|
|
});
|
|
},
|
|
|
|
'bubbles up serializer errors in a promise-based value serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
client.setValueSerializer(function(_) {
|
|
t.deepEqual(_, v);
|
|
|
|
return new Promise(function (resolve, reject) {
|
|
reject(new Error('even together we failed'));
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function(err) {
|
|
t.equal(typeof err, 'object', 'an error should be returned');
|
|
next();
|
|
});
|
|
},
|
|
|
|
'bubbles up serializer errors in a promise-based key serializer': function(next) {
|
|
var v = {
|
|
disparaging: 'hyena',
|
|
};
|
|
|
|
var k = {
|
|
delicious: 'cookie',
|
|
};
|
|
|
|
client.setKeySerializer(function(_) {
|
|
t.deepEqual(_, v);
|
|
|
|
return new Promise(function(resolve, reject) {
|
|
return new Promise(function (resolve, reject) {
|
|
reject(new Error('even together we failed'));
|
|
});
|
|
});
|
|
});
|
|
|
|
client.produce('tawpic', 0, v, k, null, function(err) {
|
|
t.equal(typeof err, 'object', 'an error should be returned');
|
|
next();
|
|
});
|
|
},
|
|
}
|
|
},
|
|
};
|