import { Readable, ReadableOptions, Writable, WritableOptions } from 'stream';
import { EventEmitter } from 'events';
import {
  ConsumerGlobalConfig,
  ConsumerTopicConfig,
  GlobalConfig,
  ProducerGlobalConfig,
  ProducerTopicConfig,
  TopicConfig,
} from './config';
export * from './config';
export * from './errors';
/**
 * Error object shape handed to the callbacks and event listeners declared in
 * this module.
 */
export interface LibrdKafkaError {
  /** Human-readable description of the failure. */
  message: string;
  /** Numeric error code. NOTE(review): presumably a librdkafka error code — confirm. */
  code: number;
  /** NOTE(review): appears to mirror `code`; confirm the intended difference. */
  errno: number;
  /** NOTE(review): presumably identifies where the error was raised — confirm. */
  origin: string;
  /** Stack trace, when one is attached. */
  stack?: string;
  /** NOTE(review): presumably set when the error is fatal to the client — confirm. */
  isFatal?: boolean;
  /** NOTE(review): presumably set when the failed operation may be retried — confirm. */
  isRetriable?: boolean;
  /** NOTE(review): presumably set when the current transaction must be aborted — confirm. */
  isTxnRequiresAbort?: boolean;
}
/** First argument delivered to `'ready'` event listeners. */
export interface ReadyInfo {
  /** NOTE(review): presumably the name of the client that became ready — confirm. */
  name: string;
}
/**
 * Metrics object passed to `'disconnected'` and `'connection.failure'`
 * listeners and to the `disconnect` callback.
 */
export interface ClientMetrics {
  /** NOTE(review): presumably a timestamp of when the connection was opened — confirm units. */
  connectionOpened: number;
}
/**
 * Optional settings accepted by `connect` and `getMetadata`. Every field may
 * be omitted.
 */
export interface MetadataOptions {
  /** NOTE(review): presumably limits the request to a single topic — confirm. */
  topic?: string;
  /** NOTE(review): presumably requests metadata for every topic — confirm. */
  allTopics?: boolean;
  /** NOTE(review): presumably a request timeout in milliseconds — confirm units. */
  timeout?: number;
}
/** One entry of `Metadata.brokers`. */
export interface BrokerMetadata {
  /** Numeric broker identifier. */
  id: number;
  /** Broker host name or address. */
  host: string;
  /** Broker port number. */
  port: number;
}
/** One entry of `TopicMetadata.partitions`. */
export interface PartitionMetadata {
  /** Numeric partition identifier. */
  id: number;
  /** NOTE(review): presumably the broker id of the partition leader — confirm. */
  leader: number;
  /** NOTE(review): presumably broker ids holding a replica — confirm. */
  replicas: number[];
  /** NOTE(review): presumably broker ids of the in-sync replicas — confirm. */
  isrs: number[];
}
/** One entry of `Metadata.topics`. */
export interface TopicMetadata {
  /** Topic name. */
  name: string;
  /** Metadata for the partitions listed under this topic. */
  partitions: PartitionMetadata[];
}
/**
 * Metadata object delivered to `'ready'` listeners and to the `connect` /
 * `getMetadata` callbacks.
 */
export interface Metadata {
  /** NOTE(review): presumably the id of the broker that answered the request — confirm. */
  orig_broker_id: number;
  /** NOTE(review): presumably the name of the broker that answered the request — confirm. */
  orig_broker_name: string;
  /** Per-topic metadata entries. */
  topics: TopicMetadata[];
  /** Per-broker metadata entries. */
  brokers: BrokerMetadata[];
}
/**
 * Pair of offsets returned by `getWatermarkOffsets` and delivered to the
 * `queryWatermarkOffsets` callback.
 */
export interface WatermarkOffsets {
  /** Low watermark. NOTE(review): presumably the earliest available offset — confirm. */
  lowOffset: number;
  /** High watermark. NOTE(review): presumably the next offset to be written — confirm. */
  highOffset: number;
}
/** Identifies one partition of one topic. */
export interface TopicPartition {
  /** Topic name. */
  topic: string;
  /** Partition number within the topic. */
  partition: number;
}
/** A `TopicPartition` together with a numeric offset. */
export interface TopicPartitionOffset extends TopicPartition {
  /** Offset within the partition. */
  offset: number;
}
/**
 * Same shape as `TopicPartitionOffset`; used as the input element type of
 * `offsetsForTimes`.
 * NOTE(review): presumably `offset` carries a timestamp here — confirm.
 */
