You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

334 lines
9.0 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var Kafka = require('../');
  // Broker list used by every client in this file; KAFKA_HOST overrides the local default.
  const kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
  11. describe('Transactional Producer', function () {
  this.timeout(5000);

  // Timeout (ms) passed to producerTras.initTransactions().
  const TRANSACTIONS_TIMEOUT_MS = 30000;

  // Per-run suffix so every run works on its own pair of topics.
  const r = `${Date.now()}_${Math.round(Math.random() * 1000)}`;
  const topicIn = `transaction_input_${r}`;
  const topicOut = `transaction_output_${r}`;

  // Clients shared by the hooks and tests below; both are created in before().
  let producerTras;
  let consumerTrans;
  /**
   * Suite setup.
   *
   * Prepares:
   *  - a transactional consumer (reads from the input topic)
   *  - a transactional producer (writes to the output topic)
   *  - three messages on the input topic: A, B, C
   *    (A will be skipped, B will be committed, C will be aborted)
   *
   * `done` is invoked exactly once: with the first error seen, or after the
   * seeding producer has disconnected and the consumer has subscribed.
   */
  before(function (done) {
    let connecting = 3;
    let finished = false;
    let producerInput;

    // Report the hook outcome only once, even if several clients fail to connect.
    function finish(err) {
      if (finished) {
        return;
      }
      finished = true;
      done(err);
    }

    // Shared connect callback: once all three clients are connected, seed the input topic.
    function connectedCb(err) {
      if (err) {
        finish(err);
        return;
      }

      connecting--;
      if (connecting !== 0) {
        return;
      }

      try {
        ['A', 'B', 'C'].forEach(function (value) {
          producerInput.produce(topicIn, -1, Buffer.from(value));
        });
      } catch (produceErr) {
        // produce() reports some failures by throwing synchronously.
        finish(produceErr);
        return;
      }

      producerInput.disconnect(function (disconnectErr) {
        consumerTrans.subscribe([topicIn]);
        finish(disconnectErr);
      });
    }

    // Plain idempotent producer, used only to write the three input messages.
    producerInput = new Kafka.Producer({
      'client.id': 'kafka-test',
      'metadata.broker.list': kafkaBrokerList,
      'enable.idempotence': true
    });
    producerInput.setPollInterval(100);
    producerInput.connect({}, connectedCb);

    producerTras = new Kafka.Producer({
      'client.id': 'kafka-test',
      'metadata.broker.list': kafkaBrokerList,
      'dr_cb': true,
      'debug': 'all',
      'transactional.id': 'noderdkafka_transactions_send_offset',
      'enable.idempotence': true
    });
    producerTras.setPollInterval(100);
    producerTras.connect({}, connectedCb);

    consumerTrans = new Kafka.KafkaConsumer({
      'metadata.broker.list': kafkaBrokerList,
      'group.id': 'gropu_transaction_consumer',
      'enable.auto.commit': false
    }, {
      'auto.offset.reset': 'earliest',
    });
    consumerTrans.connect({}, connectedCb);
  });
  /**
   * Suite teardown: disconnects the shared transactional producer and consumer,
   * then signals completion once both have been handled.
   *
   * Clients that were never created (setup failed early) or that are not
   * connected are counted as already handled.
   */
  after(function (done) {
    const clients = [producerTras, consumerTrans];
    let remaining = clients.length;

    function clientHandled() {
      remaining--;
      if (remaining === 0) {
        done();
      }
    }

    clients.forEach(function (client) {
      // isConnected is a method; testing the bare property is always truthy.
      if (!client || !client.isConnected()) {
        clientHandled();
        return;
      }
      client.disconnect(function () {
        clientHandled();
      });
    });
  });
  // Initialises transactions on the shared producer; any reported error fails the test.
  it('should init transactions', function (done) {
    function onInitialized(err) {
      done(err);
    }

    producerTras.initTransactions(TRANSACTIONS_TIMEOUT_MS, onInitialized);
  });
  /**
   * Reads the input topic until message B (skipping A), then inside one
   * transaction: produces B to the output topic, sends the consumer position
   * to the transaction, commits, and checks that the committed offsets equal
   * the position that was sent.
   *
   * Failures are reported to `done` as Error objects.
   */
  it('should complete transaction', function (done) {
    let position;

    // Wraps a continuation so that any callback error ends the test.
    function guard(next) {
      return function (err, result) {
        if (err) {
          done(err);
          return;
        }
        next(result);
      };
    }

    function readMessage() {
      consumerTrans.consume(1, guard(onMessages));
    }

    function onMessages(messages) {
      if (messages.length === 0) {
        // Nothing delivered yet; poll again.
        readMessage();
        return;
      }

      const value = messages[0].value.toString();
      if (value === 'A') { // skip first message
        readMessage();
        return;
      }
      if (value !== 'B') {
        done(new Error('Expected B, got ' + value));
        return;
      }

      producerTras.beginTransaction(guard(function () {
        producerTras.produce(topicOut, -1, Buffer.from(value));
        position = consumerTrans.position();
        producerTras.sendOffsetsToTransaction(position, consumerTrans, guard(commit));
      }));
    }

    function commit() {
      producerTras.commitTransaction(guard(function () {
        consumerTrans.committed(5000, guard(verifyCommitted));
      }));
    }

    function verifyCommitted(tpo) {
      const expected = JSON.stringify(position);
      const actual = JSON.stringify(tpo);
      if (expected !== actual) {
        done(new Error('Committed mismatch: expected ' + expected + ', got ' + actual));
        return;
      }
      done();
    }

    readMessage();
  });
  151. describe('abort transaction', function() {
  152. var lastConsumerTransPosition;
  /**
   * Setup for the abort scenario: reads message C from the input topic, opens
   * a transaction, produces C to the output topic and sends the consumer
   * position to the transaction. The transaction is deliberately left open.
   *
   * The position that was sent is stored in lastConsumerTransPosition.
   * Failures are reported to `done` as Error objects.
   */
  before(function (done) {
    function readMessage() {
      consumerTrans.consume(1, onMessages);
    }

    function onMessages(err, messages) {
      if (err) {
        done(err);
        return;
      }
      if (messages.length === 0) {
        // Nothing delivered yet; poll again.
        readMessage();
        return;
      }

      const value = messages[0].value.toString();
      if (value !== 'C') {
        done(new Error('Expected C, got ' + value));
        return;
      }

      producerTras.beginTransaction(function (beginErr) {
        if (beginErr) {
          done(beginErr);
          return;
        }

        producerTras.produce(topicOut, -1, Buffer.from(value));
        lastConsumerTransPosition = consumerTrans.position();
        producerTras.sendOffsetsToTransaction(lastConsumerTransPosition, consumerTrans, function (sendErr) {
          if (sendErr) {
            done(sendErr);
            return;
          }
          done();
        });
      });
    }

    readMessage();
  });
  /**
   * A read_uncommitted consumer on the output topic must see both the
   * committed message B and the still-uncommitted message C, in that order.
   *
   * The test ends as soon as the expected sequence is complete, or as soon as
   * a message deviates from it. In both cases the consumer is disconnected
   * before `done` is called, and `done` is called only once.
   */
  it('should consume committed and uncommitted for read_uncommitted', function (done) {
    const expected = ['B', 'C'];
    const received = [];
    let finished = false;

    const consumer = new Kafka.KafkaConsumer({
      'metadata.broker.list': kafkaBrokerList,
      'group.id': 'group_read_uncommitted',
      'enable.auto.commit': false,
      'isolation.level': 'read_uncommitted'
    }, {
      'auto.offset.reset': 'earliest',
    });

    // Disconnects the consumer, then reports the outcome exactly once.
    function finish(err) {
      if (finished) {
        return;
      }
      finished = true;
      consumer.disconnect(function (disconnectErr) {
        done(err || disconnectErr);
      });
    }

    consumer.connect({}, function (err) {
      if (err) {
        // Nothing to disconnect; report the connection failure directly.
        finished = true;
        done(err);
        return;
      }
      consumer.subscribe([topicOut]);
      consumer.consume();
    });

    consumer.on('data', function (msg) {
      if (finished) {
        return;
      }

      const value = msg.value.toString();
      received.push(value);

      // both B and C must be consumed, in order
      if (value !== expected[received.length - 1]) {
        finish(new Error('Expected ' + expected.join(', ') + ' but received ' + received.join(', ')));
        return;
      }
      if (received.length === expected.length) {
        finish();
      }
    });
  });
  /**
   * A read_committed consumer on the output topic must see only the committed
   * message B by the time the end of the partition is reported.
   *
   * The verdict is taken on the first partition.eof event only. The consumer
   * is disconnected before `done` is called, for success and failure alike.
   */
  it('should consume only committed for read_committed', function (done) {
    const received = [];
    let finished = false;

    const consumer = new Kafka.KafkaConsumer({
      'metadata.broker.list': kafkaBrokerList,
      'group.id': 'group_read_committed',
      'enable.partition.eof': true,
      'enable.auto.commit': false,
      'isolation.level': 'read_committed'
    }, {
      'auto.offset.reset': 'earliest',
    });

    consumer.connect({}, function (err) {
      if (err) {
        // Nothing to disconnect; report the connection failure directly.
        finished = true;
        done(err);
        return;
      }
      consumer.subscribe([topicOut]);
      consumer.consume();
    });

    consumer.on('data', function (msg) {
      received.push(msg.value.toString());
    });

    consumer.on('partition.eof', function () {
      if (finished) {
        // A repeated EOF event must not report a second result.
        return;
      }
      finished = true;

      const onlyB = received.length === 1 && received[0] === 'B';
      const failure = onlyB
        ? null
        : new Error('Expected only B, received ' + JSON.stringify(received));

      consumer.disconnect(function (disconnectErr) {
        done(failure || disconnectErr);
      });
    });
  });
  /**
   * Aborts the open transaction and checks that the offsets sent with it were
   * not committed: the committed offset of the first partition must be lower
   * than the offset recorded in lastConsumerTransPosition.
   *
   * Failures are reported to `done` as Error objects.
   */
  it('should abort transaction', function (done) {
    producerTras.abortTransaction(function (abortErr) {
      if (abortErr) {
        done(abortErr);
        return;
      }

      consumerTrans.committed(5000, function (committedErr, tpo) {
        if (committedErr) {
          done(committedErr);
          return;
        }

        const abortedOffset = lastConsumerTransPosition[0].offset;
        const committedOffset = tpo[0].offset;
        if (abortedOffset <= committedOffset) {
          done(new Error(
            'Committed mismatch: aborted position ' + abortedOffset +
            ' must be greater than committed offset ' + committedOffset
          ));
          return;
        }
        done();
      });
    });
  });
  /**
   * After the abort, a consumer with the default isolation level must see
   * message B and nothing else on the output topic before the end of the
   * partition is reported.
   *
   * The first verdict wins: an unexpected message or the first partition.eof.
   * The consumer is disconnected before `done` is called, and `done` is
   * called only once.
   */
  it('should consume only committed', function (done) {
    let gotB = false;
    let finished = false;

    const consumer = new Kafka.KafkaConsumer({
      'metadata.broker.list': kafkaBrokerList,
      'group.id': 'group_default',
      'enable.partition.eof': true,
      'enable.auto.commit': false,
    }, {
      'auto.offset.reset': 'earliest',
    });

    // Disconnects the consumer, then reports the outcome exactly once.
    function finish(err) {
      if (finished) {
        return;
      }
      finished = true;
      consumer.disconnect(function (disconnectErr) {
        done(err || disconnectErr);
      });
    }

    consumer.connect({}, function (err) {
      if (err) {
        // Nothing to disconnect; report the connection failure directly.
        finished = true;
        done(err);
        return;
      }
      consumer.subscribe([topicOut]);
      consumer.consume();
    });

    consumer.on('data', function (msg) {
      const value = msg.value.toString();
      if (value !== 'B') {
        finish(new Error('Expected B, got ' + value));
        return;
      }
      gotB = true;
    });

    consumer.on('partition.eof', function () {
      finish(gotB ? null : new Error('Expected B'));
    });
  });
  318. });
  319. });