
RxPipeline (beta)

The RxPipeline plugin enables you to run operations depending on writes to a collection. Whenever a write happens on the source collection of a pipeline, a handler is called to process the writes and run operations on another collection.

You could achieve similar behavior by observing the collection's event stream and processing the data on each emit:

mySourceCollection.$.subscribe(event => {/* ...process...*/});

While this could work in some cases, it causes many problems that are fixed by using the pipeline plugin instead:

  • In a RxPipeline, only the Leading Instance runs the operations. For example, when you have multiple browser tabs open, only one of them runs the processing. When that tab is closed, another tab is elected leader and continues the pipeline processing.
  • On sudden stops and restarts of the JavaScript process, the processing continues at the correct checkpoint and does not miss any documents, even after unexpected crashes.
  • Reads/Writes on the destination collection are halted while the pipeline is processing. This ensures your queries only return fully processed documents and no partial results. So when you query the destination collection directly after a write to the source collection, you can be sure your query results are up to date and the pipeline has already run by the time the query resolves:
await mySourceCollection.insert({/* ... */});

// because our pipeline blocks reads to the destination, we know that the result array
// contains data created on top of the previously inserted documents.
const result = await myDestinationCollection.find().exec();

Creating a RxPipeline

Pipelines are created on top of a source RxCollection and have another RxCollection as destination. An identifier is used to identify the state of the pipeline so that different pipelines have a different processing checkpoint state. A plain JavaScript function handler is used to process the data of the source collection writes.

const pipeline = await mySourceCollection.addPipeline({
    identifier: 'my-pipeline',
    destination: myDestinationCollection,
    handler: async (docs) => {
        /**
         * Here you can process the documents and do writes to
         * the destination collection.
         */
        for (const doc of docs) {
            // upsert() instead of insert(), so that re-running the handler
            // with the same documents does not fail on an existing id.
            await myDestinationCollection.upsert({
                id: doc.primary,
                category: doc.category
            });
        }
    }
});
beta

The pipeline plugin is in beta mode and the API might be changed without a major RxDB release.

Pipeline handlers must be idempotent

Because a JavaScript process can exit at any time, for example when the user closes a browser tab, the pipeline handler function must be idempotent. This means that when it runs only partially and is then started again with the same input, it must still produce the correct result.
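
A minimal sketch of what idempotency means in practice. It does not use RxDB: createFakeDestination() is a hypothetical in-memory stand-in for a destination collection, and the plain objects with a primary field stand in for source documents. The point is only that the destination key is derived from the source key and written with an overwrite, so running the handler again on the same input leaves the same state.

```javascript
// Hypothetical in-memory stand-in for a destination collection (not the RxDB API).
// upsert() overwrites by id, so repeating a write is harmless.
function createFakeDestination() {
    const rows = new Map();
    return {
        async upsert(row) {
            rows.set(row.id, row);
        },
        get(id) {
            return rows.get(id);
        },
        count() {
            return rows.size;
        }
    };
}

// The destination id is derived from the source document's primary key,
// so processing the same batch twice overwrites rows instead of duplicating them.
function createIdempotentHandler(destination) {
    return async function handler(docs) {
        for (const doc of docs) {
            await destination.upsert({
                id: doc.primary,
                category: doc.category
            });
        }
    };
}

// Simulates a run that stopped after the first document,
// followed by a restart with the full batch.
async function demoIdempotency() {
    const destination = createFakeDestination();
    const handler = createIdempotentHandler(destination);
    const batch = [
        { primary: 'a', category: 'x' },
        { primary: 'b', category: 'y' }
    ];
    await handler(batch.slice(0, 1)); // partial run
    await handler(batch);             // restart with the same input
    return destination.count();
}
```

A handler that appended rows with generated ids would instead end up with a duplicate for document 'a' after the restart.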

Pipeline handlers must not throw

Pipeline handlers must never throw. If you run operations inside the handler that might fail, you must wrap the handler's code in a try/catch yourself and also handle retries.
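
One possible way to do this is a small wrapper that catches errors and retries. This is a sketch and not part of RxDB: createSafeHandler and its options maxAttempts, delayMs and onGiveUp are hypothetical names chosen for illustration.

```javascript
// Hypothetical helper (not an RxDB API): wraps a handler so that the
// returned function never throws or rejects.
function createSafeHandler(handler, options = {}) {
    const maxAttempts = options.maxAttempts ?? 3;
    const delayMs = options.delayMs ?? 100;
    const onGiveUp = options.onGiveUp ?? (() => {});

    return async function safeHandler(docs) {
        for (let attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                await handler(docs);
                return; // success
            } catch (err) {
                if (attempt === maxAttempts) {
                    // Out of attempts: report the failure, but never rethrow.
                    try {
                        await onGiveUp(err, docs);
                    } catch (reportError) {
                        // ignore errors from the reporting callback as well
                    }
                    return;
                }
                // Linear backoff before the next attempt.
                await new Promise(resolve => setTimeout(resolve, delayMs * attempt));
            }
        }
    };
}
```

The wrapped function could then be passed as the handler option. Note the trade-off in this sketch: after the last failed attempt the batch is given up on, so onGiveUp is the place to record the affected documents. Whether giving up or retrying indefinitely is the right choice depends on your application.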

Be careful when doing HTTP requests in the handler

When you run HTTP requests inside your handler, your application is no longer offline-first, because reads to the destination collection are blocked until all handlers have finished. While the client is offline, the destination collection therefore stays blocked for reads and writes.
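
If a handler does need HTTP requests, one way to limit how long the destination stays blocked is to bound each request with a timeout and treat a failure as a result instead of an exception. The following is a sketch under assumptions: fetchJsonWithTimeout is a hypothetical helper, and the fetchImpl parameter exists only so the sketch can be tried without a network. What the handler does with a failed request (skip, mark for a later retry) is left to the application.

```javascript
// Hypothetical helper (not an RxDB API): fetches JSON, gives up after
// timeoutMs, and never throws. fetchImpl defaults to the global fetch.
async function fetchJsonWithTimeout(url, timeoutMs, fetchImpl = globalThis.fetch) {
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), timeoutMs);
    try {
        const response = await fetchImpl(url, { signal: controller.signal });
        if (!response.ok) {
            return { ok: false, reason: 'http-' + response.status };
        }
        return { ok: true, data: await response.json() };
    } catch (err) {
        // Either the timeout aborted the request or the network failed.
        return {
            ok: false,
            reason: controller.signal.aborted ? 'timeout' : 'network'
        };
    } finally {
        clearTimeout(timer);
    }
}
```

Inside a handler, a call such as fetchJsonWithTimeout('https://example.com/doc/' + doc.primary, 5000) would then block reads for at most about five seconds per document, even when the client is offline.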

Use Cases for RxPipeline

The RxPipeline is a handy building block for different features and plugins. You can use it to aggregate data or restructure local data.

UseCase: Re-Index data that comes from replication

Sometimes you want to replicate atomic documents over the wire, but locally you want to split these documents for better indexing. For example, you replicate email documents that have multiple receivers in a string-array. Because string-arrays cannot be indexed, you need another way to locally query for all emails of a given receiver. To handle this case you can set up a RxPipeline that writes the mapping into a separate collection:

const pipeline = await emailCollection.addPipeline({
    identifier: 'map-email-receivers',
    destination: emailByReceiverCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            // remove previous mapping
            await emailByReceiverCollection.find({selector: {emailId: doc.primary}}).remove();
            // add new mapping
            if(!doc.deleted) {
                await emailByReceiverCollection.bulkInsert(
                    doc.receivers.map(receiver => ({
                        emailId: doc.primary,
                        receiver: receiver
                    }))
                );
            }
        }
    }
});

With this you can efficiently query for "all emails that a person received" by running:

const mappings = await emailByReceiverCollection.find({selector: {receiver: 'foobar@example.com'}}).exec();
const mailIds = mappings.map(mapping => mapping.emailId);

UseCase: Fulltext search

You can use the pipeline plugin to index text data for efficient fulltext search.

const pipeline = await emailCollection.addPipeline({
    identifier: 'email-fulltext-search',
    destination: mailByWordCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            // remove previous mapping
            await mailByWordCollection.find({selector: {emailId: doc.primary}}).remove();
            // add new mapping
            if(!doc.deleted) {
                const words = doc.text.split(' ');
                await mailByWordCollection.bulkInsert(
                    words.map(word => ({
                        emailId: doc.primary,
                        word: word
                    }))
                );
            }
        }
    }
});

With this you can efficiently query for "all emails that contain a given word" by running:

const mappings = await mailByWordCollection.find({selector: {word: 'foobar'}}).exec();
const mailIds = mappings.map(mapping => mapping.emailId);
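
The handler above splits the text on single spaces, which keeps punctuation attached to words, treats "Foobar" and "foobar" as different words, and can produce duplicate or empty entries. A slightly more robust tokenizer could look like the following sketch. The function name extractWords is hypothetical and the normalization rules (lowercasing, splitting on anything that is not a letter or digit, deduplicating) are assumptions, not something the plugin prescribes.

```javascript
// Hypothetical tokenizer for the fulltext mapping (not an RxDB API).
// Lowercases the text, splits on every run of characters that are neither
// letters nor digits, drops empty strings and removes duplicates.
function extractWords(text) {
    const words = text
        .toLowerCase()
        .split(/[^\p{L}\p{N}]+/u)
        .filter(word => word.length > 0);
    return Array.from(new Set(words));
}

// extractWords('Hello, hello world!') returns ['hello', 'world']
```

If the handler stores words normalized this way, the search term must be normalized the same way before querying, for example by lowercasing it.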

UseCase: Download data based on source documents

When you have to fetch data from a server for each document of a collection, you can use the pipeline to ensure that the data for every document is downloaded and no document is missed.

const pipeline = await emailCollection.addPipeline({
    identifier: 'download-data',
    destination: serverDataCollection,
    handler: async (docs) => {
        for (const doc of docs) {
            const response = await fetch('https://example.com/doc/' + doc.primary);
            const serverData = await response.json();
            await serverDataCollection.upsert({
                id: doc.primary,
                data: serverData
            });
        }
    }
});

RxPipeline methods

awaitIdle()

You can wait until a pipeline is idle with await myRxPipeline.awaitIdle(). It returns a promise that resolves when the pipeline has processed all documents and is no longer running.

destroy()

await myRxPipeline.destroy() stops the pipeline so that it no longer processes any writes. This is automatically called when the RxCollection or RxDatabase of the pipeline is destroyed.

remove()

await myRxPipeline.remove() removes the pipeline and all metadata that it has stored. Recreating the pipeline afterwards will start processing all source documents from scratch.