|
|
- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
-
- 'use strict';
-
- module.exports = ProducerStream;
-
- var Writable = require('stream').Writable;
- var util = require('util');
- var ErrorCode = require('./error').codes;
-
- util.inherits(ProducerStream, Writable);
-
/**
 * Writable stream integrating with the Kafka Producer.
 *
 * This class is used to write data to Kafka in a streaming way. It takes
 * buffers of data and puts them into the appropriate Kafka topic. If you need
 * finer control over partitions or keys, this is probably not the class for
 * you. In that situation just use the Producer itself.
 *
 * The stream detects if Kafka is already connected. You can safely begin
 * writing right away.
 *
 * When not in objectMode the stream can only be given buffers, and a `topic`
 * option is required.
 *
 * @param {Producer} producer - The Kafka Producer object.
 * @param {object|string} options - Stream options (`topic`, `objectMode`,
 *   `autoClose`, `pollInterval`, `connectOptions`, `encoding`, plus the
 *   standard stream.Writable options), or an encoding string.
 * @constructor
 * @extends stream.Writable
 */
function ProducerStream(producer, options) {
  if (!(this instanceof ProducerStream)) {
    return new ProducerStream(producer, options);
  }

  if (options === undefined) {
    options = {};
  } else if (typeof options === 'string') {
    options = { encoding: options };
  } else if (options === null || typeof options !== 'object') {
    throw new TypeError('"streamOptions" argument must be a string or an object');
  }

  // Without objectMode every chunk is produced to a single topic, so one is
  // mandatory up front.
  if (!options.objectMode && !options.topic) {
    throw new TypeError('ProducerStreams not using objectMode must provide a topic to produce to.');
  }

  // Pick the write implementation: raw buffers to the fixed topic, or full
  // message objects that carry their own topic/partition/key.
  if (options.objectMode !== true) {
    this._write = this._write_buffer;
  } else {
    this._write = this._write_message;
  }

  Writable.call(this, options);

  this.producer = producer;
  this.topicName = options.topic;

  this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
  this.connectOptions = options.connectOptions || {};

  // Polling drives delivery reports; default to once per second.
  this.producer.setPollInterval(options.pollInterval || 1000);

  if (options.encoding) {
    this.setDefaultEncoding(options.encoding);
  }

  // Connect to the producer. Unless we are already connected
  if (!this.producer.isConnected()) {
    this.connect(this.connectOptions);
  }

  var self = this;

  // Capture the stream explicitly rather than relying on EventEmitter's
  // implicit `this` binding inside the listener.
  this.once('finish', function() {
    if (self.autoClose) {
      self.close();
    }
  });

}
-
/**
 * Connect the underlying producer, surfacing any connection failure as a
 * stream 'error' event.
 *
 * @param {object} options - Options forwarded to Producer#connect.
 */
ProducerStream.prototype.connect = function(options) {
  var self = this;
  this.producer.connect(options, function(err) {
    if (err) {
      self.emit('error', err);
    }
  });
};
-
/**
 * Internal stream write method for ProducerStream when writing buffers.
 *
 * This method should never be called externally. It has some recursion to
 * handle cases where the producer is not yet connected.
 *
 * @param {buffer} data - Chunk to write.
 * @param {string} encoding - Encoding for the buffer
 * @param {Function} cb - Callback to call when the stream is done processing
 * the data.
 * @private
 * @see https://github.com/nodejs/node/blob/master/lib/fs.js#L1901
 */
ProducerStream.prototype._write_buffer = function(data, encoding, cb) {
  var self = this;

  if (!(data instanceof Buffer)) {
    // Report the failure through the write callback (Writable then emits
    // 'error') instead of emitting directly and never invoking cb, which
    // would stall the stream permanently.
    setImmediate(function() {
      cb(new Error('Invalid data. Can only produce buffers'));
    });
    return;
  }

  if (!this.producer.isConnected()) {
    // Not connected yet; replay this write once the producer is ready.
    this.producer.once('ready', function() {
      self._write(data, encoding, cb);
    });
    return;
  }

  try {
    this.producer.produce(self.topicName, null, data, null);
    setImmediate(cb);
  } catch (e) {
    if (ErrorCode.ERR__QUEUE_FULL === e.code) {
      // Poll for good measure
      self.producer.poll();

      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      setTimeout(function() {
        self._write(data, encoding, cb);
      }, 500);
    } else {
      if (self.autoClose) {
        self.close();
      }
      setImmediate(function() {
        cb(e);
      });
    }
  }
};
-
/**
 * Internal stream write method for ProducerStream when writing objects.
 *
 * This method should never be called externally. It has some recursion to
 * handle cases where the producer is not yet connected.
 *
 * @param {object} message - Message to write (topic, partition, value, key,
 * timestamp, opaque, headers).
 * @param {string} encoding - Encoding for the buffer
 * @param {Function} cb - Callback to call when the stream is done processing
 * the data.
 * @private
 * @see https://github.com/nodejs/node/blob/master/lib/fs.js#L1901
 */
