You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

100 lines
2.2 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var Writable = require('stream').Writable;
  10. var Kafka = require('../');
  // Throughput bookkeeping:
  //   count - messages seen since the last one-second sample (reset by the sampler)
  //   total - messages seen over the whole run
  //   store - the non-zero per-second samples, averaged at shutdown
  var count = 0;
  var total = 0;
  var store = [];

  // Handle of the once-per-second sampling timer, and a guard so that the
  // shutdown routine only runs once.
  var interval;
  var isShuttingDown = false;

  // Broker list and topic come from the command line, with local defaults.
  var host = process.argv[2] || 'localhost:9092';
  var topic = process.argv[3] || 'test';

  // First argument to createReadStream.
  var consumerConfig = {
    'metadata.broker.list': host,
    'group.id': 'node-rdkafka-benchs',
    'fetch.wait.max.ms': 100,
    'fetch.message.max.bytes': 1024 * 1024,
    'enable.auto.commit': false
    // paused: true,
  };

  // Second argument to createReadStream.
  var topicConfig = {
    'auto.offset.reset': 'earliest'
  };

  // Third argument to createReadStream.
  var streamOptions = {
    fetchSize: 16,
    topics: [topic]
  };

  var stream = Kafka.createReadStream(consumerConfig, topicConfig, streamOptions);
  // Wire up the consumer stream:
  //   - 'error' logs the failure and shuts down
  //   - the first 'data' event starts a once-per-second throughput sampler
  //   - 'end' shuts down
  //   - every message is piped into a counting sink
  stream
    .on('error', function(err) {
      console.log('Shutting down due to error');
      console.log(err.stack);
      shutdown();
    })
    .once('data', function() {
      // Start sampling only once the first message has arrived. The timer is
      // unref'd so that it does not keep the process alive by itself.
      interval = setInterval(function() {
        if (isShuttingDown) {
          clearInterval(interval);
          // Stop here: do not log or record a sample once shutdown has begun.
          return;
        }

        console.log('%d messages per second', count);

        if (count > 0) {
          // Only seconds in which data arrived are recorded for the average.
          store.push(count);
          // setTimeout(shutdown, 500);
        }

        count = 0;
      }, 1000).unref();
    })
    .on('end', function() {
      // Can be called more than once without issue because of guard var
      console.log('Shutting down due to stream end');
      shutdown();
    })
    .pipe(new Writable({
      objectMode: true,
      // Counting sink: tally the message and acknowledge it on the next
      // turn of the event loop.
      write: function(message, encoding, cb) {
        count += 1;
        total += 1;
        setImmediate(cb);
      }
    }));
  // Route each termination signal to the shared shutdown routine; every
  // handler is registered with `once`, so it fires at most one time.
  ['SIGTERM', 'SIGINT', 'SIGHUP'].forEach(function(signal) {
    process.once(signal, shutdown);
  });
  /**
   * Stop the benchmark: cancel the sampling timer, print the average of the
   * recorded per-second samples (if any), and destroy the consumer stream.
   *
   * Safe to call repeatedly; only the first call does any work.
   */
  function shutdown() {
    if (isShuttingDown) {
      return;
    }

    isShuttingDown = true;
    clearInterval(interval);

    if (store.length > 0) {
      // Sum with reduce rather than for...in, which would also visit any
      // enumerable properties inherited through the array's prototype chain.
      var sum = store.reduce(function(acc, sample) {
        return acc + sample;
      }, 0);
      var mps = sum / store.length;
      console.log('%d messages per second on average', mps);
    }

    // Attach the listener before destroying the stream so the event cannot be
    // missed if destroy() emits it synchronously.
    // NOTE(review): plain Node streams emit 'close' (not 'end') on destroy;
    // confirm that this stream really emits 'end' here.
    stream.once('end', function() {
      console.log('total: %d', total);
    });

    // Destroy the stream
    stream.destroy();
  }