Websocket库Ws原理分析

 

前言:本文几基于nodejs的原理ws模块分析websocket的原理。

ws服务器逻辑由websocket-server.js的分析WebSocketServer类实现。该类初始化了一些参数后就执行以下代码

if (this._server) {       // 给server注册下面事件,原理返回一个注销函数(用于注销下面注册的分析事件)       this._removeListeners = addListeners(this._server, {         // listen成功的回调         listening: this.emit.bind(this, listening),         error: this.emit.bind(this, error),         // 收到协议升级请求的回调         upgrade: (req, socket, head) => {           this.handleUpgrade(req, socket, head, (ws) => {             // 处理成功,触发链接成功事件             this.emit(connection,原理 ws, req);           });         }       }); 

我们看到ws监听了upgrade事件,当有websocket请求到来时就会执行handleUpgrade处理升级请求,分析升级成功后触发connection事件。原理我们先看handleUpgrade。分析handleUpgrade逻辑不多,原理主要是分析处理和校验升级请求的一些http头。ws提供了一个校验的原理钩子。处理完http头后,分析会调verifyClient校验是原理否允许升级请求。如果成功则执行completeUpgrade。分析顾名思义,原理completeUpgrade是完成升级请求的函数,该函数返回同意协议升级并且设置一些http响应头。另外还有一些重要的逻辑处理。

const ws = new WebSocket(null); // 设置管理socket的数据 ws.setSocket(socket, head, this.options.maxPayload); // cb就是this.emit(connection, ws, req); cb(ws); 

我们看到这里新建了一个WebSocket对象并且调用了他的亿华云setSocket函数。我们来看看他做了什么。setSocket的逻辑非常多,我们慢慢分析。

数据接收者

class Receiver extends Writable {} 

我们看到数据接收者是一个可写流。这就意味着我们可以往里面写数据。

const receiver = new Receiver(); receiver.write(hello); 

我们看一下这时候Receiver的逻辑。

_write(chunk, encoding, cb) {     if (this._opcode === 0x08 && this._state == GET_INFO) return cb();     this._bufferedBytes += chunk.length;     this._buffers.push(chunk);     this.startLoop(cb);   } 

首先记录当前数据的大小,然后把数据存起来,最后执行startLoop。

startLoop(cb) {     let err;     this._loop = true;     do {       switch (this._state) {         // 忽略其他case         case GET_DATA:           err = this.getData(cb);           break;         default:           // `INFLATING`           this._loop = false;           return;       }     } while (this._loop);     cb(err);   } 

我们知道websocket是基于tcp上层的应用层协议,所以我们收到数据时,需要解析出一个个数据包(粘包问题),所以Receiver其实就是一个状态机,每次收到数据的时候,都会根据当前的状态进行状态流转。比如当前处于GET_DATA状态,那么就会进行数据的处理。我们接着看一下数据处理的逻辑。

getData(cb) {     let data = EMPTY_BUFFER;     // 提取数据部分     if (this._payloadLength) {       data = this.consume(this._payloadLength);       if (this._masked) unmask(data, this._mask);     }     // 是控制报文则执行controlMessage     if (this._opcode > 0x07) return this.controlMessage(data);     // 做了压缩,则先解压     if (this._compressed) {       this._state = INFLATING;       this.decompress(data, cb);       return;     }     // 没有压缩则直接处理(先存到_fragments,然后执行dataMessage)     if (data.length) {       this._messageLength = this._totalPayloadLength;       this._fragments.push(data);     }     return this.dataMessage();   } 

我们执行websocket协议定义了报文的亿华云计算类型,比如控制报文,数据报文。我们分别看一下这两个的逻辑。

controlMessage(data) {     // 连接关闭     if (this._opcode === 0x08) {       this._loop = false;       if (data.length === 0) {         this.emit(conclude, 1005, );         this.end();       }     } else if (this._opcode === 0x09) {       this.emit(ping, data);     } else {       this.emit(pong, data);     }     this._state = GET_INFO;   } 

我们看到控制报文包括三种(conclude、ping、pong)。而数据报文只有this.emit(message, data);一种。这个就是接收者的整体逻辑。

2 数据发送者

数据发送者是对websocket协议的封装,当用户调研数据发送者的send接口发送数据时,数据发送者会组装成一个websocket协议的包再发送出去。

send(data, options, cb) {     const buf = toBuffer(data);     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];     let opcode = options.binary ? 2 : 1;     let rsv1 = options.compress;     if (this._firstFragment) {       this._firstFragment = false;       if (rsv1 && perMessageDeflate) {         rsv1 = buf.length >= perMessageDeflate._threshold;       }       this._compress = rsv1;     } else {       rsv1 = false;       opcode = 0;     }     if (options.fin) this._firstFragment = true;     // 需要压缩     if (perMessageDeflate) {       const opts = {         fin: options.fin,         rsv1,         opcode,         mask: options.mask,         readOnly: toBuffer.readOnly       };       // 正在压缩,则排队等待,否则执行压缩       if (this._deflating) {         this.enqueue([this.dispatch, buf, this._compress, opts, cb]);       } else {         this.dispatch(buf, this._compress, opts, cb);       }     } else {       // 不需要压缩,直接发送       this.sendFrame(         Sender.frame(buf, {           fin: options.fin,           rsv1: false,           opcode,           mask: options.mask,           readOnly: toBuffer.readOnly         }),         cb       );     }   } 

send函数做了一些参数的处理后发送数据,但是如果需要压缩的话,要压缩后才能发送。数据处理完成后调用真正的发送函数

sendFrame(list, cb) {     if (list.length === 2) {       this._socket.cork();       this._socket.write(list[0]);       this._socket.write(list[1], cb);       this._socket.uncork();     } else {       this._socket.write(list[0], cb);     }   } 

了解了数据接收者和发送者的逻辑后,我们看一下websocket对象和setSocket函数做了什么事情,websocket对象本质是站群服务器对TCP socket的封装。它接收来自底层的数据,然后透传给数据接收者,数据接收者处理完后,触发websocket对应的对应的事件,比如message事件。发送数据的时候,websocket会调用数据发送者的接口,数据发送者组装成websocket协议的数据包后再发送出去,架构如下图所示。

接下来我们看看setSocket的逻辑

setSocket(socket, head, maxPayload) {     // 数据接收者,负责处理tcp上收到的数据(socket是tcp层的socket)     const receiver = new Receiver(...);     // 数据发送者,负责发送数据给对端     this._sender = new Sender(socket, this._extensions);     // 数据接收者,负责解析数据     this._receiver = receiver;     // net模块的tcp socket     this._socket = socket;     // 关联起来     receiver[kWebSocket] = this;     socket[kWebSocket] = this;     // 监听接收者的事件,解析数据的时候会回调     receiver.on(conclude, receiverOnConclude);     // 下面两个事件由Writable触发     receiver.on(drain, receiverOnDrain);     receiver.on(error, receiverOnError);     receiver.on(message, receiverOnMessage);     receiver.on(ping, receiverOnPing);     receiver.on(pong, receiverOnPong);     // 清除定时器     socket.setTimeout(0);     // 关闭nagle算法     socket.setNoDelay();     // 升级请求中,携带的http body,通常是空     if (head.length > 0) socket.unshift(head);     // 监听tcp底层的事件     socket.on(close, socketOnClose);     socket.on(data, socketOnData);     socket.on(end, socketOnEnd);     socket.on(error, socketOnError);     this.readyState = WebSocket.OPEN;     this.emit(open);   } 

我们看到里面监听了各种事件,下面以data事件为例,看一下处理过程。当tcp socket收到数据的时候会执行socketOnData函数。

function socketOnData(chunk) {   // 会调用receiver里的_write函数,其实就是换成到receiver对象上,如果数据解析出错,会触发socket error事件   if (!this[kWebSocket]._receiver.write(chunk)) {     this.pause();   } } 

socketOnData通过接收者的接口把数据传给接收者,接收者会解析数据,然后触发对应的事件,比如message。

receiver.on(message, receiverOnMessage); function receiverOnMessage(data) {   this[kWebSocket].emit(message, data); } 

然后ws的socket对象继续往上层触发message事件。this[kWebSocket]的值是ws提供的socket对象本身。架构图如下。

这就是ws实现websocket协议的基本原理,具体细节可以参考源码。

人工智能
上一篇:联想电脑账户密码错误的原因及解决方法(探索联想电脑账户密码错误的背后问题,教你轻松解决困扰)
下一篇:探索GalaxyNote3Neo的卓越功能与性能(发现GalaxyNote3Neo的创新之处,了解其先进的技术特性)