Quickstart
This guide helps you quickly set up replication between PostgreSQL database instances using ReJot. For demonstration purposes, we’ll use a single Postgres database for both publishing and consuming data, configure it with a single manifest file, and run without an external event store.
Prerequisites
Create a ReJot Manifest
First, we’ll initialize a manifest file. This file contains all the configuration that ReJot needs to sync between datastores. By default, the command below creates a rejot-manifest.json file in the working directory.
rejot-cli manifest init --slug "my-sync-project"
Add Database Connections
Configure connections to your source and destination databases. In this example, we’ll use one database as both source and destination, so we will need only one connection.
rejot-cli manifest connection add \
--slug "my-db" \
--type postgres \
--database postgres \
--host localhost \
--password example \
--port 5432 \
--user postgres
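If you want to verify these connection details with another tool (psql, for example) before continuing, the same values can be expressed as a standard PostgreSQL connection URI. The snippet below is a minimal sketch: postgres_uri is a hypothetical helper and not part of ReJot. It only percent-encodes the values and assembles the URI.

```python
from urllib.parse import quote


def postgres_uri(user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a PostgreSQL connection URI (hypothetical helper, not a ReJot API)."""
    # Percent-encode the parts that may contain reserved characters such as "@" or "/".
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Same values as the "my-db" connection above.
print(postgres_uri("postgres", "example", "localhost", 5432, "postgres"))
# prints: postgresql://postgres:example@localhost:5432/postgres
```

Passing the printed URI to psql is a quick way to confirm the database is reachable with these credentials.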
Set Up Data Store
Define a data store and attach it to the connection we just created.
# Source data store
rejot-cli manifest datastore add \
--connection my-db \
--publication rejot_publication \
--slot rejot_slot
Quickly inspect your current manifest config using rejot-cli manifest info
Set Up Event Store
Next, we’ll define an event store. ReJot uses the event store to hold Public Schema events before replicating them to consumers. For demonstration purposes we’ll reuse the Postgres database connection we created earlier, but a separate, dedicated database is generally better suited. The required schemas and tables are created on first launch.
rejot-cli manifest eventstore add \
--connection my-db
ReJot can function without an event store, but this means changes to a Public Schema are not stored durably and will be lost if the sync service is stopped or crashes. For local development this is usually fine.
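To make the durability point concrete, here is a small illustrative sketch of an append-only event log that survives a restart. It uses a SQLite file as a stand-in for the event store. The table layout and the open_store, append_event, and events_after helpers are assumptions made for this example and do not reflect the schema ReJot actually creates.

```python
import os
import sqlite3
import tempfile


def open_store(path):
    """Open (or create) a file-backed event log. Illustrative only."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events ("
        "seq INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
    )
    return conn


def append_event(conn, payload):
    """Persist one event; the transaction is committed before returning."""
    with conn:
        conn.execute("INSERT INTO events (payload) VALUES (?)", (payload,))


def events_after(conn, last_seq):
    """Return the events a consumer has not processed yet."""
    return conn.execute(
        "SELECT seq, payload FROM events WHERE seq > ? ORDER BY seq", (last_seq,)
    ).fetchall()


path = os.path.join(tempfile.mkdtemp(), "events.db")

store = open_store(path)
append_event(store, '{"id": 1, "api_key": "first"}')
append_event(store, '{"id": 1, "api_key": "rotated"}')
store.close()  # simulate the sync service stopping

store = open_store(path)  # simulate a restart
pending = events_after(store, last_seq=1)  # the consumer had only processed event 1
print(pending)
```

Because the events were written to disk before the simulated stop, the second event is still available after the restart. With a purely in-memory queue it would have been lost.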
Define Public and Consumer Schemas
Public and consumer schemas define what data is exposed from the datastore and how that exposed data should be synced to the destination. On the publishing side, this is done through a simple SELECT query that runs each time the source table is updated. The query is responsible for transforming the internal schema into the public schema and must include a WHERE clause on the primary key of that table. In the example query, the :id placeholder is replaced with the primary key value of each incoming update.
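As an illustration of how a named placeholder such as :id can be bound to a value, the sketch below rewrites named placeholders into PostgreSQL positional parameters ($1, $2, ...) and collects the values in order. This shows the general technique only and is not ReJot's implementation. bind_named_placeholders is a hypothetical helper, and the regular expression deliberately ignores edge cases such as colons inside string literals.

```python
import re
from typing import Any, Dict, List, Tuple

# Matches :name, but not the second colon of a Postgres cast such as id::text.
_PLACEHOLDER = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def bind_named_placeholders(sql: str, params: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Rewrite :name placeholders to $1, $2, ... (hypothetical helper)."""
    positions: Dict[str, int] = {}
    values: List[Any] = []

    def replace(match):
        name = match.group(1)
        if name not in params:
            raise KeyError(f"no value supplied for placeholder :{name}")
        if name not in positions:
            # First occurrence: assign the next positional index.
            values.append(params[name])
            positions[name] = len(values)
        # Repeated names reuse the index assigned on first occurrence.
        return f"${positions[name]}"

    return _PLACEHOLDER.sub(replace, sql), values


sql, values = bind_named_placeholders(
    'SELECT id, key AS "api_key" FROM api_key WHERE id = :id', {"id": 42}
)
print(sql)     # SELECT id, key AS "api_key" FROM api_key WHERE id = $1
print(values)  # [42]
```

Passing values as parameters rather than interpolating them into the SQL string keeps the query safe from SQL injection.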
Public and consumer schemas can be defined in TypeScript code. Create a schemas.ts file containing your schemas. See our full guide on defining schemas for a more detailed description of the schema definition process.
// schemas.ts
import { z } from "zod";

import { createPostgresPublicSchemaTransformations } from "@rejot-dev/adapter-postgres";
import { createConsumerSchema } from "@rejot-dev/contract/consumer-schema";
import { createPublicSchema } from "@rejot-dev/contract/public-schema";

// Public schema definition for api_key table
const apiKeyPublicSchema = createPublicSchema("my-public-schema", {
  source: { dataStoreSlug: "my-db" },
  outputSchema: z.object({
    id: z.string(),
    api_key: z.string(),
  }),
  config: {
    publicSchemaType: "postgres",
    transformations: [
      ...createPostgresPublicSchemaTransformations(
        "insertOrUpdate",
        "api_key",
        `SELECT id, key AS "api_key" FROM api_key WHERE id = :id`,
      ),
    ],
  },
  version: {
    major: 1,
    minor: 0,
  },
});

// Consumer schema that writes to target_table
const apiKeyConsumerSchema = createConsumerSchema("my-consumer-schema", {
  source: {
    manifestSlug: "my-sync-project",
    publicSchema: {
      name: "my-public-schema",
      majorVersion: 1,
    },
  },
  config: {
    consumerSchemaType: "postgres",
    destinationDataStoreSlug: "my-db",
    sql: `INSERT INTO target_table (id, api_key) VALUES (:id, :api_key)
      ON CONFLICT (id) DO UPDATE SET api_key = :api_key`,
  },
});

export default {
  apiKeyPublicSchema,
  apiKeyConsumerSchema,
};
# schema.py
from pydantic import BaseModel

from rejot_contract.public_schema import (
    create_public_schema,
    PublicSchemaConfig,
    Source,
    Version,
    create_postgres_public_schema_transformation,
)
from rejot_contract.consumer_schema import (
    create_consumer_schema,
    ConsumerSchemaConfig,
    SourceManifest,
    PublicSchema,
)


# Shape of the rows exposed by the public schema
class ApiKey(BaseModel):
    id: int
    api_key: str


# Define a public schema for the api_key table
public = create_public_schema(
    public_schema_name="my-public-schema",
    source=Source(dataStoreSlug="my-db"),
    output_schema=ApiKey,
    version=Version(major=1, minor=0),
    config=PublicSchemaConfig(
        publicSchemaType="postgres",
        transformations=[
            create_postgres_public_schema_transformation(
                operation="insertOrUpdate",
                table="api_key",
                # Column aliases must use double quotes in PostgreSQL;
                # single quotes denote string literals.
                sql='SELECT id, key AS "api_key" FROM api_key WHERE id = :id',
            ),
        ],
    ),
)

# Define a consumer schema that writes to target_table
consumer = create_consumer_schema(
    "my-consumer-schema",
    source=SourceManifest(
        manifestSlug="my-sync-project",
        publicSchema=PublicSchema(
            name="my-public-schema",
            majorVersion=1,
        ),
    ),
    config=ConsumerSchemaConfig(
        consumerSchemaType="postgres",
        destinationDataStoreSlug="my-db",
        sql="INSERT INTO target_table (id, api_key) VALUES (:id, :api_key) ON CONFLICT (id) DO UPDATE SET api_key = :api_key",
    ),
)
Collect Schemas to Manifest
Before your newly defined schemas can be used, they must be collected into your manifest file. You can do this using the collect command in the CLI. It is worth verifying that your schemas are correct first using the --check flag.
rejot-cli collect schemas.ts --check --print
If all looks good, you can materialize the schemas into your manifest.
rejot-cli collect schemas.ts --write
Create Target Table in Destination
Before launching the synchronization process, create the target table in your destination database:
CREATE TABLE target_table (id SERIAL PRIMARY KEY, api_key TEXT NOT NULL);
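To see how the public schema query and the consumer upsert fit together, the sketch below replays both statements from this guide against an in-memory SQLite database. SQLite is only a stand-in so the example runs without a Postgres instance. The api_key source table layout (an id and a key column) is an assumption inferred from the example query, and replicate is a hypothetical helper that mimics a single sync step rather than ReJot's actual sync loop.

```python
import sqlite3

# The two statements from the schema definitions in this guide.
PUBLIC_SQL = 'SELECT id, key AS "api_key" FROM api_key WHERE id = :id'
CONSUMER_SQL = (
    "INSERT INTO target_table (id, api_key) VALUES (:id, :api_key) "
    "ON CONFLICT (id) DO UPDATE SET api_key = :api_key"
)


def replicate(conn, changed_id):
    """Run the public query for one changed row and upsert the result."""
    row = conn.execute(PUBLIC_SQL, {"id": changed_id}).fetchone()
    if row is None:
        return  # this sketch only covers insertOrUpdate, not deletes
    # The row's column names (id, api_key) match the consumer placeholders.
    conn.execute(CONSUMER_SQL, dict(row))


conn = sqlite3.connect(":memory:")  # upsert syntax needs SQLite 3.24 or newer
conn.row_factory = sqlite3.Row
# Assumed source table layout, inferred from the public schema query.
conn.execute("CREATE TABLE api_key (id INTEGER PRIMARY KEY, key TEXT NOT NULL)")
conn.execute("CREATE TABLE target_table (id INTEGER PRIMARY KEY, api_key TEXT NOT NULL)")

conn.execute("INSERT INTO api_key (id, key) VALUES (1, 'first')")
replicate(conn, 1)  # insert reaches the target
conn.execute("UPDATE api_key SET key = 'rotated' WHERE id = 1")
replicate(conn, 1)  # update overwrites the existing target row
replicate(conn, 1)  # replaying the same change leaves the target unchanged

rows = [tuple(r) for r in conn.execute("SELECT id, api_key FROM target_table")]
print(rows)  # [(1, 'rotated')]
```

The ON CONFLICT clause is what makes the consumer statement safe to replay: applying the same change more than once leaves a single, up-to-date row in target_table.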
Start Synchronization
Start the synchronization process:
rejot-cli manifest sync ./rejot-manifest.json
Any new writes to your source tables in the source datastore should now be transformed and written into the destination datastore!