export type TopicPartitionTime = TopicPartitionOffset;

/** Payload type of the `'partition.eof'` event; same shape as `TopicPartitionOffset`. */
export type EofEvent = TopicPartitionOffset;

/** Element type accepted by `assign` and returned by `assignments`; the offset is optional. */
export type Assignment =
  | TopicPartition
  | TopicPartitionOffset;
/** Second argument delivered to `'delivery-report'` listeners. */
export interface DeliveryReport extends TopicPartitionOffset {
  /** Message payload, when included in the report. */
  value?: MessageValue;
  /** NOTE(review): presumably the payload size in bytes — confirm. */
  size: number;
  /** Message key, when included in the report. */
  key?: MessageKey;
  /** NOTE(review): presumably the message timestamp in epoch milliseconds — confirm. */
  timestamp?: number;
  /** NOTE(review): presumably the `opaque` value supplied to `produce` — confirm. */
  opaque?: any;
}
/** A number that may also be `null` or left `undefined`. */
export type NumberNullUndefined =
  | number
  | null
  | undefined;

/** Message key: a buffer, a string, or absent (`null` / `undefined`). */
export type MessageKey =
  | Buffer
  | string
  | null
  | undefined;

/** A header object mapping string keys to string or buffer values. */
export type MessageHeader = Record<string, string | Buffer>;

/** Message payload: a buffer, or `null`. */
export type MessageValue =
  | Buffer
  | null;

/** A subscription target given as an exact string or a regular expression. */
export type SubscribeTopic =
  | string
  | RegExp;

/** Array of subscription targets. */
export type SubscribeTopicList = Array<SubscribeTopic>;
/**
 * Message shape delivered to `'data'` listeners and to `consume` callbacks.
 */
export interface Message extends TopicPartitionOffset {
  /** Message payload; `null` when there is none. */
  value: MessageValue;
  /** NOTE(review): presumably the payload size in bytes — confirm. */
  size: number;
  /** Topic name (same member as the one inherited from `TopicPartition`). */
  topic: string;
  /** Message key, when present. */
  key?: MessageKey;
  /** NOTE(review): presumably the message timestamp in epoch milliseconds — confirm. */
  timestamp?: number;
  /** Message headers, when present. */
  headers?: MessageHeader[];
  /** NOTE(review): presumably a caller-supplied opaque value — confirm. */
  opaque?: any;
}
/**
 * Options accepted by `createReadStream`, layered on top of Node's
 * `ReadableOptions`.
 */
export interface ReadStreamOptions extends ReadableOptions {
  /**
   * Topics to read: a list, a single topic, or a function that derives the
   * list from cluster metadata.
   */
  topics: SubscribeTopicList | SubscribeTopic | ((metadata: Metadata) => SubscribeTopicList);
  /** NOTE(review): presumably a delay between fetch attempts, in milliseconds — confirm. */
  waitInterval?: number;
  /** NOTE(review): presumably the number of messages requested per fetch — confirm. */
  fetchSize?: number;
  /** Same option name as in `ReadableOptions`, restated here. */
  objectMode?: boolean;
  /** Same option name as in `ReadableOptions`, restated here. */
  highWaterMark?: number;
  /** NOTE(review): presumably closes the consumer when the stream ends — confirm. */
  autoClose?: boolean;
  /** NOTE(review): presumably emits arrays of messages rather than single messages — confirm. */
  streamAsBatch?: boolean;
  /** NOTE(review): presumably forwarded to the consumer's `connect` call — confirm. */
  connectOptions?: any;
}
/**
 * Options accepted by `createWriteStream`, layered on top of Node's
 * `WritableOptions`.
 */
