/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

module.exports = Producer;

var Client = require('./client');

var util = require('util');
var Kafka = require('../librdkafka.js');
var ProducerStream = require('./producer-stream');
var LibrdKafkaError = require('./error');
var shallowCopy = require('./util').shallowCopy;

util.inherits(Producer, Client);

/**
 * Producer class for sending messages to Kafka
 *
 * This is the main entry point for writing data to Kafka. You
 * configure this like you do any other client, with a global
 * configuration and default topic configuration.
 *
 * Once you instantiate this object, you need to connect to it first.
 * This allows you to get the metadata and make sure the connection
 * can be made before you depend on it. After that, events (such as
 * delivery reports) are only surfaced when `poll` runs; call it manually
 * or schedule it with `setPollInterval`.
 *
 * The following keys are read from `conf` and are NOT forwarded to the
 * underlying client: `topic`, `partition`, `dr_cb` and `dr_msg_cb`.
 * The caller's objects are never mutated; shallow copies are used.
 *
 * @param {object} conf - Key value pairs to configure the producer
 * @param {object} topicConf - Key value pairs to create a default
 * topic configuration
 * @extends Client
 * @constructor
 */
function Producer(conf, topicConf) {
  if (!(this instanceof Producer)) {
    return new Producer(conf, topicConf);
  }

  // Work on copies so that deleting keys below does not touch caller objects.
  conf = shallowCopy(conf);
  topicConf = shallowCopy(topicConf);

  /**
   * Producer message. This is sent to the wrapper, not received from it
   *
   * @typedef {object} Producer~Message
   * @property {string|buffer} message - The buffer to send to Kafka.
   * @property {Topic} topic - The Kafka topic to produce to.
   * @property {number} partition - The partition to produce to. Defaults to
   * the partitioner
   * @property {string} key - The key string to use for the message.
   * @see Consumer~Message
   */

  var gTopic = conf.topic || false;
  // Partition 0 is a valid partition; a plain `||` would discard it and
  // silently fall back to the partitioner (-1).
  var gPart = conf.partition === 0 ? 0 : (conf.partition || null);
  var dr_cb = conf.dr_cb || null;
  var dr_msg_cb = conf.dr_msg_cb || null;

  // Delete the keys we saved above; we don't want to pass them on
  delete conf.topic;
  delete conf.partition;
  delete conf.dr_cb;
  delete conf.dr_msg_cb;

  // client is an initialized producer object
  // @see NodeKafka::Producer::Init
  Client.call(this, conf, Kafka.Producer, topicConf);

  this.globalConfig = conf;
  this.topicConfig = topicConf;
  this.defaultTopic = gTopic || null;
  // -1 is used when no default partition was configured
  this.defaultPartition = gPart == null ? -1 : gPart;

  this.sentMessages = 0;

  this.pollInterval = undefined;

  if (dr_msg_cb || dr_cb) {
    // Re-emit delivery reports as 'delivery-report' events, converting
    // errors into LibrdKafkaError instances first.
    this._cb_configs.event.delivery_cb = function(err, report) {
      if (err) {
        err = LibrdKafkaError.create(err);
      }
      this.emit('delivery-report', err, report);
    }.bind(this);
    // Flag stored on the callback itself, recording whether dr_msg_cb was set
    this._cb_configs.event.delivery_cb.dr_msg_cb = !!dr_msg_cb;

    // dr_cb may be a plain truthy flag; only subscribe actual functions
    if (typeof dr_cb === 'function') {
      this.on('delivery-report', dr_cb);
    }
  }
}

/**
 * Produce a message to Kafka synchronously.
 *
 * This is the method mainly used in this class. Use it to produce
 * a message to Kafka.
 *
 * When this is sent off, there is no guarantee it is delivered. If you need
 * guaranteed delivery, change your *acks* settings, or use delivery reports.
 *
 * @param {string} topic - The topic name to produce to.
 * @param {number|null} partition - The partition number to produce to.
 * When null or undefined, the producer's default partition is used.
 * @param {Buffer|null} message - The message to produce.
 * @param {string} key - The key associated with the message.
 * @param {number|null} timestamp - Timestamp to send with the message.
 * @param {object} opaque - An object you want passed along with this message, if provided.
 * @param {object} headers - A list of custom key value pairs that provide message metadata.
 * @throws {Error} - If the producer is not connected.
 * @throws {TypeError} - If `topic` is not a non-empty string.
 * @throws {LibrdKafkaError} - Throws a librdkafka error if it failed.
 * @return {boolean} - returns an error if it failed, or true if not
 * @see Producer#produce
 */
Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque, headers) {
  if (!this._isConnected) {
    throw new Error('Producer not connected');
  }

  // I have removed support for using a topic object. It is going to be removed
  // from librdkafka soon, and it causes issues with shutting down
  if (!topic || typeof topic !== 'string') {
    throw new TypeError('"topic" must be a string');
  }

  this.sentMessages++;

  partition = partition == null ? this.defaultPartition : partition;

  return this._errorWrap(
    this._client.produce(topic, partition, message, key, timestamp, opaque, headers));
};

