const Kafka = require("../lib/index.js"); const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); const sendData = async (producer, totalMessages) => { const topic = "node"; const msg = "dkfljaskldfajkldsjfklasdjfalk;dsjfkl;asjfdskl;fjda;lkfjsdklfsajlkfjdsklfajsklfjsklanklsalkjkljkasfak"; const buffer = Buffer.from(msg); const key = "test"; for (let n = 0; n < totalMessages; ++n) { let bufferIsFull = false; do { bufferIsFull = false; try { producer.produce(topic, -1, buffer, key, null, n); } catch (error) { // Based on config, and messages, this will execute once if (error.code === Kafka.CODES.ERRORS.ERR__QUEUE_FULL) { producer.poll(); // The wait introduces 11-12 seconds of latency when dr_cb is true const start = process.hrtime(); await wait(50); const latency = process.hrtime(start); console.info(`Wait took ${latency[0]} seconds`); bufferIsFull = true; } else { throw error; } } } while (bufferIsFull); } console.log("Finished producing"); }; const verifyReports = async (reports, reportsComplete, totalMessages) => { const reportsTimeout = new Promise((resolve, reject) => { setTimeout(() => { reject("Delivery report timed out"); }, 10000); }); await Promise.race([reportsComplete, reportsTimeout]); await wait(500); // wait for some more delivery reports. if (reports.length === totalMessages) { console.log("Reports count match"); } else { console.error("Reports count doesn't match"); return; } for(let n = 0; n < totalMessages; ++n) { if(reports[n].opaque !== n) { console.error("Expect message number does not match"); } } }; const run = async () => { const reports = []; const totalMessages = 1000100; const producer = new Kafka.Producer({ "batch.num.messages": 50000, "compression.codec": "lz4", "delivery.report.only.error": false, "dr_cb": true, "metadata.broker.list": "localhost:9092", "message.send.max.retries": 10000000, "queue.buffering.max.kbytes": 2000000, "queue.buffering.max.messages": 1000000, "queue.buffering.max.ms": 0, "socket.keepalive.enable": true, }, {}); producer.setPollInterval(100); producer.on("event.log", (obj) => console.log(obj)); const reportsComplete = new Promise((resolve) => { producer.on("delivery-report", (err, report) => { reports.push(report); if(reports.length === totalMessages) { resolve(); } }); }); const readyPromise = new Promise((resolve) => { producer.on("ready", async () => { console.log("Producer is ready"); resolve(); }); producer.connect(); }); await readyPromise; await sendData(producer, totalMessages); await verifyReports(reports, reportsComplete, totalMessages); process.exit(0); }; run().catch((err) => { console.error(err); });