You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

133 lines
3.3 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var crypto = require('crypto');
  10. var t = require('assert');
  11. var Kafka = require('../');
  12. var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
  13. var eventListener = require('./listener');
  14. describe('Consumer group/Producer', function() {
  15. var producer;
  16. var consumer;
  17. var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
  18. var config = {
  19. 'metadata.broker.list': kafkaBrokerList,
  20. 'group.id': grp,
  21. 'fetch.wait.max.ms': 1000,
  22. 'session.timeout.ms': 10000,
  23. 'enable.auto.commit': false,
  24. 'debug': 'all'
  25. };
  26. beforeEach(function(done) {
  27. producer = new Kafka.Producer({
  28. 'client.id': 'kafka-mocha',
  29. 'metadata.broker.list': kafkaBrokerList,
  30. 'fetch.wait.max.ms': 1,
  31. 'debug': 'all',
  32. 'dr_cb': true
  33. });
  34. producer.connect({}, function(err, d) {
  35. t.ifError(err);
  36. t.equal(typeof d, 'object', 'metadata should be returned');
  37. done();
  38. });
  39. eventListener(producer);
  40. });
  41. beforeEach(function(done) {
  42. consumer = new Kafka.KafkaConsumer(config, {
  43. 'auto.offset.reset': 'largest'
  44. });
  45. consumer.connect({}, function(err, d) {
  46. t.ifError(err);
  47. t.equal(typeof d, 'object', 'metadata should be returned');
  48. done();
  49. });
  50. eventListener(consumer);
  51. });
  52. afterEach(function(done) {
  53. producer.disconnect(function() {
  54. done();
  55. });
  56. });
  57. it('should be able to commit, read committed and restart from the committed offset', function(done) {
  58. this.timeout(30000);
  59. var topic = 'test';
  60. var key = 'key';
  61. var payload = Buffer.from('value');
  62. var count = 0;
  63. var offsets = {
  64. 'first': true
  65. };
  66. var tt = setInterval(function() {
  67. try {
  68. producer.produce(topic, null, payload, key);
  69. } catch (e) {
  70. clearInterval(tt);
  71. }
  72. }, 100);
  73. consumer.on('disconnected', function() {
  74. var consumer2 = new Kafka.KafkaConsumer(config, {
  75. 'auto.offset.reset': 'largest'
  76. });
  77. consumer2.on('data', function(message) {
  78. if (offsets.first) {
  79. offsets.first = false;
  80. t.deepStrictEqual(offsets.committed, message.offset, 'Offset read by consumer 2 incorrect');
  81. clearInterval(tt);
  82. consumer2.unsubscribe();
  83. consumer2.disconnect(function() {
  84. done();
  85. });
  86. }
  87. });
  88. consumer2.on('ready', function() {
  89. consumer2.subscribe([topic]);
  90. consumer2.consume();
  91. });
  92. consumer2.connect();
  93. });
  94. consumer.on('data', function(message) {
  95. count++;
  96. if (count === 3) {
  97. consumer.commitMessageSync(message);
  98. // test consumer.committed( ) API
  99. consumer.committed(null, 5000, function(err, topicPartitions) {
  100. t.ifError(err);
  101. t.deepStrictEqual(topicPartitions.length, 1);
  102. t.deepStrictEqual(topicPartitions[0].offset, message.offset + 1, 'Offset read by consumer 1 incorrect');
  103. offsets.committed = message.offset + 1;
  104. consumer.unsubscribe();
  105. consumer.disconnect();
  106. });
  107. }
  108. });
  109. consumer.subscribe([topic]);
  110. consumer.consume();
  111. });
  112. });