/*
|
|
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
|
|
*
|
|
* Copyright (c) 2016 Blizzard Entertainment
|
|
*
|
|
* This software may be modified and distributed under the terms
|
|
* of the MIT license. See the LICENSE.txt file for details.
|
|
*/
|
|
|
|
module.exports = Client;
|
|
|
|
var Emitter = require('events').EventEmitter;
|
|
var util = require('util');
|
|
var Kafka = require('../librdkafka.js');
|
|
var assert = require('assert');
|
|
|
|
var LibrdKafkaError = require('./error');
|
|
|
|
util.inherits(Client, Emitter);
|
|
|
|
/**
|
|
* Base class for Consumer and Producer
|
|
*
|
|
* This should not be created independently, but rather is
|
|
* the base class on which both producer and consumer
|
|
* get their common functionality.
|
|
*
|
|
* @param {object} globalConf - Global configuration in key value pairs.
|
|
* @param {function} SubClientType - The function representing the subclient
|
|
* type. In C++ land this needs to be a class that inherits from Connection.
|
|
* @param {object} topicConf - Topic configuration in key value pairs
|
|
* @constructor
|
|
* @extends Emitter
|
|
*/
|
|
function Client(globalConf, SubClientType, topicConf) {
|
|
if (!(this instanceof Client)) {
|
|
return new Client(globalConf, SubClientType, topicConf);
|
|
}
|
|
|
|
Emitter.call(this);
|
|
|
|
// This superclass must be initialized with the Kafka.{Producer,Consumer}
|
|
// @example var client = new Client({}, Kafka.Producer);
|
|
// remember this is a superclass so this will get taken care of in
|
|
// the producer and consumer main wrappers
|
|
|
|
var no_event_cb = globalConf.event_cb === false;
|
|
topicConf = topicConf || {};
|
|
|
|
// delete this because librdkafka will complain since this particular
|
|
// key is a real conf value
|
|
delete globalConf.event_cb;
|
|
|
|
this._client = new SubClientType(globalConf, topicConf);
|
|
|
|
var extractFunctions = function(obj) {
|
|
obj = obj || {};
|
|
var obj2 = {};
|
|
for (var p in obj) {
|
|
if (typeof obj[p] === "function") {
|
|
obj2[p] = obj[p];
|
|
}
|
|
}
|
|
return obj2;
|
|
}
|
|
this._cb_configs = {
|
|
global: extractFunctions(globalConf),
|
|
topic: extractFunctions(topicConf),
|
|
event: {},
|
|
}
|
|
|
|
if (!no_event_cb) {
|
|
this._cb_configs.event.event_cb = function(eventType, eventData) {
|
|
switch (eventType) {
|
|
case 'error':
|
|
this.emit('event.error', LibrdKafkaError.create(eventData));
|
|
break;
|
|
case 'stats':
|
|
this.emit('event.stats', eventData);
|
|
break;
|
|
case 'log':
|
|
this.emit('event.log', eventData);
|
|
break;
|
|
default:
|
|
this.emit('event.event', eventData);
|
|
this.emit('event.' + eventType, eventData);
|
|
}
|
|
}.bind(this);
|
|
}
|
|
|
|
this.metrics = {};
|
|
this._isConnected = false;
|
|
this.errorCounter = 0;
|
|
|
|
/**
|
|
* Metadata object. Starts out empty but will be filled with information after
|
|
* the initial connect.
|
|
*
|
|
* @type {Client~Metadata}
|
|
*/
|
|
this._metadata = {};
|
|
|
|
var self = this;
|
|
|
|
this.on('ready', function(info) {
|
|
self.metrics.connectionOpened = Date.now();
|
|
self.name = info.name;
|
|
})
|
|
.on('disconnected', function() {
|
|
// reset metrics
|
|
self.metrics = {};
|
|
self._isConnected = false;
|
|
// keep the metadata. it still may be useful
|
|
})
|
|
.on('event.error', function(err) {
|
|
self.lastError = err;
|
|
++self.errorCounter;
|
|
});
|
|
|
|
}
|
|
|
|
/**
|
|
* Connect to the broker and receive its metadata.
|
|
*
|
|
* Connects to a broker by establishing the client and fetches its metadata.
|
|
*
|
|
* @param {object} metadataOptions - Options to be sent to the metadata.
|
|
* @param {string} metadataOptions.topic - Topic to fetch metadata for. Empty string is treated as empty.
|
|
* @param {boolean} metadataOptions.allTopics - Fetch metadata for all topics, not just the ones we know about.
|
|
* @param {int} metadataOptions.timeout - The timeout, in ms, to allow for fetching metadata. Defaults to 30000ms
|
|
* @param {Client~connectionCallback} cb - Callback that indicates we are
|
|
* done connecting.
|
|
* @return {Client} - Returns itself.
|
|
*/
|
|
Client.prototype.connect = function(metadataOptions, cb) {
|
|
var self = this;
|
|
|
|
var next = function(err, data) {
|
|
self._isConnecting = false;
|
|
if (cb) {
|
|
cb(err, data);
|
|
}
|
|
};
|
|
|
|
if (this._isConnected) {
|
|
setImmediate(next);
|
|
return self;
|
|
}
|
|
|
|
if (this._isConnecting) {
|
|
this.once('ready', function() {
|
|
next(null, this._metadata);
|
|
});
|
|
return self;
|
|
}
|
|
|
|
this._isConnecting = true;
|
|
|
|
var fail = function(err) {
|
|
var callbackCalled = false;
|
|
var t;
|
|
|
|
if (self._isConnected) {
|
|
self._isConnected = false;
|
|
self._client.disconnect(function() {
|
|
if (callbackCalled) {
|
|
return;
|
|
}
|
|
clearTimeout(t);
|
|
callbackCalled = true;
|
|
|
|
next(err); return;
|
|
});
|
|
|
|
// don't take too long. this is a failure, after all
|
|
t = setTimeout(function() {
|
|
if (callbackCalled) {
|
|
return;
|
|
}
|
|
callbackCalled = true;
|
|
|
|
next(err); return;
|
|
}, 10000).unref();
|
|
|
|
self.emit('connection.failure', err, self.metrics);
|
|
} else {
|
|
|
|
next(err);
|
|
}
|
|
};
|
|
|
|
this._client.configureCallbacks(true, this._cb_configs);
|
|
|
|
this._client.connect(function(err, info) {
|
|
if (err) {
|
|
fail(LibrdKafkaError.create(err)); return;
|
|
}
|
|
|
|
self._isConnected = true;
|
|
|
|
// Otherwise we are successful
|
|
self.getMetadata(metadataOptions || {}, function(err, metadata) {
|
|
if (err) {
|
|
// We are connected so we need to disconnect
|
|
fail(LibrdKafkaError.create(err)); return;
|
|
}
|
|
|
|
self._isConnecting = false;
|
|
// We got the metadata otherwise. It is set according to above param
|
|
// Set it here as well so subsequent ready callbacks
|
|
// can check it
|
|
self._isConnected = true;
|
|
|
|
/**
|
|
* Ready event. Called when the Client connects successfully
|
|
*
|
|
* @event Client#ready
|
|
* @type {object}
|
|
* @property {string} name - the name of the broker.
|
|
*/
|
|
self.emit('ready', info, metadata);
|
|
next(null, metadata); return;
|
|
|
|
});
|
|
|
|
});
|
|
|
|
return self;
|
|
|
|
};
|
|
|
|
/**
|
|
* Get the native Kafka client.
|
|
*
|
|
* You probably shouldn't use this, but if you want to execute methods directly
|
|
* on the c++ wrapper you can do it here.
|
|
*
|
|
* @see connection.cc
|
|
* @return {Connection} - The native Kafka client.
|
|
*/
|
|
Client.prototype.getClient = function() {
|
|
return this._client;
|
|
};
|
|
|
|
/**
|
|
* Find out how long we have been connected to Kafka.
|
|
*
|
|
* @return {number} - Milliseconds since the connection has been established.
|
|
*/
|
|
Client.prototype.connectedTime = function() {
|
|
if (!this.isConnected()) {
|
|
return 0;
|
|
}
|
|
return Date.now() - this.metrics.connectionOpened;
|
|
};
|
|
|
|
/**
|
|
* Whether or not we are connected to Kafka.
|
|
*
|
|
* @return {boolean} - Whether we are connected.
|
|
*/
|
|
Client.prototype.isConnected = function() {
|
|
return !!(this._isConnected && this._client);
|
|
};
|
|
|
|
/**
|
|
* Get the last error emitted if it exists.
|
|
*
|
|
* @return {LibrdKafkaError} - Returns the LibrdKafkaError or null if
|
|
* one hasn't been thrown.
|
|
*/
|
|
Client.prototype.getLastError = function() {
|
|
return this.lastError || null;
|
|
};
|
|
|
|
/**
|
|
* Disconnect from the Kafka client.
|
|
*
|
|
* This method will disconnect us from Kafka unless we are already in a
|
|
* disconnecting state. Use this when you're done reading or producing messages
|
|
* on a given client.
|
|
*
|
|
* It will also emit the disconnected event.
|
|
*
|
|
* @fires Client#disconnected
|
|
* @return {function} - Callback to call when disconnection is complete.
|
|
*/
|
|
Client.prototype.disconnect = function(cb) {
|
|
var self = this;
|
|
|
|
if (!this._isDisconnecting && this._client) {
|
|
this._isDisconnecting = true;
|
|
this._client.disconnect(function() {
|
|
// this take 5000 milliseconds. Librdkafka needs to make sure the memory
|
|
// has been cleaned up before we delete things. @see RdKafka::wait_destroyed
|
|
self._client.configureCallbacks(false, self._cb_configs);
|
|
|
|
// Broadcast metrics. Gives people one last chance to do something with them
|
|
self._isDisconnecting = false;
|
|
/**
|
|
* Disconnect event. Called after disconnection is finished.
|
|
*
|
|
* @event Client#disconnected
|
|
* @type {object}
|
|
* @property {date} connectionOpened - when the connection was opened.
|
|
*/
|
|
var metricsCopy = Object.assign({}, self.metrics);
|
|
self.emit('disconnected', metricsCopy);
|
|
if (cb) {
|
|
cb(null, metricsCopy);
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
return self;
|
|
};
|
|
|
|
/**
|
|
* Get client metadata.
|
|
*
|
|
* Note: using a <code>metadataOptions.topic</code> parameter has a potential side-effect.
|
|
* A Topic object will be created, if it did not exist yet, with default options
|
|
* and it will be cached by librdkafka.
|
|
*
|
|
* A subsequent call to create the topic object with specific options (e.g. <code>acks</code>) will return
|
|
* the previous instance and the specific options will be silently ignored.
|
|
*
|
|
* To avoid this side effect, the topic object can be created with the expected options before requesting metadata,
|
|
* or the metadata request can be performed for all topics (by omitting <code>metadataOptions.topic</code>).
|
|
*
|
|
* @param {object} metadataOptions - Metadata options to pass to the client.
|
|
* @param {string} metadataOptions.topic - Topic string for which to fetch
|
|
* metadata
|
|
* @param {number} metadataOptions.timeout - Max time, in ms, to try to fetch
|
|
* metadata before timing out. Defaults to 3000.
|
|
* @param {Client~metadataCallback} cb - Callback to fire with the metadata.
|
|
*/
|
|
Client.prototype.getMetadata = function(metadataOptions, cb) {
|
|
if (!this.isConnected()) {
|
|
return cb(new Error('Client is disconnected'));
|
|
}
|
|
|
|
var self = this;
|
|
|
|
this._client.getMetadata(metadataOptions || {}, function(err, metadata) {
|
|
if (err) {
|
|
if (cb) {
|
|
cb(LibrdKafkaError.create(err));
|
|
}
|
|
return;
|
|
}
|
|
|
|
// No error otherwise
|
|
self._metadata = metadata;
|
|
|
|
if (cb) {
|
|
cb(null, metadata);
|
|
}
|
|
|
|
});
|
|
|
|
};
|
|
|
|
/**
|
|
* Query offsets from the broker.
|
|
*
|
|
* This function makes a call to the broker to get the current low (oldest/beginning)
|
|
* and high (newest/end) offsets for a topic partition.
|
|
*
|
|
* @param {string} topic - Topic to recieve offsets from.
|
|
* @param {number} partition - Partition of the provided topic to recieve offsets from
|
|
* @param {number} timeout - Number of ms to wait to recieve a response.
|
|
* @param {Client~watermarkOffsetsCallback} cb - Callback to fire with the offsets.
|
|
*/
|
|
Client.prototype.queryWatermarkOffsets = function(topic, partition, timeout, cb) {
|
|
if (!this.isConnected()) {
|
|
if (cb) {
|
|
return cb(new Error('Client is disconnected'));
|
|
} else {
|
|
return;
|
|
}
|
|
}
|
|
|
|
var self = this;
|
|
|
|
if (typeof timeout === 'function') {
|
|
cb = timeout;
|
|
timeout = 1000;
|
|
}
|
|
|
|
if (!timeout) {
|
|
timeout = 1000;
|
|
}
|
|
|
|
this._client.queryWatermarkOffsets(topic, partition, timeout, function(err, offsets) {
|
|
if (err) {
|
|
if (cb) {
|
|
cb(LibrdKafkaError.create(err));
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (cb) {
|
|
cb(null, offsets);
|
|
}
|
|
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Query offsets for times from the broker.
|
|
*
|
|
* This function makes a call to the broker to get the offsets for times specified.
|
|
*
|
|
* @param {TopicPartition[]} toppars - Array of topic partitions. The offset in these
|
|
* should instead refer to a timestamp you want
|
|
* offsets for
|
|
* @param {number} timeout - Number of ms to wait to recieve a response.
|
|
* @param {Client~offsetsForTimesCallback} cb - Callback to fire with the filled in offsets.
|
|
*/
|
|
Client.prototype.offsetsForTimes = function(toppars, timeout, cb) {
|
|
if (!this.isConnected()) {
|
|
if (cb) {
|
|
return cb(new Error('Client is disconnected'));
|
|
} else {
|
|
return;
|
|
}
|
|
}
|
|
|
|
var self = this;
|
|
|
|
if (typeof timeout === 'function') {
|
|
cb = timeout;
|
|
timeout = 1000;
|
|
}
|
|
|
|
if (!timeout) {
|
|
timeout = 1000;
|
|
}
|
|
|
|
this._client.offsetsForTimes(toppars, timeout, function(err, toppars) {
|
|
if (err) {
|
|
if (cb) {
|
|
cb(LibrdKafkaError.create(err));
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (cb) {
|
|
cb(null, toppars);
|
|
}
|
|
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Wrap a potential RdKafka error.
|
|
*
|
|
* This internal method is meant to take a return value
|
|
* from a function that returns an RdKafka error code, throw if it
|
|
* is an error (Making it a proper librdkafka error object), or
|
|
* return the appropriate value otherwise.
|
|
*
|
|
* It is intended to be used in a return statement,
|
|
*
|
|
* @private
|
|
* @param {number} errorCode - Error code returned from a native method
|
|
* @param {bool} intIsError - If specified true, any non-number return type will be classified as a success
|
|
* @return {boolean} - Returns true or the method return value unless it throws.
|
|
*/
|
|
Client.prototype._errorWrap = function(errorCode, intIsError) {
|
|
var returnValue = true;
|
|
if (intIsError) {
|
|
returnValue = errorCode;
|
|
errorCode = typeof errorCode === 'number' ? errorCode : 0;
|
|
}
|
|
|
|
if (errorCode !== LibrdKafkaError.codes.ERR_NO_ERROR) {
|
|
var e = LibrdKafkaError.create(errorCode);
|
|
throw e;
|
|
}
|
|
|
|
return returnValue;
|
|
};
|
|
|
|
/**
|
|
* This callback is used to pass metadata or an error after a successful
|
|
* connection
|
|
*
|
|
* @callback Client~connectionCallback
|
|
* @param {Error} err - An error, if one occurred while connecting.
|
|
* @param {Client~Metadata} metadata - Metadata object.
|
|
*/
|
|
|
|
/**
|
|
* This callback is used to pass offsets or an error after a successful
|
|
* query
|
|
*
|
|
* @callback Client~watermarkOffsetsCallback
|
|
* @param {Error} err - An error, if one occurred while connecting.
|
|
* @param {Client~watermarkOffsets} offsets - Watermark offsets
|
|
*/
|
|
|
|
/**
|
|
* This callback is used to pass toppars or an error after a successful
|
|
* times query
|
|
*
|
|
* @callback Client~offsetsForTimesCallback
|
|
* @param {Error} err - An error, if one occurred while connecting.
|
|
* @param {TopicPartition[]} toppars - Topic partitions with offsets filled in
|
|
*/
|
|
|
|
/**
|
|
* @typedef {object} Client~watermarkOffsets
|
|
* @property {number} high - High (newest/end) offset
|
|
* @property {number} low - Low (oldest/beginning) offset
|
|
*/
|
|
|
|
/**
|
|
* @typedef {object} Client~MetadataBroker
|
|
* @property {number} id - Broker ID
|
|
* @property {string} host - Broker host
|
|
* @property {number} port - Broker port.
|
|
*/
|
|
|
|
/**
|
|
* @typedef {object} Client~MetadataTopic
|
|
* @property {string} name - Topic name
|
|
* @property {Client~MetadataPartition[]} partitions - Array of partitions
|
|
*/
|
|
|
|
/**
|
|
* @typedef {object} Client~MetadataPartition
|
|
* @property {number} id - Partition id
|
|
* @property {number} leader - Broker ID for the partition leader
|
|
* @property {number[]} replicas - Array of replica IDs
|
|
* @property {number[]} isrs - Arrqay of ISRS ids
|
|
*/
|
|
|
|
/**
|
|
* Metadata object.
|
|
*
|
|
* This is the representation of Kafka metadata in JavaScript.
|
|
*
|
|
* @typedef {object} Client~Metadata
|
|
* @property {number} orig_broker_id - The broker ID of the original bootstrap
|
|
* broker.
|
|
* @property {string} orig_broker_name - The name of the original bootstrap
|
|
* broker.
|
|
* @property {Client~MetadataBroker[]} brokers - An array of broker objects
|
|
* @property {Client~MetadataTopic[]} topics - An array of topics.
|
|
*/
|