/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */

var crypto = require('crypto');
var t = require('assert');

var Kafka = require('../');
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
var eventListener = require('./listener');
var topic = 'test';
var topic2 = 'test2';

describe('Consumer/Producer', function() {

  var producer;
  var consumer;

  beforeEach(function(done) {
    var finished = 0;
    var called = false;

    // done() only after both clients have connected; fail fast on the first error
    function maybeDone(err) {
      if (called) {
        return;
      }
      finished++;
      if (err) {
        called = true;
        return done(err);
      }

      if (finished === 2) {
        done();
      }
    }

    var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
    consumer = new Kafka.KafkaConsumer({
      'metadata.broker.list': kafkaBrokerList,
      'group.id': grp,
      'fetch.wait.max.ms': 1000,
      'session.timeout.ms': 10000,
      'enable.auto.commit': true,
      'enable.partition.eof': true,
      'debug': 'all'
      // paused: true,
    }, {
      'auto.offset.reset': 'largest'
    });

    consumer.connect({}, function(err, d) {
      t.ifError(err);
      t.equal(typeof d, 'object', 'metadata should be returned');
      maybeDone(err);
    });

    eventListener(consumer);

    producer = new Kafka.Producer({
      'client.id': 'kafka-mocha',
      'metadata.broker.list': kafkaBrokerList,
      'fetch.wait.max.ms': 1,
      'debug': 'all',
      'dr_cb': true
    }, {
      'produce.offset.report': true
    });

    producer.connect({}, function(err, d) {
      t.ifError(err);
      t.equal(typeof d, 'object', 'metadata should be returned');
      maybeDone(err);
    });

    eventListener(producer);
  });

  afterEach(function(done) {
    this.timeout(6000);
    var finished = 0;
    var called = false;

    function maybeDone(err) {
      if (called) {
        return;
      }
      finished++;
      if (err) {
        called = true;
        return done(err);
      }

      if (finished === 2) {
        done();
      }
    }

    consumer.disconnect(function(err) {
      maybeDone(err);
    });

    producer.disconnect(function(err) {
      maybeDone(err);
    });
  });

  it('should be able to produce, consume messages, read position: subscribe/consumeOnce', function(done) {
    this.timeout(8000);
    crypto.randomBytes(4096, function(ex, buffer) {
      producer.setPollInterval(10);

      var offset;
      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
        offset = report.offset;
      });

      consumer.setDefaultConsumeTimeout(10);
      consumer.subscribe([topic]);

      var ct;
      var consumeOne = function() {
        consumer.consume(1, function(err, messages) {
          // -185 is librdkafka's ERR__TIMED_OUT: nothing arrived yet, poll again
          if (err && err.code === -185) {
            ct = setTimeout(consumeOne, 100);
            return;
          } else if (messages.length === 0 || (err && err.code === -191)) {
            // -191 is ERR__PARTITION_EOF: produce a message and retry
            producer.produce(topic, null, buffer, null);
            ct = setTimeout(consumeOne, 100);
            return;
          } else if (err) {
            return;
          }

          var message = messages[0];

          t.equal(Array.isArray(consumer.assignments()), true, 'Assignments should be an array');
          t.equal(consumer.assignments().length > 0, true, 'Should have at least one assignment');
          t.equal(buffer.toString(), message.value.toString(), 'message is not equal to buffer');

          // test consumer.position as we have consumed
          var position = consumer.position();
          t.equal(position.length, 1);
          t.deepStrictEqual(position[0].partition, 0);
          t.ok(position[0].offset >= 0);
          done();
        });
      };

      // Consume until we get it or time out
      consumeOne();
    });
  });

  it('should return ready messages on partition EOF', function(done) {
    this.timeout(8000);
    crypto.randomBytes(4096, function(ex, buffer) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      consumer.subscribe([topic]);
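      // Reading of the test flow below: consumeNone() first shows that with no
      // messages available the consume call blocks for its full timeout, then
      // consumeAll() shows that a buffered message is returned right away at
      // partition EOF rather than waiting out the (very large) timeout.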
      var consumeAll = function() {
        // Make sure we get the message fast when consuming with large timeout
        consumer.setDefaultConsumeTimeout(1000000);
        consumer.consume(100000, function(err, messages) {
          t.ifError(err);
          t.equal(messages.length, 1);
          done();
        });
      };

      var consumeNone = function() {
        // With no new messages, the consume should wait whole timeout
        var start = Date.now();
        // Set the timeout to 2000ms to see that it actually waits the whole time
        // (Needs to be higher than fetch.max.wait.ms which is 1000 here
        // to ensure we don't only wait that long)
        consumer.setDefaultConsumeTimeout(2000);
        consumer.consume(100000, function(err, messages) {
          t.ifError(err);
          t.ok(Date.now() - start >= 1998);
          t.equal(messages.length, 0);

          // Produce one message to cause EOF with waiting message when consuming all
          producer.produce(topic, null, buffer, null);
          consumeAll();
        });
      };

      consumeNone();
    });
  });

  it('should emit partition.eof event when reaching end of partition', function(done) {
    this.timeout(8000);
    crypto.randomBytes(4096, function(ex, buffer) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      consumer.subscribe([topic]);

      var events = [];

      consumer.once('data', function(msg) {
        events.push("data");
      });

      consumer.once('partition.eof', function(eof) {
        events.push("partition.eof");
      });

      setTimeout(function() {
        producer.produce(topic, null, buffer, null);
      }, 500);

      consumer.setDefaultConsumeTimeout(2000);
      consumer.consume(1000, function(err, messages) {
        t.ifError(err);
        t.equal(messages.length, 1);
        t.deepStrictEqual(events, ["data", "partition.eof"]);
        done();
      });
    });
  });

  it('should emit partition.eof when already at end of partition', function(done) {
    this.timeout(8000);
    crypto.randomBytes(4096, function(ex, buffer) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      consumer.subscribe([topic]);

      var events = [];

      consumer.once('data', function(msg) {
        events.push("data");
      });

      consumer.on('partition.eof', function(eof) {
        events.push("partition.eof");
      });

      setTimeout(function() {
        producer.produce(topic, null, buffer, null);
      }, 2000);

      consumer.setDefaultConsumeTimeout(3000);
      consumer.consume(1000, function(err, messages) {
        t.ifError(err);
        t.equal(messages.length, 1);
        t.deepStrictEqual(events, ["partition.eof", "data", "partition.eof"]);
        done();
      });
    });
  });

  it('should be able to produce and consume messages: consumeLoop', function(done) {
    var key = 'key';

    this.timeout(5000);
    crypto.randomBytes(4096, function(ex, buffer) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        if (!err) {
          t.equal(topic, report.topic, 'invalid delivery-report topic');
          t.equal(key, report.key, 'invalid delivery-report key');
          t.ok(report.offset >= 0, 'invalid delivery-report offset');
        }
      });

      consumer.on('data', function(message) {
        t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
        t.equal(key, message.key, 'invalid message key');
        t.equal(topic, message.topic, 'invalid message topic');
        t.ok(message.offset >= 0, 'invalid message offset');
        done();
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function() {
        producer.produce(topic, null, buffer, key);
      }, 2000);
    });
  });

  it('should emit \'partition.eof\' events in consumeLoop', function(done) {
    this.timeout(7000);
    crypto.randomBytes(4096, function(ex, buffer) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      var events = [];
      var offsets = [];
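      // Expected interleaving: an initial partition.eof once the consumer
      // catches up, then a data/partition.eof pair for each of the two
      // produced messages (asserted below after 6 seconds).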
      consumer.on('data', function(message) {
        t.equal(message.topic, topic);
        t.equal(message.partition, 0);
        offsets.push(message.offset);
        events.push('data');
      });

      consumer.on('partition.eof', function(eofEvent) {
        t.equal(eofEvent.topic, topic);
        t.equal(eofEvent.partition, 0);
        offsets.push(eofEvent.offset);
        events.push('partition.eof');
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function() {
        producer.produce(topic, null, buffer);
      }, 2000);

      setTimeout(function() {
        producer.produce(topic, null, buffer);
      }, 4000);

      setTimeout(function() {
        t.deepStrictEqual(events, ['partition.eof', 'data', 'partition.eof', 'data', 'partition.eof']);

        var startOffset = offsets[0];
        t.deepStrictEqual(offsets, [
          startOffset,
          startOffset,
          startOffset + 1,
          startOffset + 1,
          startOffset + 2
        ]);

        done();
      }, 6000);
    });
  });

  it('should emit [warning] event on UNKNOWN_TOPIC_OR_PART error: consumeLoop', function(done) {
    consumer.on('warning', function(err) {
      if (err.code === Kafka.CODES.ERRORS.ERR_UNKNOWN_TOPIC_OR_PART) {
        consumer.disconnect(function() {
          done();
        });
      } else {
        t.ifError(err);
      }
    });

    consumer.subscribe(['non_existing_topic']);
    consumer.consume();
  });

  it('should be able to produce and consume messages with one header value as string: consumeLoop', function(done) {
    var headers = [
      { key: "value" }
    ];
    this.timeout(5000);
    run_headers_test(done, headers);
  });

  it('should be able to produce and consume messages with one header value as buffer: consumeLoop', function(done) {
    var headers = [
      { key: Buffer.from('value') }
    ];
    this.timeout(5000);
    run_headers_test(done, headers);
  });

  it('should be able to produce and consume messages with one header value as int: consumeLoop', function(done) {
    var headers = [
      { key: 10 }
    ];
    this.timeout(5000);
    run_headers_test(done, headers);
  });

  it('should be able to produce and consume messages with one header value as float: consumeLoop', function(done) {
    var headers = [
      { key: 1.11 }
    ];
    this.timeout(5000);
    run_headers_test(done, headers);
  });

  it('should be able to produce and consume messages with multiple headers value as buffer: consumeLoop', function(done) {
    var headers = [
      { key1: Buffer.from('value1') },
      { key2: Buffer.from('value2') },
      { key3: Buffer.from('value3') },
      { key4: Buffer.from('value4') },
    ];
    this.timeout(5000);
    run_headers_test(done, headers);
  });

  it('should be able to produce and consume messages with multiple headers value as string: consumeLoop', function(done) {
    var headers = [
      { key1: 'value1' },
      { key2: 'value2' },
      { key3: 'value3' },
      { key4: 'value4' },
    ];
    this.timeout(5000);
    run_headers_test(done, headers);
  });

  it('should be able to produce and consume messages with multiple headers with mixed values: consumeLoop', function(done) {
    var headers = [
      { key1: 'value1' },
      { key2: Buffer.from('value2') },
      { key3: 100 },
      { key4: 10.1 },
    ];
    this.timeout(5000);
    run_headers_test(done, headers);
  });

  it('should be able to produce and consume messages: empty buffer key and empty value', function(done) {
    this.timeout(20000);
    var emptyString = '';
    var key = Buffer.from(emptyString);
    var value = Buffer.from('');

    producer.setPollInterval(10);

    consumer.once('data', function(message) {
      t.notEqual(message.value, null, 'message should not be null');
      t.equal(value.toString(), message.value.toString(), 'invalid message value');
      t.equal(emptyString, message.key, 'invalid message key');
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    setTimeout(function() {
      producer.produce(topic, null, value, key);
    }, 2000);
  });
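  // The next test is the same scenario with a plain empty string key instead
  // of an empty Buffer; in both cases the assertions expect the consumer to
  // surface the key as ''.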
  it('should be able to produce and consume messages: empty key and empty value', function(done) {
    this.timeout(20000);
    var key = '';
    var value = Buffer.from('');

    producer.setPollInterval(10);

    consumer.once('data', function(message) {
      t.notEqual(message.value, null, 'message should not be null');
      t.equal(value.toString(), message.value.toString(), 'invalid message value');
      t.equal(key, message.key, 'invalid message key');
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    setTimeout(function() {
      producer.produce(topic, null, value, key);
    }, 2000);
  });

  it('should be able to produce and consume messages: null key and null value', function(done) {
    this.timeout(20000);
    var key = null;
    var value = null;

    producer.setPollInterval(10);

    consumer.once('data', function(message) {
      t.equal(value, message.value, 'invalid message value');
      t.equal(key, message.key, 'invalid message key');
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    setTimeout(function() {
      producer.produce(topic, null, value, key);
    }, 2000);
  });

  describe('Exceptional case - offset_commit_cb true', function() {
    var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
    var consumerOpts = {
      'metadata.broker.list': kafkaBrokerList,
      'group.id': grp,
      'fetch.wait.max.ms': 1000,
      'session.timeout.ms': 10000,
      'enable.auto.commit': false,
      'debug': 'all',
      'offset_commit_cb': true
    };

    beforeEach(function(done) {
      consumer = new Kafka.KafkaConsumer(consumerOpts, {
        'auto.offset.reset': 'largest',
      });

      consumer.connect({}, function(err, d) {
        t.ifError(err);
        t.equal(typeof d, 'object', 'metadata should be returned');
        done();
      });

      eventListener(consumer);
    });

    afterEach(function(done) {
      this.timeout(10000);
      consumer.disconnect(function() {
        done();
      });
    });

    it('should async commit after consuming', function(done) {
      this.timeout(25000);
      var key = '';
      var value = Buffer.from('');

      var lastOffset = null;

      consumer.once('data', function(message) {
        lastOffset = message.offset;

        // disconnect in offset commit callback
        consumer.on('offset.commit', function(err, offsets) {
          t.ifError(err);
          t.equal(typeof offsets, 'object', 'offsets should be returned');

          consumer.disconnect(function() {
            // reconnect in disconnect callback
            consumer.connect({}, function(err, d) {
              t.ifError(err);
              t.equal(typeof d, 'object', 'metadata should be returned');

              // check that no new messages arrive, as the offset was committed
              consumer.once('data', function(message) {
                done(new Error('Should never be here'));
              });

              consumer.subscribe([topic]);
              consumer.consume();

              setTimeout(function() {
                done();
              }, 5000);
            });
          });
        });

        consumer.commitMessage(message);
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function() {
        producer.produce(topic, null, value, key);
      }, 2000);
    });
  });

  describe('Exceptional case - offset_commit_cb function', function() {
    var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

    afterEach(function(done) {
      this.timeout(10000);
      consumer.disconnect(function() {
        done();
      });
    });

    it('should callback offset_commit_cb after commit', function(done) {
      this.timeout(20000);
      var consumerOpts = {
        'metadata.broker.list': kafkaBrokerList,
        'group.id': grp,
        'fetch.wait.max.ms': 1000,
        'session.timeout.ms': 10000,
        'enable.auto.commit': false,
        'debug': 'all',
        'offset_commit_cb': function(offset) {
          done();
        }
      };

      consumer = new Kafka.KafkaConsumer(consumerOpts, {
        'auto.offset.reset': 'largest',
      });
      eventListener(consumer);

      consumer.connect({}, function(err, d) {
        t.ifError(err);
        t.equal(typeof d, 'object', 'metadata should be returned');
        consumer.subscribe([topic]);
        consumer.consume();
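        // Delay the produce (as the other tests here do) so the consumer has
        // time to join the group and receive its assignment; with
        // auto.offset.reset set to 'largest', a message produced before the
        // assignment could otherwise be missed.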
        setTimeout(function() {
          producer.produce(topic, null, Buffer.from(''), '');
        }, 2000);
      });

      consumer.once('data', function(message) {
        consumer.commitMessage(message);
      });
    });
  });

  function assert_headers_match(expectedHeaders, messageHeaders) {
    t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length');

    // Each expected header is an object with a single key; compare the key
    // and the stringified value against what the consumer delivered
    for (var i = 0; i < expectedHeaders.length; i++) {
      var expectedKey = Object.keys(expectedHeaders[i])[0];
      var messageKey = Object.keys(messageHeaders[i]);
      t.equal(messageKey.length, 1, 'Expected only one Header key');
      t.equal(expectedKey, messageKey[0], 'Expected key does not match message key');

      var expectedValue = Buffer.isBuffer(expectedHeaders[i][expectedKey]) ?
        expectedHeaders[i][expectedKey].toString() :
        expectedHeaders[i][expectedKey];
      var actualValue = messageHeaders[i][expectedKey].toString();
      t.equal(expectedValue, actualValue, 'invalid message header');
    }
  }

  function run_headers_test(done, headers) {
    var key = 'key';

    crypto.randomBytes(4096, function(ex, buffer) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        if (!err) {
          t.equal(topic, report.topic, 'invalid delivery-report topic');
          t.equal(key, report.key, 'invalid delivery-report key');
          t.ok(report.offset >= 0, 'invalid delivery-report offset');
        }
      });

      consumer.on('data', function(message) {
        t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
        t.equal(key, message.key, 'invalid message key');
        t.equal(topic, message.topic, 'invalid message topic');
        t.ok(message.offset >= 0, 'invalid message offset');
        assert_headers_match(headers, message.headers);
        done();
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function() {
        var timestamp = new Date().getTime();
        // produce(topic, partition, message, key, timestamp, opaque, headers):
        // the "" fills the opaque slot so headers can be passed last
        producer.produce(topic, null, buffer, key, timestamp, "", headers);
      }, 2000);
    });
  }
});