You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

690 lines
19 KiB

/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var crypto = require('crypto');
var t = require('assert');
var Kafka = require('../');
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
var eventListener = require('./listener');
var topic = 'test';
var topic2 = 'test2';
describe('Consumer/Producer', function() {
var producer;
var consumer;
// Creates and connects a fresh consumer and producer before every test.
// Both connects are started back to back; the hook completes once both have
// reported success, or as soon as the first one reports an error.
beforeEach(function(done) {
  var outstanding = 2;
  var failed = false;

  // Latch shared by both connect callbacks; after a failure has been
  // reported to mocha every later call is ignored.
  function settle(err) {
    if (failed) {
      return;
    }
    outstanding -= 1;
    if (err) {
      failed = true;
      done(err);
      return;
    }
    if (outstanding === 0) {
      done();
    }
  }

  // Connect callback used for both clients: asserts on the result, then
  // reports to the latch.
  function onConnected(err, metadata) {
    t.ifError(err);
    t.equal(typeof metadata, 'object', 'metadata should be returned');
    settle(err);
  }

  // Random suffix so every test run joins its own consumer group.
  var groupId = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

  var consumerConf = {
    'metadata.broker.list': kafkaBrokerList,
    'group.id': groupId,
    'fetch.wait.max.ms': 1000,
    'session.timeout.ms': 10000,
    'enable.auto.commit': true,
    'enable.partition.eof': true,
    'debug': 'all'
  };
  var consumerTopicConf = {
    'auto.offset.reset': 'largest'
  };
  consumer = new Kafka.KafkaConsumer(consumerConf, consumerTopicConf);
  consumer.connect({}, onConnected);
  eventListener(consumer);

  var producerConf = {
    'client.id': 'kafka-mocha',
    'metadata.broker.list': kafkaBrokerList,
    'fetch.wait.max.ms': 1,
    'debug': 'all',
    'dr_cb': true
  };
  var producerTopicConf = {
    'produce.offset.report': true
  };
  producer = new Kafka.Producer(producerConf, producerTopicConf);
  producer.connect({}, onConnected);
  eventListener(producer);
});
// Disconnects the consumer and the producer after every test. The hook
// completes when both disconnect callbacks have fired, or on the first error.
afterEach(function(done) {
  this.timeout(6000);

  var remaining = 2;
  var failed = false;

  // Latch shared by both disconnect callbacks; calls after a reported
  // failure are ignored.
  function settle(err) {
    if (failed) {
      return;
    }
    remaining -= 1;
    if (err) {
      failed = true;
      done(err);
      return;
    }
    if (remaining === 0) {
      done();
    }
  }

  // Consumer first, then producer.
  [consumer, producer].forEach((client) => {
    client.disconnect((err) => {
      settle(err);
    });
  });
});
// Round trip using one-shot consume(1, cb) calls: poll until a message
// arrives, producing the payload whenever a poll comes back empty, then check
// the consumer's assignments and position.
it('should be able to produce, consume messages, read position: subscribe/consumeOnce', function(done) {
  this.timeout(8000);

  crypto.randomBytes(4096, function(ex, buffer) {
    if (ex) {
      // Without a payload there is nothing to test; fail right away.
      return done(ex);
    }

    producer.setPollInterval(10);
    producer.once('delivery-report', function(err, report) {
      t.ifError(err);
    });

    consumer.setDefaultConsumeTimeout(10);
    consumer.subscribe([topic]);

    // Schedules the next poll attempt.
    var retry = function() {
      setTimeout(consumeOne, 100);
    };

    var consumeOne = function() {
      consumer.consume(1, function(err, messages) {
        if (err) {
          // Classify the error before touching `messages`: it may not be
          // an array when an error is reported.
          if (err.code === Kafka.CODES.ERRORS.ERR__TIMED_OUT) {
            retry();
            return;
          }
          if (err.code === Kafka.CODES.ERRORS.ERR__PARTITION_EOF) {
            producer.produce(topic, null, buffer, null);
            retry();
            return;
          }
          // Anything else is unexpected: fail now instead of returning
          // silently and letting the test run into its timeout.
          done(err);
          return;
        }

        if (!messages || messages.length === 0) {
          // Nothing to read yet: produce the payload and poll again.
          producer.produce(topic, null, buffer, null);
          retry();
          return;
        }

        var message = messages[0];
        t.equal(Array.isArray(consumer.assignments()), true, 'Assignments should be an array');
        t.equal(consumer.assignments().length > 0, true, 'Should have at least one assignment');
        t.equal(buffer.toString(), message.value.toString(),
          'message is not equal to buffer');

        // test consumer.position as we have consumed
        var position = consumer.position();
        t.equal(position.length, 1);
        t.deepStrictEqual(position[0].partition, 0);
        t.ok(position[0].offset >= 0);
        done();
      });
    };

    // Consume until we get it or time out
    consumeOne();
  });
});
// Two-phase check of consume(count, cb):
//   1. with nothing to read, the call is expected to take the whole consume
//      timeout and hand back an empty array;
//   2. after one message has been produced, a call with a very large timeout
//      is expected to hand back exactly that one message.
it('should return ready messages on partition EOF', function(done) {
  this.timeout(8000);

  crypto.randomBytes(4096, (ex, payload) => {
    producer.setPollInterval(10);
    producer.once('delivery-report', (err, report) => {
      t.ifError(err);
    });
    consumer.subscribe([topic]);

    // Phase 2: the single produced message must be delivered even though
    // the consume timeout is huge.
    function expectSingleMessage() {
      consumer.setDefaultConsumeTimeout(1000000);
      consumer.consume(100000, (err, messages) => {
        t.ifError(err);
        t.equal(messages.length, 1);
        done();
      });
    }

    // Phase 1: no messages available, so the call should last the full
    // 2000ms. The value is chosen above the consumer's fetch.wait.max.ms
    // (1000 here) so that the wait is not explained by that setting alone.
    function expectFullWait() {
      var startedAt = Date.now();
      consumer.setDefaultConsumeTimeout(2000);
      consumer.consume(100000, (err, messages) => {
        t.ifError(err);
        var elapsed = Date.now() - startedAt;
        t.ok(elapsed >= 1998);
        t.equal(messages.length, 0);
        // Produce one message so that phase 2 has something to read.
        producer.produce(topic, null, payload, null);
        expectSingleMessage();
      });
    }

    expectFullWait();
  });
});
// Produces one message 500ms after consume(1000, cb) was started with a
// 2000ms timeout and expects a 'data' event followed by a 'partition.eof'
// event by the time the consume callback runs.
it('should emit partition.eof event when reaching end of partition', function(done) {
  this.timeout(8000);

  crypto.randomBytes(4096, (ex, payload) => {
    var seen = [];
    // Builds a listener that records the given event name.
    var record = (name) => () => {
      seen.push(name);
    };

    producer.setPollInterval(10);
    producer.once('delivery-report', (err, report) => {
      t.ifError(err);
    });

    consumer.subscribe([topic]);
    consumer.once('data', record('data'));
    consumer.once('partition.eof', record('partition.eof'));

    setTimeout(() => {
      producer.produce(topic, null, payload, null);
    }, 500);

    consumer.setDefaultConsumeTimeout(2000);
    consumer.consume(1000, (err, messages) => {
      t.ifError(err);
      t.equal(messages.length, 1);
      t.deepStrictEqual(seen, ['data', 'partition.eof']);
      done();
    });
  });
});
// Produces one message 2000ms after consume(1000, cb) was started with a
// 3000ms timeout and expects the event sequence
// 'partition.eof', 'data', 'partition.eof' by the time the callback runs.
it('should emit partition.eof when already at end of partition', function(done) {
  this.timeout(8000);

  crypto.randomBytes(4096, (ex, payload) => {
    var seen = [];
    // Builds a listener that records the given event name.
    var record = (name) => () => {
      seen.push(name);
    };

    producer.setPollInterval(10);
    producer.once('delivery-report', (err, report) => {
      t.ifError(err);
    });

    consumer.subscribe([topic]);
    // Only the first 'data' event is recorded, but every 'partition.eof'.
    consumer.once('data', record('data'));
    consumer.on('partition.eof', record('partition.eof'));

    setTimeout(() => {
      producer.produce(topic, null, payload, null);
    }, 2000);

    consumer.setDefaultConsumeTimeout(3000);
    consumer.consume(1000, (err, messages) => {
      t.ifError(err);
      t.equal(messages.length, 1);
      t.deepStrictEqual(seen, ['partition.eof', 'data', 'partition.eof']);
      done();
    });
  });
});
// Round trip in flowing mode: subscribe, start consume() without a callback,
// produce one keyed message after 2000ms and verify the delivery report and
// the first message that arrives.
it('should be able to produce and consume messages: consumeLoop', function(done) {
  var key = 'key';
  this.timeout(5000);

  crypto.randomBytes(4096, function(ex, buffer) {
    if (ex) {
      // Without a payload there is nothing to test; fail right away.
      return done(ex);
    }

    producer.setPollInterval(10);

    producer.once('delivery-report', function(err, report) {
      // Surface delivery failures like the sibling tests do instead of
      // skipping the checks and waiting for the timeout.
      t.ifError(err);
      t.equal(topic, report.topic, 'invalid delivery-report topic');
      t.equal(key, report.key, 'invalid delivery-report key');
      t.ok(report.offset >= 0, 'invalid delivery-report offset');
    });

    // One-shot listener: done() must be called exactly once, even if more
    // than one message shows up on the topic.
    consumer.once('data', function(message) {
      t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
      t.equal(key, message.key, 'invalid message key');
      t.equal(topic, message.topic, 'invalid message topic');
      t.ok(message.offset >= 0, 'invalid message offset');
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    setTimeout(function() {
      producer.produce(topic, null, buffer, key);
    }, 2000);
  });
});
// Flowing-mode check of the interleaving of 'data' and 'partition.eof'
// events: two messages are produced (after 2000ms and 4000ms) and at 6000ms
// the recorded event names and offsets are compared with the expected
// sequence.
it('should emit \'partition.eof\' events in consumeLoop', function(done) {
  this.timeout(7000);

  crypto.randomBytes(4096, (ex, payload) => {
    var events = [];
    var offsets = [];

    // Builds a listener that validates topic/partition of the event object
    // and records its offset together with the given event name.
    function track(kind) {
      return (evt) => {
        t.equal(evt.topic, topic);
        t.equal(evt.partition, 0);
        offsets.push(evt.offset);
        events.push(kind);
      };
    }

    producer.setPollInterval(10);
    producer.once('delivery-report', (err, report) => {
      t.ifError(err);
    });

    consumer.on('data', track('data'));
    consumer.on('partition.eof', track('partition.eof'));

    consumer.subscribe([topic]);
    consumer.consume();

    [2000, 4000].forEach((delay) => {
      setTimeout(() => {
        producer.produce(topic, null, payload);
      }, delay);
    });

    setTimeout(() => {
      t.deepStrictEqual(events, ['partition.eof', 'data', 'partition.eof', 'data', 'partition.eof']);
      // Expected offsets are expressed relative to the first recorded one.
      var base = offsets[0];
      var expectedOffsets = [base, base, base + 1, base + 1, base + 2];
      t.deepStrictEqual(offsets, expectedOffsets);
      done();
    }, 6000);
  });
});
// Subscribes to a topic name that is expected not to exist and waits for a
// 'warning' event carrying ERR_UNKNOWN_TOPIC_OR_PART. Any other warning
// received before that fails the test.
it('should emit [warning] event on UNKNOWN_TOPIC_OR_PART error: consumeLoop', function(done) {
  // The listener stays registered for the lifetime of the consumer, so guard
  // against the warning being emitted more than once: disconnect() and done()
  // must only run for the first match.
  var finished = false;

  consumer.on('warning', function (err) {
    if (finished) {
      return;
    }
    if (err.code === Kafka.CODES.ERRORS.ERR_UNKNOWN_TOPIC_OR_PART) {
      finished = true;
      consumer.disconnect(function() {
        done();
      });
    } else {
      t.ifError(err);
    }
  });

  consumer.subscribe(['non_existing_topic']);
  consumer.consume();
});
// Header round-trip cases. Every entry becomes one test that hands its
// headers to run_headers_test with a 5000ms timeout; titles and order are
// the ones mocha reports.
var header_round_trip_cases = [
  {
    title: 'should be able to produce and consume messages with one header value as string: consumeLoop',
    headers: [
      { key: 'value' }
    ]
  },
  {
    title: 'should be able to produce and consume messages with one header value as buffer: consumeLoop',
    headers: [
      { key: Buffer.from('value') }
    ]
  },
  {
    title: 'should be able to produce and consume messages with one header value as int: consumeLoop',
    headers: [
      { key: 10 }
    ]
  },
  {
    title: 'should be able to produce and consume messages with one header value as float: consumeLoop',
    headers: [
      { key: 1.11 }
    ]
  },
  {
    title: 'should be able to produce and consume messages with multiple headers value as buffer: consumeLoop',
    headers: [
      { key1: Buffer.from('value1') },
      { key2: Buffer.from('value2') },
      { key3: Buffer.from('value3') },
      { key4: Buffer.from('value4') }
    ]
  },
  {
    title: 'should be able to produce and consume messages with multiple headers value as string: consumeLoop',
    headers: [
      { key1: 'value1' },
      { key2: 'value2' },
      { key3: 'value3' },
      { key4: 'value4' }
    ]
  },
  {
    title: 'should be able to produce and consume messages with multiple headers with mixed values: consumeLoop',
    headers: [
      { key1: 'value1' },
      { key2: Buffer.from('value2') },
      { key3: 100 },
      { key4: 10.1 }
    ]
  }
];

header_round_trip_cases.forEach(function(testCase) {
  it(testCase.title, function(done) {
    this.timeout(5000);
    run_headers_test(done, testCase.headers);
  });
});
// Produces a message whose key is an empty Buffer and whose value is an empty
// Buffer, and checks the first message received in flowing mode.
it('should be able to produce and consume messages: empty buffer key and empty value', function(done) {
  this.timeout(20000);

  var expectedKey = '';
  var keyBuffer = Buffer.from(expectedKey);
  var emptyValue = Buffer.from('');

  // Assertions on the first message received.
  var verify = (message) => {
    t.notEqual(message.value, null, 'message should not be null');
    t.equal(emptyValue.toString(), message.value.toString(), 'invalid message value');
    t.equal(expectedKey, message.key, 'invalid message key');
    done();
  };

  // Sends the message under test.
  var send = () => {
    producer.produce(topic, null, emptyValue, keyBuffer);
  };

  producer.setPollInterval(10);
  consumer.once('data', verify);
  consumer.subscribe([topic]);
  consumer.consume();
  setTimeout(send, 2000);
});
// Produces a message with an empty string key and an empty Buffer value, and
// checks the first message received in flowing mode.
it('should be able to produce and consume messages: empty key and empty value', function(done) {
  this.timeout(20000);

  var emptyKey = '';
  var emptyValue = Buffer.from('');

  // Assertions on the first message received.
  var verify = (message) => {
    t.notEqual(message.value, null, 'message should not be null');
    t.equal(emptyValue.toString(), message.value.toString(), 'invalid message value');
    t.equal(emptyKey, message.key, 'invalid message key');
    done();
  };

  // Sends the message under test.
  var send = () => {
    producer.produce(topic, null, emptyValue, emptyKey);
  };

  producer.setPollInterval(10);
  consumer.once('data', verify);
  consumer.subscribe([topic]);
  consumer.consume();
  setTimeout(send, 2000);
});
// Produces a message with a null key and a null value, and checks that the
// first message received in flowing mode carries them unchanged.
it('should be able to produce and consume messages: null key and null value', function(done) {
  this.timeout(20000);

  var nullKey = null;
  var nullValue = null;

  // Assertions on the first message received.
  var verify = (message) => {
    t.equal(nullValue, message.value, 'invalid message value');
    t.equal(nullKey, message.key, 'invalid message key');
    done();
  };

  // Sends the message under test.
  var send = () => {
    producer.produce(topic, null, nullValue, nullKey);
  };

  producer.setPollInterval(10);
  consumer.once('data', verify);
  consumer.subscribe([topic]);
  consumer.consume();
  setTimeout(send, 2000);
});
// Suite for a consumer configured with 'offset_commit_cb': true and
// auto-commit disabled: commits are reported through the 'offset.commit'
// event.
describe('Exceptional case - offset_commit_cb true', function() {
  var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
  var consumerOpts = {
    'metadata.broker.list': kafkaBrokerList,
    'group.id': grp,
    'fetch.wait.max.ms': 1000,
    'session.timeout.ms': 10000,
    'enable.auto.commit': false,
    'debug': 'all',
    'offset_commit_cb': true
  };

  beforeEach(function(done) {
    // Disconnecting the previous consumer can take longer than mocha's
    // default hook timeout.
    this.timeout(10000);

    // The enclosing suite's beforeEach has already created and connected a
    // consumer in the shared `consumer` variable. Disconnect it before
    // replacing it, otherwise nothing ever closes that client.
    var outerConsumer = consumer;
    outerConsumer.disconnect(function() {
      consumer = new Kafka.KafkaConsumer(consumerOpts, {
        'auto.offset.reset': 'largest',
      });
      consumer.connect({}, function(err, d) {
        t.ifError(err);
        t.equal(typeof d, 'object', 'metadata should be returned');
        done();
      });
      eventListener(consumer);
    });
  });

  afterEach(function(done) {
    this.timeout(10000);
    consumer.disconnect(function() {
      done();
    });
  });

  // Consumes one message, commits it, reconnects once the commit has been
  // reported and then expects 5000ms without any 'data' event.
  it('should async commit after consuming', function(done) {
    this.timeout(25000);
    var key = '';
    var value = Buffer.from('');

    consumer.once('data', function(message) {
      // disconnect in offset commit callback; one-shot so the
      // disconnect/reconnect sequence cannot be started twice
      consumer.once('offset.commit', function(err, offsets) {
        t.ifError(err);
        t.equal(typeof offsets, 'object', 'offsets should be returned');

        consumer.disconnect(function() {
          // reconnect in disconnect callback
          consumer.connect({}, function(err, d) {
            t.ifError(err);
            t.equal(typeof d, 'object', 'metadata should be returned');

            // check that no new messages arrive, as the offset was committed
            var onUnexpectedData = function(message) {
              // Cancel the success timer so done() is not called a second time.
              clearTimeout(quietTimer);
              done(new Error('Should never be here'));
            };
            consumer.once('data', onUnexpectedData);

            consumer.subscribe([topic]);
            consumer.consume();

            var quietTimer = setTimeout(function() {
              // The quiet period passed: stop watching before reporting
              // success so a late message cannot call done() again.
              consumer.removeListener('data', onUnexpectedData);
              done();
            }, 5000);
          });
        });
      });

      consumer.commitMessage(message);
    });

    consumer.subscribe([topic]);
    consumer.consume();

    setTimeout(function() {
      producer.produce(topic, null, value, key);
    }, 2000);
  });
});
// Suite for a consumer whose 'offset_commit_cb' option is a function: the
// test passes when that function is invoked without an error after a commit.
describe('Exceptional case - offset_commit_cb function', function() {
  var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

  afterEach(function(done) {
    this.timeout(10000);
    consumer.disconnect(function() {
      done();
    });
  });

  it('should callback offset_commit_cb after commit', function(done) {
    this.timeout(20000);

    var consumerOpts = {
      'metadata.broker.list': kafkaBrokerList,
      'group.id': grp,
      'fetch.wait.max.ms': 1000,
      'session.timeout.ms': 10000,
      'enable.auto.commit': false,
      'debug': 'all',
      // The first argument is the commit error (same shape as the
      // 'offset.commit' event used elsewhere in this file); a failed commit
      // must fail the test rather than pass it.
      'offset_commit_cb': function(err, topicPartitions) {
        if (err) {
          return done(err);
        }
        done();
      }
    };

    // The enclosing suite's beforeEach has already created and connected a
    // consumer in the shared `consumer` variable. Disconnect it before
    // replacing it, otherwise nothing ever closes that client.
    var outerConsumer = consumer;
    outerConsumer.disconnect(function() {
      consumer = new Kafka.KafkaConsumer(consumerOpts, {
        'auto.offset.reset': 'largest',
      });
      eventListener(consumer);

      consumer.connect({}, function(err, d) {
        t.ifError(err);
        t.equal(typeof d, 'object', 'metadata should be returned');

        consumer.subscribe([topic]);
        consumer.consume();

        setTimeout(function() {
          producer.produce(topic, null, Buffer.from(''), '');
        }, 2000);
      });

      // Commit the first message received; the commit is what triggers
      // offset_commit_cb above.
      consumer.once('data', function(message) {
        consumer.commitMessage(message);
      });
    });
  });
});
/**
 * Asserts that the headers of a consumed message match the headers that were
 * produced.
 *
 * Both arguments are arrays of single-key objects. Entries are compared
 * position by position: the key must be identical, and the received value is
 * converted with toString() before being compared (loosely, via t.equal) with
 * the expected value. An expected Buffer is converted with toString() first.
 *
 * @param {Array<Object>} expectedHeaders - headers handed to the producer
 * @param {Array<Object>} messageHeaders - headers found on the consumed message
 * @throws an assertion error on the first mismatch
 */
function assert_headers_match(expectedHeaders, messageHeaders) {
  // Compares one expected header with the header received at the same index.
  function assert_header_matches(wanted, received) {
    var wantedKey = Object.keys(wanted)[0];
    var receivedKeys = Object.keys(received);
    t.equal(receivedKeys.length, 1, 'Expected only one Header key');
    t.equal(wantedKey, receivedKeys[0], 'Expected key does not match message key');

    var wantedValue = wanted[wantedKey];
    if (Buffer.isBuffer(wantedValue)) {
      wantedValue = wantedValue.toString();
    }
    var receivedValue = received[wantedKey].toString();
    t.equal(wantedValue, receivedValue, 'invalid message header');
  }

  t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length');

  var count = expectedHeaders.length;
  for (var index = 0; index < count; index += 1) {
    assert_header_matches(expectedHeaders[index], messageHeaders[index]);
  }
}
/**
 * Shared body of the header round-trip tests: subscribes in flowing mode,
 * produces one keyed message carrying `headers` after 2000ms and verifies the
 * delivery report and the first message that arrives, including its headers.
 *
 * @param {Function} done - mocha completion callback of the calling test
 * @param {Array<Object>} headers - headers to attach to the produced message
 */
function run_headers_test(done, headers) {
  var key = 'key';

  crypto.randomBytes(4096, function(ex, buffer) {
    if (ex) {
      // Without a payload there is nothing to test; fail right away.
      return done(ex);
    }

    producer.setPollInterval(10);

    producer.once('delivery-report', function(err, report) {
      // Surface delivery failures instead of skipping the checks and
      // waiting for the timeout.
      t.ifError(err);
      t.equal(topic, report.topic, 'invalid delivery-report topic');
      t.equal(key, report.key, 'invalid delivery-report key');
      t.ok(report.offset >= 0, 'invalid delivery-report offset');
    });

    // One-shot listener: done() must be called exactly once, even if more
    // than one message shows up on the topic.
    consumer.once('data', function(message) {
      t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
      t.equal(key, message.key, 'invalid message key');
      t.equal(topic, message.topic, 'invalid message topic');
      t.ok(message.offset >= 0, 'invalid message offset');
      assert_headers_match(headers, message.headers);
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    setTimeout(function() {
      var timestamp = new Date().getTime();
      producer.produce(topic, null, buffer, key, timestamp, "", headers);
    }, 2000);
  });
}
});