|
|
- /*
- * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
- *
- * Copyright (c) 2016 Blizzard Entertainment
- *
- * This software may be modified and distributed under the terms
- * of the MIT license. See the LICENSE.txt file for details.
- */
-
- var Kafka = require('../');
-
- var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
-
- describe('Transactional Producer', function () {
- this.timeout(5000);
- var TRANSACTIONS_TIMEOUT_MS = 30000;
- var r = Date.now() + '_' + Math.round(Math.random() * 1000);
- var topicIn = 'transaction_input_' + r;
- var topicOut = 'transaction_output_' + r;
-
- var producerTras;
- var consumerTrans;
-
- before(function (done) {
- /*
- prepare:
- transactional consumer (read from input topic)
- transactional producer (write to output topic)
- write 3 messages to input topic: A, B, C
- A will be skipped, B will be committed, C will be aborted
- */
- var connecting = 3;
- var producerInput;
- function connectedCb(err) {
- if (err) {
- done(err);
- return;
- }
- connecting--;
- if (connecting === 0) {
- producerInput.produce(topicIn, -1, Buffer.from('A'));
- producerInput.produce(topicIn, -1, Buffer.from('B'));
- producerInput.produce(topicIn, -1, Buffer.from('C'));
- producerInput.disconnect(function (err) {
- consumerTrans.subscribe([topicIn]);
- done(err);
- })
- }
- }
- producerInput = Kafka.Producer({
- 'client.id': 'kafka-test',
- 'metadata.broker.list': kafkaBrokerList,
- 'enable.idempotence': true
- });
- producerInput.setPollInterval(100);
- producerInput.connect({}, connectedCb);
-
- producerTras = new Kafka.Producer({
- 'client.id': 'kafka-test',
- 'metadata.broker.list': kafkaBrokerList,
- 'dr_cb': true,
- 'debug': 'all',
- 'transactional.id': 'noderdkafka_transactions_send_offset',
- 'enable.idempotence': true
- });
- producerTras.setPollInterval(100);
- producerTras.connect({}, connectedCb);
-
- consumerTrans = new Kafka.KafkaConsumer({
- 'metadata.broker.list': kafkaBrokerList,
- 'group.id': 'gropu_transaction_consumer',
- 'enable.auto.commit': false
- }, {
- 'auto.offset.reset': 'earliest',
- });
- consumerTrans.connect({}, connectedCb);
- });
-
- after(function (done) {
- let connected = 2;
- function execDisconnect(client) {
- if (!client.isConnected) {
- connected--;
- if (connected === 0) {
- done();
- }
- } else {
- client.disconnect(function() {
- connected--;
- if (connected === 0) {
- done();
- }
- });
- }
- }
- execDisconnect(producerTras);
- execDisconnect(consumerTrans);
- });
-
- it('should init transactions', function(done) {
- producerTras.initTransactions(TRANSACTIONS_TIMEOUT_MS, function (err) {
- done(err);
- });
- });
-
- it('should complete transaction', function(done) {
- function readMessage() {
- consumerTrans.consume(1, function(err, m) {
- if (err) {
- done(err);
- return;
- }
- if (m.length === 0) {
- readMessage();
- } else {
- var v = m[0].value.toString();
- if (v === 'A') { // skip first message
- readMessage();
- return;
- }
- if (v !== 'B') {
- done('Expected B');
- return;
- }
- producerTras.beginTransaction(function (err) {
- if (err) {
- done(err);
- return;
- }
- producerTras.produce(topicOut, -1, Buffer.from(v));
- var position = consumerTrans.position();
- producerTras.sendOffsetsToTransaction(position, consumerTrans, function(err) {
- if (err) {
- done(err);
- return;
- }
- producerTras.commitTransaction(function(err) {
- if (err) {
- done(err);
- return;
- }
- consumerTrans.committed(5000, function(err, tpo) {
- if (err) {
- done(err);
- return;
- }
- if (JSON.stringify(position) !== JSON.stringify(tpo)) {
- done('Committed mismatch');
- return;
- }
- done();
- });
- });
- });
- });
- }
- });
- }
- readMessage();
- });
-
- describe('abort transaction', function() {
- var lastConsumerTransPosition;
- before(function(done) {
- function readMessage() {
- consumerTrans.consume(1, function(err, m) {
- if (err) {
- done(err);
- return;
- }
- if (m.length === 0) {
- readMessage();
- } else {
- var v = m[0].value.toString();
- if (v !== 'C') {
- done('Expected C');
- return;
- }
- producerTras.beginTransaction(function (err) {
- if (err) {
- done(err);
- return;
- }
- producerTras.produce(topicOut, -1, Buffer.from(v));
- lastConsumerTransPosition = consumerTrans.position();
- producerTras.sendOffsetsToTransaction(lastConsumerTransPosition, consumerTrans, function(err) {
- if (err) {
- done(err);
- return;
- }
- done();
- });
- });
- }
- });
- }
- readMessage();
- });
-
- it ('should consume committed and uncommitted for read_uncommitted', function(done) {
- var allMsgs = [];
- var consumer = new Kafka.KafkaConsumer({
- 'metadata.broker.list': kafkaBrokerList,
- 'group.id': 'group_read_uncommitted',
- 'enable.auto.commit': false,
- 'isolation.level': 'read_uncommitted'
- }, {
- 'auto.offset.reset': 'earliest',
- });
- consumer.connect({}, function(err) {
- if (err) {
- done(err);
- return;
- }
- consumer.subscribe([topicOut]);
- consumer.consume();
- });
- consumer.on('data', function(msg) {
- var v = msg.value.toString();
- allMsgs.push(v);
- // both B and C must be consumed
- if (allMsgs.length === 2 && allMsgs[0] === 'B' && allMsgs[1] === 'C') {
- consumer.disconnect(function(err) {
- if (err) {
- done(err);
- return;
- }
- done();
- })
- }
- });
- });
-
- it ('should consume only committed for read_committed', function(done) {
- var allMsgs = [];
- var consumer = new Kafka.KafkaConsumer({
- 'metadata.broker.list': kafkaBrokerList,
- 'group.id': 'group_read_committed',
- 'enable.partition.eof': true,
- 'enable.auto.commit': false,
- 'isolation.level': 'read_committed'
- }, {
- 'auto.offset.reset': 'earliest',
- });
- consumer.connect({}, function(err) {
- if (err) {
- done(err);
- return;
- }
- consumer.subscribe([topicOut]);
- consumer.consume();
- });
- consumer.on('data', function(msg) {
- var v = msg.value.toString();
- allMsgs.push(v);
- });
- consumer.on('partition.eof', function(eof) {
- if (allMsgs.length === 1 && allMsgs[0] === 'B') {
- consumer.disconnect(function(err) {
- if (err) {
- done(err);
- return;
- }
- done();
- })
- } else {
- done('Expected only B');
- return;
- }
- });
- });
-
- it('should abort transaction', function(done) {
- producerTras.abortTransaction(function(err) {
- if (err) {
- done(err);
- return;
- }
- consumerTrans.committed(5000, function(err, tpo) {
- if (err) {
- done(err);
- return;
- }
- if (lastConsumerTransPosition[0].offset <= tpo[0].offset) {
- done('Committed mismatch');
- return;
- }
- done();
- });
- });
- });
-
- it('should consume only committed', function(done) {
- var gotB = false;
- var consumer = new Kafka.KafkaConsumer({
- 'metadata.broker.list': kafkaBrokerList,
- 'group.id': 'group_default',
- 'enable.partition.eof': true,
- 'enable.auto.commit': false,
- }, {
- 'auto.offset.reset': 'earliest',
- });
- consumer.connect({}, function(err) {
- if (err) {
- done(err);
- return;
- }
- consumer.subscribe([topicOut]);
- consumer.consume();
- });
- consumer.on('data', function(msg) {
- var v = msg.value.toString();
- if (v !== 'B') {
- done('Expected B');
- return;
- }
- gotB = true;
- });
- consumer.on('partition.eof', function(eof) {
- if (!gotB) {
- done('Expected B');
- return;
- }
- consumer.disconnect(function(err) {
- if (err) {
- done(err);
- return;
- }
- done();
- });
- });
- });
- });
- });
|