You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

375 lines
11 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. module.exports = Producer;
  10. var Client = require('./client');
  11. var util = require('util');
  12. var Kafka = require('../librdkafka.js');
  13. var ProducerStream = require('./producer-stream');
  14. var LibrdKafkaError = require('./error');
  15. var shallowCopy = require('./util').shallowCopy;
  16. util.inherits(Producer, Client);
  /**
   * Producer class for sending messages to Kafka.
   *
   * This is the main entry point for writing data to Kafka. It is configured
   * like any other client: a global configuration plus a default topic
   * configuration, both of which are handed to the Client base constructor.
   *
   * A few keys of `conf` are consumed by this wrapper itself and are stripped
   * before the configuration is passed on:
   *  - `topic`      becomes `this.defaultTopic` (null when not set)
   *  - `partition`  becomes `this.defaultPartition` (-1 when not set)
   *  - `dr_cb`      enables delivery reports; when it is a function it is
   *                 also registered as a 'delivery-report' listener
   *  - `dr_msg_cb`  enables delivery reports and is recorded as a boolean
   *                 flag on the installed delivery callback
   *
   * The constructor may be called with or without `new`.
   *
   * @param {object} conf - Key value pairs to configure the producer
   * @param {object} topicConf - Key value pairs to create a default
   * topic configuration
   * @extends Client
   * @constructor
   */
  function Producer(conf, topicConf) {
    if (!(this instanceof Producer)) {
      return new Producer(conf, topicConf);
    }

    // Work on the objects returned by shallowCopy so the key deletions below
    // act on those rather than on what the caller passed in (presumably fresh
    // copies - shallowCopy is defined in ./util).
    conf = shallowCopy(conf);
    topicConf = shallowCopy(topicConf);

    /**
     * Producer message. This is sent to the wrapper, not received from it
     *
     * @typedef {object} Producer~Message
     * @property {string|buffer} message - The buffer to send to Kafka.
     * @property {Topic} topic - The Kafka topic to produce to.
     * @property {number} partition - The partition to produce to. Defaults to
     * the partitioner
     * @property {string} key - The key string to use for the message.
     * @see Consumer~Message
     */

    // Save the wrapper-only settings before stripping them from conf.
    var gTopic = conf.topic || false;
    // Partition 0 is a legitimate default, so it must not be treated as
    // "unset" the way a plain `|| null` would. Every other falsy value still
    // means "no default partition".
    var gPart = (conf.partition === 0 || conf.partition) ? conf.partition : null;
    var dr_cb = conf.dr_cb || null;
    var dr_msg_cb = conf.dr_msg_cb || null;

    // Delete the keys we don't want to pass on to the base client.
    delete conf.topic;
    delete conf.partition;
    delete conf.dr_cb;
    delete conf.dr_msg_cb;

    // Initialise the base client with the native producer binding.
    // @see NodeKafka::Producer::Init
    Client.call(this, conf, Kafka.Producer, topicConf);

    this.globalConfig = conf;
    this.topicConfig = topicConf;
    this.defaultTopic = gTopic || null;
    this.defaultPartition = gPart == null ? -1 : gPart;
    this.sentMessages = 0;
    this.pollInterval = undefined;

    if (dr_msg_cb || dr_cb) {
      // Re-emit every delivery callback invocation as a 'delivery-report'
      // event, wrapping a truthy error with LibrdKafkaError.create first.
      this._cb_configs.event.delivery_cb = function(err, report) {
        if (err) {
          err = LibrdKafkaError.create(err);
        }
        this.emit('delivery-report', err, report);
      }.bind(this);

      // Boolean flag attached to the callback itself; it is read outside this
      // file (presumably by the native binding - TODO confirm).
      this._cb_configs.event.delivery_cb.dr_msg_cb = !!dr_msg_cb;

      if (typeof dr_cb === 'function') {
        this.on('delivery-report', dr_cb);
      }
    }
  }
  /**
   * Hand one message to the native client for production.
   *
   * The call is synchronous: the message is passed to `this._client.produce`
   * and the value it returns is run through `this._errorWrap`. Being accepted
   * here does not mean the message was delivered; if you need that guarantee,
   * change your *acks* settings or use delivery reports.
   *
   * `sentMessages` is incremented once the connection and topic checks pass,
   * before the native call is made.
   *
   * @param {string} topic - The topic name to produce to. Must be a non-empty string.
   * @param {number|null} partition - The partition number to produce to.
   * null/undefined selects `this.defaultPartition`.
   * @param {Buffer|null} message - The message to produce.
   * @param {string} key - The key associated with the message.
   * @param {number|null} timestamp - Timestamp to send with the message.
   * @param {object} opaque - An object you want passed along with this message, if provided.
   * @param {object} headers - A list of custom key value pairs that provide message metadata.
   * @throws {Error} - If the producer is not connected.
   * @throws {TypeError} - If `topic` is not a non-empty string.
   * @throws {LibrdKafkaError} - Presumably raised by `_errorWrap` when the
   * native call reports a failure (defined on Client - verify there).
   * @return {*} - whatever `_errorWrap` returns for the native result.
   */
  Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque, headers) {
    if (!this._isConnected) {
      throw new Error('Producer not connected');
    }

    // Topic objects are no longer supported; only plain, non-empty topic
    // names are accepted.
    var topicIsUsable = typeof topic === 'string' && topic.length > 0;
    if (!topicIsUsable) {
      throw new TypeError('"topic" must be a string');
    }

    this.sentMessages++;

    // Fall back to the configured default when the caller gave no partition.
    var targetPartition = partition;
    if (targetPartition === null || targetPartition === undefined) {
      targetPartition = this.defaultPartition;
    }

    return this._errorWrap(
      this._client.produce(topic, targetPartition, message, key, timestamp, opaque, headers)
    );
  };
  /**
   * Build a write stream backed by a brand new producer.
   *
   * A Producer is constructed from `conf` and `topicConf` and wrapped in a
   * ProducerStream together with `streamOptions`. How the stream treats the
   * data written to it is decided by ProducerStream (see ./producer-stream).
   *
   * @param {object} conf - Key value pairs to configure the producer
   * @param {object} topicConf - Key value pairs to create a default
   * topic configuration
   * @param {object} streamOptions - Stream options
   * @return {ProducerStream} - returns the write stream for writing to Kafka.
   */
  Producer.createWriteStream = function(conf, topicConf, streamOptions) {
    return new ProducerStream(new Producer(conf, topicConf), streamOptions);
  };
  /**
   * Poll the native client for events.
   *
   * Polling is how new events that have occurred are learned about. It does
   * not happen as part of produce(), so either call this yourself or use
   * setPollInterval() to have it done on a timer.
   *
   * @throws {Error} - If the producer is not connected.
   * @return {Producer} - returns itself.
   */
  Producer.prototype.poll = function() {
    var connected = this._isConnected;

    if (connected) {
      this._client.poll();
      return this;
    }

    throw new Error('Producer not connected');
  };
  /**
   * Set automatic polling for events.
   *
   * Polling is how new events that have occurred are learned about. This
   * method runs poll() on a timer and ties the timer to the connection:
   *  - if the producer is not connected yet, the timer is started on the next
   *    'ready' event;
   *  - once running, the timer is stopped on the next 'disconnected' event.
   *
   * Every call first cancels whatever an earlier call set up: the running
   * timer, a start that is still waiting for 'ready', and the pending
   * 'disconnected' handler. Calling it repeatedly therefore never stacks
   * listeners, and an interval of 0 reliably turns automatic polling off even
   * when the start was still pending.
   *
   * The timer is unref'd, so it does not keep the process alive by itself.
   *
   * @param {number} interval - Interval, in milliseconds, to poll. 0 disables
   * automatic polling.
   *
   * @return {Producer} - returns itself.
   */
  Producer.prototype.setPollInterval = function(interval) {
    var self = this;

    // If we already have a poll interval we need to stop it.
    if (this.pollInterval) {
      clearInterval(this.pollInterval);
      this.pollInterval = undefined;
    }

    // Drop a start that is still waiting for the connection, otherwise a
    // later 'ready' would switch polling on with a stale interval.
    if (this._pollReadyListener) {
      this.removeListener('ready', this._pollReadyListener);
      this._pollReadyListener = undefined;
    }

    // Drop the disconnect handler of the previous timer so handlers do not
    // pile up when this method is called more than once.
    if (this._pollDisconnectListener) {
      this.removeListener('disconnected', this._pollDisconnectListener);
      this._pollDisconnectListener = undefined;
    }

    if (interval === 0) {
      // Polling was switched off; everything set up earlier is gone by now.
      return this;
    }

    if (!this._isConnected) {
      // Not connected: run this again once the connection goes through.
      this._pollReadyListener = function() {
        self._pollReadyListener = undefined;
        self.setPollInterval(interval);
      };
      this.once('ready', this._pollReadyListener);
      return this;
    }

    // We know we are connected at this point.
    this.pollInterval = setInterval(function() {
      try {
        self.poll();
      } catch (e) {
        // Deliberately ignored: poll() throws when the producer is no longer
        // connected, and that case is handled by the 'disconnected' handler
        // below, which stops this timer.
      }
    }, interval).unref();

    // Handle disconnections: rerun with interval 0, which removes the timer.
    this._pollDisconnectListener = function() {
      self._pollDisconnectListener = undefined;
      self.setPollInterval(0);
    };
    this.once('disconnected', this._pollDisconnectListener);

    return this;
  };
  /**
   * Flush the producer.
   *
   * Asks the native client to flush its internal producer buffer. This is
   * usually done right before disconnecting.
   *
   * @param {number} timeout - Number of milliseconds to try to flush before
   * giving up. Defaults to 500 when null or undefined.
   * @param {function} callback - Optional. Called when the flush is done,
   * with the flush error (wrapped by LibrdKafkaError.create when truthy) as
   * its only argument.
   *
   * @throws {Error} - If the producer is not connected.
   * @return {Producer} - returns itself.
   */
  Producer.prototype.flush = function(timeout, callback) {
    if (!this._isConnected) {
      throw new Error('Producer not connected');
    }

    var flushTimeout = timeout == null ? 500 : timeout;

    var onFlushed = function(err) {
      // The error is wrapped whether or not anybody is listening for it.
      var result = err ? LibrdKafkaError.create(err) : err;
      if (callback) {
        callback(result);
      }
    };

    this._client.flush(flushTimeout, onFlushed);
    return this;
  };
  /**
   * Keep a reference to the inherited disconnect method so the override below
   * can flush first and then delegate to it.
   */
  Producer.prototype._disconnect = Producer.prototype.disconnect;

  /**
   * Disconnect the producer.
   *
   * Flushes the internal producer buffer via flush() and then calls the
   * inherited disconnect. The disconnect is attempted whatever the outcome of
   * the flush; a flush error is not forwarded to `cb`.
   *
   * May be called as `disconnect(cb)` or `disconnect(timeout, cb)`.
   *
   * @param {number} timeout - Number of milliseconds to try to flush before
   * giving up. Defaults to 5 seconds when omitted, null or undefined.
   * @param {function} cb - The callback handed to the inherited disconnect.
   * @throws {Error} - If the producer is not connected (raised by flush()).
   */
  Producer.prototype.disconnect = function(timeout, cb) {
    var self = this;
    var timeoutInterval = 5000;

    if (typeof timeout === 'function') {
      cb = timeout;
    } else if (timeout !== undefined && timeout !== null) {
      // Only an explicit value may replace the 5 second default; otherwise
      // flush() would silently fall back to its own, much shorter, default.
      timeoutInterval = timeout;
    }

    this.flush(timeoutInterval, function() {
      self._disconnect(cb);
    });
  };
  /**
   * Init a transaction.
   *
   * Initialize transactions, this is only performed once per transactional producer.
   *
   * May be called as `initTransactions(cb)` or `initTransactions(timeout, cb)`.
   *
   * @param {number} timeout - Number of milliseconds to try to initialize
   * before giving up. Defaults to 5 seconds when omitted, null or undefined.
   * @param {function} cb - Required. Called when the operation is completed,
   * with the error (wrapped by LibrdKafkaError.create when truthy) as its
   * only argument.
   * @return {Producer} - returns itself.
   */
  Producer.prototype.initTransactions = function(timeout, cb) {
    if (typeof timeout === 'function') {
      cb = timeout;
      timeout = 5000;
    } else if (timeout === undefined || timeout === null) {
      // Apply the documented default instead of forwarding "no value" to the
      // native client.
      timeout = 5000;
    }

    this._client.initTransactions(timeout, function(err) {
      cb(err ? LibrdKafkaError.create(err) : err);
    });

    return this;
  };
  /**
   * Begin a transaction.
   *
   * 'initTransactions' must have been called successfully (once) before this function is called.
   *
   * @param {function} cb - Required. Called when the operation is completed,
   * with the error (wrapped by LibrdKafkaError.create when truthy) as its
   * only argument.
   * @return {Producer} - returns itself.
   */
  Producer.prototype.beginTransaction = function(cb) {
    this._client.beginTransaction(function(err) {
      cb(err ? LibrdKafkaError.create(err) : err);
    });

    return this;
  };
  /**
   * Commit the current transaction (as started with 'beginTransaction').
   *
   * May be called as `commitTransaction(cb)` or `commitTransaction(timeout, cb)`.
   *
   * @param {number} timeout - Number of milliseconds to try to commit before
   * giving up. Defaults to 5 seconds when omitted, null or undefined.
   * @param {function} cb - Required. Called when the operation is completed,
   * with the error (wrapped by LibrdKafkaError.create when truthy) as its
   * only argument.
   * @return {Producer} - returns itself.
   */
  Producer.prototype.commitTransaction = function(timeout, cb) {
    if (typeof timeout === 'function') {
      cb = timeout;
      timeout = 5000;
    } else if (timeout === undefined || timeout === null) {
      // Apply the documented default instead of forwarding "no value" to the
      // native client.
      timeout = 5000;
    }

    this._client.commitTransaction(timeout, function(err) {
      cb(err ? LibrdKafkaError.create(err) : err);
    });

    return this;
  };
  /**
   * Aborts the ongoing transaction.
   *
   * May be called as `abortTransaction(cb)` or `abortTransaction(timeout, cb)`.
   *
   * @param {number} timeout - Number of milliseconds to try to abort.
   * Defaults to 5 seconds when omitted, null or undefined.
   * @param {function} cb - Required. Called when the operation is completed,
   * with the error (wrapped by LibrdKafkaError.create when truthy) as its
   * only argument.
   * @return {Producer} - returns itself.
   */
  Producer.prototype.abortTransaction = function(timeout, cb) {
    if (typeof timeout === 'function') {
      cb = timeout;
      timeout = 5000;
    } else if (timeout === undefined || timeout === null) {
      // Apply the documented default instead of forwarding "no value" to the
      // native client.
      timeout = 5000;
    }

    this._client.abortTransaction(timeout, function(err) {
      cb(err ? LibrdKafkaError.create(err) : err);
    });

    return this;
  };
  /**
   * Send the current offsets of the consumer to the ongoing transaction.
   *
   * May be called as `sendOffsetsToTransaction(offsets, consumer, cb)` or
   * `sendOffsetsToTransaction(offsets, consumer, timeout, cb)`.
   *
   * @param {*} offsets - Offsets to send as part of the next commit. Passed
   * to the native client unchanged (presumably a list of topic/partition
   * offsets - TODO confirm against the native binding).
   * @param {Consumer} consumer - An instance of the consumer; its
   * `getClient()` result is what is handed to the native client.
   * @param {number} timeout - Number of milliseconds to try to send offsets.
   * Defaults to 5 seconds when omitted, null or undefined.
   * @param {function} cb - Required. Called when the operation is completed,
   * with the error (wrapped by LibrdKafkaError.create when truthy) as its
   * only argument.
   * @return {Producer} - returns itself.
   */
  Producer.prototype.sendOffsetsToTransaction = function(offsets, consumer, timeout, cb) {
    if (typeof timeout === 'function') {
      cb = timeout;
      timeout = 5000;
    } else if (timeout === undefined || timeout === null) {
      // Apply the documented default instead of forwarding "no value" to the
      // native client.
      timeout = 5000;
    }

    this._client.sendOffsetsToTransaction(offsets, consumer.getClient(), timeout, function(err) {
      cb(err ? LibrdKafkaError.create(err) : err);
    });

    return this;
  };