You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

554 lines
15 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. module.exports = Client;
  10. var Emitter = require('events').EventEmitter;
  11. var util = require('util');
  12. var Kafka = require('../librdkafka.js');
  13. var assert = require('assert');
  14. var LibrdKafkaError = require('./error');
  15. util.inherits(Client, Emitter);
  16. /**
  17. * Base class for Consumer and Producer
  18. *
  19. * This should not be created independently, but rather is
  20. * the base class on which both producer and consumer
  21. * get their common functionality.
  22. *
  23. * @param {object} globalConf - Global configuration in key value pairs.
  24. * @param {function} SubClientType - The function representing the subclient
  25. * type. In C++ land this needs to be a class that inherits from Connection.
  26. * @param {object} topicConf - Topic configuration in key value pairs
  27. * @constructor
  28. * @extends Emitter
  29. */
  30. function Client(globalConf, SubClientType, topicConf) {
  31. if (!(this instanceof Client)) {
  32. return new Client(globalConf, SubClientType, topicConf);
  33. }
  34. Emitter.call(this);
  35. // This superclass must be initialized with the Kafka.{Producer,Consumer}
  36. // @example var client = new Client({}, Kafka.Producer);
  37. // remember this is a superclass so this will get taken care of in
  38. // the producer and consumer main wrappers
  39. var no_event_cb = globalConf.event_cb === false;
  40. topicConf = topicConf || {};
  41. // delete this because librdkafka will complain since this particular
  42. // key is a real conf value
  43. delete globalConf.event_cb;
  44. this._client = new SubClientType(globalConf, topicConf);
  45. var extractFunctions = function(obj) {
  46. obj = obj || {};
  47. var obj2 = {};
  48. for (var p in obj) {
  49. if (typeof obj[p] === "function") {
  50. obj2[p] = obj[p];
  51. }
  52. }
  53. return obj2;
  54. }
  55. this._cb_configs = {
  56. global: extractFunctions(globalConf),
  57. topic: extractFunctions(topicConf),
  58. event: {},
  59. }
  60. if (!no_event_cb) {
  61. this._cb_configs.event.event_cb = function(eventType, eventData) {
  62. switch (eventType) {
  63. case 'error':
  64. this.emit('event.error', LibrdKafkaError.create(eventData));
  65. break;
  66. case 'stats':
  67. this.emit('event.stats', eventData);
  68. break;
  69. case 'log':
  70. this.emit('event.log', eventData);
  71. break;
  72. default:
  73. this.emit('event.event', eventData);
  74. this.emit('event.' + eventType, eventData);
  75. }
  76. }.bind(this);
  77. }
  78. this.metrics = {};
  79. this._isConnected = false;
  80. this.errorCounter = 0;
  81. /**
  82. * Metadata object. Starts out empty but will be filled with information after
  83. * the initial connect.
  84. *
  85. * @type {Client~Metadata}
  86. */
  87. this._metadata = {};
  88. var self = this;
  89. this.on('ready', function(info) {
  90. self.metrics.connectionOpened = Date.now();
  91. self.name = info.name;
  92. })
  93. .on('disconnected', function() {
  94. // reset metrics
  95. self.metrics = {};
  96. self._isConnected = false;
  97. // keep the metadata. it still may be useful
  98. })
  99. .on('event.error', function(err) {
  100. self.lastError = err;
  101. ++self.errorCounter;
  102. });
  103. }
  104. /**
  105. * Connect to the broker and receive its metadata.
  106. *
  107. * Connects to a broker by establishing the client and fetches its metadata.
  108. *
  109. * @param {object} metadataOptions - Options to be sent to the metadata.
  110. * @param {string} metadataOptions.topic - Topic to fetch metadata for. Empty string is treated as empty.
  111. * @param {boolean} metadataOptions.allTopics - Fetch metadata for all topics, not just the ones we know about.
  112. * @param {int} metadataOptions.timeout - The timeout, in ms, to allow for fetching metadata. Defaults to 30000ms
  113. * @param {Client~connectionCallback} cb - Callback that indicates we are
  114. * done connecting.
  115. * @return {Client} - Returns itself.
  116. */
  117. Client.prototype.connect = function(metadataOptions, cb) {
  118. var self = this;
  119. var next = function(err, data) {
  120. self._isConnecting = false;
  121. if (cb) {
  122. cb(err, data);
  123. }
  124. };
  125. if (this._isConnected) {
  126. setImmediate(next);
  127. return self;
  128. }
  129. if (this._isConnecting) {
  130. this.once('ready', function() {
  131. next(null, this._metadata);
  132. });
  133. return self;
  134. }
  135. this._isConnecting = true;
  136. var fail = function(err) {
  137. var callbackCalled = false;
  138. var t;
  139. if (self._isConnected) {
  140. self._isConnected = false;
  141. self._client.disconnect(function() {
  142. if (callbackCalled) {
  143. return;
  144. }
  145. clearTimeout(t);
  146. callbackCalled = true;
  147. next(err); return;
  148. });
  149. // don't take too long. this is a failure, after all
  150. t = setTimeout(function() {
  151. if (callbackCalled) {
  152. return;
  153. }
  154. callbackCalled = true;
  155. next(err); return;
  156. }, 10000).unref();
  157. self.emit('connection.failure', err, self.metrics);
  158. } else {
  159. next(err);
  160. }
  161. };
  162. this._client.configureCallbacks(true, this._cb_configs);
  163. this._client.connect(function(err, info) {
  164. if (err) {
  165. fail(LibrdKafkaError.create(err)); return;
  166. }
  167. self._isConnected = true;
  168. // Otherwise we are successful
  169. self.getMetadata(metadataOptions || {}, function(err, metadata) {
  170. if (err) {
  171. // We are connected so we need to disconnect
  172. fail(LibrdKafkaError.create(err)); return;
  173. }
  174. self._isConnecting = false;
  175. // We got the metadata otherwise. It is set according to above param
  176. // Set it here as well so subsequent ready callbacks
  177. // can check it
  178. self._isConnected = true;
  179. /**
  180. * Ready event. Called when the Client connects successfully
  181. *
  182. * @event Client#ready
  183. * @type {object}
  184. * @property {string} name - the name of the broker.
  185. */
  186. self.emit('ready', info, metadata);
  187. next(null, metadata); return;
  188. });
  189. });
  190. return self;
  191. };
  192. /**
  193. * Get the native Kafka client.
  194. *
  195. * You probably shouldn't use this, but if you want to execute methods directly
  196. * on the c++ wrapper you can do it here.
  197. *
  198. * @see connection.cc
  199. * @return {Connection} - The native Kafka client.
  200. */
  201. Client.prototype.getClient = function() {
  202. return this._client;
  203. };
  204. /**
  205. * Find out how long we have been connected to Kafka.
  206. *
  207. * @return {number} - Milliseconds since the connection has been established.
  208. */
  209. Client.prototype.connectedTime = function() {
  210. if (!this.isConnected()) {
  211. return 0;
  212. }
  213. return Date.now() - this.metrics.connectionOpened;
  214. };
  215. /**
  216. * Whether or not we are connected to Kafka.
  217. *
  218. * @return {boolean} - Whether we are connected.
  219. */
  220. Client.prototype.isConnected = function() {
  221. return !!(this._isConnected && this._client);
  222. };
  223. /**
  224. * Get the last error emitted if it exists.
  225. *
  226. * @return {LibrdKafkaError} - Returns the LibrdKafkaError or null if
  227. * one hasn't been thrown.
  228. */
  229. Client.prototype.getLastError = function() {
  230. return this.lastError || null;
  231. };
  232. /**
  233. * Disconnect from the Kafka client.
  234. *
  235. * This method will disconnect us from Kafka unless we are already in a
  236. * disconnecting state. Use this when you're done reading or producing messages
  237. * on a given client.
  238. *
  239. * It will also emit the disconnected event.
  240. *
  241. * @fires Client#disconnected
  242. * @return {function} - Callback to call when disconnection is complete.
  243. */
  244. Client.prototype.disconnect = function(cb) {
  245. var self = this;
  246. if (!this._isDisconnecting && this._client) {
  247. this._isDisconnecting = true;
  248. this._client.disconnect(function() {
  249. // this take 5000 milliseconds. Librdkafka needs to make sure the memory
  250. // has been cleaned up before we delete things. @see RdKafka::wait_destroyed
  251. self._client.configureCallbacks(false, self._cb_configs);
  252. // Broadcast metrics. Gives people one last chance to do something with them
  253. self._isDisconnecting = false;
  254. /**
  255. * Disconnect event. Called after disconnection is finished.
  256. *
  257. * @event Client#disconnected
  258. * @type {object}
  259. * @property {date} connectionOpened - when the connection was opened.
  260. */
  261. var metricsCopy = Object.assign({}, self.metrics);
  262. self.emit('disconnected', metricsCopy);
  263. if (cb) {
  264. cb(null, metricsCopy);
  265. }
  266. });
  267. }
  268. return self;
  269. };
  270. /**
  271. * Get client metadata.
  272. *
  273. * Note: using a <code>metadataOptions.topic</code> parameter has a potential side-effect.
  274. * A Topic object will be created, if it did not exist yet, with default options
  275. * and it will be cached by librdkafka.
  276. *
  277. * A subsequent call to create the topic object with specific options (e.g. <code>acks</code>) will return
  278. * the previous instance and the specific options will be silently ignored.
  279. *
  280. * To avoid this side effect, the topic object can be created with the expected options before requesting metadata,
  281. * or the metadata request can be performed for all topics (by omitting <code>metadataOptions.topic</code>).
  282. *
  283. * @param {object} metadataOptions - Metadata options to pass to the client.
  284. * @param {string} metadataOptions.topic - Topic string for which to fetch
  285. * metadata
  286. * @param {number} metadataOptions.timeout - Max time, in ms, to try to fetch
  287. * metadata before timing out. Defaults to 3000.
  288. * @param {Client~metadataCallback} cb - Callback to fire with the metadata.
  289. */
  290. Client.prototype.getMetadata = function(metadataOptions, cb) {
  291. if (!this.isConnected()) {
  292. return cb(new Error('Client is disconnected'));
  293. }
  294. var self = this;
  295. this._client.getMetadata(metadataOptions || {}, function(err, metadata) {
  296. if (err) {
  297. if (cb) {
  298. cb(LibrdKafkaError.create(err));
  299. }
  300. return;
  301. }
  302. // No error otherwise
  303. self._metadata = metadata;
  304. if (cb) {
  305. cb(null, metadata);
  306. }
  307. });
  308. };
  309. /**
  310. * Query offsets from the broker.
  311. *
  312. * This function makes a call to the broker to get the current low (oldest/beginning)
  313. * and high (newest/end) offsets for a topic partition.
  314. *
  315. * @param {string} topic - Topic to recieve offsets from.
  316. * @param {number} partition - Partition of the provided topic to recieve offsets from
  317. * @param {number} timeout - Number of ms to wait to recieve a response.
  318. * @param {Client~watermarkOffsetsCallback} cb - Callback to fire with the offsets.
  319. */
  320. Client.prototype.queryWatermarkOffsets = function(topic, partition, timeout, cb) {
  321. if (!this.isConnected()) {
  322. if (cb) {
  323. return cb(new Error('Client is disconnected'));
  324. } else {
  325. return;
  326. }
  327. }
  328. var self = this;
  329. if (typeof timeout === 'function') {
  330. cb = timeout;
  331. timeout = 1000;
  332. }
  333. if (!timeout) {
  334. timeout = 1000;
  335. }
  336. this._client.queryWatermarkOffsets(topic, partition, timeout, function(err, offsets) {
  337. if (err) {
  338. if (cb) {
  339. cb(LibrdKafkaError.create(err));
  340. }
  341. return;
  342. }
  343. if (cb) {
  344. cb(null, offsets);
  345. }
  346. });
  347. };
  348. /**
  349. * Query offsets for times from the broker.
  350. *
  351. * This function makes a call to the broker to get the offsets for times specified.
  352. *
  353. * @param {TopicPartition[]} toppars - Array of topic partitions. The offset in these
  354. * should instead refer to a timestamp you want
  355. * offsets for
  356. * @param {number} timeout - Number of ms to wait to recieve a response.
  357. * @param {Client~offsetsForTimesCallback} cb - Callback to fire with the filled in offsets.
  358. */
  359. Client.prototype.offsetsForTimes = function(toppars, timeout, cb) {
  360. if (!this.isConnected()) {
  361. if (cb) {
  362. return cb(new Error('Client is disconnected'));
  363. } else {
  364. return;
  365. }
  366. }
  367. var self = this;
  368. if (typeof timeout === 'function') {
  369. cb = timeout;
  370. timeout = 1000;
  371. }
  372. if (!timeout) {
  373. timeout = 1000;
  374. }
  375. this._client.offsetsForTimes(toppars, timeout, function(err, toppars) {
  376. if (err) {
  377. if (cb) {
  378. cb(LibrdKafkaError.create(err));
  379. }
  380. return;
  381. }
  382. if (cb) {
  383. cb(null, toppars);
  384. }
  385. });
  386. };
  387. /**
  388. * Wrap a potential RdKafka error.
  389. *
  390. * This internal method is meant to take a return value
  391. * from a function that returns an RdKafka error code, throw if it
  392. * is an error (Making it a proper librdkafka error object), or
  393. * return the appropriate value otherwise.
  394. *
  395. * It is intended to be used in a return statement,
  396. *
  397. * @private
  398. * @param {number} errorCode - Error code returned from a native method
  399. * @param {bool} intIsError - If specified true, any non-number return type will be classified as a success
  400. * @return {boolean} - Returns true or the method return value unless it throws.
  401. */
  402. Client.prototype._errorWrap = function(errorCode, intIsError) {
  403. var returnValue = true;
  404. if (intIsError) {
  405. returnValue = errorCode;
  406. errorCode = typeof errorCode === 'number' ? errorCode : 0;
  407. }
  408. if (errorCode !== LibrdKafkaError.codes.ERR_NO_ERROR) {
  409. var e = LibrdKafkaError.create(errorCode);
  410. throw e;
  411. }
  412. return returnValue;
  413. };
  414. /**
  415. * This callback is used to pass metadata or an error after a successful
  416. * connection
  417. *
  418. * @callback Client~connectionCallback
  419. * @param {Error} err - An error, if one occurred while connecting.
  420. * @param {Client~Metadata} metadata - Metadata object.
  421. */
  422. /**
  423. * This callback is used to pass offsets or an error after a successful
  424. * query
  425. *
  426. * @callback Client~watermarkOffsetsCallback
  427. * @param {Error} err - An error, if one occurred while connecting.
  428. * @param {Client~watermarkOffsets} offsets - Watermark offsets
  429. */
  430. /**
  431. * This callback is used to pass toppars or an error after a successful
  432. * times query
  433. *
  434. * @callback Client~offsetsForTimesCallback
  435. * @param {Error} err - An error, if one occurred while connecting.
  436. * @param {TopicPartition[]} toppars - Topic partitions with offsets filled in
  437. */
  438. /**
  439. * @typedef {object} Client~watermarkOffsets
  440. * @property {number} high - High (newest/end) offset
  441. * @property {number} low - Low (oldest/beginning) offset
  442. */
  443. /**
  444. * @typedef {object} Client~MetadataBroker
  445. * @property {number} id - Broker ID
  446. * @property {string} host - Broker host
  447. * @property {number} port - Broker port.
  448. */
  449. /**
  450. * @typedef {object} Client~MetadataTopic
  451. * @property {string} name - Topic name
  452. * @property {Client~MetadataPartition[]} partitions - Array of partitions
  453. */
  454. /**
  455. * @typedef {object} Client~MetadataPartition
  456. * @property {number} id - Partition id
  457. * @property {number} leader - Broker ID for the partition leader
  458. * @property {number[]} replicas - Array of replica IDs
  459. * @property {number[]} isrs - Arrqay of ISRS ids
  460. */
  461. /**
  462. * Metadata object.
  463. *
  464. * This is the representation of Kafka metadata in JavaScript.
  465. *
  466. * @typedef {object} Client~Metadata
  467. * @property {number} orig_broker_id - The broker ID of the original bootstrap
  468. * broker.
  469. * @property {string} orig_broker_name - The name of the original bootstrap
  470. * broker.
  471. * @property {Client~MetadataBroker[]} brokers - An array of broker objects
  472. * @property {Client~MetadataTopic[]} topics - An array of topics.
  473. */