/*
|
|
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
|
|
*
|
|
* Copyright (c) 2016 Blizzard Entertainment
|
|
*
|
|
* This software may be modified and distributed under the terms
|
|
* of the MIT license. See the LICENSE.txt file for details.
|
|
*/
|
|
|
|
'use strict';
|
|
|
|
module.exports = ProducerStream;
|
|
|
|
var Writable = require('stream').Writable;
|
|
var util = require('util');
|
|
var ErrorCode = require('./error').codes;
|
|
|
|
// Wire up prototype inheritance from stream.Writable so ProducerStream
// instances have Writable's methods (the constructor relies on
// setDefaultEncoding and once); the constructor also runs
// Writable.call(this, options) to initialise the Writable state.
util.inherits(ProducerStream, Writable);
|
|
|
|
/**
 * Writable stream integrating with the Kafka Producer.
 *
 * This class is used to write data to Kafka in a streaming way.
 *
 * - In the default (non-object) mode it takes buffers of data and produces
 *   each one to the single topic given by `options.topic`, with no partition
 *   or key.
 * - In object mode (`options.objectMode` truthy) each written object is
 *   produced using its own `topic`, `partition`, `value`, `key`,
 *   `timestamp`, `opaque` and `headers` fields.
 *
 * If you need finer control than this, use the Producer itself.
 *
 * The stream detects if Kafka is already connected. You can safely begin
 * writing right away.
 *
 * @param {Producer} producer - The Kafka Producer object.
 * @param {object|string} [options] - Stream options, also passed through to
 *   stream.Writable. A string is treated as `{ encoding: options }`.
 * @param {string} [options.topic] - Topic to produce buffers to. Required
 *   unless `options.objectMode` is truthy.
 * @param {boolean} [options.objectMode] - Produce message objects instead
 *   of buffers.
 * @param {boolean} [options.autoClose=true] - Close the stream (and
 *   disconnect the producer) when the stream finishes or a write fails.
 * @param {object} [options.connectOptions={}] - Passed to `connect` when the
 *   producer is not yet connected.
 * @param {number} [options.pollInterval=1000] - Passed to
 *   `producer.setPollInterval`; falsy values fall back to 1000.
 * @param {string} [options.encoding] - Default encoding for string writes.
 * @throws {TypeError} If `options` is not undefined, a string or an object,
 *   or if no topic is given outside object mode.
 * @constructor
 * @extends stream.Writable
 */
function ProducerStream(producer, options) {
  if (!(this instanceof ProducerStream)) {
    return new ProducerStream(producer, options);
  }

  if (options === undefined) {
    options = {};
  } else if (typeof options === 'string') {
    options = { encoding: options };
  } else if (options === null || typeof options !== 'object') {
    throw new TypeError('"streamOptions" argument must be a string or an object');
  }

  // stream.Writable interprets `objectMode` by truthiness; use the same test
  // for both the topic requirement and the write strategy so they can never
  // disagree (e.g. `objectMode: 1` previously got the buffer writer).
  var objectMode = !!options.objectMode;

  if (!objectMode && !options.topic) {
    throw new TypeError('ProducerStreams not using objectMode must provide a topic to produce to.');
  }

  this._write = objectMode ? this._write_message : this._write_buffer;

  Writable.call(this, options);

  this.producer = producer;
  this.topicName = options.topic;

  this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
  this.connectOptions = options.connectOptions || {};

  this.producer.setPollInterval(options.pollInterval || 1000);

  if (options.encoding) {
    this.setDefaultEncoding(options.encoding);
  }

  // Connect to the producer, unless it is already connected.
  if (!this.producer.isConnected()) {
    this.connect(this.connectOptions);
  }

  var self = this;

  this.once('finish', function() {
    if (self.autoClose) {
      self.close();
    }
  });
}
|
|
|
|
/**
 * Connect the underlying producer.
 *
 * A connection failure is re-emitted as an 'error' event on this stream;
 * success requires no further action here.
 *
 * @param {object} options - Options forwarded to `producer.connect`.
 */
