You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

723 lines
19 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var ProducerStream = require('../lib/producer-stream');
  10. var t = require('assert');
  11. var Readable = require('stream').Readable;
  12. var Emitter = require('events');
  13. var fakeClient;
  14. module.exports = {
  15. 'ProducerStream stream': {
  16. 'beforeEach': function() {
  17. fakeClient = new Emitter();
  18. fakeClient._isConnected = true;
  19. fakeClient._isConnecting = false;
  20. fakeClient.isConnected = function() {
  21. return true;
  22. };
  23. fakeClient.connect = function(opts, cb) {
  24. setImmediate(function() {
  25. this.emit('ready');
  26. }.bind(this));
  27. return this;
  28. };
  29. fakeClient.disconnect = function(cb) {
  30. setImmediate(function() {
  31. this.emit('disconnected');
  32. }.bind(this));
  33. return this;
  34. };
  35. fakeClient.poll = function() {
  36. return this;
  37. };
  38. fakeClient.setPollInterval = function() {
  39. return this;
  40. };
  41. },
  42. 'exports a stream class': function() {
  43. t.equal(typeof(ProducerStream), 'function');
  44. },
  45. 'in buffer mode': {
  46. 'requires a topic be provided when running in buffer mode': function() {
  47. t.throws(function() {
  48. var x = new ProducerStream(fakeClient, {});
  49. });
  50. },
  51. 'can be instantiated': function() {
  52. t.equal(typeof new ProducerStream(fakeClient, {
  53. topic: 'topic'
  54. }), 'object');
  55. },
  56. 'does not run connect if the client is already connected': function(cb) {
  57. fakeClient.connect = function() {
  58. t.fail('Should not run connect if the client is already connected');
  59. };
  60. var stream = new ProducerStream(fakeClient, {
  61. topic: 'topic'
  62. });
  63. setTimeout(cb, 10);
  64. },
  65. 'does run connect if the client is not already connected': function(cb) {
  66. fakeClient._isConnected = false;
  67. fakeClient.isConnected = function() {
  68. return false;
  69. };
  70. fakeClient.once('ready', cb);
  71. var stream = new ProducerStream(fakeClient, {
  72. topic: 'topic'
  73. });
  74. },
  75. 'forwards connectOptions to client options when provided': function(cb) {
  76. var testClientOptions = { timeout: 3000 };
  77. fakeClient._isConnected = false;
  78. fakeClient.isConnected = function() {
  79. return false;
  80. };
  81. var fakeConnect = fakeClient.connect;
  82. fakeClient.connect = function(opts, callback) {
  83. t.deepEqual(opts, testClientOptions);
  84. cb();
  85. };
  86. var stream = new ProducerStream(fakeClient, {
  87. topic: 'topic',
  88. connectOptions: testClientOptions
  89. });
  90. },
  91. 'automatically disconnects when autoclose is not provided': function(cb) {
  92. fakeClient.once('disconnected', cb);
  93. var stream = new ProducerStream(fakeClient, {
  94. topic: 'topic'
  95. });
  96. stream.end();
  97. },
  98. 'does not automatically disconnect when autoclose is set to false': function(done) {
  99. fakeClient.once('disconnected', function() {
  100. t.fail('Should not run disconnect');
  101. });
  102. var stream = new ProducerStream(fakeClient, {
  103. topic: 'topic',
  104. autoClose: false
  105. });
  106. stream.end();
  107. setTimeout(done, 10);
  108. },
  109. 'properly reads off the fake client': function(done) {
  110. var message;
  111. fakeClient.produce = function(topic, partition, message, key) {
  112. t.equal('topic', topic);
  113. t.equal(message.toString(), 'Awesome');
  114. t.equal(Buffer.isBuffer(message), true);
  115. done();
  116. };
  117. var stream = new ProducerStream(fakeClient, {
  118. topic: 'topic'
  119. });
  120. stream.on('error', function(err) {
  121. t.fail(err);
  122. });
  123. stream.write(Buffer.from('Awesome'));
  124. },
  125. 'passes a topic string if options are not provided': function(done) {
  126. var message;
  127. fakeClient.produce = function(topic, partition, message, key) {
  128. t.equal('topic', topic);
  129. t.equal(message.toString(), 'Awesome');
  130. t.equal(Buffer.isBuffer(message), true);
  131. done();
  132. };
  133. var stream = new ProducerStream(fakeClient, {
  134. topic: 'topic'
  135. });
  136. stream.on('error', function(err) {
  137. t.fail(err);
  138. });
  139. stream.write(Buffer.from('Awesome'));
  140. },
  141. 'properly handles queue errors': function(done) {
  142. var message;
  143. var first = true;
  144. fakeClient.produce = function(topic, partition, message, key) {
  145. t.equal('topic', topic);
  146. t.equal(message.toString(), 'Awesome');
  147. t.equal(Buffer.isBuffer(message), true);
  148. if (first) {
  149. first = false;
  150. var err = new Error('Queue full');
  151. err.code = -184;
  152. throw err;
  153. } else {
  154. done();
  155. }
  156. };
  157. var stream = new ProducerStream(fakeClient, {
  158. topic: 'topic'
  159. });
  160. stream.on('error', function(err) {
  161. t.fail(err);
  162. });
  163. stream.write(Buffer.from('Awesome'));
  164. },
  165. 'errors out when a non-queue related error occurs': function(done) {
  166. fakeClient.produce = function(topic, partition, message, key) {
  167. var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
  168. err.code = 10;
  169. throw err;
  170. };
  171. fakeClient.on('disconnected', function() {
  172. done();
  173. });
  174. var stream = new ProducerStream(fakeClient, {
  175. topic: 'topic'
  176. });
  177. stream.on('error', function(err) {
  178. t.equal(err.code, 10, 'Error was unexpected');
  179. // This is good
  180. });
  181. stream.write(Buffer.from('Awesome'));
  182. },
  183. 'errors out when a non-queue related error occurs but does not disconnect if autoclose is false': function(done) {
  184. fakeClient.produce = function(topic, partition, message, key) {
  185. var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
  186. err.code = 10;
  187. throw err;
  188. };
  189. fakeClient.on('disconnected', function() {
  190. t.fail('Should not try to disconnect');
  191. });
  192. var stream = new ProducerStream(fakeClient, {
  193. topic: 'topic',
  194. autoClose: false
  195. });
  196. stream.on('error', function(err) {
  197. t.equal(err.code, 10, 'Error was unexpected');
  198. // This is good
  199. });
  200. stream.write(Buffer.from('Awesome'));
  201. setTimeout(done, 10);
  202. },
  203. 'properly reads more than one message in order': function(done) {
  204. var message;
  205. var currentMessage = 0;
  206. fakeClient.produce = function(topic, partition, message, key) {
  207. currentMessage++;
  208. t.equal('topic', topic);
  209. t.equal(message.toString(), 'Awesome' + currentMessage);
  210. t.equal(Buffer.isBuffer(message), true);
  211. if (currentMessage === 2) {
  212. done();
  213. }
  214. };
  215. var stream = new ProducerStream(fakeClient, {
  216. topic: 'topic'
  217. });
  218. stream.on('error', function(err) {
  219. t.fail(err);
  220. });
  221. stream.write(Buffer.from('Awesome1'));
  222. stream.write(Buffer.from('Awesome2'));
  223. },
  224. 'can be piped into a readable': function(done) {
  225. var message;
  226. var currentMessage = 0;
  227. var iteration = 0;
  228. var readable = new Readable({
  229. read: function(size) {
  230. iteration++;
  231. if (iteration > 1) {
  232. } else {
  233. this.push('Awesome1');
  234. this.push('Awesome2');
  235. }
  236. }
  237. });
  238. fakeClient.produce = function(topic, partition, message, key) {
  239. currentMessage++;
  240. t.equal('topic', topic);
  241. t.equal(message.toString(), 'Awesome' + currentMessage);
  242. t.equal(Buffer.isBuffer(message), true);
  243. if (currentMessage === 2) {
  244. done();
  245. }
  246. };
  247. var stream = new ProducerStream(fakeClient, {
  248. topic: 'topic'
  249. });
  250. stream.on('error', function(err) {
  251. t.fail(err);
  252. });
  253. readable.pipe(stream);
  254. },
  255. 'can drain buffered chunks': function(done) {
  256. var message;
  257. var currentMessage = 0;
  258. fakeClient.produce = function(topic, partition, message, key) {
  259. currentMessage++;
  260. t.equal('topic', topic);
  261. t.equal(message.toString(), 'Awesome' + currentMessage);
  262. t.equal(Buffer.isBuffer(message), true);
  263. if (currentMessage === 3) {
  264. done();
  265. }
  266. };
  267. var stream = new ProducerStream(fakeClient, {
  268. topic: 'topic'
  269. });
  270. stream.on('error', function(err) {
  271. t.fail(err);
  272. });
  273. fakeClient._isConnected = false;
  274. fakeClient._isConnecting = true;
  275. fakeClient.isConnected = function() {
  276. return false;
  277. };
  278. stream.write(Buffer.from('Awesome1'));
  279. stream.write(Buffer.from('Awesome2'));
  280. stream.write(Buffer.from('Awesome3'));
  281. fakeClient._isConnected = true;
  282. fakeClient._isConnecting = false;
  283. fakeClient.isConnected = function() {
  284. return true;
  285. };
  286. fakeClient.connect();
  287. },
  288. },
  289. 'in objectMode': {
  290. 'can be instantiated': function() {
  291. t.equal(typeof new ProducerStream(fakeClient, {
  292. objectMode: true
  293. }), 'object');
  294. },
  295. 'properly produces message objects': function(done) {
  296. var _timestamp = Date.now();
  297. var _opaque = {
  298. foo: 'bar'
  299. };
  300. var _headers = {
  301. header: 'header value'
  302. };
  303. fakeClient.produce = function(topic, partition, message, key, timestamp, opaque, headers) {
  304. t.equal('topic', topic);
  305. t.equal(message.toString(), 'Awesome');
  306. t.equal(Buffer.isBuffer(message), true);
  307. t.equal(partition, 10);
  308. t.equal(key, 'key');
  309. t.deepEqual(_opaque, opaque);
  310. t.deepEqual(_timestamp, timestamp);
  311. t.deepEqual(_headers, headers);
  312. done();
  313. };
  314. var stream = new ProducerStream(fakeClient, {
  315. objectMode: true
  316. });
  317. stream.on('error', function(err) {
  318. t.fail(err);
  319. });
  320. stream.write({
  321. topic: 'topic',
  322. value: Buffer.from('Awesome'),
  323. partition: 10,
  324. key: 'key',
  325. timestamp: _timestamp,
  326. opaque: _opaque,
  327. headers: _headers
  328. });
  329. },
  330. 'properly handles queue errors': function(done) {
  331. var message;
  332. var first = true;
  333. fakeClient.produce = function(topic, partition, message, key) {
  334. t.equal('topic', topic);
  335. t.equal(message.toString(), 'Awesome');
  336. t.equal(Buffer.isBuffer(message), true);
  337. t.equal(partition, 10);
  338. t.equal(key, 'key');
  339. if (first) {
  340. first = false;
  341. var err = new Error('Queue full');
  342. err.code = -184;
  343. throw err;
  344. } else {
  345. done();
  346. }
  347. };
  348. var stream = new ProducerStream(fakeClient, {
  349. objectMode: true
  350. });
  351. stream.on('error', function(err) {
  352. t.fail(err);
  353. });
  354. stream.write({
  355. topic: 'topic',
  356. value: Buffer.from('Awesome'),
  357. partition: 10,
  358. key: 'key'
  359. });
  360. },
  361. 'errors out when a non-queue related error occurs': function(done) {
  362. fakeClient.produce = function(topic, partition, message, key) {
  363. var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
  364. err.code = 10;
  365. throw err;
  366. };
  367. fakeClient.on('disconnected', function() {
  368. done();
  369. });
  370. var stream = new ProducerStream(fakeClient, {
  371. objectMode: true
  372. });
  373. stream.on('error', function(err) {
  374. t.equal(err.code, 10, 'Error was unexpected');
  375. // This is good
  376. });
  377. stream.write(Buffer.from('Awesome'));
  378. },
  379. 'errors out when a non-queue related error occurs but does not disconnect if autoclose is false': function(done) {
  380. fakeClient.produce = function(topic, partition, message, key) {
  381. var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
  382. err.code = 10;
  383. throw err;
  384. };
  385. fakeClient.on('disconnected', function() {
  386. t.fail('Should not try to disconnect');
  387. });
  388. var stream = new ProducerStream(fakeClient, {
  389. objectMode: true,
  390. autoClose: false
  391. });
  392. stream.on('error', function(err) {
  393. t.equal(err.code, 10, 'Error was unexpected');
  394. // This is good
  395. });
  396. stream.write({
  397. value: Buffer.from('Awesome'),
  398. topic: 'topic'
  399. });
  400. setTimeout(done, 10);
  401. },
  402. 'properly reads more than one message in order': function(done) {
  403. var message;
  404. var currentMessage = 0;
  405. fakeClient.produce = function(topic, partition, message, key) {
  406. currentMessage++;
  407. t.equal('topic', topic);
  408. t.equal(message.toString(), 'Awesome' + currentMessage);
  409. t.equal(Buffer.isBuffer(message), true);
  410. if (currentMessage === 2) {
  411. done();
  412. }
  413. };
  414. var stream = new ProducerStream(fakeClient, {
  415. objectMode: true
  416. });
  417. stream.on('error', function(err) {
  418. t.fail(err);
  419. });
  420. stream.write({
  421. value: Buffer.from('Awesome1'),
  422. topic: 'topic'
  423. });
  424. stream.write({
  425. value: Buffer.from('Awesome2'),
  426. topic: 'topic'
  427. });
  428. },
  429. 'can be piped into a readable': function(done) {
  430. var message;
  431. var currentMessage = 0;
  432. var iteration = 0;
  433. var readable = new Readable({
  434. objectMode: true,
  435. read: function(size) {
  436. iteration++;
  437. if (iteration > 1) {
  438. } else {
  439. this.push({
  440. topic: 'topic',
  441. value: Buffer.from('Awesome1')
  442. });
  443. this.push({
  444. topic: 'topic',
  445. value: Buffer.from('Awesome2')
  446. });
  447. }
  448. }
  449. });
  450. fakeClient.produce = function(topic, partition, message, key) {
  451. currentMessage++;
  452. t.equal('topic', topic);
  453. t.equal(message.toString(), 'Awesome' + currentMessage);
  454. t.equal(Buffer.isBuffer(message), true);
  455. if (currentMessage === 2) {
  456. done();
  457. }
  458. };
  459. var stream = new ProducerStream(fakeClient, {
  460. objectMode: true
  461. });
  462. stream.on('error', function(err) {
  463. t.fail(err);
  464. });
  465. readable.pipe(stream);
  466. },
  467. 'can drain buffered messages': function(done) {
  468. var message;
  469. var currentMessage = 0;
  470. fakeClient.produce = function(topic, partition, message, key) {
  471. currentMessage++;
  472. t.equal('topic', topic);
  473. t.equal(message.toString(), 'Awesome' + currentMessage);
  474. t.equal(Buffer.isBuffer(message), true);
  475. if (currentMessage === 3) {
  476. done();
  477. }
  478. };
  479. var stream = new ProducerStream(fakeClient, {
  480. objectMode: true
  481. });
  482. stream.on('error', function(err) {
  483. t.fail(err);
  484. });
  485. fakeClient._isConnected = false;
  486. fakeClient._isConnecting = true;
  487. fakeClient.isConnected = function() {
  488. return false;
  489. };
  490. stream.write({
  491. value: Buffer.from('Awesome1'),
  492. topic: 'topic'
  493. });
  494. stream.write({
  495. value: Buffer.from('Awesome2'),
  496. topic: 'topic'
  497. });
  498. stream.write({
  499. value: Buffer.from('Awesome3'),
  500. topic: 'topic'
  501. });
  502. fakeClient._isConnected = true;
  503. fakeClient._isConnecting = false;
  504. fakeClient.isConnected = function() {
  505. return true;
  506. };
  507. fakeClient.connect();
  508. },
  509. 'properly handles queue errors while draining': function(done) {
  510. var message;
  511. var currentMessage = 0;
  512. fakeClient.produce = function(topic, partition, message, key) {
  513. currentMessage++;
  514. if (currentMessage === 3) {
  515. var err = new Error('Queue full');
  516. err.code = -184;
  517. throw err;
  518. } else if (currentMessage === 4) {
  519. done();
  520. }
  521. };
  522. var stream = new ProducerStream(fakeClient, {
  523. objectMode: true
  524. });
  525. stream.on('error', function(err) {
  526. t.fail(err);
  527. });
  528. fakeClient._isConnected = false;
  529. fakeClient._isConnecting = true;
  530. fakeClient.isConnected = function() {
  531. return false;
  532. };
  533. stream.write({
  534. value: Buffer.from('Awesome1'),
  535. topic: 'topic'
  536. });
  537. stream.write({
  538. value: Buffer.from('Awesome2'),
  539. topic: 'topic'
  540. });
  541. stream.write({
  542. value: Buffer.from('Awesome3'),
  543. topic: 'topic'
  544. });
  545. stream.write({
  546. value: Buffer.from('Awesome4'),
  547. topic: 'topic'
  548. });
  549. fakeClient._isConnected = true;
  550. fakeClient._isConnecting = false;
  551. fakeClient.isConnected = function() {
  552. return true;
  553. };
  554. fakeClient.connect();
  555. },
  556. 'errors out for non-queue related errors while draining': function (done) {
  557. var currentMessage = 0;
  558. fakeClient.produce = function(topic, partition, message, key) {
  559. currentMessage++;
  560. if (currentMessage === 3) {
  561. var err = new Error('ERR_MSG_SIZE_TOO_LARGE ');
  562. err.code = 10;
  563. throw err;
  564. }
  565. };
  566. fakeClient.on('disconnected', function() {
  567. done();
  568. });
  569. var stream = new ProducerStream(fakeClient, {
  570. objectMode: true
  571. });
  572. stream.on('error', function(err) {
  573. t.equal(err.code, 10, 'Error was unexpected');
  574. // This is good
  575. });
  576. fakeClient._isConnected = false;
  577. fakeClient._isConnecting = true;
  578. fakeClient.isConnected = function() {
  579. return false;
  580. };
  581. stream.write({
  582. value: Buffer.from('Awesome1'),
  583. topic: 'topic'
  584. });
  585. stream.write({
  586. value: Buffer.from('Awesome2'),
  587. topic: 'topic'
  588. });
  589. stream.write({
  590. value: Buffer.from('Awesome3'),
  591. topic: 'topic'
  592. });
  593. stream.write({
  594. value: Buffer.from('Awesome4'),
  595. topic: 'topic'
  596. });
  597. fakeClient._isConnected = true;
  598. fakeClient._isConnecting = false;
  599. fakeClient.isConnected = function() {
  600. return true;
  601. };
  602. fakeClient.connect();
  603. },
  604. }
  605. }
  606. };