|
|
- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
-
- var HighLevelProducer = require('../../lib/producer/high-level-producer');
- var t = require('assert');
- var Promise = require('bluebird');
- // var Mock = require('./mock');
-
- var client;
- var defaultConfig = {
- 'client.id': 'kafka-mocha',
- 'metadata.broker.list': 'localhost:9092',
- 'socket.timeout.ms': 250
- };
- var topicConfig = {};
-
- var server;
-
- module.exports = {
- 'High Level Producer client': {
- 'beforeEach': function() {
- client = new HighLevelProducer(defaultConfig, topicConfig);
- },
- 'afterEach': function() {
- client = null;
- },
- 'is an object': function() {
- t.equal(typeof(client), 'object');
- },
- 'requires configuration': function() {
- t.throws(function() {
- return new HighLevelProducer();
- });
- },
- 'has necessary methods from superclass': function() {
- var methods = ['_oldProduce'];
- methods.forEach(function(m) {
- t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method');
- });
- },
- 'does not modify config and clones it': function () {
- t.deepStrictEqual(defaultConfig, {
- 'client.id': 'kafka-mocha',
- 'metadata.broker.list': 'localhost:9092',
- 'socket.timeout.ms': 250
- });
- t.deepStrictEqual(client.globalConfig, {
- 'client.id': 'kafka-mocha',
- 'metadata.broker.list': 'localhost:9092',
- 'socket.timeout.ms': 250
- });
- t.notEqual(defaultConfig, client.globalConfig);
- },
- 'does not modify topic config and clones it': function () {
- t.deepStrictEqual(topicConfig, {});
- t.deepStrictEqual(client.topicConfig, {});
- t.notEqual(topicConfig, client.topicConfig);
- },
- 'produce method': {
- 'headers support': function(next) {
- var v = 'foo';
- var k = 'key';
- var h = [
- { key1: "value1A" },
- { key1: "value1B" },
- { key2: "value2" },
- { key1: "value1C" },
- ];
- var jsonH = JSON.stringify(h);
-
- client._oldProduce = function(topic, partition, value, key, timestamp, opaque, headers) {
- t.equal(value, 'foo');
- t.equal(key, 'key');
- t.equal(JSON.stringify(headers), jsonH);
- next();
- };
-
- client.produce('tawpic', 0, v, k, null, h, function() {
-
- });
- },
-
- 'can use a custom serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- var valueSerializerCalled = false;
- var keySerializerCalled = false;
-
- client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
- t.equal(valueSerializerCalled, true);
- t.equal(keySerializerCalled, true);
- t.deepEqual(v, Buffer.from('foo'));
- t.equal(k, 'key');
- next();
- };
-
- client.setValueSerializer(function(_) {
- valueSerializerCalled = true;
- t.deepEqual(_, v);
- return Buffer.from('foo');
- });
-
- client.setKeySerializer(function(_) {
- keySerializerCalled = true;
- t.deepEqual(_, k);
- return 'key';
- });
-
- client.produce('tawpic', 0, v, k, null, function() {
-
- });
- },
-
- 'can use a value asynchronous custom serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- var valueSerializerCalled = false;
- var keySerializerCalled = false;
-
- client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
- t.equal(valueSerializerCalled, true);
- t.equal(keySerializerCalled, true);
- t.deepEqual(v, Buffer.from('foo'));
- t.equal(k, 'key');
- next();
- };
-
- client.setValueSerializer(function(_, cb) {
- valueSerializerCalled = true;
- t.deepEqual(_, v);
- setImmediate(function() {
- cb(null, Buffer.from('foo'));
- });
- });
-
- client.setKeySerializer(function(_) {
- keySerializerCalled = true;
- t.deepEqual(_, k);
- return 'key';
- });
-
- client.produce('tawpic', 0, v, k, null, function() {
-
- });
- },
-
- 'can use a key asynchronous custom serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- var valueSerializerCalled = false;
- var keySerializerCalled = false;
-
- client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
- t.equal(valueSerializerCalled, true);
- t.equal(keySerializerCalled, true);
- t.deepEqual(v, Buffer.from('foo'));
- t.equal(k, 'key');
- next();
- };
-
- client.setValueSerializer(function(_) {
- valueSerializerCalled = true;
- t.deepEqual(_, v);
- return Buffer.from('foo');
- });
-
- client.setKeySerializer(function(_, cb) {
- keySerializerCalled = true;
- t.deepEqual(_, k);
- setImmediate(function() {
- cb(null, 'key');
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function() {
-
- });
- },
-
- 'can use two asynchronous custom serializers': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- var valueSerializerCalled = false;
- var keySerializerCalled = false;
-
- client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
- t.equal(valueSerializerCalled, true);
- t.equal(keySerializerCalled, true);
- t.deepEqual(v, Buffer.from('foo'));
- t.equal(k, 'key');
- next();
- };
-
- client.setValueSerializer(function(_, cb) {
- valueSerializerCalled = true;
- t.deepEqual(_, v);
- setImmediate(function() {
- cb(null, Buffer.from('foo'));
- });
- });
-
- client.setKeySerializer(function(_, cb) {
- keySerializerCalled = true;
- t.deepEqual(_, k);
- setImmediate(function() {
- cb(null, 'key');
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function() {
-
- });
- },
-
- // Promise API
- 'can use a value promise-based custom serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- var valueSerializerCalled = false;
- var keySerializerCalled = false;
-
- client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
- t.equal(valueSerializerCalled, true);
- t.equal(keySerializerCalled, true);
- next();
- };
-
- client.setValueSerializer(function(_) {
- valueSerializerCalled = true;
- t.deepEqual(_, v);
- return new Promise(function(resolve) {
- resolve(Buffer.from(''));
- });
- });
-
- client.setKeySerializer(function(_) {
- keySerializerCalled = true;
- t.deepEqual(_, k);
- return null;
- });
-
- client.produce('tawpic', 0, v, k, null, function() {
-
- });
- },
-
- 'can use a key promise-based custom serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- var valueSerializerCalled = false;
- var keySerializerCalled = false;
-
- client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
- t.equal(valueSerializerCalled, true);
- t.equal(keySerializerCalled, true);
- t.deepEqual(v, Buffer.from('foo'));
- t.equal(k, 'key');
- next();
- };
-
- client.setValueSerializer(function(_) {
- valueSerializerCalled = true;
- t.deepEqual(_, v);
- return Buffer.from('foo');
- });
-
- client.setKeySerializer(function(_) {
- keySerializerCalled = true;
- t.deepEqual(_, k);
- return new Promise(function(resolve) {
- resolve('key');
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function() {
-
- });
- },
-
- 'can use two promise-based custom serializers': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- var valueSerializerCalled = false;
- var keySerializerCalled = false;
-
- client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
- t.equal(valueSerializerCalled, true);
- t.equal(keySerializerCalled, true);
- t.deepEqual(v, Buffer.from('foo'));
- t.equal(k, 'key');
- next();
- };
-
- client.setValueSerializer(function(_) {
- valueSerializerCalled = true;
- t.deepEqual(_, v);
- return new Promise(function(resolve) {
- resolve(Buffer.from('foo'));
- });
- });
-
- client.setKeySerializer(function(_) {
- keySerializerCalled = true;
- t.deepEqual(_, k);
- return new Promise(function(resolve) {
- resolve('key');
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function() {
-
- });
- },
-
- 'bubbles up serializer errors in an async value serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- client.setValueSerializer(function(_, cb) {
- t.deepEqual(_, v);
- setImmediate(function() {
- cb(new Error('even together we failed'));
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function(err) {
- t.equal(typeof err, 'object', 'an error should be returned');
- next();
- });
- },
-
- 'bubbles up serializer errors in an async key serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- client.setKeySerializer(function(_, cb) {
- t.deepEqual(_, v);
- setImmediate(function() {
- cb(new Error('even together we failed'));
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function(err) {
- t.equal(typeof err, 'object', 'an error should be returned');
- next();
- });
- },
-
- 'bubbles up serializer errors in a sync value serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- client.setValueSerializer(function(_, cb) {
- t.deepEqual(_, v);
- throw new Error('even together we failed');
- });
-
- client.produce('tawpic', 0, v, k, null, function(err) {
- t.equal(typeof err, 'object', 'an error should be returned');
- next();
- });
- },
-
- 'bubbles up serializer errors in a sync key serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- client.setKeySerializer(function(_, cb) {
- t.deepEqual(_, v);
- throw new Error('even together we failed');
- });
-
- client.produce('tawpic', 0, v, k, null, function(err) {
- t.equal(typeof err, 'object', 'an error should be returned');
- next();
- });
- },
-
- 'bubbles up serializer errors in a promise-based value serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- client.setValueSerializer(function(_) {
- t.deepEqual(_, v);
-
- return new Promise(function (resolve, reject) {
- reject(new Error('even together we failed'));
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function(err) {
- t.equal(typeof err, 'object', 'an error should be returned');
- next();
- });
- },
-
- 'bubbles up serializer errors in a promise-based key serializer': function(next) {
- var v = {
- disparaging: 'hyena',
- };
-
- var k = {
- delicious: 'cookie',
- };
-
- client.setKeySerializer(function(_) {
- t.deepEqual(_, v);
-
- return new Promise(function(resolve, reject) {
- return new Promise(function (resolve, reject) {
- reject(new Error('even together we failed'));
- });
- });
- });
-
- client.produce('tawpic', 0, v, k, null, function(err) {
- t.equal(typeof err, 'object', 'an error should be returned');
- next();
- });
- },
- }
- },
- };
|