You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

193 lines
5.1 KiB

/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');
var t = require('assert');
var eventListener = require('./listener');
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
var time = Date.now();
function pollForTopic(client, topicName, maxTries, tryDelay, cb, customCondition) {
var tries = 0;
function getTopicIfExists(innerCb) {
client.getMetadata({
topic: topicName,
}, function(metadataErr, metadata) {
if (metadataErr) {
cb(metadataErr);
return;
}
var topicFound = metadata.topics.filter(function(topicObj) {
var foundTopic = topicObj.name === topicName;
// If we have a custom condition for "foundedness", do it here after
// we make sure we are operating on the correct topic
if (foundTopic && customCondition) {
return customCondition(topicObj);
}
return foundTopic;
});
if (topicFound.length >= 1) {
innerCb(null, topicFound[0]);
return;
}
innerCb(new Error('Could not find topic ' + topicName));
});
}
function maybeFinish(err, obj) {
if (err) {
queueNextTry();
return;
}
cb(null, obj);
}
function queueNextTry() {
tries += 1;
if (tries < maxTries) {
setTimeout(function() {
getTopicIfExists(maybeFinish);
}, tryDelay);
} else {
cb(new Error('Exceeded max tries of ' + maxTries));
}
}
queueNextTry();
}
describe('Admin', function() {
var client;
var producer;
before(function(done) {
producer = new Kafka.Producer({
'metadata.broker.list': kafkaBrokerList,
});
producer.connect(null, function(err) {
t.ifError(err);
done();
});
});
after(function(done) {
producer.disconnect(function() {
done();
});
});
beforeEach(function() {
this.timeout(10000);
client = Kafka.AdminClient.create({
'client.id': 'kafka-test',
'metadata.broker.list': kafkaBrokerList
});
});
describe('createTopic', function() {
it('should create topic sucessfully', function(done) {
var topicName = 'admin-test-topic-' + time;
this.timeout(30000);
client.createTopic({
topic: topicName,
num_partitions: 1,
replication_factor: 1
}, function(err) {
pollForTopic(producer, topicName, 10, 1000, function(err) {
t.ifError(err);
done();
});
});
});
it('should raise an error when replication_factor is larger than number of brokers', function(done) {
var topicName = 'admin-test-topic-bad-' + time;
this.timeout(30000);
client.createTopic({
topic: topicName,
num_partitions: 9999,
replication_factor: 9999
}, function(err) {
t.equal(typeof err, 'object', 'an error should be returned');
done();
});
});
});
describe('deleteTopic', function() {
it('should be able to delete a topic after creation', function(done) {
var topicName = 'admin-test-topic-2bdeleted-' + time;
this.timeout(30000);
client.createTopic({
topic: topicName,
num_partitions: 1,
replication_factor: 1
}, function(err) {
pollForTopic(producer, topicName, 10, 1000, function(err) {
t.ifError(err);
client.deleteTopic(topicName, function(deleteErr) {
// Fail if we got an error
t.ifError(deleteErr);
done();
});
});
});
});
});
describe('createPartitions', function() {
it('should be able to add partitions to a topic after creation', function(done) {
var topicName = 'admin-test-topic-newparts-' + time;
this.timeout(30000);
client.createTopic({
topic: topicName,
num_partitions: 1,
replication_factor: 1
}, function(err) {
pollForTopic(producer, topicName, 10, 1000, function(err) {
t.ifError(err);
client.createPartitions(topicName, 20, function(createErr) {
pollForTopic(producer, topicName, 10, 1000, function(pollErr) {
t.ifError(pollErr);
done();
}, function(topic) {
return topic.partitions.length === 20;
});
});
});
});
});
it('should NOT be able to reduce partitions to a topic after creation', function(done) {
var topicName = 'admin-test-topic-newparts2-' + time;
this.timeout(30000);
client.createTopic({
topic: topicName,
num_partitions: 4,
replication_factor: 1
}, function(err) {
pollForTopic(producer, topicName, 10, 1000, function(err) {
t.ifError(err);
client.createPartitions(topicName, 1, function(createErr) {
t.equal(typeof createErr, 'object', 'an error should be returned');
done();
});
});
});
});
});
});