You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

470 lines
16 KiB

/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
module.exports = LibrdKafkaError;
var util = require('util');
var librdkafka = require('../librdkafka');
util.inherits(LibrdKafkaError, Error);
LibrdKafkaError.create = createLibrdkafkaError;
LibrdKafkaError.wrap = errorWrap;
/**
* Enum for identifying errors reported by the library
*
* You can find this list in the C++ code at
* https://github.com/edenhill/librdkafka/blob/master/src-cpp/rdkafkacpp.h#L148
*
* @readonly
* @enum {number}
* @constant
*/
// ====== Generated from librdkafka 1.6.1 file src-cpp/rdkafkacpp.h ======
LibrdKafkaError.codes = {
/* Internal errors to rdkafka: */
/** Begin internal error codes */
ERR__BEGIN: -200,
/** Received message is incorrect */
ERR__BAD_MSG: -199,
/** Bad/unknown compression */
ERR__BAD_COMPRESSION: -198,
/** Broker is going away */
ERR__DESTROY: -197,
/** Generic failure */
ERR__FAIL: -196,
/** Broker transport failure */
ERR__TRANSPORT: -195,
/** Critical system resource */
ERR__CRIT_SYS_RESOURCE: -194,
/** Failed to resolve broker */
ERR__RESOLVE: -193,
/** Produced message timed out*/
ERR__MSG_TIMED_OUT: -192,
/** Reached the end of the topic+partition queue on
* the broker. Not really an error.
* This event is disabled by default,
* see the `enable.partition.eof` configuration property. */
ERR__PARTITION_EOF: -191,
/** Permanent: Partition does not exist in cluster. */
ERR__UNKNOWN_PARTITION: -190,
/** File or filesystem error */
ERR__FS: -189,
/** Permanent: Topic does not exist in cluster. */
ERR__UNKNOWN_TOPIC: -188,
/** All broker connections are down. */
ERR__ALL_BROKERS_DOWN: -187,
/** Invalid argument, or invalid configuration */
ERR__INVALID_ARG: -186,
/** Operation timed out */
ERR__TIMED_OUT: -185,
/** Queue is full */
ERR__QUEUE_FULL: -184,
/** ISR count < required.acks */
ERR__ISR_INSUFF: -183,
/** Broker node update */
ERR__NODE_UPDATE: -182,
/** SSL error */
ERR__SSL: -181,
/** Waiting for coordinator to become available. */
ERR__WAIT_COORD: -180,
/** Unknown client group */
ERR__UNKNOWN_GROUP: -179,
/** Operation in progress */
ERR__IN_PROGRESS: -178,
/** Previous operation in progress, wait for it to finish. */
ERR__PREV_IN_PROGRESS: -177,
/** This operation would interfere with an existing subscription */
ERR__EXISTING_SUBSCRIPTION: -176,
/** Assigned partitions (rebalance_cb) */
ERR__ASSIGN_PARTITIONS: -175,
/** Revoked partitions (rebalance_cb) */
ERR__REVOKE_PARTITIONS: -174,
/** Conflicting use */
ERR__CONFLICT: -173,
/** Wrong state */
ERR__STATE: -172,
/** Unknown protocol */
ERR__UNKNOWN_PROTOCOL: -171,
/** Not implemented */
ERR__NOT_IMPLEMENTED: -170,
/** Authentication failure*/
ERR__AUTHENTICATION: -169,
/** No stored offset */
ERR__NO_OFFSET: -168,
/** Outdated */
ERR__OUTDATED: -167,
/** Timed out in queue */
ERR__TIMED_OUT_QUEUE: -166,
/** Feature not supported by broker */
ERR__UNSUPPORTED_FEATURE: -165,
/** Awaiting cache update */
ERR__WAIT_CACHE: -164,
/** Operation interrupted */
ERR__INTR: -163,
/** Key serialization error */
ERR__KEY_SERIALIZATION: -162,
/** Value serialization error */
ERR__VALUE_SERIALIZATION: -161,
/** Key deserialization error */
ERR__KEY_DESERIALIZATION: -160,
/** Value deserialization error */
ERR__VALUE_DESERIALIZATION: -159,
/** Partial response */
ERR__PARTIAL: -158,
/** Modification attempted on read-only object */
ERR__READ_ONLY: -157,
/** No such entry / item not found */
ERR__NOENT: -156,
/** Read underflow */
ERR__UNDERFLOW: -155,
/** Invalid type */
ERR__INVALID_TYPE: -154,
/** Retry operation */
ERR__RETRY: -153,
/** Purged in queue */
ERR__PURGE_QUEUE: -152,
/** Purged in flight */
ERR__PURGE_INFLIGHT: -151,
/** Fatal error: see RdKafka::Handle::fatal_error() */
ERR__FATAL: -150,
/** Inconsistent state */
ERR__INCONSISTENT: -149,
/** Gap-less ordering would not be guaranteed if proceeding */
ERR__GAPLESS_GUARANTEE: -148,
/** Maximum poll interval exceeded */
ERR__MAX_POLL_EXCEEDED: -147,
/** Unknown broker */
ERR__UNKNOWN_BROKER: -146,
/** Functionality not configured */
ERR__NOT_CONFIGURED: -145,
/** Instance has been fenced */
ERR__FENCED: -144,
/** Application generated error */
ERR__APPLICATION: -143,
/** Assignment lost */
ERR__ASSIGNMENT_LOST: -142,
/** No operation performed */
ERR__NOOP: -141,
/** No offset to automatically reset to */
ERR__AUTO_OFFSET_RESET: -140,
/** End internal error codes */
ERR__END: -100,
/* Kafka broker errors: */
/** Unknown broker error */
ERR_UNKNOWN: -1,
/** Success */
ERR_NO_ERROR: 0,
/** Offset out of range */
ERR_OFFSET_OUT_OF_RANGE: 1,
/** Invalid message */
ERR_INVALID_MSG: 2,
/** Unknown topic or partition */
ERR_UNKNOWN_TOPIC_OR_PART: 3,
/** Invalid message size */
ERR_INVALID_MSG_SIZE: 4,
/** Leader not available */
ERR_LEADER_NOT_AVAILABLE: 5,
/** Not leader for partition */
ERR_NOT_LEADER_FOR_PARTITION: 6,
/** Request timed out */
ERR_REQUEST_TIMED_OUT: 7,
/** Broker not available */
ERR_BROKER_NOT_AVAILABLE: 8,
/** Replica not available */
ERR_REPLICA_NOT_AVAILABLE: 9,
/** Message size too large */
ERR_MSG_SIZE_TOO_LARGE: 10,
/** StaleControllerEpochCode */
ERR_STALE_CTRL_EPOCH: 11,
/** Offset metadata string too large */
ERR_OFFSET_METADATA_TOO_LARGE: 12,
/** Broker disconnected before response received */
ERR_NETWORK_EXCEPTION: 13,
/** Coordinator load in progress */
ERR_COORDINATOR_LOAD_IN_PROGRESS: 14,
/** Group coordinator load in progress */
ERR_GROUP_LOAD_IN_PROGRESS: 14,
/** Coordinator not available */
ERR_COORDINATOR_NOT_AVAILABLE: 15,
/** Group coordinator not available */
ERR_GROUP_COORDINATOR_NOT_AVAILABLE: 15,
/** Not coordinator */
ERR_NOT_COORDINATOR: 16,
/** Not coordinator for group */
ERR_NOT_COORDINATOR_FOR_GROUP: 16,
/** Invalid topic */
ERR_TOPIC_EXCEPTION: 17,
/** Message batch larger than configured server segment size */
ERR_RECORD_LIST_TOO_LARGE: 18,
/** Not enough in-sync replicas */
ERR_NOT_ENOUGH_REPLICAS: 19,
/** Message(s) written to insufficient number of in-sync replicas */
ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND: 20,
/** Invalid required acks value */
ERR_INVALID_REQUIRED_ACKS: 21,
/** Specified group generation id is not valid */
ERR_ILLEGAL_GENERATION: 22,
/** Inconsistent group protocol */
ERR_INCONSISTENT_GROUP_PROTOCOL: 23,
/** Invalid group.id */
ERR_INVALID_GROUP_ID: 24,
/** Unknown member */
ERR_UNKNOWN_MEMBER_ID: 25,
/** Invalid session timeout */
ERR_INVALID_SESSION_TIMEOUT: 26,
/** Group rebalance in progress */
ERR_REBALANCE_IN_PROGRESS: 27,
/** Commit offset data size is not valid */
ERR_INVALID_COMMIT_OFFSET_SIZE: 28,
/** Topic authorization failed */
ERR_TOPIC_AUTHORIZATION_FAILED: 29,
/** Group authorization failed */
ERR_GROUP_AUTHORIZATION_FAILED: 30,
/** Cluster authorization failed */
ERR_CLUSTER_AUTHORIZATION_FAILED: 31,
/** Invalid timestamp */
ERR_INVALID_TIMESTAMP: 32,
/** Unsupported SASL mechanism */
ERR_UNSUPPORTED_SASL_MECHANISM: 33,
/** Illegal SASL state */
ERR_ILLEGAL_SASL_STATE: 34,
/** Unuspported version */
ERR_UNSUPPORTED_VERSION: 35,
/** Topic already exists */
ERR_TOPIC_ALREADY_EXISTS: 36,
/** Invalid number of partitions */
ERR_INVALID_PARTITIONS: 37,
/** Invalid replication factor */
ERR_INVALID_REPLICATION_FACTOR: 38,
/** Invalid replica assignment */
ERR_INVALID_REPLICA_ASSIGNMENT: 39,
/** Invalid config */
ERR_INVALID_CONFIG: 40,
/** Not controller for cluster */
ERR_NOT_CONTROLLER: 41,
/** Invalid request */
ERR_INVALID_REQUEST: 42,
/** Message format on broker does not support request */
ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT: 43,
/** Policy violation */
ERR_POLICY_VIOLATION: 44,
/** Broker received an out of order sequence number */
ERR_OUT_OF_ORDER_SEQUENCE_NUMBER: 45,
/** Broker received a duplicate sequence number */
ERR_DUPLICATE_SEQUENCE_NUMBER: 46,
/** Producer attempted an operation with an old epoch */
ERR_INVALID_PRODUCER_EPOCH: 47,
/** Producer attempted a transactional operation in an invalid state */
ERR_INVALID_TXN_STATE: 48,
/** Producer attempted to use a producer id which is not
* currently assigned to its transactional id */
ERR_INVALID_PRODUCER_ID_MAPPING: 49,
/** Transaction timeout is larger than the maximum
* value allowed by the broker's max.transaction.timeout.ms */
ERR_INVALID_TRANSACTION_TIMEOUT: 50,
/** Producer attempted to update a transaction while another
* concurrent operation on the same transaction was ongoing */
ERR_CONCURRENT_TRANSACTIONS: 51,
/** Indicates that the transaction coordinator sending a
* WriteTxnMarker is no longer the current coordinator for a
* given producer */
ERR_TRANSACTION_COORDINATOR_FENCED: 52,
/** Transactional Id authorization failed */
ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: 53,
/** Security features are disabled */
ERR_SECURITY_DISABLED: 54,
/** Operation not attempted */
ERR_OPERATION_NOT_ATTEMPTED: 55,
/** Disk error when trying to access log file on the disk */
ERR_KAFKA_STORAGE_ERROR: 56,
/** The user-specified log directory is not found in the broker config */
ERR_LOG_DIR_NOT_FOUND: 57,
/** SASL Authentication failed */
ERR_SASL_AUTHENTICATION_FAILED: 58,
/** Unknown Producer Id */
ERR_UNKNOWN_PRODUCER_ID: 59,
/** Partition reassignment is in progress */
ERR_REASSIGNMENT_IN_PROGRESS: 60,
/** Delegation Token feature is not enabled */
ERR_DELEGATION_TOKEN_AUTH_DISABLED: 61,
/** Delegation Token is not found on server */
ERR_DELEGATION_TOKEN_NOT_FOUND: 62,
/** Specified Principal is not valid Owner/Renewer */
ERR_DELEGATION_TOKEN_OWNER_MISMATCH: 63,
/** Delegation Token requests are not allowed on this connection */
ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED: 64,
/** Delegation Token authorization failed */
ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED: 65,
/** Delegation Token is expired */
ERR_DELEGATION_TOKEN_EXPIRED: 66,
/** Supplied principalType is not supported */
ERR_INVALID_PRINCIPAL_TYPE: 67,
/** The group is not empty */
ERR_NON_EMPTY_GROUP: 68,
/** The group id does not exist */
ERR_GROUP_ID_NOT_FOUND: 69,
/** The fetch session ID was not found */
ERR_FETCH_SESSION_ID_NOT_FOUND: 70,
/** The fetch session epoch is invalid */
ERR_INVALID_FETCH_SESSION_EPOCH: 71,
/** No matching listener */
ERR_LISTENER_NOT_FOUND: 72,
/** Topic deletion is disabled */
ERR_TOPIC_DELETION_DISABLED: 73,
/** Leader epoch is older than broker epoch */
ERR_FENCED_LEADER_EPOCH: 74,
/** Leader epoch is newer than broker epoch */
ERR_UNKNOWN_LEADER_EPOCH: 75,
/** Unsupported compression type */
ERR_UNSUPPORTED_COMPRESSION_TYPE: 76,
/** Broker epoch has changed */
ERR_STALE_BROKER_EPOCH: 77,
/** Leader high watermark is not caught up */
ERR_OFFSET_NOT_AVAILABLE: 78,
/** Group member needs a valid member ID */
ERR_MEMBER_ID_REQUIRED: 79,
/** Preferred leader was not available */
ERR_PREFERRED_LEADER_NOT_AVAILABLE: 80,
/** Consumer group has reached maximum size */
ERR_GROUP_MAX_SIZE_REACHED: 81,
/** Static consumer fenced by other consumer with same
* group.instance.id. */
ERR_FENCED_INSTANCE_ID: 82,
/** Eligible partition leaders are not available */
ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE: 83,
/** Leader election not needed for topic partition */
ERR_ELECTION_NOT_NEEDED: 84,
/** No partition reassignment is in progress */
ERR_NO_REASSIGNMENT_IN_PROGRESS: 85,
/** Deleting offsets of a topic while the consumer group is
* subscribed to it */
ERR_GROUP_SUBSCRIBED_TO_TOPIC: 86,
/** Broker failed to validate record */
ERR_INVALID_RECORD: 87,
/** There are unstable offsets that need to be cleared */
ERR_UNSTABLE_OFFSET_COMMIT: 88,
/** Throttling quota has been exceeded */
ERR_THROTTLING_QUOTA_EXCEEDED: 89,
/** There is a newer producer with the same transactionalId
* which fences the current one */
ERR_PRODUCER_FENCED: 90,
/** Request illegally referred to resource that does not exist */
ERR_RESOURCE_NOT_FOUND: 91,
/** Request illegally referred to the same resource twice */
ERR_DUPLICATE_RESOURCE: 92,
/** Requested credential would not meet criteria for acceptability */
ERR_UNACCEPTABLE_CREDENTIAL: 93,
/** Indicates that the either the sender or recipient of a
* voter-only request is not one of the expected voters */
ERR_INCONSISTENT_VOTER_SET: 94,
/** Invalid update version */
ERR_INVALID_UPDATE_VERSION: 95,
/** Unable to update finalized features due to server error */
ERR_FEATURE_UPDATE_FAILED: 96,
/** Request principal deserialization failed during forwarding */
ERR_PRINCIPAL_DESERIALIZATION_FAILURE: 97
};
/**
* Representation of a librdkafka error
*
* This can be created by giving either another error
* to piggy-back on. In this situation it tries to parse
* the error string to figure out the intent. However, more usually,
* it is constructed by an error object created by a C++ Baton.
*
* @param {object|error} e - An object or error to wrap
* @property {string} message - The error message
* @property {number} code - The error code.
* @property {string} origin - The origin, whether it is local or remote
* @constructor
*/
function LibrdKafkaError(e) {
if (!(this instanceof LibrdKafkaError)) {
return new LibrdKafkaError(e);
}
if (typeof e === 'number') {
this.message = librdkafka.err2str(e);
this.code = e;
this.errno = e;
if (e >= LibrdKafkaError.codes.ERR__END) {
this.origin = 'local';
} else {
this.origin = 'kafka';
}
Error.captureStackTrace(this, this.constructor);
} else if (!util.isError(e)) {
// This is the better way
this.message = e.message;
this.code = e.code;
this.errno = e.code;
if (e.code >= LibrdKafkaError.codes.ERR__END) {
this.origin = 'local';
} else {
this.origin = 'kafka';
}
Error.captureStackTrace(this, this.constructor);
} else {
var message = e.message;
var parsedMessage = message.split(': ');
var origin, msg;
if (parsedMessage.length > 1) {
origin = parsedMessage[0].toLowerCase();
msg = parsedMessage[1].toLowerCase();
} else {
origin = 'unknown';
msg = message.toLowerCase();
}
// special cases
if (msg === 'consumer is disconnected' || msg === 'producer is disconnected') {
this.origin = 'local';
this.code = LibrdKafkaError.codes.ERR__STATE;
this.errno = this.code;
this.message = msg;
} else {
this.origin = origin;
this.message = msg;
this.code = typeof e.code === 'number' ? e.code : -1;
this.errno = this.code;
this.stack = e.stack;
}
}
if (e.hasOwnProperty('isFatal')) this.isFatal = e.isFatal;
if (e.hasOwnProperty('isRetriable')) this.isRetriable = e.isRetriable;
if (e.hasOwnProperty('isTxnRequiresAbort')) this.isTxnRequiresAbort = e.isTxnRequiresAbort;
}
function createLibrdkafkaError(e) {
return new LibrdKafkaError(e);
}
function errorWrap(errorCode, intIsError) {
var returnValue = true;
if (intIsError) {
returnValue = errorCode;
errorCode = typeof errorCode === 'number' ? errorCode : 0;
}
if (errorCode !== LibrdKafkaError.codes.ERR_NO_ERROR) {
var e = LibrdKafkaError.create(errorCode);
throw e;
}
return returnValue;
}