Connection.prototype.doRead = function () { var buffer, temp
// Fetches the data // 从内部缓存区读取数据 buffer = this.socket.read() if (!buffer) { // Waits for more data return }
// Save to the internal buffer // 保存当前进来的数据 this.buffer = Buffer.concat([this.buffer, buffer], this.buffer.length + buffer.length) // 如果连接状态为正在连接 if (this.readyState === this.CONNECTING) { // 握手处理 if (!this.readHandshake()) { // May have failed or we're waiting for more data return } }
if (this.readyState !== this.CLOSED) { // 读取数据 while ((temp = this.extractFrame()) === true) {} if (temp === false) { // Protocol error this.close(1002) // 读取数据超过最大长度限制处理 } elseif (this.buffer.length > Connection.maxBufferLength) { // Frame too big this.close(1009) } } }
// 将请求头解析成对象形式的headers Connection.prototype.readHeaders = function (lines) { var i, match
// Extract all headers // Ignore bad-formed lines and ignore the first line (HTTP header) for (i = 1; i < lines.length; i++) { if ((match = lines[i].match(/^([a-z-]+): (.+)$/i))) { this.headers[match[1].toLowerCase()] = match[2] } } }
Connection.prototype.extractFrame = function () { var fin, opcode, B, HB, mask, len, payload, start, i, hasMask
if (this.buffer.length < 2) { return }
// Is this the last frame in a sequence? B = this.buffer[0] HB = B >> 4 // 判断RSV1, RSV2, RSV3位组成的自定义协议 if (HB % 8) { // RSV1, RSV2 and RSV3 must be clear returnfalse } // 用于指示当前的 frame 是消息的最后一个分段,1为最后一个frame fin = HB === 8 // 操作码 // * %x0 表示连续消息片断 // * %x1 表示文本消息片断 // * %x2 表未二进制消息片断 // * %x3-7 为将来的非控制消息片断保留的操作码 // * %x8 表示连接关闭 // * %x9 表示心跳检查的ping // * %xA 表示心跳检查的pong // * %xB-F 为将来的控制消息片断的保留操作码 opcode = B % 16 // 判断是否是有效操作码,0~F,除了这里列出的,其他都为保留的操作码 if (opcode !== 0 && opcode !== 1 && opcode !== 2 && opcode !== 8 && opcode !== 9 && opcode !== 10) { // Invalid opcode returnfalse } if (opcode >= 8 && !fin) { // Control frames must not be fragmented returnfalse }
B = this.buffer[1] // 取出Mask位,判断传输的数据是否有加掩码,设置为1,掩码键必须放在Masking-key区域 // 客户端发送给服务端的所有消息,此位的值都是1; hasMask = B >> 7 if ((this.server && !hasMask) || (!this.server && hasMask)) { // Frames sent by clients must be masked returnfalse }
// Payload length处理 // 获取 Payload length:传输数据的长度 // 如果这个值以字节表示是0-125这个范围,那这个值就表示传输数据的长度; // 如果这个值是126,则随后的两个字节表示的是一个16进制无符号数,用来表示传输数据的长度; // 如果这个值是127,则随后的是8个字节表示的一个64位无符合数,这个数用来表示传输数据的长度 len = B % 128 start = hasMask ? 6 : 2 if (this.buffer.length < start + len) { // Not enough data in the buffer return } // 获取实际的 payload length if (len === 126) { len = this.buffer.readUInt16BE(2) start += 2 } elseif (len === 127) { // Warning: JS can only store up to 2^53 in its number format len = this.buffer.readUInt32BE(2) * Math.pow(2, 32) + this.buffer.readUInt32BE(6) start += 8 } if (this.buffer.length < start + len) { return }
// 提取有效数据 payload = this.buffer.slice(start, start + len) // 如果有Mask位,使用给定掩码解码 if (hasMask) { // Decode with the given mask mask = this.buffer.slice(start - 4, start) for (i = 0; i < payload.length; i++) { payload[i] ^= mask[i % 4] } } this.buffer = this.buffer.slice(start + len)
// Proceeds to frame processing returnthis.processFrame(fin, opcode, payload) }
// 这部分逻辑是直接拿过来用的,还是别人的更香,注释就看上面 const parseData = function (buffer) { var fin, opcode, B, HB, mask, len, payload, start, i, hasMask
if (buffer.length < 2) { return }
// Is this the last frame in a sequence? B = buffer[0] HB = B >> 4 if (HB % 8) { // RSV1, RSV2 and RSV3 must be clear returnfalse } fin = HB === 8 opcode = B % 16
if (opcode !== 0 && opcode !== 1 && opcode !== 2 && opcode !== 8 && opcode !== 9 && opcode !== 10) { // Invalid opcode returnfalse } if (opcode >= 8 && !fin) { // Control frames must not be fragmented returnfalse }
B = buffer[1] hasMask = B >> 7 if (!hasMask) { // Frames sent by clients must be masked returnfalse } len = B % 128 start = hasMask ? 6 : 2
if (buffer.length < start + len) { // Not enough data in the buffer return }
// Get the actual payload length if (len === 126) { len = buffer.readUInt16BE(2) start += 2 } elseif (len === 127) { // Warning: JS can only store up to 2^53 in its number format len = buffer.readUInt32BE(2) * Math.pow(2, 32) + buffer.readUInt32BE(6) start += 8 } if (buffer.length < start + len) { return }
// Extract the payload payload = buffer.slice(start, start + len) if (hasMask) { // Decode with the given mask mask = buffer.slice(start - 4, start) for (i = 0; i < payload.length; i++) { payload[i] ^= mask[i % 4] } } buffer = buffer.slice(start + len)
// Proceeds to frame processing return payload } // 这部分逻辑是直接拿过来用的,还是别人的更香,注释就看上面 functiongenerateMetaData(fin, opcode, masked, payload) { var len, meta, start, mask, i
len = payload.length
// Creates the buffer for meta-data meta = Buffer.alloc(2 + (len < 126 ? 0 : (len < 65536 ? 2 : 8)) + (masked ? 4 : 0))
// Sets fin and opcode meta[0] = (fin ? 128 : 0) + opcode