|
|
- import { Readable, ReadableOptions, Writable, WritableOptions } from 'stream';
- import { EventEmitter } from 'events';
- import {
- GlobalConfig,
- TopicConfig,
- ConsumerGlobalConfig,
- ConsumerTopicConfig,
- ProducerGlobalConfig,
- ProducerTopicConfig,
- } from './config';
-
- export * from './config';
- export * from './errors';
-
- export interface LibrdKafkaError {
- message: string;
- code: number;
- errno: number;
- origin: string;
- stack?: string;
- isFatal?: boolean;
- isRetriable?: boolean;
- isTxnRequiresAbort?: boolean;
- }
-
- export interface ReadyInfo {
- name: string;
- }
-
- export interface ClientMetrics {
- connectionOpened: number;
- }
-
- export interface MetadataOptions {
- topic?: string;
- allTopics?: boolean;
- timeout?: number;
- }
-
- export interface BrokerMetadata {
- id: number;
- host: string;
- port: number;
- }
-
- export interface PartitionMetadata {
- id: number;
- leader: number;
- replicas: number[];
- isrs: number[];
- }
-
- export interface TopicMetadata {
- name: string;
- partitions: PartitionMetadata[];
- }
-
- export interface Metadata {
- orig_broker_id: number;
- orig_broker_name: string;
- topics: TopicMetadata[];
- brokers: BrokerMetadata[];
- }
-
- export interface WatermarkOffsets{
- lowOffset: number;
- highOffset: number;
- }
-
- export interface TopicPartition {
- topic: string;
- partition: number;
- }
-
- export interface TopicPartitionOffset extends TopicPartition{
- offset: number;
- }
-
- export type TopicPartitionTime = TopicPartitionOffset;
-
- export type EofEvent = TopicPartitionOffset;
-
- export type Assignment = TopicPartition | TopicPartitionOffset;
-
- export interface DeliveryReport extends TopicPartitionOffset {
- value?: MessageValue;
- size: number;
- key?: MessageKey;
- timestamp?: number;
- opaque?: any;
- }
-
- export type NumberNullUndefined = number | null | undefined;
-
- export type MessageKey = Buffer | string | null | undefined;
- export type MessageHeader = { [key: string]: string | Buffer };
- export type MessageValue = Buffer | null;
- export type SubscribeTopic = string | RegExp;
- export type SubscribeTopicList = SubscribeTopic[];
-
- export interface Message extends TopicPartitionOffset {
- value: MessageValue;
- size: number;
- topic: string;
- key?: MessageKey;
- timestamp?: number;
- headers?: MessageHeader[];
- opaque?: any;
- }
-
- export interface ReadStreamOptions extends ReadableOptions {
- topics: SubscribeTopicList | SubscribeTopic | ((metadata: Metadata) => SubscribeTopicList);
- waitInterval?: number;
- fetchSize?: number;
- objectMode?: boolean;
- highWaterMark?: number;
- autoClose?: boolean;
- streamAsBatch?: boolean;
- connectOptions?: any;
- }
-
- export interface WriteStreamOptions extends WritableOptions {
- encoding?: string;
- objectMode?: boolean;
- topic?: string;
- autoClose?: boolean;
- pollInterval?: number;
- connectOptions?: any;
- }
-
- export interface ProducerStream extends Writable {
- producer: Producer;
- connect(metadataOptions?: MetadataOptions): void;
- close(cb?: () => void): void;
- }
-
- export interface ConsumerStream extends Readable {
- consumer: KafkaConsumer;
- connect(options: ConsumerGlobalConfig): void;
- close(cb?: () => void): void;
- }
-
- type KafkaClientEvents = 'disconnected' | 'ready' | 'connection.failure' | 'event.error' | 'event.stats' | 'event.log' | 'event.event' | 'event.throttle';
- type KafkaConsumerEvents = 'data' | 'partition.eof' | 'rebalance' | 'rebalance.error' | 'subscribed' | 'unsubscribed' | 'unsubscribe' | 'offset.commit' | KafkaClientEvents;
- type KafkaProducerEvents = 'delivery-report' | KafkaClientEvents;
-
- type EventListenerMap = {
- // ### Client
- // connectivity events
- 'disconnected': (metrics: ClientMetrics) => void,
- 'ready': (info: ReadyInfo, metadata: Metadata) => void,
- 'connection.failure': (error: LibrdKafkaError, metrics: ClientMetrics) => void,
- // event messages
- 'event.error': (error: LibrdKafkaError) => void,
- 'event.stats': (eventData: any) => void,
- 'event.log': (eventData: any) => void,
- 'event.event': (eventData: any) => void,
- 'event.throttle': (eventData: any) => void,
- // ### Consumer only
- // domain events
- 'data': (arg: Message) => void,
- 'partition.eof': (arg: EofEvent) => void,
- 'rebalance': (err: LibrdKafkaError, assignments: TopicPartition[]) => void,
- 'rebalance.error': (err: Error) => void,
- // connectivity events
- 'subscribed': (topics: SubscribeTopicList) => void,
- 'unsubscribe': () => void,
- 'unsubscribed': () => void,
- // offsets
- 'offset.commit': (error: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void,
- // ### Producer only
- // delivery
- 'delivery-report': (error: LibrdKafkaError, report: DeliveryReport) => void,
- }
-
- type EventListener<K extends string> = K extends keyof EventListenerMap ? EventListenerMap[K] : never;
-
- export abstract class Client<Events extends string> extends EventEmitter {
- constructor(globalConf: GlobalConfig, SubClientType: any, topicConf: TopicConfig);
-
- connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;
-
- getClient(): any;
-
- connectedTime(): number;
-
- getLastError(): LibrdKafkaError;
-
- disconnect(cb?: (err: any, data: ClientMetrics) => any): this;
- disconnect(timeout: number, cb?: (err: any, data: ClientMetrics) => any): this;
-
- isConnected(): boolean;
-
- getMetadata(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): any;
-
- queryWatermarkOffsets(topic: string, partition: number, timeout: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
- queryWatermarkOffsets(topic: string, partition: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
-
- on<E extends Events>(event: E, listener: EventListener<E>): this;
- once<E extends Events>(event: E, listener: EventListener<E>): this;
- }
-
- export class KafkaConsumer extends Client<KafkaConsumerEvents> {
- constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);
-
- assign(assignments: Assignment[]): this;
-
- assignments(): Assignment[];
-
- commit(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;
- commit(): this;
-
- commitMessage(msg: TopicPartitionOffset): this;
-
- commitMessageSync(msg: TopicPartitionOffset): this;
-
- commitSync(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;
-
- committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
- committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
-
- consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
- consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
- consume(): void;
-
- getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
-
- offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
-
- pause(topicPartitions: TopicPartition[]): any;
-
- position(toppars?: TopicPartition[]): TopicPartitionOffset[];
-
- resume(topicPartitions: TopicPartition[]): any;
-
- seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;
-
- setDefaultConsumeTimeout(timeoutMs: number): void;
-
- setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;
-
- subscribe(topics: SubscribeTopicList): this;
-
- subscription(): string[];
-
- unassign(): this;
-
- unsubscribe(): this;
-
- offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
- offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
-
- static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
- }
-
- export class Producer extends Client<KafkaProducerEvents> {
- constructor(conf: ProducerGlobalConfig, topicConf?: ProducerTopicConfig);
-
- flush(timeout?: NumberNullUndefined, cb?: (err: LibrdKafkaError) => void): this;
-
- poll(): this;
-
- produce(topic: string, partition: NumberNullUndefined, message: MessageValue, key?: MessageKey, timestamp?: NumberNullUndefined, opaque?: any, headers?: MessageHeader[]): any;
-
- setPollInterval(interval: number): this;
-
- static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;
-
- initTransactions(cb: (err: LibrdKafkaError) => void): void;
- initTransactions(timeout: number, cb: (err: LibrdKafkaError) => void): void;
- beginTransaction(cb: (err: LibrdKafkaError) => void): void;
- commitTransaction(cb: (err: LibrdKafkaError) => void): void;
- commitTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
- abortTransaction(cb: (err: LibrdKafkaError) => void): void;
- abortTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
- sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, cb: (err: LibrdKafkaError) => void): void;
- sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, timeout: number, cb: (err: LibrdKafkaError) => void): void;
- }
-
- export class HighLevelProducer extends Producer {
- produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, callback: (err: any, offset?: NumberNullUndefined) => void): any;
- produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, headers: MessageHeader[], callback: (err: any, offset?: NumberNullUndefined) => void): any;
-
- setKeySerializer(serializer: (key: any, cb: (err: any, key: MessageKey) => void) => void): void;
- setKeySerializer(serializer: (key: any) => MessageKey | Promise<MessageKey>): void;
- setValueSerializer(serializer: (value: any, cb: (err: any, value: MessageValue) => void) => void): void;
- setValueSerializer(serializer: (value: any) => MessageValue | Promise<MessageValue>): void;
- }
-
- export const features: string[];
-
- export const librdkafkaVersion: string;
-
- export function createReadStream(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
-
- export function createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;
-
- export interface NewTopic {
- topic: string;
- num_partitions: number;
- replication_factor: number;
- config?: {
- 'cleanup.policy'?: 'delete' | 'compact' | 'delete,compact' | 'compact,delete';
- 'compression.type'?: 'gzip' | 'snappy' | 'lz4' | 'zstd' | 'uncompressed' | 'producer';
- 'delete.retention.ms'?: string;
- 'file.delete.delay.ms'?: string;
- 'flush.messages'?: string;
- 'flush.ms'?: string;
- 'follower.replication.throttled.replicas'?: string;
- 'index.interval.bytes'?: string;
- 'leader.replication.throttled.replicas'?: string;
- 'max.compaction.lag.ms'?: string;
- 'max.message.bytes'?: string;
- 'message.format.version'?: string;
- 'message.timestamp.difference.max.ms'?: string;
- 'message.timestamp.type'?: string;
- 'min.cleanable.dirty.ratio'?: string;
- 'min.compaction.lag.ms'?: string;
- 'min.insync.replicas'?: string;
- 'preallocate'?: string;
- 'retention.bytes'?: string;
- 'retention.ms'?: string;
- 'segment.bytes'?: string;
- 'segment.index.bytes'?: string;
- 'segment.jitter.ms'?: string;
- 'segment.ms'?: string;
- 'unclean.leader.election.enable'?: string;
- 'message.downconversion.enable'?: string;
- } | { [cfg: string]: string; };
- }
-
- export interface IAdminClient {
- createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
- createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
-
- deleteTopic(topic: string, cb?: (err: LibrdKafkaError) => void): void;
- deleteTopic(topic: string, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
-
- createPartitions(topic: string, desiredPartitions: number, cb?: (err: LibrdKafkaError) => void): void;
- createPartitions(topic: string, desiredPartitions: number, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
-
- disconnect(): void;
- }
-
- export abstract class AdminClient {
- static create(conf: GlobalConfig): IAdminClient;
- }
|