/** * Module dependencies */ import TopicAliasRecv from './topic-alias-recv' import mqttPacket, { IAuthPacket, IConnackPacket, IDisconnectPacket, IPublishPacket, ISubscribePacket, ISubscription, IUnsubscribePacket, Packet, QoS, ISubackPacket, IConnectPacket, } from 'mqtt-packet' import DefaultMessageIdProvider, { IMessageIdProvider, } from './default-message-id-provider' import { DuplexOptions, Writable } from 'readable-stream' import clone from 'rfdc/default' import * as validations from './validations' import _debug from 'debug' import Store, { IStore } from './store' import handlePacket from './handlers' import { ClientOptions } from 'ws' import { ClientRequestArgs } from 'http' import { DoneCallback, ErrorWithReasonCode, GenericCallback, IStream, MQTTJS_VERSION, StreamBuilder, TimerVariant, VoidCallback, nextTick, } from './shared' import TopicAliasSend from './topic-alias-send' import { TypedEventEmitter } from './TypedEmitter' import KeepaliveManager from './KeepaliveManager' import isBrowser, { isWebWorker } from './is-browser' import { Timer } from './get-timer' const setImmediate = globalThis.setImmediate || (((...args: any[]) => { const callback = args.shift() nextTick(() => { callback(...args) }) }) as typeof globalThis.setImmediate) const defaultConnectOptions: IClientOptions = { keepalive: 60, reschedulePings: true, protocolId: 'MQTT', protocolVersion: 4, reconnectPeriod: 1000, connectTimeout: 30 * 1000, clean: true, resubscribe: true, writeCache: true, timerVariant: 'auto', } export type BaseMqttProtocol = | 'wss' | 'ws' | 'mqtt' | 'mqtts' | 'tcp' | 'ssl' | 'wx' | 'wxs' | 'ali' | 'alis' // create a type that allows all MqttProtocol + `+unix` string export type MqttProtocolWithUnix = `${BaseMqttProtocol}+unix` export type MqttProtocol = BaseMqttProtocol | MqttProtocolWithUnix export type StorePutCallback = () => void export interface ISecureClientOptions { /** * optional private keys in PEM format */ key?: string | string[] | Buffer | Buffer[] | any[] keyPath?: string /** * optional cert chains in PEM format */ cert?: string | string[] | Buffer | Buffer[] certPath?: string /** * Optionally override the trusted CA certificates in PEM format */ ca?: string | string[] | Buffer | Buffer[] caPaths?: string | string[] rejectUnauthorized?: boolean /** * optional alpn's */ ALPNProtocols?: string[] | Buffer[] | Uint8Array[] | Buffer | Uint8Array } export type AckHandler = ( topic: string, message: Buffer, packet: any, cb: (error: Error | number, code?: number) => void, ) => void export interface IClientOptions extends ISecureClientOptions { /** CLIENT PROPERTIES */ /** Encoding to use. Example 'binary' */ encoding?: BufferEncoding /** Set browser buffer size. Default to 512KB */ browserBufferSize?: number /** used in ws protocol to set `objectMode` */ binary?: boolean /** Used on ali protocol */ my?: any /** Manually call `connect` after creating client instance */ manualConnect?: boolean /** Custom auth packet properties */ authPacket?: Partial /** Disable/Enable writeToStream.cacheNumbers */ writeCache?: boolean /** Should be set to `host` */ servername?: string /** The default protocol to use when using `servers` and no protocol is specified */ defaultProtocol?: MqttProtocol /** Support clientId passed in the query string of the url */ query?: Record /** Auth string in the format : */ auth?: string /** Custom ack handler */ customHandleAcks?: AckHandler /** Broker port */ port?: number /** Broker host. Does NOT include port */ host?: string /** @deprecated use `host instead */ hostname?: string /** Set to true if the connection is to a unix socket */ unixSocket?: boolean /** Websocket `path` added as suffix or Unix socket path when `unixSocket` option is true */ path?: string /** The `MqttProtocol` to use */ protocol?: MqttProtocol /** Websocket options */ wsOptions?: ClientOptions | ClientRequestArgs | DuplexOptions /** * 1000 milliseconds, interval between two reconnections */ reconnectPeriod?: number /** * Set to true to enable the reconnect period to apply if the initial * connection is denied with an error in the CONNACK packet, such as with an * authentication error. */ reconnectOnConnackError?: boolean /** * 30 * 1000 milliseconds, time to wait before a CONNACK is received */ connectTimeout?: number /** * a Store for the incoming packets */ incomingStore?: IStore /** * a Store for the outgoing packets */ outgoingStore?: IStore /** Enable/Disable queue for QoS 0 packets */ queueQoSZero?: boolean /** Custom log function, default uses `debug` */ log?: (...args: any[]) => void /** automatically use topic alias */ autoUseTopicAlias?: boolean /** automatically assign topic alias */ autoAssignTopicAlias?: boolean /** Set to false to disable ping reschedule. When enabled ping messages are rescheduled on each message sent */ reschedulePings?: boolean /** List of broker servers. On each reconnect try the next server will be used */ servers?: Array<{ host: string port: number protocol?: | 'wss' | 'ws' | 'mqtt' | 'mqtts' | 'tcp' | 'ssl' | 'wx' | 'wxs' }> /** * true, set to false to disable re-subscribe functionality */ resubscribe?: boolean /** when defined this function will be called to transform the url string generated by MqttClient from provided options */ transformWsUrl?: ( url: string, options: IClientOptions, client: MqttClient, ) => string /** when defined this function will be called to create the Websocket instance, used to add custom protocols or websocket implementations */ createWebsocket?: ( url: string, websocketSubProtocols: string[], options: IClientOptions, ) => any /** Custom message id provider */ messageIdProvider?: IMessageIdProvider /** When using websockets, this is the timeout used when writing to socket. Default 1000 (1s) */ browserBufferTimeout?: number /** * When using websockets, this sets the `objectMode` option. * When in objectMode, streams can push Strings and Buffers * as well as any other JavaScript object. * Another major difference is that when in objectMode, * the internal buffering algorithm counts objects rather than bytes. * This means if we have a Transform stream with the highWaterMark option set to 5, * the stream will only buffer a maximum of 5 objects internally */ objectMode?: boolean /** CONNECT PACKET PROPERTIES */ /** * 'mqttjs_' + Math.random().toString(16).substr(2, 8) */ clientId?: string /** * 3=MQTT 3.1 4=MQTT 3.1.1 5=MQTT 5.0. Defaults to 4 */ protocolVersion?: IConnectPacket['protocolVersion'] /** * 'MQTT' */ protocolId?: IConnectPacket['protocolId'] /** * true, set to false to receive QoS 1 and 2 messages while offline */ clean?: boolean /** * 60 seconds, set to 0 to disable */ keepalive?: number /** * the username required by your broker, if any */ username?: string /** * the password required by your broker, if any */ password?: Buffer | string /** * a message that will sent by the broker automatically when the client disconnect badly. */ will?: IConnectPacket['will'] /** see `connect` packet: https://github.com/mqttjs/mqtt-packet/blob/master/types/index.d.ts#L65 */ properties?: IConnectPacket['properties'] /** * @description 'auto', set to 'native' or 'worker' if you're having issues with 'auto' detection * or pass a custom timer object */ timerVariant?: TimerVariant | Timer /** * false, set to true to force the use of native WebSocket if you're having issues with the detection */ forceNativeWebSocket?: boolean } export interface IClientPublishOptions { /** * the QoS */ qos?: QoS /** * the retain flag */ retain?: boolean /** * whether or not mark a message as duplicate */ dup?: boolean /* * MQTT 5.0 properties object */ properties?: IPublishPacket['properties'] /** * callback called when message is put into `outgoingStore` */ cbStorePut?: StorePutCallback } export interface IClientReconnectOptions { /** * a Store for the incoming packets */ incomingStore?: Store /** * a Store for the outgoing packets */ outgoingStore?: Store } export interface IClientSubscribeProperties { /* * MQTT 5.0 properties object of subscribe * */ properties?: ISubscribePacket['properties'] } export interface IClientSubscribeOptions extends IClientSubscribeProperties { /** * the QoS */ qos: QoS /* * no local flag * */ nl?: boolean /* * Retain As Published flag * */ rap?: boolean /* * Retain Handling option * */ rh?: number } export interface ISubscriptionRequest extends IClientSubscribeOptions { /** * is a subscribed to topic */ topic: string } export interface ISubscriptionGrant extends Omit { /** * is the granted qos level on it, may return 128 on error */ qos: QoS | 128 } export type ISubscriptionMap = { /** * object which has topic names as object keys and as value the options, like {'test1': {qos: 0}, 'test2': {qos: 2}}. */ [topic: string]: IClientSubscribeOptions } & { resubscribe?: boolean } export interface IClientUnsubscribeProperties { /* * MQTT 5.0 properties object for unsubscribe * */ properties?: IUnsubscribePacket['properties'] } export { IConnackPacket, IDisconnectPacket, IPublishPacket, Packet } export type OnConnectCallback = (packet: IConnackPacket) => void export type OnDisconnectCallback = (packet: IDisconnectPacket) => void export type ClientSubscribeCallback = ( err: Error | null, granted?: ISubscriptionGrant[], packet?: ISubackPacket, ) => void export type OnMessageCallback = ( topic: string, payload: Buffer, packet: IPublishPacket, ) => void export type OnPacketCallback = (packet: Packet) => void export type OnCloseCallback = () => void export type OnErrorCallback = (error: Error | ErrorWithReasonCode) => void export type PacketCallback = ( error?: Error | ErrorWithReasonCode, packet?: Packet, ) => any export type CloseCallback = (error?: Error) => void export interface MqttClientEventCallbacks { connect: OnConnectCallback message: OnMessageCallback packetsend: OnPacketCallback packetreceive: OnPacketCallback disconnect: OnDisconnectCallback error: OnErrorCallback close: OnCloseCallback end: VoidCallback reconnect: VoidCallback offline: VoidCallback outgoingEmpty: VoidCallback } /** * MqttClient constructor * * @param {Stream} stream - stream * @param {Object} [options] - connection options * (see Connection#connect) */ export default class MqttClient extends TypedEventEmitter { public static VERSION = MQTTJS_VERSION /** Public fields */ /** It's true when client is connected to broker */ public connected: boolean public disconnecting: boolean public disconnected: boolean public reconnecting: boolean public incomingStore: IStore public outgoingStore: IStore public options: IClientOptions public queueQoSZero: boolean public _reconnectCount: number public log: (...args: any[]) => void public messageIdProvider: IMessageIdProvider public outgoing: Record< number, { volatile: boolean; cb: (err: Error, packet?: Packet) => void } > public messageIdToTopic: Record public noop: (error?: any) => void public keepaliveManager: KeepaliveManager /** * The connection to the Broker. In browsers env this also have `socket` property * set to the `WebSocket` instance. */ public stream: IStream public queue: { packet: Packet; cb: PacketCallback }[] /* Private fields */ /** Function used to build the stream */ private streamBuilder: StreamBuilder private _resubscribeTopics: ISubscriptionMap private connackTimer: NodeJS.Timeout private reconnectTimer: NodeJS.Timeout private _storeProcessing: boolean /** keep a reference of packets that have been successfully processed from outgoing store */ private _packetIdsDuringStoreProcessing: Record private _storeProcessingQueue: { invoke: () => any cbStorePut?: DoneCallback callback: GenericCallback }[] private _firstConnection: boolean private topicAliasRecv: TopicAliasRecv private topicAliasSend: TopicAliasSend private _deferredReconnect: () => void private connackPacket: IConnackPacket public static defaultId() { return `mqttjs_${Math.random().toString(16).substr(2, 8)}` } constructor(streamBuilder: StreamBuilder, options: IClientOptions) { super() this.options = options || {} // Defaults for (const k in defaultConnectOptions) { if (typeof this.options[k] === 'undefined') { this.options[k] = defaultConnectOptions[k] } else { this.options[k] = options[k] } } this.log = this.options.log || _debug('mqttjs:client') this.noop = this._noop.bind(this) this.log('MqttClient :: version:', MqttClient.VERSION) if (isWebWorker) { this.log('MqttClient :: environment', 'webworker') } else { this.log( 'MqttClient :: environment', isBrowser ? 'browser' : 'node', ) } this.log('MqttClient :: options.protocol', options.protocol) this.log( 'MqttClient :: options.protocolVersion', options.protocolVersion, ) this.log('MqttClient :: options.username', options.username) this.log('MqttClient :: options.keepalive', options.keepalive) this.log( 'MqttClient :: options.reconnectPeriod', options.reconnectPeriod, ) this.log( 'MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized, ) this.log( 'MqttClient :: options.properties.topicAliasMaximum', options.properties ? options.properties.topicAliasMaximum : undefined, ) this.options.clientId = typeof options.clientId === 'string' ? options.clientId : MqttClient.defaultId() this.log('MqttClient :: clientId', this.options.clientId) this.options.customHandleAcks = options.protocolVersion === 5 && options.customHandleAcks ? options.customHandleAcks : (...args) => { args[3](null, 0) } // Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance if (!this.options.writeCache) { mqttPacket.writeToStream.cacheNumbers = false } this.streamBuilder = streamBuilder this.messageIdProvider = typeof this.options.messageIdProvider === 'undefined' ? new DefaultMessageIdProvider() : this.options.messageIdProvider // Inflight message storages this.outgoingStore = options.outgoingStore || new Store() this.incomingStore = options.incomingStore || new Store() // Should QoS zero messages be queued when the connection is broken? this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero // map of subscribed topics to support reconnection this._resubscribeTopics = {} // map of a subscribe messageId and a topic this.messageIdToTopic = {} // Keepalive manager, setup in _setupKeepaliveManager this.keepaliveManager = null // Is the client connected? this.connected = false // Are we disconnecting? this.disconnecting = false // Are we reconnecting? this.reconnecting = false // Packet queue this.queue = [] // connack timer this.connackTimer = null // Reconnect timer this.reconnectTimer = null // Is processing store? this._storeProcessing = false // Packet Ids are put into the store during store processing this._packetIdsDuringStoreProcessing = {} // Store processing queue this._storeProcessingQueue = [] // Inflight callbacks this.outgoing = {} // True if connection is first time. this._firstConnection = true if (options.properties && options.properties.topicAliasMaximum > 0) { if (options.properties.topicAliasMaximum > 0xffff) { this.log( 'MqttClient :: options.properties.topicAliasMaximum is out of range', ) } else { this.topicAliasRecv = new TopicAliasRecv( options.properties.topicAliasMaximum, ) } } // Send queued packets this.on('connect', () => { const { queue } = this const deliver = () => { const entry = queue.shift() this.log('deliver :: entry %o', entry) let packet = null if (!entry) { this._resubscribe() return } packet = entry.packet this.log('deliver :: call _sendPacket for %o', packet) let send = true if (packet.messageId && packet.messageId !== 0) { if (!this.messageIdProvider.register(packet.messageId)) { send = false } } if (send) { this._sendPacket(packet, (err) => { if (entry.cb) { entry.cb(err) } deliver() }) } else { this.log( 'messageId: %d has already used. The message is skipped and removed.', packet.messageId, ) deliver() } } this.log('connect :: sending queued packets') deliver() }) this.on('close', () => { this.log('close :: connected set to `false`') this.connected = false this.log('close :: clearing connackTimer') clearTimeout(this.connackTimer) this._destroyKeepaliveManager() if (this.topicAliasRecv) { this.topicAliasRecv.clear() } this.log('close :: calling _setupReconnect') this._setupReconnect() }) if (!this.options.manualConnect) { this.log('MqttClient :: setting up stream') this.connect() } } /** * @param packet the packet received by the broker * @return the auth packet to be returned to the broker * @api public */ public handleAuth(packet: IAuthPacket, callback: PacketCallback) { callback() } /** * Handle messages with backpressure support, one at a time. * Override at will. * * @param Packet packet the packet * @param Function callback call when finished * @api public */ public handleMessage(packet: IPublishPacket, callback: DoneCallback) { callback() } /** * _nextId * @return unsigned int */ private _nextId() { return this.messageIdProvider.allocate() } /** * getLastMessageId * @return unsigned int */ public getLastMessageId() { return this.messageIdProvider.getLastAllocated() } /** * Setup the event handlers in the inner stream, sends `connect` and `auth` packets */ public connect() { const writable = new Writable() const parser = mqttPacket.parser(this.options) let completeParse = null const packets = [] this.log('connect :: calling method to clear reconnect') this._clearReconnect() if (this.disconnected && !this.reconnecting) { this.incomingStore = this.options.incomingStore || new Store() this.outgoingStore = this.options.outgoingStore || new Store() this.disconnecting = false this.disconnected = false } this.log( 'connect :: using streamBuilder provided to client to create stream', ) this.stream = this.streamBuilder(this) parser.on('packet', (packet) => { this.log('parser :: on packet push to packets array.') packets.push(packet) }) const work = () => { this.log('work :: getting next packet in queue') const packet = packets.shift() if (packet) { this.log('work :: packet pulled from queue') handlePacket(this, packet, nextTickWork) } else { this.log('work :: no packets in queue') const done = completeParse completeParse = null this.log('work :: done flag is %s', !!done) if (done) done() } } const nextTickWork = () => { if (packets.length) { nextTick(work) } else { const done = completeParse completeParse = null done() } } writable._write = (buf, enc, done) => { completeParse = done this.log('writable stream :: parsing buffer') parser.parse(buf) work() } const streamErrorHandler = (error) => { this.log('streamErrorHandler :: error', error.message) // error.code will only be set on NodeJS env, browser don't allow to detect errors on sockets // also emitting errors on browsers seems to create issues if (error.code) { // handle error this.log('streamErrorHandler :: emitting error') this.emit('error', error) } else { this.noop(error) } } this.log('connect :: pipe stream to writable stream') this.stream.pipe(writable) // Suppress connection errors this.stream.on('error', streamErrorHandler) // Echo stream close this.stream.on('close', () => { this.log('(%s)stream :: on close', this.options.clientId) this._flushVolatile() this.log('stream: emit close to MqttClient') this.emit('close') }) // Send a connect packet this.log('connect: sending packet `connect`') const connectPacket: IConnectPacket = { cmd: 'connect', protocolId: this.options.protocolId, protocolVersion: this.options.protocolVersion, clean: this.options.clean, clientId: this.options.clientId, keepalive: this.options.keepalive, username: this.options.username, password: this.options.password as Buffer, properties: this.options.properties, } if (this.options.will) { connectPacket.will = { ...this.options.will, payload: this.options.will?.payload as Buffer, } } if (this.topicAliasRecv) { if (!connectPacket.properties) { connectPacket.properties = {} } if (this.topicAliasRecv) { connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max } } // avoid message queue this._writePacket(connectPacket) // Echo connection errors parser.on('error', this.emit.bind(this, 'error')) // auth if (this.options.properties) { if ( !this.options.properties.authenticationMethod && this.options.properties.authenticationData ) { this.end(() => this.emit( 'error', new Error('Packet has no Authentication Method'), ), ) return this } if ( this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object' ) { const authPacket: IAuthPacket = { cmd: 'auth', reasonCode: 0, ...this.options.authPacket, } this._writePacket(authPacket) } } // many drain listeners are needed for qos 1 callbacks if the connection is intermittent this.stream.setMaxListeners(1000) clearTimeout(this.connackTimer) this.connackTimer = setTimeout(() => { this.log( '!!connectTimeout hit!! Calling _cleanUp with force `true`', ) this.emit('error', new Error('connack timeout')) this._cleanUp(true) }, this.options.connectTimeout) return this } /** * publish - publish to * * @param {String} topic - topic to publish to * @param {String, Buffer} message - message to publish * @param {Object} [opts] - publish options, includes: * {Number} qos - qos level to publish on * {Boolean} retain - whether or not to retain the message * {Boolean} dup - whether or not mark a message as duplicate * {Function} cbStorePut - function(){} called when message is put into `outgoingStore` * @param {Function} [callback] - function(err){} * called when publish succeeds or fails * @returns {MqttClient} this - for chaining * @api public * * @example client.publish('topic', 'message'); * @example * client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); * @example client.publish('topic', 'message', console.log); */ public publish(topic: string, message: string | Buffer): MqttClient public publish( topic: string, message: string | Buffer, callback?: PacketCallback, ): MqttClient public publish( topic: string, message: string | Buffer, opts?: IClientPublishOptions, callback?: PacketCallback, ): MqttClient public publish( topic: string, message: string | Buffer, opts?: IClientPublishOptions | DoneCallback, callback?: PacketCallback, ): MqttClient { this.log('publish :: message `%s` to topic `%s`', message, topic) const { options } = this // .publish(topic, payload, cb); if (typeof opts === 'function') { callback = opts as DoneCallback opts = null } opts = opts || {} // default opts const defaultOpts: IClientPublishOptions = { qos: 0, retain: false, dup: false, } opts = { ...defaultOpts, ...opts } const { qos, retain, dup, properties, cbStorePut } = opts if (this._checkDisconnecting(callback)) { return this } const publishProc = () => { let messageId = 0 if (qos === 1 || qos === 2) { messageId = this._nextId() if (messageId === null) { this.log('No messageId left') return false } } const packet: IPublishPacket = { cmd: 'publish', topic, payload: message, qos, retain, messageId, dup, } if (options.protocolVersion === 5) { packet.properties = properties } this.log('publish :: qos', qos) switch (qos) { case 1: case 2: // Add to callbacks this.outgoing[packet.messageId] = { volatile: false, cb: callback || this.noop, } this.log('MqttClient:publish: packet cmd: %s', packet.cmd) this._sendPacket(packet, undefined, cbStorePut) break default: this.log('MqttClient:publish: packet cmd: %s', packet.cmd) this._sendPacket(packet, callback, cbStorePut) break } return true } if ( this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc() ) { this._storeProcessingQueue.push({ invoke: publishProc, cbStorePut: opts.cbStorePut, callback, }) } return this } public publishAsync( topic: string, message: string | Buffer, ): Promise public publishAsync( topic: string, message: string | Buffer, opts?: IClientPublishOptions, ): Promise public publishAsync( topic: string, message: string | Buffer, opts?: IClientPublishOptions, ): Promise { return new Promise((resolve, reject) => { this.publish(topic, message, opts, (err, packet) => { if (err) { reject(err) } else { resolve(packet) } }) }) } /** * subscribe - subscribe to * * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} * @param {Object} [opts] - optional subscription options, includes: * {Number} qos - subscribe qos level * @param {Function} [callback] - function(err, granted){} where: * {Error} err - subscription error (none at the moment!) * {Array} granted - array of {topic: 't', qos: 0} * @returns {MqttClient} this - for chaining * @api public * @example client.subscribe('topic'); * @example client.subscribe('topic', {qos: 1}); * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); * @example client.subscribe('topic', console.log); */ public subscribe( topicObject: string | string[] | ISubscriptionMap, ): MqttClient public subscribe( topicObject: string | string[] | ISubscriptionMap, callback?: ClientSubscribeCallback, ): MqttClient public subscribe( topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties, ): MqttClient public subscribe( topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties, callback?: ClientSubscribeCallback, ): MqttClient public subscribe( topicObject: string | string[] | ISubscriptionMap, opts?: | IClientSubscribeOptions | IClientSubscribeProperties | ClientSubscribeCallback, callback?: ClientSubscribeCallback, ): MqttClient { const version = this.options.protocolVersion if (typeof opts === 'function') { callback = opts } callback = callback || this.noop // force re-subscribe on reconnect. This is only true // when provided `topicObject` is `this._resubscribeTopics` let resubscribe = false let topicsList = [] if (typeof topicObject === 'string') { topicObject = [topicObject] topicsList = topicObject } else if (Array.isArray(topicObject)) { topicsList = topicObject } else if (typeof topicObject === 'object') { resubscribe = topicObject.resubscribe delete topicObject.resubscribe topicsList = Object.keys(topicObject) } // validate topics const invalidTopic = validations.validateTopics(topicsList) if (invalidTopic !== null) { setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)) return this } if (this._checkDisconnecting(callback)) { this.log('subscribe: discconecting true') return this } const defaultOpts: Partial = { qos: 0, } if (version === 5) { defaultOpts.nl = false defaultOpts.rap = false defaultOpts.rh = 0 } opts = { ...defaultOpts, ...opts } as IClientSubscribeOptions const properties = opts.properties const subs: ISubscriptionRequest[] = [] const parseSub = ( topic: string, subOptions?: IClientSubscribeOptions, ) => { // subOptions is defined only when providing a subs map, use opts otherwise subOptions = (subOptions || opts) as IClientSubscribeOptions if ( !Object.prototype.hasOwnProperty.call( this._resubscribeTopics, topic, ) || this._resubscribeTopics[topic].qos < subOptions.qos || resubscribe ) { const currentOpts: ISubscription & IClientSubscribeProperties = { topic, qos: subOptions.qos, } if (version === 5) { currentOpts.nl = subOptions.nl currentOpts.rap = subOptions.rap currentOpts.rh = subOptions.rh // use opts.properties currentOpts.properties = properties } this.log( 'subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos, ) subs.push(currentOpts) } } if (Array.isArray(topicObject)) { // array of topics topicObject.forEach((topic) => { this.log('subscribe: array topic %s', topic) parseSub(topic) }) } else { // object topic --> subOptions (no properties) Object.keys(topicObject).forEach((topic) => { this.log( 'subscribe: object topic %s, %o', topic, topicObject[topic], ) parseSub(topic, topicObject[topic]) }) } if (!subs.length) { callback(null, []) return this } const subscribeProc = () => { const messageId = this._nextId() if (messageId === null) { this.log('No messageId left') return false } const packet: ISubscribePacket = { cmd: 'subscribe', subscriptions: subs, // qos: 1, // retain: false, // dup: false, messageId, } if (properties) { packet.properties = properties } // subscriptions to resubscribe to in case of disconnect if (this.options.resubscribe) { this.log('subscribe :: resubscribe true') const topics = [] subs.forEach((sub) => { if (this.options.reconnectPeriod > 0) { const topic: IClientSubscribeOptions = { qos: sub.qos } if (version === 5) { topic.nl = sub.nl || false topic.rap = sub.rap || false topic.rh = sub.rh || 0 topic.properties = sub.properties } this._resubscribeTopics[sub.topic] = topic topics.push(sub.topic) } }) this.messageIdToTopic[packet.messageId] = topics } this.outgoing[packet.messageId] = { volatile: true, cb(err, packet2: ISubackPacket) { if (!err) { const { granted } = packet2 for (let i = 0; i < granted.length; i += 1) { subs[i].qos = granted[i] as QoS } } callback(err, subs, packet2) }, } this.log('subscribe :: call _sendPacket') this._sendPacket(packet) return true } if ( this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc() ) { this._storeProcessingQueue.push({ invoke: subscribeProc, callback, }) } return this } public subscribeAsync( topicObject: string | string[] | ISubscriptionMap, ): Promise public subscribeAsync( topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties, ): Promise public subscribeAsync( topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties, ): Promise { return new Promise((resolve, reject) => { this.subscribe(topicObject, opts, (err, granted) => { if (err) { reject(err) } else { resolve(granted) } }) }) } /** * unsubscribe - unsubscribe from topic(s) * * @param {String, Array} topic - topics to unsubscribe from * @param {Object} [opts] - optional subscription options, includes: * {Object} properties - properties of unsubscribe packet * @param {Function} [callback] - callback fired on unsuback * @returns {MqttClient} this - for chaining * @api public * @example client.unsubscribe('topic'); * @example client.unsubscribe('topic', console.log); */ public unsubscribe(topic: string | string[]): MqttClient public unsubscribe( topic: string | string[], opts?: IClientUnsubscribeProperties, ): MqttClient public unsubscribe( topic: string | string[], callback?: PacketCallback, ): MqttClient public unsubscribe( topic: string | string[], opts?: IClientUnsubscribeProperties, callback?: PacketCallback, ): MqttClient public unsubscribe( topic: string | string[], opts?: IClientUnsubscribeProperties | PacketCallback, callback?: PacketCallback, ): MqttClient { if (typeof topic === 'string') { topic = [topic] } if (typeof opts === 'function') { callback = opts } callback = callback || this.noop const invalidTopic = validations.validateTopics(topic) if (invalidTopic !== null) { setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`)) return this } if (this._checkDisconnecting(callback)) { return this } const unsubscribeProc = () => { const messageId = this._nextId() if (messageId === null) { this.log('No messageId left') return false } const packet: IUnsubscribePacket = { cmd: 'unsubscribe', // qos: 1, messageId, unsubscriptions: [], } if (typeof topic === 'string') { packet.unsubscriptions = [topic] } else if (Array.isArray(topic)) { packet.unsubscriptions = topic } if (this.options.resubscribe) { packet.unsubscriptions.forEach((topic2) => { delete this._resubscribeTopics[topic2] }) } if (typeof opts === 'object' && opts.properties) { packet.properties = opts.properties } this.outgoing[packet.messageId] = { volatile: true, cb: callback, } this.log('unsubscribe: call _sendPacket') this._sendPacket(packet) return true } if ( this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc() ) { this._storeProcessingQueue.push({ invoke: unsubscribeProc, callback, }) } return this } public unsubscribeAsync( topic: string | string[], ): Promise public unsubscribeAsync( topic: string | string[], opts?: IClientUnsubscribeProperties, ): Promise public unsubscribeAsync( topic: string | string[], opts?: IClientUnsubscribeProperties, ): Promise { return new Promise((resolve, reject) => { this.unsubscribe(topic, opts, (err, packet) => { if (err) { reject(err) } else { resolve(packet) } }) }) } /** * end - close connection * * @returns {MqttClient} this - for chaining * @param {Boolean} force - do not wait for all in-flight messages to be acked * @param {Object} opts - added to the disconnect packet * @param {Function} cb - called when the client has been closed * * @api public */ public end(cb?: DoneCallback): MqttClient public end(force?: boolean): MqttClient public end(opts?: Partial, cb?: DoneCallback): MqttClient public end(force?: boolean, cb?: DoneCallback): MqttClient public end( force?: boolean, opts?: Partial, cb?: DoneCallback, ): MqttClient public end( force?: boolean | Partial | DoneCallback, opts?: Partial | DoneCallback, cb?: DoneCallback, ): MqttClient { this.log('end :: (%s)', this.options.clientId) if (force == null || typeof force !== 'boolean') { cb = cb || (opts as DoneCallback) opts = force as Partial force = false } if (typeof opts !== 'object') { cb = cb || opts opts = null } this.log('end :: cb? %s', !!cb) if (!cb || typeof cb !== 'function') { cb = this.noop } const closeStores = () => { this.log('end :: closeStores: closing incoming and outgoing stores') this.disconnected = true this.incomingStore.close((e1) => { this.outgoingStore.close((e2) => { this.log('end :: closeStores: emitting end') this.emit('end') if (cb) { const err = e1 || e2 this.log( 'end :: closeStores: invoking callback with args', ) cb(err) } }) }) if (this._deferredReconnect) { this._deferredReconnect() } else if ( this.options.reconnectPeriod === 0 || this.options.manualConnect ) { this.disconnecting = false } } const finish = () => { // defer closesStores of an I/O cycle, // just to make sure things are // ok for websockets this.log( 'end :: (%s) :: finish :: calling _cleanUp with force %s', this.options.clientId, force, ) this._cleanUp( force, () => { this.log( 'end :: finish :: calling process.nextTick on closeStores', ) // const boundProcess = nextTick.bind(null, closeStores) nextTick(closeStores) }, opts, ) } if (this.disconnecting) { cb() return this } this._clearReconnect() this.disconnecting = true if (!force && Object.keys(this.outgoing).length > 0) { // wait 10ms, just to be sure we received all of it this.log( 'end :: (%s) :: calling finish in 10ms once outgoing is empty', this.options.clientId, ) this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) } else { this.log( 'end :: (%s) :: immediately calling finish', this.options.clientId, ) finish() } return this } public endAsync(): Promise public endAsync(force?: boolean): Promise public endAsync(opts?: Partial): Promise public endAsync( force?: boolean, opts?: Partial, ): Promise public endAsync( force?: boolean | Partial, opts?: Partial, ): Promise { return new Promise((resolve, reject) => { this.end(force as boolean, opts, (err) => { if (err) { reject(err) } else { resolve() } }) }) } /** * removeOutgoingMessage - remove a message in outgoing store * the outgoing callback will be called withe Error('Message removed') if the message is removed * * @param {Number} messageId - messageId to remove message * @returns {MqttClient} this - for chaining * @api public * * @example client.removeOutgoingMessage(client.getLastAllocated()); */ public removeOutgoingMessage(messageId: number): MqttClient { if (this.outgoing[messageId]) { const { cb } = this.outgoing[messageId] this._removeOutgoingAndStoreMessage(messageId, () => { cb(new Error('Message removed')) }) } return this } /** * reconnect - connect again using the same options as connect() * * @param {Object} [opts] - optional reconnect options, includes: * {Store} incomingStore - a store for the incoming packets * {Store} outgoingStore - a store for the outgoing packets * if opts is not given, current stores are used * @returns {MqttClient} this - for chaining * * @api public */ public reconnect( opts?: Pick, ): MqttClient { this.log('client reconnect') const f = () => { if (opts) { this.options.incomingStore = opts.incomingStore this.options.outgoingStore = opts.outgoingStore } else { this.options.incomingStore = null this.options.outgoingStore = null } this.incomingStore = this.options.incomingStore || new Store() this.outgoingStore = this.options.outgoingStore || new Store() this.disconnecting = false this.disconnected = false this._deferredReconnect = null this._reconnect() } if (this.disconnecting && !this.disconnected) { this._deferredReconnect = f } else { f() } return this } /** * PRIVATE METHODS * ===================== * */ /** * Flush all outgoing messages marked as `volatile` in `outgoing` queue. Volatile messages * typically are subscription and unsubscription requests. */ private _flushVolatile() { if (this.outgoing) { this.log( '_flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function', ) Object.keys(this.outgoing).forEach((messageId) => { if ( this.outgoing[messageId].volatile && typeof this.outgoing[messageId].cb === 'function' ) { this.outgoing[messageId].cb(new Error('Connection closed')) delete this.outgoing[messageId] } }) } } /** * Flush all outgoing messages */ private _flush() { if (this.outgoing) { this.log('_flush: queue exists? %b', !!this.outgoing) Object.keys(this.outgoing).forEach((messageId) => { if (typeof this.outgoing[messageId].cb === 'function') { this.outgoing[messageId].cb(new Error('Connection closed')) // This is suspicious. Why do we only delete this if we have a callback? // If this is by-design, then adding no as callback would cause this to get deleted unintentionally. delete this.outgoing[messageId] } }) } } private _removeTopicAliasAndRecoverTopicName(packet: IPublishPacket) { let alias: number | undefined if (packet.properties) { alias = packet.properties.topicAlias } let topic = packet.topic.toString() this.log( '_removeTopicAliasAndRecoverTopicName :: alias %d, topic %o', alias, topic, ) if (topic.length === 0) { // restore topic from alias if (typeof alias === 'undefined') { return new Error('Unregistered Topic Alias') } topic = this.topicAliasSend.getTopicByAlias(alias) if (typeof topic === 'undefined') { return new Error('Unregistered Topic Alias') } packet.topic = topic } if (alias) { delete packet.properties.topicAlias } } private _checkDisconnecting(callback?: GenericCallback) { if (this.disconnecting) { if (callback && callback !== this.noop) { callback(new Error('client disconnecting')) } else { this.emit('error', new Error('client disconnecting')) } } return this.disconnecting } /** * _reconnect - implement reconnection * @api private */ private _reconnect() { this.log('_reconnect: emitting reconnect to client') this.emit('reconnect') if (this.connected) { this.end(() => { this.connect() }) this.log('client already connected. disconnecting first.') } else { this.log('_reconnect: calling connect') this.connect() } } /** * _setupReconnect - setup reconnect timer */ private _setupReconnect() { if ( !this.disconnecting && !this.reconnectTimer && this.options.reconnectPeriod > 0 ) { if (!this.reconnecting) { this.log('_setupReconnect :: emit `offline` state') this.emit('offline') this.log('_setupReconnect :: set `reconnecting` to `true`') this.reconnecting = true } this.log( '_setupReconnect :: setting reconnectTimer for %d ms', this.options.reconnectPeriod, ) this.reconnectTimer = setInterval(() => { this.log('reconnectTimer :: reconnect triggered!') this._reconnect() }, this.options.reconnectPeriod) } else { this.log('_setupReconnect :: doing nothing...') } } /** * _clearReconnect - clear the reconnect timer */ private _clearReconnect() { this.log('_clearReconnect : clearing reconnect timer') if (this.reconnectTimer) { clearInterval(this.reconnectTimer) this.reconnectTimer = null } } /** * _cleanUp - clean up on connection end * @api private */ private _cleanUp(forced: boolean, done?: DoneCallback, opts = {}) { if (done) { this.log('_cleanUp :: done callback provided for on stream close') this.stream.on('close', done) } this.log('_cleanUp :: forced? %s', forced) if (forced) { if (this.options.reconnectPeriod === 0 && this.options.clean) { this._flush() } this.log( '_cleanUp :: (%s) :: destroying stream', this.options.clientId, ) this.stream.destroy() } else { const packet: IDisconnectPacket = { cmd: 'disconnect', ...opts } this.log( '_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId, ) this._sendPacket(packet, () => { this.log( '_cleanUp :: (%s) :: destroying stream', this.options.clientId, ) setImmediate(() => { this.stream.end(() => { this.log( '_cleanUp :: (%s) :: stream destroyed', this.options.clientId, ) // once stream is closed the 'close' event will fire and that will // emit client `close` event and call `done` callback if done is provided }) }) }) } if (!this.disconnecting && !this.reconnecting) { this.log( '_cleanUp :: client not disconnecting/reconnecting. Clearing and resetting reconnect.', ) this._clearReconnect() this._setupReconnect() } this._destroyKeepaliveManager() if (done && !this.connected) { this.log( '_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId, ) this.stream.removeListener('close', done) done() } } private _storeAndSend( packet: Packet, cb: DoneCallback, cbStorePut: DoneCallback, ) { this.log( 'storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd, ) let storePacket = packet let err: Error | undefined if (storePacket.cmd === 'publish') { // The original packet is for sending. // The cloned storePacket is for storing to resend on reconnect. // Topic Alias must not be used after disconnected. storePacket = clone(packet) err = this._removeTopicAliasAndRecoverTopicName( storePacket as IPublishPacket, ) if (err) { return cb && cb(err) } } this.outgoingStore.put(storePacket, (err2) => { if (err2) { return cb && cb(err2) } cbStorePut() this._writePacket(packet, cb) }) } private _applyTopicAlias(packet: Packet) { if (this.options.protocolVersion === 5) { if (packet.cmd === 'publish') { let alias: number if (packet.properties) { alias = packet.properties.topicAlias } const topic = packet.topic.toString() if (this.topicAliasSend) { if (alias) { if (topic.length !== 0) { // register topic alias this.log( 'applyTopicAlias :: register topic: %s - alias: %d', topic, alias, ) if (!this.topicAliasSend.put(topic, alias)) { this.log( 'applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias, ) return new Error( 'Sending Topic Alias out of range', ) } } } else if (topic.length !== 0) { if (this.options.autoAssignTopicAlias) { alias = this.topicAliasSend.getAliasByTopic(topic) if (alias) { packet.topic = '' packet.properties = { ...packet.properties, topicAlias: alias, } this.log( 'applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias, ) } else { alias = this.topicAliasSend.getLruAlias() this.topicAliasSend.put(topic, alias) packet.properties = { ...packet.properties, topicAlias: alias, } this.log( 'applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias, ) } } else if (this.options.autoUseTopicAlias) { alias = this.topicAliasSend.getAliasByTopic(topic) if (alias) { packet.topic = '' packet.properties = { ...packet.properties, topicAlias: alias, } this.log( 'applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias, ) } } } } else if (alias) { this.log( 'applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias, ) return new Error('Sending Topic Alias out of range') } } } } private _noop(err?: Error) { this.log('noop ::', err) } /** Writes the packet to stream and emits events */ private _writePacket(packet: Packet, cb?: DoneCallback) { this.log('_writePacket :: packet: %O', packet) this.log('_writePacket :: emitting `packetsend`') this.emit('packetsend', packet) this.log('_writePacket :: writing to stream') const result = mqttPacket.writeToStream( packet, this.stream, this.options, ) this.log('_writePacket :: writeToStream result %s', result) if (!result && cb && cb !== this.noop) { this.log( '_writePacket :: handle events on `drain` once through callback.', ) this.stream.once('drain', cb) } else if (cb) { this.log('_writePacket :: invoking cb') cb() } } /** * _sendPacket - send or queue a packet * @param {Object} packet - packet options * @param {Function} cb - callback when the packet is sent * @param {Function} cbStorePut - called when message is put into outgoingStore * @param {Boolean} noStore - send without put to the store * @api private */ private _sendPacket( packet: Packet, cb?: DoneCallback, cbStorePut?: DoneCallback, noStore?: boolean, ) { this.log('_sendPacket :: (%s) :: start', this.options.clientId) cbStorePut = cbStorePut || this.noop cb = cb || this.noop const err = this._applyTopicAlias(packet) if (err) { cb(err) return } if (!this.connected) { // allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth) if (packet.cmd === 'auth') { this._writePacket(packet, cb) return } this.log( '_sendPacket :: client not connected. Storing packet offline.', ) this._storePacket(packet, cb, cbStorePut) return } // If "noStore" is true, the message is sent without being recorded in the store. // Messages that have not received puback or pubcomp remain in the store after disconnection // and are resent from the store upon reconnection. // For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store. // This is to avoid interrupting other processes while recording to the store. if (noStore) { this._writePacket(packet, cb) return } switch (packet.cmd) { case 'publish': break case 'pubrel': this._storeAndSend(packet, cb, cbStorePut) return default: this._writePacket(packet, cb) return } switch (packet.qos) { case 2: case 1: this._storeAndSend(packet, cb, cbStorePut) break /** * no need of case here since it will be caught by default * and jshint comply that before default it must be a break * anyway it will result in -1 evaluation */ case 0: /* falls through */ default: this._writePacket(packet, cb) break } this.log('_sendPacket :: (%s) :: end', this.options.clientId) } /** * _storePacket - queue a packet * @param {Object} packet - packet options * @param {Function} cb - callback when the packet is sent * @param {Function} cbStorePut - called when message is put into outgoingStore * @api private */ private _storePacket( packet: Packet, cb: DoneCallback, cbStorePut: DoneCallback, ) { this.log('_storePacket :: packet: %o', packet) this.log('_storePacket :: cb? %s', !!cb) cbStorePut = cbStorePut || this.noop let storePacket = packet if (storePacket.cmd === 'publish') { // The original packet is for sending. // The cloned storePacket is for storing to resend on reconnect. // Topic Alias must not be used after disconnected. storePacket = clone(packet) const err = this._removeTopicAliasAndRecoverTopicName( storePacket as IPublishPacket, ) if (err) { return cb && cb(err) } } const qos = (storePacket as IPublishPacket).qos || 0 // check that the packet is not a qos of 0, or that the command is not a publish if ((qos === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { this.queue.push({ packet: storePacket, cb }) } else if (qos > 0) { cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null this.outgoingStore.put(storePacket, (err) => { if (err) { return cb && cb(err) } cbStorePut() }) } else if (cb) { cb(new Error('No connection to broker')) } } /** * _setupKeepaliveManager - setup the keepalive manager */ private _setupKeepaliveManager() { this.log( '_setupKeepaliveManager :: keepalive %d (seconds)', this.options.keepalive, ) if (!this.keepaliveManager && this.options.keepalive) { this.keepaliveManager = new KeepaliveManager( this, this.options.timerVariant, ) } } private _destroyKeepaliveManager() { if (this.keepaliveManager) { this.log('_destroyKeepaliveManager :: destroying keepalive manager') this.keepaliveManager.destroy() this.keepaliveManager = null } } /** * Reschedule the ping interval */ public reschedulePing(force = false) { if ( this.keepaliveManager && this.options.keepalive && (force || this.options.reschedulePings) ) { this._reschedulePing() } } /** * Mostly needed for test purposes */ private _reschedulePing() { this.log('_reschedulePing :: rescheduling ping') this.keepaliveManager.reschedule() } public sendPing() { this.log('_sendPing :: sending pingreq') this._sendPacket({ cmd: 'pingreq' }) } public onKeepaliveTimeout() { this.emit('error', new Error('Keepalive timeout')) this.log('onKeepaliveTimeout :: calling _cleanUp with force true') this._cleanUp(true) } /** * _resubscribe * @api private */ private _resubscribe() { this.log('_resubscribe') const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) if ( !this._firstConnection && // Only resubscribe in case of clean connection or if the server does not have a stored session. // The Session Present flag is available since v3.1.1 // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349254 (this.options.clean || (this.options.protocolVersion >= 4 && !this.connackPacket.sessionPresent)) && _resubscribeTopicsKeys.length > 0 ) { if (this.options.resubscribe) { if (this.options.protocolVersion === 5) { this.log('_resubscribe: protocolVersion 5') for ( let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++ ) { const resubscribeTopic: ISubscriptionMap = {} resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[ _resubscribeTopicsKeys[topicI] ] resubscribeTopic.resubscribe = true this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]] .properties, }) } } else { this._resubscribeTopics.resubscribe = true this.subscribe(this._resubscribeTopics) } } else { this._resubscribeTopics = {} } } this._firstConnection = false } /** * _onConnect * * @api private */ private _onConnect(packet: IConnackPacket) { if (this.disconnected) { this.emit('connect', packet) return } this.connackPacket = packet this.messageIdProvider.clear() this._setupKeepaliveManager() this.connected = true /** check if there are packets in outgoing store and stream them */ const startStreamProcess = () => { let outStore = this.outgoingStore.createStream() /** destroy the outgoing store stream */ const remove = () => { outStore.destroy() outStore = null this._flushStoreProcessingQueue() clearStoreProcessing() } /** stop store processing and clear packets id processed */ const clearStoreProcessing = () => { this._storeProcessing = false this._packetIdsDuringStoreProcessing = {} } this.once('close', remove) outStore.on('error', (err) => { clearStoreProcessing() this._flushStoreProcessingQueue() this.removeListener('close', remove) this.emit('error', err) }) /** Read next packet in outgoing store and send it */ const storeDeliver = () => { // edge case, we wrapped this twice if (!outStore) { return } const packet2 = outStore.read(1) let cb: PacketCallback if (!packet2) { // read when data is available in the future outStore.once('readable', storeDeliver) return } this._storeProcessing = true // Skip already processed store packets if (this._packetIdsDuringStoreProcessing[packet2.messageId]) { storeDeliver() return } // Avoid unnecessary stream read operations when disconnected if (!this.disconnecting && !this.reconnectTimer) { cb = this.outgoing[packet2.messageId] ? this.outgoing[packet2.messageId].cb : null this.outgoing[packet2.messageId] = { volatile: false, cb(err, status) { // Ensure that the original callback passed in to publish gets invoked if (cb) { cb(err, status) } storeDeliver() }, } this._packetIdsDuringStoreProcessing[packet2.messageId] = true if (this.messageIdProvider.register(packet2.messageId)) { this._sendPacket(packet2, undefined, undefined, true) } else { this.log( 'messageId: %d has already used.', packet2.messageId, ) } } else if (outStore.destroy) { outStore.destroy() } } outStore.on('end', () => { let allProcessed = true for (const id in this._packetIdsDuringStoreProcessing) { if (!this._packetIdsDuringStoreProcessing[id]) { allProcessed = false break } } this.removeListener('close', remove) if (allProcessed) { clearStoreProcessing() this._invokeAllStoreProcessingQueue() this.emit('connect', packet) } else { startStreamProcess() } }) storeDeliver() } // start flowing startStreamProcess() } private _invokeStoreProcessingQueue() { // If _storeProcessing is true, the message is resending. // During resend, processing is skipped to prevent new messages from interrupting. #1635 if (!this._storeProcessing && this._storeProcessingQueue.length > 0) { const f = this._storeProcessingQueue[0] if (f && f.invoke()) { this._storeProcessingQueue.shift() return true } } return false } private _invokeAllStoreProcessingQueue() { while (this._invokeStoreProcessingQueue()) { /* empty */ } } private _flushStoreProcessingQueue() { for (const f of this._storeProcessingQueue) { if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) if (f.callback) f.callback(new Error('Connection closed')) } this._storeProcessingQueue.splice(0) } /** * _removeOutgoingAndStoreMessage * @param {Number} messageId - messageId to remove message * @param {Function} cb - called when the message removed * @api private */ private _removeOutgoingAndStoreMessage( messageId: number, cb: PacketCallback, ) { delete this.outgoing[messageId] this.outgoingStore.del({ messageId }, (err, packet) => { cb(err, packet) this.messageIdProvider.deallocate(messageId) this._invokeStoreProcessingQueue() }) } }