export interface WriteStreamOptions extends WritableOptions {
  /** NOTE(review): presumably the encoding applied to string chunks — confirm. */
  encoding?: string;
  /** Same option name as in `WritableOptions`, restated here. */
  objectMode?: boolean;
  /** NOTE(review): presumably the topic written to when chunks do not name one — confirm. */
  topic?: string;
  /** NOTE(review): presumably closes the producer when the stream ends — confirm. */
  autoClose?: boolean;
  /** NOTE(review): presumably the producer poll interval in milliseconds — confirm. */
  pollInterval?: number;
  /** NOTE(review): presumably forwarded to the producer's `connect` call — confirm. */
  connectOptions?: any;
}
/** Writable stream type returned by `createWriteStream`. */
export interface ProducerStream extends Writable {
  /** The `Producer` instance associated with this stream. */
  producer: Producer;
  /** Connect entry point; accepts optional metadata options and returns nothing. */
  connect(metadataOptions?: MetadataOptions): void;
  /** Close entry point; `cb`, when given, takes no arguments. */
  close(cb?: () => void): void;
}
/** Readable stream type returned by `createReadStream`. */
export interface ConsumerStream extends Readable {
  /** The `KafkaConsumer` instance associated with this stream. */
  consumer: KafkaConsumer;
  /** Connect entry point; takes a consumer global configuration and returns nothing. */
  connect(options: ConsumerGlobalConfig): void;
  /** Close entry point; `cb`, when given, takes no arguments. */
  close(cb?: () => void): void;
}
/** Event names common to every client type. */
type KafkaClientEvents =
  | 'disconnected'
  | 'ready'
  | 'connection.failure'
  | 'event.error'
  | 'event.stats'
  | 'event.log'
  | 'event.event'
  | 'event.throttle';

/** Event names accepted by a consumer: its own names plus the common client names. */
type KafkaConsumerEvents =
  | 'data'
  | 'partition.eof'
  | 'rebalance'
  | 'rebalance.error'
  | 'subscribed'
  | 'unsubscribed'
  | 'unsubscribe'
  | 'offset.commit'
  | KafkaClientEvents;

/** Event names accepted by a producer: `'delivery-report'` plus the common client names. */
type KafkaProducerEvents =
  | 'delivery-report'
  | KafkaClientEvents;
/**
 * Maps each supported event name to the signature its listener must have.
 * `EventListener<K>` indexes into this map.
 */
