You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
2.8 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var Kafka = require('../');
  10. var crypto = require('crypto');
  // ---- Benchmark state shared by the send loop, the stats timer and shutdown() ----
  // Messages attempted since the last stats tick (logged and reset by getTimer).
  var count = 0;
  // Running message total; reported by shutdown().
  var total = 0;
  // Number of produce attempts made; compared against MAX to end the run.
  var totalComplete = 0;
  // Number of produce() calls that returned without throwing.
  var verifiedComplete = 0;
  // Number of produce() calls that threw.
  var errors = 0;
  // Per-tick message counts collected by getTimer.
  var store = [];
  // Epoch milliseconds at which sending began; assigned in the 'ready' handler.
  var started;
  // Set to true once shutdown() has run.
  var done = false;

  // Message limit used when none (or an unusable one) is given on the command line.
  var DEFAULT_MAX_MESSAGES = 10000000;

  /**
   * Convert a raw command-line value into a positive integer message limit.
   *
   * Command-line arguments are always strings; keeping the limit as a string
   * makes strict comparisons such as `totalComplete === MAX` permanently
   * false, so the value is parsed into a number exactly once here.
   *
   * @param {string|undefined} raw - The raw argument, if any.
   * @param {number} fallback - Value returned when raw is missing, not
   *   numeric, zero or negative.
   * @returns {number} The parsed positive integer, or fallback.
   */
  function parseMessageLimit(raw, fallback) {
    var parsed = parseInt(raw, 10);
    // `parsed > 0` is false for NaN as well as for zero and negatives.
    return parsed > 0 ? parsed : fallback;
  }

  // ---- Command-line arguments: <host> <topic> <compression> <max messages> ----
  var host = process.argv[2] || '127.0.0.1:9092';
  var topicName = process.argv[3] || 'test';
  var compression = process.argv[4] || 'gzip';
  var MAX = parseMessageLimit(process.argv[5], DEFAULT_MAX_MESSAGES);
  // Options handed to the Kafka.Producer constructor. The broker list and the
  // compression codec come from the command-line values parsed earlier in this
  // file; the remaining entries are fixed tuning values for the benchmark.
  var producerOptions = {};
  producerOptions['metadata.broker.list'] = host;
  producerOptions['group.id'] = 'node-rdkafka-bench';
  producerOptions['compression.codec'] = compression;
  producerOptions['retry.backoff.ms'] = 200;
  producerOptions['message.send.max.retries'] = 10;
  producerOptions['socket.keepalive.enable'] = true;
  producerOptions['queue.buffering.max.messages'] = 100000;
  producerOptions['queue.buffering.max.ms'] = 1000;
  producerOptions['batch.num.messages'] = 1000;

  // The single producer instance used by the whole benchmark.
  var producer = new Kafka.Producer(producerOptions);
  // Track how many messages we send per second.
  // Handle of the pending one-second stats timer, or null when none is armed.
  var interval = null;
  // NOTE(review): `ok` is not referenced anywhere in the visible part of this file.
  var ok = true;

  /**
   * Arm the once-per-second statistics timer if it is not already armed.
   *
   * When the timer fires and the benchmark is still running, it logs the
   * number of messages counted since the previous tick, records that number
   * in `store`, resets `count` and re-arms itself. When the benchmark has
   * finished (`done` is true), it logs the remainder once and does not re-arm.
   *
   * @returns {*} The handle of the currently pending timer.
   */
  function getTimer() {
    if (interval) {
      // A tick is already pending; never stack a second timer.
      return interval;
    }

    interval = setTimeout(function() {
      // The handle is spent once the callback runs; clear it so the
      // recursive getTimer() call below can arm a fresh timer.
      interval = null;

      if (done) {
        console.log('%d remaining messages sent in last batch <1000ms', count);
        return;
      }

      console.log('%d messages sent per second', count);
      store.push(count);
      count = 0;
      getTimer();
    }, 1000);

    return interval;
  }
  // NOTE(review): `t` is not referenced anywhere in the visible part of this file.
  var t;

  // Generate one random 4096-byte payload, connect the producer and, once it
  // is ready, send that same payload repeatedly until MAX messages have been
  // attempted.
  crypto.randomBytes(4096, function(ex, buffer) {
    if (ex) {
      // Without a payload there is nothing to benchmark.
      console.error(ex);
      process.exit(1);
    }

    // MAX may be a raw command-line string; coerce it once so the end-of-run
    // check below compares numbers with numbers.
    var limit = Number(MAX);

    producer.connect()
      .on('ready', function() {
        getTimer();
        started = new Date().getTime();

        var sendMessage = function() {
          if (done) {
            // shutdown() has already run (for example via 'disconnected');
            // do not produce on a producer that is being torn down.
            return;
          }

          try {
            // The two null arguments are presumably partition and key —
            // confirm against the node-rdkafka produce() signature.
            producer.produce(topicName, null, buffer, null);
            verifiedComplete += 1;
          } catch (e) {
            console.error(e);
            errors++;
          }

          count += 1;
          total += 1;
          totalComplete += 1;

          // Written as a negated `<` so that a non-numeric limit (NaN) ends
          // the run after the first message instead of looping forever.
          if (!(totalComplete < limit)) {
            shutdown();
            return;
          }

          // This is 100% sync so we need to setImmediate to give it time
          // to breathe.
          setImmediate(sendMessage);
        };

        sendMessage();
      })
      .on('event.error', function(err) {
        console.error(err);
        process.exit(1);
      })
      .on('disconnected', shutdown);
  });
  /**
   * Stop the benchmark: cancel the stats timer, disconnect the producer,
   * print the totals and exit the process.
   *
   * This function is both called directly when the message limit is reached
   * and registered as the producer's 'disconnected' handler, so it can be
   * invoked more than once; every call after the first is a no-op.
   *
   * @param {*} [e] - Argument supplied by the event emitter; unused.
   */
  function shutdown(e) {
    if (done) {
      // Already shutting down; do not disconnect or arm a kill timer twice.
      return;
    }
    done = true;

    // The stats timer is created with setTimeout, so cancel it with the
    // matching clearTimeout.
    clearTimeout(interval);

    // Safety net: force the process to exit if disconnect never calls back.
    var killTimer = setTimeout(function() {
      process.exit();
    }, 5000);

    producer.disconnect(function() {
      clearTimeout(killTimer);

      var ended = new Date().getTime();
      var elapsed = ended - started;
      // Guard against a zero or undefined duration, which would otherwise
      // print Infinity or NaN as the rate.
      var perSecond = elapsed > 0 ? Math.floor(total / (elapsed / 1000)) : 0;

      console.log('total: %d messages over %d ms', total, elapsed);
      console.log('%d messages / second', perSecond);
      process.exit();
    });
  }