You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

300 lines
8.4 KiB

/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');
var t = require('assert');
var crypto = require('crypto');
var eventListener = require('./listener');
// Broker list handed to every producer created in this suite. Taken from the
// KAFKA_HOST environment variable when set, otherwise 'localhost:9092'.
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
// NOTE(review): not referenced anywhere in the visible part of this file;
// confirm whether it is still needed.
var serviceStopped = false;
describe('Producer', function() {
var producer;
describe('with dr_cb', function() {
// Creates a fresh producer with 'dr_cb' enabled before each test; the hook
// completes once the connect callback has fired without an error.
beforeEach(function(done) {
  var config = {
    'client.id': 'kafka-test',
    'metadata.broker.list': kafkaBrokerList,
    'dr_cb': true,
    'debug': 'all'
  };

  function onConnected(err) {
    t.ifError(err);
    done();
  }

  producer = new Kafka.Producer(config);
  producer.connect({}, onConnected);

  // Hand the producer to the helper from ./listener
  // (presumably attaches event logging; confirm against that module).
  eventListener(producer);
});
// Disconnects the producer after each test. The hook finishes when the
// disconnect callback fires; any arguments passed to it are ignored.
afterEach(function(done) {
  function onDisconnected() {
    done();
  }

  producer.disconnect(onDisconnected);
});
// Fetches metadata through the connected producer and checks its shape.
it('should connect to Kafka', function(done) {
  function verifyMetadata(err, meta) {
    t.ifError(err);
    t.ok(meta);

    // Ensure it is in the correct format
    t.ok(meta.orig_broker_name, 'Broker name is not set');
    t.notStrictEqual(meta.orig_broker_id, undefined, 'Broker id is not set');
    ['brokers', 'topics'].forEach(function(field) {
      t.equal(Array.isArray(meta[field]), true);
    });

    done();
  }

  producer.getMetadata({}, verifyMetadata);
});
// Produces a message with neither payload nor key and checks the first
// delivery report that comes back.
it('should produce a message with a null payload and null key', function(done) {
  this.timeout(3000);

  // Call poll() every 200 ms until the delivery report has been received.
  var pollTimer = setInterval(function() {
    producer.poll();
  }, 200);

  function onDeliveryReport(err, report) {
    clearInterval(pollTimer);

    t.ifError(err);
    t.notStrictEqual(report, undefined);
    t.strictEqual(typeof report.topic, 'string');
    t.strictEqual(typeof report.partition, 'number');
    t.strictEqual(typeof report.offset, 'number');
    t.strictEqual(report.key, null);

    done();
  }

  producer.once('delivery-report', onDeliveryReport);
  producer.produce('test', null, null, null);
});
// Produces a keyed message; the report is asserted to carry the key while
// report.value is asserted to be undefined.
it('should produce a message with a payload and key', function(done) {
  this.timeout(3000);

  var expectedTypes = [
    ['topic', 'string'],
    ['partition', 'number'],
    ['offset', 'number']
  ];

  // Keep polling every 200 ms until the report handler stops the timer.
  var poller = setInterval(function() {
    producer.poll();
  }, 200);

  producer.once('delivery-report', function(err, report) {
    clearInterval(poller);

    t.ifError(err);
    t.notStrictEqual(report, undefined);
    t.strictEqual(report.value, undefined);
    expectedTypes.forEach(function(entry) {
      t.strictEqual(typeof report[entry[0]], entry[1]);
    });
    t.equal(report.key, 'key');

    done();
  });

  var payload = Buffer.from('value');
  producer.produce('test', null, payload, 'key');
});
// Produces a message whose key is a Buffer containing an embedded NUL byte
// and checks that the reported key is longer than three units, i.e. that it
// was not cut off at the NUL.
it('should produce a message with a payload and key buffer', function(done) {
  this.timeout(3000);

  var keyBuffer = Buffer.from('key\0s');
  var valueBuffer = Buffer.from('value');

  // Poll every 200 ms; the handler below cancels the timer.
  var pollHandle = setInterval(function() {
    producer.poll();
  }, 200);

  function checkReport(err, report) {
    clearInterval(pollHandle);

    t.ifError(err);
    t.notStrictEqual(report, undefined);
    t.strictEqual(report.value, undefined);
    t.strictEqual(typeof report.topic, 'string');
    t.strictEqual(typeof report.partition, 'number');
    t.strictEqual(typeof report.offset, 'number');

    var keyIsLongEnough = report.key.length > 3;
    t.equal(keyIsLongEnough, true);

    done();
  }

  producer.once('delivery-report', checkReport);
  producer.produce('test', null, valueBuffer, keyBuffer);
});
// Produces a message with an opaque value as the sixth produce() argument and
// checks that the same value is present on the delivery report.
it('should produce a message with an opaque', function(done) {
  this.timeout(3000);

  var opaqueToken = 'opaque';

  // Poll every 200 ms until the delivery report shows up.
  var intervalId = setInterval(function() {
    producer.poll();
  }, 200);

  producer.once('delivery-report', function(err, report) {
    clearInterval(intervalId);

    t.ifError(err);
    t.notStrictEqual(report, undefined);

    t.strictEqual(typeof report.topic, 'string');
    t.strictEqual(typeof report.partition, 'number');
    t.strictEqual(typeof report.offset, 'number');

    t.equal(report.opaque, opaqueToken);
    done();
  });

  producer.produce('test', null, Buffer.from('value'), null, null, opaqueToken);
});
// Produces `max` messages in one burst and completes once a delivery report
// has been received and validated for every one of them.
it('should get 100% deliverability', function(done) {
  this.timeout(3000);

  var max = 10000;
  var verified_received = 0;

  // Call poll() every 200 ms until all reports are in.
  var tt = setInterval(function() {
    producer.poll();
  }, 200);

  producer.on('delivery-report', function(err, report) {
    t.ifError(err);
    t.notStrictEqual(report, undefined);
    t.strictEqual(typeof report.topic, 'string');
    t.strictEqual(typeof report.partition, 'number');
    t.strictEqual(typeof report.offset, 'number');

    verified_received++;
    if (verified_received === max) {
      clearInterval(tt);
      done();
    }
  });

  // Produce exactly `max` messages so the number produced matches the number
  // of reports awaited above. An inclusive bound would queue max + 1 and
  // leave one report to arrive after done() has already been called.
  for (var total = 0; total < max; total++) {
    producer.produce('test', null, Buffer.from('message ' + total), null);
  }
});
});
describe('with_dr_msg_cb', function() {
// Creates a fresh producer with 'dr_msg_cb' enabled before each test and
// finishes the hook from the connect callback.
beforeEach(function(done) {
  var producerOptions = {};
  producerOptions['client.id'] = 'kafka-test';
  producerOptions['metadata.broker.list'] = kafkaBrokerList;
  producerOptions['dr_msg_cb'] = true;
  producerOptions['debug'] = 'all';

  producer = new Kafka.Producer(producerOptions);

  producer.connect({}, function(connectError) {
    t.ifError(connectError);
    done();
  });

  // Pass the producer to the ./listener helper
  // (presumably attaches event logging; confirm against that module).
  eventListener(producer);
});
// Disconnects the producer after each test; the hook ends when the
// disconnect callback is invoked, ignoring whatever it is called with.
afterEach(function(done) {
  var finish = function() {
    done();
  };

  producer.disconnect(finish);
});
// Produces a keyed message with 'dr_msg_cb' enabled and checks that both the
// key and the payload come back on the delivery report.
it('should produce a message with a payload and key', function(done) {
  this.timeout(3000);

  // Call poll() every 200 ms until the delivery report has been received.
  var tt = setInterval(function() {
    producer.poll();
  }, 200);

  producer.once('delivery-report', function(err, report) {
    clearInterval(tt);
    t.ifError(err);
    t.notStrictEqual(report, undefined);
    t.strictEqual(typeof report.topic, 'string');
    t.strictEqual(typeof report.partition, 'number');
    t.strictEqual(typeof report.offset, 'number');
    // Compare the key's value. t.ok(actual, 'key') would treat 'key' as the
    // failure message and pass for any non-empty key.
    t.equal(report.key.toString(), 'key');
    t.equal(report.value.toString(), 'hai');
    done();
  });

  producer.produce('test', null, Buffer.from('hai'), 'key');
});
// Regression test for issue #117: an empty key and an empty payload must be
// reported back as empty strings.
it('should produce a message with an empty payload and empty key (https://github.com/Blizzard/node-rdkafka/issues/117)', function(done) {
  this.timeout(3000);

  var emptyPayload = Buffer.from('');
  var emptyKey = '';

  // Poll every 200 ms; the report handler cancels the timer.
  var pollTimer = setInterval(function() {
    producer.poll();
  }, 200);

  function verifyReport(err, report) {
    clearInterval(pollTimer);

    t.ifError(err);
    t.notStrictEqual(report, undefined);
    t.strictEqual(typeof report.topic, 'string');
    t.strictEqual(typeof report.partition, 'number');
    t.strictEqual(typeof report.offset, 'number');

    var reportedKey = report.key.toString();
    t.equal(reportedKey, '', 'key should be an empty string');
    var reportedValue = report.value.toString();
    t.strictEqual(reportedValue, '', 'payload should be an empty string');

    done();
  }

  producer.once('delivery-report', verifyReport);
  producer.produce('test', null, emptyPayload, emptyKey);
});
// Regression test for issue #117: a null key and a null payload must be
// reported back as null.
it('should produce a message with a null payload and null key (https://github.com/Blizzard/node-rdkafka/issues/117)', function(done) {
  this.timeout(3000);

  // Uses the producer's own poll interval instead of a manual setInterval
  // (argument presumably in milliseconds; confirm against the library docs).
  producer.setPollInterval(10);

  function verifyReport(err, report) {
    t.ifError(err);
    t.notStrictEqual(report, undefined);

    t.strictEqual(typeof report.topic, 'string');
    t.strictEqual(typeof report.partition, 'number');
    t.strictEqual(typeof report.offset, 'number');

    t.strictEqual(report.key, null, 'key should be null');
    t.strictEqual(report.value, null, 'payload should be null');

    done();
  }

  producer.once('delivery-report', verifyReport);
  producer.produce('test', null, null, null);
});
// Regression test for issue #208: an 8-byte binary key must be reported back
// byte-for-byte.
it('should produce an int64 key (https://github.com/Blizzard/node-rdkafka/issues/208)', function(done) {
  // Set the timeout before producing, as every other test in this suite does.
  this.timeout(3000);

  // A new Uint8Array is zero-filled, so only the last byte needs setting.
  var arr = new Uint8Array(8);
  arr[7] = 84;
  var buf = Buffer.from(arr.buffer);

  producer.setPollInterval(10);

  producer.once('delivery-report', function(err, report) {
    t.ifError(err);
    t.notStrictEqual(report, undefined);
    // (actual, expected) order keeps assertion failure output meaningful.
    t.deepEqual(report.key, buf);
    done();
  });

  // Send the same Buffer that the report is compared against.
  producer.produce('test', null, null, buf);
});
});
});