You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

193 lines
5.1 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var Kafka = require('../');
  10. var t = require('assert');
  11. var eventListener = require('./listener');
  12. var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
  13. var time = Date.now();
  14. function pollForTopic(client, topicName, maxTries, tryDelay, cb, customCondition) {
  15. var tries = 0;
  16. function getTopicIfExists(innerCb) {
  17. client.getMetadata({
  18. topic: topicName,
  19. }, function(metadataErr, metadata) {
  20. if (metadataErr) {
  21. cb(metadataErr);
  22. return;
  23. }
  24. var topicFound = metadata.topics.filter(function(topicObj) {
  25. var foundTopic = topicObj.name === topicName;
  26. // If we have a custom condition for "foundedness", do it here after
  27. // we make sure we are operating on the correct topic
  28. if (foundTopic && customCondition) {
  29. return customCondition(topicObj);
  30. }
  31. return foundTopic;
  32. });
  33. if (topicFound.length >= 1) {
  34. innerCb(null, topicFound[0]);
  35. return;
  36. }
  37. innerCb(new Error('Could not find topic ' + topicName));
  38. });
  39. }
  40. function maybeFinish(err, obj) {
  41. if (err) {
  42. queueNextTry();
  43. return;
  44. }
  45. cb(null, obj);
  46. }
  47. function queueNextTry() {
  48. tries += 1;
  49. if (tries < maxTries) {
  50. setTimeout(function() {
  51. getTopicIfExists(maybeFinish);
  52. }, tryDelay);
  53. } else {
  54. cb(new Error('Exceeded max tries of ' + maxTries));
  55. }
  56. }
  57. queueNextTry();
  58. }
  59. describe('Admin', function() {
  60. var client;
  61. var producer;
  62. before(function(done) {
  63. producer = new Kafka.Producer({
  64. 'metadata.broker.list': kafkaBrokerList,
  65. });
  66. producer.connect(null, function(err) {
  67. t.ifError(err);
  68. done();
  69. });
  70. });
  71. after(function(done) {
  72. producer.disconnect(function() {
  73. done();
  74. });
  75. });
  76. beforeEach(function() {
  77. this.timeout(10000);
  78. client = Kafka.AdminClient.create({
  79. 'client.id': 'kafka-test',
  80. 'metadata.broker.list': kafkaBrokerList
  81. });
  82. });
  83. describe('createTopic', function() {
  84. it('should create topic sucessfully', function(done) {
  85. var topicName = 'admin-test-topic-' + time;
  86. this.timeout(30000);
  87. client.createTopic({
  88. topic: topicName,
  89. num_partitions: 1,
  90. replication_factor: 1
  91. }, function(err) {
  92. pollForTopic(producer, topicName, 10, 1000, function(err) {
  93. t.ifError(err);
  94. done();
  95. });
  96. });
  97. });
  98. it('should raise an error when replication_factor is larger than number of brokers', function(done) {
  99. var topicName = 'admin-test-topic-bad-' + time;
  100. this.timeout(30000);
  101. client.createTopic({
  102. topic: topicName,
  103. num_partitions: 9999,
  104. replication_factor: 9999
  105. }, function(err) {
  106. t.equal(typeof err, 'object', 'an error should be returned');
  107. done();
  108. });
  109. });
  110. });
  111. describe('deleteTopic', function() {
  112. it('should be able to delete a topic after creation', function(done) {
  113. var topicName = 'admin-test-topic-2bdeleted-' + time;
  114. this.timeout(30000);
  115. client.createTopic({
  116. topic: topicName,
  117. num_partitions: 1,
  118. replication_factor: 1
  119. }, function(err) {
  120. pollForTopic(producer, topicName, 10, 1000, function(err) {
  121. t.ifError(err);
  122. client.deleteTopic(topicName, function(deleteErr) {
  123. // Fail if we got an error
  124. t.ifError(deleteErr);
  125. done();
  126. });
  127. });
  128. });
  129. });
  130. });
  131. describe('createPartitions', function() {
  132. it('should be able to add partitions to a topic after creation', function(done) {
  133. var topicName = 'admin-test-topic-newparts-' + time;
  134. this.timeout(30000);
  135. client.createTopic({
  136. topic: topicName,
  137. num_partitions: 1,
  138. replication_factor: 1
  139. }, function(err) {
  140. pollForTopic(producer, topicName, 10, 1000, function(err) {
  141. t.ifError(err);
  142. client.createPartitions(topicName, 20, function(createErr) {
  143. pollForTopic(producer, topicName, 10, 1000, function(pollErr) {
  144. t.ifError(pollErr);
  145. done();
  146. }, function(topic) {
  147. return topic.partitions.length === 20;
  148. });
  149. });
  150. });
  151. });
  152. });
  153. it('should NOT be able to reduce partitions to a topic after creation', function(done) {
  154. var topicName = 'admin-test-topic-newparts2-' + time;
  155. this.timeout(30000);
  156. client.createTopic({
  157. topic: topicName,
  158. num_partitions: 4,
  159. replication_factor: 1
  160. }, function(err) {
  161. pollForTopic(producer, topicName, 10, 1000, function(err) {
  162. t.ifError(err);
  163. client.createPartitions(topicName, 1, function(createErr) {
  164. t.equal(typeof createErr, 'object', 'an error should be returned');
  165. done();
  166. });
  167. });
  168. });
  169. });
  170. });
  171. });