You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

690 lines
19 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var crypto = require('crypto');
  10. var t = require('assert');
  11. var Kafka = require('../');
  12. var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
  13. var eventListener = require('./listener');
  14. var topic = 'test';
  15. var topic2 = 'test2';
  16. describe('Consumer/Producer', function() {
  17. var producer;
  18. var consumer;
  /**
   * Build and connect a fresh consumer/producer pair before each test.
   * The hook finishes once both connect callbacks have reported back, or
   * with the first connection error that is handed to the join function.
   */
  beforeEach(function(done) {
    var outstanding = 2;
    var failed = false;

    // Join point for the two connect callbacks: the first error wins,
    // otherwise the hook completes when nothing is outstanding any more.
    function settle(err) {
      if (failed) {
        return;
      }
      outstanding -= 1;
      if (err) {
        failed = true;
        done(err);
        return;
      }
      if (outstanding === 0) {
        done();
      }
    }

    // Consumer and producer validate their connect result identically.
    function onConnected(err, metadata) {
      t.ifError(err);
      t.equal(typeof metadata, 'object', 'metadata should be returned');
      settle(err);
    }

    // The group id gets a random suffix each time the hook runs.
    var groupId = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

    var consumerConfig = {
      'metadata.broker.list': kafkaBrokerList,
      'group.id': groupId,
      'fetch.wait.max.ms': 1000,
      'session.timeout.ms': 10000,
      'enable.auto.commit': true,
      'enable.partition.eof': true,
      'debug': 'all'
      // paused: true,
    };
    var consumerTopicConfig = {
      'auto.offset.reset': 'largest'
    };

    consumer = new Kafka.KafkaConsumer(consumerConfig, consumerTopicConfig);
    consumer.connect({}, onConnected);
    eventListener(consumer);

    var producerConfig = {
      'client.id': 'kafka-mocha',
      'metadata.broker.list': kafkaBrokerList,
      'fetch.wait.max.ms': 1,
      'debug': 'all',
      'dr_cb': true
    };
    var producerTopicConfig = {
      'produce.offset.report': true
    };

    producer = new Kafka.Producer(producerConfig, producerTopicConfig);
    producer.connect({}, onConnected);
    eventListener(producer);
  });
  /**
   * Disconnect both clients after each test. The hook completes when both
   * disconnect callbacks have fired, or with the first error reported.
   */
  afterEach(function(done) {
    this.timeout(6000);

    var outstanding = 2;
    var failed = false;

    // Same join logic as the connect hook: first error wins, otherwise
    // finish once both disconnects have been acknowledged.
    function settle(err) {
      if (failed) {
        return;
      }
      outstanding -= 1;
      if (err) {
        failed = true;
        done(err);
        return;
      }
      if (outstanding === 0) {
        done();
      }
    }

    consumer.disconnect(settle);
    producer.disconnect(settle);
  });
  /**
   * Round trip using subscribe() and the batch form consume(1, cb).
   *
   * The consumer polls until it receives a message. Whenever a poll comes
   * back empty (or reports partition EOF) a message is produced and the poll
   * is retried; a poll timeout is simply retried. Once a message arrives its
   * payload, the consumer assignments and consumer.position() are checked.
   */
  it('should be able to produce, consume messages, read position: subscribe/consumeOnce', function(done) {
    this.timeout(8000);

    crypto.randomBytes(4096, function(ex, buffer) {
      // Without a payload nothing below can work; fail fast with the cause.
      if (ex) {
        return done(ex);
      }

      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      consumer.setDefaultConsumeTimeout(10);
      consumer.subscribe([topic]);

      // Produce one message and schedule another poll.
      var produceAndRetry = function() {
        producer.produce(topic, null, buffer, null);
        setTimeout(consumeOne, 100);
      };

      var consumeOne = function() {
        consumer.consume(1, function(err, messages) {
          // Inspect the error BEFORE touching `messages`: on failure the
          // callback may be invoked without a message array.
          if (err) {
            if (err.code === -185) {
              // -185: librdkafka ERR__TIMED_OUT, nothing arrived yet.
              setTimeout(consumeOne, 100);
              return;
            }
            if (err.code === -191) {
              // -191: librdkafka ERR__PARTITION_EOF, give it something to read.
              produceAndRetry();
              return;
            }
            // Any other error is a real failure; report it rather than
            // letting the test run into its timeout.
            return done(err);
          }

          if (messages.length === 0) {
            produceAndRetry();
            return;
          }

          var message = messages[0];

          t.equal(Array.isArray(consumer.assignments()), true, 'Assignments should be an array');
          t.equal(consumer.assignments().length > 0, true, 'Should have at least one assignment');
          t.equal(buffer.toString(), message.value.toString(),
            'message is not equal to buffer');

          // test consumer.position as we have consumed
          var position = consumer.position();
          t.equal(position.length, 1);
          t.deepStrictEqual(position[0].partition, 0);
          t.ok(position[0].offset >= 0);
          done();
        });
      };

      // Consume until we get it or time out
      consumeOne();
    });
  });
  /**
   * Two-phase check of the batch consume call:
   *  1. with nothing to read, consume() must block for its whole timeout;
   *  2. after one message is produced, a consume() with a huge timeout and
   *     batch size must still return that single message.
   */
  it('should return ready messages on partition EOF', function(done) {
    this.timeout(8000);

    crypto.randomBytes(4096, function(ex, payload) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      consumer.subscribe([topic]);

      // Phase 2: the message should come back quickly despite the large timeout.
      function expectSingleMessage() {
        consumer.setDefaultConsumeTimeout(1000000);
        consumer.consume(100000, function(err, batch) {
          t.ifError(err);
          t.equal(batch.length, 1);
          done();
        });
      }

      // Phase 1: no new messages, so the consume should wait the whole timeout.
      function expectFullWaitThenProduce() {
        var startedAt = Date.now();

        // 2000ms is used so the wait is clearly longer than the 1000ms
        // fetch wait configured on the consumer.
        consumer.setDefaultConsumeTimeout(2000);
        consumer.consume(100000, function(err, batch) {
          t.ifError(err);
          t.ok(Date.now() - startedAt >= 1998);
          t.equal(batch.length, 0);

          // Produce one message to cause EOF with a waiting message in phase 2.
          producer.produce(topic, null, payload, null);
          expectSingleMessage();
        });
      }

      expectFullWaitThenProduce();
    });
  });
  /**
   * Produces one message 500ms after a batch consume() has started and
   * expects the first 'data' event to be recorded before the first
   * 'partition.eof' event.
   */
  it('should emit partition.eof event when reaching end of partition', function(done) {
    this.timeout(8000);

    crypto.randomBytes(4096, function(ex, payload) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      consumer.subscribe([topic]);

      // Event names in the order they were observed (first of each only).
      var observed = [];

      consumer.once('data', function(msg) {
        observed.push("data");
      });

      consumer.once('partition.eof', function(eof) {
        observed.push("partition.eof");
      });

      setTimeout(function() {
        producer.produce(topic, null, payload, null);
      }, 500);

      consumer.setDefaultConsumeTimeout(2000);
      consumer.consume(1000, function(err, batch) {
        t.ifError(err);
        t.equal(batch.length, 1);
        t.deepStrictEqual(observed, ["data", "partition.eof"]);
        done();
      });
    });
  });
  /**
   * Produces one message 2000ms after a batch consume() has started.
   * Every 'partition.eof' is recorded (persistent listener) but only the
   * first 'data'; the expected sequence is eof, data, eof.
   */
  it('should emit partition.eof when already at end of partition', function(done) {
    this.timeout(8000);

    crypto.randomBytes(4096, function(ex, payload) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      consumer.subscribe([topic]);

      // Event names in the order they were observed.
      var observed = [];

      consumer.once('data', function(msg) {
        observed.push("data");
      });

      consumer.on('partition.eof', function(eof) {
        observed.push("partition.eof");
      });

      setTimeout(function() {
        producer.produce(topic, null, payload, null);
      }, 2000);

      consumer.setDefaultConsumeTimeout(3000);
      consumer.consume(1000, function(err, batch) {
        t.ifError(err);
        t.equal(batch.length, 1);
        t.deepStrictEqual(observed, ["partition.eof", "data", "partition.eof"]);
        done();
      });
    });
  });
  /**
   * Flowing-mode round trip: start consume() without arguments, produce one
   * keyed message after 2 seconds and verify value, key, topic and offset on
   * the 'data' event as well as on the producer's delivery report.
   */
  it('should be able to produce and consume messages: consumeLoop', function(done) {
    var messageKey = 'key';

    this.timeout(5000);

    crypto.randomBytes(4096, function(ex, payload) {
      producer.setPollInterval(10);

      // The delivery report is only inspected when it carries no error.
      producer.once('delivery-report', function(err, report) {
        if (err) {
          return;
        }
        t.equal(topic, report.topic, 'invalid delivery-report topic');
        t.equal(messageKey, report.key, 'invalid delivery-report key');
        t.ok(report.offset >= 0, 'invalid delivery-report offset');
      });

      consumer.on('data', function(received) {
        t.equal(payload.toString(), received.value.toString(), 'invalid message value');
        t.equal(messageKey, received.key, 'invalid message key');
        t.equal(topic, received.topic, 'invalid message topic');
        t.ok(received.offset >= 0, 'invalid message offset');
        done();
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function() {
        producer.produce(topic, null, payload, messageKey);
      }, 2000);
    });
  });
  /**
   * Flowing-mode EOF test: two messages are produced at 2s and 4s while
   * 'data' and 'partition.eof' events are recorded together with their
   * offsets. At 6s the recorded sequence must be eof, data, eof, data, eof
   * with offsets start, start, start+1, start+1, start+2.
   */
  it('should emit \'partition.eof\' events in consumeLoop', function(done) {
    this.timeout(7000);

    crypto.randomBytes(4096, function(ex, payload) {
      producer.setPollInterval(10);

      producer.once('delivery-report', function(err, report) {
        t.ifError(err);
      });

      var eventNames = [];
      var eventOffsets = [];

      consumer.on('data', function(received) {
        t.equal(received.topic, topic);
        t.equal(received.partition, 0);
        eventOffsets.push(received.offset);
        eventNames.push('data');
      });

      consumer.on('partition.eof', function(eofEvent) {
        t.equal(eofEvent.topic, topic);
        t.equal(eofEvent.partition, 0);
        eventOffsets.push(eofEvent.offset);
        eventNames.push('partition.eof');
      });

      consumer.subscribe([topic]);
      consumer.consume();

      function produceOne() {
        producer.produce(topic, null, payload);
      }

      setTimeout(produceOne, 2000);
      setTimeout(produceOne, 4000);

      setTimeout(function() {
        t.deepStrictEqual(eventNames, ['partition.eof', 'data', 'partition.eof', 'data', 'partition.eof']);

        // Offsets are checked relative to the first recorded one.
        var base = eventOffsets[0];
        var expectedOffsets = [base, base, base + 1, base + 1, base + 2];
        t.deepStrictEqual(eventOffsets, expectedOffsets);
        done();
      }, 6000);
    });
  });
  /**
   * Subscribing to a topic named 'non_existing_topic' must surface an
   * ERR_UNKNOWN_TOPIC_OR_PART warning; any other warning fails the test.
   */
  it('should emit [warning] event on UNKNOWN_TOPIC_OR_PART error: consumeLoop', function(done) {
    consumer.on('warning', function (warning) {
      if (warning.code !== Kafka.CODES.ERRORS.ERR_UNKNOWN_TOPIC_OR_PART) {
        t.ifError(warning);
        return;
      }

      // Expected warning seen: disconnect first, then finish the test.
      consumer.disconnect(function() {
        done();
      });
    });

    consumer.subscribe(['non_existing_topic']);
    consumer.consume();
  });
  // Header round-trip cases. Each entry becomes one test that hands its
  // headers to run_headers_test with a 5 second timeout.
  [
    {
      title: 'should be able to produce and consume messages with one header value as string: consumeLoop',
      headers: [
        { key: "value" }
      ]
    },
    {
      title: 'should be able to produce and consume messages with one header value as buffer: consumeLoop',
      headers: [
        { key: Buffer.from('value') }
      ]
    },
    {
      title: 'should be able to produce and consume messages with one header value as int: consumeLoop',
      headers: [
        { key: 10 }
      ]
    },
    {
      title: 'should be able to produce and consume messages with one header value as float: consumeLoop',
      headers: [
        { key: 1.11 }
      ]
    },
    {
      title: 'should be able to produce and consume messages with multiple headers value as buffer: consumeLoop',
      headers: [
        { key1: Buffer.from('value1') },
        { key2: Buffer.from('value2') },
        { key3: Buffer.from('value3') },
        { key4: Buffer.from('value4') }
      ]
    },
    {
      title: 'should be able to produce and consume messages with multiple headers value as string: consumeLoop',
      headers: [
        { key1: 'value1' },
        { key2: 'value2' },
        { key3: 'value3' },
        { key4: 'value4' }
      ]
    },
    {
      title: 'should be able to produce and consume messages with multiple headers with mixed values: consumeLoop',
      headers: [
        { key1: 'value1' },
        { key2: Buffer.from('value2') },
        { key3: 100 },
        { key4: 10.1 }
      ]
    }
  ].forEach(function(headerCase) {
    it(headerCase.title, function(done) {
      this.timeout(5000);
      run_headers_test(done, headerCase.headers);
    });
  });
  /**
   * Produces a message whose key is an empty Buffer and whose value is an
   * empty Buffer, then checks that the consumed value is non-null and empty
   * and that the consumed key compares equal to the empty string.
   */
  it('should be able to produce and consume messages: empty buffer key and empty value', function(done) {
    this.timeout(20000);

    var expectedKey = '';
    var keyBuffer = Buffer.from(expectedKey);
    var valueBuffer = Buffer.from('');

    producer.setPollInterval(10);

    consumer.once('data', function(received) {
      t.notEqual(received.value, null, 'message should not be null');
      t.equal(valueBuffer.toString(), received.value.toString(), 'invalid message value');
      t.equal(expectedKey, received.key, 'invalid message key');
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    // Produce after 2 seconds, once the consume loop has been started.
    setTimeout(function() {
      producer.produce(topic, null, valueBuffer, keyBuffer);
    }, 2000);
  });
  /**
   * Produces a message with an empty-string key and an empty Buffer value
   * and checks that both come back unchanged (value non-null).
   */
  it('should be able to produce and consume messages: empty key and empty value', function(done) {
    this.timeout(20000);

    var emptyKey = '';
    var emptyValue = Buffer.from('');

    producer.setPollInterval(10);

    consumer.once('data', function(received) {
      t.notEqual(received.value, null, 'message should not be null');
      t.equal(emptyValue.toString(), received.value.toString(), 'invalid message value');
      t.equal(emptyKey, received.key, 'invalid message key');
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    // Produce after 2 seconds, once the consume loop has been started.
    setTimeout(function() {
      producer.produce(topic, null, emptyValue, emptyKey);
    }, 2000);
  });
  /**
   * Produces a message with null key and null value and checks that the
   * consumed message reports both as equal to null.
   */
  it('should be able to produce and consume messages: null key and null value', function(done) {
    this.timeout(20000);

    var nullKey = null;
    var nullValue = null;

    producer.setPollInterval(10);

    consumer.once('data', function(received) {
      t.equal(nullValue, received.value, 'invalid message value');
      t.equal(nullKey, received.key, 'invalid message key');
      done();
    });

    consumer.subscribe([topic]);
    consumer.consume();

    // Produce after 2 seconds, once the consume loop has been started.
    setTimeout(function() {
      producer.produce(topic, null, nullValue, nullKey);
    }, 2000);
  });
  /**
   * Manual-commit scenario with 'offset_commit_cb': true, where commit
   * results are delivered through the consumer's 'offset.commit' event.
   */
  describe('Exceptional case - offset_commit_cb true', function() {
    var groupId = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

    var consumerOpts = {
      'metadata.broker.list': kafkaBrokerList,
      'group.id': groupId,
      'fetch.wait.max.ms': 1000,
      'session.timeout.ms': 10000,
      'enable.auto.commit': false,
      'debug': 'all',
      'offset_commit_cb': true
    };

    // Replaces the shared `consumer` with one built from consumerOpts.
    // NOTE(review): this overwrites whatever `consumer` held before the hook
    // ran - confirm whether an earlier instance needs disconnecting.
    beforeEach(function(done) {
      consumer = new Kafka.KafkaConsumer(consumerOpts, {
        'auto.offset.reset': 'largest',
      });

      consumer.connect({}, function(err, metadata) {
        t.ifError(err);
        t.equal(typeof metadata, 'object', 'metadata should be returned');
        done();
      });

      eventListener(consumer);
    });

    afterEach(function(done) {
      this.timeout(10000);
      consumer.disconnect(function() {
        done();
      });
    });

    /**
     * Consume one message, commit it, and once the commit is confirmed
     * disconnect and reconnect. No message may be delivered during the
     * 5 seconds after resubscribing.
     */
    it('should async commit after consuming', function(done) {
      this.timeout(25000);

      var emptyKey = '';
      var emptyValue = Buffer.from('');
      var lastOffset = null;

      // A message after the reconnect means the commit did not take effect.
      function failOnRedelivery(message) {
        done(new Error('Should never be here'));
      }

      // Reconnected: resubscribe and give redelivery 5 seconds to show up.
      function onReconnected(err, metadata) {
        t.ifError(err);
        t.equal(typeof metadata, 'object', 'metadata should be returned');

        consumer.once('data', failOnRedelivery);
        consumer.subscribe([topic]);
        consumer.consume();

        setTimeout(function() {
          done();
        }, 5000);
      }

      // Commit confirmed: disconnect, then reconnect from the disconnect callback.
      function onOffsetCommitted(err, offsets) {
        t.ifError(err);
        t.equal(typeof offsets, 'object', 'offsets should be returned');

        consumer.disconnect(function() {
          consumer.connect({}, onReconnected);
        });
      }

      consumer.once('data', function(message) {
        lastOffset = message.offset;
        consumer.on('offset.commit', onOffsetCommitted);
        consumer.commitMessage(message);
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function() {
        producer.produce(topic, null, emptyValue, emptyKey);
      }, 2000);
    });
  });
  /**
   * Manual-commit scenario where 'offset_commit_cb' is a function supplied
   * in the consumer configuration.
   */
  describe('Exceptional case - offset_commit_cb function', function() {
    var groupId = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

    afterEach(function(done) {
      this.timeout(10000);
      consumer.disconnect(function() {
        done();
      });
    });

    /**
     * The test passes as soon as the configured offset_commit_cb is invoked
     * after the first consumed message has been committed.
     */
    it('should callback offset_commit_cb after commit', function(done) {
      this.timeout(20000);

      // Invoking the configured commit callback is the success signal.
      function onOffsetCommit(offset) {
        done();
      }

      var consumerOpts = {
        'metadata.broker.list': kafkaBrokerList,
        'group.id': groupId,
        'fetch.wait.max.ms': 1000,
        'session.timeout.ms': 10000,
        'enable.auto.commit': false,
        'debug': 'all',
        'offset_commit_cb': onOffsetCommit
      };

      consumer = new Kafka.KafkaConsumer(consumerOpts, {
        'auto.offset.reset': 'largest',
      });
      eventListener(consumer);

      // Once connected: start consuming, then produce one empty message
      // two seconds later.
      function onConnected(err, metadata) {
        t.ifError(err);
        t.equal(typeof metadata, 'object', 'metadata should be returned');

        consumer.subscribe([topic]);
        consumer.consume();

        setTimeout(function() {
          producer.produce(topic, null, Buffer.from(''), '');
        }, 2000);
      }

      consumer.connect({}, onConnected);

      consumer.once('data', function(message) {
        consumer.commitMessage(message);
      });
    });
  });
  /**
   * Assert that the headers of a consumed message match the produced ones.
   *
   * Both arguments are lists of single-key objects ({ name: value }). The
   * lists must have the same length and, position by position, the same
   * key. Values are compared by their string form, so an expected string,
   * number or Buffer matches an actual value that stringifies to the same
   * text.
   *
   * Values are normalised with String() and compared strictly. Loose
   * equality would let mismatches slip through (e.g. expected 0 against an
   * empty actual value, because 0 == '' is true).
   *
   * @param {Array<Object>} expectedHeaders headers passed to the producer
   * @param {Array<Object>} messageHeaders headers found on the consumed message
   * @throws {AssertionError} on the first mismatch, or if messageHeaders is missing
   */
  function assert_headers_match(expectedHeaders, messageHeaders) {
    // Fail with an assertion (not a TypeError) when the message has no headers.
    t.ok(messageHeaders, 'Message headers are missing');
    t.strictEqual(messageHeaders.length, expectedHeaders.length, 'Headers length does not match expected length');

    for (var i = 0; i < expectedHeaders.length; i++) {
      var expectedKey = Object.keys(expectedHeaders[i])[0];
      var messageKeys = Object.keys(messageHeaders[i]);

      t.strictEqual(messageKeys.length, 1, 'Expected only one Header key');
      t.strictEqual(messageKeys[0], expectedKey, 'Expected key does not match message key');

      // String(buffer) decodes the bytes exactly like buffer.toString().
      var expectedValue = String(expectedHeaders[i][expectedKey]);
      var actualValue = String(messageHeaders[i][expectedKey]);

      t.strictEqual(actualValue, expectedValue, 'invalid message header');
    }
  }
  /**
   * Shared body of the header round-trip tests.
   *
   * Starts the consume loop, produces one message with key 'key', a random
   * 4096-byte value and the given headers after 2 seconds, and verifies
   * value, key, topic, offset and headers on the consumed message.
   *
   * @param {Function} done Mocha completion callback of the calling test
   * @param {Array<Object>} headers headers to attach, as single-key objects
   */
  function run_headers_test(done, headers) {
    var key = 'key';

    crypto.randomBytes(4096, function(ex, buffer) {
      // Without a payload the test cannot run; report the cause directly.
      if (ex) {
        return done(ex);
      }

      producer.setPollInterval(10);

      // The delivery report is only inspected when it carries no error.
      producer.once('delivery-report', function(err, report) {
        if (!err) {
          t.equal(topic, report.topic, 'invalid delivery-report topic');
          t.equal(key, report.key, 'invalid delivery-report key');
          t.ok(report.offset >= 0, 'invalid delivery-report offset');
        }
      });

      // One-shot listener: exactly one message is produced, and a second
      // delivery must not call done() again.
      consumer.once('data', function(message) {
        t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
        t.equal(key, message.key, 'invalid message key');
        t.equal(topic, message.topic, 'invalid message topic');
        t.ok(message.offset >= 0, 'invalid message offset');
        assert_headers_match(headers, message.headers);
        done();
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function() {
        var timestamp = new Date().getTime();
        producer.produce(topic, null, buffer, key, timestamp, "", headers);
      }, 2000);
    });
  }
  560. });