import { Readable, ReadableOptions, Writable, WritableOptions } from 'stream';
import { EventEmitter } from 'events';
import {
  GlobalConfig,
  TopicConfig,
  ConsumerGlobalConfig,
  ConsumerTopicConfig,
  ProducerGlobalConfig,
  ProducerTopicConfig,
} from './config';

export * from './config';
export * from './errors';

export interface LibrdKafkaError {
  message: string;
  code: number;
  errno: number;
  origin: string;
  stack?: string;
  isFatal?: boolean;
  isRetriable?: boolean;
  isTxnRequiresAbort?: boolean;
}

export interface ReadyInfo {
  name: string;
}

export interface ClientMetrics {
  connectionOpened: number;
}

export interface MetadataOptions {
  topic?: string;
  allTopics?: boolean;
  timeout?: number;
}

export interface BrokerMetadata {
  id: number;
  host: string;
  port: number;
}

export interface PartitionMetadata {
  id: number;
  leader: number;
  replicas: number[];
  isrs: number[];
}

export interface TopicMetadata {
  name: string;
  partitions: PartitionMetadata[];
}

export interface Metadata {
  orig_broker_id: number;
  orig_broker_name: string;
  topics: TopicMetadata[];
  brokers: BrokerMetadata[];
}

export interface WatermarkOffsets {
  lowOffset: number;
  highOffset: number;
}

export interface TopicPartition {
  topic: string;
  partition: number;
}

export interface TopicPartitionOffset extends TopicPartition {
  offset: number;
}

export type TopicPartitionTime = TopicPartitionOffset;
export type EofEvent = TopicPartitionOffset;
export type Assignment = TopicPartition | TopicPartitionOffset;

export interface DeliveryReport extends TopicPartitionOffset {
  value?: MessageValue;
  size: number;
  key?: MessageKey;
  timestamp?: number;
  opaque?: any;
}

export type NumberNullUndefined = number | null | undefined;
export type MessageKey = Buffer | string | null | undefined;
export type MessageHeader = { [key: string]: string | Buffer };
export type MessageValue = Buffer | null;
export type SubscribeTopic = string | RegExp;
export type SubscribeTopicList = SubscribeTopic[];

export interface Message extends TopicPartitionOffset {
  value: MessageValue;
  size: number;
  topic: string;
  key?: MessageKey;
  timestamp?: number;
  headers?: MessageHeader[];
  opaque?: any;
}

export interface ReadStreamOptions extends ReadableOptions {
  topics: SubscribeTopicList | SubscribeTopic | ((metadata: Metadata) => SubscribeTopicList);
  waitInterval?: number;
  fetchSize?: number;
  objectMode?: boolean;
  highWaterMark?: number;
  autoClose?: boolean;
  streamAsBatch?: boolean;
  connectOptions?: any;
}

export interface WriteStreamOptions extends WritableOptions {
  encoding?: string;
  objectMode?: boolean;
  topic?: string;
  autoClose?: boolean;
  pollInterval?: number;
  connectOptions?: any;
}

export interface ProducerStream extends Writable {
  producer: Producer;
  connect(metadataOptions?: MetadataOptions): void;
  close(cb?: () => void): void;
}

export interface ConsumerStream extends Readable {
  consumer: KafkaConsumer;
  connect(options: ConsumerGlobalConfig): void;
  close(cb?: () => void): void;
}

type KafkaClientEvents = 'disconnected' | 'ready' | 'connection.failure' | 'event.error' | 'event.stats' | 'event.log' | 'event.event' | 'event.throttle';
type KafkaConsumerEvents = 'data' | 'partition.eof' | 'rebalance' | 'rebalance.error' | 'subscribed' | 'unsubscribed' | 'unsubscribe' | 'offset.commit' | KafkaClientEvents;
type KafkaProducerEvents = 'delivery-report' | KafkaClientEvents;

type EventListenerMap = {
  // ### Client
  // connectivity events
  'disconnected': (metrics: ClientMetrics) => void,
  'ready': (info: ReadyInfo, metadata: Metadata) => void,
  'connection.failure': (error: LibrdKafkaError, metrics: ClientMetrics) => void,
  // event messages
  'event.error': (error: LibrdKafkaError) => void,
  'event.stats': (eventData: any) => void,
  'event.log': (eventData: any) => void,
  'event.event': (eventData: any) => void,
  'event.throttle': (eventData: any) => void,
  // ### Consumer only
  // domain events
  'data': (arg: Message) => void,
  'partition.eof': (arg: EofEvent) => void,
  'rebalance': (err: LibrdKafkaError, assignments: TopicPartition[]) => void,
  'rebalance.error': (err: Error) => void,
  // connectivity events
  'subscribed': (topics: SubscribeTopicList) => void,
  'unsubscribe': () => void,
  'unsubscribed': () => void,
  // offsets
  'offset.commit': (error: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void,
  // ### Producer only
  // delivery
  'delivery-report': (error: LibrdKafkaError, report: DeliveryReport) => void,
}

type EventListener<K extends string> = K extends keyof EventListenerMap ? EventListenerMap[K] : never;

export abstract class Client<Events extends string> extends EventEmitter {
  constructor(globalConf: GlobalConfig, SubClientType: any, topicConf: TopicConfig);
  connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;
  getClient(): any;
  connectedTime(): number;
  getLastError(): LibrdKafkaError;
  disconnect(cb?: (err: any, data: ClientMetrics) => any): this;
  disconnect(timeout: number, cb?: (err: any, data: ClientMetrics) => any): this;
  isConnected(): boolean;
  getMetadata(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): any;
  queryWatermarkOffsets(topic: string, partition: number, timeout: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
  queryWatermarkOffsets(topic: string, partition: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
  on<E extends Events>(event: E, listener: EventListener<E>): this;
  once<E extends Events>(event: E, listener: EventListener<E>): this;
}
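
/**
 * A minimal usage sketch (not part of these declarations; it assumes this
 * file ships as the `node-rdkafka` package's typings, and the broker
 * address, group id, and topic name are placeholders):
 *
 * @example
 * import { KafkaConsumer } from 'node-rdkafka';
 *
 * const consumer = new KafkaConsumer(
 *   { 'group.id': 'example-group', 'metadata.broker.list': 'localhost:9092' },
 *   { 'auto.offset.reset': 'earliest' },
 * );
 *
 * consumer
 *   .on('ready', () => {
 *     consumer.subscribe(['example-topic']);
 *     consumer.consume(); // no arguments: flowing mode, emits 'data' per message
 *   })
 *   .on('data', (message) => console.log(message.value?.toString()));
 *
 * consumer.connect();
 */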
export class KafkaConsumer extends Client<KafkaConsumerEvents> {
  constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);
  assign(assignments: Assignment[]): this;
  assignments(): Assignment[];
  commit(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;
  commit(): this;
  commitMessage(msg: TopicPartitionOffset): this;
  commitMessageSync(msg: TopicPartitionOffset): this;
  commitSync(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;
  committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
  committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
  consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
  consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
  consume(): void;
  getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
  offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
  pause(topicPartitions: TopicPartition[]): any;
  position(toppars?: TopicPartition[]): TopicPartitionOffset[];
  resume(topicPartitions: TopicPartition[]): any;
  seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;
  setDefaultConsumeTimeout(timeoutMs: number): void;
  setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;
  subscribe(topics: SubscribeTopicList): this;
  subscription(): string[];
  unassign(): this;
  unsubscribe(): this;
  offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
  offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
  static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
}
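
/**
 * A minimal usage sketch (broker address and topic name are placeholders;
 * `dr_cb: true` asks librdkafka to emit 'delivery-report' events):
 *
 * @example
 * import { Producer } from 'node-rdkafka';
 *
 * const producer = new Producer({ 'metadata.broker.list': 'localhost:9092', dr_cb: true });
 *
 * producer
 *   .on('ready', () => {
 *     producer.produce('example-topic', null, Buffer.from('hello'), 'a-key', Date.now());
 *   })
 *   .on('delivery-report', (err, report) => {
 *     if (err) console.error(err);
 *   });
 *
 * producer.setPollInterval(100); // poll regularly so delivery reports are dispatched
 * producer.connect();
 */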
export class Producer extends Client<KafkaProducerEvents> {
  constructor(conf: ProducerGlobalConfig, topicConf?: ProducerTopicConfig);
  flush(timeout?: NumberNullUndefined, cb?: (err: LibrdKafkaError) => void): this;
  poll(): this;
  produce(topic: string, partition: NumberNullUndefined, message: MessageValue, key?: MessageKey, timestamp?: NumberNullUndefined, opaque?: any, headers?: MessageHeader[]): any;
  setPollInterval(interval: number): this;
  static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;
  initTransactions(cb: (err: LibrdKafkaError) => void): void;
  initTransactions(timeout: number, cb: (err: LibrdKafkaError) => void): void;
  beginTransaction(cb: (err: LibrdKafkaError) => void): void;
  commitTransaction(cb: (err: LibrdKafkaError) => void): void;
  commitTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
  abortTransaction(cb: (err: LibrdKafkaError) => void): void;
  abortTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
  sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, cb: (err: LibrdKafkaError) => void): void;
  sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, timeout: number, cb: (err: LibrdKafkaError) => void): void;
}
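
/**
 * A minimal usage sketch of the callback-based produce() and a value
 * serializer (broker address, topic name, and payload are placeholders):
 *
 * @example
 * import { HighLevelProducer } from 'node-rdkafka';
 *
 * const producer = new HighLevelProducer({ 'metadata.broker.list': 'localhost:9092' });
 * producer.setValueSerializer((value) => Buffer.from(JSON.stringify(value)));
 *
 * producer.on('ready', () => {
 *   producer.produce('example-topic', null, { id: 1 }, 'a-key', Date.now(), (err, offset) => {
 *     if (err) console.error(err);
 *   });
 * });
 * producer.connect();
 */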
export class HighLevelProducer extends Producer {
  produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, callback: (err: any, offset?: NumberNullUndefined) => void): any;
  produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, headers: MessageHeader[], callback: (err: any, offset?: NumberNullUndefined) => void): any;
  setKeySerializer(serializer: (key: any, cb: (err: any, key: MessageKey) => void) => void): void;
  setKeySerializer(serializer: (key: any) => MessageKey | Promise<MessageKey>): void;
  setValueSerializer(serializer: (value: any, cb: (err: any, value: MessageValue) => void) => void): void;
  setValueSerializer(serializer: (value: any) => MessageValue | Promise<MessageValue>): void;
}
export const features: string[];

export const librdkafkaVersion: string;
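
/**
 * A minimal usage sketch (broker address, group id, and topic name are
 * placeholders; the stream is assumed to emit Message objects):
 *
 * @example
 * import { createReadStream } from 'node-rdkafka';
 *
 * const stream = createReadStream(
 *   { 'group.id': 'example-group', 'metadata.broker.list': 'localhost:9092' },
 *   {},
 *   { topics: ['example-topic'] },
 * );
 * stream.on('data', (message: Message) => console.log(message.value?.toString()));
 */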
export function createReadStream(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
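
/**
 * A minimal usage sketch (broker address and topic name are placeholders;
 * the stream is assumed to accept Buffer chunks when not in object mode):
 *
 * @example
 * import { createWriteStream } from 'node-rdkafka';
 *
 * const stream = createWriteStream(
 *   { 'metadata.broker.list': 'localhost:9092' },
 *   {},
 *   { topic: 'example-topic' },
 * );
 * stream.write(Buffer.from('hello'));
 */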
export function createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;
export interface NewTopic {
  topic: string;
  num_partitions: number;
  replication_factor: number;
  config?: {
    'cleanup.policy'?: 'delete' | 'compact' | 'delete,compact' | 'compact,delete';
    'compression.type'?: 'gzip' | 'snappy' | 'lz4' | 'zstd' | 'uncompressed' | 'producer';
    'delete.retention.ms'?: string;
    'file.delete.delay.ms'?: string;
    'flush.messages'?: string;
    'flush.ms'?: string;
    'follower.replication.throttled.replicas'?: string;
    'index.interval.bytes'?: string;
    'leader.replication.throttled.replicas'?: string;
    'max.compaction.lag.ms'?: string;
    'max.message.bytes'?: string;
    'message.format.version'?: string;
    'message.timestamp.difference.max.ms'?: string;
    'message.timestamp.type'?: string;
    'min.cleanable.dirty.ratio'?: string;
    'min.compaction.lag.ms'?: string;
    'min.insync.replicas'?: string;
    'preallocate'?: string;
    'retention.bytes'?: string;
    'retention.ms'?: string;
    'segment.bytes'?: string;
    'segment.index.bytes'?: string;
    'segment.jitter.ms'?: string;
    'segment.ms'?: string;
    'unclean.leader.election.enable'?: string;
    'message.downconversion.enable'?: string;
  } | { [cfg: string]: string; };
}
export interface IAdminClient {
  createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
  createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
  deleteTopic(topic: string, cb?: (err: LibrdKafkaError) => void): void;
  deleteTopic(topic: string, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
  createPartitions(topic: string, desiredPartitions: number, cb?: (err: LibrdKafkaError) => void): void;
  createPartitions(topic: string, desiredPartitions: number, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
  disconnect(): void;
}
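
/**
 * A minimal usage sketch pairing AdminClient.create() with the IAdminClient
 * and NewTopic shapes above (broker address and topic settings are
 * placeholders):
 *
 * @example
 * import { AdminClient } from 'node-rdkafka';
 *
 * const client = AdminClient.create({
 *   'client.id': 'example-admin',
 *   'metadata.broker.list': 'localhost:9092',
 * });
 * client.createTopic(
 *   { topic: 'example-topic', num_partitions: 3, replication_factor: 1, config: { 'cleanup.policy': 'compact' } },
 *   (err) => {
 *     if (err) console.error(err);
 *     client.disconnect();
 *   },
 * );
 */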
export abstract class AdminClient {
  static create(conf: GlobalConfig): IAdminClient;
}