/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */
'use strict';

module.exports = KafkaConsumer;

var Client = require('./client');
var util = require('util');
var Kafka = require('../librdkafka');
var KafkaConsumerStream = require('./kafka-consumer-stream');
var LibrdKafkaError = require('./error');
var TopicPartition = require('./topic-partition');
var shallowCopy = require('./util').shallowCopy;

var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
var DEFAULT_CONSUME_TIME_OUT = 1000;

util.inherits(KafkaConsumer, Client);
/**
 * KafkaConsumer class for reading messages from Kafka
 *
 * This is the main entry point for reading data from Kafka. You
 * configure this like you do any other client, with a global
 * configuration and default topic configuration.
 *
 * Once you instantiate this object, connecting will open a socket.
 * Data will not be read until you tell the consumer what topics
 * you want to read from.
 *
 * @param {object} conf - Key value pairs to configure the consumer
 * @param {object} topicConf - Key value pairs to create a default
 * topic configuration
 * @extends Client
 * @constructor
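 * @example
 * // A minimal sketch, assuming a broker on localhost:9092 and a topic
 * // named 'test' (both illustrative):
 * var consumer = new KafkaConsumer({
 *   'group.id': 'example-group',
 *   'metadata.broker.list': 'localhost:9092'
 * }, {});
 * consumer.connect();
 * consumer.on('ready', function() {
 *   consumer.subscribe(['test']);
 *   consumer.consume();
 * });
 * consumer.on('data', function(message) {
 *   console.log(message.value.toString());
 * });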
 */
function KafkaConsumer(conf, topicConf) {
  if (!(this instanceof KafkaConsumer)) {
    return new KafkaConsumer(conf, topicConf);
  }

  conf = shallowCopy(conf);
  topicConf = shallowCopy(topicConf);

  var onRebalance = conf.rebalance_cb;

  var self = this;

  // If rebalance is undefined we don't want any part of this
  if (onRebalance && typeof onRebalance === 'boolean') {
    conf.rebalance_cb = function(err, assignment) {
      // Create the librdkafka error
      err = LibrdKafkaError.create(err);
      // Emit the event
      self.emit('rebalance', err, assignment);

      // That's it
      try {
        if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
          self.assign(assignment);
        } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
          self.unassign();
        }
      } catch (e) {
        // Ignore exceptions if we are not connected
        if (self.isConnected()) {
          self.emit('rebalance.error', e);
        }
      }
    };
  } else if (onRebalance && typeof onRebalance === 'function') {
    /*
     * Once this is opted in to, that's it. It's going to manually rebalance
     * forever. There is no way to unset config values in librdkafka, just
     * a way to override them.
     */
    conf.rebalance_cb = function(err, assignment) {
      // Create the librdkafka error
      err = err ? LibrdKafkaError.create(err) : undefined;

      self.emit('rebalance', err, assignment);
      onRebalance.call(self, err, assignment);
    };
  }

  // Same treatment for offset_commit_cb
  var onOffsetCommit = conf.offset_commit_cb;

  if (onOffsetCommit && typeof onOffsetCommit === 'boolean') {
    conf.offset_commit_cb = function(err, offsets) {
      if (err) {
        err = LibrdKafkaError.create(err);
      }
      // Emit the event
      self.emit('offset.commit', err, offsets);
    };
  } else if (onOffsetCommit && typeof onOffsetCommit === 'function') {
    conf.offset_commit_cb = function(err, offsets) {
      if (err) {
        err = LibrdKafkaError.create(err);
      }
      // Emit the event
      self.emit('offset.commit', err, offsets);
      onOffsetCommit.call(self, err, offsets);
    };
  }

  /**
   * KafkaConsumer message.
   *
   * This is the representation of a message read from Kafka.
   *
   * @typedef {object} KafkaConsumer~Message
   * @property {buffer} value - the message buffer from Kafka.
   * @property {string} topic - the topic name
   * @property {number} partition - the partition on the topic the
   * message was on
   * @property {number} offset - the offset of the message
   * @property {string} key - the message key
   * @property {number} size - message size, in bytes.
   * @property {number} timestamp - message timestamp
   */

  Client.call(this, conf, Kafka.KafkaConsumer, topicConf);

  this.globalConfig = conf;
  this.topicConfig = topicConf;

  this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
  this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
}

/**
 * Set the default consume timeout provided to c++land
 * @param {number} timeoutMs - number of milliseconds to wait for a message to be fetched
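 * @example
 * // A minimal sketch: use a short native fetch wait so consume() calls
 * // return quickly (10ms here is illustrative, not a recommendation).
 * consumer.setDefaultConsumeTimeout(10);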
 */
KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
  this._consumeTimeout = timeoutMs;
};

/**
 * Set the default sleep delay for the next consume loop after the previous one has timed out.
 * @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
 */
KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs) {
  this._consumeLoopTimeoutDelay = intervalMs;
};

/**
 * Get a stream representation of this KafkaConsumer
 *
 * @see TopicReadable
 * @example
 * var consumerStream = Kafka.KafkaConsumer.createReadStream({
 *   'metadata.broker.list': 'localhost:9092',
 *   'group.id': 'librd-test',
 *   'socket.keepalive.enable': true,
 *   'enable.auto.commit': false
 * }, {}, { topics: [ 'test' ] });
 *
 * @param {object} conf - Key value pairs to configure the consumer
 * @param {object} topicConf - Key value pairs to create a default
 * topic configuration
 * @param {object} streamOptions - Stream options
 * @param {array} streamOptions.topics - Array of topics to subscribe to.
 * @return {KafkaConsumerStream} - Readable stream that receives messages
 * when new ones become available.
 */
KafkaConsumer.createReadStream = function(conf, topicConf, streamOptions) {
  var consumer = new KafkaConsumer(conf, topicConf);
  return new KafkaConsumerStream(consumer, streamOptions);
};

/**
 * Get a current list of the committed offsets per topic partition
 *
 * Returns an array of objects in the form of a topic partition list
 *
 * @param {TopicPartition[]} toppars - Topic partition list to query committed
 * offsets for. Defaults to the current assignment
 * @param {number} timeout - Number of ms to block before calling back
 * and erroring
 * @param {Function} cb - Callback method to execute when finished or timed
 * out
 * @return {Client} - Returns itself
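 * @example
 * // A minimal sketch: query committed offsets for the current assignment,
 * // waiting up to 5000ms (the timeout value is illustrative).
 * consumer.committed(null, 5000, function(err, topicPartitions) {
 *   if (err) {
 *     return console.error(err);
 *   }
 *   console.log(topicPartitions);
 * });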
 */
KafkaConsumer.prototype.committed = function(toppars, timeout, cb) {
  // We want to be backwards compatible here, and the previous version of
  // this function took two arguments

  // If CB is not set, shift to backwards compatible version
  if (!cb) {
    cb = arguments[1];
    timeout = arguments[0];
    toppars = this.assignments();
  } else {
    toppars = toppars || this.assignments();
  }

  var self = this;
  this._client.committed(toppars, timeout, function(err, topicPartitions) {
    if (err) {
      cb(LibrdKafkaError.create(err));
      return;
    }

    cb(null, topicPartitions);
  });
  return this;
};

/**
 * Seek consumer for topic+partition to offset which is either an absolute or
 * logical offset.
 *
 * Does not return anything, as it is asynchronous. There are special cases
 * with the timeout parameter. The consumer must have previously been assigned
 * the topic and partition it is being asked to seek on.
 *
 * @example
 * consumer.seek({ topic: 'topic', partition: 0, offset: 1000 }, 0, function(err) {
 *   if (err) {
 *
 *   }
 * });
 *
 * @param {TopicPartition} toppar - Topic partition to seek.
 * @param {number} timeout - Number of ms to block before calling back
 * and erroring. If the parameter is null or 0, the call will not wait
 * for the seek to be performed. Essentially, it will happen in the background
 * with no notification
 * @param {Function} cb - Callback method to execute when finished or timed
 * out. If the seek timed out, the internal state of the consumer is unknown.
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.seek = function(toppar, timeout, cb) {
  var self = this;
  this._client.seek(TopicPartition.create(toppar), timeout, function(err) {
    if (err) {
      cb(LibrdKafkaError.create(err));
      return;
    }

    cb();
  });
  return this;
};

/**
 * Assign the consumer specific partitions and topics
 *
 * @param {array} assignments - Assignments array. Should contain
 * objects with topic and partition set.
 * @return {Client} - Returns itself
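 * @example
 * // A minimal sketch: manually assign partition 0 of a hypothetical
 * // 'test' topic, starting from offset 0.
 * consumer.assign([{ topic: 'test', partition: 0, offset: 0 }]);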
 */
KafkaConsumer.prototype.assign = function(assignments) {
  this._client.assign(TopicPartition.map(assignments));
  return this;
};

/**
 * Unassign the consumer from its assigned partitions and topics.
 *
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.unassign = function() {
  this._client.unassign();
  return this;
};

/**
 * Get the assignments for the consumer
 *
 * @return {array} assignments - Array of topic partitions
 */
KafkaConsumer.prototype.assignments = function() {
  return this._errorWrap(this._client.assignments(), true);
};

/**
 * Subscribe to an array of topics (synchronously).
 *
 * This operation is pretty fast because it just sets
 * an assignment in librdkafka. This is the recommended
 * way to deal with subscriptions in a situation where you
 * will be reading across multiple files or as part of
 * your configure-time initialization.
 *
 * This is also a good way to do it for streams.
 *
 * @param {array} topics - An array of topics to listen to
 * @throws - Throws when an error code came back from native land
 * @return {KafkaConsumer} - Returns itself.
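 * @example
 * // A minimal sketch, assuming topics named 'events' and 'logs' exist:
 * consumer.subscribe(['events', 'logs']);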
 */
KafkaConsumer.prototype.subscribe = function(topics) {
  // Will throw if it is a bad error.
  this._errorWrap(this._client.subscribe(topics));
  this.emit('subscribed', topics);
  return this;
};

/**
 * Get the current subscription of the KafkaConsumer
 *
 * Get a list of subscribed topics. Should generally match what you
 * passed on via subscribe
 *
 * @see KafkaConsumer::subscribe
 * @throws - Throws when an error code came back from native land
 * @return {array} - Array of strings to show the current assignment
 */
KafkaConsumer.prototype.subscription = function() {
  return this._errorWrap(this._client.subscription(), true);
};

/**
 * Get the current offset position of the KafkaConsumer
 *
 * Returns a list of RdKafka::TopicPartitions on success, or throws
 * an error on failure
 *
 * @param {TopicPartition[]} toppars - List of topic partitions to query
 * position for. Defaults to the current assignment
 * @throws - Throws when an error code came back from native land
 * @return {array} - TopicPartition array. Each item is an object with
 * an offset, topic, and partition
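 * @example
 * // A minimal sketch: log the current position for every assigned partition.
 * consumer.position().forEach(function(toppar) {
 *   console.log(toppar.topic, toppar.partition, toppar.offset);
 * });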
 */
KafkaConsumer.prototype.position = function(toppars) {
  if (!toppars) {
    toppars = this.assignments();
  }
  return this._errorWrap(this._client.position(toppars), true);
};

/**
 * Unsubscribe from all currently subscribed topics
 *
 * Before you subscribe to new topics you need to unsubscribe
 * from the old ones, if there is an active subscription.
 * Otherwise, you will get an error because there is an
 * existing subscription.
 *
 * @throws - Throws when an error code comes back from native land
 * @return {KafkaConsumer} - Returns itself.
 */
KafkaConsumer.prototype.unsubscribe = function() {
  this._errorWrap(this._client.unsubscribe());
  this.emit('unsubscribed', []);
  // Backwards compatible change
  this.emit('unsubscribe', []);
  return this;
};

/**
 * Read a number of messages from Kafka.
 *
 * This method is similar to the main one, except that it reads a number
 * of messages before calling back. This may get better performance than
 * reading a single message each time in stream implementations.
 *
 * This will keep going until it gets ERR__PARTITION_EOF or ERR__TIMED_OUT,
 * so the array may not be the same size you ask for. The size is advisory,
 * but we will not exceed it.
 *
 * @param {number} size - Number of messages to read
 * @param {KafkaConsumer~readCallback} cb - Callback to return when work is done.
 *//**
 * Read messages from Kafka as fast as possible
 *
 * This method keeps a background thread running to fetch the messages
 * as quickly as it can, sleeping only in between EOF and broker timeouts.
 *
 * Use this to get the maximum read performance if you don't care about the
 * stream backpressure.
 * @param {KafkaConsumer~readCallback} cb - Callback to return when a message
 * is fetched.
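 * @example
 * // A minimal sketch of both call forms. Batch mode: read up to 100
 * // messages per call (the batch size is illustrative).
 * consumer.consume(100, function(err, messages) {
 *   if (err) {
 *     return console.error(err);
 *   }
 *   console.log('read ' + messages.length + ' messages');
 * });
 *
 * // Flowing mode: fetch continuously; messages also arrive via 'data' events.
 * consumer.consume();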
 */
KafkaConsumer.prototype.consume = function(number, cb) {
  var timeoutMs = this._consumeTimeout || DEFAULT_CONSUME_TIME_OUT;
  var self = this;

  if ((number && typeof number === 'number') || (number && cb)) {

    if (cb === undefined) {
      cb = function() {};
    } else if (typeof cb !== 'function') {
      throw new TypeError('Callback must be a function');
    }

    this._consumeNum(timeoutMs, number, cb);
  } else {

    // See https://github.com/Blizzard/node-rdkafka/issues/220
    // Docs specify that just a callback can be provided, but really we
    // needed a fallback to the number argument
    // @deprecated
    if (cb === undefined) {
      if (typeof number === 'function') {
        cb = number;
      } else {
        cb = function() {};
      }
    }

    this._consumeLoop(timeoutMs, cb);
  }
};

/**
 * Open a background thread and keep getting messages as fast
 * as we can. Should not be called directly, and instead should
 * be called using consume.
 *
 * @private
 * @see consume
 */
KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
  var self = this;
  var retryReadInterval = this._consumeLoopTimeoutDelay;

  self._client.consumeLoop(timeoutMs, retryReadInterval, function readCallback(err, message, eofEvent, warning) {
    if (err) {
      // A few different types of errors can land here, but timeouts and
      // "no more messages" broker responses are filtered out before this
      // callback is invoked.
      cb(LibrdKafkaError.create(err));
    } else if (eofEvent) {
      self.emit('partition.eof', eofEvent);
    } else if (warning) {
      self.emit('warning', LibrdKafkaError.create(warning));
    } else {
      /**
       * Data event. Called whenever a message is received.
       *
       * @event KafkaConsumer#data
       * @type {KafkaConsumer~Message}
       */
      self.emit('data', message);
      cb(err, message);
    }
  });
};

/**
 * Consume a number of messages and wrap in a try catch with
 * proper error reporting. Should not be called directly,
 * and instead should be called using consume.
 *
 * @private
 * @see consume
 */
KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
  var self = this;

  this._client.consume(timeoutMs, numMessages, function(err, messages, eofEvents) {
    if (err) {
      err = LibrdKafkaError.create(err);
      if (cb) {
        cb(err);
      }
      return;
    }

    var currentEofEventsIndex = 0;

    function emitEofEventsFor(messageIndex) {
      while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) {
        delete eofEvents[currentEofEventsIndex].messageIndex;
        self.emit('partition.eof', eofEvents[currentEofEventsIndex]);
        ++currentEofEventsIndex;
      }
    }

    emitEofEventsFor(-1);

    for (var i = 0; i < messages.length; i++) {
      self.emit('data', messages[i]);
      emitEofEventsFor(i);
    }

    emitEofEventsFor(messages.length);

    if (cb) {
      cb(null, messages);
    }
  });
};

/**
 * This callback returns the message read from Kafka.
 *
 * @callback KafkaConsumer~readCallback
 * @param {LibrdKafkaError} err - An error, if one occurred while reading
 * the data.
 * @param {KafkaConsumer~Message} message
 */

/**
 * Commit a topic partition or all topic partitions that have been read
 *
 * If you provide a topic partition, it will commit that. Otherwise,
 * it will commit all read offsets for all topic partitions.
 *
 * @param {object|array|null} topicPartition - Topic partition object to commit, list of topic
 * partitions, or null if you want to commit all read offsets.
 * @throws When commit returns a non 0 error code
 *
 * @return {KafkaConsumer} - returns itself.
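 * @example
 * // A minimal sketch: commit everything that has been read so far...
 * consumer.commit(null);
 * // ...or commit a single topic partition offset (values illustrative).
 * consumer.commit({ topic: 'test', partition: 0, offset: 1000 });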
 */
KafkaConsumer.prototype.commit = function(topicPartition) {
  this._errorWrap(this._client.commit(topicPartition), true);
  return this;
};

/**
 * Commit a message
 *
 * This is basically a convenience method to map commit properly. A committed
 * offset marks the next message to read, so we add one to the message's
 * offset before committing it.
 *
 * @param {object} msg - Message object to commit
 * @throws When commit returns a non 0 error code
 *
 * @return {KafkaConsumer} - returns itself.
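 * @example
 * // A minimal sketch: commit each message right after handling it.
 * consumer.on('data', function(message) {
 *   handle(message); // hypothetical application handler
 *   consumer.commitMessage(message);
 * });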
 */
KafkaConsumer.prototype.commitMessage = function(msg) {
  var topicPartition = {
    topic: msg.topic,
    partition: msg.partition,
    offset: msg.offset + 1
  };

  this._errorWrap(this._client.commit(topicPartition), true);
  return this;
};

/**
 * Commit a topic partition (or all topic partitions) synchronously
 *
 * @param {object|array|null} topicPartition - Topic partition object to commit, list of topic
 * partitions, or null if you want to commit all read offsets.
 * @throws {LibrdKafkaError} - if the commit fails
 *
 * @return {KafkaConsumer} - returns itself.
 */
KafkaConsumer.prototype.commitSync = function(topicPartition) {
  this._errorWrap(this._client.commitSync(topicPartition), true);
  return this;
};

/**
 * Commit a message synchronously
 *
 * @see KafkaConsumer#commitMessage
 * @param {object} msg - A message object to commit.
 *
 * @throws {LibrdKafkaError} - if the commit fails
 *
 * @return {KafkaConsumer} - returns itself.
 */
KafkaConsumer.prototype.commitMessageSync = function(msg) {
  var topicPartition = {
    topic: msg.topic,
    partition: msg.partition,
    offset: msg.offset + 1
  };

  this._errorWrap(this._client.commitSync(topicPartition), true);
  return this;
};

/**
 * Get last known offsets from the client.
 *
 * The low offset is updated periodically (if statistics.interval.ms is set)
 * while the high offset is updated on each fetched message set from the
 * broker.
 *
 * If there is no cached offset (either low or high, or both), then this will
 * throw an error.
 *
 * @param {string} topic - Topic to receive offsets from.
 * @param {number} partition - Partition of the provided topic to receive offsets from
 * @return {Client~watermarkOffsets} - Returns an object with a high and low property, specifying
 * the high and low offsets for the topic partition
 * @throws {LibrdKafkaError} - Throws when there is no offset stored
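 * @example
 * // A minimal sketch, assuming a topic named 'test' with data on partition 0:
 * var offsets = consumer.getWatermarkOffsets('test', 0);
 * console.log('low:', offsets.low, 'high:', offsets.high);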
 */
KafkaConsumer.prototype.getWatermarkOffsets = function(topic, partition) {
  if (!this.isConnected()) {
    throw new Error('Client is disconnected');
  }

  return this._errorWrap(this._client.getWatermarkOffsets(topic, partition), true);
};

/**
 * Store offset for topic partition.
 *
 * The offset will be committed (written) to the offset store according to the auto commit interval,
 * if auto commit is on, or on the next manual commit if it is off.
 *
 * enable.auto.offset.store must be set to false to use this API.
 *
 * @see https://github.com/edenhill/librdkafka/blob/261371dc0edef4cea9e58a076c8e8aa7dc50d452/src-cpp/rdkafkacpp.h#L1702
 *
 * @param {Array.<TopicPartition>} topicPartitions - Topic partitions with offsets to store offsets for.
 * @throws {LibrdKafkaError} - Throws if the offsets could not be stored
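 * @example
 * // A minimal sketch (assumes 'enable.auto.offset.store': false in the
 * // consumer config): store the offset after handling each message so the
 * // auto-committer only ever commits fully processed messages.
 * consumer.on('data', function(message) {
 *   handle(message); // hypothetical application handler
 *   consumer.offsetsStore([{
 *     topic: message.topic,
 *     partition: message.partition,
 *     offset: message.offset + 1
 *   }]);
 * });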
 */
KafkaConsumer.prototype.offsetsStore = function(topicPartitions) {
  if (!this.isConnected()) {
    throw new Error('Client is disconnected');
  }

  return this._errorWrap(this._client.offsetsStore(topicPartitions), true);
};

/**
 * Resume consumption for the provided list of partitions.
 *
 * @param {Array.<TopicPartition>} topicPartitions - List of topic partitions to resume consumption on.
 * @throws {LibrdKafkaError} - Throws if the partitions could not be resumed
 */
KafkaConsumer.prototype.resume = function(topicPartitions) {
  if (!this.isConnected()) {
    throw new Error('Client is disconnected');
  }

  return this._errorWrap(this._client.resume(topicPartitions), true);
};

/**
 * Pause consumption for the provided list of partitions.
 *
 * @param {Array.<TopicPartition>} topicPartitions - List of topic partitions to pause consumption on.
 * @throws {LibrdKafkaError} - Throws if the partitions could not be paused
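 * @example
 * // A minimal sketch: pause everything currently assigned while doing
 * // slow work, then resume.
 * consumer.pause(consumer.assignments());
 * // ... later ...
 * consumer.resume(consumer.assignments());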
 */
KafkaConsumer.prototype.pause = function(topicPartitions) {
  if (!this.isConnected()) {
    throw new Error('Client is disconnected');
  }

  return this._errorWrap(this._client.pause(topicPartitions), true);
};