|
|
- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
-
- var crypto = require('crypto');
- var t = require('assert');
-
- var Kafka = require('../');
- var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
- var eventListener = require('./listener');
-
/**
 * End-to-end suite: a producer keeps writing to a topic while one consumer
 * commits an offset and disconnects, after which a second consumer created
 * with the same configuration must receive, as its first message, the one
 * at the committed offset.
 *
 * Talks to the broker list held in `kafkaBrokerList`.
 */
describe('Consumer group/Producer', function() {

  var producer;
  var consumer;

  // Random suffix so each run of this file gets its own consumer group id.
  var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

  // Global consumer configuration, passed to both consumers used by the
  // test. Auto commit is switched off; the test commits explicitly.
  var config = {
    'metadata.broker.list': kafkaBrokerList,
    'group.id': grp,
    'fetch.wait.max.ms': 1000,
    'session.timeout.ms': 10000,
    'enable.auto.commit': false,
    'debug': 'all'
  };

  /**
   * Builds the callback handed to `connect()` in the setup hooks: it fails
   * on a connection error, checks that metadata came back as an object and
   * then completes the hook.
   *
   * @param {Function} done - Mocha completion callback of the calling hook.
   * @returns {Function} Callback taking `(err, metadata)`.
   */
  function expectMetadata(done) {
    return function(err, metadata) {
      t.ifError(err);
      t.equal(typeof metadata, 'object', 'metadata should be returned');
      done();
    };
  }

  // Connect a fresh producer before every test.
  beforeEach(function(done) {
    producer = new Kafka.Producer({
      'client.id': 'kafka-mocha',
      'metadata.broker.list': kafkaBrokerList,
      'fetch.wait.max.ms': 1,
      'debug': 'all',
      'dr_cb': true
    });

    producer.connect({}, expectMetadata(done));

    eventListener(producer);
  });

  // Connect a fresh consumer before every test.
  beforeEach(function(done) {
    consumer = new Kafka.KafkaConsumer(config, {
      'auto.offset.reset': 'largest'
    });

    consumer.connect({}, expectMetadata(done));

    eventListener(consumer);
  });

  // Only the producer is torn down here; the consumers are disconnected by
  // the test itself.
  afterEach(function(done) {
    producer.disconnect(function() {
      done();
    });
  });

  it('should be able to commit, read committed and restart from the committed offset', function(done) {
    this.timeout(30000);

    var topic = 'test';
    var key = 'key';
    var payload = Buffer.from('value');

    var received = 0;                 // messages seen by the first consumer
    var awaitingFirstMessage = true;  // second consumer has seen nothing yet
    var committedOffset;              // set once the first consumer has committed

    // Produce one message every 100 ms for the consumers to read. The timer
    // is cancelled when the test succeeds, or as soon as produce() throws
    // (presumably once the producer has been disconnected - TODO confirm).
    var produceTimer = setInterval(function() {
      try {
        producer.produce(topic, null, payload, key);
      } catch (e) {
        clearInterval(produceTimer);
      }
    }, 100);

    /**
     * Runs when the first consumer has disconnected: starts a second
     * consumer with the same configuration and verifies that the first
     * message it receives carries the committed offset.
     */
    function startSecondConsumer() {
      var consumer2 = new Kafka.KafkaConsumer(config, {
        'auto.offset.reset': 'largest'
      });

      consumer2.on('data', function(message) {
        // Only the first delivered message is checked.
        if (!awaitingFirstMessage) {
          return;
        }
        awaitingFirstMessage = false;

        t.deepStrictEqual(committedOffset, message.offset, 'Offset read by consumer 2 incorrect');
        clearInterval(produceTimer);
        consumer2.unsubscribe();
        consumer2.disconnect(function() {
          done();
        });
      });

      consumer2.on('ready', function() {
        consumer2.subscribe([topic]);
        consumer2.consume();
      });
      consumer2.connect();
    }

    /**
     * 'data' handler of the first consumer: on the third message it commits
     * that message, reads the committed offsets back through
     * consumer.committed() and then disconnects.
     *
     * @param {Object} message - Consumed message; only `offset` is read here.
     */
    function commitThirdMessage(message) {
      received++;
      if (received !== 3) {
        return;
      }

      consumer.commitMessageSync(message);

      // Exercise the consumer.committed() API.
      consumer.committed(null, 5000, function(err, topicPartitions) {
        // The test expects the stored offset to be one past the message
        // that was committed.
        var nextOffset = message.offset + 1;

        t.ifError(err);
        t.deepStrictEqual(topicPartitions.length, 1);
        t.deepStrictEqual(topicPartitions[0].offset, nextOffset, 'Offset read by consumer 1 incorrect');

        committedOffset = nextOffset;
        consumer.unsubscribe();
        consumer.disconnect();
      });
    }

    consumer.on('disconnected', startSecondConsumer);
    consumer.on('data', commitThirdMessage);

    consumer.subscribe([topic]);
    consumer.consume();
  });

});
|