npx skills add https://github.com/tursodatabase/turso --skill cdc
CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a dedicated CDC table (turso_cdc by default). It is per-connection, enabled via PRAGMA, and operates at the bytecode generation (translate) layer. The sync engine consumes CDC records to push local changes to the remote.
User SQL (INSERT/UPDATE/DELETE/DDL)
|
v
┌─────────────────────────────────────────────────┐
│ Translate layer (core/translate/) │
│ ┌───────────────────────────────────────────┐ │
│ │ prepare_cdc_if_necessary() │ │
│ │ - checks CaptureDataChangesInfo │ │
│ │ - opens CDC table cursor (OpenWrite) │ │
│ │ - skips if target == CDC table itself │ │
│ └───────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────┐ │
│ │ emit_cdc_insns() │ │
│ │ - writes (change_id, change_time, │ │
│ │ change_type, table_name, id, │ │
│ │ before, after, updates) into CDC tbl │ │
│ └───────────────────────────────────────────┘ │
│ + emit_cdc_full_record() / emit_cdc_patch_record() │
└─────────────────────────────────────────────────┘
|
v
CDC table (turso_cdc or custom name)
|
v
┌─────────────────────────────────────────────────┐
│ Sync engine (sync/engine/) │
│ DatabaseTape reads CDC table → DatabaseChange │
│ → apply/revert → push to remote │
└─────────────────────────────────────────────────┘
CaptureDataChangesMode + CaptureDataChangesInfo (core/lib.rs)
CDC behavior is controlled by two types, together with the CdcVersion enum:
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
V1 = 1,
V2 = 2,
}
const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;
enum CaptureDataChangesMode {
Id, // capture only rowid
Before, // capture before-image
After, // capture after-image
Full, // before + after + updates
}
struct CaptureDataChangesInfo {
mode: CaptureDataChangesMode,
table: String, // CDC table name
version: Option<CdcVersion>, // schema version (V1 or V2)
}
The connection stores Option<CaptureDataChangesInfo> — None means CDC is off.
Key methods on CdcVersion:
- has_commit_record(): self >= V2, gates COMMIT record emission
- Display/FromStr: round-trips "v1" ↔ V1, "v2" ↔ V2
Key methods on CaptureDataChangesInfo:
- parse(value: &str, version: Option<CdcVersion>): parses the PRAGMA argument "<mode>[,<table_name>]", returns None for "off"
- cdc_version(): returns CdcVersion (panics if version is None). Single accessor replacing the old is_v1()/is_v2()/version() methods.
- has_before() / has_after() / has_updates(): mode capability checks
- mode_name(): returns the mode as a string
Convenience trait CaptureDataChangesExt on Option<CaptureDataChangesInfo> provides:
- has_before() / has_after() / has_updates(): delegates to the inner value, returns false for None
- table(): returns Option<&str>, None when CDC is off
Default table name: turso_cdc (constant TURSO_CDC_DEFAULT_TABLE_NAME)
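
The version gating and string round-trip described above can be sketched as follows. This is a minimal illustration built only from the enum definition shown earlier, not a copy of core/lib.rs; the String error type on FromStr is an assumption.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

impl CdcVersion {
    /// COMMIT records exist from V2 onward. The derived ordering
    /// compares discriminants, so V1 < V2.
    fn has_commit_record(self) -> bool {
        self >= CdcVersion::V2
    }
}

impl fmt::Display for CdcVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            CdcVersion::V1 => "v1",
            CdcVersion::V2 => "v2",
        })
    }
}

impl FromStr for CdcVersion {
    // Assumed error type; the real implementation may use its own error enum.
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "v1" => Ok(CdcVersion::V1),
            "v2" => Ok(CdcVersion::V2),
            other => Err(format!("unknown CDC version: {other}")),
        }
    }
}
```

Because the comparison is on the enum ordering, a later V3 would pass the has_commit_record() gate without any change to the check.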
v1 schema:
CREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE
table_name TEXT,
id <untyped>, -- rowid of changed row
before BLOB, -- binary record (before-image)
after BLOB, -- binary record (after-image)
updates BLOB -- binary record of per-column changes
);
v2 schema (current):
CREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
table_name TEXT,
id <untyped>, -- rowid of changed row
before BLOB, -- binary record (before-image)
after BLOB, -- binary record (after-image)
updates BLOB, -- binary record of per-column changes
change_txn_id INTEGER -- transaction ID (groups rows into transactions)
);
v2 adds:
- change_txn_id column: groups CDC rows by transaction. Assigned via the conn_txn_id(candidate) opcode, which get-or-sets a per-connection transaction ID.
- change_type=2 (COMMIT) records: mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit COMMIT.
The CDC table is created at runtime by the InitCdcVersion opcode via CREATE TABLE IF NOT EXISTS.
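
To make the role of change_txn_id and COMMIT records concrete, here is a rough consumer-side sketch that decodes change_type and groups rows into committed transactions. CdcRow, ChangeType, and group_committed are hypothetical names, and the sketch assumes a COMMIT row carries the same change_txn_id as the rows it closes.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChangeType {
    Insert,
    Update,
    Delete,
    Commit,
}

impl ChangeType {
    /// Decodes the integer stored in the change_type column.
    fn from_code(code: i64) -> Option<Self> {
        match code {
            1 => Some(ChangeType::Insert),
            0 => Some(ChangeType::Update),
            -1 => Some(ChangeType::Delete),
            2 => Some(ChangeType::Commit),
            _ => None,
        }
    }
}

/// Hypothetical in-memory view of the columns needed for grouping.
#[derive(Debug, Clone, PartialEq)]
struct CdcRow {
    change_id: i64,
    change_type: i64,
    change_txn_id: i64,
}

/// Walks rows in change_id order. Returns the committed transactions as
/// (txn_id, change_ids) pairs plus any rows still waiting for a COMMIT.
fn group_committed(rows: &[CdcRow]) -> (Vec<(i64, Vec<i64>)>, BTreeMap<i64, Vec<i64>>) {
    let mut open: BTreeMap<i64, Vec<i64>> = BTreeMap::new();
    let mut committed = Vec::new();
    for row in rows {
        match ChangeType::from_code(row.change_type) {
            Some(ChangeType::Commit) => {
                // Assumption: the COMMIT row shares the txn id of the rows it closes.
                let changes = open.remove(&row.change_txn_id).unwrap_or_default();
                committed.push((row.change_txn_id, changes));
            }
            Some(_) => open.entry(row.change_txn_id).or_default().push(row.change_id),
            None => {} // unknown codes are skipped in this sketch
        }
    }
    (committed, open)
}
```

Rows left in the second return value belong to a transaction whose COMMIT record has not been seen yet, so a consumer would normally hold them back.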
When CDC is first enabled, a version tracking table is created:
CREATE TABLE turso_cdc_version (
table_name TEXT PRIMARY KEY,
version TEXT NOT NULL
);
Current version: CDC_VERSION_CURRENT = CdcVersion::V2 (defined in core/lib.rs, re-exported from core/translate/pragma.rs)
The InitCdcVersion opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it. A CDC table that already exists gets CdcVersion::V1 inserted into the version table.
DatabaseChange (sync/engine/src/types.rs:229-249)
The sync engine's Rust representation of a CDC row. Has into_apply() and into_revert() methods for forward/backward replay.
OperationMode (core/translate/emitter.rs)
Used by emit_cdc_insns() to determine the change_type value:
- INSERT → 1
- UPDATE / SELECT → 0
- DELETE → -1
- COMMIT → 2 (v2 only, emitted by emit_cdc_commit_insns)
Set: core/translate/pragma.rs
- Parses the mode string via CaptureDataChangesInfo::parse() with CDC_VERSION_CURRENT
- Emits the InitCdcVersion opcode: all CDC setup (table creation, version tracking, state change) happens at execution time
Get (read current mode): core/translate/pragma.rs
- Columns: mode, table, version
- CDC off: ("off", NULL, NULL)
- CDC on: (mode_name, table, version)
Pragma registration: core/pragma.rs. CaptureDataChangesConn (and the deprecated alias UnstableCaptureDataChangesConn) with columns ["mode", "table", "version"]
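
As an illustration of the "<mode>[,<table_name>]" argument format, the following is a simplified parser sketch. Mode, CdcInfo, and parse_cdc_pragma are hypothetical stand-ins for the real types. Only "off" and "full" appear as mode strings in this document; the lowercase "id", "before", and "after" spellings are assumptions, as are the whitespace trimming and the error cases.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Id,
    Before,
    After,
    Full,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct CdcInfo {
    mode: Mode,
    table: String,
}

const DEFAULT_TABLE: &str = "turso_cdc";

/// Parses "<mode>[,<table_name>]". Ok(None) means CDC is being turned off.
fn parse_cdc_pragma(value: &str) -> Result<Option<CdcInfo>, String> {
    // Split on the first comma; without one, fall back to the default table.
    let (mode_str, table) = match value.split_once(',') {
        Some((m, t)) => (m.trim(), t.trim()),
        None => (value.trim(), DEFAULT_TABLE),
    };
    let mode = match mode_str {
        "off" => return Ok(None),
        "id" => Mode::Id,         // assumed spelling
        "before" => Mode::Before, // assumed spelling
        "after" => Mode::After,   // assumed spelling
        "full" => Mode::Full,
        other => return Err(format!("unknown CDC mode: {other}")),
    };
    if table.is_empty() {
        return Err("empty CDC table name".to_string());
    }
    Ok(Some(CdcInfo { mode, table: table.to_string() }))
}
```

Under these assumptions, a value such as "full,my_changes" would select full mode with a custom CDC table, and reading the pragma back would report the (mode_name, table, version) row described above.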
- Field (core/connection.rs): capture_data_changes: RwLock<Option<CaptureDataChangesInfo>>
- Getter: get_capture_data_changes_info(), returns a read guard
- Setter: set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)
- Default: initialized as None (CDC off)

- Field (core/vdbe/builder.rs): capture_data_changes_info: Option<CaptureDataChangesInfo>
- Accessor: capture_data_changes_info(), returns &Option<CaptureDataChangesInfo>
- Passed from: core/translate/mod.rs, read from the connection when creating the builder

- Field (core/vdbe/mod.rs): capture_data_changes: Option<CaptureDataChangesInfo>
- Set from: PrepareContext::from_connection(), which clones from connection.get_capture_data_changes_info()
InitCdcVersion opcode (core/vdbe/execute.rs)
Always emitted by PRAGMA SET. Handles all CDC setup at execution time:
1. When CDC is being turned off: stores None in state.pending_cdc_info and returns early.
2. Creates the CDC table (CREATE TABLE IF NOT EXISTS <cdc_table_name> ...) using the v2 schema with the change_txn_id column.
3. Creates the version table (CREATE TABLE IF NOT EXISTS turso_cdc_version ...).
4. Records the version with INSERT OR IGNORE to preserve existing version rows.
5. Stores the CaptureDataChangesInfo in state.pending_cdc_info.
The connection's CDC state is not applied in the opcode. Instead, pending_cdc_info is applied in halt() only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.
All table creation is done via nested conn.prepare()/run_ignore_rows() calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.
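
The deferred-application pattern can be sketched in isolation like this. The types are simplified stand-ins: the real connection field sits behind an RwLock, and the Option<Option<..>> shape of pending_cdc_info (outer None meaning "this program did not touch CDC") is an assumption of this sketch, as is the committed flag passed to halt().

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
struct CdcInfo {
    mode: String,
    table: String,
}

/// Simplified connection: the real field is wrapped in an RwLock.
#[derive(Default)]
struct Connection {
    capture_data_changes: Option<CdcInfo>,
}

#[derive(Default)]
struct ProgramState {
    // Outer None: no pending change. Some(None): turn CDC off on commit.
    // Some(Some(info)): enable CDC with `info` on commit.
    pending_cdc_info: Option<Option<CdcInfo>>,
}

impl ProgramState {
    /// Stand-in for the opcode: records the request without touching the connection.
    fn init_cdc(&mut self, requested: Option<CdcInfo>) {
        self.pending_cdc_info = Some(requested);
    }

    /// Stand-in for halt(): applies the pending state only if the transaction committed.
    fn halt(&mut self, conn: &mut Connection, committed: bool) {
        let pending = self.pending_cdc_info.take();
        if committed {
            if let Some(info) = pending {
                conn.capture_data_changes = info;
            }
        }
    }
}
```

Keeping the request in program state until halt means a rollback simply drops it, so the connection never observes a CDC mode whose backing tables were not created.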
These are the core CDC code generation functions:
| Function | Purpose |
|---|---|
| prepare_cdc_if_necessary() | Opens CDC table cursor if CDC is active and target != CDC table |
| emit_cdc_full_record() | Reads all columns from cursor into a MakeRecord (for before/after image) |
| emit_cdc_patch_record() | Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| emit_cdc_insns() | Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| emit_cdc_commit_insns() | Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| emit_cdc_autocommit_commit() | End-of-statement COMMIT emission. Checks is_autocommit() at runtime and only emits COMMIT if in autocommit mode. v2 only. |
Per-row call sites use emit_cdc_insns() (no COMMIT). End-of-statement sites call emit_cdc_autocommit_commit(), which checks is_autocommit() at runtime:
- Autocommit mode: emits a COMMIT record at the end of the statement
- Explicit transaction (BEGIN...COMMIT): skips the per-statement COMMIT; the explicit COMMIT statement emits the COMMIT record via emit_cdc_commit_insns()
This ensures multi-row statements like INSERT INTO t VALUES (1),(2),(3) produce one COMMIT at the end, not one per row.
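
The effect of the runtime autocommit check on the resulting CDC rows can be modeled with a small simulation. This is not the bytecode emitter itself: cdc_types_for_insert and cdc_types_for_explicit_txn are hypothetical helpers that only list the change_type values a v2 CDC table would be expected to receive.

```rust
const INSERT: i64 = 1;
const COMMIT: i64 = 2;

/// change_type values produced by one INSERT statement that touches `rows` rows.
fn cdc_types_for_insert(rows: usize, autocommit: bool) -> Vec<i64> {
    // One CDC row per changed row (the emit_cdc_insns() analogue).
    let mut out = vec![INSERT; rows];
    // One COMMIT at end of statement, only in autocommit mode
    // (the emit_cdc_autocommit_commit() analogue).
    if autocommit {
        out.push(COMMIT);
    }
    out
}

/// change_type values for BEGIN; <INSERT statements>; COMMIT.
fn cdc_types_for_explicit_txn(rows_per_statement: &[usize]) -> Vec<i64> {
    let mut out = Vec::new();
    for &rows in rows_per_statement {
        // Inside an explicit transaction the per-statement COMMIT is skipped.
        out.extend(cdc_types_for_insert(rows, false));
    }
    // The explicit COMMIT statement writes the single COMMIT record
    // (the emit_cdc_commit_insns() analogue).
    out.push(COMMIT);
    out
}
```

For example, a three-row autocommit INSERT yields three INSERT rows followed by one COMMIT, while two statements inside BEGIN...COMMIT share a single trailing COMMIT.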
core/translate/insert.rs
- emit_cdc_insns() after insert, and before delete for REPLACE/conflict
- emit_cdc_autocommit_commit() in emit_epilogue() after the insert loop
core/translate/emitter.rs (update path)
- emit_cdc_insns()
- emit_cdc_autocommit_commit() after the update loop
core/translate/emitter.rs (delete path)
- emit_cdc_insns()
- emit_cdc_autocommit_commit() after the delete loop
core/translate/upsert.rs
- emit_cdc_insns() for all three cases: pure insert, update after conflict, replace
core/translate/schema.rs
- emit_cdc_insns() (insert into sqlite_schema) + emit_cdc_autocommit_commit()
- emit_cdc_insns() per-row in metadata loop + emit_cdc_autocommit_commit() after loop
- emit_cdc_insns() + emit_cdc_autocommit_commit() (core/translate/schema.rs)
- emit_cdc_insns() per-row + emit_cdc_autocommit_commit() after loop (core/translate/index.rs)
DDL in explicit transactions (BEGIN; CREATE TABLE t(x); COMMIT) does NOT emit a per-statement COMMIT; the autocommit check prevents it.
- core/translate/update.rs: cdc_update_alter_statement on the update plan when CDC has updates mode
- core/translate/view.rs: passes None for CDC cursor
- core/translate/trigger.rs: passes None for CDC cursor
- core/translate/subquery.rs: cdc_cursor_id: None
table_columns_json_array(table_name) (core/function.rs, core/vdbe/execute.rs)
Returns a JSON array of column names for a table. Used to interpret binary records.
bin_record_json_object(columns_json, blob) (core/function.rs, core/vdbe/execute.rs)
Decodes a binary record (from the before/after/updates columns) into a JSON object using column names.
The sync engine is the primary consumer of CDC data.
sync/engine/src/database_tape.rs
- DEFAULT_CDC_TABLE_NAME = "turso_cdc", DEFAULT_CDC_MODE = "full"
- CDC_PRAGMA_NAME = "capture_data_changes_conn"
- connect() sets the CDC pragma and caches cdc_version from the turso_cdc_version table. Must be called before iterate_changes().
- cdc_version: RwLock<Option<CdcVersion>>, set by connect() and read by iterate_changes(). Panics if not set.
- DatabaseChangesIterator reads the CDC table in batches and emits DatabaseTapeOperation. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at the end of the batch. ignore_schema_changes: true (default) filters out sqlite_schema row changes but not COMMIT records.
sync/engine/src/database_sync_operations.rs
- SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?
sync/engine/src/database_sync_engine.rs
- open_db() calls main_tape.connect(coro) to ensure CDC is set up and the version is cached before any iterate_changes() calls.
- During apply_changes, checks if the CDC table existed and re-creates it after sync
sync/engine/src/database_replay_generator.rs
- Requires the updates column to be populated (full mode)
All bindings expose cdc_operations as part of sync stats:
| Binding | File |
|---|---|
| Python | bindings/python/src/turso_sync.rs |
| JavaScript | bindings/javascript/sync/src/lib.rs |
| JS (generator) | bindings/javascript/sync/src/generator.rs |
| Go | bindings/go/bindings_sync.go |
| React Native | bindings/react-native/src/types.ts |
| SDK Kit (C header) | sync/sdk-kit/turso_sync.h |
| SDK Kit (Rust) | sync/sdk-kit/src/bindings.rs |
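
On the consumer side, the difference between v1 and v2 batches in the sync engine's DatabaseChangesIterator (sync/engine/src/database_tape.rs) can be sketched roughly as below. TapeOp, Row, and batch_to_ops are hypothetical simplifications of DatabaseTapeOperation and the iterator; emitting the synthetic v1 Commit only for non-empty batches is an assumption of this sketch.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
enum TapeOp {
    Change { change_id: i64, table: String },
    Commit,
}

/// Hypothetical view of one CDC row read from the table.
struct Row {
    change_id: i64,
    change_type: i64,
    table_name: String,
}

/// Converts one batch of CDC rows into tape operations.
fn batch_to_ops(rows: &[Row], has_commit_record: bool, ignore_schema_changes: bool) -> Vec<TapeOp> {
    let mut ops = Vec::new();
    for row in rows {
        if row.change_type == 2 {
            // Real COMMIT records (v2) always pass through, even when
            // schema changes are being filtered out.
            ops.push(TapeOp::Commit);
        } else if ignore_schema_changes && row.table_name == "sqlite_schema" {
            continue;
        } else {
            ops.push(TapeOp::Change {
                change_id: row.change_id,
                table: row.table_name.clone(),
            });
        }
    }
    // v1 tables have no COMMIT rows, so a synthetic Commit closes the batch.
    // Assumption: nothing is appended for an empty batch.
    if !has_commit_record && !rows.is_empty() {
        ops.push(TapeOp::Commit);
    }
    ops
}
```

The has_commit_record flag corresponds to CdcVersion::has_commit_record(), which is why the version must be cached by connect() before iteration starts.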
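- tests/integration/functions/test_cdc.rs: covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in tests/integration/functions/mod.rs.
- sync/engine/src/database_tape.rs: CDC table reads, tape iteration, replay of schema changes.
- bindings/javascript/sync/packages/{wasm,native}/promise.test.ts
Run: cargo test -- test_cdc (integration) or cargo test -p turso_sync_engine -- database_tape (sync engine).

- cli/manuals/cdc.md: accessible via .manual cdc in the REPL
- docs/manual.md: CDC section linked in TOC

- Changes to the CDC table itself and to the turso_cdc_version table are never captured (checked in prepare_cdc_if_necessary).
- DDL is captured as changes to the sqlite_schema table.
- The CDC schema version is stored in the turso_cdc_version table and carried in CaptureDataChangesInfo.version for future schema evolution.
- The connection's CDC state change is deferred: it is held in pending_cdc_info in ProgramState and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.
- End-of-statement COMMIT records go through emit_cdc_autocommit_commit(), which checks is_autocommit() at runtime. In explicit transactions, only the final COMMIT emits a COMMIT CDC record.
- Pre-existing v1 CDC tables (those with no row in turso_cdc_version) are detected by checking table existence before creation. Existing tables get CdcVersion::V1 inserted into the version table.
- The CdcVersion enum with #[repr(u8)] and Ord/PartialOrd enables feature gating via integer comparison (has_commit_record() = self >= V2). Display/FromStr handles the database round-trip.

Weekly installs: 92
GitHub stars: 18.1K
First seen: Feb 17, 2026
Security audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Pass)
Installed on: claude-code (84), codex (82), opencode (81), github-copilot (81), gemini-cli (81), cursor (80)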