- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
- module.exports = Producer;
- var Client = require('./client');
- var util = require('util');
- var Kafka = require('../librdkafka.js');
- var ProducerStream = require('./producer-stream');
- var LibrdKafkaError = require('./error');
- var shallowCopy = require('./util').shallowCopy;
- util.inherits(Producer, Client);
- /**
- * Producer class for sending messages to Kafka
- *
- * This is the main entry point for writing data to Kafka. You
- * configure this like you do any other client, with a global
- * configuration and default topic configuration.
- *
- * Once you instantiate this object, you need to connect to it first.
- * This allows you to get the metadata and make sure the connection
- * can be made before you depend on it. After that, problems with
- * the connection will by brought down by using poll, which automatically
- * runs when a transaction is made on the object.
- *
- * @param {object} conf - Key value pairs to configure the producer
- * @param {object} topicConf - Key value pairs to create a default
- * topic configuration
- * @extends Client
- * @constructor
- */
- function Producer(conf, topicConf) {
- if (!(this instanceof Producer)) {
- return new Producer(conf, topicConf);
- }
- conf = shallowCopy(conf);
- topicConf = shallowCopy(topicConf);
- /**
- * Producer message. This is sent to the wrapper, not received from it
- *
- * @typedef {object} Producer~Message
- * @property {string|buffer} message - The buffer to send to Kafka.
- * @property {Topic} topic - The Kafka topic to produce to.
- * @property {number} partition - The partition to produce to. Defaults to
- * the partitioner
- * @property {string} key - The key string to use for the message.
- * @see Consumer~Message
- */
- var gTopic = conf.topic || false;
- var gPart = conf.partition || null;
- var dr_cb = conf.dr_cb || null;
- var dr_msg_cb = conf.dr_msg_cb || null;
- // delete keys we don't want to pass on
- delete conf.topic;
- delete conf.partition;
- delete conf.dr_cb;
- delete conf.dr_msg_cb;
- // client is an initialized consumer object
- // @see NodeKafka::Producer::Init
- Client.call(this, conf, Kafka.Producer, topicConf);
- // Delete these keys after saving them in vars
- this.globalConfig = conf;
- this.topicConfig = topicConf;
- this.defaultTopic = gTopic || null;
- this.defaultPartition = gPart == null ? -1 : gPart;
- this.sentMessages = 0;
- this.pollInterval = undefined;
- if (dr_msg_cb || dr_cb) {
- this._cb_configs.event.delivery_cb = function(err, report) {
- if (err) {
- err = LibrdKafkaError.create(err);
- }
- this.emit('delivery-report', err, report);
- }.bind(this);
- this._cb_configs.event.delivery_cb.dr_msg_cb = !!dr_msg_cb;
- if (typeof dr_cb === 'function') {
- this.on('delivery-report', dr_cb);
- }
- }
- }
- /**
- * Produce a message to Kafka synchronously.
- *
- * This is the method mainly used in this class. Use it to produce
- * a message to Kafka.
- *
- * When this is sent off, there is no guarantee it is delivered. If you need
- * guaranteed delivery, change your *acks* settings, or use delivery reports.
- *
- * @param {string} topic - The topic name to produce to.
- * @param {number|null} partition - The partition number to produce to.
- * @param {Buffer|null} message - The message to produce.
- * @param {string} key - The key associated with the message.
- * @param {number|null} timestamp - Timestamp to send with the message.
- * @param {object} opaque - An object you want passed along with this message, if provided.
- * @param {object} headers - A list of custom key value pairs that provide message metadata.
- * @throws {LibrdKafkaError} - Throws a librdkafka error if it failed.
- * @return {boolean} - returns an error if it failed, or true if not
- * @see Producer#produce
- */
- Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque, headers) {
- if (!this._isConnected) {
- throw new Error('Producer not connected');
- }
- // I have removed support for using a topic object. It is going to be removed
- // from librdkafka soon, and it causes issues with shutting down
- if (!topic || typeof topic !== 'string') {
- throw new TypeError('"topic" must be a string');
- }
- this.sentMessages++;
- partition = partition == null ? this.defaultPartition : partition;
- return this._errorWrap(
- this._client.produce(topic, partition, message, key, timestamp, opaque, headers));
- };
- /**
- * Create a write stream interface for a producer.
- *
- * This stream does not run in object mode. It only takes buffers of data.
- *
- * @param {object} conf - Key value pairs to configure the producer
- * @param {object} topicConf - Key value pairs to create a default
- * topic configuration
- * @param {object} streamOptions - Stream options
- * @return {ProducerStream} - returns the write stream for writing to Kafka.
- */
- Producer.createWriteStream = function(conf, topicConf, streamOptions) {
- var producer = new Producer(conf, topicConf);
- return new ProducerStream(producer, streamOptions);
- };
- /**
- * Poll for events
- *
- * We need to run poll in order to learn about new events that have occurred.
- * This is no longer done automatically when we produce, so we need to run
- * it manually, or set the producer to automatically poll.
- *
- * @return {Producer} - returns itself.
- */
- Producer.prototype.poll = function() {
- if (!this._isConnected) {
- throw new Error('Producer not connected');
- }
- this._client.poll();
- return this;
- };
- /**
- * Set automatic polling for events.
- *
- * We need to run poll in order to learn about new events that have occurred.
- * If you would like this done on an interval with disconnects and reconnections
- * managed, you can do it here
- *
- * @param {number} interval - Interval, in milliseconds, to poll
- *
- * @return {Producer} - returns itself.
- */
- Producer.prototype.setPollInterval = function(interval) {
- // If we already have a poll interval we need to stop it
- if (this.pollInterval) {
- clearInterval(this.pollInterval);
- this.pollInterval = undefined;
- }
- if (interval === 0) {
- // If the interval was set to 0, bail out. We don't want to process this.
- // If there was an interval previously set, it has been removed.
- return;
- }
- var self = this;
- // Now we want to make sure we are connected.
- if (!this._isConnected) {
- // If we are not, execute this once the connection goes through.
- this.once('ready', function() {
- self.setPollInterval(interval);
- });
- return;
- }
- // We know we are connected at this point.
- // Unref this interval
- this.pollInterval = setInterval(function() {
- try {
- self.poll();
- } catch (e) {
- // We can probably ignore errors here as far as broadcasting.
- // Disconnection issues will get handled below
- }
- }, interval).unref();
- // Handle disconnections
- this.once('disconnected', function() {
- // Just rerun this function with interval 0. If any
- // poll interval is set, this will remove it
- self.setPollInterval(0);
- });
- return this;
- };
- /**
- * Flush the producer
- *
- * Flush everything on the internal librdkafka producer buffer. Do this before
- * disconnects usually
- *
- * @param {number} timeout - Number of milliseconds to try to flush before giving up.
- * @param {function} callback - Callback to fire when the flush is done.
- *
- * @return {Producer} - returns itself.
- */
- Producer.prototype.flush = function(timeout, callback) {
- if (!this._isConnected) {
- throw new Error('Producer not connected');
- }
- if (timeout === undefined || timeout === null) {
- timeout = 500;
- }
- this._client.flush(timeout, function(err) {
- if (err) {
- err = LibrdKafkaError.create(err);
- }
- if (callback) {
- callback(err);
- }
- });
- return this;
- };
- /**
- * Save the base disconnect method here so we can overwrite it and add a flush
- */
- Producer.prototype._disconnect = Producer.prototype.disconnect;
- /**
- * Disconnect the producer
- *
- * Flush everything on the internal librdkafka producer buffer. Then disconnect
- *
- * @param {number} timeout - Number of milliseconds to try to flush before giving up, defaults to 5 seconds.
- * @param {function} cb - The callback to fire when
- */
- Producer.prototype.disconnect = function(timeout, cb) {
- var self = this;
- var timeoutInterval = 5000;
- if (typeof timeout === 'function') {
- cb = timeout;
- } else {
- timeoutInterval = timeout;
- }
- this.flush(timeoutInterval, function() {
- self._disconnect(cb);
- });
- };
- /**
- * Init a transaction.
- *
- * Initialize transactions, this is only performed once per transactional producer.
- *
- * @param {number} timeout - Number of milliseconds to try to initialize before giving up, defaults to 5 seconds.
- * @param {function} cb - Callback to return when operation is completed
- * @return {Producer} - returns itself.
- */
- Producer.prototype.initTransactions = function(timeout, cb) {
- if (typeof timeout === 'function') {
- cb = timeout;
- timeout = 5000;
- }
- this._client.initTransactions(timeout, function(err) {
- cb(err ? LibrdKafkaError.create(err) : err);
- });
- };
- /**
- * Begin a transaction.
- *
- * 'initTransaction' must have been called successfully (once) before this function is called.
- *
- * @return {Producer} - returns itself.
- */
- Producer.prototype.beginTransaction = function(cb) {
- this._client.beginTransaction(function(err) {
- cb(err ? LibrdKafkaError.create(err) : err);
- });
- };
- /**
- * Commit the current transaction (as started with 'beginTransaction').
- *
- * @param {number} timeout - Number of milliseconds to try to commit before giving up, defaults to 5 seconds
- * @param {function} cb - Callback to return when operation is completed
- * @return {Producer} - returns itself.
- */
- Producer.prototype.commitTransaction = function(timeout, cb) {
- if (typeof timeout === 'function') {
- cb = timeout;
- timeout = 5000;
- }
- this._client.commitTransaction(timeout, function(err) {
- cb(err ? LibrdKafkaError.create(err) : err);
- });
- };
- /**
- * Aborts the ongoing transaction.
- *
- * @param {number} timeout - Number of milliseconds to try to abort, defaults to 5 seconds
- * @param {function} cb - Callback to return when operation is completed
- * @return {Producer} - returns itself.
- */
- Producer.prototype.abortTransaction = function(timeout, cb) {
- if (typeof timeout === 'function') {
- cb = timeout;
- timeout = 5000;
- }
- this._client.abortTransaction(timeout, function(err) {
- cb(err ? LibrdKafkaError.create(err) : err);
- });
- };
- /**
- * Send the current offsets of the consumer to the ongoing transaction.
- *
- * @param {number} offsets - Offsets to send as part of the next commit
- * @param {Consumer} consumer - An instance of the consumer
- * @param {number} timeout - Number of milliseconds to try to send offsets, defaults to 5 seconds
- * @param {function} cb - Callback to return when operation is completed
- * @return {Producer} - returns itself.
- */
- Producer.prototype.sendOffsetsToTransaction = function(offsets, consumer, timeout, cb) {
- if (typeof timeout === 'function') {
- cb = timeout;
- timeout = 5000;
- }
- this._client.sendOffsetsToTransaction(offsets, consumer.getClient(), timeout, function(err) {
- cb(err ? LibrdKafkaError.create(err) : err);
- });
- };