532 changes: 532 additions & 0 deletions .claude/commands/test-sync-roundtrip-rls.md

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion .github/workflows/main.yml
@@ -226,10 +226,41 @@ jobs:
path: dist/${{ matrix.name == 'apple-xcframework' && 'CloudSync.*' || 'cloudsync.*'}}
if-no-files-found: error

postgres-test:
runs-on: ubuntu-22.04
name: postgresql build + test
timeout-minutes: 10

steps:

- uses: actions/checkout@v4.2.2

- name: build and start postgresql container
run: make postgres-docker-rebuild

- name: wait for postgresql to be ready
run: |
for i in $(seq 1 30); do
if docker exec cloudsync-postgres pg_isready -U postgres > /dev/null 2>&1; then
echo "PostgreSQL is ready"
exit 0
fi
sleep 2
done
echo "PostgreSQL failed to start within 60s"
docker logs cloudsync-postgres
exit 1
- name: run postgresql tests
run: |
docker exec cloudsync-postgres mkdir -p /tmp/cloudsync/test
docker cp test/postgresql cloudsync-postgres:/tmp/cloudsync/test/postgresql
docker exec cloudsync-postgres psql -U postgres -d postgres -f /tmp/cloudsync/test/postgresql/full_test.sql
release:
runs-on: ubuntu-22.04
name: release
needs: build
needs: [build, postgres-test]
if: github.ref == 'refs/heads/main'

env:
3 changes: 2 additions & 1 deletion .gitignore
@@ -48,4 +48,5 @@ jniLibs/

# System
.DS_Store
Thumbs.db
Thumbs.db
CLAUDE.md
30 changes: 15 additions & 15 deletions docker/Makefile.postgresql
@@ -137,32 +137,32 @@ PG_DOCKER_DB_PASSWORD ?= postgres

# Build Docker image with pre-installed extension
postgres-docker-build:
@echo "Building Docker image via docker-compose (rebuilt when sources change)..."
@echo "Building Docker image via docker compose (rebuilt when sources change)..."
# To force plaintext BuildKit logs, run: make postgres-docker-build DOCKER_BUILD_ARGS="--progress=plain"
cd docker/postgresql && docker-compose build $(DOCKER_BUILD_ARGS)
cd docker/postgresql && docker compose build $(DOCKER_BUILD_ARGS)
@echo ""
@echo "Docker image built successfully!"

# Build Docker image with AddressSanitizer enabled (override compose file)
postgres-docker-build-asan:
@echo "Building Docker image with ASAN via docker-compose..."
@echo "Building Docker image with ASAN via docker compose..."
# To force plaintext BuildKit logs, run: make postgres-docker-build-asan DOCKER_BUILD_ARGS=\"--progress=plain\"
cd docker/postgresql && docker-compose -f docker-compose.debug.yml -f docker-compose.asan.yml build $(DOCKER_BUILD_ARGS)
cd docker/postgresql && docker compose -f docker-compose.debug.yml -f docker-compose.asan.yml build $(DOCKER_BUILD_ARGS)
@echo ""
@echo "ASAN Docker image built successfully!"

# Build Docker image using docker-compose.debug.yml
postgres-docker-debug-build:
@echo "Building debug Docker image via docker-compose..."
@echo "Building debug Docker image via docker compose..."
# To force plaintext BuildKit logs, run: make postgres-docker-debug-build DOCKER_BUILD_ARGS=\"--progress=plain\"
cd docker/postgresql && docker-compose -f docker-compose.debug.yml build $(DOCKER_BUILD_ARGS)
cd docker/postgresql && docker compose -f docker-compose.debug.yml build $(DOCKER_BUILD_ARGS)
@echo ""
@echo "Debug Docker image built successfully!"

# Run PostgreSQL container with CloudSync
postgres-docker-run:
@echo "Starting PostgreSQL with CloudSync..."
cd docker/postgresql && docker-compose up -d --build
cd docker/postgresql && docker compose up -d --build
@echo ""
@echo "Container started successfully!"
@echo ""
@@ -179,7 +179,7 @@ postgres-docker-run:
# Run PostgreSQL container with CloudSync and AddressSanitizer enabled
postgres-docker-run-asan:
@echo "Starting PostgreSQL with CloudSync (ASAN enabled)..."
cd docker/postgresql && docker-compose -f docker-compose.debug.yml -f docker-compose.asan.yml up -d --build
cd docker/postgresql && docker compose -f docker-compose.debug.yml -f docker-compose.asan.yml up -d --build
@echo ""
@echo "Container started successfully!"
@echo ""
@@ -196,7 +196,7 @@ postgres-docker-run-asan:
# Run PostgreSQL container using docker-compose.debug.yml
postgres-docker-debug-run:
@echo "Starting PostgreSQL with CloudSync (debug compose)..."
cd docker/postgresql && docker-compose -f docker-compose.debug.yml up -d --build
cd docker/postgresql && docker compose -f docker-compose.debug.yml up -d --build
@echo ""
@echo "Container started successfully!"
@echo ""
@@ -213,21 +213,21 @@ postgres-docker-debug-run:
# Stop PostgreSQL container
postgres-docker-stop:
@echo "Stopping PostgreSQL container..."
cd docker/postgresql && docker-compose down
cd docker/postgresql && docker compose down
@echo "Container stopped"

# Rebuild and restart container
postgres-docker-rebuild: postgres-docker-build
@echo "Rebuilding and restarting container..."
cd docker/postgresql && docker-compose down
cd docker/postgresql && docker-compose up -d --build
cd docker/postgresql && docker compose down
cd docker/postgresql && docker compose up -d --build
@echo "Container restarted with new image"

# Rebuild and restart container using docker-compose.debug.yml
postgres-docker-debug-rebuild: postgres-docker-debug-build
@echo "Rebuilding and restarting debug container..."
cd docker/postgresql && docker-compose -f docker-compose.debug.yml down
cd docker/postgresql && docker-compose -f docker-compose.debug.yml up -d --build
cd docker/postgresql && docker compose -f docker-compose.debug.yml down
cd docker/postgresql && docker compose -f docker-compose.debug.yml up -d --build
@echo "Debug container restarted with new image"

# Interactive shell in container
@@ -353,5 +353,5 @@ postgres-help:
# Simple smoke test: rebuild image/container, create extension, and query version
unittest-pg: postgres-docker-rebuild
@echo "Running PostgreSQL extension smoke test..."
cd docker/postgresql && docker-compose exec -T postgres psql -U postgres -d cloudsync_test -f /tmp/cloudsync/docker/postgresql/smoke_test.sql
cd docker/postgresql && docker compose exec -T postgres psql -U postgres -d cloudsync_test -f /tmp/cloudsync/docker/postgresql/smoke_test.sql
@echo "Smoke test completed."
158 changes: 129 additions & 29 deletions src/cloudsync.c
@@ -49,12 +49,12 @@
#define CLOUDSYNC_INIT_NTABLES 64
#define CLOUDSYNC_MIN_DB_VERSION 0

#define CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK 1
#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
#define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
#define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
#define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
#define CLOUDSYNC_PAYLOAD_VERSION_2 2
#define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
#define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2

#ifndef MAX
@@ -63,10 +63,6 @@

#define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)

#if CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
bool schema_hash_disabled = true;
#endif

typedef enum {
CLOUDSYNC_PK_INDEX_TBL = 0,
CLOUDSYNC_PK_INDEX_PK = 1,
@@ -1208,18 +1204,20 @@ int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, c
return rc;
}

// bind value
// bind value (always bind all expected parameters for correct prepared statement handling)
if (col_value) {
rc = databasevm_bind_value(vm, table->npks+1, col_value);
if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
if (rc != DBRES_OK) {
cloudsync_set_dberror(data);
dbvm_reset(vm);
return rc;
}

} else {
rc = databasevm_bind_null(vm, table->npks+1);
if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
}

if (rc != DBRES_OK) {
cloudsync_set_dberror(data);
dbvm_reset(vm);
return rc;
}

// perform real operation and disable triggers

// in case of GOS we reused the table->col_merge_stmt statement
@@ -1794,13 +1792,104 @@ int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
return rc;
}

// MARK: - Filter Rewrite -

// Replace bare column names in a filter expression with prefix-qualified names.
// E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
// Columns must be sorted by length descending by the caller to avoid partial matches.
// Skips content inside single-quoted string literals.
// Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
// Helper: check if an identifier token matches a column name.
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
for (int i = 0; i < ncols; ++i) {
if (strlen(columns[i]) == token_len && strncmp(token, columns[i], token_len) == 0)
return true;
}
return false;
}

// Helper: check if character is part of a SQL identifier.
static bool filter_is_ident_char (char c) {
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9') || c == '_';
}

char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
if (!filter || !prefix || !columns || ncols <= 0) return NULL;

size_t filter_len = strlen(filter);
size_t prefix_len = strlen(prefix);

// Each identifier match grows by at most (prefix_len + 3) bytes.
// Worst case: the entire filter is one repeated column reference separated by
// single characters, so up to (filter_len / 2) matches. Use a safe upper bound.
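// Worked check of the bound (illustrative): with prefix "NEW" (prefix_len = 3), a matched
// 1-character column "a" becomes NEW."a" (7 bytes), i.e. growth of prefix_len + 3 = 6 bytes
// per match, and a filter like "a a a a" (filter_len = 7) contains 4 matches, which is
// covered by the (filter_len / 2 + 1) allowance used below.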
size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
size_t cap = filter_len + max_growth + 64;
char *result = (char *)cloudsync_memory_alloc(cap);
if (!result) return NULL;
size_t out = 0;

// Single pass: tokenize into identifiers, quoted strings, and everything else.
size_t i = 0;
while (i < filter_len) {
// Skip single-quoted string literals verbatim (handle '' escape)
if (filter[i] == '\'') {
result[out++] = filter[i++];
while (i < filter_len) {
if (filter[i] == '\'') {
result[out++] = filter[i++];
// '' is an escaped quote — keep going
if (i < filter_len && filter[i] == '\'') {
result[out++] = filter[i++];
continue;
}
break; // single ' ends the literal
}
result[out++] = filter[i++];
}
continue;
}

// Extract identifier token
if (filter_is_ident_char(filter[i])) {
size_t start = i;
while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
size_t token_len = i - start;

if (filter_is_column(&filter[start], token_len, columns, ncols)) {
// Emit PREFIX."column_name"
memcpy(&result[out], prefix, prefix_len); out += prefix_len;
result[out++] = '.';
result[out++] = '"';
memcpy(&result[out], &filter[start], token_len); out += token_len;
result[out++] = '"';
} else {
// Not a column — copy as-is
memcpy(&result[out], &filter[start], token_len); out += token_len;
}
continue;
}

// Any other character — copy as-is
result[out++] = filter[i++];
}

result[out] = '\0';
return result;
}
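
// Usage sketch (illustrative only; assumes the table's column list is {"user_id", "id"}
// and that the result is released with cloudsync_memory_free, as noted above):
//
//     char *cols[] = { "user_id", "id" };
//     char *rewritten = cloudsync_filter_add_row_prefix("user_id = 42 AND note = 'id'", "NEW", cols, 2);
//     // rewritten => "NEW.\"user_id\" = 42 AND note = 'id'"
//     // ("note" is not a column and 'id' sits inside a string literal, so both stay untouched)
//     if (rewritten) cloudsync_memory_free(rewritten);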

int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
cloudsync_table_context *table = table_lookup(data, table_name);
if (!table) return DBRES_ERROR;

dbvm_t *vm = NULL;
int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);

// Read row-level filter from settings (if any)
char filter_buf[2048];
int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;

const char *schema = table->schema ? table->schema : "";
char *sql = sql_build_pk_collist_query(schema, table_name);
char *pkclause_identifiers = NULL;
@@ -1810,18 +1899,22 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";

// Use database-specific query builder to handle type differences in composite PKs
sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref);
sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
if (!sql) {rc = DBRES_NOMEM; goto finalize;}
rc = database_exec(data, sql);
cloudsync_memory_free(sql);
if (rc != DBRES_OK) goto finalize;

// fill missing columns
// for each non-pk column:
// The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
// The old plan does many decodes per candidate and can’t use an index to rule out matches quickly—so it burns CPU and I/O.

sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
// The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.

if (filter) {
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
} else {
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
}
rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
cloudsync_memory_free(sql);
if (rc != DBRES_OK) goto finalize;
@@ -2263,15 +2356,17 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
header.nrows = ntohl(header.nrows);
header.schema_hash = ntohll(header.schema_hash);

#if !CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
if (!data || header.schema_hash != data->schema_hash) {
if (!database_check_schema_hash(data, header.schema_hash)) {
char buffer[1024];
snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
return cloudsync_set_error(data, buffer, DBRES_MISUSE);
// compare schema_hash only if not disabled and if the received payload was created with the current header version
// to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
if (header.schema_hash != data->schema_hash) {
if (!database_check_schema_hash(data, header.schema_hash)) {
char buffer[1024];
snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
return cloudsync_set_error(data, buffer, DBRES_MISUSE);
}
}
}
#endif

// sanity check header
if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
@@ -2444,8 +2539,8 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,

// retrieve BLOB
char sql[1024];
snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes) "
"SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, NULL)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL", *db_version, *db_version, *seq);
snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
"SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, 0)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL", *db_version, *db_version, *seq);

int64_t len = 0;
int rc = database_select_blob_2int(data, sql, blob, &len, new_db_version, new_seq);
@@ -2723,8 +2818,13 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
// sync algo with table (unused in this version)
// cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));

// read row-level filter from settings (if any)
char init_filter_buf[2048];
int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;

// check triggers
rc = database_create_triggers(data, table_name, algo_new);
rc = database_create_triggers(data, table_name, algo_new, init_filter);
if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);

// check meta-table