/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */
'use strict';

module.exports = KafkaConsumerStream;

var Readable = require('stream').Readable;
var util = require('util');

util.inherits(KafkaConsumerStream, Readable);

/**
 * ReadableStream integrating with the Kafka Consumer.
 *
 * This class is used to read data off of Kafka in a streaming way. It is
 * useful if you'd like to have a way to pipe Kafka into other systems. You
 * should generally not construct this class yourself, as it is not even
 * exposed as part of module.exports. Instead, you should use
 * KafkaConsumer.createReadStream.
 *
 * The stream implementation is slower than the continuous subscribe callback.
 * If you don't care so much about backpressure and would rather squeeze
 * out performance, use that method. Using the stream will ensure you read
 * only as fast as you write.
 *
 * The stream detects whether Kafka is already connected. If it is, it will
 * begin reading immediately. If it is not, it will connect and start reading
 * once it is ready.
 *
 * This stream operates in objectMode. It streams {Consumer~Message}.
 *
 * @param {Consumer} consumer - The Kafka Consumer object.
 * @param {object} options - Options to configure the stream.
 * @param {number} options.waitInterval - Number of ms to wait if Kafka reports
 * that it has timed out or that we are out of messages (right now).
 * @param {array} options.topics - Array of topics, or a function that parses
 * metadata into an array of topics.
 * @constructor
 * @extends stream.Readable
 * @see Consumer~Message
 */
function KafkaConsumerStream(consumer, options) {
  if (!(this instanceof KafkaConsumerStream)) {
    return new KafkaConsumerStream(consumer, options);
  }

  if (options === undefined) {
    options = { waitInterval: 1000 };
  } else if (typeof options === 'number') {
    options = { waitInterval: options };
  } else if (options === null || typeof options !== 'object') {
    throw new TypeError('"options" argument must be a number or an object');
  }

  var topics = options.topics;

  if (typeof topics === 'function') {
    // Just ignore the rest of the checks here
  } else if (!Array.isArray(topics)) {
    if (typeof topics !== 'string' && !(topics instanceof RegExp)) {
      throw new TypeError('"topics" argument must be a string, regex, or an array');
    } else {
      topics = [topics];
    }
  }

  options = Object.create(options);

  var fetchSize = options.fetchSize || 1;

  // Run in object mode by default.
  if (options.objectMode === null || options.objectMode === undefined) {
    options.objectMode = true;

    // If they did not explicitly set a high water mark, and we are running
    // in object mode, set it to the fetch size + 2 to ensure there is room
    // for a standard fetch
    if (!options.highWaterMark) {
      options.highWaterMark = fetchSize + 2;
    }
  }

  if (options.objectMode !== true) {
    this._read = this._read_buffer;
  } else {
    this._read = this._read_message;
  }

  Readable.call(this, options);

  this.consumer = consumer;
  this.topics = topics;
  this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
  this.waitInterval = options.waitInterval === undefined ? 1000 : options.waitInterval;
  this.fetchSize = fetchSize;
  this.connectOptions = options.connectOptions || {};
  this.streamAsBatch = options.streamAsBatch || false;

  // Hold the messages in here
  this.messages = [];

  var self = this;

  this.consumer
    .on('unsubscribed', function() {
      // Invalidate the stream when we unsubscribe
      self.push(null);
    });

  // Call connect. Handles potentially being connected already
  this.connect(this.connectOptions);

  this.once('end', function() {
    if (this.autoClose) {
      this.destroy();
    }
  });
}

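/*
 * Illustrative usage sketch (not part of the module): as the doc comment
 * above notes, streams are normally created through
 * KafkaConsumer.createReadStream rather than by constructing this class
 * directly. The group id, broker address, and topic name below are
 * placeholders.
 *
 *   var Kafka = require('node-rdkafka');
 *
 *   var stream = Kafka.KafkaConsumer.createReadStream({
 *     'group.id': 'example-group',
 *     'metadata.broker.list': 'localhost:9092'
 *   }, {}, {
 *     topics: ['example-topic'],
 *     waitInterval: 1000
 *   });
 *
 *   // In the default objectMode, each 'data' event delivers one
 *   // {Consumer~Message} object
 *   stream.on('data', function(message) {
 *     console.log(message.value.toString());
 *   });
 */
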
/**
 * Internal stream read method. This method reads message objects.
 * @param {number} size - Advisory number of messages to read; it is capped
 * at the configured fetch size.
 * @private
 */
KafkaConsumerStream.prototype._read_message = function(size) {
  if (this.messages.length > 0) {
    return this.push(this.messages.shift());
  }

  if (!this.consumer) {
    // This consumer is set to `null` in the close function
    return;
  }

  if (!this.consumer.isConnected()) {
    this.consumer.once('ready', function() {
      // This is the way Node.js does it
      // https://github.com/nodejs/node/blob/master/lib/fs.js#L1733
      this._read(size);
    }.bind(this));
    return;
  }

  if (this.destroyed) {
    return;
  }

  var self = this;

  // Use the smaller of the advised size and the configured fetch size,
  // so one read never requests more than a standard fetch.
  var fetchSize = size >= this.fetchSize ? this.fetchSize : size;

  this.consumer.consume(fetchSize, onread);

  // Retry function. Waits up to the wait interval, with some random
  // jitter, if one is provided. Otherwise, retries immediately.
  function retry() {
    if (!self.waitInterval) {
      setImmediate(function() {
        self._read(size);
      });
    } else {
      setTimeout(function() {
        self._read(size);
      }, self.waitInterval * Math.random()).unref();
    }
  }

  function onread(err, messages) {
    // If there was an error we still want to emit it.
    // Essentially, if the user does not register an error
    // handler, it will still cause the stream to blow up.
    //
    // But... if one is provided, consumption will move on
    // as normal
    if (err) {
      self.emit('error', err);
    }

    // If there are no messages it means we reached EOF or a timeout.
    if (err || messages.length < 1) {
      // If we got an error or if there were no messages, initiate a retry
      retry();
      return;
    } else {
      if (self.streamAsBatch) {
        self.push(messages);
      } else {
        for (var i = 0; i < messages.length; i++) {
          self.messages.push(messages[i]);
        }

        // Now that we have added them all to the inner messages buffer,
        // we can just push the first one
        self.push(self.messages.shift());
      }
    }
  }
};

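/*
 * Illustrative sketch of the streamAsBatch branch above (names and config
 * values are placeholders): with streamAsBatch enabled, each 'data' event
 * delivers the whole array returned by one consume() call rather than
 * individual messages.
 *
 *   var stream = Kafka.KafkaConsumer.createReadStream(conf, {}, {
 *     topics: ['example-topic'],
 *     streamAsBatch: true,
 *     fetchSize: 100
 *   });
 *
 *   stream.on('data', function(messages) {
 *     // `messages` is an array of up to fetchSize message objects
 *     console.log('received a batch of ' + messages.length + ' messages');
 *   });
 */
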
/**
 * Internal stream read method. This method reads message buffers.
 * @param {number} size - Advisory number of messages to read; it is capped
 * at the configured fetch size.
 * @private
 */
KafkaConsumerStream.prototype._read_buffer = function(size) {
  if (this.messages.length > 0) {
    return this.push(this.messages.shift());
  }

  if (!this.consumer) {
    // This consumer is set to `null` in the close function
    return;
  }

  if (!this.consumer.isConnected()) {
    this.consumer.once('ready', function() {
      // This is the way Node.js does it
      // https://github.com/nodejs/node/blob/master/lib/fs.js#L1733
      this._read(size);
    }.bind(this));
    return;
  }

  if (this.destroyed) {
    return;
  }

  var self = this;

  // Use the smaller of the advised size and the configured fetch size,
  // so one read never requests more than a standard fetch.
  var fetchSize = size >= this.fetchSize ? this.fetchSize : size;

  this.consumer.consume(fetchSize, onread);

  // Retry function. Waits up to the wait interval, with some random
  // jitter, if one is provided. Otherwise, retries immediately.
  function retry() {
    if (!self.waitInterval) {
      setImmediate(function() {
        self._read(size);
      });
    } else {
      setTimeout(function() {
        self._read(size);
      }, self.waitInterval * Math.random()).unref();
    }
  }

  function onread(err, messages) {
    // If there was an error we still want to emit it.
    // Essentially, if the user does not register an error
    // handler, it will still cause the stream to blow up.
    //
    // But... if one is provided, consumption will move on
    // as normal
    if (err) {
      self.emit('error', err);
    }

    // If there are no messages it means we reached EOF or a timeout.
    if (err || messages.length < 1) {
      // If we got an error or if there were no messages, initiate a retry
      retry();
      return;
    } else {
      if (self.streamAsBatch) {
        self.push(messages);
      } else {
        for (var i = 0; i < messages.length; i++) {
          self.messages.push(messages[i].value);
        }

        // Now that we have added them all to the inner messages buffer,
        // we can just push the first one
        self.push(self.messages.shift());
      }
    }
  }
};

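/*
 * Illustrative sketch of the buffer-mode path above (config values and the
 * output path are placeholders): with objectMode set to false, the stream
 * emits only the raw message payloads (Buffers), so it can be piped straight
 * into any byte-oriented writable stream.
 *
 *   var fs = require('fs');
 *   var Kafka = require('node-rdkafka');
 *
 *   var stream = Kafka.KafkaConsumer.createReadStream({
 *     'group.id': 'example-group',
 *     'metadata.broker.list': 'localhost:9092'
 *   }, {}, {
 *     topics: ['example-topic'],
 *     objectMode: false
 *   });
 *
 *   // Backpressure from the file write propagates back to the Kafka reads
 *   stream.pipe(fs.createWriteStream('/tmp/example-topic.log'));
 */
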
KafkaConsumerStream.prototype.connect = function(options) {
  var self = this;

  function connectCallback(err, metadata) {
    if (err) {
      self.emit('error', err);
      self.destroy();
      return;
    }

    try {
      // Subscribe to the topics as well so we will be ready.
      // If this throws, the stream is invalid.

      // This is the magic part. If topics is a function, pass the
      // metadata to it before we subscribe
      if (typeof self.topics === 'function') {
        var topics = self.topics(metadata);
        self.consumer.subscribe(topics);
      } else {
        self.consumer.subscribe(self.topics);
      }
    } catch (e) {
      self.emit('error', e);
      self.destroy();
      return;
    }

    // Start the flow of data
    self.read();
  }

  if (!this.consumer.isConnected()) {
    self.consumer.connect(options, connectCallback);
  } else {
    // Immediately call the connect callback
    setImmediate(function() {
      connectCallback(null, self.consumer._metadata);
    });
  }
};

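/*
 * Illustrative sketch of the topics-as-function branch above: when `topics`
 * is a function, connect passes the broker metadata to it and subscribes to
 * whatever topic names it returns. This assumes the metadata shape reported
 * by node-rdkafka ({ topics: [{ name: ... }] }); the 'events.' prefix is a
 * placeholder.
 *
 *   var stream = Kafka.KafkaConsumer.createReadStream(conf, {}, {
 *     topics: function(metadata) {
 *       // Subscribe to every topic whose name starts with 'events.'
 *       return metadata.topics
 *         .map(function(t) { return t.name; })
 *         .filter(function(name) { return name.indexOf('events.') === 0; });
 *     }
 *   });
 */
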
KafkaConsumerStream.prototype.destroy = function() {
  if (this.destroyed) {
    return;
  }
  this.destroyed = true;
  this.close();
};

KafkaConsumerStream.prototype.close = function(cb) {
  var self = this;
  if (cb) {
    this.once('close', cb);
  }

  if (!self.consumer._isConnecting && !self.consumer._isConnected) {
    // If we aren't even connected just exit. We are done.
    close();
    return;
  }

  if (self.consumer._isConnecting) {
    self.consumer.once('ready', function() {
      // Don't pass the callback because it has already been registered above
      self.close();
    });
    return;
  }

  if (self.consumer._isConnected) {
    self.consumer.unsubscribe();
    self.consumer.disconnect(function() {
      close();
    });
  }

  function close() {
    self.emit('close');
  }
};

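/*
 * Illustrative sketch of shutting the stream down: close() waits out any
 * in-flight connect, unsubscribes, disconnects the consumer, and then emits
 * 'close', at which point the optional callback fires.
 *
 *   stream.close(function() {
 *     console.log('consumer stream closed');
 *   });
 *
 * With autoClose left at its default of true, the same teardown happens
 * automatically when the stream ends.
 */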