/** * Module dependencies */ import { Readable } from 'readable-stream' import { Packet } from 'mqtt-packet' import { DoneCallback } from './shared' const streamsOpts = { objectMode: true } const defaultStoreOptions = { clean: true, } export interface IStoreOptions { /** * true, clear _inflights at close */ clean?: boolean } export type PacketCallback = (error?: Error, packet?: Packet) => void export interface IStore { /** * Adds a packet to the store, a packet is * anything that has a messageId property. * */ put(packet: Packet, cb: DoneCallback): IStore /** * Creates a stream with all the packets in the store * */ createStream(): Readable /** * deletes a packet from the store. */ del(packet: Pick, cb: PacketCallback): IStore /** * get a packet from the store. */ get(packet: Pick, cb: PacketCallback): IStore /** * Close the store */ close(cb: DoneCallback): void } /** * In-memory implementation of the message store * This can actually be saved into files. * * @param {Object} [options] - store options */ export default class Store implements IStore { private options: IStoreOptions private _inflights: Map constructor(options?: IStoreOptions) { this.options = options || {} // Defaults this.options = { ...defaultStoreOptions, ...options } this._inflights = new Map() } /** * Adds a packet to the store, a packet is * anything that has a messageId property. * */ put(packet: Packet, cb: DoneCallback) { this._inflights.set(packet.messageId, packet) if (cb) { cb() } return this } /** * Creates a stream with all the packets in the store * */ createStream() { const stream = new Readable(streamsOpts) const values = [] let destroyed = false let i = 0 this._inflights.forEach((value, key) => { values.push(value) }) stream._read = () => { if (!destroyed && i < values.length) { stream.push(values[i++]) } else { stream.push(null) } } stream.destroy = (err) => { if (destroyed) { return } destroyed = true setTimeout(() => { stream.emit('close') }, 0) return stream } return stream } /** * deletes a packet from the store. */ del(packet: Pick, cb: PacketCallback) { const toDelete = this._inflights.get(packet.messageId) if (toDelete) { this._inflights.delete(packet.messageId) cb(null, toDelete) } else if (cb) { cb(new Error('missing packet')) } return this } /** * get a packet from the store. */ get(packet: Pick, cb: PacketCallback) { const storedPacket = this._inflights.get(packet.messageId) if (storedPacket) { cb(null, storedPacket) } else if (cb) { cb(new Error('missing packet')) } return this } /** * Close the store */ close(cb: DoneCallback) { if (this.options.clean) { this._inflights = null } if (cb) { cb() } } }