/* * node-rdkafka - Node.js wrapper for RdKafka C/C++ library * * Copyright (c) 2016 Blizzard Entertainment * * This software may be modified and distributed under the terms * of the MIT license. See the LICENSE.txt file for details. */ var Kafka = require('../'); var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092'; describe('Transactional Producer', function () { this.timeout(5000); var TRANSACTIONS_TIMEOUT_MS = 30000; var r = Date.now() + '_' + Math.round(Math.random() * 1000); var topicIn = 'transaction_input_' + r; var topicOut = 'transaction_output_' + r; var producerTras; var consumerTrans; before(function (done) { /* prepare: transactional consumer (read from input topic) transactional producer (write to output topic) write 3 messages to input topic: A, B, C A will be skipped, B will be committed, C will be aborted */ var connecting = 3; var producerInput; function connectedCb(err) { if (err) { done(err); return; } connecting--; if (connecting === 0) { producerInput.produce(topicIn, -1, Buffer.from('A')); producerInput.produce(topicIn, -1, Buffer.from('B')); producerInput.produce(topicIn, -1, Buffer.from('C')); producerInput.disconnect(function (err) { consumerTrans.subscribe([topicIn]); done(err); }) } } producerInput = Kafka.Producer({ 'client.id': 'kafka-test', 'metadata.broker.list': kafkaBrokerList, 'enable.idempotence': true }); producerInput.setPollInterval(100); producerInput.connect({}, connectedCb); producerTras = new Kafka.Producer({ 'client.id': 'kafka-test', 'metadata.broker.list': kafkaBrokerList, 'dr_cb': true, 'debug': 'all', 'transactional.id': 'noderdkafka_transactions_send_offset', 'enable.idempotence': true }); producerTras.setPollInterval(100); producerTras.connect({}, connectedCb); consumerTrans = new Kafka.KafkaConsumer({ 'metadata.broker.list': kafkaBrokerList, 'group.id': 'gropu_transaction_consumer', 'enable.auto.commit': false }, { 'auto.offset.reset': 'earliest', }); consumerTrans.connect({}, connectedCb); }); after(function (done) { let connected = 2; function execDisconnect(client) { if (!client.isConnected) { connected--; if (connected === 0) { done(); } } else { client.disconnect(function() { connected--; if (connected === 0) { done(); } }); } } execDisconnect(producerTras); execDisconnect(consumerTrans); }); it('should init transactions', function(done) { producerTras.initTransactions(TRANSACTIONS_TIMEOUT_MS, function (err) { done(err); }); }); it('should complete transaction', function(done) { function readMessage() { consumerTrans.consume(1, function(err, m) { if (err) { done(err); return; } if (m.length === 0) { readMessage(); } else { var v = m[0].value.toString(); if (v === 'A') { // skip first message readMessage(); return; } if (v !== 'B') { done('Expected B'); return; } producerTras.beginTransaction(function (err) { if (err) { done(err); return; } producerTras.produce(topicOut, -1, Buffer.from(v)); var position = consumerTrans.position(); producerTras.sendOffsetsToTransaction(position, consumerTrans, function(err) { if (err) { done(err); return; } producerTras.commitTransaction(function(err) { if (err) { done(err); return; } consumerTrans.committed(5000, function(err, tpo) { if (err) { done(err); return; } if (JSON.stringify(position) !== JSON.stringify(tpo)) { done('Committed mismatch'); return; } done(); }); }); }); }); } }); } readMessage(); }); describe('abort transaction', function() { var lastConsumerTransPosition; before(function(done) { function readMessage() { consumerTrans.consume(1, function(err, m) { if (err) { done(err); return; } if (m.length === 0) { readMessage(); } else { var v = m[0].value.toString(); if (v !== 'C') { done('Expected C'); return; } producerTras.beginTransaction(function (err) { if (err) { done(err); return; } producerTras.produce(topicOut, -1, Buffer.from(v)); lastConsumerTransPosition = consumerTrans.position(); producerTras.sendOffsetsToTransaction(lastConsumerTransPosition, consumerTrans, function(err) { if (err) { done(err); return; } done(); }); }); } }); } readMessage(); }); it ('should consume committed and uncommitted for read_uncommitted', function(done) { var allMsgs = []; var consumer = new Kafka.KafkaConsumer({ 'metadata.broker.list': kafkaBrokerList, 'group.id': 'group_read_uncommitted', 'enable.auto.commit': false, 'isolation.level': 'read_uncommitted' }, { 'auto.offset.reset': 'earliest', }); consumer.connect({}, function(err) { if (err) { done(err); return; } consumer.subscribe([topicOut]); consumer.consume(); }); consumer.on('data', function(msg) { var v = msg.value.toString(); allMsgs.push(v); // both B and C must be consumed if (allMsgs.length === 2 && allMsgs[0] === 'B' && allMsgs[1] === 'C') { consumer.disconnect(function(err) { if (err) { done(err); return; } done(); }) } }); }); it ('should consume only committed for read_committed', function(done) { var allMsgs = []; var consumer = new Kafka.KafkaConsumer({ 'metadata.broker.list': kafkaBrokerList, 'group.id': 'group_read_committed', 'enable.partition.eof': true, 'enable.auto.commit': false, 'isolation.level': 'read_committed' }, { 'auto.offset.reset': 'earliest', }); consumer.connect({}, function(err) { if (err) { done(err); return; } consumer.subscribe([topicOut]); consumer.consume(); }); consumer.on('data', function(msg) { var v = msg.value.toString(); allMsgs.push(v); }); consumer.on('partition.eof', function(eof) { if (allMsgs.length === 1 && allMsgs[0] === 'B') { consumer.disconnect(function(err) { if (err) { done(err); return; } done(); }) } else { done('Expected only B'); return; } }); }); it('should abort transaction', function(done) { producerTras.abortTransaction(function(err) { if (err) { done(err); return; } consumerTrans.committed(5000, function(err, tpo) { if (err) { done(err); return; } if (lastConsumerTransPosition[0].offset <= tpo[0].offset) { done('Committed mismatch'); return; } done(); }); }); }); it('should consume only committed', function(done) { var gotB = false; var consumer = new Kafka.KafkaConsumer({ 'metadata.broker.list': kafkaBrokerList, 'group.id': 'group_default', 'enable.partition.eof': true, 'enable.auto.commit': false, }, { 'auto.offset.reset': 'earliest', }); consumer.connect({}, function(err) { if (err) { done(err); return; } consumer.subscribe([topicOut]); consumer.consume(); }); consumer.on('data', function(msg) { var v = msg.value.toString(); if (v !== 'B') { done('Expected B'); return; } gotB = true; }); consumer.on('partition.eof', function(eof) { if (!gotB) { done('Expected B'); return; } consumer.disconnect(function(err) { if (err) { done(err); return; } done(); }); }); }); }); });