You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
2.4 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var Kafka = require('../');
  10. var crypto = require('crypto');
  11. var count = 0;
  12. var total = 0;
  13. var totalComplete = 0;
  14. var store = [];
  15. var host = process.argv[2] || '127.0.0.1:9092';
  16. var topicName = process.argv[3] || 'test';
  17. var compression = process.argv[4] || 'gzip';
  18. var MAX = process.argv[5] || 1000000;
  19. var stream = Kafka.Producer.createWriteStream({
  20. 'metadata.broker.list': host,
  21. 'group.id': 'node-rdkafka-bench',
  22. 'compression.codec': compression,
  23. 'retry.backoff.ms': 200,
  24. 'message.send.max.retries': 10,
  25. 'socket.keepalive.enable': true,
  26. 'queue.buffering.max.messages': 100000,
  27. 'queue.buffering.max.ms': 1000,
  28. 'batch.num.messages': 1000,
  29. }, {}, {
  30. topic: topicName,
  31. pollInterval: 20
  32. });
  33. stream.on('error', function(e) {
  34. console.log(e);
  35. process.exit(1);
  36. });
  37. // Track how many messages we see per second
  38. var interval;
  39. var done = false;
  40. function log() {
  41. console.log('%d messages per sent second', count);
  42. store.push(count);
  43. count = 0;
  44. }
  45. crypto.randomBytes(4096, function(ex, buffer) {
  46. var x = function(e) {
  47. if (e) {
  48. console.error(e);
  49. }
  50. count += 1;
  51. totalComplete += 1;
  52. if (totalComplete >= MAX && !done) {
  53. done = true;
  54. clearInterval(interval);
  55. setTimeout(shutdown, 5000);
  56. }
  57. };
  58. function write() {
  59. if (!stream.write(buffer, 'base64', x)) {
  60. return stream.once('drain', write);
  61. } else {
  62. total++;
  63. }
  64. if (total < MAX) {
  65. // we are not done
  66. setImmediate(write);
  67. }
  68. }
  69. write();
  70. interval = setInterval(log, 1000);
  71. stream.on('error', function(err) {
  72. console.log(err);
  73. });
  74. // stream.on('end', shutdown);
  75. });
  76. process.once('SIGTERM', shutdown);
  77. process.once('SIGINT', shutdown);
  78. process.once('SIGHUP', shutdown);
  79. function shutdown() {
  80. if (store.length > 0) {
  81. var calc = 0;
  82. for (var x in store) {
  83. calc += store[x];
  84. }
  85. var mps = parseFloat(calc * 1.0/store.length);
  86. console.log('%d messages per second on average', mps);
  87. console.log('%d messages total', total);
  88. }
  89. clearInterval(interval);
  90. stream.end();
  91. stream.on('close', function() {
  92. console.log('total: %d', total);
  93. });
  94. }