ProducerStream.prototype.connect = function(options) {
  var stream = this;

  stream.producer.connect(options, function(err) {
    if (!err) {
      return;
    }
    stream.emit('error', err);
  });
};
|
|
|
|
/**
 * Internal stream write method for ProducerStream when writing buffers.
 *
 * This method should never be called externally; it is installed as
 * `this._write` when the stream is not in object mode. Each buffer is
 * produced to `this.topicName` with no partition and no key.
 *
 * - If the producer is not connected yet, the write is retried once the
 *   producer emits 'ready'.
 * - If the producer queue is full (ERR__QUEUE_FULL), the producer is polled
 *   and the write is retried after 500ms; holding back `cb` is what exerts
 *   backpressure on the writer.
 * - Any other produce error closes the stream (when autoClose is set) and is
 *   passed to `cb`.
 *
 * @param {Buffer} data - Chunk to write. Anything else fails the write.
 * @param {string} encoding - Encoding of the chunk; forwarded unchanged when
 *   the write is retried.
 * @param {Function} cb - Callback to call when the stream is done processing
 *   the data; receives an error if the write failed.
 * @private
 */
ProducerStream.prototype._write_buffer = function(data, encoding, cb) {
  if (!(data instanceof Buffer)) {
    // Fail the write through the callback; stream.Writable turns this into an
    // 'error' event. Emitting 'error' without ever calling `cb` left the write
    // pending forever and stalled the stream.
    cb(new Error('Invalid data. Can only produce buffers'));
    return;
  }

  var self = this;

  if (!this.producer.isConnected()) {
    this.producer.once('ready', function() {
      self._write(data, encoding, cb);
    });
    return;
  }

  try {
    this.producer.produce(self.topicName, null, data, null);
    setImmediate(cb);
  } catch (e) {
    if (ErrorCode.ERR__QUEUE_FULL === e.code) {
      // Poll for good measure
      self.producer.poll();

      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      setTimeout(function() {
        self._write(data, encoding, cb);
      }, 500);
    } else {
      if (self.autoClose) {
        self.close();
      }
      setImmediate(function() {
        cb(e);
      });
    }
  }
};
|
|
|
|
/**
 * Internal stream write method used in object mode.
 *
 * Installed as `this._write` when the stream is in object mode; never call
 * it directly. The message's `topic`, `partition`, `value`, `key`,
 * `timestamp`, `opaque` and `headers` fields are handed to
 * `producer.produce` in that order.
 *
 * - Producer not connected yet: the write is re-attempted once the producer
 *   emits 'ready'.
 * - ERR__QUEUE_FULL: poll the producer, then re-attempt after 500ms. Holding
 *   the callback back is what applies backpressure.
 * - Any other error: close the stream if autoClose is set, then report the
 *   error through the callback on a later tick.
 *
 * @param {object} message - Message to write.
 * @param {string} encoding - Encoding reported by stream.Writable; forwarded
 *   unchanged when the write is re-attempted.
 * @param {Function} cb - Invoked once the message has been handed to the
 *   producer, or with the error if producing failed.
 * @private
 */
ProducerStream.prototype._write_message = function(message, encoding, cb) {
  var stream = this;

  function attemptAgain() {
    stream._write(message, encoding, cb);
  }

  if (!stream.producer.isConnected()) {
    stream.producer.once('ready', function() {
      attemptAgain();
    });
    return;
  }

  try {
    stream.producer.produce(
      message.topic,
      message.partition,
      message.value,
      message.key,
      message.timestamp,
      message.opaque,
      message.headers
    );
    setImmediate(cb);
  } catch (err) {
    var queueIsFull = ErrorCode.ERR__QUEUE_FULL === err.code;

    if (queueIsFull) {
      // Poll, then back off before trying the same message again.
      stream.producer.poll();
      setTimeout(function() {
        attemptAgain();
      }, 500);
      return;
    }

    if (stream.autoClose) {
      stream.close();
    }
    setImmediate(function() {
      cb(err);
    });
  }
};
|
|
|
|
/**
 * Produce a batch of chunks synchronously, retrying when the queue is full.
 *
 * Buffers are produced to `topic` with no partition or key; any other chunk
 * is treated as a message object and its own `topic`, `partition`, `value`,
 * `key`, `timestamp`, `opaque` and `headers` are used.
 *
 * `cb` is eventually called once:
 * - with `null` after every chunk has been handed to the producer
 *   (immediately for an empty batch);
 * - with the thrown error for any produce error other than ERR__QUEUE_FULL;
 *   the remaining chunks are then not produced.
 * On ERR__QUEUE_FULL the producer is polled and the unproduced chunks
 * (starting with the one that failed) are retried after 500ms; that delay is
 * how backpressure is applied.
 *
 * @param {Producer} producer - The Kafka Producer object.
 * @param {string} topic - Topic used for Buffer chunks.
 * @param {Array<Buffer|object>} chunks - Chunks to produce, in order.
 * @param {Function} cb - Completion callback, `cb(err)`.
 * @private
 */