/**
 * Create a write stream interface for a producer.
 *
 * This stream does not run in object mode. It only takes buffers of data.
 *
 * @param {object} conf - Key value pairs to configure the producer
 * @param {object} topicConf - Key value pairs to create a default
 * topic configuration
 * @param {object} streamOptions - Stream options
 * @return {ProducerStream} - returns the write stream for writing to Kafka.
 */
Producer.createWriteStream = function(conf, topicConf, streamOptions) {
  var producer = new Producer(conf, topicConf);
  return new ProducerStream(producer, streamOptions);
};

/**
 * Poll for events
 *
 * We need to run poll in order to learn about new events that have occurred.
 * This is no longer done automatically when we produce, so we need to run
 * it manually, or set the producer to automatically poll.
 *
 * @throws {Error} - If the producer is not connected.
 * @return {Producer} - returns itself.
 */
Producer.prototype.poll = function() {
  if (!this._isConnected) {
    throw new Error('Producer not connected');
  }
  this._client.poll();
  return this;
};

/**
 * Set automatic polling for events.
 *
 * We need to run poll in order to learn about new events that have occurred.
 * If you would like this done on an interval, you can do it here. Any
 * previously configured interval is always cleared first. If the producer is
 * not connected yet, the interval is installed once the 'ready' event fires.
 * The interval is removed again when the 'disconnected' event fires.
 *
 * @param {number} interval - Interval, in milliseconds, to poll. Pass 0 to
 * stop automatic polling.
 *
 * @return {Producer} - returns itself.
 */
Producer.prototype.setPollInterval = function(interval) {
  // If we already have a poll interval we need to stop it
  if (this.pollInterval) {
    clearInterval(this.pollInterval);
    this.pollInterval = undefined;
  }

  if (interval === 0) {
    // If the interval was set to 0, bail out. We don't want to process this.
    // If there was an interval previously set, it has been removed.
    return this;
  }

  var self = this;

  // Now we want to make sure we are connected.
  if (!this._isConnected) {
    // If we are not, execute this once the connection goes through.
    this.once('ready', function() {
      self.setPollInterval(interval);
    });
    return this;
  }

  // We know we are connected at this point.
  // Unref this interval so it does not keep the process alive on its own
  this.pollInterval = setInterval(function() {
    try {
      self.poll();
    } catch (e) {
      // Deliberately best-effort: we can probably ignore errors here as far
      // as broadcasting. Disconnection issues will get handled below
    }
  }, interval).unref();

  // Handle disconnections
  this.once('disconnected', function() {
    // Just rerun this function with interval 0. If any
    // poll interval is set, this will remove it
    self.setPollInterval(0);
  });

  return this;
};

/**
 * Flush the producer
 *
 * Flush everything on the internal librdkafka producer buffer. Do this before
 * disconnects usually
 *
 * @param {number} timeout - Number of milliseconds to try to flush before giving up.
 * Defaults to 500 when null or undefined.
 * @param {function} callback - Callback to fire when the flush is done. Receives
 * a LibrdKafkaError as its only argument if the flush failed.
 *
 * @throws {Error} - If the producer is not connected.
 * @return {Producer} - returns itself.
 */
Producer.prototype.flush = function(timeout, callback) {
  if (!this._isConnected) {
    throw new Error('Producer not connected');
  }

  if (timeout === undefined || timeout === null) {
    timeout = 500;
  }

  this._client.flush(timeout, function(err) {
    if (err) {
      err = LibrdKafkaError.create(err);
    }

    if (callback) {
      callback(err);
    }
  });
  return this;
};

/**
 * Save the base disconnect method here so we can overwrite it and add a flush
 */
