import { StreamBuilder } from '../shared' import { Buffer } from 'buffer' import { Transform } from 'readable-stream' import MqttClient, { IClientOptions } from '../client' import { BufferedDuplex } from '../BufferedDuplex' /* global wx */ let socketTask: any let proxy: Transform let stream: BufferedDuplex declare global { const wx: any } function buildProxy() { const _proxy = new Transform() _proxy._write = (chunk, encoding, next) => { socketTask.send({ data: chunk.buffer, success() { next() }, fail(errMsg) { next(new Error(errMsg)) }, }) } _proxy._flush = (done) => { socketTask.close({ success() { done() }, }) } return _proxy } function setDefaultOpts(opts) { if (!opts.hostname) { opts.hostname = 'localhost' } if (!opts.path) { opts.path = '/' } if (!opts.wsOptions) { opts.wsOptions = {} } } function buildUrl(opts: IClientOptions, client: MqttClient) { const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' let url = `${protocol}://${opts.hostname}${opts.path}` if (opts.port && opts.port !== 80 && opts.port !== 443) { url = `${protocol}://${opts.hostname}:${opts.port}${opts.path}` } if (typeof opts.transformWsUrl === 'function') { url = opts.transformWsUrl(url, opts, client) } return url } function bindEventHandler() { socketTask.onOpen(() => { stream.socketReady() }) socketTask.onMessage((res) => { let { data } = res if (data instanceof ArrayBuffer) data = Buffer.from(data) else data = Buffer.from(data, 'utf8') proxy.push(data) }) socketTask.onClose(() => { stream.emit('close') stream.end() stream.destroy() }) socketTask.onError((error) => { const err = new Error(error.errMsg) stream.destroy(err) }) } const buildStream: StreamBuilder = (client, opts) => { opts.hostname = opts.hostname || opts.host if (!opts.hostname) { throw new Error('Could not determine host. Specify host manually.') } const websocketSubProtocol = opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3 ? 'mqttv3.1' : 'mqtt' setDefaultOpts(opts) const url = buildUrl(opts, client) // https://github.com/wechat-miniprogram/api-typings/blob/master/types/wx/lib.wx.api.d.ts#L20984 socketTask = wx.connectSocket({ url, protocols: [websocketSubProtocol], }) proxy = buildProxy() stream = new BufferedDuplex(opts, proxy, socketTask) stream._destroy = (err, cb) => { socketTask.close({ success() { if (cb) cb(err) }, }) } const destroyRef = stream.destroy stream.destroy = (err, cb) => { stream.destroy = destroyRef setTimeout(() => { socketTask.close({ fail() { stream._destroy(err, cb) }, }) }, 0) return stream } bindEventHandler() return stream } export default buildStream