|
|
- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
-
- var Kafka = require('../');
- var t = require('assert');
-
- var eventListener = require('./listener');
- var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
- var time = Date.now();
-
- function pollForTopic(client, topicName, maxTries, tryDelay, cb, customCondition) {
- var tries = 0;
-
- function getTopicIfExists(innerCb) {
- client.getMetadata({
- topic: topicName,
- }, function(metadataErr, metadata) {
- if (metadataErr) {
- cb(metadataErr);
- return;
- }
-
- var topicFound = metadata.topics.filter(function(topicObj) {
- var foundTopic = topicObj.name === topicName;
-
- // If we have a custom condition for "foundedness", do it here after
- // we make sure we are operating on the correct topic
- if (foundTopic && customCondition) {
- return customCondition(topicObj);
- }
- return foundTopic;
- });
-
- if (topicFound.length >= 1) {
- innerCb(null, topicFound[0]);
- return;
- }
-
- innerCb(new Error('Could not find topic ' + topicName));
- });
- }
-
- function maybeFinish(err, obj) {
- if (err) {
- queueNextTry();
- return;
- }
-
- cb(null, obj);
- }
-
- function queueNextTry() {
- tries += 1;
- if (tries < maxTries) {
- setTimeout(function() {
- getTopicIfExists(maybeFinish);
- }, tryDelay);
- } else {
- cb(new Error('Exceeded max tries of ' + maxTries));
- }
- }
-
- queueNextTry();
- }
-
- describe('Admin', function() {
- var client;
- var producer;
-
- before(function(done) {
- producer = new Kafka.Producer({
- 'metadata.broker.list': kafkaBrokerList,
- });
- producer.connect(null, function(err) {
- t.ifError(err);
- done();
- });
- });
-
- after(function(done) {
- producer.disconnect(function() {
- done();
- });
- });
-
- beforeEach(function() {
- this.timeout(10000);
- client = Kafka.AdminClient.create({
- 'client.id': 'kafka-test',
- 'metadata.broker.list': kafkaBrokerList
- });
- });
-
- describe('createTopic', function() {
- it('should create topic sucessfully', function(done) {
- var topicName = 'admin-test-topic-' + time;
- this.timeout(30000);
- client.createTopic({
- topic: topicName,
- num_partitions: 1,
- replication_factor: 1
- }, function(err) {
- pollForTopic(producer, topicName, 10, 1000, function(err) {
- t.ifError(err);
- done();
- });
- });
- });
-
- it('should raise an error when replication_factor is larger than number of brokers', function(done) {
- var topicName = 'admin-test-topic-bad-' + time;
- this.timeout(30000);
- client.createTopic({
- topic: topicName,
- num_partitions: 9999,
- replication_factor: 9999
- }, function(err) {
- t.equal(typeof err, 'object', 'an error should be returned');
- done();
- });
- });
- });
-
- describe('deleteTopic', function() {
- it('should be able to delete a topic after creation', function(done) {
- var topicName = 'admin-test-topic-2bdeleted-' + time;
- this.timeout(30000);
- client.createTopic({
- topic: topicName,
- num_partitions: 1,
- replication_factor: 1
- }, function(err) {
- pollForTopic(producer, topicName, 10, 1000, function(err) {
- t.ifError(err);
- client.deleteTopic(topicName, function(deleteErr) {
- // Fail if we got an error
- t.ifError(deleteErr);
- done();
- });
- });
- });
- });
- });
-
- describe('createPartitions', function() {
- it('should be able to add partitions to a topic after creation', function(done) {
- var topicName = 'admin-test-topic-newparts-' + time;
- this.timeout(30000);
- client.createTopic({
- topic: topicName,
- num_partitions: 1,
- replication_factor: 1
- }, function(err) {
- pollForTopic(producer, topicName, 10, 1000, function(err) {
- t.ifError(err);
- client.createPartitions(topicName, 20, function(createErr) {
- pollForTopic(producer, topicName, 10, 1000, function(pollErr) {
- t.ifError(pollErr);
- done();
- }, function(topic) {
- return topic.partitions.length === 20;
- });
- });
- });
- });
- });
-
- it('should NOT be able to reduce partitions to a topic after creation', function(done) {
- var topicName = 'admin-test-topic-newparts2-' + time;
- this.timeout(30000);
- client.createTopic({
- topic: topicName,
- num_partitions: 4,
- replication_factor: 1
- }, function(err) {
- pollForTopic(producer, topicName, 10, 1000, function(err) {
- t.ifError(err);
- client.createPartitions(topicName, 1, function(createErr) {
- t.equal(typeof createErr, 'object', 'an error should be returned');
- done();
- });
- });
- });
- });
- });
-
- });
|