type EventListenerMap = {
  // ### Client
  // connectivity events
  'disconnected': (metrics: ClientMetrics) => void,
  'ready': (info: ReadyInfo, metadata: Metadata) => void,
  'connection.failure': (error: LibrdKafkaError, metrics: ClientMetrics) => void,
  // event messages
  'event.error': (error: LibrdKafkaError) => void,
  'event.stats': (eventData: any) => void,
  'event.log': (eventData: any) => void,
  'event.event': (eventData: any) => void,
  'event.throttle': (eventData: any) => void,
  // ### Consumer only
  // domain events
  'data': (arg: Message) => void,
  'partition.eof': (arg: EofEvent) => void,
  'rebalance': (err: LibrdKafkaError, assignments: TopicPartition[]) => void,
  'rebalance.error': (err: Error) => void,
  // connectivity events
  'subscribed': (topics: SubscribeTopicList) => void,
  'unsubscribe': () => void,
  'unsubscribed': () => void,
  // offsets
  'offset.commit': (error: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void,
  // ### Producer only
  // delivery
  'delivery-report': (error: LibrdKafkaError, report: DeliveryReport) => void,
};
/**
 * Resolves an event name to its listener signature in `EventListenerMap`.
 * Names that are not keys of the map resolve to `never`.
 */
type EventListener<K extends string> =
  K extends keyof EventListenerMap
    ? EventListenerMap[K]
    : never;
/**
 * Abstract base declaration shared by the consumer and producer classes.
 *
 * `Events` is the union of event names the concrete client accepts; `on` and
 * `once` use it to pick the matching listener signature.
 */
export abstract class Client<Events extends string> extends EventEmitter {
  constructor(globalConf: GlobalConfig, SubClientType: any, topicConf: TopicConfig);

  /** `cb`, when given, receives `(err, metadata)`. Returns `this`. */
  connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;

  /** Returns an untyped (`any`) value. NOTE(review): presumably the native client handle — confirm. */
  getClient(): any;

  /** NOTE(review): presumably how long the client has been connected — confirm units. */
  connectedTime(): number;

  /** Returns a `LibrdKafkaError`. NOTE(review): presumably the most recent one recorded — confirm. */
  getLastError(): LibrdKafkaError;

  /** `cb`, when given, receives `(err, metrics)`. An optional leading `timeout` is accepted. Returns `this`. */
  disconnect(cb?: (err: any, data: ClientMetrics) => any): this;
  disconnect(timeout: number, cb?: (err: any, data: ClientMetrics) => any): this;

  /** Reports connection state as a boolean. */
  isConnected(): boolean;

  /** `cb`, when given, receives `(err, metadata)`. */
  getMetadata(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): any;

  /** `cb`, when given, receives `(err, offsets)`; `timeout` may be omitted. */
  queryWatermarkOffsets(topic: string, partition: number, timeout: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
  queryWatermarkOffsets(topic: string, partition: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;

  /** Typed listener registration: `listener` must match the signature mapped to `event`. */
  on<E extends Events>(event: E, listener: EventListener<E>): this;
  once<E extends Events>(event: E, listener: EventListener<E>): this;
}
/**
 * Consumer client declaration. Listeners registered through `on` / `once`
 * are limited to the `KafkaConsumerEvents` names.
 */
export class KafkaConsumer extends Client<KafkaConsumerEvents> {
  constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);

  /** Takes a list of assignments. Returns `this`. */
  assign(assignments: Assignment[]): this;

  /** Returns a list of assignments. */
  assignments(): Assignment[];

  /** Accepts one offset, a list of offsets, or no argument. Returns `this`. */
  commit(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;
  commit(): this;

  /** Takes a message-like `TopicPartitionOffset`. Returns `this`. */
  commitMessage(msg: TopicPartitionOffset): this;
  commitMessageSync(msg: TopicPartitionOffset): this;

  /** Accepts one offset or a list of offsets. Returns `this`. */
  commitSync(topicPartition: TopicPartitionOffset | TopicPartitionOffset[]): this;

  /** `cb` receives `(err, topicPartitions)`; the partition list may be omitted. Returns `this`. */
  committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
  committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;

  /** Callable with a count and optional callback, with only a callback, or with no arguments. */
  consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
  consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
  consume(): void;

  /** Returns the watermark pair for a topic/partition without a callback. */
  getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;

  offsetsStore(topicPartitions: TopicPartitionOffset[]): any;

  pause(topicPartitions: TopicPartition[]): any;

  /** Returns offsets; the partition list is optional. */
  position(toppars?: TopicPartition[]): TopicPartitionOffset[];

  resume(topicPartitions: TopicPartition[]): any;

  /** `timeout` may be `null`; `cb` receives `(err)`. Returns `this`. */
  seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;

  setDefaultConsumeTimeout(timeoutMs: number): void;
  setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;

  /** Takes a list of topic names or patterns. Returns `this`. */
  subscribe(topics: SubscribeTopicList): this;

  /** Returns topic names as strings. */
  subscription(): string[];

  unassign(): this;
  unsubscribe(): this;

  /** `cb`, when given, receives `(err, offsets)`; `timeout` may be omitted. */
  offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
  offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;

  /** Static factory returning a `ConsumerStream`; `streamOptions` may be an options object or a number. */
  static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
}
/**
 * Producer client declaration. Listeners registered through `on` / `once`
 * are limited to the `KafkaProducerEvents` names.
 */
export class Producer extends Client<KafkaProducerEvents> {
  /** The topic configuration is optional here. */
  constructor(conf: ProducerGlobalConfig, topicConf?: ProducerTopicConfig);

  /** `timeout` may be a number, `null` or omitted; `cb`, when given, receives `(err)`. Returns `this`. */
  flush(timeout?: NumberNullUndefined, cb?: (err: LibrdKafkaError) => void): this;

  poll(): this;

  /** `partition` and `timestamp` accept `null` / `undefined`; `key`, `opaque` and `headers` are optional. */
  produce(topic: string, partition: NumberNullUndefined, message: MessageValue, key?: MessageKey, timestamp?: NumberNullUndefined, opaque?: any, headers?: MessageHeader[]): any;

  setPollInterval(interval: number): this;

  /** Static factory returning a `ProducerStream`. */
  static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;

  // Transaction methods: every callback receives `(err)`. Where two overloads
  // exist, the second accepts a leading numeric `timeout`.
  initTransactions(cb: (err: LibrdKafkaError) => void): void;
  initTransactions(timeout: number, cb: (err: LibrdKafkaError) => void): void;
  beginTransaction(cb: (err: LibrdKafkaError) => void): void;
  commitTransaction(cb: (err: LibrdKafkaError) => void): void;
  commitTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
  abortTransaction(cb: (err: LibrdKafkaError) => void): void;
  abortTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
  sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, cb: (err: LibrdKafkaError) => void): void;
  sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, timeout: number, cb: (err: LibrdKafkaError) => void): void;
}
/**
 * Producer variant whose `produce` takes a per-call callback and whose keys
 * and values pass through configurable serializers.
 */
export class HighLevelProducer extends Producer {
  /** `callback` receives `(err, offset?)`; an optional `headers` list may precede it. */
  produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, callback: (err: any, offset?: NumberNullUndefined) => void): any;
  produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, headers: MessageHeader[], callback: (err: any, offset?: NumberNullUndefined) => void): any;

  /** Accepts a callback-style serializer, or one that returns the key directly or as a promise. */
  setKeySerializer(serializer: (key: any, cb: (err: any, key: MessageKey) => void) => void): void;
  setKeySerializer(serializer: (key: any) => MessageKey | Promise<MessageKey>): void;

  /** Accepts a callback-style serializer, or one that returns the value directly or as a promise. */
  setValueSerializer(serializer: (value: any, cb: (err: any, value: MessageValue) => void) => void): void;
  setValueSerializer(serializer: (value: any) => MessageValue | Promise<MessageValue>): void;
}
/** NOTE(review): presumably the feature names supported by the bundled librdkafka — confirm. */
export const features: string[];

