Opening a two-way WebSocket connection is a very common pattern. The conduit operator makes it easy to start this exchange with a Socket.io server: it pipes an Observable into a two-way WebSocket. Each item emitted by the Observable is published to the server, and the output stream carries the messages sent from the server to the client.
Messages sent to the conduit operator can include any keys you want, but two keys (topic and binary) are reserved, and conduit handles them differently. (See below for more information.)
Usage
Basic Usage: The most common use for WebSockets is to open a bi-directional data stream to publish messages to the server and receive messages back from it. The package provides a conduit operator for this purpose:
    import { from } from 'rxjs';
    import { conduit } from '@bottlenose/rxsocketio';

    const messagesToSend$ = from([
      {body: 'data'},
      {body: 'more data'},
    ]);

    // send messages over the WebSocket and receive messages back from it...
    const socketResponse$ = messagesToSend$.pipe(
      conduit({url: 'http://mysite.com'})
    );

    socketResponse$.subscribe(
      console.log, // log all messages received from the server
      console.error, // log any errors raised by the socket
      () => console.log('Socket Closed!')
    );
Sending messages to a topic
Socket.io supports "topics" for messages. By default, objects piped into the conduit operator are sent to the 'message' topic. To send an object to a particular topic, add a topic key to it:
    import { from } from 'rxjs';
    import { conduit } from '@bottlenose/rxsocketio';

    const messagesToSend$ = from([
      {body: 'Network TV is overrated', topic: 'television'},
      {body: 'The Beatles are really, really good.', topic: 'music'},
    ]);

    // send messages over the WebSocket and receive messages back from it...
    const socketResponse$ = messagesToSend$.pipe(
      conduit({
        url: 'http://mysite.com',
        topics: ['television', 'music', 'message'],
      })
    );

    socketResponse$.subscribe(
      console.log, // log all messages received from the server
      console.error, // log any errors raised by the socket
      () => console.log('Socket Closed!')
    );
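The default-topic rule described above can be illustrated with a small sketch. The helpers `resolveTopic` and `groupByTopic` are hypothetical names introduced here for illustration; they are not part of @bottlenose/rxsocketio and only mirror the documented behavior (objects without a topic key go to 'message').

```javascript
// Hypothetical helpers for illustration; not the package's implementation.
const DEFAULT_TOPIC = 'message';

// Pick the topic a message would be published to:
// its own topic key if present, otherwise the default.
function resolveTopic(message) {
  return typeof message.topic === 'string' && message.topic.length > 0
    ? message.topic
    : DEFAULT_TOPIC;
}

// Group a batch of outgoing messages by the topic they resolve to.
function groupByTopic(messages) {
  const groups = {};
  for (const message of messages) {
    const topic = resolveTopic(message);
    (groups[topic] = groups[topic] || []).push(message);
  }
  return groups;
}

const grouped = groupByTopic([
  {body: 'data'},
  {body: 'Network TV is overrated', topic: 'television'},
  {body: 'The Beatles are really, really good.', topic: 'music'},
]);

console.log(Object.keys(grouped)); // [ 'message', 'television', 'music' ]
```

A helper like this can be handy for checking, before subscribing, that every topic you publish to is one the server actually handles.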
Sending messages with binary data
Sometimes, it's useful to send binary data (like audio or video) using Socket.io. The conduit operator supports this:
    import { from } from 'rxjs';
    import { conduit } from '@bottlenose/rxsocketio';

    const url = 'http://mysite.com';

    // binary can be an ArrayBuffer, Uint8Array, Blob, File or Buffer
    const messagesToSend$ = from([
      {topic: 'audio-stream', binary: new Uint8Array(), part: 0},
      {topic: 'audio-stream', binary: new Uint8Array(), part: 1},
      {topic: 'audio-stream', binary: new Uint8Array(), part: 2},
    ]);

    // the messages will be sent to Socket.io like this:
    //   socket.emit('audio-stream', jsonData, binary);
    // and the server can receive them like this:
    //   socket.on('audio-stream', (jsonData, binary) => { /* ... */ });
    const socketResponse$ = messagesToSend$.pipe(
      conduit({url, topics: ['audio-chunk-received']})
    );

    socketResponse$.subscribe(
      console.log, // log all messages received from the server
      console.error, // log any errors raised by the socket
      () => console.log('Socket Closed!')
    );
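To make the `socket.emit('audio-stream', jsonData, binary)` shape more concrete, here is a minimal sketch of how a message object could be split into emit arguments. `toEmitArgs` is a hypothetical function written for this example, not the operator's actual code; in particular, whether the real operator removes the topic key from the JSON payload is an assumption, since the text above does not say.

```javascript
// Hypothetical sketch; the real conduit operator may differ in detail.
function toEmitArgs(message) {
  // Pull out the two reserved keys; everything else is the JSON payload.
  // Assumption: topic is stripped from the payload and defaults to 'message'.
  const {topic = 'message', binary, ...jsonData} = message;
  return binary === undefined
    ? [topic, jsonData]
    : [topic, jsonData, binary];
}

const args = toEmitArgs({
  topic: 'audio-stream',
  binary: new Uint8Array([1, 2, 3]),
  part: 0,
});

// With a connected Socket.io client, these would be passed on as:
//   socket.emit(...args);
console.log(args[0], args[1], args[2].length);
```

Keeping the binary value as its own argument, rather than nesting it in the JSON object, matches the emit call shown in the comments of the example above.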
Advanced options
You can also provide socketOptions, which will be passed into the Socket.io Manager:
    import { from } from 'rxjs';
    import { conduit } from '@bottlenose/rxsocketio';

    const messagesToSend$ = from([
      {body: 'Network TV is overrated', topic: 'television'},
      {body: 'The Beatles are really, really good.', topic: 'music'},
    ]);

    // You can override defaults by passing any of the options that can normally
    // be given to a socket.io Manager object:
    const socketOptions = {
      query: {token: 'mysuperseeeecrettoken'},
      transports: ['websocket'],
      timeout: 5000,
    };

    // send messages over the WebSocket and receive messages back from it...
    const socketResponse$ = messagesToSend$.pipe(
      conduit({url: 'http://mysite.com', socketOptions})
    );

    socketResponse$.subscribe(
      console.log, // log all messages received from the server
      console.error, // log any errors raised by the socket
      () => console.log('Socket Closed!')
    );
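"Overriding defaults" usually means the caller's options are layered on top of a base object. The sketch below shows one plausible way to do that with object spread. The default values and the `mergeSocketOptions` name are hypothetical and are not taken from the package or from Socket.io.

```javascript
// Hypothetical defaults, for illustration only.
const HYPOTHETICAL_DEFAULTS = {
  transports: ['websocket'],
  timeout: 20000,
};

// Caller-supplied keys win over defaults (shallow merge).
function mergeSocketOptions(socketOptions = {}) {
  return {...HYPOTHETICAL_DEFAULTS, ...socketOptions};
}

const merged = mergeSocketOptions({
  query: {token: 'mysuperseeeecrettoken'},
  timeout: 5000,
});

console.log(merged.timeout); // 5000
console.log(merged.transports); // [ 'websocket' ]
```

Because the merge is shallow, a nested object such as query replaces any default of the same name wholesale rather than being combined key by key.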