5 Streams in a WebSocket connection
6 ---------------------------------
8 We model a WebSocket as two duplex streams: one stream is for the wire protocol
9 over an I/O socket, and the other is for incoming/outgoing messages.
12 +----------+ +---------+ +----------+
13 [1] write(chunk) -->| ~~~~~~~~ +----->| parse() +----->| ~~~~~~~~ +--> emit('data') [2]
16 | IO | | [5] | Messages |
19 [4] emit('data') <--+ ~~~~~~~~ |<-----+ frame() |<-----+ ~~~~~~~~ |<-- write(chunk) [3]
20 +----------+ +---------+ +----------+
23 Message transfer in each direction is simple: IO receives a byte stream [1] and
24 sends this stream for parsing. The parser will periodically emit a complete
25 message text on the Messages stream [2]. Similarly, when messages are written
26 to the Messages stream [3], they are framed using the WebSocket wire format and
29 There is a feedback loop via [5] since some input from [1] will be things like
30 ping, pong and close frames. In these cases the protocol responds by emitting
31 responses directly back to [4] rather than emitting messages via [2].
33 For the purposes of flow control, we consider the sources of each Readable
34 stream to be as follows:
36 * [2] receives input from [1]
37 * [4] receives input from [1] and [3]
39 The classes below express the relationships described above without prescribing
40 anything about how parse() and frame() work, other than assuming they emit
41 'data' events to the IO and Messages streams. They will work with any protocol
42 driver having these two methods.
46 var Stream = require('stream').Stream,
47 util = require('util');
50 var IO = function(driver) {
51 this.readable = this.writable = true;
53 this._driver = driver;
55 util.inherits(IO, Stream);
57 // The IO pause() and resume() methods will be called when the socket we are
58 // piping to gets backed up and drains. Since IO output [4] comes from IO input
59 // [1] and Messages input [3], we need to tell both of those to return false
60 // from write() when this stream is paused.
62 IO.prototype.pause = function() {
64 this._driver.messages._paused = true;
67 IO.prototype.resume = function() {
71 var messages = this._driver.messages;
72 messages._paused = false;
73 messages.emit('drain');
76 // When we receive input from a socket, send it to the parser and tell the
77 // source whether to back off.
78 IO.prototype.write = function(chunk) {
79 if (!this.writable) return false;
80 this._driver.parse(chunk);
84 // The IO end() method will be called when the socket piping into it emits
85 // 'close' or 'end', i.e. the socket is closed. In this situation the Messages
86 // stream will not emit any more data so we emit 'end'.
87 IO.prototype.end = function(chunk) {
88 if (!this.writable) return;
89 if (chunk !== undefined) this.write(chunk);
90 this.writable = false;
92 var messages = this._driver.messages;
93 if (messages.readable) {
94 messages.readable = messages.writable = false;
99 IO.prototype.destroy = function() {
104 var Messages = function(driver) {
105 this.readable = this.writable = true;
106 this._paused = false;
107 this._driver = driver;
109 util.inherits(Messages, Stream);
111 // The Messages pause() and resume() methods will be called when the app that's
112 // processing the messages gets backed up and drains. If we're emitting
113 // messages too fast we should tell the source to slow down. Message output [2]
114 // comes from IO input [1].
116 Messages.prototype.pause = function() {
117 this._driver.io._paused = true;
120 Messages.prototype.resume = function() {
121 this._driver.io._paused = false;
122 this._driver.io.emit('drain');
125 // When we receive messages from the user, send them to the formatter and tell
126 // the source whether to back off.
127 Messages.prototype.write = function(message) {
128 if (!this.writable) return false;
129 if (typeof message === 'string') this._driver.text(message);
130 else this._driver.binary(message);
131 return !this._paused;
134 // The Messages end() method will be called when a stream piping into it emits
135 // 'end'. Many streams may be piped into the WebSocket and one of them ending
136 // does not mean the whole socket is done, so just process the input and move
137 // on leaving the socket open.
138 Messages.prototype.end = function(message) {
139 if (message !== undefined) this.write(message);
142 Messages.prototype.destroy = function() {};
146 exports.Messages = Messages;