You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

300 lines
8.4 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var Kafka = require('../');
  10. var t = require('assert');
  11. var crypto = require('crypto');
  12. var eventListener = require('./listener');
  13. var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
  14. var serviceStopped = false;
  15. describe('Producer', function() {
  16. var producer;
  17. describe('with dr_cb', function() {
  // Before each test: create a fresh producer configured with `dr_cb`,
  // start connecting, and signal mocha once the connect callback fires
  // without an error.
  beforeEach(function(done) {
    var producerConfig = {
      'client.id': 'kafka-test',
      'metadata.broker.list': kafkaBrokerList,
      'dr_cb': true,
      'debug': 'all'
    };

    function onConnected(err) {
      t.ifError(err);
      done();
    }

    producer = new Kafka.Producer(producerConfig);
    producer.connect({}, onConnected);

    // The shared listener helper is attached right after connect() is issued,
    // i.e. before the asynchronous connect callback has run.
    eventListener(producer);
  });
  // After each test: disconnect the producer and finish the hook when the
  // disconnect callback fires. Any arguments passed to that callback are
  // deliberately not forwarded to `done`.
  afterEach(function(done) {
    function onDisconnected() {
      done();
    }

    producer.disconnect(onDisconnected);
  });
  // Fetches metadata through the connected producer and checks that the
  // result carries the broker name/id plus `brokers` and `topics` arrays.
  it('should connect to Kafka', function(done) {
    function verifyMetadata(err, metadata) {
      t.ifError(err);
      t.ok(metadata);

      // Ensure it is in the correct format
      t.ok(metadata.orig_broker_name, 'Broker name is not set');
      t.notStrictEqual(metadata.orig_broker_id, undefined, 'Broker id is not set');

      var brokersIsArray = Array.isArray(metadata.brokers);
      t.equal(brokersIsArray, true);

      var topicsIsArray = Array.isArray(metadata.topics);
      t.equal(topicsIsArray, true);

      done();
    }

    producer.getMetadata({}, verifyMetadata);
  });
  // Produces a message with neither payload nor key and checks the shape of
  // the first delivery report, including that the reported key is null.
  it('should produce a message with a null payload and null key', function(done) {
    this.timeout(3000);

    // Call poll() every 200 ms until the delivery report has been received.
    var pollTimer = setInterval(function pollProducer() {
      producer.poll();
    }, 200);

    function onDeliveryReport(err, report) {
      clearInterval(pollTimer);

      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');
      t.strictEqual(report.key, null);

      done();
    }

    producer.once('delivery-report', onDeliveryReport);
    producer.produce('test', null, null, null);
  });
  // Produces a message with a Buffer payload and a string key, then checks
  // the first delivery report: no `value` field is present and the key is
  // reported back as 'key'.
  it('should produce a message with a payload and key', function(done) {
    this.timeout(3000);

    // Call poll() every 200 ms until the delivery report has been received.
    var pollTimer = setInterval(function pollProducer() {
      producer.poll();
    }, 200);

    function onDeliveryReport(err, report) {
      clearInterval(pollTimer);

      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(report.value, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');
      t.equal(report.key, 'key');

      done();
    }

    producer.once('delivery-report', onDeliveryReport);

    var payload = Buffer.from('value');
    producer.produce('test', null, payload, 'key');
  });
  // Produces a message whose key is a Buffer containing an embedded NUL
  // byte, then checks that the reported key is longer than three units,
  // i.e. it was not cut off at the NUL.
  it('should produce a message with a payload and key buffer', function(done) {
    this.timeout(3000);

    // Call poll() every 200 ms until the delivery report has been received.
    var pollTimer = setInterval(function pollProducer() {
      producer.poll();
    }, 200);

    function onDeliveryReport(err, report) {
      clearInterval(pollTimer);

      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(report.value, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');

      var keyIsLongerThanThree = report.key.length > 3;
      t.equal(keyIsLongerThanThree, true);

      done();
    }

    producer.once('delivery-report', onDeliveryReport);

    var payload = Buffer.from('value');
    var keyWithNul = Buffer.from('key\0s');
    producer.produce('test', null, payload, keyWithNul);
  });
  // Produces a message carrying an opaque value as the sixth produce()
  // argument and checks that the delivery report hands the same value back.
  it('should produce a message with an opaque', function(done) {
    this.timeout(3000);

    // Call poll() every 200 ms until the delivery report has been received.
    var pollTimer = setInterval(function pollProducer() {
      producer.poll();
    }, 200);

    function onDeliveryReport(err, report) {
      clearInterval(pollTimer);

      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');
      t.equal(report.opaque, 'opaque');

      done();
    }

    producer.once('delivery-report', onDeliveryReport);

    var payload = Buffer.from('value');
    producer.produce('test', null, payload, null, null, 'opaque');
  });
  // Produces `max` messages and requires one valid delivery report for each
  // of them. The test completes when the number of verified reports equals
  // the number of messages produced.
  it('should get 100% deliverability', function(done) {
    this.timeout(3000);

    var max = 10000;
    var verified_received = 0;

    // Call poll() every 200 ms until every delivery report has been counted.
    var tt = setInterval(function() {
      producer.poll();
    }, 200);

    producer.on('delivery-report', function(err, report) {
      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');

      verified_received++;
      if (verified_received === max) {
        clearInterval(tt);
        done();
      }
    });

    // Produce exactly `max` messages. The bound is exclusive so that the
    // number of messages sent matches the number of reports awaited above;
    // an inclusive bound would send one extra message whose report could
    // arrive after done() while this listener is still attached.
    for (var total = 0; total < max; total++) {
      producer.produce('test', null, Buffer.from('message ' + total), null);
    }
  });
  144. });
  145. describe('with_dr_msg_cb', function() {
  // Before each test: create a fresh producer configured with `dr_msg_cb`,
  // start connecting, and signal mocha once the connect callback fires
  // without an error.
  beforeEach(function(done) {
    var producerConfig = {
      'client.id': 'kafka-test',
      'metadata.broker.list': kafkaBrokerList,
      'dr_msg_cb': true,
      'debug': 'all'
    };

    function onConnected(err) {
      t.ifError(err);
      done();
    }

    producer = new Kafka.Producer(producerConfig);
    producer.connect({}, onConnected);

    // The shared listener helper is attached right after connect() is issued,
    // i.e. before the asynchronous connect callback has run.
    eventListener(producer);
  });
  // After each test: disconnect the producer and finish the hook when the
  // disconnect callback fires. Any arguments passed to that callback are
  // deliberately not forwarded to `done`.
  afterEach(function(done) {
    function onDisconnected() {
      done();
    }

    producer.disconnect(onDisconnected);
  });
  // Produces a message with payload 'hai' and key 'key', then checks that
  // the delivery report returns both the key and the payload with the same
  // contents.
  it('should produce a message with a payload and key', function(done) {
    this.timeout(3000);

    // Call poll() every 200 ms until the delivery report has been received.
    var tt = setInterval(function() {
      producer.poll();
    }, 200);

    producer.once('delivery-report', function(err, report) {
      clearInterval(tt);
      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');
      // Compare the key's contents. assert.ok(value, message) would only
      // check truthiness and treat 'key' as the failure message.
      t.equal(report.key.toString(), 'key');
      t.equal(report.value.toString(), 'hai');
      done();
    });

    producer.produce('test', null, Buffer.from('hai'), 'key');
  });
  // Produces a message with an empty payload and an empty key and checks
  // that the delivery report returns both as empty strings (after
  // toString()).
  it('should produce a message with an empty payload and empty key (https://github.com/Blizzard/node-rdkafka/issues/117)', function(done) {
    this.timeout(3000);

    // Call poll() every 200 ms until the delivery report has been received.
    var pollTimer = setInterval(function pollProducer() {
      producer.poll();
    }, 200);

    function onDeliveryReport(err, report) {
      clearInterval(pollTimer);

      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');

      var reportedKey = report.key.toString();
      t.equal(reportedKey, '', 'key should be an empty string');

      var reportedPayload = report.value.toString();
      t.strictEqual(reportedPayload, '', 'payload should be an empty string');

      done();
    }

    producer.once('delivery-report', onDeliveryReport);

    var emptyPayload = Buffer.from('');
    producer.produce('test', null, emptyPayload, '');
  });
  // Produces a message with a null payload and null key and checks that the
  // delivery report returns both as null. This test uses the producer's own
  // poll interval (10) rather than a manual setInterval loop.
  it('should produce a message with a null payload and null key (https://github.com/Blizzard/node-rdkafka/issues/117)', function(done) {
    this.timeout(3000);

    producer.setPollInterval(10);

    function onDeliveryReport(err, report) {
      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.strictEqual(typeof report.topic, 'string');
      t.strictEqual(typeof report.partition, 'number');
      t.strictEqual(typeof report.offset, 'number');
      t.strictEqual(report.key, null, 'key should be null');
      t.strictEqual(report.value, null, 'payload should be null');

      done();
    }

    producer.once('delivery-report', onDeliveryReport);
    producer.produce('test', null, null, null);
  });
  // Produces a message with an 8-byte binary key and a null payload, then
  // checks that the delivery report returns a key deep-equal to the bytes
  // that were sent.
  it('should produce an int64 key (https://github.com/Blizzard/node-rdkafka/issues/208)', function(done) {
    this.timeout(3000);

    // Eight bytes, all zero except the last. Typed arrays are
    // zero-initialised, so only the final byte has to be written.
    // NOTE(review): the last byte is decimal 84 (0x54) — confirm this is the
    // intended key value.
    var arr = new Uint8Array(8);
    arr[7] = 84;

    // One Buffer over the array's memory serves as both the key that is
    // sent and the expected value in the assertion.
    var buf = Buffer.from(arr.buffer);

    producer.setPollInterval(10);

    producer.once('delivery-report', function(err, report) {
      t.ifError(err);
      t.notStrictEqual(report, undefined);
      t.deepEqual(buf, report.key);
      done();
    });

    producer.produce('test', null, null, buf);
  });
  237. });
  238. });