/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */
  9. var KafkaConsumerStream = require('../lib/kafka-consumer-stream');
  10. var t = require('assert');
  11. var Writable = require('stream').Writable;
  12. var Emitter = require('events');
  13. var fakeClient;
  14. module.exports = {
  15. 'KafkaConsumerStream stream': {
  16. 'beforeEach': function() {
  17. fakeClient = new Emitter();
  18. fakeClient._isConnecting = false;
  19. fakeClient._isConnected = true;
  20. fakeClient.isConnected = function() {
  21. return true;
  22. };
  23. fakeClient.unsubscribe = function() {
  24. this.emit('unsubscribed');
  25. return true;
  26. };
  27. fakeClient.disconnect = function(cb) {
  28. this.emit('disconnected');
  29. if (cb) {
  30. t.equal(typeof cb, 'function');
  31. setImmediate(cb);
  32. }
  33. };
  34. fakeClient.consume = function(size, cb) {
  35. if (!size) {
  36. cb = size;
  37. }
  38. t.equal(typeof cb, 'function',
  39. 'Provided callback should always be a function');
  40. setImmediate(function() {
  41. cb(null, [{
  42. value: Buffer.from('test'),
  43. key: 'testkey',
  44. offset: 1
  45. }]);
  46. });
  47. };
  48. fakeClient.subscribe = function(topics) {
  49. t.equal(Array.isArray(topics), true);
  50. return this;
  51. };
  52. },
  53. 'exports a stream class': function() {
  54. t.equal(typeof(KafkaConsumerStream), 'function');
  55. },
  56. 'can be instantiated': function() {
  57. t.equal(typeof new KafkaConsumerStream(fakeClient, {
  58. topics: 'topic'
  59. }), 'object');
  60. },
  61. 'properly reads off the fake client': function(cb) {
  62. var stream = new KafkaConsumerStream(fakeClient, {
  63. topics: 'topic'
  64. });
  65. stream.on('error', function(err) {
  66. t.fail(err);
  67. });
  68. stream.once('readable', function() {
  69. var message = stream.read();
  70. t.notEqual(message, null);
  71. t.ok(Buffer.isBuffer(message.value));
  72. t.equal('test', message.value.toString());
  73. t.equal('testkey', message.key);
  74. t.equal(typeof message.offset, 'number');
  75. stream.pause();
  76. cb();
  77. });
  78. },
  79. 'properly reads off the fake with a topic function': function(cb) {
  80. fakeClient._metadata = {
  81. orig_broker_id: 1,
  82. orig_broker_name: "broker_name",
  83. brokers: [
  84. {
  85. id: 1,
  86. host: 'localhost',
  87. port: 40
  88. }
  89. ],
  90. topics: [
  91. {
  92. name: 'awesome-topic',
  93. partitions: [
  94. {
  95. id: 1,
  96. leader: 20,
  97. replicas: [1, 2],
  98. isrs: [1, 2]
  99. }
  100. ]
  101. }
  102. ]
  103. };
  104. var stream = new KafkaConsumerStream(fakeClient, {
  105. topics: function(metadata) {
  106. var topics = metadata.topics.map(function(v) {
  107. return v.name;
  108. });
  109. return topics;
  110. }
  111. });
  112. fakeClient.subscribe = function(topics) {
  113. t.equal(Array.isArray(topics), true);
  114. t.equal(topics[0], 'awesome-topic');
  115. t.equal(topics.length, 1);
  116. return this;
  117. };
  118. stream.on('error', function(err) {
  119. t.fail(err);
  120. });
  121. stream.once('readable', function() {
  122. var message = stream.read();
  123. t.notEqual(message, null);
  124. t.ok(Buffer.isBuffer(message.value));
  125. t.equal('test', message.value.toString());
  126. t.equal('testkey', message.key);
  127. t.equal(typeof message.offset, 'number');
  128. stream.pause();
  129. cb();
  130. });
  131. },
  132. 'properly reads correct number of messages but does not stop': function(next) {
  133. var numMessages = 10;
  134. var numReceived = 0;
  135. var numSent = 0;
  136. fakeClient.consume = function(size, cb) {
  137. if (numSent < numMessages) {
  138. numSent++;
  139. setImmediate(function() {
  140. cb(null, [{
  141. value: Buffer.from('test'),
  142. offset: 1
  143. }]);
  144. });
  145. } else {
  146. }
  147. };
  148. var stream = new KafkaConsumerStream(fakeClient, {
  149. topics: 'topic'
  150. });
  151. stream.on('error', function(err) {
  152. // Ignore
  153. });
  154. stream.on('readable', function() {
  155. var message = stream.read();
  156. numReceived++;
  157. t.notEqual(message, null);
  158. t.ok(Buffer.isBuffer(message.value));
  159. t.equal(typeof message.offset, 'number');
  160. if (numReceived === numMessages) {
  161. // give it a second to get an error
  162. next();
  163. }
  164. });
  165. },
  166. 'can be piped around': function(cb) {
  167. var stream = new KafkaConsumerStream(fakeClient, {
  168. topics: 'topic'
  169. });
  170. var writable = new Writable({
  171. write: function(message, encoding, next) {
  172. t.notEqual(message, null);
  173. t.ok(Buffer.isBuffer(message.value));
  174. t.equal(typeof message.offset, 'number');
  175. this.cork();
  176. cb();
  177. },
  178. objectMode: true
  179. });
  180. stream.pipe(writable);
  181. stream.on('error', function(err) {
  182. t.fail(err);
  183. });
  184. },
  185. 'streams as batch when specified': function(next) {
  186. var numMessages = 10;
  187. var numReceived = 0;
  188. var numSent = 0;
  189. fakeClient.consume = function(size, cb) {
  190. if (numSent < numMessages) {
  191. numSent++;
  192. setImmediate(function() {
  193. cb(null, [{
  194. value: Buffer.from('test'),
  195. offset: 1
  196. }]);
  197. });
  198. } else {
  199. }
  200. };
  201. var stream = new KafkaConsumerStream(fakeClient, {
  202. topics: 'topic',
  203. streamAsBatch: true
  204. });
  205. stream.on('error', function(err) {
  206. // Ignore
  207. });
  208. stream.on('readable', function() {
  209. var messages = stream.read();
  210. numReceived++;
  211. t.equal(Array.isArray(messages), true);
  212. t.equal(messages.length, 1);
  213. var message = messages[0];
  214. t.notEqual(message, null);
  215. t.ok(Buffer.isBuffer(message.value));
  216. t.equal(typeof message.offset, 'number');
  217. if (numReceived === numMessages) {
  218. // give it a second to get an error
  219. next();
  220. }
  221. });
  222. },
  223. 'stops reading on unsubscribe': function(next) {
  224. var numMessages = 10;
  225. var numReceived = 0;
  226. var numSent = 0;
  227. fakeClient.consume = function(size, cb) {
  228. if (numSent < numMessages) {
  229. numSent++;
  230. setImmediate(function() {
  231. cb(null, [{
  232. value: Buffer.from('test'),
  233. offset: 1
  234. }]);
  235. });
  236. } else {
  237. }
  238. };
  239. var stream = new KafkaConsumerStream(fakeClient, {
  240. topics: 'topic'
  241. });
  242. stream.on('error', function(err) {
  243. // Ignore
  244. });
  245. stream.on('readable', function() {
  246. var message = stream.read();
  247. numReceived++;
  248. if (message) {
  249. t.ok(Buffer.isBuffer(message.value));
  250. t.equal(typeof message.offset, 'number');
  251. if (numReceived === numMessages) {
  252. // give it a second to get an error
  253. fakeClient.emit('unsubscribed');
  254. }
  255. }
  256. });
  257. stream.on('end', function() {
  258. next();
  259. });
  260. },
  261. 'calls the callback on destroy': function (next) {
  262. fakeClient.unsubscribe = function () {};
  263. var stream = new KafkaConsumerStream(fakeClient, {
  264. topics: 'topic'
  265. });
  266. stream.once('readable', function () {
  267. stream.destroy();
  268. stream.once('close', next);
  269. });
  270. },
  271. }
  272. };