Defining Schemas
When using ReJot, you define Public and Consumer schemas right from your codebase. This allows you to group related pieces of code together. In this case you can define your public schemas close to where you define the underlying database tables.
This guide will show you how to define (Postgres) schemas in your TypeScript codebase.
If you want to use ReJot with a non-TypeScript codebase, you can modify your manifest directly to create schemas, or you can choose to define your schemas in TypeScript anyway.
Setting Up
First, add the required dependencies to your project:
npm install --save @rejot-dev/contract @rejot-dev/adapter-postgres
bun add @rejot-dev/contract @rejot-dev/adapter-postgres
Creating Public and Consumer Schema Definitions
Create a new file called schemas.ts
that will contain your public and consumer schemas:
// schema.ts
import { z } from "zod";
import { createConsumerSchema } from "@rejot-dev/contract/consumer-schema";
import { createPublicSchema } from "@rejot-dev/contract/public-schema";
const myPublicSchema = createPublicSchema("my-public-schema", {
source: { dataStoreSlug: "my-source-datastore" },
outputSchema: z.object({
id: z.string(),
name: z.string(),
}),
config: {
publicSchemaType: "postgres",
transformations: [], // See next step
},
version: {
major: 1,
minor: 0,
},
});
const myConsumerSchema = createConsumerSchema("my-consumer-schema", {
source: {
manifestSlug: "my-manifest",
publicSchema: {
name: "my-public-schema",
majorVersion: 1,
},
},
config: {
consumerSchemaType: "postgres",
destinationDataStoreSlug: "my-destination-datastore",
sql: "", // See next step
},
});
export default {
myPublicSchema,
myConsumerSchema,
};
# schema.py
from pydantic import BaseModel
from rejot_contract.public_schema import (
create_public_schema, PublicSchemaConfig, Source, Version
)
from rejot_contract.consumer_schema import (
create_consumer_schema, ConsumerSchemaConfig, SourceManifest, PublicSchema
)
class Account(BaseModel):
id: int
name: str
public = create_public_schema(
public_schema_name="my-public-schema",
source=Source(dataStoreSlug="my-source-datastore"),
output_schema=Account,
version=Version(major=1, minor=0),
config=PublicSchemaConfig(
publicSchemaType="postgres",
transformations=[], # See next step
),
)
# Define a consumer schema
consumer = create_consumer_schema(
"my-consumer-schema",
source=SourceManifest(
manifestSlug="my-manifest",
publicSchema=PublicSchema(
name="my-public-schema",
majorVersion=1,
),
),
config=ConsumerSchemaConfig(
consumerSchemaType="postgres",
destinationDataStoreSlug="my-destination-datastore",
sql="", # See next step
),
)
Make sure you adjust the data store and manifest slugs in this example to the manifest you’ve created.
Transformations
Transformations are a crucial part of schema definitions that specify how data is transformed between different stages of the sync process. Transformation are used by both the publishing and consuming parties, for publishers, the transformation defines how data from the internal representation should be mapped to the public schema. Conversely on the consuming side the transformation maps the public schema back into an internal representation for the consumer.
Public Schema Transformations
For public schemas, you need to create a SQL transformation for each table referenced in the schema. These transformations:
- Take the modified row as input parameters. They can be accessed in column order using positional
parameters (e.g.,
$1
,$2
, etc). We also provide support for named parameters, using the name of the column. E.g.:id
. - MUST return exactly one row that matches the public schema’s output structure.
Here’s an example of a public schema transformation:
import { createPostgresPublicSchemaTransformations } from "@rejot-dev/adapter-postgres";
// ... in myPublicSchema
transformations: createPostgresPublicSchemaTransformations(
"insertOrUpdate",
"my_table",
`SELECT id, name FROM my_table WHERE id = :id`,
),
# ... in myPublicSchema
transformations=[
create_postgres_public_schema_transformation(
operation="insertOrUpdate",
table="account",
sql="SELECT id, name FROM my_table WHERE id = :id",
),
],
The row returned from the transformation query must exactly match the public schema’s output
structure, including casing in column names. In Postgres, when not double-quoting column names,
they are always returned in lowercase. It’s recommended to always add an explicit “AS” statement,
to ensure the casing is correct. E.g. SELECT id, first_name AS "firstName" FROM my_table
.
Multi-table transformations
A common case is to (partially) de-normalize your data for the public schema, to ensure correct results ReJot should listen to updates to all of these tables. As a data producer you will have to define schema transformations for each table.
import { createPostgresPublicSchemaTransformations } from "@rejot-dev/adapter-postgres";
// ... in myPublicSchema
transformations: [
...createPostgresPublicSchemaTransformations(
"insertOrUpdate",
"accounts",
`SELECT
accounts.id AS "id",
accounts.name AS "name",
addresses.country AS "country"
FROM
accounts
JOIN addresses ON accounts.id = addresses.account_id
WHERE
accounts.id = :id`,
),
...createPostgresPublicSchemaTransformations(
"insertOrUpdate",
"addresses",
`SELECT
accounts.id AS "id",
accounts.name AS "name",
addresses.country AS "country"
FROM
accounts
JOIN addresses ON accounts.id = addresses.account_id
WHERE
addresses.id = :id`,
),
],
# ... in myPublicSchema
transformations=[
create_postgres_public_schema_transformation(
operation="insertOrUpdate",
table="accounts",
sql="""SELECT
accounts.id AS "id",
accounts.name AS "name",
addresses.country AS "country"
FROM
accounts
JOIN addresses ON accounts.id = addresses.account_id
WHERE
accounts.id = :id""",
),
create_postgres_public_schema_transformation(
operation="insertOrUpdate",
table="addresses",
sql="""SELECT
accounts.id AS "id",
accounts.name AS "name",
addresses.country AS "country"
FROM
accounts
JOIN addresses ON accounts.id = addresses.account_id
WHERE
addresses.id = :id""",
),
],
Note that your public schema transformation must always return one result, in case there is a one-to-many relationship between tables you cannot include them in your ReJot public schema without doing some kind aggregation in your query. For PostgreSQL, use one of the aggregation functions that are available.
Consumer Schema Transformations
For consumer schemas, transformations handle the insertion of public schema data into the destination data store. These transformations:
- Use named parameters (e.g.,
:name
,:country
) to reference fields from the public schema - Must handle conflicts appropriately since ReJot doesn’t guarantee exactly-once delivery
Here’s an example of a consumer schema transformation:
// ... in myConsumerSchema
sql: "INSERT INTO destination_table (id, name) VALUES (:id, :name) ON CONFLICT (id) DO UPDATE SET name = :name",
# ... in myConsumerSchema
sql="INSERT INTO destination_table (id, name) VALUES (:id, :name) ON CONFLICT (id) DO UPDATE SET name = :name",
Materializing Schemas
Code-based schemas need to be materialized into your manifest file to be usable by sync services.
This is done using the collect
command:
rejot-cli collect --write --check schemas.ts
This command will:
- Read your schema definitions from
schemas.ts
. - Generate the corresponding manifest entries.
- Check that all consumer schemas are referencing valid public schemas (if
--check
is specified). - Update your manifest file (if
--write
is specified).
The manifest includes a definitionFile
key for each schema, which points to the actual source file
and is re-used later on to update collected schemas.
If you want to change a schema, edit the file referenced by definitionFile
and run the collect
command again.
Next Steps
Now that you know how to define schemas, you can run sync services to start and manage sync services using manifests.