ProducerStream.prototype._write_message = function(message, encoding, cb) {
  var stream = this;

  if (!stream.producer.isConnected()) {
    // Defer until the producer reports readiness, then replay this write.
    stream.producer.once('ready', function() {
      stream._write(message, encoding, cb);
    });
    return;
  }

  try {
    stream.producer.produce(message.topic, message.partition, message.value, message.key, message.timestamp, message.opaque, message.headers);
    setImmediate(cb);
  } catch (err) {
    if (err.code !== ErrorCode.ERR__QUEUE_FULL) {
      if (stream.autoClose) {
        stream.close();
      }
      setImmediate(function() {
        cb(err);
      });
      return;
    }

    // The librdkafka queue is full: poll to drain delivery reports, then
    // retry the same write after a short delay so backpressure builds up.
    stream.producer.poll();
    setTimeout(function() {
      stream._write(message, encoding, cb);
    }, 500);
  }
};
-
/**
 * Produce a batch of chunks, calling cb once after all of them have been
 * handed to the producer (or with the first fatal error).
 *
 * A chunk may be a raw Buffer (produced to `topic`) or a message object
 * carrying its own topic/partition/key.
 *
 * @param {Producer} producer - Connected producer to write through.
 * @param {string} topic - Topic used for plain Buffer chunks.
 * @param {Array} chunks - Chunks remaining to produce.
 * @param {Function} cb - Completion callback.
 */
function writev(producer, topic, chunks, cb) {

  // @todo maybe a produce batch method?
  var total = chunks.length;
  var completed = 0;
  var lastError = null;

  // Record one finished chunk; fire cb once every chunk is accounted for.
  function markDone(e) {
    if (e) {
      lastError = e;
    }
    completed++;
    if (completed === total) {
      cb(lastError);
    }
  }

  // Queue was full: poll, wait a bit so backpressure builds, then retry the
  // chunks that have not been produced yet.
  function retryLater(pending) {
    producer.poll();
    setTimeout(function() {
      writev(producer, topic, pending, cb);
    }, 500);
  }

  for (var idx = 0; idx < total; idx++) {
    var chunk = chunks[idx];

    try {
      if (Buffer.isBuffer(chunk)) {
        producer.produce(topic, null, chunk, null);
      } else {
        producer.produce(chunk.topic, chunk.partition, chunk.value, chunk.key, chunk.timestamp, chunk.opaque, chunk.headers);
      }
      markDone();
    } catch (e) {
      if (ErrorCode.ERR__QUEUE_FULL === e.code) {
        retryLater(chunks.slice(idx));
      } else {
        cb(e);
      }
      break;
    }
  }

}
-
/**
 * Internal stream writev method for ProducerStream.
 *
 * Handles a batch of queued chunks in a single call, waiting for the
 * producer to connect first if necessary.
 *
 * @param {Array<{chunk: *}>} data - Chunks queued by the Writable machinery.
 * @param {Function} cb - Callback invoked when the batch has been processed.
 * @private
 */
ProducerStream.prototype._writev = function(data, cb) {
  var self = this;

  if (!this.producer.isConnected()) {
    // Fix: 'ready' is emitted by the producer, not the stream, so listening
    // on `this` would wait forever. Use `self` so the retry runs with the
    // stream (not the producer) as its receiver.
    this.producer.once('ready', function() {
      self._writev(data, cb);
    });
    return;
  }

  var len = data.length;
  var chunks = new Array(len);

  for (var i = 0; i < len; i++) {
    chunks[i] = data[i].chunk;
  }

  writev(this.producer, this.topicName, chunks, function(err) {
    if (err) {
      // Mirror _write_buffer/_write_message: only tear the stream down on
      // error when autoClose is enabled.
      if (self.autoClose) {
        self.close();
      }
      cb(err);
      return;
    }
    cb();
  });

};
-
/**
 * Close the stream, disconnecting the producer if needed, and emit 'close'
 * when done.
 *
 * @param {Function} [cb] - Invoked once the 'close' event fires.
 */
ProducerStream.prototype.close = function(cb) {
  var self = this;

  if (cb) {
    this.once('close', cb);
  }

  // Shared terminal step for every branch below.
  function emitClose() {
    self.emit('close');
  }

  // Use internal producer state flags in here
  if (self.producer._isConnected) {
    self.producer.disconnect(function() {
      // Previously this set the producer to null. I'm not sure there is any benefit
      // to that other than I guess helping flag it for GC?
      // https://github.com/Blizzard/node-rdkafka/issues/344
      emitClose();
    });
  } else if (self.producer._isConnecting) {
    self.producer.once('ready', function() {
      // Don't pass CB this time because it has already been passed
      self.close();
    });
  } else {
    setImmediate(emitClose);
  }
};
|