You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

77 lines
1.6 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var Kafka = require('../');
  10. var count = 0;
  11. var total = 0;
  12. var store = [];
  13. var host = process.argv[2] || 'localhost:9092';
  14. var topic = process.argv[3] || 'test';
  15. var consumer = new Kafka.KafkaConsumer({
  16. 'metadata.broker.list': host,
  17. 'group.id': 'node-rdkafka-bench',
  18. 'fetch.wait.max.ms': 100,
  19. 'fetch.message.max.bytes': 1024 * 1024,
  20. 'enable.auto.commit': false
  21. // paused: true,
  22. }, {
  23. 'auto.offset.reset': 'earliest'
  24. });
  25. var interval;
  26. consumer.connect()
  27. .once('ready', function() {
  28. consumer.subscribe([topic]);
  29. consumer.consume();
  30. })
  31. .once('data', function() {
  32. interval = setInterval(function() {
  33. console.log('%d messages per second', count);
  34. if (count > 0) {
  35. store.push(count);
  36. }
  37. count = 0;
  38. }, 1000);
  39. })
  40. .on('data', function(message) {
  41. count += 1;
  42. total += 1;
  43. });
  44. process.once('SIGTERM', shutdown);
  45. process.once('SIGINT', shutdown);
  46. process.once('SIGHUP', shutdown);
  47. function shutdown() {
  48. clearInterval(interval);
  49. if (store.length > 0) {
  50. var calc = 0;
  51. for (var x in store) {
  52. calc += store[x];
  53. }
  54. var mps = parseFloat(calc * 1.0/store.length);
  55. console.log('%d messages per second on average', mps);
  56. }
  57. var killTimer = setTimeout(function() {
  58. process.exit();
  59. }, 5000);
  60. consumer.disconnect(function() {
  61. clearTimeout(killTimer);
  62. process.exit();
  63. });
  64. }