# RxDB Replication Documentation > Complete reference for RxDB replication and data sync: HTTP, GraphQL, WebSocket, CouchDB, Firestore, Supabase, WebRTC P2P, and more. This file contains all documentation content in a single document following the llmstxt.org standard. ## Appwrite Realtime Sync for Local-First Apps import {Tabs} from '@site/src/components/tabs'; import {Steps} from '@site/src/components/steps'; import {VideoBox} from '@site/src/components/video-box'; import {RxdbMongoDiagramPlain} from '@site/src/components/mongodb-sync'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # RxDB Appwrite Replication This replication plugin allows you to synchronize documents between RxDB and an Appwrite server. It supports both push and pull replication, live updates via Appwrite's real-time subscriptions, [offline-capability](./offline-first.md) and [conflict resolution](./transactions-conflicts-revisions.md).
## Why use RxDB with Appwrite? **Appwrite** is a secure, open-source backend server that simplifies backend tasks like user authentication, storage, database management, and real-time APIs. **[RxDatabase](./rx-database.md)** is a reactive database for the frontend that offers offline-first capabilities and rich client-side data handling. Combining the two provides several benefits: 1. [Offline-First](./offline-first.md): RxDB keeps all data locally, so your application remains fully functional even when the network is unavailable. When connectivity returns, the RxDB ↔ Appwrite replication automatically resolves and synchronizes changes. 2. **Real-Time Sync**: With Appwrite’s real-time subscriptions and RxDB’s live replication, you can build collaborative features that update across all clients instantaneously. 3. [Conflict Handling](./transactions-conflicts-revisions.md): RxDB offers flexible conflict resolution strategies, making it simpler to handle concurrent edits across multiple users or devices. 4. **Scalable & Secure**: Appwrite is built to handle production loads with granular access controls, while RxDB easily scales across various storage backends on the client side. 5. **Simplicity & Modularity**: RxDB’s plugin-based architecture, combined with Appwrite’s Cloud offering, makes it one of the easiest ways to build local-first [realtime apps](./articles/realtime-database.md) that scale. ## Preparing the Appwrite Server You can either use Appwrite Cloud or self-host the Appwrite server. In this tutorial we use the Cloud, which is recommended for beginners because it is much easier to set up. You can later decide to self-host if needed. ### Set up an Appwrite Endpoint and Project #### Self-Hosted ##### Docker Ensure Docker and docker-compose are installed and up to date: ```bash docker-compose -v ``` ##### Run the installation script The installation script runs inside a Docker container. It will create a docker-compose file and an `.env` file. ```bash docker run -it --rm \ --volume /var/run/docker.sock:/var/run/docker.sock \ --volume "$(pwd)"/appwrite:/usr/src/code/appwrite:rw \ --entrypoint="install" \ appwrite/appwrite:1.6.1 ``` ##### Start/Stop After the installation is done, you can manually stop and start the Appwrite instance with docker compose: ```bash # stop docker-compose down # start docker-compose up ``` #### Appwrite Cloud ##### Create a Cloud Account Go to the Appwrite Console, create an account and log in. ##### Create a Project In the console, click the `+ Create Project` button to create a new project. Remember the `project-id`, which will be used later. ### Create an Appwrite Database and Collection After creating an Appwrite project you have to create an Appwrite Database and a collection. You can either do this in code with the node-appwrite SDK or in the Appwrite Console, as shown in this video:
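If you prefer the code route, a minimal sketch with the node-appwrite server SDK could look like the following. The database id `mydb`, the collection id `humans`, the API key placeholder and the wide-open permissions are assumptions for testing only; run this from a server or script, not in the browser:

```ts
import { Client, Databases, Permission, Role } from 'node-appwrite';

const adminClient = new Client()
    .setEndpoint('https://cloud.appwrite.io/v1') // or your self-hosted endpoint
    .setProject('YOUR_APPWRITE_PROJECT_ID')
    .setKey('YOUR_APPWRITE_API_KEY'); // server API key with database write scopes

const databases = new Databases(adminClient);

// create the database and the collection that RxDB will replicate with
await databases.create('mydb', 'mydb');
await databases.createCollection('mydb', 'humans', 'humans', [
    // wide-open permissions, for testing only
    Permission.read(Role.any()),
    Permission.create(Role.any()),
    Permission.update(Role.any()),
    Permission.delete(Role.any())
]);
```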
### Add your document attributes In the Appwrite collection, create all attributes of your documents. You have to define every field that your documents have in your [RxDB schema](./rx-schema.md). Notice that Appwrite does not allow for nested attributes. So when you use RxDB with Appwrite, you should also not have nested attributes in your RxDB schema. ### Add a `deleted` attribute Appwrite (natively) hard-deletes documents. But for offline handling, RxDB needs soft-deleted documents on the server so that the deletion state can be replicated to other clients. In RxDB, `_deleted` indicates that a document has been deleted locally, and you need a similar field in your Appwrite collection on the server: you must define a deletedField with any name to mark documents as "deleted" in the remote collection. Mostly you would use a boolean field named `deleted` (set it to `required`). The plugin will treat any document with `{ [deletedField]: true }` as deleted and replicate that state to the local RxDB state. ### Set the Permissions on the Appwrite Collection Appwrite uses permissions to control data access at the collection level. Make sure that in the Console at `Collection -> Settings -> Permissions` you have set the permissions according to what you want to allow your clients to do. For testing, just enable all of them (Create, Read, Update and Delete).
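The attributes can also be created in code. A minimal sketch with the node-appwrite SDK, reusing the `databases` client and the placeholder `mydb`/`humans` ids from the sketch above:

```ts
// create the flat attributes that the RxDB schema expects;
// the RxDB primary key is typically mapped to Appwrite's built-in $id,
// so it usually does not need its own attribute
await databases.createStringAttribute('mydb', 'humans', 'name', 100, true);

// boolean soft-delete marker, used as the 'deletedField' by the replication plugin
await databases.createBooleanAttribute('mydb', 'humans', 'deleted', true);
```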
## Setting up the RxDB - Appwrite Replication Now that we have set up the Appwrite server, we can go to the client side code and set up RxDB and the replication: ### Install the Appwrite SDK and RxDB: ```bash npm install appwrite rxdb ``` ### Import the Appwrite SDK and RxDB ```ts import { replicateAppwrite } from 'rxdb/plugins/replication-appwrite'; import { createRxDatabase, addRxPlugin, RxCollection } from 'rxdb/plugins/core'; import { getRxStorageLocalstorage } from 'rxdb/plugins/storage-localstorage'; import { Client } from 'appwrite'; ``` ### Create a Database with a Collection ```ts const db = await createRxDatabase({ name: 'mydb', storage: getRxStorageLocalstorage() }); const mySchema = { title: 'my schema', version: 0, primaryKey: 'id', type: 'object', properties: { id: { type: 'string', maxLength: 100 }, name: { type: 'string' } }, required: ['id', 'name'] }; await db.addCollections({ humans: { schema: mySchema } }); const collection = db.humans; ``` ### Configure the Appwrite Client #### Appwrite Cloud ```ts const client = new Client(); client.setEndpoint('https://cloud.appwrite.io/v1'); client.setProject('YOUR_APPWRITE_PROJECT_ID'); ``` #### Self-Hosted ```ts const client = new Client(); client.setEndpoint('http://localhost/v1'); client.setProject('YOUR_APPWRITE_PROJECT_ID'); ``` ### Start the Replication ```ts const replicationState = replicateAppwrite({ replicationIdentifier: 'my-appwrite-replication', client, databaseId: 'YOUR_APPWRITE_DATABASE_ID', collectionId: 'YOUR_APPWRITE_COLLECTION_ID', deletedField: 'deleted', // Field that represents deletion in Appwrite collection, pull: { batchSize: 10, }, push: { batchSize: 10 }, /* * ... * You can set all other options for RxDB replication states * like 'live' or 'retryTime' * ... */ }); ``` ### Do other things with the replication state The `RxAppwriteReplicationState` which is returned from `replicateAppwrite()` allows you to run all functionality of the normal [RxReplicationState](./replication.md). ## FAQ
Does Appwrite support multiple databases and subcollections? Yes, Appwrite supports creating multiple top-level databases within a single project, which lets you partition your collections. However, Appwrite is a flat NoSQL document store that does *not* support nested subcollections (unlike Firebase). When using the **[RxDB Appwrite Replication](./replication.md)** plugin, your local RxDB schema must mirror this flat layout and avoid nested objects in its documents.
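For illustration, a hypothetical nested `address` object would be flattened into top-level properties in the RxDB schema:

```ts
// hypothetical example: flat schema properties that map 1:1 to Appwrite attributes
const flatProperties = {
    id: { type: 'string', maxLength: 36 },
    name: { type: 'string' },
    // instead of a nested { address: { street: ..., city: ... } } object:
    addressStreet: { type: 'string' },
    addressCity: { type: 'string' }
};
```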
What database does Appwrite use under the hood? Appwrite uses MariaDB (a MySQL fork) as its default backing database. To offer developers a flat NoSQL experience, Appwrite hides the relational layer behind a unified Document API. Because of this mapping, **[RxDB](./rx-database.md)** can replicate data into Appwrite via its standard REST endpoints without dealing with SQL table mappings or migrations.
Does Appwrite feature native real-time sync for offline apps? Appwrite natively provides WebSocket subscriptions that let clients receive real-time document events while the network is available. However, Appwrite does *not* ship a built-in offline-first cache or background-sync engine. To get true offline capabilities, you use the **[RxDB Appwrite Replication](./replication.md)** plugin on the client. RxDB stores all data locally, queues writes made while offline, and automatically pushes them to Appwrite when connectivity returns.
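In practice this means your application only ever writes to the local collection and the replication catches up on its own. A minimal sketch, reusing the `collection` and `replicationState` variables from the setup above:

```ts
// writes go to the local RxDB collection, even while the device is offline
await collection.insert({ id: 'human-1', name: 'Alice' });

// once the device is back online, the live replication pushes the change automatically;
// you can also trigger a sync cycle and wait for it explicitly
replicationState.reSync();
await replicationState.awaitInSync();
```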
## Limitations of the Appwrite Replication Plugin - Appwrite primary keys only allow for the characters `a-z`, `A-Z`, `0-9`, and underscore `_` (They cannot start with a leading underscore). Also the primary key has a max length of 36 characters. - The Appwrite replication **only works on browsers**. This is because the Appwrite SDK does not support subscriptions in Node.js. - Appwrite does not allow for bulk write operations so on push one HTTP request will be made per document. Reads run in bulk so this is mostly not a problem. - Appwrite does not allow for transactions or "update-if" calls which can lead to overwriting documents instead of properly handling [conflicts](./transactions-conflicts-revisions.md#conflicts) when multiple clients edit the same document in parallel. This is not a problem for inserts because "insert-if-not" calls are made. - Nested attributes in Appwrite collections are only possible via experimental relationship attributes, and compatibility with RxDB is not tested. Users opting to use these experimental relationship attributes with RxDB do so at their own risk. --- ## RxDB's CouchDB Replication Plugin import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # Replication with CouchDB A plugin to replicate between a [RxCollection](./rx-collection.md) and a CouchDB server. This plugin uses the RxDB [Sync Engine](./replication.md) to replicate with a CouchDB endpoint. This plugin **does NOT** use the official [CouchDB replication protocol](https://docs.couchdb.org/en/stable/replication/protocol.html) because the CouchDB protocol was optimized for server-to-server replication and is not suitable for fast client side applications, mostly because it has to run many HTTP-requests (at least one per document) and also it has to store the whole revision tree of the documents at the client. This makes initial replication and querying very slow. Because the way RxDB handles revisions and documents is very similar to CouchDB, using the RxDB replication with a CouchDB endpoint is pretty straightforward. ## Pros - Faster initial replication. - Works with any [RxStorage](./rx-storage.md), not just [PouchDB](./rx-storage-pouchdb.md). - Easier [conflict handling](./transactions-conflicts-revisions.md) because conflicts are handled during replication and not afterwards. - Does not have to store all document revisions on the client, only stores the newest version. ## Cons - Does not support the replication of [attachments](./rx-attachment.md). - Like all CouchDB replication plugins, this one is also limited to replicating 6 collections in parallel. [Read this for workarounds](./replication-couchdb.md#limitations) ## Usage Start the replication via `replicateCouchDB()`. ```ts import { replicateCouchDB } from 'rxdb/plugins/replication-couchdb'; const replicationState = replicateCouchDB( { replicationIdentifier: 'my-couchdb-replication', collection: myRxCollection, // url to the CouchDB endpoint (required) url: 'http://example.com/db/humans', /** * true for live replication, * false for a one-time replication. * [default=true] */ live: true, /** * A custom fetch() method can be provided * to add authentication or credentials. * Can be swapped out dynamically * by running 'replicationState.fetch = newFetchMethod;'. * (optional) */ fetch: myCustomFetchMethod, pull: { /** * Amount of documents to be fetched in one HTTP request * (optional) */ batchSize: 60, /** * Custom modifier to mutate pulled documents * before storing them in RxDB. 
* (optional) */ modifier: docData => {/* ... */}, /** * Heartbeat time in milliseconds * for the long polling of the changestream. * @link https://docs.couchdb.org/en/3.2.2-docs/api/database/changes.html * (optional, default=60000) */ heartbeat: 60000 }, push: { /** * How many local changes to process at once. * (optional) */ batchSize: 60, /** * Custom modifier to mutate documents * before sending them to the CouchDB endpoint. * (optional) */ modifier: docData => {/* ... */} } } ); ``` When you call `replicateCouchDB()` it returns a `RxCouchDBReplicationState` which can be used to subscribe to events, for debugging or other functions. It extends the [RxReplicationState](./replication.md), so any other method that can be used there can also be used on the CouchDB replication state. ## Conflict handling When conflicts appear during replication, the `conflictHandler` of the `RxCollection` is used, the same as with the other replication plugins. Read more about conflict handling [here](./replication.md#conflict-handling). ## Auth example Let's say for authentication you need to add a [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/) as an HTTP header to each request. You can achieve that by crafting a custom `fetch()` method that adds the header field. ```ts const myCustomFetch = (url, options) => { // flat clone the given options to not mutate the input const optionsWithAuth = Object.assign({}, options); // ensure the headers property exists if(!optionsWithAuth.headers) { optionsWithAuth.headers = {}; } // add the bearer token to the headers optionsWithAuth.headers['Authorization'] = 'Bearer S0VLU0UhIExFQ0...'; // call the original fetch function with our custom options. return fetch( url, optionsWithAuth ); }; const replicationState = replicateCouchDB( { replicationIdentifier: 'my-couchdb-replication', collection: myRxCollection, url: 'http://example.com/db/humans', /** * Add the custom fetch function here. */ fetch: myCustomFetch, pull: {}, push: {} } ); ``` Also when your bearer token changes over time, you can set a new custom `fetch` method while the replication is running: ```ts replicationState.fetch = newCustomFetchMethod; ``` There is also a helper method `getFetchWithCouchDBAuthorization()` to create a fetch handler with authorization: ```ts import { replicateCouchDB, getFetchWithCouchDBAuthorization } from 'rxdb/plugins/replication-couchdb'; const replicationState = replicateCouchDB( { replicationIdentifier: 'my-couchdb-replication', collection: myRxCollection, url: 'http://example.com/db/humans', /** * Add the custom fetch function here. */ fetch: getFetchWithCouchDBAuthorization('myUsername', 'myPassword'), pull: {}, push: {} } ); ``` ## Limitations Since CouchDB only allows synchronization through HTTP/1.1 long polling requests, there is a limit of 6 active synchronization connections before the browser refuses to send any further requests. This limit applies at the browser level, per tab and per domain (some browsers, especially older ones, might have a different limit, [see here](https://docs.pushtechnology.com/cloud/latest/manual/html/designguide/solution/support/connection_limitations.html)). 
Since this limitation is at the **browser** level, there are several solutions: - Use only a single database for all entities and set a "type" field for each of the documents - Create multiple subdomains for CouchDB and use a maximum of 6 active synchronizations (or fewer) for each - Use a proxy (e.g. HAProxy) between the browser and CouchDB and configure it to use HTTP/2.0, since HTTP/2.0 multiplexes requests. If you use nginx in front of your CouchDB, you can use these settings to enable http2-proxying to prevent the connection limit problem: ``` server { http2 on; location /db { rewrite /db/(.*) /$1 break; proxy_pass http://127.0.0.1:5984; proxy_redirect off; proxy_buffering off; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Connection "keep-alive"; } } ``` ## Known problems ### Database missing In contrast to PouchDB, this plugin **does NOT** automatically create missing CouchDB databases. If your CouchDB server does not have a database yet, you have to create it yourself by running a `PUT` request to the database URL: ```ts // create a 'humans' CouchDB database on the server const remoteDatabaseName = 'humans'; await fetch( 'http://example.com/db/' + remoteDatabaseName, { method: 'PUT' } ); ``` ## React Native React Native does not have a global `fetch` method. You have to import a fetch method from the [cross-fetch](https://www.npmjs.com/package/cross-fetch) package: ```ts import crossFetch from 'cross-fetch'; const replicationState = replicateCouchDB( { replicationIdentifier: 'my-couchdb-replication', collection: myRxCollection, url: 'http://example.com/db/humans', fetch: crossFetch, pull: {}, push: {} } ); ``` --- ## Smooth Firestore Sync for Offline Apps import {Steps} from '@site/src/components/steps'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # Replication with Firestore from Firebase With the `replication-firestore` plugin you can do a two-way realtime replication between your client-side [RxDB](./) database and a [Cloud Firestore](https://firebase.google.com/docs/firestore) database that is hosted on the Firebase platform. It will use the [RxDB Sync Engine](./replication.md) to manage the replication streams, error- and [conflict handling](./transactions-conflicts-revisions.md). Replicating your Firestore state to RxDB can bring multiple benefits compared to using Firestore directly: - It can reduce your cloud fees because your queries run against the local state of the documents without touching a server, and writes can be batched up locally and sent to the backend in bulk. This mostly matters for read-heavy applications. - You can run complex [NoSQL queries](./why-nosql.md) on your documents because you are not bound to the [Firestore Query](https://firebase.google.com/docs/firestore/query-data/queries) handling. You can also use local indexes, [compression](./key-compression.md) and [encryption](./encryption.md) and do things like [fulltext search](./fulltext-search.md), fully locally. - Your application can be truly [Offline-First](./offline-first.md) because your data is stored in a client-side database. In contrast, Firestore itself only provides [offline persistence](https://cloud.google.com/firestore/docs/manage-data/enable-offline), which works more like a cache and requires the user to be online at application start to run authentication. 
- It reduces the vendor lock in because you can switch out the backend server afterwards without having to rebuild big parts of the application. RxDB supports replication plugins with multiple technologies and it is even easy to set up with your [custom backend](./replication.md). - You can use sophisticated [conflict resolution strategies](./replication.md#conflict-handling) so you are not bound to the Firestore [last-write-wins](https://stackoverflow.com/a/47781502/3443137) strategy which is not suitable for many applications. - The initial load time of your application can be decreased because it will do an incremental replication on restarts. ## Usage ### Install the firebase package ```bash npm install firebase ``` ### Initialize your Firestore Database ```ts import * as firebase from 'firebase/app'; import { getFirestore, collection } from 'firebase/firestore'; const projectId = 'my-project-id'; const app = firebase.initializeApp({ projectId, databaseURL: 'http://localhost:8080?ns=' + projectId, /* ... */ }); const firestoreDatabase = getFirestore(app); const firestoreCollection = collection(firestoreDatabase, 'my-collection-name'); ``` ### Start the Replication Start the replication by calling `replicateFirestore()` on your [RxCollection](./rx-collection.md). ```ts const replicationState = replicateFirestore({ replicationIdentifier: `https://firestore.googleapis.com/${projectId}`, collection: myRxCollection, firestore: { projectId, database: firestoreDatabase, collection: firestoreCollection }, /** * (required) Enable push and pull replication with firestore by * providing an object with optional filter * for each type of replication desired. * [default=disabled] */ pull: {}, push: {}, /** * Either do a live or a one-time replication * [default=true] */ live: true, /** * (optional) likely you should just use the default. * * In firestore it is not possible to read out * the internally used write timestamp of a document. * Even if we could read it out, it is not indexed which * is required for fetch 'changes-since-x'. * So instead we have to rely on a custom user defined field * that contains the server time * which is set by firestore via serverTimestamp() * Notice that the serverTimestampField MUST NOT be * part of the collections RxJsonSchema! * [default='serverTimestamp'] */ serverTimestampField: 'serverTimestamp' }); ``` To observe and cancel the replication, you can use any other methods from the [ReplicationState](./replication.md) like `error$`, `cancel()` and `awaitInitialReplication()`. ## Handling deletes RxDB requires you to never [fully delete documents](./replication.md#data-layout-on-the-server). This is needed to be able to replicate the deletion state of a document to other instances. The firestore replication will set a boolean `_deleted` field to all documents to indicate the deletion state. You can change this by setting a different `deletedField` in the sync options. ## Do not set `enableIndexedDbPersistence()` Firestore has the `enableIndexedDbPersistence()` feature which caches document states locally to [IndexedDB](./rx-storage-indexeddb.md). This is not needed when you replicate your Firestore with RxDB because RxDB itself will store the data locally already. ## Using the replication with an already existing Firestore Database State If you have not used RxDB before and you already have documents inside of your Firestore database, you have to manually set the `_deleted` field to `false` and the `serverTimestamp` to all existing documents. 
```ts import { getDocs, query, where, updateDoc, serverTimestamp } from 'firebase/firestore'; const allDocsResult = await getDocs(query(firestoreCollection)); await Promise.all( allDocsResult.docs.map(doc => updateDoc(doc.ref, { _deleted: false, serverTimestamp: serverTimestamp() })) ); ``` Also notice that if you do writes from non-RxDB applications, you have to keep these fields in sync. It is recommended to use [Firestore triggers](https://firebase.google.com/docs/functions/firestore-events) to ensure that. ## Filtered Replication You might need to replicate only a subset of your collection, either to or from Firestore. You can achieve this using the `push.filter` and `pull.filter` options. ```ts const replicationState = replicateFirestore( { collection: myRxCollection, firestore: { projectId, database: firestoreDatabase, collection: firestoreCollection }, pull: { filter: [ where('ownerId', '==', userId) ] }, push: { filter: (item) => item.syncEnabled === true } } ); ``` Keep in mind that you cannot use inequality operators `(<, <=, !=, not-in, >, or >=)` in `pull.filter` since that would cause a conflict with ordering by `serverTimestamp`. --- ## Google Drive Sync import {Steps} from '@site/src/components/steps'; import {BetaBlock} from '@site/src/components/beta-block'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # Replication with Google Drive The `replication-google-drive` plugin allows you to replicate your client-side [RxDB](./) database to a folder in the user's Google Drive. This enables cross-device [sync](./replication.md) for single users without requiring any backend server. ## Overview The replication uses the Google Drive API v3 and v2. - **[Offline-First](./offline-first.md):** Users can work offline. Changes are synced when they go online. - **No Backend Required:** You don't need to host your own database server. - **Cross-Device:** Users can access their data from multiple devices by signing into the same Google account. - **Realtime Sync:** Uses [WebRTC](./replication-webrtc.md) for peer-to-peer signaling to achieve near real-time updates. It uses the same Google Drive folder instead of a signaling server. ## Usage ### Enable Google Drive API You need to enable the Google Drive API in the [Google Cloud Console](https://console.cloud.google.com/) and create credentials (OAuth 2.0 Client ID) for your application. ### Authenticate the User Your application must handle the OAuth flow to get an `accessToken` from Google. You can use libraries like [`@react-oauth/google`](https://www.npmjs.com/package/@react-oauth/google) or the Google Identity Services SDK. ### Start Replication Once you have the `accessToken`, you can start the replication. ```ts import { replicateGoogleDrive } from 'rxdb/plugins/replication-google-drive'; const replicationState = await replicateGoogleDrive({ replicationIdentifier: 'my-app-drive-sync', collection: myRxCollection, // RxCollection googleDrive: { oauthClientId: 'YOUR_GOOGLE_CLIENT_ID', authToken: 'USER_ACCESS_TOKEN', folderPath: 'my-app-data/user-1' }, live: true, pull: { batchSize: 60, modifier: doc => doc // (optional) modify invalid data }, push: { batchSize: 60, modifier: doc => doc // (optional) modify before sending } }); // Observe replication states replicationState.error$.subscribe(err => { console.error('Replication error:', err); }); replicationState.awaitInitialReplication().then(() => { console.log('Initial replication done'); }); ``` ## Signaling & WebRTC Google Drive does not provide real-time events for file changes. 
If a user changes data on **User Device A**, **User Device B** would not know about it until it periodically polls the Drive API. To achieve real-time updates, this plugin uses **WebRTC** to signal changes between connected devices. 1. Devices create "signal files" in a `signaling` subfolder on Google Drive. 2. Other devices detect these files, read the WebRTC connection data, and establish a direct P2P connection with each other. 3. When a device makes a write, it sends a "RESYNC" signal via WebRTC to all connected peers to notify them about the change. ### Polyfill for Node.js WebRTC is native in browsers but requires a polyfill in Node.js. ```ts import wrtc from 'node-datachannel/polyfill'; // or 'wrtc' package // ... const replicationState = await replicateGoogleDrive({ // ... signalingOptions: { wrtc // Pass the polyfill here } }); ``` ## Options ### googleDrive - **oauthClientId** `string`: The OAuth 2.0 Client ID of your application. - **authToken** `string`: The valid access token associated with the user. - **folderPath** `string`: The path to the folder in Google Drive where data should be stored. - The plugin will ensure this folder exists. - It must **not** be the root folder. - It creates subfolders `docs` (for data) and `signaling` (for WebRTC). - **apiEndpoint** `string` (optional): Defaults to `https://www.googleapis.com`. Useful for mocking or proxies. - **transactionTimeout** `number` (optional): Default `10000` (10s). The plugin uses a `transaction` file in Drive to ensure data integrity during writes. This is the timeout after which a lock is considered stale. ### pull & push Standard RxDB [Replication Options](./replication.md) for batch size, modifiers, etc. ## Technical Details ### File Mapping - Each RxDB document corresponds to **one JSON file** in the `docs` subfolder. - The filename is `[primaryKey].json`. - This simple mapping makes it easy to inspect or backup data manually. ### Checkpointing - The replication relies on the `modifiedTime` of files in Google Drive. ### Conflict Resolution - Conflicts are handled using the standard RxDB [conflict handling](./replication.md#conflict-handling) strategies. - The plugin assumes a master-slave replication pattern where the client (RxDB) merges changes. - If the `transaction` file is locked by another device, the write retries until the lock is released or times out. ## Limitations - **Rate Limits:** Google Drive API has strict rate limits. The plugin attempts to handle 429 errors with exponential backoff, but heavy concurrent writes might hit these limits. - **Latency:** Changes take time to propagate and appear in listings (eventual consistency), which the plugin handles internally. - **Signaling Delay:** The initial WebRTC handshake requires writing and reading files from Drive, which can take a few seconds. Once connected, signaling is instant. ## Testing For testing, it is recommended to use [google-drive-mock](https://github.com/pubkey/google-drive-mock). It simulates the Google Drive API so you can run tests without real credentials. --- ## GraphQL Replication import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # Replication with GraphQL The GraphQL replication provides handlers for GraphQL to run [replication](./replication.md) with GraphQL as the transport layer. The GraphQL replication is mostly used when you already have a backend that exposes a GraphQL API that can be adjusted to serve as a replication endpoint. 
If you do not already have a GraphQL endpoint, using the [HTTP replication](./replication-http.md) is an easier solution. :::note To play around, check out the full example of the RxDB [GraphQL replication with server and client](https://github.com/pubkey/rxdb/tree/master/examples/graphql) ::: ## Usage Before you use the GraphQL replication, make sure you've learned how the [RxDB replication](./replication.md) works. ### Creating a compatible GraphQL Server On the server side, there must be an endpoint that returns newer rows when the last `checkpoint` is given as input. For example, let's say you create a `Query` `pullHuman` which returns a list of document writes that happened after the given checkpoint. For the push-replication, you also need a `Mutation` `pushHuman` which lets RxDB update data of documents by sending the previous document state and the new client document state. Also, to be able to stream all ongoing events, we need a `Subscription` called `streamHuman`. ```graphql input HumanInput { id: ID!, name: String!, age: Int!, updatedAt: Float!, deleted: Boolean! } type Human { id: ID!, name: String!, age: Int!, updatedAt: Float!, deleted: Boolean! } type Checkpoint { id: String!, updatedAt: Float! } input CheckpointInput { id: String!, updatedAt: Float! } type HumanPullBulk { documents: [Human]! checkpoint: Checkpoint } type Query { pullHuman(checkpoint: CheckpointInput, limit: Int!): HumanPullBulk! } input HumanInputPushRow { assumedMasterState: HumanInput newDocumentState: HumanInput! } type Mutation { # Returns a list of all conflicts # If no document write caused a conflict, return an empty list. pushHuman(writeRows: [HumanInputPushRow!]): [Human] } # headers are used to authenticate the subscriptions # over websockets. input Headers { AUTH_TOKEN: String! } type Subscription { streamHuman(headers: Headers): HumanPullBulk! } ``` The GraphQL resolver for `pullHuman` would then look like: ```js const rootValue = { pullHuman: args => { const minId = args.checkpoint ? args.checkpoint.id : ''; const minUpdatedAt = args.checkpoint ? args.checkpoint.updatedAt : 0; // sorted by updatedAt first and the id as second const sortedDocuments = documents.sort((a, b) => { if (a.updatedAt > b.updatedAt) return 1; if (a.updatedAt < b.updatedAt) return -1; if (a.updatedAt === b.updatedAt) { if (a.id > b.id) return 1; if (a.id < b.id) return -1; else return 0; } }); // only return documents newer than the input document const filterForMinUpdatedAtAndId = sortedDocuments.filter(doc => { if (doc.updatedAt < minUpdatedAt) return false; if (doc.updatedAt > minUpdatedAt) return true; if (doc.updatedAt === minUpdatedAt) { // if updatedAt is equal, compare by id if (doc.id > minId) return true; else return false; } }); // only return some documents in one batch const limitedDocs = filterForMinUpdatedAtAndId.slice(0, args.limit); // use the last document for the checkpoint const lastDoc = limitedDocs[limitedDocs.length - 1]; const retCheckpoint = lastDoc ? { id: lastDoc.id, updatedAt: lastDoc.updatedAt } : args.checkpoint; return { documents: limitedDocs, checkpoint: retCheckpoint }; } }; ``` For examples of the other resolvers, consult the [GraphQL Example Project](https://github.com/pubkey/rxdb/blob/master/examples/graphql/server/index.js). ### RxDB Client #### Pull replication For the pull-replication, you first need a `pullQueryBuilder`. 
This is a function that gets the last replication `checkpoint` and a `limit` as input and returns an object with a GraphQL-query and its variables (or a promise that resolves to the same object). RxDB will use the query builder to construct what is later sent to the GraphQL endpoint. ```js const pullQueryBuilder = (checkpoint, limit) => { /** * The first pull does not have a checkpoint * so we fill it up with defaults */ if (!checkpoint) { checkpoint = { id: '', updatedAt: 0 }; } const query = `query PullHuman($checkpoint: CheckpointInput, $limit: Int!) { pullHuman(checkpoint: $checkpoint, limit: $limit) { documents { id name age updatedAt deleted } checkpoint { id updatedAt } } }`; return { query, operationName: 'PullHuman', variables: { checkpoint, limit } }; }; ``` With the queryBuilder, you can then setup the pull-replication. ```js import { replicateGraphQL } from 'rxdb/plugins/replication-graphql'; const replicationState = replicateGraphQL( { collection: myRxCollection, // urls to the GraphQL endpoints url: { http: 'http://example.com/graphql' }, pull: { queryBuilder: pullQueryBuilder, // the queryBuilder from above // (optional) modifies all pulled documents // before they are handled by RxDB modifier: doc => doc, // (optional) specifies the object path to // access the document(s). Otherwise, the // first result of the response data // is used. dataPath: undefined, /** * Amount of documents that the remote will send in one request. * If the response contains less than [batchSize] documents, * RxDB will assume there are no more changes on the backend * that are not replicated. * This value is the same as the limit in the pullHuman() schema. * [default=100] */ batchSize: 50 }, // headers which will be used in http requests against the server. headers: { Authorization: 'Bearer abcde...' }, /** * Options that have been inherited from the RxReplication */ deletedField: 'deleted', live: true, retryTime: 1000 * 5, waitForLeadership: true, autoStart: true, } ); ``` #### Push replication For the push-replication, you also need a `queryBuilder`. Here, the builder receives a changed document as input which has to be sent to the server. It also returns a GraphQL-Query and its data. ```js const pushQueryBuilder = rows => { const query = ` mutation PushHuman($writeRows: [HumanInputPushRow!]) { pushHuman(writeRows: $writeRows) { id name age updatedAt deleted } } `; const variables = { writeRows: rows }; return { query, operationName: 'PushHuman', variables }; }; ``` With the queryBuilder, you can then setup the push-replication. ```js const replicationState = replicateGraphQL( { collection: myRxCollection, // urls to the GraphQL endpoints url: { http: 'http://example.com/graphql' }, push: { queryBuilder: pushQueryBuilder, // the queryBuilder from above /** * batchSize (optional) * Amount of document that will be pushed * to the server in a single request. */ batchSize: 5, /** * modifier (optional) * Modifies all pushed documents before * they are sent to the GraphQL endpoint. * Returning null will skip the document. */ modifier: doc => doc }, headers: { Authorization: 'Bearer abcde...' }, pull: { /* ... */ }, /* ... */ } ); ``` #### Pull Stream To create a **realtime** replication, you need to create a pull stream that pulls ongoing writes from the server. The pull stream gets the `headers` of the `RxReplicationState` as input, so that it can be authenticated on the backend. 
```js const pullStreamQueryBuilder = (headers) => { const query = `subscription onStream($headers: Headers) { streamHuman(headers: $headers) { documents { id, name, age, updatedAt, deleted }, checkpoint { id updatedAt } } }`; return { query, variables: { headers } }; }; ``` With the `pullStreamQueryBuilder` you can then start a realtime replication. ```js const replicationState = replicateGraphQL( { collection: myRxCollection, // urls to the GraphQL endpoints url: { http: 'http://example.com/graphql', // The websocket has to use a different url. ws: 'ws://example.com/subscriptions' }, push: { batchSize: 100, queryBuilder: pushQueryBuilder }, headers: { Authorization: 'Bearer abcde...' }, pull: { batchSize: 100, queryBuilder: pullQueryBuilder, streamQueryBuilder: pullStreamQueryBuilder, // Includes headers as connection // parameter to Websocket. includeWsHeaders: false, // Websocket options that can be passed // as a parameter to initialize the // subscription. // Anything from the graphql-ws ClientOptions // can be applied: // https://the-guild.dev/graphql/ws/docs/interfaces/client.ClientOptions // Except these parameters: 'url', // 'shouldRetry', 'webSocketImpl' - // locked for internal usage // Note: if you provide connectionParams // as a wsOption, make sure it returns any // necessary headers (e.g. authorization) // because providing your own // connectionParams prevents headers from // being included automatically wsOptions: { retryAttempts: 10, } }, deletedField: 'deleted' } ); ``` :::note If it is not possible to create a websocket server on your backend, you can use any other method to pull out the ongoing events from the backend and then you can send them into `RxReplicationState.emitEvent()`. ::: ### Transforming null to undefined in optional fields GraphQL fills up non-existent optional values with `null` while RxDB requires them to be `undefined`. Therefore, if your schema contains optional properties, you have to transform the pulled data to switch out `null` for `undefined`: ```js const replicationState: RxGraphQLReplicationState = replicateGraphQL( { collection: myRxCollection, url: {/* ... */}, headers: {/* ... */}, push: {/* ... */}, pull: { queryBuilder: pullQueryBuilder, modifier: (doc => { // We have to remove optional non-existent field values // they are set as null by GraphQL but should be undefined Object.entries(doc).forEach(([k, v]) => { if (v === null) { delete doc[k]; } }); return doc; }) }, /* ... */ } ); ``` ### pull.responseModifier With the `pull.responseModifier` you can modify the whole response from the GraphQL endpoint **before** it is processed by RxDB. For example, if your endpoint is not capable of returning a valid checkpoint, but instead only returns the plain document array, you can use the `responseModifier` to aggregate the checkpoint from the returned documents. ```ts import { lastOfArray } from 'rxdb'; const replicationState: RxGraphQLReplicationState = replicateGraphQL( { collection: myRxCollection, url: {/* ... */}, headers: {/* ... */}, push: {/* ... */}, pull: { responseModifier: async function( plainResponse, // the exact response that was returned from the server // either 'handler' if plainResponse // came from the pull.handler, // or 'stream' if it came from // the pull.stream origin, // if origin==='handler', the // requestCheckpoint contains the // checkpoint that was sent to // the backend requestCheckpoint ) { /** * In this example we aggregate the * checkpoint from the documents array * that was returned from the graphql endpoint. 
*/ const docs = plainResponse; return { documents: docs, checkpoint: docs.length === 0 ? requestCheckpoint : { name: lastOfArray(docs).name, updatedAt: lastOfArray(docs).updatedAt } }; } }, /* ... */ } ); ``` ### push.responseModifier It's also possible to modify the response of a push mutation. For example if your server returns more than just the conflicting docs: ```graphql type PushResponse { conflicts: [Human] conflictMessages: [ReplicationConflictMessage] } type Mutation { # Returns a PushResponse type that contains # the conflicts along with other information pushHuman(writeRows: [HumanInputPushRow!]): PushResponse! } ``` ```ts const replicationState: RxGraphQLReplicationState = replicateGraphQL( { collection: myRxCollection, url: {/* ... */}, headers: {/* ... */}, push: { responseModifier: async function (plainResponse) { /** * In this example we aggregate the * conflicting documents from a * response object */ return plainResponse.conflicts; }, }, pull: {/* ... */}, /* ... */ } ); ``` #### Helper Functions RxDB provides the helper functions `graphQLSchemaFromRxSchema()`, `pullQueryBuilderFromRxSchema()`, `pullStreamBuilderFromRxSchema()` and `pushQueryBuilderFromRxSchema()` that can be used to generate handlers and schemas from the [RxJsonSchema](./rx-schema.md). To learn how to use them, please inspect the [GraphQL Example](https://github.com/pubkey/rxdb/tree/master/examples/graphql). ### RxGraphQLReplicationState When you call `replicateGraphQL()` it returns a `RxGraphQLReplicationState` which can be used to subscribe to events, for debugging or other functions. It extends the [RxReplicationState](./replication.md) with some GraphQL specific methods. #### .setHeaders() Changes the headers for the replication after it has been set up. ```js replicationState.setHeaders({ Authorization: `...` }); ``` #### Sending Cookies The underlying fetch framework uses a `same-origin` policy for credentials by default. That means cookies and session data are only shared if your backend and frontend run on the same domain and port. Pass the credentials parameter as `'include'` to include cookies in requests to servers from different origins: ```js replicationState.setCredentials('include'); ``` or directly pass it in the `replicateGraphQL` function: ```js replicateGraphQL( { collection: myRxCollection, /* ... */ credentials: 'include', /* ... */ } ); ``` See [the fetch spec](https://fetch.spec.whatwg.org/#concept-request-credentials-mode) for more information about available options. :::note To play around, check out the full example of the RxDB [GraphQL replication with server and client](https://github.com/pubkey/rxdb/tree/master/examples/graphql) ::: --- ## HTTP Replication import {Steps} from '@site/src/components/steps'; import {Tabs} from '@site/src/components/tabs'; # HTTP Replication from a custom server to RxDB clients While RxDB has a range of backend-specific replication plugins (like [GraphQL](./replication-graphql.md) or [Firestore](./replication-firestore.md)), the replication is built in a way that makes it very easy to replicate data from a custom server to RxDB clients. Using **HTTP** as a transport protocol makes it simple to create a compatible backend on top of your existing infrastructure. For events that must be sent from the server to the client, we can use [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events). 
In this tutorial we will implement an HTTP replication between an RxDB client and a [MongoDB](./rx-storage-mongodb.md) express server. You can adapt this for any other backend database technology like PostgreSQL, or even a non-Node.js server written in Go or Java. To create a compatible server for replication, we will start a server and implement the correct HTTP routes and replication handlers. We need a push-handler, a pull-handler, and for the ongoing changes `pull.stream$` we use **Server-Sent Events**. ## Setup ### Start the Replication on the RxDB Client RxDB does not have a specific HTTP-replication plugin because the [replication primitives plugin](./replication.md) is simple enough to start an HTTP replication on top of it. We import the `replicateRxCollection` function and start the replication from there for a single [RxCollection](./rx-collection.md). ```ts // > client.ts import { replicateRxCollection } from 'rxdb/plugins/replication'; const replicationState = await replicateRxCollection({ collection: myRxCollection, replicationIdentifier: 'my-http-replication', push: { /* add settings from below */ }, pull: { /* add settings from below */ } }); ``` ### Start a Node.js process with Express and MongoDB On the server side, we start an express server that has a MongoDB connection and serves the HTTP requests of the client. ```ts // > server.ts import { MongoClient } from 'mongodb'; import express from 'express'; const mongoClient = new MongoClient('mongodb://localhost:27017/'); const mongoConnection = await mongoClient.connect(); const mongoDatabase = mongoConnection.db('myDatabase'); const mongoCollection = mongoDatabase.collection('myDocs'); const app = express(); app.use(express.json()); /* ... add routes from below */ app.listen(80, () => { console.log(`Example app listening on port 80`) }); ``` ### Implement the Pull Endpoint As the first HTTP endpoint, we need to implement the pull handler. This is used by the RxDB replication to fetch all document writes that happened after a given `checkpoint`. The `checkpoint` format is not determined by RxDB; instead, the server can use any kind of checkpoint that allows iterating across document writes. Here we will just use a unix timestamp `updatedAt` and a string `id`, which is the most commonly used format. When the pull endpoint is called, the server responds with an array of document data based on the given checkpoint and a new checkpoint. The server also has to respect the batchSize so that RxDB knows there are no more new documents when the server returns a non-full array. ```ts // > server.ts import { lastOfArray } from 'rxdb/plugins/core'; app.get('/pull', async (req, res) => { const id = req.query.id; const updatedAt = parseFloat(req.query.updatedAt); const documents = await mongoCollection.find({ $or: [ /** * Notice that we have to compare the updatedAt AND the id field * because the updatedAt field is not unique and when two documents * have the same updatedAt, we can still "sort" them by their id. */ { updatedAt: { $gt: updatedAt } }, { updatedAt: { $eq: updatedAt }, id: { $gt: id } } ] }) .sort({updatedAt: 1, id: 1}) .limit(parseInt(req.query.batchSize, 10)).toArray(); const newCheckpoint = documents.length === 0 ? 
{ id, updatedAt } : { id: lastOfArray(documents).id, updatedAt: lastOfArray(documents).updatedAt }; res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify({ documents, checkpoint: newCheckpoint })); }); ``` ### Implement the Pull Handler On the client we add the `pull.handler` to the replication setting. The handler requests the correct server url and fetches the documents. ```ts // > client.ts const replicationState = await replicateRxCollection({ /* ... */ pull: { async handler(checkpointOrNull, batchSize){ const updatedAt = checkpointOrNull ? checkpointOrNull.updatedAt : 0; const id = checkpointOrNull ? checkpointOrNull.id : ''; const url = 'http://localhost/pull' + `?updatedAt=${updatedAt}` + `&id=${id}` + `&batchSize=${batchSize}`; const response = await fetch(url); const data = await response.json(); return { documents: data.documents, checkpoint: data.checkpoint }; } } /* ... */ }); ``` ### Implement the Push Endpoint To send client side writes to the server, we have to implement the `push.handler`. It gets an array of change rows as input and has to return only the conflicting documents that have not been written to the server. Each change row contains a `newDocumentState` and an optional `assumedMasterState`. For [conflict detection](./transactions-conflicts-revisions.md), on the server we first have to detect if the `assumedMasterState` is correct for each row. If yes, we have to write the new document state to the database, otherwise we have to return the "real" master state in the conflict array. The server also creates an `event` that is emitted to the `pullStream$` which is later used in the [pull.stream$](#implement-the-pullstream-endpoint). ```ts // > server.ts import { Subject } from 'rxjs'; // used in the pull.stream$ below let lastEventId = 0; const pullStream$ = new Subject(); app.post('/push', async (req, res) => { const changeRows = req.body; const conflicts = []; const event = { id: lastEventId++, documents: [], checkpoint: null }; for(const changeRow of changeRows){ const realMasterState = await mongoCollection.findOne( {id: changeRow.newDocumentState.id} ); if( realMasterState && !changeRow.assumedMasterState || ( realMasterState && changeRow.assumedMasterState && /* * For simplicity we detect conflicts * on the server by only comparing the * updatedAt value. * In reality you might want to do a * more complex check or do a * deep-equal comparison. */ realMasterState.updatedAt !== changeRow.assumedMasterState.updatedAt ) ) { // we have a conflict conflicts.push(realMasterState); } else { // no conflict -> write the document (upsert so that new documents are inserted) await mongoCollection.replaceOne( {id: changeRow.newDocumentState.id}, changeRow.newDocumentState, { upsert: true } ); event.documents.push(changeRow.newDocumentState); event.checkpoint = { id: changeRow.newDocumentState.id, updatedAt: changeRow.newDocumentState.updatedAt }; } } if(event.documents.length > 0){ pullStream$.next(event); } res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify(conflicts)); }); ``` :::note For simplicity in this tutorial, we do not use transactions. In reality you should run the full push function inside of a MongoDB transaction to ensure that no other process can mix up the document state while the writes are processed. You should also use bulk operations on MongoDB instead of running one operation per change row. 
::: ### Implement the Push Handler With the push endpoint in place, we can add a `push.handler` to the replication settings on the client. ```ts // > client.ts const replicationState = await replicateRxCollection({ /* ... */ push: { async handler(changeRows){ const rawResponse = await fetch('http://localhost/push', { method: 'POST', headers: { 'Accept': 'application/json', 'Content-Type': 'application/json' }, body: JSON.stringify(changeRows) }); const conflictsArray = await rawResponse.json(); return conflictsArray; } } /* ... */ }); ``` ### Implement the pullStream$ Endpoint While the normal pull handler is used when the replication is in [iteration mode](./replication.md#checkpoint-iteration), we also need a stream of ongoing changes when the replication is in [event observation mode](./replication.md#event-observation). This brings the realtime replication to RxDB, where changes on the server or on a client are directly propagated to the other instances. On the server we have to implement the `pullStream` route and emit the events. We use the `pullStream$` observable from [above](#implement-the-push-endpoint) to fetch all ongoing events and send them to the client. Here we use Server-Sent Events (SSE), which are the most commonly used way to stream data from the server to the client. Other methods also exist, like [WebSockets or Long-Polling](./articles/websockets-sse-polling-webrtc-webtransport.md). ```ts // > server.ts app.get('/pullStream', (req, res) => { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Connection': 'keep-alive', 'Cache-Control': 'no-cache' }); const subscription = pullStream$.subscribe(event => { res.write('data: ' + JSON.stringify(event) + '\n\n'); }); req.on('close', () => subscription.unsubscribe()); }); ``` :::note How to build the `pullStream$` Observable is not part of this tutorial. This heavily depends on your backend and infrastructure. Likely you have to observe the MongoDB event stream. ::: ### Implement the pullStream$ Handler From the client we can observe this endpoint and create a `pull.stream$` observable that emits all events that are sent from the server to the client. The client connects to the URL and receives server-sent events that contain all ongoing writes. ```ts // > client.ts import { Subject } from 'rxjs'; const myPullStream$ = new Subject(); const eventSource = new EventSource( 'http://localhost/pullStream', { withCredentials: true } ); eventSource.onmessage = event => { const eventData = JSON.parse(event.data); myPullStream$.next({ documents: eventData.documents, checkpoint: eventData.checkpoint }); }; const replicationState = await replicateRxCollection({ /* ... */ pull: { /* ... */ stream$: myPullStream$.asObservable() } /* ... */ }); ``` ### pullStream$ RESYNC flag In case the client loses the connection, the EventSource will automatically reconnect, but some changes might have been missed in the meantime. The replication has to be informed that it might have missed events by emitting a `RESYNC` flag from the `pull.stream$`. The replication will then catch up by switching to the [iteration mode](./replication.md#checkpoint-iteration) until it is in sync with the server again. ```ts // > client.ts eventSource.onerror = () => myPullStream$.next('RESYNC'); ``` The purpose of the `RESYNC` flag is to tell the client that "something might have changed" so that the client can react to that information without having to run operations in an interval. 
If your backend is not capable of emitting the actual documents and checkpoint in the pull stream, you could just map all events to the `RESYNC` flag. This would make the replication work with a slight performance drawback: ```ts // > client.ts import { Subject } from 'rxjs'; const myPullStream$ = new Subject(); const eventSource = new EventSource( 'http://localhost/pullStream', { withCredentials: true } ); eventSource.onmessage = () => myPullStream$.next('RESYNC'); const replicationState = await replicateRxCollection({ pull: { stream$: myPullStream$.asObservable() } }); ``` ## Missing implementation details In this tutorial we only covered the basics of doing a HTTP replication between RxDB clients and a server. We did not cover the following aspects of the implementation: - Authentication: To authenticate the client on the server, you might want to send authentication headers with the HTTP requests - Skip events on the `pull.stream$` for the client that caused the changes to improve performance. - Version upgrades: You should add a version-flag to the endpoint urls. If you then update the version of your endpoints in any way, your old endpoints should emit a `Code 426` to outdated clients so that they can update their client version. --- ## Microsoft OneDrive Sync import {Steps} from '@site/src/components/steps'; import {BetaBlock} from '@site/src/components/beta-block'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # Replication with Microsoft OneDrive The `replication-microsoft-onedrive` plugin allows you to replicate your client-side [RxDB](./) database to a folder in the user's Microsoft OneDrive. This enables cross-device [sync](./replication.md) for single users without requiring any backend server. ## Overview The replication uses the Microsoft Graph API. - **[Offline-First](./offline-first.md):** Users can work offline. Changes are synced when they go online. - **No Backend Required:** You don't need to host your own database server. - **Cross-Device:** Users can access their data from multiple devices by signing into the same Microsoft account. - **Realtime Sync:** Uses [WebRTC](./replication-webrtc.md) for peer-to-peer signaling to achieve near real-time updates. Uses the same onedrive folder instead of a signaling-server. ## Usage ### Enable Microsoft Graph API You need to register your application in the [Azure portal](https://portal.azure.com/) and create credentials (OAuth 2.0 Client ID) with `Files.ReadWrite` permissions for your application. ### Authenticate the User Your application must handle the OAuth flow to get an `accessToken` from Microsoft. You can use libraries like `@azure/msal-browser` or `@azure/msal-react`. ### Start Replication Once you have the `accessToken`, you can start the replication. 
```ts import { replicateMicrosoftOneDrive } from 'rxdb/plugins/replication-microsoft-onedrive'; const replicationState = await replicateMicrosoftOneDrive({ replicationIdentifier: 'my-app-onedrive-sync', collection: myRxCollection, // RxCollection oneDrive: { authToken: 'USER_ACCESS_TOKEN', folderPath: 'my-app-data/user-1' }, live: true, pull: { batchSize: 60, modifier: doc => doc // (optional) modify invalid data }, push: { batchSize: 60, modifier: doc => doc // (optional) modify before sending } }); // Observe replication states replicationState.error$.subscribe(err => { console.error('Replication error:', err); }); replicationState.awaitInitialReplication().then(() => { console.log('Initial replication done'); }); ``` ## Signaling & WebRTC Microsoft OneDrive does not provide real-time events for file changes that a client can easily subscribe to in the browser. If a user changes data on **User Device A**, **User Device B** would not know about it until it periodically polls the API. To achieve real-time updates, this plugin uses **WebRTC** to signal changes between connected devices. 1. Devices create "signal files" in a `signaling` subfolder on OneDrive. 2. Other devices detect these files, read the WebRTC connection data, and establish a direct P2P connection with each other. 3. When a device makes a write, it sends a "RESYNC" signal via WebRTC to all connected peers to notify them about the change. ### Polyfill for Node.js WebRTC is native in browsers but requires a polyfill in Node.js. ```ts import wrtc from 'node-datachannel/polyfill'; // or 'wrtc' package // ... const replicationState = await replicateMicrosoftOneDrive({ // ... signalingOptions: { wrtc // Pass the polyfill here } }); ``` ## Options ### oneDrive - **authToken** `string`: The valid access token associated with the user. - **folderPath** `string`: The path to the folder in Microsoft OneDrive where data should be stored. - The plugin will ensure this folder exists. - It must **not** be the root folder. - It creates subfolders `docs` (for data) and `signaling` (for WebRTC). - **apiEndpoint** `string` (optional): Defaults to `https://graph.microsoft.com/v1.0/me/drive`. Useful for mocking or proxies. - **transactionTimeout** `number` (optional): Default `10000` (10s). The plugin uses a `transaction.json` file in OneDrive to ensure data integrity during writes. This is the timeout after which a lock is considered stale. ### pull & push Standard RxDB [Replication Options](./replication.md) for batch size, modifiers, etc. ## Technical Details ### File Mapping - Each RxDB document corresponds to **one JSON file** in the `docs` subfolder. - The filename is `[primaryKey].json`. - This simple mapping makes it easy to inspect or backup data manually. ### Checkpointing - The replication relies on the `lastModifiedDateTime` of files in Microsoft OneDrive. ### Conflict Resolution - Conflicts are handled using the standard RxDB [conflict handling](./replication.md#conflict-handling) strategies. - The plugin assumes a master-slave replication pattern where the client (RxDB) merges changes. - If the `transaction.json` file is locked by another device, the write retries until the lock is released or times out. ## Limitations - **Rate Limits:** Microsoft Graph API has strict rate limits. The plugin attempts to handle 429 errors with exponential backoff, but heavy concurrent writes might hit these limits. - **Latency:** Changes take time to propagate and appear in listings (eventual consistency), which the plugin handles internally. 
- **Signaling Delay:** The initial WebRTC handshake requires writing and reading files from OneDrive, which can take a few seconds. Once connected, signaling is instant. ## Testing For testing, it is recommended to use [microsoft-onedrive-mock](https://github.com/pubkey/microsoft-onedrive-mock). It simulates the Microsoft Graph API so you can run tests without real credentials. --- ## MongoDB Realtime Sync Engine for Local-First Apps import {Tabs} from '@site/src/components/tabs'; import {Steps} from '@site/src/components/steps'; import {VideoBox} from '@site/src/components/video-box'; import {RxdbMongoDiagramPlain} from '@site/src/components/mongodb-sync'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # MongoDB Replication Plugin The [MongoDB](https://www.mongodb.com/) Replication Plugin for RxDB delivers seamless, two-way synchronization between [MongoDB](./rx-storage-mongodb.md) and RxDB, enabling [real-time](./articles/realtime-database.md) updates and [offline-first](./offline-first.md) functionality for your applications. Built on **MongoDB Change Streams**, it supports both Atlas and self-hosted deployments, ensuring your data stays consistent across every device and service. Behind the scenes, the plugin is powered by the RxDB [Sync Engine](./replication.md), which manages the complexities of real-world data replication for you. It automatically handles [conflict detection and resolution](./transactions-conflicts-revisions.md), maintains precise checkpoints for incremental updates, and gracefully manages transitions between offline and online states. This means you don't need to manually implement retry logic, reconcile divergent changes, or worry about data loss during connectivity drops; the Sync Engine ensures consistency and reliability in every sync cycle. ## Key Features - **Two-way replication** between MongoDB and RxDB collections - **Offline-first support** with automatic incremental re-sync - **Incremental updates** via MongoDB Change Streams - **Conflict resolution** handled by the RxDB Sync Engine - **Atlas and self-hosted support** for replica sets and sharded clusters ## Architecture Overview The plugin operates in a three-tier architecture: Clients connect to [RxServer](./rx-server.md), which in turn connects to MongoDB. RxServer streams changes from MongoDB to connected clients and pushes client-side updates back to MongoDB. For the client side, RxServer exposes a [replication endpoint](./rx-server.md#replication-endpoint) over WebSocket or HTTP, which your RxDB-powered applications can consume. The following diagram illustrates the flow of updates between clients, RxServer, and MongoDB in a live synchronization setup: :::note The MongoDB Replication Plugin is optimized for Node.js environments (e.g., when RxDB runs within RxServer or other backend services). Direct connections from browsers or mobile apps to MongoDB are not supported because MongoDB does not use HTTP as its wire protocol and requires a driver-level connection to a replica set or sharded cluster. ::: ## Setting up the Client-RxServer-MongoDB Sync ### Install the Client Dependencies In your JavaScript project, install the RxDB libraries and the MongoDB Node.js driver: ```npm install rxdb rxdb-server mongodb --save``` ### Set up a MongoDB Server As a first step, you need access to a running MongoDB server. This can be done by either running a server locally or using the Atlas Cloud.
Notice that we need a [replica set](https://www.mongodb.com/docs/manual/tutorial/deploy-replica-set/) because MongoDB Change Streams are only available on replica sets (and sharded clusters). ### Shell If you have installed MongoDB locally, you can start the server with this command: ```mongod --replSet rs0 --bind_ip_all``` ### Docker If you have Docker installed, you can start a container that runs the MongoDB server: ```docker run -p 27017:27017 -p 27018:27018 -p 27019:27019 --rm --name rxdb-mongodb mongo:8.0.4 mongod --replSet rs0 --bind_ip_all``` ### MongoDB Atlas Learn here how to create a MongoDB Atlas account and how to start a MongoDB cluster that runs in the cloud:
After this step you should have a valid connection string that points to a running MongoDB Server like `mongodb://localhost:27017/`. ### Create a MongoDB Database and Collection On your MongoDB server, make sure to create a database and a collection. ```ts //> server.ts import { MongoClient } from 'mongodb'; const mongoClient = new MongoClient( 'mongodb://localhost:27017/?directConnection=true' ); const mongoDatabase = mongoClient.db('my-database'); await mongoDatabase.createCollection('my-collection', { changeStreamPreAndPostImages: { enabled: true } }); ``` :::note To observe document deletions on the changestream, `changeStreamPreAndPostImages` must be enabled. This is not required if you have an insert/update-only collection where no documents are deleted ever. ::: ### Create an RxDB Database and Collection Now we create an RxDB [database](./rx-database.md) and a [collection](./rx-collection.md). In this example we use the [memory storage](./rx-storage-memory.md); in production you would use a [persistent storage](./rx-storage.md) instead. ```ts //> server.ts import { createRxDatabase } from 'rxdb'; import { getRxStorageMemory } from 'rxdb/plugins/storage-memory'; // Create server-side RxDB instance const db = await createRxDatabase({ name: 'serverdb', storage: getRxStorageMemory() }); // Add your collection schema await db.addCollections({ humans: { schema: { version: 0, primaryKey: 'passportId', type: 'object', properties: { passportId: { type: 'string', maxLength: 100 }, firstName: { type: 'string' }, lastName: { type: 'string' } }, required: ['passportId', 'firstName', 'lastName'] } } }); ``` ### Sync the Collection with the MongoDB Server Now we can start a [replication](./replication.md) that does a two-way replication between the RxDB Collection and the MongoDB Collection. ```ts //> server.ts import { replicateMongoDB } from 'rxdb/plugins/replication-mongodb'; const replicationState = replicateMongoDB({ mongodb: { collectionName: 'my-collection', connection: 'mongodb://localhost:27017', databaseName: 'my-database' }, collection: db.humans, replicationIdentifier: 'humans-mongodb-sync', pull: { batchSize: 50 }, push: { batchSize: 50 }, live: true }); ``` :::note You can do many things with the replication state. The `RxMongoDBReplicationState` which is returned from `replicateMongoDB()` allows you to run all functionality of the normal [RxReplicationState](./replication.md), like observing errors or doing start/stop operations. ::: ### Start an RxServer Now that we have an RxDatabase and collection that is replicated with MongoDB, we can spawn an [RxServer](./rx-server.md) on top of it. This server can then be used by client devices to connect. ```ts //> server.ts import { createRxServer } from 'rxdb-server/plugins/server'; import { RxServerAdapterExpress } from 'rxdb-server/plugins/adapter-express'; const server = await createRxServer({ database: db, adapter: RxServerAdapterExpress, port: 8080, cors: '*' }); const endpoint = server.addReplicationEndpoint({ name: 'humans', collection: db.humans }); console.log('Replication endpoint:', `http://localhost:8080/${endpoint.urlPath}`); // do not forget to start the server! await server.start(); ``` ### Sync a Client with the RxServer On the client-side we create the exact same RxDatabase and collection and then replicate it with the replication endpoint of the RxServer.
```ts //> client.ts import { createRxDatabase } from 'rxdb'; import { getRxStorageDexie } from 'rxdb/plugins/storage-dexie'; import { replicateServer } from 'rxdb-server/plugins/replication-server'; const db = await createRxDatabase({ name: 'mydb-client', storage: getRxStorageDexie() }); await db.addCollections({ humans: { schema: { version: 0, primaryKey: 'passportId', type: 'object', properties: { passportId: { type: 'string', maxLength: 100 }, firstName: { type: 'string' }, lastName: { type: 'string' } }, required: ['passportId', 'firstName', 'lastName'] } } }); // Start replication to the RxServer endpoint printed by the server: // e.g. http://localhost:8080/humans/0 const replicationState = await replicateServer({ replicationIdentifier: 'humans-rxserver', collection: db.humans, url: 'http://localhost:8080/humans/0', live: true, pull: { batchSize: 50 }, push: { batchSize: 50 } }); ```
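Just like with the other replication plugins, the returned replication state can be observed and awaited. A minimal sketch using the `replicationState` from the snippet above (the logging here is illustrative):

```ts
// log replication errors on the client
replicationState.error$.subscribe(err => console.error('replication error:', err));

// wait until the client has pulled the server state once
await replicationState.awaitInitialReplication();

// later, e.g. on logout, stop the replication
await replicationState.cancel();
```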
## Follow Up - Try it out with the RxDB-MongoDB [example repository](https://github.com/pubkey/rxdb-mongodb-sync-example) - Read [From Local to Global: Scalable Edge Apps with RxDB + MongoDB][1] [1]: https://www.mongodb.com/company/blog/innovation/from-local-global-scalable-edge-apps-rxdb - [Replication API Reference](./replication.md) - [RxServer Documentation](./rx-server.md) - Join our [Discord Forum](./chat) for questions and feedback --- ## RxDB & NATS - Realtime Sync import {Steps} from '@site/src/components/steps'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # Replication with NATS With this RxDB plugin you can run a two-way realtime replication with a [NATS](https://nats.io/) server. The replication itself uses the [RxDB Sync Engine](./replication.md) which handles conflicts, errors and retries. On the client side the official [NATS npm package](https://www.npmjs.com/package/nats) is used to connect to the NATS server. NATS is a messaging system that by itself does not have validation or granular access control built in. Therefore it is not recommended to directly replicate the NATS server with an untrusted RxDB client application. Instead you should replicate from NATS to your Node.js server-side RxDB database. ## Precondition For the replication endpoint, the NATS cluster must have [JetStream](https://docs.nats.io/nats-concepts/jetstream) enabled and store all message data as [structured JSON](https://docs.nats.io/using-nats/developer/sending/structure). The easiest way to start a compatible NATS server is to use the official Docker image: ```docker run --rm --name rxdb-nats -p 4222:4222 nats:2.9.17 -js``` ## Usage ### Install the nats package ```bash npm install nats --save ``` ### Start the Replication To start the replication, import the `replicateNats()` method from the RxDB plugin and call it with the collection that must be replicated. The replication runs *per [RxCollection](./rx-collection.md)*; you can replicate multiple RxCollections by starting a new replication for each of them. ```typescript import { replicateNats } from 'rxdb/plugins/replication-nats'; const replicationState = replicateNats({ collection: myRxCollection, replicationIdentifier: 'my-nats-replication-collection-A', // in NATS, each stream needs a name streamName: 'stream-for-replication-A', /** * The subject prefix determines how the documents are stored in NATS. * For example the document with id 'alice' * will have the subject 'foobar.alice' */ subjectPrefix: 'foobar', connection: { servers: 'localhost:4222' }, live: true, pull: { batchSize: 30 }, push: { batchSize: 30 } }); ``` ## Handling deletes RxDB requires you to never [fully delete documents](./replication.md#data-layout-on-the-server). This is needed to be able to replicate the deletion state of a document to other instances. The NATS replication will set a boolean `_deleted` field on all documents to indicate the deletion state. You can change this by setting a different `deletedField` in the sync options. --- ## Seamless P2P Data Sync # The RxDB Plugin `replication-p2p` has been renamed to `replication-webrtc` The documentation page has moved [here](./replication-webrtc.md). --- ## RxDB Server Replication The *Server Replication Plugin* connects to the [replication](./replication.md) endpoint of an [RxDB Server](./rx-server.md#replication-endpoint) and replicates data between the client and the server.
## Usage The replication server plugin is imported from the `rxdb-server` npm package. Then you start the replication with a given collection and endpoint URL by calling `replicateServer()`. ```ts import { replicateServer } from 'rxdb-server/plugins/replication-server'; const replicationState = await replicateServer({ collection: usersCollection, replicationIdentifier: 'my-server-replication', // endpoint URL with the server's collection // schema version at the end url: 'http://localhost:80/users/0', headers: { Authorization: 'Bearer S0VLU0UhI...' }, push: {}, pull: {}, live: true }); ``` ## outdatedClient$ When you update your schema at the server and run a migration, you end up with a different replication URL that has a new schema version number at the end. Your clients might still be running an old version of your application that will no longer be compatible with the endpoint. Therefore when the client tries to call a server endpoint with an outdated schema version, the `outdatedClient$` observable emits to tell your client that the application must be updated. With that event you can tell the client to update the application. In a browser application you might want to just reload the page on that event: ```ts replicationState.outdatedClient$.subscribe(() => { location.reload(); }); ``` ## unauthorized$ When your client's auth data is not valid (or no longer valid), the server will no longer accept any requests from your client and informs the client that the auth headers must be updated. The `unauthorized$` observable will emit and expects you to update the headers accordingly so that subsequent requests will be accepted again. ```ts replicationState.unauthorized$.subscribe(() => { replicationState.setHeaders({ Authorization: 'Bearer S0VLU0UhI...' }); }); ``` ## forbidden$ When your client misbehaves in any way, like updating non-allowed values or changing documents that it is not allowed to change, the server will drop the connection and the replication state will emit on the `forbidden$` observable. It will also automatically stop the replication so that your client does not accidentally DOS attack the server. ```ts replicationState.forbidden$.subscribe(() => { console.log('Client is behaving wrong'); }); ``` ## Custom EventSource implementation For the server-sent events, the [eventsource](https://github.com/EventSource/eventsource) npm package is used instead of the native `EventSource` API. We need this because the native browser API does not support sending headers with the request, which is required by the server to parse the auth data. If the eventsource package does not work for you, you can set your own implementation when creating the replication. ```ts const replicationState = await replicateServer({ /* ... */ eventSource: MyEventSourceConstructor /* ... */ }); ``` --- ## Supabase Replication Plugin for RxDB - Real-Time, Offline-First Sync import {Tabs} from '@site/src/components/tabs'; import {Steps} from '@site/src/components/steps'; import {VideoBox} from '@site/src/components/video-box'; import {RxdbMongoDiagramPlain} from '@site/src/components/mongodb-sync'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # Supabase Replication Plugin The **Supabase Replication Plugin** for RxDB delivers seamless, two-way synchronization between your RxDB collections and a Supabase (Postgres) table.
It uses **PostgREST** for pull/push and **Supabase Realtime** (logical replication) to stream live updates, so your data stays consistent across devices with first-class [local-first](./articles/local-first-future.md), offline-ready support. Under the hood, the plugin is powered by the RxDB [Sync Engine](./replication.md). It handles checkpointed incremental pulls, robust retry logic, and [conflict detection/resolution](./transactions-conflicts-revisions.md) for you. You focus on features, and RxDB takes care of sync.
## Key Features of the RxDB-Supabase Plugin - **Cloud Only Backend**: No self-hosted server required. Client devices directly sync with the Supabase servers. - **Two-way replication** between Supabase tables and RxDB [collections](./rx-collection.md) - **Offline-first** with resumable, incremental sync - **Live updates** via Supabase Realtime channels - **Conflict resolution** handled by the [RxDB Sync Engine](./replication.md) - **Works in browsers and Node.js** with `@supabase/supabase-js` ## Architecture Overview Clients connect **directly to Supabase** using the official JS client. The plugin: - **Pulls** documents over PostgREST using a checkpoint `(modified, id)` and deterministic ordering. - **Pushes** inserts/updates using optimistic concurrency guards. - **Streams** new changes using Supabase Realtime so live replication stays up to date. :::note Because Supabase exposes Postgres over **HTTP/WebSocket**, you can safely replicate from browsers and mobile apps. Protect your data with **Row Level Security (RLS)** policies; use the **anon** key on clients and the **service role** key only on trusted servers. ::: ## Setting up RxDB ↔ Supabase Sync ### Install Dependencies ```bash npm install rxdb @supabase/supabase-js ``` ### Create a Supabase Project & Table In your Supabase project, create a new table. Ensure that: - The primary key has the type `text` (primary keys must always be strings in RxDB). - You have a modified field which stores the last modification timestamp of a row (default is `_modified`). - You have a boolean field which stores whether a row is "deleted". You should not hard-delete rows in Supabase, because clients would miss the deletion if they were not online at the deletion time. Instead, use a deleted `boolean` to mark rows as deleted. This way all clients can still pull the deletion, and RxDB will hide the complexity on the client side. - Realtime observation of writes to the table is enabled. Here is an example for a "human" table: ```sql create extension if not exists moddatetime schema extensions; create table "public"."humans" ( "passportId" text primary key, "firstName" text not null, "lastName" text not null, "age" integer, "_deleted" boolean DEFAULT false NOT NULL, "_modified" timestamp with time zone DEFAULT now() NOT NULL ); -- auto-update the _modified timestamp CREATE TRIGGER update_modified_datetime BEFORE UPDATE ON public.humans FOR EACH ROW EXECUTE FUNCTION extensions.moddatetime('_modified'); -- add a table to the publication so we can subscribe to changes alter publication supabase_realtime add table "public"."humans"; ``` ### Create an RxDB Database & Collection Create a normal RxDB database, then add a collection whose **schema mirrors your Supabase table**. The **primary key must match** (same column name and type), and fields should be **top-level simple types** (string/number/boolean). You don't need to model server internals: the plugin maps the server's \_deleted flag to doc.\_deleted automatically, and \_modified is optional in your schema (the plugin strips it on push and will include it on pull only if you define it). For browsers use a persistent storage like LocalStorage or IndexedDB. For tests you can use the [in-memory storage](./rx-storage-memory.md).
```ts // client import { createRxDatabase } from 'rxdb/plugins/core'; import { getRxStorageLocalstorage } from 'rxdb/plugins/storage-localstorage'; export const db = await createRxDatabase({ name: 'mydb', storage: getRxStorageLocalstorage() }); await db.addCollections({ humans: { schema: { version: 0, primaryKey: 'passportId', type: 'object', properties: { passportId: { type: 'string', maxLength: 100 }, firstName: { type: 'string' }, lastName: { type: 'string' }, age: { type: 'number' } }, required: ['passportId', 'firstName', 'lastName'] } } }); ``` ### Create the Supabase Client Make a single Supabase client and reuse it across your app. In the browser, use the anon key (RLS-protected). On trusted servers you may use the service role key, but never ship that to clients. #### Production ```ts //> client import { createClient } from '@supabase/supabase-js'; export const supabase = createClient( 'https://xyzcompany.supabase.co', 'eyJhbGciOi...' ); ``` #### Vite ```ts //> client import { createClient } from '@supabase/supabase-js'; export const supabase = createClient( import.meta.env.VITE_SUPABASE_URL!, // e.g. https://xyzcompany.supabase.co import.meta.env.VITE_SUPABASE_ANON_KEY! // anon key for browsers // optional options object here ); ``` #### Local Development ```ts //> client import { createClient } from '@supabase/supabase-js'; export const supabase = createClient( 'http://127.0.0.1:54321', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...' ); ``` ### Start Replication Connect your RxDB collection to the Supabase table to start the replication. ```ts //> client import { replicateSupabase } from 'rxdb/plugins/replication-supabase'; const replication = replicateSupabase({ tableName: 'humans', client: supabase, collection: db.humans, replicationIdentifier: 'humans-supabase', live: true, pull: { batchSize: 50, // optional: shape incoming docs modifier: (doc) => { // map nullable age-field if (!doc.age) delete doc.age; return doc; }, // optional: customize the pull query before fetching queryBuilder: ({ query }) => { // Add filters, joins, or other PostgREST query modifiers // This runs before checkpoint filtering and ordering return query.eq("status", "active"); }, }, push: { batchSize: 50 }, // optional overrides if your column names differ: // modifiedField: '_modified', // deletedField: '_deleted' }); // (optional) observe errors and wait for the first sync barrier replication.error$.subscribe(err => console.error('[replication]', err)); await replication.awaitInitialReplication(); ``` :::note Nullable values must be mapped Supabase returns `null` for nullable columns, but in RxDB you often model those fields as optional (i.e., they can be undefined/missing). To avoid schema errors, map `null` → `undefined` in the `pull.modifier` (usually by deleting the key). ::: ## Using Joins You can use the `pull.queryBuilder` to use joins and also pull data from related tables. To do that, you have to create a **new** query object in the `pull.queryBuilder` with the `.select()` method and return it. ```ts const replication = replicateSupabase({ pull: { queryBuilder: (/* ignore the passed query instance from here */) => { /** * Create a totally new query instance * and return that. */ return supabase.from('humans').select('*, pets(*), toys(*)'); } } }); ``` ### Do other things with the replication state The `RxSupabaseReplicationState` which is returned from `replicateSupabase()` allows you to run all functionality of the normal [RxReplicationState](./replication.md). ## FAQ
Why use Supabase and RxDB as a cloud document database for Node.js and TypeScript? Supabase and RxDB offer the best of both worlds for Node.js and TypeScript applications. Supabase provides a powerful PostgreSQL backend. You can use PostgreSQL as a document database by storing JSON data. RxDB provides a reactive local document store. You achieve real-time synchronization between the local document store and the Supabase backend. You benefit from strong TypeScript typing on the client and robust SQL querying on the server.
How to connect an anonymous key to a Supabase project? You connect an anonymous key to a Supabase project by initializing the official `@supabase/supabase-js` client utilizing your project's `SUPABASE_URL` and `SUPABASE_ANON_KEY`. In frontend applications interacting with the **[RxDB Supabase Replication](./replication.md)** plugin, you must inject the `anon` key, while simultaneously configuring strict Row Level Security (RLS) policies within your Supabase PostgreSQL backend to prevent unauthorized data manipulation.
Does Supabase support full offline sync and [CRDT](./crdt.md) capabilities? Natively, the Supabase JavaScript client does not support advanced [offline-first](./offline-first.md) synchronization pipelines or complex Conflict-free Replicated Data Type (CRDT) architectures. To implement full offline sync capable of continuous background disconnected writes, you must attach the **[RxDB](./rx-database.md)** Supabase Replication Plugin. RxDB acts as the offline-first local CRDT-like cache, deferring all local mutations into a unified outbound queue until the Supabase TCP connection is restored.
Should Row Level Security (RLS) be enabled when using Supabase real-time sync? Yes, Row Level Security (RLS) is strictly mandatory whenever you expose a Supabase database directly to the frontend. Without RLS, the anonymous `anon` key used by the **[RxDB](./rx-database.md)** client grants full read and write access to your entire PostgreSQL cluster. You must configure RLS policies that enforce `auth.uid() = user_id` checks to guarantee clients only replicate and mutate their own specific documents.
How does Supabase Realtime architecture work? Supabase Realtime acts as an Elixir-based WebSocket broadcasting server that taps directly into PostgreSQL's logical replication stream. When a row changes on the database, the Realtime server parses the WAL (Write-Ahead Log) and pushes the event down to subscribed clients. The **[RxDB Supabase Replication](./replication.md)** plugin leverages this WebSocket channel strictly for live change detection, triggering rapid localized pulls over PostgREST to guarantee no data is dropped during connection turbulence.
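For illustration, this is roughly how a client can observe such Postgres change events with `@supabase/supabase-js` (a hand-written sketch of the underlying mechanism, not the plugin's internal code; the `humans` table and the client setup match the examples above):

```ts
import { createClient } from '@supabase/supabase-js';

const supabase = createClient('https://xyzcompany.supabase.co', 'eyJhbGciOi...');

// Subscribe to row-level change events on the 'humans' table.
// The replication plugin uses such events only as a trigger to pull
// the actual documents (with a checkpoint) over PostgREST.
const channel = supabase
    .channel('humans-changes')
    .on(
        'postgres_changes',
        { event: '*', schema: 'public', table: 'humans' },
        payload => {
            console.log('change detected:', payload.eventType);
            // -> run a checkpointed pull here
        }
    )
    .subscribe();

// later: clean up the subscription
// await supabase.removeChannel(channel);
```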
## Follow Up - **Replication API Reference:** Learn the core concepts and lifecycle hooks - [Replication](./replication.md) - **Offline-First Guide:** Caching, retries, and conflict strategies - [Local-First](./articles/local-first-future.md) - **Supabase Crash Course:** Build a React Native app with RxDB - [Webinar Video](https://www.youtube.com/watch?v=F051fX1z6lE) - Row Level Security (RLS) - https://supabase.com/docs/guides/auth/row-level-security - Realtime - https://supabase.com/docs/guides/realtime - Local dev with the Supabase CLI - https://supabase.com/docs/guides/cli - **Community:** Questions or feedback? Join our Discord - [Chat](./chat) --- ## WebRTC P2P Replication with RxDB - Sync Browsers and Devices import {Steps} from '@site/src/components/steps'; import {HeadlineWithIcon} from '@site/src/components/headline-with-icon'; # P2P WebRTC Replication with RxDB WebRTC P2P data connections are revolutionizing real-time web and mobile development by **eliminating central servers** in scenarios where clients can communicate directly. With the **RxDB** [Sync Engine](./replication.md), you can sync your local database state across multiple browsers or devices via **WebRTC P2P (Peer-to-Peer)** connections, ensuring scalable, secure, and **low-latency** data flows without traditional server bottlenecks. ## What is WebRTC? [WebRTC](https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API) stands for Web [Real-Time](./articles/realtime-database.md) Communication. It is an open standard that enables browsers and native apps to exchange audio, video, or **arbitrary data** directly between peers, bypassing a central server after the initial connection is established. WebRTC uses NAT traversal techniques like [ICE](https://developer.liveswitch.io/liveswitch-server/guides/what-are-stun-turn-and-ice.html) (Interactive Connectivity Establishment) to punch through firewalls and establish direct links. This peer-to-peer nature drastically reduces latency while maintaining **high security** and **end-to-end encryption** capabilities. For a deeper look at comparing WebRTC with **WebSockets** and **WebTransport**, you can read our [comprehensive overview](./articles/websockets-sse-polling-webrtc-webtransport.md). While WebSockets or WebTransport often work in client-server contexts, WebRTC offers direct peer-to-peer connections ideal for fully decentralized data flows.
## Benefits of P2P Sync with WebRTC Compared to Client-Server Architecture 1. **Reduced Latency** - By skipping a central server hop, data travels directly from one client to another, minimizing round-trip times and improving responsiveness. 2. **Scalability** - New peers can join without overloading a central infrastructure. The sync overhead increases linearly with the number of connections rather than requiring a massive server cluster. 3. **Privacy & Ownership** - Data stays within the user’s devices, avoiding risks tied to storing data on third-party servers. This design aligns well with [local-first](./articles/local-first-future.md) or "[zero-latency](./articles/zero-latency-local-first.md)" apps. 4. **Resilience** - In some scenarios, if the central server is unreachable, P2P connections remain operational (assuming a functioning signaling path). Apps can still replicate data among local networks like when they are in the same Wifi or LAN. 5. **Cost Savings** - Reducing the reliance on a high-bandwidth server can cut hosting and bandwidth expenses, particularly in high-traffic or IoT-style use cases.
## Peer-to-Peer (P2P) WebRTC Replication with the RxDB JavaScript Database Traditionally, real-time data synchronization depends on **centralized servers** to manage and distribute updates. In contrast, RxDB’s WebRTC P2P replication allows data to flow **directly** among clients, removing the server as a data store. This approach is **live** and **fully decentralized**, requiring only a [signaling server](#signaling-server) for initial discovery: - **No master-slave** concept - each peer hosts its own local RxDB. - Clients ([browsers](./articles/browser-database.md), devices) connect to each other via WebRTC data channels. - The [RxDB replication protocol](./replication.md) then handles pushing/pulling document changes across peers. Because RxDB is a NoSQL database and the replication protocol is straightforward, setting up robust P2P sync is far **easier** than orchestrating a complex client-server database architecture. ## Using RxDB with the WebRTC Replication Plugin Before you use this plugin, make sure that you understand how [WebRTC works](https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API). Here we build a todo-app that replicates todo-entries between clients:
You can find a fully built example of this at the [RxDB Quickstart Repository](https://github.com/pubkey/rxdb-quickstart) which you can also [try out online](https://pubkey.github.io/rxdb-quickstart/). First you create the [database](./rx-database.md) and then you can configure the replication: ### Create the Database and Collection Here we create a database with the [localstorage](./rx-storage-localstorage.md) based storage that stores data inside of the [LocalStorage API](./articles/localstorage.md) in a browser. RxDB has a wide [range of storages](./rx-storage.md) for other JavaScript runtimes. ```ts import { createRxDatabase } from 'rxdb/plugins/core'; import { getRxStorageLocalstorage } from 'rxdb/plugins/storage-localstorage'; const db = await createRxDatabase({ name: 'myTodoDB', storage: getRxStorageLocalstorage() }); await db.addCollections({ todos: { schema: { title: 'todo schema', version: 0, type: 'object', primaryKey: 'id', properties: { id: { type: 'string', maxLength: 100 }, title: { type: 'string' }, done: { type: 'boolean', default: false }, created: { type: 'string', format: 'date-time' } }, required: ['id', 'title', 'done'] } } }); // insert an example document await db.todos.insert({ id: 'todo-1', title: 'P2P demo task', done: false, created: new Date().toISOString() }); ``` ### Import the WebRTC replication plugin ```ts import { replicateWebRTC, getConnectionHandlerSimplePeer } from 'rxdb/plugins/replication-webrtc'; ``` ### Start the P2P replication To start the replication you have to call `replicateWebRTC()` with the [collection](./rx-collection.md). As options you have to provide a `topic` and a connection handler function that implements the `P2PConnectionHandlerCreator` interface. By default you should start with the `getConnectionHandlerSimplePeer` method which uses the [simple-peer](https://github.com/feross/simple-peer) library and comes shipped with RxDB. ```ts const replicationPool = await replicateWebRTC( { // Start the replication for a single collection collection: db.todos, // The topic is like a 'room-name'. All clients with the same topic // will replicate with each other. In most cases you want to use // a different topic string per user. Also you should prefix the topic with // a unique identifier for your app, to ensure // you do not let your users connect // with other apps that also use the RxDB P2P Replication. topic: 'my-users-pool', /** * You need a connection handler to be able to create WebRTC connections. * Here we use the simple peer handler which * uses the 'simple-peer' npm library. * To learn how to create a custom connection handler, read the source code, * it is pretty simple. */ connectionHandlerCreator: getConnectionHandlerSimplePeer({ // Set the signaling server URL. // You can use the server provided by RxDB for tryouts, // but in production you should use your own server instead. signalingServerUrl: 'wss://signaling.rxdb.info/', // only in Node.js, we need the wrtc library // because Node.js does not contain the WebRTC API. wrtc: require('node-datachannel/polyfill'), // only in Node.js, we need the WebSocket library // because Node.js does not contain the WebSocket API. webSocketConstructor: require('ws').WebSocket }), pull: {}, push: {} } ); ``` Notice that in contrast to the other [replication plugins](./replication.md), the WebRTC replication returns a `replicationPool` instead of a single `RxReplicationState`. The `replicationPool` contains all replication states of the connected peers in the P2P network.
### Observe Errors To make sure potential errors are logged, observe the `error$` observable of the pool. ```ts replicationPool.error$.subscribe(err => console.error('WebRTC Error:', err)); ``` ### Stop the Replication You can also dynamically stop the replication. ```ts replicationPool.cancel(); ``` ## Live replications The WebRTC replication is **always live** because there cannot be a one-time sync when new peers can join the connection pool at any time. Therefore you cannot set the `live: false` option like in the other replication plugins. ## Signaling Server For P2P replication to work with the RxDB WebRTC Replication Plugin, a [signaling server](https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API/Signaling_and_video_calling) is required. The signaling server helps peers discover each other and establish connections. RxDB ships with a default signaling server that can be used with the simple-peer connection handler. This server is made for demonstration purposes and tryouts. It is not reliable and might be offline at any time. In production you must always use your own signaling server instead! Creating a basic signaling server is straightforward. The provided example uses 'socket.io' for WebSocket communication. However, in production, you'd want to create a more robust signaling server with authentication and additional logic to suit your application's needs. Here is a quick example implementation of a signaling server that can be used with the connection handler from `getConnectionHandlerSimplePeer()`: ```ts import { startSignalingServerSimplePeer } from 'rxdb/plugins/replication-webrtc'; const serverState = await startSignalingServerSimplePeer({ port: 8080 // <- port }); ``` For custom signaling servers with more complex logic, you can check the [source code of the default one](https://github.com/pubkey/rxdb/blob/master/src/plugins/replication-webrtc/signaling-server.ts). ## Peer Validation By default the replication will replicate with every peer the signaling server tells it about. You can prevent invalid peers from replicating by passing a custom `isPeerValid()` function that returns `true` for valid peers and `false` for invalid peers. ```ts const replicationPool = await replicateWebRTC( { /* ... */ isPeerValid: async (peer) => { return true; }, pull: {}, push: {} /* ... */ } ); ``` ## Conflict detection in WebRTC replication RxDB's conflict handling works by detecting and resolving conflicts that may arise when multiple clients in a decentralized database system attempt to modify the same data concurrently. A **custom conflict handler** can be set up, which is a plain JavaScript function. The conflict handler is run on each replicated document write and resolves the conflict if required. [Find out more about RxDB conflict handling here](https://rxdb.info/transactions-conflicts-revisions.html) ## Known problems ### SimplePeer requires `process.nextTick()` In the browser you might not have a process variable or process.nextTick() method. But [simple-peer](https://github.com/feross/simple-peer) uses it, so you have to polyfill it. In webpack you can use the `process/browser` package to polyfill it: ```js const plugins = [ /* ... */ new webpack.ProvidePlugin({ process: 'process/browser', }) /* ...
*/ ]; ``` In Angular or other frameworks you can add the polyfill manually: ```js window.process = { nextTick: (fn, ...args) => setTimeout(() => fn(...args)), }; ``` ### Polyfill the WebSocket and WebRTC API in Node.js While all modern browsers support the WebRTC and WebSocket APIs, they are missing in Node.js, which will throw the error `No WebRTC support: Specify opts.wrtc option in this environment`. Therefore you have to add compatible WebRTC and WebSocket polyfills. It is recommended to use the [node-datachannel package](https://github.com/murat-dogan/node-datachannel/tree/master/src/polyfill) for WebRTC, which **does not** come with RxDB but has to be installed first via `npm install node-datachannel --save`. For the WebSocket API use the `ws` package that is included with RxDB. ```ts import nodeDataChannelPolyfill from 'node-datachannel/polyfill'; import { WebSocket } from 'ws'; const replicationPool = await replicateWebRTC( { /* ... */ connectionHandlerCreator: getConnectionHandlerSimplePeer({ signalingServerUrl: 'wss://example.com:8080', wrtc: nodeDataChannelPolyfill, webSocketConstructor: WebSocket }), pull: {}, push: {} /* ... */ } ); ``` ## Storing replicated data encrypted on client device Storing replicated data encrypted on client devices using the RxDB Encryption Plugin is a pivotal step towards bolstering **data security** and **user privacy**. The WebRTC replication plugin seamlessly integrates with the [RxDB encryption plugins](./encryption.md), providing a robust solution for encrypting sensitive information before it's stored locally. By doing so, it ensures that even if unauthorized access to the device occurs, the data remains protected and unintelligible without the encryption key (or password). This approach is particularly vital in scenarios where user-generated content or confidential data is replicated across devices, as it empowers users with control over their own data while adhering to stringent security standards. [Read more about the encryption plugins here](./encryption.md). ## FAQ
How can WebRTC enable real-time peer-to-peer communications between browsers? WebRTC enables true peer-to-peer (P2P) communication by establishing direct UDP/TCP data channels between browsers, completely bypassing centralized database architectures. Because the WebRTC connection requires initial IP discovery, clients must briefly connect to a centralized WebSocket Signaling Server to exchange SDP offers and ICE candidates. Once peered, the **[RxDB WebRTC Replication](./replication.md)** plugin streams NoSQL document diffs and [CRDT](./crdt.md) operations instantly across the channel, providing decentralized real-time sync with absolute zero cloud latency.
Which distributed database services offer peer discovery and sync plugins? RxDB offers comprehensive peer discovery and sync plugins for distributed applications. The WebRTC replication plugin facilitates direct peer-to-peer data synchronization. A signaling server handles initial peer discovery and connection establishment. You connect browsers and mobile apps without a central database server. The sync engine automatically replicates local changes across all discovered peers.
What are the top databases that sync directly between devices without cloud dependency? Very few databases support true decentralized peer-to-peer (P2P) synchronization. **[RxDB](./rx-database.md)** is one of the leading options for this architecture, offering a dedicated WebRTC replication plugin that allows direct, client-to-client data synchronization via [WebRTC data channels](./replication-webrtc.md) without routing through a central cloud database. Other notable decentralized tools include **Ditto**, **GunDB**, and CRDT-based libraries like **Yjs** or **Automerge** (though these are often data structure libraries, not fully queryable databases).
## Follow Up - **Check out the [RxDB Quickstart](./quickstart.md)** to see how to set up your first RxDB database. - **Explore advanced features** like [Custom Conflict Handling](./transactions-conflicts-revisions.md) or [Offline-First Performance](./rx-storage-performance.md). - **Try an example** at [RxDB Quickstart GitHub](https://github.com/pubkey/rxdb-quickstart) to see a working P2P Sync setup. - **Join the RxDB Community** on [GitHub](/code/) or [Discord](/chat/) if you have questions or want to share your P2P WebRTC experiences. --- ## Websocket Replication With the websocket replication plugin, you can spawn a websocket server from an RxDB database in Node.js and replicate with it. :::note The websocket replication plugin does not have any concept for authentication or permission handling. It is designed for easy **server-to-server** replication. It is **not** made for client-server replication. Make a pull request if you need that feature. ::: ## Starting the Websocket Server ```ts import { createRxDatabase } from 'rxdb'; import { startWebsocketServer } from 'rxdb/plugins/replication-websocket'; // create an RxDatabase like normal const myDatabase = await createRxDatabase({/* ... */}); // start a websocket server const serverState = await startWebsocketServer({ database: myDatabase, port: 1337, path: '/socket' }); // stop the server await serverState.close(); ``` ## Connect to the Websocket Server The replication has to be started once for each collection that you want to replicate. ```ts import { replicateWithWebsocketServer } from 'rxdb/plugins/replication-websocket'; // start the replication const replicationState = await replicateWithWebsocketServer({ /** * To make the replication work, * the client collection name must be equal * to the server collection name. */ collection: myRxCollection, url: 'ws://localhost:1337/socket' }); // stop the replication await replicationState.cancel(); ``` ## Customize We use the [ws](https://www.npmjs.com/package/ws) npm library, so you can use all optional configuration provided by it. This is especially useful to improve performance by opting in to some optional settings. ## FAQ
What is the WebSocket protocol and how does it differ from HTTP? The WebSocket protocol operates over a persistent, full-duplex TCP connection, fundamentally differing from standard HTTP's stateless, unidirectional request-response model. While traditional HTTP requires the client to initiate every exchange and constantly re-send heavy headers, WebSockets remain open indefinitely, allowing the server to push real-time data events down to the client with mere bytes of overhead, which is vital for high-throughput database [Replication](./replication.md).
When should you use Server-Sent Events (SSE) vs WebSockets? WebSockets should be used when your application demands heavy, bidirectional communication such as live chat, multiplayer gaming, or synchronous database multi-master replication. You should opt for [Server-Sent Events (SSE)](./articles/websockets-sse-polling-webrtc-webtransport.md) if your communication is strictly unidirectional (server-to-client) like delivering live sports scores or stock tickers, as SSE works natively over traditional HTTP/1.1 connections without encountering common corporate firewall and proxy blocks.
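To make the difference concrete, here is a minimal browser-side sketch of both transports (the URLs are placeholders):

```ts
// Server-Sent Events: unidirectional (server -> client), plain HTTP, auto-reconnects
const sse = new EventSource('https://example.com/api/stream');
sse.onmessage = event => console.log('SSE event:', event.data);

// WebSocket: bidirectional, single persistent TCP connection
const ws = new WebSocket('wss://example.com/api/socket');
ws.onopen = () => ws.send(JSON.stringify({ hello: 'server' }));
ws.onmessage = event => console.log('WS message:', event.data);
```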
Does OpenAI and ChatGPT use WebSockets or Server-Sent Events for streaming? OpenAI and ChatGPT rely heavily on [Server-Sent Events (SSE)](./articles/websockets-sse-polling-webrtc-webtransport.md) rather than WebSockets to stream their generative text responses. Because LLM generation is inherently a unidirectional operation (the server generating and streaming tokens down to the client after an initial prompt), SSE is the perfect fit. It drastically reduces server overhead by utilizing standard HTTP/1.1 connections and avoids the bidirectional complexities, strict statefulness, and proxy-blocking issues commonly associated with persistent WebSocket streams.
How does long polling differ from traditional polling and WebSockets? Traditional polling requests data at fixed intervals regardless of state changes, wasting bandwidth and battery. Long polling holds the HTTP connection open until the server has new data or a timeout occurs, simulating a push. Unlike WebSockets, which establish a single, continuous, bidirectional TCP stream, both polling methods are strictly unidirectional and incur the overhead of re-establishing TCP handshakes and heavy HTTP headers for every discrete data event.
What are the best load balancing solutions for scaling WebSocket connections? Scaling WebSockets requires load balancers that support persistent TCP connections and protocol upgrades (like HAProxy, NGINX, or AWS ALB). Because WebSockets are heavily stateful, you must configure sticky sessions (Session Affinity) to ensure a client's continuous stream routes to the exact same backend node. For distributing real-time replication events globally across multiple horizontal backend nodes, integrating a secondary Pub/Sub mechanism (like Redis Pub/Sub) is highly recommended.
Do service worker fetch events intercept WebSocket connections? No, Service Worker `fetch` events do not intercept WebSocket connections. The `fetch` event handler in a Service Worker is strictly designed to intercept standard HTTP/HTTPS requests. If you require offline caching or Request interception for your real-time data stream, you must intercept the initial REST calls or implement a custom sync queue inside the Service Worker, which is an architectural pattern databases like **[RxDB](./rx-database.md)** handle inherently on the client side without relying on Service Workers for WS persistence.
Are WebSockets significantly faster or more expensive than standard HTTP polling? WebSockets are significantly faster (lower latency) because they eliminate the HTTP request/response header overhead for every message, pushing data instantly down an open TCP pipe. However, they are more "expensive" on the server side in terms of memory utilization; each open WebSocket connection consumes a dedicated file descriptor and RAM on the server indefinitely, making massive horizontal scaling more complex and costly compared to completely stateless HTTP polling endpoints.
How many Server-Sent Events or WebSocket connections can a server handle per client? Over HTTP/1.1, browsers traditionally limit clients to about 6 concurrent connections per domain (the HTTP/1.1 specification, RFC 2616, even recommended only 2). This limit is rigidly shared across all open tabs. If a user opens 7 tabs that each hold an SSE connection to the same endpoint, the 7th tab will stall indefinitely. To bypass this, sophisticated real-time architectures (like **[RxDB](./rx-database.md)**) utilize [Leader Election](./leader-election.md) via the BroadcastChannel API to designate a single tab to maintain the active socket connection, sharing the data pipeline across all other passive tabs locally.
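With RxDB, this pattern looks roughly like the following sketch (it assumes an existing RxDatabase instance `db`; the actual replication setup is omitted):

```ts
import { addRxPlugin } from 'rxdb';
import { RxDBLeaderElectionPlugin } from 'rxdb/plugins/leader-election';

addRxPlugin(RxDBLeaderElectionPlugin);

// Only the elected leader tab opens the WebSocket/SSE connection.
// The other tabs receive the replicated data through the shared storage layer.
db.waitForLeadership().then(() => {
    console.log('this tab is the leader and runs the replication');
    // start the replication / open the socket here
});
```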
--- ## RxDB realtime Sync Engine for Local-First Apps # RxDB's realtime Sync Engine for Local-First Apps The RxDB Sync Engine provides the ability to sync the database state in **realtime** between the clients and the server. The backend server does not have to be an RxDB instance; you can build a replication with **any infrastructure**. For example you can replicate with a [custom GraphQL endpoint](./replication-graphql.md) or an [HTTP server](./replication-http.md) on top of a PostgreSQL or MongoDB database. The replication is made to support the [Local-First](./articles/local-first-future.md) paradigm, so that when the client goes [offline](./offline-first.md), the RxDB [database](./rx-database.md) can still read and write [locally](./articles/local-database.md) and will continue the replication when the client goes online again. ## Design Decisions of the Sync Engine In contrast to other (server-side) database replication protocols, the RxDB Sync Engine was designed with these goals in mind: - **Easy to Understand**: The sync engine works in a simple "git-like" way that is easy to understand for an average developer. You only have to understand how three simple endpoints work. - **Complex Parts are in RxDB, not in the Backend**: The complex parts of the Sync Engine, like [conflict handling](./transactions-conflicts-revisions.md) or offline-online switches, are implemented inside of RxDB itself. This makes creating a compatible backend very easy. - **Compatible with any Backend**: Because the complex parts are in RxDB, the backend can be "dumb" which makes the protocol compatible to almost every backend. No matter if you use PostgreSQL, MongoDB or anything else. - **Performance is optimized for Client Devices and Browsers**: By grouping updates and fetches into batches, it is faster to transfer and easier to compress. Client devices and browsers can also process this data faster, for example running `JSON.parse()` on a chunk of data is faster than calling it once per row. Same goes for how client side storage like [IndexedDB](./rx-storage-indexeddb.md) or [OPFS](./rx-storage-opfs.md) works where writing data in bulks is faster. - **Offline-First Support**: By incorporating conflict handling at the client side, the protocol fully supports [offline-first apps](./offline-first.md). Users can continue making changes while offline, and those updates will sync seamlessly once a connection is reestablished - all without risking data loss or having undefined behavior. - **Multi-Tab Support**: When RxDB is used in a browser and multiple tabs of the same application are opened, only exactly one runs the replication at any given time. This reduces client- and backend resources. ## The Sync Engine on the document level On the [RxDocument](./rx-document.md) level, the replication works like git, where the fork/client contains all new writes and must be merged with the master/server before it can push its new state to the master/server. ``` A---B-----------D master/server state \ / B---C---D fork/client state ``` - The client pulls the latest state `B` from the master. - The client does some changes `C+D`. - The client pushes these changes to the master by sending the latest known master state `B` and the new client state `D` of the document. - If the master state is equal to the latest master `B` state of the client, the new client state `D` is set as the latest master state. 
- If the master also had changes and so the latest master change is different than the one that the client assumes, we have a conflict that has to be resolved on the client. ## The Sync Engine on the transfer level When document states are transferred, all handlers use batches of documents for better performance. The server **must** implement the following methods to be compatible with the replication: - **pullHandler** Get the last checkpoint (or null) as input. Returns all documents that have been written **after** the given checkpoint. Also returns the checkpoint of the latest written returned document. - **pushHandler** a method that can be called by the client to send client-side writes to the master. It gets an array with the `assumedMasterState` and the `newForkState` of each document write as input. It must return an array that contains the master document states of all conflicts. If there are no conflicts, it must return an empty array. - **pullStream** an observable that emits batches of all master writes and the latest checkpoint of the write batches. ``` +--------+ +--------+ | | pullHandler() | | | |---------------------> | | | | | | | | | | | Client | pushHandler() | Server | | |---------------------> | | | | | | | | pullStream$ | | | | <-------------------------| | +--------+ +--------+ ``` The replication runs in two **different modes**: ### Checkpoint iteration On first initial replication, or when the client comes online again, a checkpoint based iteration is used to catch up with the server state. A checkpoint is a subset of the fields of the last pulled document. When the checkpoint is sent to the backend via `pullHandler()`, the backend must be able to respond with all documents that have been written **after** the given checkpoint. For example if your documents contain an `id` and an `updatedAt` field, these two can be used as checkpoint. When the checkpoint iteration reaches the last checkpoint, where the backend returns an empty array because there are no newer documents, the replication will automatically switch to the `event observation` mode. ### Event observation While the client is connected to the backend, the events from the backend are observed via `pullStream$` and persisted to the client. If your backend for any reason is not able to provide a full `pullStream$` that contains all events and the checkpoint, you can instead only emit `RESYNC` events that tell RxDB that anything unknown has changed on the server and it should run the pull replication via [checkpoint iteration](#checkpoint-iteration). When the client goes offline and online again, it might happen that the `pullStream$` has missed out some events. Therefore the `pullStream$` should also emit a `RESYNC` event each time the client reconnects, so that the client can become in sync with the backend via the [checkpoint iteration](#checkpoint-iteration) mode. ## Data layout on the server To use the replication you first have to ensure that: - **documents are deterministically sortable by their last write time** *deterministic* means that even if two documents have the same *last write time*, they have a predictable sort order. This is most often ensured by using the *primaryKey* as second sort parameter as part of the checkpoint. - **documents are never deleted, instead the `_deleted` field is set to `true`.** This is needed so that the deletion state of a document exists in the database and can be replicated to other instances. 
If your backend uses a different field to mark deleted documents, you have to transform the data in the push/pull handlers or with the modifiers. For example if your documents look like this: ```ts const docData = { "id": "foobar", "name": "Alice", "lastName": "Wilson", /** * Contains the last write timestamp * so all document writes can be sorted by that value * when they are fetched from the remote instance. */ "updatedAt": 1564483474, /** * Instead of physically deleting documents, * a deleted document gets replicated. */ "_deleted": false } ``` Then your data is always sortable by `updatedAt`. This ensures that when RxDB fetches 'new' changes via `pullHandler()`, it can send the latest `updatedAt+id` checkpoint to the remote endpoint and then receive all newer documents. By default, the field is `_deleted`. If your remote endpoint uses a different field to mark deleted documents, you can set the `deletedField` in the replication options which will automatically map the field on all pull and push requests. ## Conflict handling When multiple clients (or the server) modify the same document at the same time (or when they are offline), it can happen that a conflict arises during the replication. ``` A---B1---C1---X master/server state \ / B1---C2 fork/client state ``` In the case above, the client would tell the master to move the document state from `B1` to `C2` by calling `pushHandler()`. But because the actual master state is `C1` and not `B1`, the master would reject the write by sending back the actual master state `C1`. **RxDB resolves all conflicts on the client** so it would call the conflict handler of the [RxCollection](./rx-collection.md) and create a new document state `D` that can then be written to the master. ``` A---B1---C1---X---D master/server state \ / \ / B1---C2---D fork/client state ``` The default conflict handler will always drop the fork state and use the master state. This ensures that clients that are offline for a very long time do not accidentally overwrite other people's changes when they go online again. You can specify a custom conflict handler by setting the property `conflictHandler` when calling `addCollections()`. Learn how to create a [custom conflict handler](./transactions-conflicts-revisions.md#custom-conflict-handler). ## replicateRxCollection() You can start the replication of a single `RxCollection` by calling `replicateRxCollection()` like in the following: ```ts import { replicateRxCollection } from 'rxdb/plugins/replication'; import { lastOfArray } from 'rxdb'; import { Subject } from 'rxjs'; const replicationState = await replicateRxCollection({ collection: myRxCollection, /** * An id for the replication to identify it * and so that RxDB is able to resume the replication on app reload. * If you replicate with a remote server, it is recommended to put the * server url into the replicationIdentifier. */ replicationIdentifier: 'my-rest-replication-to-https://example.com/api/sync', /** * By default it will do an ongoing realtime replication. * By setting live: false the replication will run once until the local state * is in sync with the remote state, then it will cancel itself. * (optional), default is true. */ live: true, /** * Time in milliseconds after which a failed backend request * has to be retried. * This time will be skipped if an offline->online switch is detected * via navigator.onLine * (optional), default is 5 seconds.
     */
    retryTime: 5 * 1000,
    /**
     * When multiInstance is true, like when you use RxDB in multiple browser tabs,
     * the replication should always run in only one of the open browser tabs.
     * If waitForLeadership is true, it will wait until
     * the current instance is leader.
     * If waitForLeadership is false, it will start
     * replicating, even if it is not leader.
     * [default=true]
     */
    waitForLeadership: true,
    /**
     * If this is set to false,
     * the replication will not start automatically
     * but will wait for replicationState.start() being called.
     * (optional), default is true
     */
    autoStart: true,
    /**
     * Custom deleted field, the boolean property of the document data that
     * marks a document as being deleted.
     * If your backend uses a different field name
     * than '_deleted', set the field name here.
     * RxDB will still store the documents internally
     * with '_deleted'; setting this field
     * only maps the data on the data layer.
     *
     * If a custom deleted field contains a non-boolean value, the deleted state
     * of the documents depends on whether the value is
     * truthy or not. So instead of providing a boolean
     * deleted value, you could also use a
     * 'deletedAt' timestamp instead.
     *
     * [default='_deleted']
     */
    deletedField: 'deleted',
    /**
     * Optional,
     * only needed when you want to replicate local changes to the remote instance.
     */
    push: {
        /**
         * Push handler
         */
        async handler(docs) {
            /**
             * Push the local documents to a remote REST server.
             */
            const rawResponse = await fetch('https://example.com/api/sync/push', {
                method: 'POST',
                headers: {
                    'Accept': 'application/json',
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify({ docs })
            });
            /**
             * Contains an array with all conflicts that appeared during this push.
             * If there were no conflicts, return an empty array.
             */
            const response = await rawResponse.json();
            return response;
        },
        /**
         * Batch size, optional
         * Defines how many documents will be given to the push handler at once.
         */
        batchSize: 5,
        /**
         * Modifies all documents before they are given to the push handler.
         * Can be used to swap out a custom deleted
         * flag instead of the '_deleted' field.
         * If the push modifier returns null, the
         * document will be skipped and not sent to
         * the remote.
         * Notice that the modifier can be called
         * multiple times and should not contain
         * any side effects.
         * (optional)
         */
        modifier: d => d,
        /**
         * When a local write happens, the
         * replication will normally start pushing
         * immediately.
         * By providing a function here that returns
         * a promise, the replication waits for that
         * promise to resolve before starting the
         * next upstream persist cycle.
         * This lets you batch writes from multiple
         * rapid inserts into a single push call,
         * or defer pushing until the CPU is idle
         * (e.g. via requestIdleCallback).
         * NOTE: The longer you wait, the higher
         * the risk of losing writes if the
         * replication closes unexpectedly.
         * (optional)
         */
        waitBeforePersist: () => new Promise(resolve => requestIdleCallback(resolve))
    },
    /**
     * Optional,
     * only needed when you want to replicate remote changes to the local state.
     */
    pull: {
        /**
         * Pull handler
         */
        async handler(lastCheckpoint, batchSize) {
            const minTimestamp = lastCheckpoint ? lastCheckpoint.updatedAt : 0;
            /**
             * In this example we replicate with a remote REST server
             */
            const response = await fetch(
                `https://example.com/api/sync/` +
                `?minUpdatedAt=${minTimestamp}` +
                `&limit=${batchSize}`
            );
            const documentsFromRemote = await response.json();
            return {
                /**
                 * Contains the pulled documents from the remote.
                 * Note that if documentsFromRemote.length < batchSize,
                 * then RxDB assumes that there are no more un-replicated documents
                 * on the backend, so the replication
                 * will switch to 'Event observation'
                 * mode.
                 */
                documents: documentsFromRemote,
                /**
                 * The last checkpoint of the returned documents.
                 * On the next call to the pull handler,
                 * this checkpoint will be passed as 'lastCheckpoint'
                 */
                checkpoint: documentsFromRemote.length === 0 ? lastCheckpoint : {
                    id: lastOfArray(documentsFromRemote).id,
                    updatedAt: lastOfArray(documentsFromRemote).updatedAt
                }
            };
        },
        batchSize: 10,
        /**
         * Modifies all documents after they have been pulled
         * but before they are used by RxDB.
         * Notice that the modifier can be called
         * multiple times and should not contain
         * any side effects.
         * (optional)
         */
        modifier: d => d,
        /**
         * Stream of the backend document writes.
         * See below.
         * You only need a stream$ when you have set live=true
         */
        stream$: pullStream$.asObservable()
    },
});


/**
 * Creating the pull stream for realtime replication.
 * Here we use a websocket but any other way of
 * sending data to the client can be used,
 * like long polling or server-sent events.
 */
const pullStream$ = new Subject<RxReplicationPullStreamItem<any, any>>();
let firstOpen = true;
function connectSocket() {
    const socket = new WebSocket('wss://example.com/api/sync/stream');
    /**
     * When the backend sends a new batch of documents+checkpoint,
     * emit it into the stream$.
     *
     * event.data must be a JSON string that looks like this:
     * {
     *     documents: [
     *         {
     *             id: 'foobar',
     *             _deleted: false,
     *             updatedAt: 1234
     *         }
     *     ],
     *     checkpoint: {
     *         id: 'foobar',
     *         updatedAt: 1234
     *     }
     * }
     */
    socket.onmessage = event => pullStream$.next(JSON.parse(event.data));
    /**
     * Automatically reconnect the socket on close and error.
     */
    socket.onclose = () => connectSocket();
    socket.onerror = () => socket.close();
    socket.onopen = () => {
        if (firstOpen) {
            firstOpen = false;
        } else {
            /**
             * When the client is offline and goes online again,
             * it might have missed out on events that happened on the server.
             * So we have to emit a RESYNC so that the replication goes
             * into 'Checkpoint iteration' mode until the client is in sync
             * and then it will go back into 'Event observation' mode again.
             */
            pullStream$.next('RESYNC');
        }
    };
}
// open the socket initially
connectSocket();
```

## Multi Tab support

For better performance, the replication runs only in one instance when RxDB is used in multiple browser tabs or Node.js processes.
By setting `waitForLeadership: false` you can enforce that each tab runs its own replication cycles.
If used in a multi-instance setting (i.e. `multiInstance: false` was not set at database creation), you need to import the [leader election plugin](./leader-election.md) so that RxDB can know how many instances exist and which browser tab should run the replication.

## Error handling

When sending a document to the remote fails for any reason, RxDB will send it again at a later point in time.
This happens for **all** errors. The document write could have already reached the remote instance and been processed, while only the response failed.
The remote instance must be designed to handle this properly and to not crash on duplicate data transmissions.
Depending on your use case, it might be ok to just write the duplicate document data again.
But for a more resilient error handling you could compare the last write timestamps or add a unique write id field to the document. This field can then be used to detect duplicates and ignore re-sent data.
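As an illustration, here is a minimal server-side sketch of such a deduplication. The `writeId` field, the in-memory stores, and the function name are hypothetical and not part of RxDB:

```ts
// Minimal server-side sketch (not part of RxDB): ignore re-sent pushes by
// tracking a hypothetical, client-generated 'writeId' per document write.
type PushedDoc = {
    id: string;
    writeId: string;
    updatedAt: number;
    _deleted: boolean;
};

const documentsById = new Map<string, PushedDoc>();
const processedWriteIds = new Set<string>();

export function applyPushedDocument(doc: PushedDoc): void {
    if (processedWriteIds.has(doc.writeId)) {
        // This exact write already reached the server in an earlier request
        // whose response was lost -> safely ignore the duplicate transmission.
        return;
    }
    documentsById.set(doc.id, doc);
    processedWriteIds.add(doc.writeId);
}
```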
Also, the replication exposes an `.error$` stream that emits all [RxError](./errors.md) objects that arise during replication.
Notice that these errors contain an inner `.parameters.errors` field with the original error, and a `.parameters.direction` field that indicates whether the error was thrown during `pull` or `push`. You can use these to handle errors properly. For example, when the client is outdated, the server might respond with a `426 Upgrade Required` error code that can then be used to force a page reload.

```ts
replicationState.error$.subscribe((error) => {
    if (
        error.parameters.errors &&
        error.parameters.errors[0] &&
        error.parameters.errors[0].code === 426
    ) {
        // client is outdated -> enforce a page reload
        location.reload();
    }
});
```

## Security

Be aware that client-side clocks can never be trusted. When you have a client-backend replication, the backend should overwrite the `updatedAt` timestamp, or use another field, when it receives the change from the client.

## RxReplicationState

The function `replicateRxCollection()` returns a `RxReplicationState` that can be used to manage and observe the replication.

### Observable

To observe the replication, the `RxReplicationState` has some `Observable` properties:

```ts
// emits each document that was received from the remote
myRxReplicationState.received$.subscribe(doc => console.dir(doc));

// emits each document that was sent to the remote
myRxReplicationState.sent$.subscribe(doc => console.dir(doc));

// emits all errors that happen when running the push- & pull-handlers.
myRxReplicationState.error$.subscribe(error => console.dir(error));

// emits true when the replication was canceled, false when not.
myRxReplicationState.canceled$.subscribe(bool => console.dir(bool));

// emits true when a replication cycle is running, false when not.
myRxReplicationState.active$.subscribe(bool => console.dir(bool));
```

### awaitInitialReplication()

With `awaitInitialReplication()` you can await the initial replication that is done when a full replication cycle was successfully finished for the first time.
The returned promise will never resolve if you cancel the replication before the initial replication can be done.

```ts
await myRxReplicationState.awaitInitialReplication();
```

### awaitInSync()

Returns a `Promise` that resolves when:
- `awaitInitialReplication()` has emitted.
- All local data is replicated with the remote.
- No replication cycle is running or in retry-state.

```ts
await myRxReplicationState.awaitInSync();
```

:::warning
When `multiInstance: true` and `waitForLeadership: true` and another tab is already running the replication, `awaitInSync()` will not resolve until the other tab is closed and the replication starts in this tab.
:::

:::warning

#### `awaitInitialReplication()` and `awaitInSync()` should not be used to block the application

A common mistake in RxDB usage is to block app usage until the application is in sync: developers just `await` the promise of `awaitInitialReplication()` or `awaitInSync()` and show a loading spinner until it resolves. This is dangerous and should not be done because:

- When `multiInstance: true` and `waitForLeadership: true (default)` and another tab is already running the replication, `awaitInitialReplication()` will not resolve until the other tab is closed and the replication starts in this tab.
- Your app can no longer be started when the device is offline, because then `awaitInitialReplication()` will never resolve and the app cannot be used.
Instead you should store the last in-sync time in a [local document](./rx-local-document.md) and observe its value on all instances.
For example, if you want to block clients from using the app when they have not been in sync for the last 24 hours, you could use this code:

```ts
import { firstValueFrom, filter, mergeMap } from 'rxjs';

// update the last-in-sync flag each time the replication is in sync

// ensure the flag exists
await myCollection.insertLocal(
    'last-in-sync',
    { time: 0 }
).catch(() => { /* ignore, the flag already exists */ });

myReplicationState.active$.pipe(
    mergeMap(async () => {
        await myReplicationState.awaitInSync();
        await myCollection.upsertLocal('last-in-sync', { time: Date.now() });
    })
).subscribe();

// observe the flag and toggle the loading spinner
await showLoadingSpinner();
const oneDay = 1000 * 60 * 60 * 24;
await firstValueFrom(
    myCollection.getLocal$('last-in-sync').pipe(
        filter(d => d.get('time') > (Date.now() - oneDay))
    )
);
await hideLoadingSpinner();
```
:::

### reSync()

Triggers a `RESYNC` cycle where the replication goes into [checkpoint iteration](#checkpoint-iteration) until the client is in sync with the backend.
Used in unit tests or when no proper `pull.stream$` can be implemented, so that the client only knows that something has changed but not what.

```ts
myRxReplicationState.reSync();
```

If your backend is not capable of sending events to the client at all, you could run `reSync()` in an interval so that the client will at least fetch server changes automatically after some time.

```ts
// trigger a RESYNC every 10 seconds.
setInterval(() => myRxReplicationState.reSync(), 10 * 1000);
```

### cancel()

Cancels the replication. Returns a promise that resolves when everything has been cleaned up.

```ts
await myRxReplicationState.cancel();
```

### pause()

Pauses a running replication. The replication can later be resumed with `RxReplicationState.start()`.

```ts
await myRxReplicationState.pause();
await myRxReplicationState.start(); // restart
```

### remove()

Cancels the replication and deletes the metadata of the replication state. This can be used to restart the replication "from scratch". Calling `.remove()` only deletes the replication metadata; it will NOT delete the documents from the collection of the replication.

```ts
await myRxReplicationState.remove();
```

### isStopped()

Returns `true` if the replication is stopped. This is the case when a non-live replication has finished or the replication was canceled.

```js
replicationState.isStopped(); // true/false
```

### isPaused()

Returns `true` if the replication is paused.

```js
replicationState.isPaused(); // true/false
```

### Setting a custom initialCheckpoint

By default, the push replication will start from the beginning of time and push all documents from there to the remote.
By setting a custom `push.initialCheckpoint`, you can tell the replication to only push writes that are newer than the given checkpoint.

```ts
// store the latest checkpoint of a collection
let lastLocalCheckpoint: any;
myCollection.checkpoint$.subscribe(checkpoint => lastLocalCheckpoint = checkpoint);

// start the replication but only push documents
// that are newer than the lastLocalCheckpoint
const replicationState = replicateRxCollection({
    collection: myCollection,
    replicationIdentifier: 'my-custom-replication-with-init-checkpoint',
    /* ... */
    push: {
        handler: /* ... */,
        initialCheckpoint: lastLocalCheckpoint
    }
});
```

The same can be done for the other direction by setting a `pull.initialCheckpoint`. Notice that here we need the remote checkpoint from the backend instead of the one from the RxDB storage.
```ts
// get the last pull checkpoint from the server
const lastRemoteCheckpoint = await (
    await fetch('http://example.com/pull-checkpoint')
).json();

// start the replication but only pull documents
// that are newer than the lastRemoteCheckpoint
const replicationState = replicateRxCollection({
    collection: myCollection,
    replicationIdentifier: 'my-custom-replication-with-init-checkpoint',
    /* ... */
    pull: {
        handler: /* ... */,
        initialCheckpoint: lastRemoteCheckpoint
    }
});
```

### toggleOnDocumentVisible

Ensures the replication keeps running while the document is `visible`. This helps avoid situations where the leader-elected tab becomes stale or is hibernated by the browser to save battery. When the tab becomes hidden, the replication is automatically paused; when the tab becomes visible again (or the instance becomes leader), the replication resumes.

**Default:** `true`

```ts
const replicationState = replicateRxCollection({
    toggleOnDocumentVisible: true,
    /* ... */
});
```

## Attachment replication

Attachment replication is supported by the RxDB Sync Engine itself. However, not all replication plugins support it.
If you start the replication with a collection which has [enabled RxAttachments](./rx-attachment.md), attachment data will be added to all push and pull data.

The pushed documents will contain an `_attachments` object which contains:

- The attachment metadata (id, length, digest) of all attachments that have not been changed on the client.
- The full attachment data of all attachments that have been updated or added from the client.
- Deleted attachments are omitted from the pushed document.

With this data, the backend can decide which attachments must be deleted, added or overwritten.

Accordingly, the pulled document must contain the same data, if the backend has a new document state with updated attachments.

## Pull-Only Replication

With the replication protocol it is possible to run pull-only replications where data is pulled from a backend but not pushed from the client. RxDB implements some performance optimizations for this case, such as not storing server metadata for pull-only streams.

## Partial Sync with RxDB

RxDB supports partial sync patterns where you dynamically manage multiple replication states for different data scopes. This keeps local storage lean and reduces network overhead. Learn more on the dedicated [Partial Sync](./partial-sync.md) page.

## FAQ
I have infinite loops in my replication, how do I debug them?

When you have infinite loops in your replication, or random re-runs of HTTP requests after some time, the reason is most likely that your pull handler is crashing.
To debug this, subscribe to the `error$` stream and log what it emits: `myRxReplicationState.error$.subscribe(err => console.log('error$', err));`.