You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

392 lines
10 KiB

  1. 'use strict';
  2. const { randomFillSync } = require('crypto');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const { EMPTY_BUFFER } = require('./constants');
  5. const { isValidStatusCode } = require('./validation');
  6. const { mask: applyMask, toBuffer } = require('./buffer-util');
  7. const mask = Buffer.alloc(4); // Shared 4-byte scratch buffer for the masking key; `Sender.frame` refills it with random bytes for every masked frame.
  /**
   * HyBi Sender implementation.
   *
   * Frames outgoing WebSocket messages and writes them to the socket. While a
   * message is being compressed (`_deflating`), subsequent operations are
   * queued and replayed in order once the compression finishes.
   */
  class Sender {
    /**
     * Creates a Sender instance.
     *
     * @param {net.Socket} socket The connection socket
     * @param {Object} extensions An object containing the negotiated extensions
     */
    constructor(socket, extensions) {
      this._extensions = extensions || {};
      this._socket = socket;

      // `true` when the next data frame starts a new message.
      this._firstFragment = true;
      // Whether the message currently being sent is compressed.
      this._compress = false;

      // Bytes held by the in-flight compression plus everything in `_queue`.
      this._bufferedBytes = 0;
      this._deflating = false;
      // Pending operations, each stored as `[method, data, ...args, cb]`.
      this._queue = [];
    }

    /**
     * Frames a piece of data according to the HyBi WebSocket protocol.
     *
     * When `options.mask` is set and `options.readOnly` is falsy, `data` is
     * handed to the masking helper as both source and destination, i.e. it is
     * masked in place. When both are set, header and masked payload are
     * written into one freshly allocated buffer and `data` is left untouched.
     *
     * @param {Buffer} data The data to frame
     * @param {Object} options Options object
     * @param {Number} options.opcode The opcode
     * @param {Boolean} options.readOnly Specifies whether `data` can be modified
     * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
     * @param {Boolean} options.mask Specifies whether or not to mask `data`
     * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
     * @return {Buffer[]} The framed data as a list of `Buffer` instances
     * @public
     */
    static frame(data, options) {
      // A single merged buffer is only needed when the caller's data must not
      // be modified but still has to be masked.
      const merge = options.mask && options.readOnly;
      // Header size: 2 base bytes, plus 4 bytes of masking key when masking.
      let offset = options.mask ? 6 : 2;
      let payloadLength = data.length;

      if (data.length >= 65536) {
        // 64-bit extended payload length.
        offset += 8;
        payloadLength = 127;
      } else if (data.length > 125) {
        // 16-bit extended payload length.
        offset += 2;
        payloadLength = 126;
      }

      const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);

      target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
      if (options.rsv1) target[0] |= 0x40;

      target[1] = payloadLength;

      if (payloadLength === 126) {
        target.writeUInt16BE(data.length, 2);
      } else if (payloadLength === 127) {
        // The upper 16 bits of the 64-bit length are always zero; the lower
        // 48 bits are written in one call so lengths of 4 GiB and above no
        // longer overflow a 32-bit write.
        target[2] = 0;
        target[3] = 0;
        target.writeUIntBE(data.length, 4, 6);
      }

      if (!options.mask) return [target, data];

      // `mask` here is the module-level scratch buffer, refilled per frame.
      randomFillSync(mask, 0, 4);

      target[1] |= 0x80;
      target[offset - 4] = mask[0];
      target[offset - 3] = mask[1];
      target[offset - 2] = mask[2];
      target[offset - 1] = mask[3];

      if (merge) {
        applyMask(data, mask, target, offset, data.length);
        return [target];
      }

      applyMask(data, mask, data, 0, data.length);
      return [target, data];
    }

    /**
     * Sends a close message to the other peer.
     *
     * @param {(Number|undefined)} code The status code component of the body
     * @param {String} data The message component of the body
     * @param {Boolean} mask Specifies whether or not to mask the message
     * @param {Function} cb Callback
     * @throws {TypeError} If `code` is defined but is not a valid status code
     * @throws {RangeError} If `data` is longer than 123 bytes
     * @public
     */
    close(code, data, mask, cb) {
      let buf;

      if (code === undefined) {
        buf = EMPTY_BUFFER;
      } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
        throw new TypeError('First argument must be a valid error code number');
      } else if (data === undefined || data === '') {
        buf = Buffer.allocUnsafe(2);
        buf.writeUInt16BE(code, 0);
      } else {
        const length = Buffer.byteLength(data);

        // 2 bytes of status code + 123 bytes of reason = 125 bytes, the same
        // limit that `ping()` and `pong()` enforce on their payloads.
        if (length > 123) {
          throw new RangeError('The message must not be greater than 123 bytes');
        }

        buf = Buffer.allocUnsafe(2 + length);
        buf.writeUInt16BE(code, 0);
        buf.write(data, 2);
      }

      if (this._deflating) {
        this.enqueue([this.doClose, buf, mask, cb]);
      } else {
        this.doClose(buf, mask, cb);
      }
    }

    /**
     * Frames and sends a close message.
     *
     * @param {Buffer} data The message to send
     * @param {Boolean} mask Specifies whether or not to mask `data`
     * @param {Function} cb Callback
     * @private
     */
    doClose(data, mask, cb) {
      this.sendFrame(
        Sender.frame(data, {
          fin: true,
          rsv1: false,
          opcode: 0x08,
          mask,
          readOnly: false
        }),
        cb
      );
    }

    /**
     * Sends a ping message to the other peer.
     *
     * @param {*} data The message to send
     * @param {Boolean} mask Specifies whether or not to mask `data`
     * @param {Function} cb Callback
     * @throws {RangeError} If the payload is longer than 125 bytes
     * @public
     */
    ping(data, mask, cb) {
      const buf = toBuffer(data);

      if (buf.length > 125) {
        throw new RangeError('The data size must not be greater than 125 bytes');
      }

      if (this._deflating) {
        this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
      } else {
        this.doPing(buf, mask, toBuffer.readOnly, cb);
      }
    }

    /**
     * Frames and sends a ping message.
     *
     * @param {Buffer} data The message to send
     * @param {Boolean} mask Specifies whether or not to mask `data`
     * @param {Boolean} readOnly Specifies whether `data` can be modified
     * @param {Function} cb Callback
     * @private
     */
    doPing(data, mask, readOnly, cb) {
      this.sendFrame(
        Sender.frame(data, {
          fin: true,
          rsv1: false,
          opcode: 0x09,
          mask,
          readOnly
        }),
        cb
      );
    }

    /**
     * Sends a pong message to the other peer.
     *
     * @param {*} data The message to send
     * @param {Boolean} mask Specifies whether or not to mask `data`
     * @param {Function} cb Callback
     * @throws {RangeError} If the payload is longer than 125 bytes
     * @public
     */
    pong(data, mask, cb) {
      const buf = toBuffer(data);

      if (buf.length > 125) {
        throw new RangeError('The data size must not be greater than 125 bytes');
      }

      if (this._deflating) {
        this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
      } else {
        this.doPong(buf, mask, toBuffer.readOnly, cb);
      }
    }

    /**
     * Frames and sends a pong message.
     *
     * @param {Buffer} data The message to send
     * @param {Boolean} mask Specifies whether or not to mask `data`
     * @param {Boolean} readOnly Specifies whether `data` can be modified
     * @param {Function} cb Callback
     * @private
     */
    doPong(data, mask, readOnly, cb) {
      this.sendFrame(
        Sender.frame(data, {
          fin: true,
          rsv1: false,
          opcode: 0x0a,
          mask,
          readOnly
        }),
        cb
      );
    }

    /**
     * Sends a data message to the other peer.
     *
     * @param {*} data The message to send
     * @param {Object} options Options object
     * @param {Boolean} options.compress Specifies whether or not to compress `data`
     * @param {Boolean} options.binary Specifies whether `data` is binary or text
     * @param {Boolean} options.fin Specifies whether the fragment is the last one
     * @param {Boolean} options.mask Specifies whether or not to mask `data`
     * @param {Function} cb Callback
     * @public
     */
    send(data, options, cb) {
      const buf = toBuffer(data);
      const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
      let opcode = options.binary ? 2 : 1;
      let rsv1 = options.compress;

      if (this._firstFragment) {
        this._firstFragment = false;

        // Skip compression when the first fragment is below the threshold.
        if (rsv1 && perMessageDeflate) {
          rsv1 = buf.length >= perMessageDeflate._threshold;
        }

        // The decision made on the first fragment applies to the whole message.
        this._compress = rsv1;
      } else {
        // Continuation frame: opcode 0 and RSV1 cleared.
        rsv1 = false;
        opcode = 0;
      }

      if (options.fin) this._firstFragment = true;

      if (perMessageDeflate) {
        const opts = {
          fin: options.fin,
          rsv1,
          opcode,
          mask: options.mask,
          readOnly: toBuffer.readOnly
        };

        if (this._deflating) {
          this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
        } else {
          this.dispatch(buf, this._compress, opts, cb);
        }
      } else {
        this.sendFrame(
          Sender.frame(buf, {
            fin: options.fin,
            rsv1: false,
            opcode,
            mask: options.mask,
            readOnly: toBuffer.readOnly
          }),
          cb
        );
      }
    }

    /**
     * Dispatches a data message.
     *
     * If the socket is found destroyed once compression completes, `cb` and
     * the callback of every queued operation are invoked with an error and
     * nothing is written.
     *
     * @param {Buffer} data The message to send
     * @param {Boolean} compress Specifies whether or not to compress `data`
     * @param {Object} options Options object
     * @param {Number} options.opcode The opcode
     * @param {Boolean} options.readOnly Specifies whether `data` can be modified
     * @param {Boolean} options.fin Specifies whether or not to set the FIN bit
     * @param {Boolean} options.mask Specifies whether or not to mask `data`
     * @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
     * @param {Function} cb Callback
     * @private
     */
    dispatch(data, compress, options, cb) {
      if (!compress) {
        this.sendFrame(Sender.frame(data, options), cb);
        return;
      }

      const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

      this._bufferedBytes += data.length;
      this._deflating = true;

      // The first argument passed by `compress` is deliberately ignored.
      perMessageDeflate.compress(data, options.fin, (_, buf) => {
        if (this._socket.destroyed) {
          const err = new Error(
            'The socket was closed while data was being compressed'
          );

          if (typeof cb === 'function') cb(err);

          for (let i = 0; i < this._queue.length; i++) {
            // Queue entries differ in length (`close` stores four items, the
            // other operations five), but the callback is always the last one.
            const params = this._queue[i];
            const callback = params[params.length - 1];

            if (typeof callback === 'function') callback(err);
          }

          return;
        }

        this._bufferedBytes -= data.length;
        this._deflating = false;
        // `buf` is the compressor's output, not the caller's data, so it can
        // be masked in place.
        options.readOnly = false;
        this.sendFrame(Sender.frame(buf, options), cb);
        this.dequeue();
      });
    }

    /**
     * Executes queued send operations.
     *
     * Stops as soon as a replayed operation starts a new compression.
     *
     * @private
     */
    dequeue() {
      while (!this._deflating && this._queue.length) {
        const params = this._queue.shift();

        this._bufferedBytes -= params[1].length;
        Reflect.apply(params[0], this, params.slice(1));
      }
    }

    /**
     * Enqueues a send operation.
     *
     * @param {Array} params Send operation parameters.
     * @private
     */
    enqueue(params) {
      this._bufferedBytes += params[1].length;
      this._queue.push(params);
    }

    /**
     * Sends a frame.
     *
     * @param {Buffer[]} list The frame to send
     * @param {Function} cb Callback
     * @private
     */
    sendFrame(list, cb) {
      if (list.length === 2) {
        // Cork so that header and payload are flushed together.
        this._socket.cork();
        this._socket.write(list[0]);
        this._socket.write(list[1], cb);
        this._socket.uncork();
      } else {
        this._socket.write(list[0], cb);
      }
    }
  }
  343. module.exports = Sender;