|
|
- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
-
- var Kafka = require('../');
- var crypto = require('crypto');
- var count = 0;
- var total = 0;
- var totalComplete = 0;
- var verifiedComplete = 0;
- var errors = 0;
- var store = [];
- var started;
- var done = false;
- var host = process.argv[2] || '127.0.0.1:9092';
- var topicName = process.argv[3] || 'test';
- var compression = process.argv[4] || 'gzip';
- var MAX = process.argv[5] || 10000000;
-
- var producer = new Kafka.Producer({
- 'metadata.broker.list': host,
- 'group.id': 'node-rdkafka-bench',
- 'compression.codec': compression,
- 'retry.backoff.ms': 200,
- 'message.send.max.retries': 10,
- 'socket.keepalive.enable': true,
- 'queue.buffering.max.messages': 100000,
- 'queue.buffering.max.ms': 1000,
- 'batch.num.messages': 1000
- });
-
- // Track how many messages we see per second
- var interval;
- var ok = true;
-
- function getTimer() {
- if (!interval) {
- interval = setTimeout(function() {
- interval = false;
- if (!done) {
- console.log('%d messages per sent second', count);
- store.push(count);
- count = 0;
- getTimer();
-
- } else {
- console.log('%d messages remaining sent in last batch <1000ms', count);
- }
- }, 1000);
- }
-
- return interval;
- }
-
- var t;
-
- crypto.randomBytes(4096, function(ex, buffer) {
-
- producer.connect()
- .on('ready', function() {
- getTimer();
-
- started = new Date().getTime();
-
- var sendMessage = function() {
- try {
- var errorCode = producer.produce(topicName, null, buffer, null);
- verifiedComplete += 1;
- } catch (e) {
- console.error(e);
- errors++;
- }
-
- count += 1;
- totalComplete += 1;
- if (totalComplete === MAX) {
- shutdown();
- }
- if (total < MAX) {
- total += 1;
-
- // This is 100% sync so we need to setImmediate to give it time
- // to breathe.
- setImmediate(sendMessage);
- }
- };
-
- sendMessage();
-
- })
- .on('event.error', function(err) {
- console.error(err);
- process.exit(1);
- })
- .on('disconnected', shutdown);
-
- });
-
- function shutdown(e) {
- done = true;
-
- clearInterval(interval);
-
- var killTimer = setTimeout(function() {
- process.exit();
- }, 5000);
-
- producer.disconnect(function() {
- clearTimeout(killTimer);
- var ended = new Date().getTime();
- var elapsed = ended - started;
-
- // console.log('Ended %s', ended);
- console.log('total: %d messages over %d ms', total, elapsed);
-
- console.log('%d messages / second', parseInt(total / (elapsed / 1000)));
- process.exit();
- });
-
- }
|