You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

496 lines
13 KiB

  1. /*
  2. * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
  3. *
  4. * Copyright (c) 2016 Blizzard Entertainment
  5. *
  6. * This software may be modified and distributed under the terms
  7. * of the MIT license. See the LICENSE.txt file for details.
  8. */
  9. var HighLevelProducer = require('../../lib/producer/high-level-producer');
  10. var t = require('assert');
  11. var Promise = require('bluebird');
  12. // var Mock = require('./mock');
  // Global configuration passed to every HighLevelProducer built in beforeEach.
  var defaultConfig = {
    'client.id': 'kafka-mocha',
    'metadata.broker.list': 'localhost:9092',
    'socket.timeout.ms': 250
  };
  // Topic-level configuration passed alongside defaultConfig; empty for these tests.
  var topicConfig = {};
  // Producer under test; (re)assigned in beforeEach and cleared in afterEach.
  var client;
  // Declared for the suite but not referenced by any test visible in this file.
  var server;
  21. module.exports = {
  22. 'High Level Producer client': {
  23. 'beforeEach': function() {
  24. client = new HighLevelProducer(defaultConfig, topicConfig);
  25. },
  26. 'afterEach': function() {
  27. client = null;
  28. },
  29. 'is an object': function() {
  30. t.equal(typeof(client), 'object');
  31. },
  32. 'requires configuration': function() {
  33. t.throws(function() {
  34. return new HighLevelProducer();
  35. });
  36. },
  37. 'has necessary methods from superclass': function() {
  38. var methods = ['_oldProduce'];
  39. methods.forEach(function(m) {
  40. t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method');
  41. });
  42. },
  43. 'does not modify config and clones it': function () {
  44. t.deepStrictEqual(defaultConfig, {
  45. 'client.id': 'kafka-mocha',
  46. 'metadata.broker.list': 'localhost:9092',
  47. 'socket.timeout.ms': 250
  48. });
  49. t.deepStrictEqual(client.globalConfig, {
  50. 'client.id': 'kafka-mocha',
  51. 'metadata.broker.list': 'localhost:9092',
  52. 'socket.timeout.ms': 250
  53. });
  54. t.notEqual(defaultConfig, client.globalConfig);
  55. },
  56. 'does not modify topic config and clones it': function () {
  57. t.deepStrictEqual(topicConfig, {});
  58. t.deepStrictEqual(client.topicConfig, {});
  59. t.notEqual(topicConfig, client.topicConfig);
  60. },
  61. 'produce method': {
  62. 'headers support': function(next) {
  63. var v = 'foo';
  64. var k = 'key';
  65. var h = [
  66. { key1: "value1A" },
  67. { key1: "value1B" },
  68. { key2: "value2" },
  69. { key1: "value1C" },
  70. ];
  71. var jsonH = JSON.stringify(h);
  72. client._oldProduce = function(topic, partition, value, key, timestamp, opaque, headers) {
  73. t.equal(value, 'foo');
  74. t.equal(key, 'key');
  75. t.equal(JSON.stringify(headers), jsonH);
  76. next();
  77. };
  78. client.produce('tawpic', 0, v, k, null, h, function() {
  79. });
  80. },
  81. 'can use a custom serializer': function(next) {
  82. var v = {
  83. disparaging: 'hyena',
  84. };
  85. var k = {
  86. delicious: 'cookie',
  87. };
  88. var valueSerializerCalled = false;
  89. var keySerializerCalled = false;
  90. client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
  91. t.equal(valueSerializerCalled, true);
  92. t.equal(keySerializerCalled, true);
  93. t.deepEqual(v, Buffer.from('foo'));
  94. t.equal(k, 'key');
  95. next();
  96. };
  97. client.setValueSerializer(function(_) {
  98. valueSerializerCalled = true;
  99. t.deepEqual(_, v);
  100. return Buffer.from('foo');
  101. });
  102. client.setKeySerializer(function(_) {
  103. keySerializerCalled = true;
  104. t.deepEqual(_, k);
  105. return 'key';
  106. });
  107. client.produce('tawpic', 0, v, k, null, function() {
  108. });
  109. },
  110. 'can use a value asynchronous custom serializer': function(next) {
  111. var v = {
  112. disparaging: 'hyena',
  113. };
  114. var k = {
  115. delicious: 'cookie',
  116. };
  117. var valueSerializerCalled = false;
  118. var keySerializerCalled = false;
  119. client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
  120. t.equal(valueSerializerCalled, true);
  121. t.equal(keySerializerCalled, true);
  122. t.deepEqual(v, Buffer.from('foo'));
  123. t.equal(k, 'key');
  124. next();
  125. };
  126. client.setValueSerializer(function(_, cb) {
  127. valueSerializerCalled = true;
  128. t.deepEqual(_, v);
  129. setImmediate(function() {
  130. cb(null, Buffer.from('foo'));
  131. });
  132. });
  133. client.setKeySerializer(function(_) {
  134. keySerializerCalled = true;
  135. t.deepEqual(_, k);
  136. return 'key';
  137. });
  138. client.produce('tawpic', 0, v, k, null, function() {
  139. });
  140. },
  141. 'can use a key asynchronous custom serializer': function(next) {
  142. var v = {
  143. disparaging: 'hyena',
  144. };
  145. var k = {
  146. delicious: 'cookie',
  147. };
  148. var valueSerializerCalled = false;
  149. var keySerializerCalled = false;
  150. client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
  151. t.equal(valueSerializerCalled, true);
  152. t.equal(keySerializerCalled, true);
  153. t.deepEqual(v, Buffer.from('foo'));
  154. t.equal(k, 'key');
  155. next();
  156. };
  157. client.setValueSerializer(function(_) {
  158. valueSerializerCalled = true;
  159. t.deepEqual(_, v);
  160. return Buffer.from('foo');
  161. });
  162. client.setKeySerializer(function(_, cb) {
  163. keySerializerCalled = true;
  164. t.deepEqual(_, k);
  165. setImmediate(function() {
  166. cb(null, 'key');
  167. });
  168. });
  169. client.produce('tawpic', 0, v, k, null, function() {
  170. });
  171. },
  172. 'can use two asynchronous custom serializers': function(next) {
  173. var v = {
  174. disparaging: 'hyena',
  175. };
  176. var k = {
  177. delicious: 'cookie',
  178. };
  179. var valueSerializerCalled = false;
  180. var keySerializerCalled = false;
  181. client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
  182. t.equal(valueSerializerCalled, true);
  183. t.equal(keySerializerCalled, true);
  184. t.deepEqual(v, Buffer.from('foo'));
  185. t.equal(k, 'key');
  186. next();
  187. };
  188. client.setValueSerializer(function(_, cb) {
  189. valueSerializerCalled = true;
  190. t.deepEqual(_, v);
  191. setImmediate(function() {
  192. cb(null, Buffer.from('foo'));
  193. });
  194. });
  195. client.setKeySerializer(function(_, cb) {
  196. keySerializerCalled = true;
  197. t.deepEqual(_, k);
  198. setImmediate(function() {
  199. cb(null, 'key');
  200. });
  201. });
  202. client.produce('tawpic', 0, v, k, null, function() {
  203. });
  204. },
  205. // Promise API
  206. 'can use a value promise-based custom serializer': function(next) {
  207. var v = {
  208. disparaging: 'hyena',
  209. };
  210. var k = {
  211. delicious: 'cookie',
  212. };
  213. var valueSerializerCalled = false;
  214. var keySerializerCalled = false;
  215. client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
  216. t.equal(valueSerializerCalled, true);
  217. t.equal(keySerializerCalled, true);
  218. next();
  219. };
  220. client.setValueSerializer(function(_) {
  221. valueSerializerCalled = true;
  222. t.deepEqual(_, v);
  223. return new Promise(function(resolve) {
  224. resolve(Buffer.from(''));
  225. });
  226. });
  227. client.setKeySerializer(function(_) {
  228. keySerializerCalled = true;
  229. t.deepEqual(_, k);
  230. return null;
  231. });
  232. client.produce('tawpic', 0, v, k, null, function() {
  233. });
  234. },
  235. 'can use a key promise-based custom serializer': function(next) {
  236. var v = {
  237. disparaging: 'hyena',
  238. };
  239. var k = {
  240. delicious: 'cookie',
  241. };
  242. var valueSerializerCalled = false;
  243. var keySerializerCalled = false;
  244. client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
  245. t.equal(valueSerializerCalled, true);
  246. t.equal(keySerializerCalled, true);
  247. t.deepEqual(v, Buffer.from('foo'));
  248. t.equal(k, 'key');
  249. next();
  250. };
  251. client.setValueSerializer(function(_) {
  252. valueSerializerCalled = true;
  253. t.deepEqual(_, v);
  254. return Buffer.from('foo');
  255. });
  256. client.setKeySerializer(function(_) {
  257. keySerializerCalled = true;
  258. t.deepEqual(_, k);
  259. return new Promise(function(resolve) {
  260. resolve('key');
  261. });
  262. });
  263. client.produce('tawpic', 0, v, k, null, function() {
  264. });
  265. },
  266. 'can use two promise-based custom serializers': function(next) {
  267. var v = {
  268. disparaging: 'hyena',
  269. };
  270. var k = {
  271. delicious: 'cookie',
  272. };
  273. var valueSerializerCalled = false;
  274. var keySerializerCalled = false;
  275. client._oldProduce = function(topic, partition, v, k, timestamp, opaque) {
  276. t.equal(valueSerializerCalled, true);
  277. t.equal(keySerializerCalled, true);
  278. t.deepEqual(v, Buffer.from('foo'));
  279. t.equal(k, 'key');
  280. next();
  281. };
  282. client.setValueSerializer(function(_) {
  283. valueSerializerCalled = true;
  284. t.deepEqual(_, v);
  285. return new Promise(function(resolve) {
  286. resolve(Buffer.from('foo'));
  287. });
  288. });
  289. client.setKeySerializer(function(_) {
  290. keySerializerCalled = true;
  291. t.deepEqual(_, k);
  292. return new Promise(function(resolve) {
  293. resolve('key');
  294. });
  295. });
  296. client.produce('tawpic', 0, v, k, null, function() {
  297. });
  298. },
  299. 'bubbles up serializer errors in an async value serializer': function(next) {
  300. var v = {
  301. disparaging: 'hyena',
  302. };
  303. var k = {
  304. delicious: 'cookie',
  305. };
  306. client.setValueSerializer(function(_, cb) {
  307. t.deepEqual(_, v);
  308. setImmediate(function() {
  309. cb(new Error('even together we failed'));
  310. });
  311. });
  312. client.produce('tawpic', 0, v, k, null, function(err) {
  313. t.equal(typeof err, 'object', 'an error should be returned');
  314. next();
  315. });
  316. },
  317. 'bubbles up serializer errors in an async key serializer': function(next) {
  318. var v = {
  319. disparaging: 'hyena',
  320. };
  321. var k = {
  322. delicious: 'cookie',
  323. };
  324. client.setKeySerializer(function(_, cb) {
  325. t.deepEqual(_, v);
  326. setImmediate(function() {
  327. cb(new Error('even together we failed'));
  328. });
  329. });
  330. client.produce('tawpic', 0, v, k, null, function(err) {
  331. t.equal(typeof err, 'object', 'an error should be returned');
  332. next();
  333. });
  334. },
  335. 'bubbles up serializer errors in a sync value serializer': function(next) {
  336. var v = {
  337. disparaging: 'hyena',
  338. };
  339. var k = {
  340. delicious: 'cookie',
  341. };
  342. client.setValueSerializer(function(_, cb) {
  343. t.deepEqual(_, v);
  344. throw new Error('even together we failed');
  345. });
  346. client.produce('tawpic', 0, v, k, null, function(err) {
  347. t.equal(typeof err, 'object', 'an error should be returned');
  348. next();
  349. });
  350. },
  351. 'bubbles up serializer errors in a sync key serializer': function(next) {
  352. var v = {
  353. disparaging: 'hyena',
  354. };
  355. var k = {
  356. delicious: 'cookie',
  357. };
  358. client.setKeySerializer(function(_, cb) {
  359. t.deepEqual(_, v);
  360. throw new Error('even together we failed');
  361. });
  362. client.produce('tawpic', 0, v, k, null, function(err) {
  363. t.equal(typeof err, 'object', 'an error should be returned');
  364. next();
  365. });
  366. },
  367. 'bubbles up serializer errors in a promise-based value serializer': function(next) {
  368. var v = {
  369. disparaging: 'hyena',
  370. };
  371. var k = {
  372. delicious: 'cookie',
  373. };
  374. client.setValueSerializer(function(_) {
  375. t.deepEqual(_, v);
  376. return new Promise(function (resolve, reject) {
  377. reject(new Error('even together we failed'));
  378. });
  379. });
  380. client.produce('tawpic', 0, v, k, null, function(err) {
  381. t.equal(typeof err, 'object', 'an error should be returned');
  382. next();
  383. });
  384. },
  385. 'bubbles up serializer errors in a promise-based key serializer': function(next) {
  386. var v = {
  387. disparaging: 'hyena',
  388. };
  389. var k = {
  390. delicious: 'cookie',
  391. };
  392. client.setKeySerializer(function(_) {
  393. t.deepEqual(_, v);
  394. return new Promise(function(resolve, reject) {
  395. return new Promise(function (resolve, reject) {
  396. reject(new Error('even together we failed'));
  397. });
  398. });
  399. });
  400. client.produce('tawpic', 0, v, k, null, function(err) {
  401. t.equal(typeof err, 'object', 'an error should be returned');
  402. next();
  403. });
  404. },
  405. }
  406. },
  407. };