You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

307 lines
7.7 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  'use strict';

  module.exports = ProducerStream;

  var util = require('util');
  var Writable = require('stream').Writable;
  var ErrorCode = require('./error').codes;

  // Make ProducerStream a stream.Writable subclass. The constructor is a
  // hoisted function declaration, so it already exists at this point even
  // though its definition appears further down in the file.
  util.inherits(ProducerStream, Writable);
  /**
   * Writable stream integrating with the Kafka Producer.
   *
   * This class is used to write data to Kafka in a streaming way. In the
   * default (buffer) mode it takes buffers of data and produces them to the
   * single topic named by `options.topic`. With `options.objectMode: true` it
   * instead accepts message objects (`{ topic, partition, value, key,
   * timestamp, opaque, headers }`), each carrying its own destination. If you
   * need finer control than that, use the Producer itself.
   *
   * The stream detects if Kafka is already connected. You can safely begin
   * writing right away; if the producer is not connected yet, the stream
   * starts connecting it here.
   *
   * @param {Producer} producer - The Kafka Producer object.
   * @param {object|string} [options] - Stream options, also passed to
   *   stream.Writable. A string is shorthand for `{ encoding: options }`
   *   (note that buffer mode still requires a topic, so the shorthand alone
   *   throws).
   * @param {string} [options.topic] - Topic to produce to. Required unless
   *   `objectMode` is set.
   * @param {boolean} [options.objectMode] - Accept message objects instead of
   *   buffers.
   * @param {boolean} [options.autoClose=true] - Close the stream (see
   *   `close`) automatically once it emits 'finish'.
   * @param {object} [options.connectOptions={}] - Passed to
   *   `producer.connect` when the producer is not yet connected.
   * @param {number} [options.pollInterval=1000] - Passed to
   *   `producer.setPollInterval`. An explicit 0 is forwarded as-is instead of
   *   being replaced by the default.
   * @param {string} [options.encoding] - Default encoding for string writes.
   * @throws {TypeError} If `options` is not undefined, a string or an object,
   *   or if no topic is given outside objectMode.
   * @constructor
   * @extends stream.Writable
   */
  function ProducerStream(producer, options) {
    if (!(this instanceof ProducerStream)) {
      return new ProducerStream(producer, options);
    }

    if (options === undefined) {
      options = {};
    } else if (typeof options === 'string') {
      options = { encoding: options };
    } else if (options === null || typeof options !== 'object') {
      throw new TypeError('"streamOptions" argument must be a string or an object');
    }

    if (!options.objectMode && !options.topic) {
      throw new TypeError('ProducerStreams not using objectMode must provide a topic to produce to.');
    }

    // Choose the per-chunk write strategy before Writable initializes.
    if (options.objectMode !== true) {
      this._write = this._write_buffer;
    } else {
      this._write = this._write_message;
    }

    Writable.call(this, options);

    this.producer = producer;
    this.topicName = options.topic;

    this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
    this.connectOptions = options.connectOptions || {};

    // `|| 1000` alone would silently turn an explicit 0 into 1000; keep 0 so
    // callers can deliberately pass it through to setPollInterval.
    var pollInterval = options.pollInterval === 0 ? 0 : (options.pollInterval || 1000);
    this.producer.setPollInterval(pollInterval);

    if (options.encoding) {
      this.setDefaultEncoding(options.encoding);
    }

    // Connect to the producer. Unless we are already connected
    if (!this.producer.isConnected()) {
      this.connect(this.connectOptions);
    }

    this.once('finish', function() {
      // EventEmitter invokes listeners with `this` bound to the stream.
      if (this.autoClose) {
        this.close();
      }
    });
  }
  /**
   * Connect the underlying producer.
   *
   * A connection failure reported by `producer.connect` is re-emitted as an
   * 'error' event on this stream; success is silent.
   *
   * @param {object} options - Connection options forwarded to producer.connect.
   */
  ProducerStream.prototype.connect = function(options) {
    var stream = this;

    stream.producer.connect(options, function(err) {
      if (!err) {
        return;
      }
      stream.emit('error', err);
    });
  };
  /**
   * Internal stream write method for ProducerStream when writing buffers.
   *
   * Produces `data` to this stream's configured topic (`this.topicName`) with
   * no explicit partition or key. This method should never be called
   * externally. It re-invokes `this._write` in these cases:
   *
   * - The producer is not connected yet: the write is replayed once the
   *   producer emits 'ready'.
   * - The producer's queue is full (ERR__QUEUE_FULL): the producer is polled
   *   and the write is retried after 500ms. Holding `cb` back meanwhile is
   *   what exerts backpressure on the writer.
   *
   * Any other produce error closes the stream when `autoClose` is set and is
   * passed to `cb`. A non-Buffer chunk is rejected by passing an error to
   * `cb`. Reporting through the callback (rather than emitting 'error'
   * directly and never calling `cb`) keeps the write from hanging forever.
   *
   * @param {Buffer} data - Chunk to write.
   * @param {string} encoding - Encoding of the chunk; only forwarded on retries.
   * @param {Function} cb - Callback to call when the stream is done processing
   * the data.
   * @private
   * @see https://github.com/nodejs/node/blob/master/lib/fs.js#L1901
   */
  ProducerStream.prototype._write_buffer = function(data, encoding, cb) {
    var self = this;

    if (!(data instanceof Buffer)) {
      // Writable requires _write failures to be reported via the callback;
      // emitting 'error' without calling cb left this write pending forever
      // and stalled every write queued behind it.
      setImmediate(function() {
        cb(new Error('Invalid data. Can only produce buffers'));
      });
      return;
    }

    if (!this.producer.isConnected()) {
      this.producer.once('ready', function() {
        self._write(data, encoding, cb);
      });
      return;
    }

    try {
      this.producer.produce(self.topicName, null, data, null);
      setImmediate(cb);
    } catch (e) {
      if (ErrorCode.ERR__QUEUE_FULL === e.code) {
        // Poll for good measure
        self.producer.poll();
        // Just delay this thing a bit and pass the params
        // backpressure will get exerted this way.
        setTimeout(function() {
          self._write(data, encoding, cb);
        }, 500);
      } else {
        if (self.autoClose) {
          self.close();
        }
        setImmediate(function() {
          cb(e);
        });
      }
    }
  };
  /**
   * Internal stream write method for ProducerStream in objectMode.
   *
   * Each chunk is a message object whose fields are forwarded positionally to
   * `producer.produce(topic, partition, value, key, timestamp, opaque,
   * headers)`. This method should never be called externally.
   *
   * If the producer is not connected yet, the write is replayed once the
   * producer emits 'ready'. A full local queue (ERR__QUEUE_FULL) triggers a
   * poll and a retry 500ms later. Withholding `cb` until then applies
   * backpressure. Any other error (including one raised while reading the
   * message's fields) closes the stream when `autoClose` is set and is handed
   * to `cb`.
   *
   * @param {object} message - Message to write.
   * @param {string} encoding - Chunk encoding; only forwarded on retries.
   * @param {Function} cb - Called once the message has been handed to the
   *   producer, or with the error that prevented it.
   * @private
   * @see https://github.com/nodejs/node/blob/master/lib/fs.js#L1901
   */
  ProducerStream.prototype._write_message = function(message, encoding, cb) {
    var stream = this;

    // Not connected yet: replay this exact write when the producer is ready.
    if (!stream.producer.isConnected()) {
      stream.producer.once('ready', function() {
        stream._write(message, encoding, cb);
      });
      return;
    }

    try {
      stream.producer.produce(
        message.topic,
        message.partition,
        message.value,
        message.key,
        message.timestamp,
        message.opaque,
        message.headers
      );
      setImmediate(cb);
    } catch (err) {
      if (err.code !== ErrorCode.ERR__QUEUE_FULL) {
        // Fatal for this write: optionally shut down, then report.
        if (stream.autoClose) {
          stream.close();
        }
        setImmediate(function() {
          cb(err);
        });
        return;
      }

      // Queue full: let the producer drain a little, then try again.
      stream.producer.poll();
      setTimeout(function() {
        stream._write(message, encoding, cb);
      }, 500);
    }
  };
  /**
   * Produce a batch of chunks and report the outcome through a single callback.
   *
   * Buffer chunks are produced to `topic` with no partition or key; any other
   * chunk is treated as a message object whose fields are forwarded to
   * `producer.produce(topic, partition, value, key, timestamp, opaque, headers)`.
   * Chunks are produced in order:
   *
   * - ERR__QUEUE_FULL: the producer is polled and the chunks not yet produced
   *   (starting with the one that failed) are retried after 500ms. The retry
   *   takes over responsibility for calling `cb`.
   * - Any other produce error: `cb(err)` is called and later chunks are skipped.
   * - Every chunk accepted: `cb(null)`. An empty batch completes immediately.
   *
   * `cb` is invoked outside the try/catch, so an exception thrown by the
   * callback itself propagates to the caller instead of being mistaken for a
   * produce error and triggering a second `cb` call.
   *
   * @param {Producer} producer - Producer to hand the chunks to.
   * @param {string} topic - Topic used for Buffer chunks.
   * @param {Array<Buffer|object>} chunks - Chunks to produce, in order.
   * @param {Function} cb - Called once with null or the produce error
   *   (after any queue-full retries).
   * @private
   */
  function writev(producer, topic, chunks, cb) {
    // @todo maybe a produce batch method?

    // Poll for good measure, then re-run the batch for the chunks that were
    // not yet accepted. Delaying the callback is what exerts backpressure.
    function retry(restChunks) {
      producer.poll();
      setTimeout(function() {
        writev(producer, topic, restChunks, cb);
      }, 500);
    }

    for (var i = 0; i < chunks.length; i++) {
      var chunk = chunks[i];
      try {
        if (Buffer.isBuffer(chunk)) {
          producer.produce(topic, null, chunk, null);
        } else {
          producer.produce(chunk.topic, chunk.partition, chunk.value, chunk.key,
            chunk.timestamp, chunk.opaque, chunk.headers);
        }
      } catch (e) {
        if (ErrorCode.ERR__QUEUE_FULL === e.code) {
          retry(chunks.slice(i));
        } else {
          cb(e);
        }
        return;
      }
    }

    // Every chunk was accepted (also the path taken by an empty batch).
    cb(null);
  }
  /**
   * Internal batched write for ProducerStream, called by stream.Writable with
   * several buffered write requests at once.
   *
   * If the producer is not connected yet, the whole batch is replayed once the
   * producer emits 'ready'. Otherwise the chunks are handed to the module's
   * `writev` helper. On error the stream is closed when `autoClose` is set,
   * matching the single-chunk write paths, and the error is passed to `cb`.
   *
   * @param {Array<{chunk: *, encoding: string}>} data - Buffered write requests.
   * @param {Function} cb - Called once the batch has been handled, with the
   *   produce error if there was one.
   * @private
   */
  ProducerStream.prototype._writev = function(data, cb) {
    var self = this;

    if (!this.producer.isConnected()) {
      // Wait on the producer, as the single-chunk writes do: nothing in this
      // stream emits 'ready', so listening on `this` left the batch pending
      // forever.
      this.producer.once('ready', function() {
        self._writev(data, cb);
      });
      return;
    }

    var chunks = data.map(function(request) {
      return request.chunk;
    });

    writev(this.producer, this.topicName, chunks, function(err) {
      if (err) {
        if (self.autoClose) {
          self.close();
        }
        cb(err);
        return;
      }
      cb();
    });
  };
  /**
   * Close the stream, disconnecting the producer first if it is connected.
   *
   * 'close' is emitted on this stream:
   * - producer connected (`_isConnected`): once `producer.disconnect` calls back;
   * - producer still connecting (`_isConnecting`): after the producer emits
   *   'ready', via a second call to close() that then disconnects it;
   * - otherwise: on a later setImmediate turn.
   *
   * Reads the producer's internal `_isConnected` / `_isConnecting` flags
   * directly.
   *
   * @param {Function} [cb] - Optional listener registered once for 'close'.
   */
  ProducerStream.prototype.close = function(cb) {
    var stream = this;

    var emitClose = function() {
      stream.emit('close');
    };

    if (cb) {
      stream.once('close', cb);
    }

    var producer = stream.producer;

    if (producer._isConnected) {
      // Previously this set the producer to null. I'm not sure there is any benefit
      // to that other than I guess helping flag it for GC?
      // https://github.com/Blizzard/node-rdkafka/issues/344
      producer.disconnect(function() {
        emitClose();
      });
      return;
    }

    if (producer._isConnecting) {
      producer.once('ready', function() {
        // cb is already registered above; don't register it twice.
        stream.close();
      });
      return;
    }

    setImmediate(emitClose);
  };