Sender.js 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. /*!
  2. * ws: a node.js websocket client
  3. * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
  4. * MIT Licensed
  5. */
  6. var events = require('events')
  7. , util = require('util')
  8. , crypto = require('crypto')
  9. , EventEmitter = events.EventEmitter
  10. , ErrorCodes = require('./ErrorCodes')
  11. , bufferUtil = require('./BufferUtil')
  12. , PerMessageDeflate = require('./PerMessageDeflate');
  13. /**
  14. * HyBi Sender implementation
  15. */
  16. function Sender(socket, extensions) {
  17. if (this instanceof Sender === false) {
  18. throw new TypeError("Classes can't be function-called");
  19. }
  20. events.EventEmitter.call(this);
  21. this._socket = socket;
  22. this.extensions = extensions || {};
  23. this.firstFragment = true;
  24. this.compress = false;
  25. this.messageHandlers = [];
  26. this.processing = false;
  27. }
  28. /**
  29. * Inherits from EventEmitter.
  30. */
  31. util.inherits(Sender, events.EventEmitter);
  32. /**
  33. * Sends a close instruction to the remote party.
  34. *
  35. * @api public
  36. */
  37. Sender.prototype.close = function(code, data, mask, cb) {
  38. if (typeof code !== 'undefined') {
  39. if (typeof code !== 'number' ||
  40. !ErrorCodes.isValidErrorCode(code)) throw new Error('first argument must be a valid error code number');
  41. }
  42. code = code || 1000;
  43. var dataBuffer = new Buffer(2 + (data ? Buffer.byteLength(data) : 0));
  44. writeUInt16BE.call(dataBuffer, code, 0);
  45. if (dataBuffer.length > 2) dataBuffer.write(data, 2);
  46. var self = this;
  47. this.messageHandlers.push(function() {
  48. self.frameAndSend(0x8, dataBuffer, true, mask);
  49. if (typeof cb == 'function') cb();
  50. });
  51. this.flush();
  52. };
  53. /**
  54. * Sends a ping message to the remote party.
  55. *
  56. * @api public
  57. */
  58. Sender.prototype.ping = function(data, options) {
  59. var mask = options && options.mask;
  60. var self = this;
  61. this.messageHandlers.push(function() {
  62. self.frameAndSend(0x9, data || '', true, mask);
  63. });
  64. this.flush();
  65. };
  66. /**
  67. * Sends a pong message to the remote party.
  68. *
  69. * @api public
  70. */
  71. Sender.prototype.pong = function(data, options) {
  72. var mask = options && options.mask;
  73. var self = this;
  74. this.messageHandlers.push(function() {
  75. self.frameAndSend(0xa, data || '', true, mask);
  76. });
  77. this.flush();
  78. };
  79. /**
  80. * Sends text or binary data to the remote party.
  81. *
  82. * @api public
  83. */
  84. Sender.prototype.send = function(data, options, cb) {
  85. var finalFragment = options && options.fin === false ? false : true;
  86. var mask = options && options.mask;
  87. var compress = options && options.compress;
  88. var opcode = options && options.binary ? 2 : 1;
  89. if (this.firstFragment === false) {
  90. opcode = 0;
  91. compress = false;
  92. } else {
  93. this.firstFragment = false;
  94. this.compress = compress;
  95. }
  96. if (finalFragment) this.firstFragment = true
  97. var compressFragment = this.compress;
  98. var self = this;
  99. this.messageHandlers.push(function() {
  100. if (!data || !compressFragment) {
  101. self.frameAndSend(opcode, data, finalFragment, mask, compress, cb);
  102. return;
  103. }
  104. self.processing = true;
  105. self.applyExtensions(data, finalFragment, compressFragment, function(err, data) {
  106. if (err) {
  107. if (typeof cb == 'function') cb(err);
  108. else self.emit('error', err);
  109. return;
  110. }
  111. self.frameAndSend(opcode, data, finalFragment, mask, compress, cb);
  112. self.processing = false;
  113. self.flush();
  114. });
  115. });
  116. this.flush();
  117. };
  118. /**
  119. * Frames and sends a piece of data according to the HyBi WebSocket protocol.
  120. *
  121. * @api private
  122. */
  123. Sender.prototype.frameAndSend = function(opcode, data, finalFragment, maskData, compressed, cb) {
  124. var canModifyData = false;
  125. if (!data) {
  126. try {
  127. this._socket.write(new Buffer([opcode | (finalFragment ? 0x80 : 0), 0 | (maskData ? 0x80 : 0)].concat(maskData ? [0, 0, 0, 0] : [])), 'binary', cb);
  128. }
  129. catch (e) {
  130. if (typeof cb == 'function') cb(e);
  131. else this.emit('error', e);
  132. }
  133. return;
  134. }
  135. if (!Buffer.isBuffer(data)) {
  136. canModifyData = true;
  137. if (data && (typeof data.byteLength !== 'undefined' || typeof data.buffer !== 'undefined')) {
  138. data = getArrayBuffer(data);
  139. } else {
  140. //
  141. // If people want to send a number, this would allocate the number in
  142. // bytes as memory size instead of storing the number as buffer value. So
  143. // we need to transform it to string in order to prevent possible
  144. // vulnerabilities / memory attacks.
  145. //
  146. if (typeof data === 'number') data = data.toString();
  147. data = new Buffer(data);
  148. }
  149. }
  150. var dataLength = data.length
  151. , dataOffset = maskData ? 6 : 2
  152. , secondByte = dataLength;
  153. if (dataLength >= 65536) {
  154. dataOffset += 8;
  155. secondByte = 127;
  156. }
  157. else if (dataLength > 125) {
  158. dataOffset += 2;
  159. secondByte = 126;
  160. }
  161. var mergeBuffers = dataLength < 32768 || (maskData && !canModifyData);
  162. var totalLength = mergeBuffers ? dataLength + dataOffset : dataOffset;
  163. var outputBuffer = new Buffer(totalLength);
  164. outputBuffer[0] = finalFragment ? opcode | 0x80 : opcode;
  165. if (compressed) outputBuffer[0] |= 0x40;
  166. switch (secondByte) {
  167. case 126:
  168. writeUInt16BE.call(outputBuffer, dataLength, 2);
  169. break;
  170. case 127:
  171. writeUInt32BE.call(outputBuffer, 0, 2);
  172. writeUInt32BE.call(outputBuffer, dataLength, 6);
  173. }
  174. if (maskData) {
  175. outputBuffer[1] = secondByte | 0x80;
  176. var mask = getRandomMask();
  177. outputBuffer[dataOffset - 4] = mask[0];
  178. outputBuffer[dataOffset - 3] = mask[1];
  179. outputBuffer[dataOffset - 2] = mask[2];
  180. outputBuffer[dataOffset - 1] = mask[3];
  181. if (mergeBuffers) {
  182. bufferUtil.mask(data, mask, outputBuffer, dataOffset, dataLength);
  183. try {
  184. this._socket.write(outputBuffer, 'binary', cb);
  185. }
  186. catch (e) {
  187. if (typeof cb == 'function') cb(e);
  188. else this.emit('error', e);
  189. }
  190. }
  191. else {
  192. bufferUtil.mask(data, mask, data, 0, dataLength);
  193. try {
  194. this._socket.write(outputBuffer, 'binary');
  195. this._socket.write(data, 'binary', cb);
  196. }
  197. catch (e) {
  198. if (typeof cb == 'function') cb(e);
  199. else this.emit('error', e);
  200. }
  201. }
  202. }
  203. else {
  204. outputBuffer[1] = secondByte;
  205. if (mergeBuffers) {
  206. data.copy(outputBuffer, dataOffset);
  207. try {
  208. this._socket.write(outputBuffer, 'binary', cb);
  209. }
  210. catch (e) {
  211. if (typeof cb == 'function') cb(e);
  212. else this.emit('error', e);
  213. }
  214. }
  215. else {
  216. try {
  217. this._socket.write(outputBuffer, 'binary');
  218. this._socket.write(data, 'binary', cb);
  219. }
  220. catch (e) {
  221. if (typeof cb == 'function') cb(e);
  222. else this.emit('error', e);
  223. }
  224. }
  225. }
  226. };
  227. /**
  228. * Execute message handler buffers
  229. *
  230. * @api private
  231. */
  232. Sender.prototype.flush = function() {
  233. while (!this.processing && this.messageHandlers.length) {
  234. this.messageHandlers.shift()();
  235. }
  236. };
  237. /**
  238. * Apply extensions to message
  239. *
  240. * @api private
  241. */
  242. Sender.prototype.applyExtensions = function(data, fin, compress, callback) {
  243. if ((data.buffer || data) instanceof ArrayBuffer) {
  244. data = getArrayBuffer(data);
  245. }
  246. this.extensions[PerMessageDeflate.extensionName].compress(data, fin, callback);
  247. };
  248. module.exports = Sender;
  249. function writeUInt16BE(value, offset) {
  250. this[offset] = (value & 0xff00)>>8;
  251. this[offset+1] = value & 0xff;
  252. }
  253. function writeUInt32BE(value, offset) {
  254. this[offset] = (value & 0xff000000)>>24;
  255. this[offset+1] = (value & 0xff0000)>>16;
  256. this[offset+2] = (value & 0xff00)>>8;
  257. this[offset+3] = value & 0xff;
  258. }
  259. function getArrayBuffer(data) {
  260. // data is either an ArrayBuffer or ArrayBufferView.
  261. var array = new Uint8Array(data.buffer || data)
  262. , l = data.byteLength || data.length
  263. , o = data.byteOffset || 0
  264. , buffer = new Buffer(l);
  265. for (var i = 0; i < l; ++i) {
  266. buffer[i] = array[o+i];
  267. }
  268. return buffer;
  269. }
  270. function getRandomMask() {
  271. return crypto.randomBytes(4);
  272. }