- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
- var Kafka = require('../');
- var t = require('assert');
- var crypto = require('crypto');
- var eventListener = require('./listener');
- var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
- var serviceStopped = false;
- describe('Producer', function() {
- var producer;
- describe('with dr_cb', function() {
- beforeEach(function(done) {
- producer = new Kafka.Producer({
- 'client.id': 'kafka-test',
- 'metadata.broker.list': kafkaBrokerList,
- 'dr_cb': true,
- 'debug': 'all'
- });
- producer.connect({}, function(err) {
- t.ifError(err);
- done();
- });
- eventListener(producer);
- });
- afterEach(function(done) {
- producer.disconnect(function() {
- done();
- });
- });
- it('should connect to Kafka', function(done) {
- producer.getMetadata({}, function(err, metadata) {
- t.ifError(err);
- t.ok(metadata);
- // Ensure it is in the correct format
- t.ok(metadata.orig_broker_name, 'Broker name is not set');
- t.notStrictEqual(metadata.orig_broker_id, undefined, 'Broker id is not set');
- t.equal(Array.isArray(metadata.brokers), true);
- t.equal(Array.isArray(metadata.topics), true);
- done();
- });
- });
- it('should produce a message with a null payload and null key', function(done) {
- this.timeout(3000);
- var tt = setInterval(function() {
- producer.poll();
- }, 200);
- producer.once('delivery-report', function(err, report) {
- clearInterval(tt);
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- t.strictEqual( report.key, null);
- done();
- });
- producer.produce('test', null, null, null);
- });
- it('should produce a message with a payload and key', function(done) {
- this.timeout(3000);
- var tt = setInterval(function() {
- producer.poll();
- }, 200);
- producer.once('delivery-report', function(err, report) {
- clearInterval(tt);
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(report.value, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- t.equal(report.key, 'key');
- done();
- });
- producer.produce('test', null, Buffer.from('value'), 'key');
- });
- it('should produce a message with a payload and key buffer', function(done) {
- this.timeout(3000);
- var tt = setInterval(function() {
- producer.poll();
- }, 200);
- producer.once('delivery-report', function(err, report) {
- clearInterval(tt);
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(report.value, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- t.equal(report.key.length > 3, true);
- done();
- });
- producer.produce('test', null, Buffer.from('value'), Buffer.from('key\0s'));
- });
- it('should produce a message with an opaque', function(done) {
- this.timeout(3000);
- var tt = setInterval(function() {
- producer.poll();
- }, 200);
- producer.once('delivery-report', function(err, report) {
- clearInterval(tt);
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- t.equal(report.opaque, 'opaque');
- done();
- });
- producer.produce('test', null, Buffer.from('value'), null, null, 'opaque');
- });
- it('should get 100% deliverability', function(done) {
- this.timeout(3000);
- var total = 0;
- var max = 10000;
- var verified_received = 0;
- var tt = setInterval(function() {
- producer.poll();
- }, 200);
- producer
- .on('delivery-report', function(err, report) {
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- verified_received++;
- if (verified_received === max) {
- clearInterval(tt);
- done();
- }
- });
- // Produce
- for (total = 0; total <= max; total++) {
- producer.produce('test', null, Buffer.from('message ' + total), null);
- }
- });
- });
- describe('with_dr_msg_cb', function() {
- beforeEach(function(done) {
- producer = new Kafka.Producer({
- 'client.id': 'kafka-test',
- 'metadata.broker.list': kafkaBrokerList,
- 'dr_msg_cb': true,
- 'debug': 'all'
- });
- producer.connect({}, function(err) {
- t.ifError(err);
- done();
- });
- eventListener(producer);
- });
- afterEach(function(done) {
- producer.disconnect(function() {
- done();
- });
- });
- it('should produce a message with a payload and key', function(done) {
- this.timeout(3000);
- var tt = setInterval(function() {
- producer.poll();
- }, 200);
- producer.once('delivery-report', function(err, report) {
- clearInterval(tt);
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- t.ok(report.key.toString(), 'key');
- t.equal(report.value.toString(), 'hai');
- done();
- });
- producer.produce('test', null, Buffer.from('hai'), 'key');
- });
- it('should produce a message with an empty payload and empty key (https://github.com/Blizzard/node-rdkafka/issues/117)', function(done) {
- this.timeout(3000);
- var tt = setInterval(function() {
- producer.poll();
- }, 200);
- producer.once('delivery-report', function(err, report) {
- clearInterval(tt);
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- t.equal(report.key.toString(), '', 'key should be an empty string');
- t.strictEqual(report.value.toString(), '', 'payload should be an empty string');
- done();
- });
- producer.produce('test', null, Buffer.from(''), '');
- });
- it('should produce a message with a null payload and null key (https://github.com/Blizzard/node-rdkafka/issues/117)', function(done) {
- this.timeout(3000);
- producer.setPollInterval(10);
- producer.once('delivery-report', function(err, report) {
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.strictEqual(typeof report.topic, 'string');
- t.strictEqual(typeof report.partition, 'number');
- t.strictEqual(typeof report.offset, 'number');
- t.strictEqual(report.key, null, 'key should be null');
- t.strictEqual(report.value, null, 'payload should be null');
- done();
- });
- producer.produce('test', null, null, null);
- });
- it('should produce an int64 key (https://github.com/Blizzard/node-rdkafka/issues/208)', function(done) {
- var v1 = 0x0000000000000084;
- var arr = new Uint8Array(8);
- arr[0] = 0x00;
- arr[1] = 0x00;
- arr[2] = 0x00;
- arr[3] = 0x00;
- arr[4] = 0x00;
- arr[5] = 0x00;
- arr[6] = 0x00;
- arr[7] = 84;
- var buf = Buffer.from(arr.buffer);
- producer.setPollInterval(10);
- producer.once('delivery-report', function(err, report) {
- t.ifError(err);
- t.notStrictEqual(report, undefined);
- t.deepEqual(buf, report.key);
- done();
- });
- producer.produce('test', null, null, Buffer.from(arr.buffer));
- this.timeout(3000);
- });
- });
- });