conduit

Description

Opening a two-way WebSocket is a common pattern. The conduit operator sets up this exchange with a Socket.io server: it pipes an Observable into a two-way WebSocket. Each item emitted by the input Observable is published to the server, and the output stream emits the messages the server sends back to the client.

Messages sent to the conduit operator can include any keys you want, but two keys are reserved and conduit handles them differently: topic selects the Socket.io topic the message is sent to, and binary carries binary data. (See below for more information.)
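
As a rough illustration of what "handled differently" can look like, the sketch below separates the two reserved keys from the rest of a message. splitReservedKeys is a hypothetical helper written for this page, not part of @bottlenose/rxsocketio, and the fallback to the 'message' topic follows the default described in the topics section; the library's internals may differ.

```javascript
// Hypothetical helper (NOT part of @bottlenose/rxsocketio): pulls the
// reserved keys out of a message and leaves every other key in `payload`.
function splitReservedKeys(message, defaultTopic = 'message') {
  // `topic` falls back to the default when the message does not set one.
  const { topic = defaultTopic, binary, ...payload } = message;
  return { topic, binary, payload };
}

// A plain message has no reserved keys, so it lands on the default topic.
console.log(splitReservedKeys({ body: 'data' }));

// A binary message keeps its extra keys (here `part`) in the payload.
console.log(
  splitReservedKeys({ topic: 'audio-stream', binary: new Uint8Array(4), part: 0 })
);
```

Under this assumption, the three parts line up with the socket.emit(topic, jsonData, binary) call shown in the binary section.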

Usage

Basic Usage: The most common use for WebSockets is to open a bi-directional data stream to publish messages to the server and receive messages back from it. The package provides a conduit operator for this purpose:

import { from } from 'rxjs';
import { conduit } from '@bottlenose/rxsocketio';

const messagesToSend$ = from([
  {body: 'data'},
  {body: 'more data'},
]);

// send messages over the WebSocket and receive messages back from it...
const socketResponse$ = messagesToSend$.pipe(
  conduit({url: 'http://mysite.com'})
);
socketResponse$.subscribe(
  console.log, // log all messages received from the server
  console.error,
  () => console.log('Socket Closed!') 
);

Sending messages to a topic

Socket.io supports "topics" for messages. By default, objects piped into the conduit operator are sent to the 'message' topic. To send an object to a different topic, add a topic key to it. To receive messages on particular topics, list them in the topics option:

import { from } from 'rxjs';
import { conduit } from '@bottlenose/rxsocketio';

const messagesToSend$ = from([
  {body: 'Network TV is overrated', topic: 'television'},
  {body: 'The Beatles are really, really good.', topic: 'music'},
]);

// send messages over the WebSocket and receive messages back from it...
const socketResponse$ = messagesToSend$.pipe(
  conduit({url: 'http://mysite.com', topics: ['television', 'music', 'message']})
);
socketResponse$.subscribe(
  console.log, // log all messages received from the server
  console.error,
  () => console.log('Socket Closed!')
);

Sending messages with binary

Sometimes, it's useful to send binary data (like audio or video) using Socket.io. The conduit operator supports this:

import { from } from 'rxjs';
import { conduit } from '@bottlenose/rxsocketio';

// binary can be an ArrayBuffer, Uint8Array, Blob, File or Buffer
const messageToSend$ = from([
  {topic: 'audio-stream', binary: new Uint8Array(), part: 0},
  {topic: 'audio-stream', binary: new Uint8Array(), part: 1},
  {topic: 'audio-stream', binary: new Uint8Array(), part: 2},
]);

// each message will be sent to Socket.io like this:
// socket.emit('audio-stream', jsonData, binary);
// and the server can receive it like this:
// socket.on('audio-stream', (jsonData, binary) => { ... });
const socketResponse$ = messageToSend$.pipe(
  conduit({url: 'http://mysite.com', topics: ['audio-chunk-received']})
);
socketResponse$.subscribe(
  console.log, // log all messages received from the server
  console.error,
  () => console.log('Socket Closed!')
);
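
In practice the binary payloads usually come from splitting a larger buffer into numbered parts. The following is a minimal sketch of that step, assuming fixed-size chunks; toChunkMessages, the chunk size and the part numbering scheme are illustrative choices, not something the package provides.

```javascript
// Hypothetical helper (not part of @bottlenose/rxsocketio): splits a
// Uint8Array into messages shaped like the ones in the example above.
function toChunkMessages(bytes, chunkSize, topic = 'audio-stream') {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError('chunkSize must be a positive integer');
  }
  const messages = [];
  for (let offset = 0, part = 0; offset < bytes.length; offset += chunkSize, part += 1) {
    // slice() copies the bytes, so each message owns its own buffer.
    messages.push({ topic, binary: bytes.slice(offset, offset + chunkSize), part });
  }
  return messages;
}

// Five bytes in chunks of two yield three parts; the last one is shorter.
const audio = Uint8Array.from([1, 2, 3, 4, 5]);
const chunkMessages = toChunkMessages(audio, 2);
console.log(chunkMessages.map(({ part, binary }) => ({ part, bytes: binary.length })));
```

The resulting array can be handed to from(...) and piped into conduit exactly like the hand-written messages above.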

Advanced options

You can also provide socketOptions, which will be passed into the Socket.io Manager:

import { from } from 'rxjs';
import { conduit } from '@bottlenose/rxsocketio';

const messagesToSend$ = from([
  {body: 'Network TV is overrated', topic: 'television'},
  {body: 'The Beatles are really, really good.', topic: 'music'},
]);

// You can override defaults by passing any of the options that can normally
// be given to a socket.io Manager object:
const socketOptions = {
  query: {token: 'mysuperseeeecrettoken'},
  transports: ['websocket'],
  timeout: 5000,
};

// send messages over the WebSocket and receive messages back from it...
const socketResponse$ = messagesToSend$.pipe(
  conduit({url: 'http://mysite.com', socketOptions})
);
socketResponse$.subscribe(
  console.log, // log all messages received from the server
  console.error,
  () => console.log('Socket Closed!')
);

API

conduit({
  url<String>,
  [topics[String]],
  [socketOptions[Object]],
  [stop$[Observable]]
})

Since

0.5

Parameters

Options

  • url[String]: The URL of the Socket.io server to connect to. Required.

  • topics[String]: Topics to subscribe to when receiving messages from the server. Defaults to ['message'].

  • socketOptions[Object]: Options to pass into the socket.io Manager.

  • stop$[Observable]: The conduit and its observable will terminate (and disconnect) if the stop$ observable emits any items.
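
To make the defaults concrete, here is a small sketch of how these options could be resolved before a connection is opened. resolveConduitOptions is a hypothetical function written for illustration; the empty-object default for socketOptions and the error message are assumptions, and only the ['message'] default for topics comes from the list above.

```javascript
// Hypothetical sketch (not the library's code): applies the documented
// default for `topics` and checks that the required `url` is present.
function resolveConduitOptions({ url, topics = ['message'], socketOptions = {}, stop$ } = {}) {
  if (typeof url !== 'string' || url.length === 0) {
    throw new TypeError('conduit requires a url string');
  }
  // stop$ is optional, so it is passed through unchanged (possibly undefined).
  return { url, topics, socketOptions, stop$ };
}

// Only the url is given, so topics falls back to ['message'].
const resolved = resolveConduitOptions({ url: 'http://mysite.com' });
console.log(resolved.topics);
```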

Returns

An Observable that emits the messages received from the server.