/** NOTE(review): presumably the version string of the bundled librdkafka — confirm. */
export const librdkafkaVersion: string;

/**
 * Module-level factory with the same parameters and return type as
 * `KafkaConsumer.createReadStream`.
 */
export function createReadStream(
  conf: ConsumerGlobalConfig,
  topicConf: ConsumerTopicConfig,
  streamOptions: ReadStreamOptions | number
): ConsumerStream;

/**
 * Module-level factory with the same parameters and return type as
 * `Producer.createWriteStream`.
 */
export function createWriteStream(
  conf: ProducerGlobalConfig,
  topicConf: ProducerTopicConfig,
  streamOptions: WriteStreamOptions
): ProducerStream;
/**
 * Topic description accepted by `IAdminClient.createTopic`.
 *
 * `config` is either an object using the topic-level setting names listed
 * below, or any string-to-string map. All setting values are strings, apart
 * from the two settings restricted to a fixed set of literals.
 */
export interface NewTopic {
  /** Name of the topic. */
  topic: string;
  /** Number of partitions. */
  num_partitions: number;
  /** Replication factor. */
  replication_factor: number;
  /** Optional topic-level settings. */
  config?: {
    'cleanup.policy'?: 'delete' | 'compact' | 'delete,compact' | 'compact,delete';
    'compression.type'?: 'gzip' | 'snappy' | 'lz4' | 'zstd' | 'uncompressed' | 'producer';
    'delete.retention.ms'?: string;
    'file.delete.delay.ms'?: string;
    'flush.messages'?: string;
    'flush.ms'?: string;
    'follower.replication.throttled.replicas'?: string;
    'index.interval.bytes'?: string;
    'leader.replication.throttled.replicas'?: string;
    'max.compaction.lag.ms'?: string;
    'max.message.bytes'?: string;
    'message.format.version'?: string;
    'message.timestamp.difference.max.ms'?: string;
    'message.timestamp.type'?: string;
    'min.cleanable.dirty.ratio'?: string;
    'min.compaction.lag.ms'?: string;
    'min.insync.replicas'?: string;
    'preallocate'?: string;
    'retention.bytes'?: string;
    'retention.ms'?: string;
    'segment.bytes'?: string;
    'segment.index.bytes'?: string;
    'segment.jitter.ms'?: string;
    'segment.ms'?: string;
    'unclean.leader.election.enable'?: string;
    'message.downconversion.enable'?: string;
  } | { [cfg: string]: string; };
}
/**
 * Admin client surface returned by `AdminClient.create`.
 *
 * Each operation has two overloads: one with only an optional callback and
 * one that also takes an optional numeric `timeout`. Every callback receives
 * `(err)`.
 */
export interface IAdminClient {
  /** Takes a `NewTopic` description. */
  createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
  createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

  /** Takes a topic name. */
  deleteTopic(topic: string, cb?: (err: LibrdKafkaError) => void): void;
  deleteTopic(topic: string, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

  /** Takes a topic name and a desired partition count. */
  createPartitions(topic: string, desiredPartitions: number, cb?: (err: LibrdKafkaError) => void): void;
  createPartitions(topic: string, desiredPartitions: number, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

  /** Takes no arguments and returns nothing. */
  disconnect(): void;
}
/**
 * Abstract holder for the admin-client factory. The part of the declaration
 * visible here exposes only the static `create` method.
 */
export abstract class AdminClient {
// Takes a global configuration object and returns an `IAdminClient`.
static create(conf: GlobalConfig): IAdminClient;