function writev(producer, topic, chunks, cb) {
  // @todo maybe a produce batch method?

  function retry(restChunks) {
    // Poll for good measure
    producer.poll();

    // Just delay this thing a bit and pass the params
    // backpressure will get exerted this way.
    setTimeout(function() {
      writev(producer, topic, restChunks, cb);
    }, 500);
  }

  for (var i = 0; i < chunks.length; i++) {
    var chunk = chunks[i];

    try {
      if (Buffer.isBuffer(chunk)) {
        producer.produce(topic, null, chunk, null);
      } else {
        producer.produce(chunk.topic, chunk.partition, chunk.value, chunk.key, chunk.timestamp, chunk.opaque, chunk.headers);
      }
    } catch (e) {
      if (ErrorCode.ERR__QUEUE_FULL === e.code) {
        retry(chunks.slice(i));
      } else {
        cb(e);
      }
      return;
    }
  }

  // Every chunk was accepted. Calling cb outside the try block means an
  // exception thrown by cb is no longer caught and fed back into cb a second
  // time; reaching this point with an empty batch also completes it instead
  // of never calling cb.
  cb(null);
}
|
|
|
|
/**
 * Batched write method used by stream.Writable when several writes are
 * buffered; never call it directly.
 *
 * The buffered chunks are handed to `writev` in order. If the producer is not
 * connected yet, the whole batch is retried once the producer emits 'ready'.
 * A produce error closes the stream when autoClose is set (as the
 * single-chunk write methods do) and is passed to `cb`.
 *
 * @param {Array<{chunk: (Buffer|object)}>} data - Buffered write requests
 *   supplied by stream.Writable.
 * @param {Function} cb - Callback to call once the batch has been handled.
 * @private
 */
ProducerStream.prototype._writev = function(data, cb) {
  var self = this;

  if (!this.producer.isConnected()) {
    // Wait on the *producer*: the stream itself never emits 'ready', so
    // listening on `this` stalled the batch forever.
    this.producer.once('ready', function() {
      self._writev(data, cb);
    });
    return;
  }

  var chunks = data.map(function(request) {
    return request.chunk;
  });

  writev(this.producer, this.topicName, chunks, function(err) {
    if (err) {
      if (self.autoClose) {
        self.close();
      }
      cb(err);
      return;
    }
    cb();
  });
};
|
|
|
|
/**
 * Close the stream by disconnecting the underlying producer.
 *
 * Emits 'close' on this stream once done:
 * - producer connected: after `producer.disconnect` calls back;
 * - producer still connecting: close is retried when the producer emits
 *   'ready';
 * - otherwise: on a later tick, via setImmediate.
 *
 * Decides which case applies from the producer's internal `_isConnected` /
 * `_isConnecting` fields.
 *
 * @param {Function} [cb] - Optional one-time listener for this stream's
 *   'close' event.
 */
ProducerStream.prototype.close = function(cb) {
  var stream = this;
  var producer = stream.producer;

  function emitClose() {
    stream.emit('close');
  }

  if (cb) {
    stream.once('close', cb);
  }

  if (producer._isConnected) {
    // Previously this set the producer to null. There is no clear benefit to
    // that beyond perhaps flagging it for GC.
    // https://github.com/Blizzard/node-rdkafka/issues/344
    producer.disconnect(function() {
      emitClose();
    });
    return;
  }

  if (producer._isConnecting) {
    producer.once('ready', function() {
      // cb (if any) is already attached above; don't pass it again.
      stream.close();
    });
    return;
  }

  setImmediate(emitClose);
};
|