Producer.prototype._disconnect = Producer.prototype.disconnect;

/**
 * Disconnect the producer
 *
 * Flush everything on the internal librdkafka producer buffer. Then disconnect.
 * The base disconnect runs once the flush attempt has finished, regardless of
 * whether the flush reported an error.
 *
 * May be called as `disconnect(cb)` or `disconnect(timeout, cb)`.
 *
 * @param {number} timeout - Number of milliseconds to try to flush before giving up, defaults to 5 seconds.
 * @param {function} cb - The callback handed to the base disconnect method
 * once the flush attempt has finished.
 */
Producer.prototype.disconnect = function(timeout, cb) {
  var self = this;
  var timeoutInterval = 5000;

  if (typeof timeout === 'function') {
    cb = timeout;
  } else if (timeout != null) {
    // Only override the default when a timeout was actually supplied;
    // otherwise flush would fall back to its own, much shorter, default.
    timeoutInterval = timeout;
  }

  this.flush(timeoutInterval, function() {
    self._disconnect(cb);
  });
};

/**
 * Init a transaction.
 *
 * Initialize transactions, this is only performed once per transactional producer.
 *
 * May be called as `initTransactions(cb)` or `initTransactions(timeout, cb)`.
 *
 * @param {number} timeout - Number of milliseconds to try to initialize before giving up, defaults to 5 seconds.
 * @param {function} cb - Callback to return when operation is completed
 * @return {Producer} - returns itself.
 */
Producer.prototype.initTransactions = function(timeout, cb) {
  if (typeof timeout === 'function') {
    cb = timeout;
    timeout = 5000;
  }
  this._client.initTransactions(timeout, function(err) {
    cb(err ? LibrdKafkaError.create(err) : err);
  });
  return this;
};

/**
 * Begin a transaction.
 *
 * 'initTransaction' must have been called successfully (once) before this function is called.
 *
 * @param {function} cb - Callback to return when operation is completed
 * @return {Producer} - returns itself.
 */
Producer.prototype.beginTransaction = function(cb) {
  this._client.beginTransaction(function(err) {
    cb(err ? LibrdKafkaError.create(err) : err);
  });
  return this;
};

/**
 * Commit the current transaction (as started with 'beginTransaction').
 *
 * May be called as `commitTransaction(cb)` or `commitTransaction(timeout, cb)`.
 *
 * @param {number} timeout - Number of milliseconds to try to commit before giving up, defaults to 5 seconds
 * @param {function} cb - Callback to return when operation is completed
 * @return {Producer} - returns itself.
 */
Producer.prototype.commitTransaction = function(timeout, cb) {
  if (typeof timeout === 'function') {
    cb = timeout;
    timeout = 5000;
  }
  this._client.commitTransaction(timeout, function(err) {
    cb(err ? LibrdKafkaError.create(err) : err);
  });
  return this;
};

/**
 * Aborts the ongoing transaction.
 *
 * May be called as `abortTransaction(cb)` or `abortTransaction(timeout, cb)`.
 *
 * @param {number} timeout - Number of milliseconds to try to abort, defaults to 5 seconds
 * @param {function} cb - Callback to return when operation is completed
 * @return {Producer} - returns itself.
 */
Producer.prototype.abortTransaction = function(timeout, cb) {
  if (typeof timeout === 'function') {
    cb = timeout;
    timeout = 5000;
  }
  this._client.abortTransaction(timeout, function(err) {
    cb(err ? LibrdKafkaError.create(err) : err);
  });
  return this;
};

/**
 * Send the current offsets of the consumer to the ongoing transaction.
 *
 * May be called with or without the `timeout` argument.
 *
 * @param {number} offsets - Offsets to send as part of the next commit
 * @param {Consumer} consumer - An instance of the consumer
 * @param {number} timeout - Number of milliseconds to try to send offsets, defaults to 5 seconds
 * @param {function} cb - Callback to return when operation is completed
 * @return {Producer} - returns itself.
 */
Producer.prototype.sendOffsetsToTransaction = function(offsets, consumer, timeout, cb) {
  if (typeof timeout === 'function') {
    cb = timeout;
    timeout = 5000;
  }
  this._client.sendOffsetsToTransaction(offsets, consumer.getClient(), timeout, function(err) {
    cb(err ? LibrdKafkaError.create(err) : err);
  });
  return this;
};