kysely by bobmatnyc/claude-mpm-skills
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill kysely
Kysely is a type-safe TypeScript SQL query builder that provides end-to-end type safety from database schema to query results. Unlike ORMs, it generates plain SQL and gives you full control while preserving full TypeScript type inference.
Key Features:
Installation:
npm install kysely
# Database driver (choose one)
npm install pg # PostgreSQL
npm install mysql2 # MySQL
npm install better-sqlite3 # SQLite
import { Generated, Selectable, Insertable, Updateable } from 'kysely';
// Table interface (all columns)
interface UserTable {
id: Generated<number>;
email: string;
name: string | null;
created_at: Generated<Date>;
updated_at: Date;
}
interface PostTable {
id: Generated<number>;
user_id: number;
title: string;
content: string;
published: Generated<boolean>;
created_at: Generated<Date>;
}
// Database interface
interface Database {
users: UserTable;
posts: PostTable;
}
// Type-safe query result types
type User = Selectable<UserTable>;
type NewUser = Insertable<UserTable>;
type UserUpdate = Updateable<UserTable>;
import { Kysely, PostgresDialect } from 'kysely';
import { Pool } from 'pg';
const db = new Kysely<Database>({
dialect: new PostgresDialect({
pool: new Pool({
host: process.env.DB_HOST,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 10,
}),
}),
});
// SELECT with full type inference
const users = await db
.selectFrom('users')
.select(['id', 'email', 'name'])
.where('created_at', '>', new Date('2024-01-01'))
.execute();
// Type: Array<{ id: number; email: string; name: string | null }>
// INSERT with type checking
const newUser: NewUser = {
email: 'alice@example.com',
name: 'Alice',
updated_at: new Date(),
};
const inserted = await db
.insertInto('users')
.values(newUser)
.returningAll()
.executeTakeFirstOrThrow();
// Type: User
// UPDATE
await db
.updateTable('users')
.set({ name: 'Alice Updated', updated_at: new Date() })
.where('id', '=', 1)
.execute();
// DELETE
await db
.deleteFrom('users')
.where('email', 'like', '%@spam.com')
.execute();
// INNER JOIN
const usersWithPosts = await db
.selectFrom('users')
.innerJoin('posts', 'posts.user_id', 'users.id')
.select([
'users.id',
'users.name',
'posts.title',
'posts.content',
])
.execute();
// Type: Array<{ id: number; name: string | null; title: string; content: string }>
// LEFT JOIN with null handling
const usersWithOptionalPosts = await db
.selectFrom('users')
.leftJoin('posts', 'posts.user_id', 'users.id')
.select([
'users.id',
'users.email',
'posts.title', // Type: string | null (from LEFT JOIN)
])
.execute();
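Each row of the LEFT JOIN above pairs a user with one post, or with `null` when the user has no posts. A minimal sketch of folding such rows into one entry per user is shown below. `UserPostRow`, `UserWithTitles`, and `groupByUser` are hypothetical names introduced for illustration, and the sample rows are made up.

```typescript
// Row shape matching the LEFT JOIN select above: users.id, users.email, posts.title
interface UserPostRow {
  id: number;
  email: string;
  title: string | null; // null when the user has no matching post
}

// Hypothetical grouped shape for application code
interface UserWithTitles {
  id: number;
  email: string;
  titles: string[];
}

// Collapse one-row-per-post into one-entry-per-user, skipping the null titles
function groupByUser(rows: UserPostRow[]): UserWithTitles[] {
  const byId = new Map<number, UserWithTitles>();
  rows.forEach((row) => {
    let entry = byId.get(row.id);
    if (entry === undefined) {
      entry = { id: row.id, email: row.email, titles: [] };
      byId.set(row.id, entry);
    }
    if (row.title !== null) {
      entry.titles.push(row.title);
    }
  });
  return Array.from(byId.values());
}

// Made-up sample rows: Alice has two posts, Bob has none
const joinedRows: UserPostRow[] = [
  { id: 1, email: 'alice@example.com', title: 'First Post' },
  { id: 1, email: 'alice@example.com', title: 'Second Post' },
  { id: 2, email: 'bob@example.com', title: null },
];

const grouped = groupByUser(joinedRows);
```

Users without posts still appear in the result, with an empty `titles` array, which is usually the reason for choosing a LEFT JOIN over an INNER JOIN.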
// Multiple joins
const complexQuery = await db
.selectFrom('posts')
.innerJoin('users', 'users.id', 'posts.user_id')
.leftJoin('comments', 'comments.post_id', 'posts.id')
.select([
'posts.id as postId',
'posts.title',
'users.name as authorName',
'comments.id as commentId',
])
.execute();
import { sql } from 'kysely';
// COUNT, AVG, SUM
const stats = await db
.selectFrom('posts')
.select([
'user_id',
db.fn.count<number>('id').as('post_count'),
db.fn.avg<number>('views').as('avg_views'),
])
.groupBy('user_id')
.having(db.fn.count('id'), '>', 5)
.execute();
// Type: Array<{ user_id: number; post_count: number; avg_views: number }>
// Complex aggregations with raw SQL
const advanced = await db
.selectFrom('users')
.select([
'users.id',
sql<number>`COUNT(DISTINCT posts.id)`.as('total_posts'),
sql<Date>`MAX(posts.created_at)`.as('latest_post'),
])
.leftJoin('posts', 'posts.user_id', 'users.id')
.groupBy('users.id')
.execute();
// Scalar subquery
const usersWithPostCount = await db
.selectFrom('users')
.select([
'users.id',
'users.name',
(eb) =>
eb
.selectFrom('posts')
.select(eb.fn.count<number>('id').as('count'))
.whereRef('posts.user_id', '=', 'users.id')
.as('post_count'),
])
.execute();
// EXISTS subquery
const activeUsers = await db
.selectFrom('users')
.selectAll()
.where((eb) =>
eb.exists(
eb
.selectFrom('posts')
.select('id')
.whereRef('posts.user_id', '=', 'users.id')
.where('created_at', '>', new Date('2024-01-01'))
)
)
.execute();
// IN subquery
const usersInTopTier = await db
.selectFrom('users')
.selectAll()
.where(
'id',
'in',
db.selectFrom('posts')
.select('user_id')
.groupBy('user_id')
.having(db.fn.count('id'), '>', 100)
)
.execute();
// WITH clause
const result = await db
.with('popular_posts', (db) =>
db
.selectFrom('posts')
.select(['id', 'user_id', 'title'])
.where('views', '>', 1000)
)
.with('active_users', (db) =>
db
.selectFrom('users')
.select(['id', 'email'])
.where('last_login', '>', new Date('2024-01-01'))
)
.selectFrom('popular_posts')
.innerJoin('active_users', 'active_users.id', 'popular_posts.user_id')
.selectAll()
.execute();
// Recursive CTE (organizational hierarchy)
interface OrgNode {
id: number;
name: string;
parent_id: number | null;
level: number;
}
const hierarchy = await db
.withRecursive('org_tree', (db) =>
db
.selectFrom('departments')
.select(['id', 'name', 'parent_id', sql<number>`0`.as('level')])
.where('parent_id', 'is', null)
.unionAll(
db
.selectFrom('departments')
.innerJoin('org_tree', 'org_tree.id', 'departments.parent_id')
.select([
'departments.id',
'departments.name',
'departments.parent_id',
sql<number>`org_tree.level + 1`.as('level'),
])
)
)
.selectFrom('org_tree')
.selectAll()
.execute();
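The recursive CTE returns a flat list of rows, each carrying its `level`. A minimal sketch of turning rows of the `OrgNode` shape into a nested tree in application code follows. `TreeNode` and `buildTree` are hypothetical names, not part of Kysely, and the sample rows are invented.

```typescript
// Shape of one row returned by the recursive CTE above
interface OrgNode {
  id: number;
  name: string;
  parent_id: number | null;
  level: number;
}

// Hypothetical nested shape for application code
interface TreeNode extends OrgNode {
  children: TreeNode[];
}

// Build a forest from flat rows; rows whose parent is missing become roots
function buildTree(rows: OrgNode[]): TreeNode[] {
  const byId = new Map<number, TreeNode>();
  rows.forEach((row) => {
    byId.set(row.id, { ...row, children: [] });
  });

  const roots: TreeNode[] = [];
  byId.forEach((node) => {
    const parent = node.parent_id === null ? undefined : byId.get(node.parent_id);
    if (parent !== undefined) {
      parent.children.push(node);
    } else {
      roots.push(node);
    }
  });
  return roots;
}

// Made-up sample rows
const sampleRows: OrgNode[] = [
  { id: 1, name: 'Company', parent_id: null, level: 0 },
  { id: 2, name: 'Engineering', parent_id: 1, level: 1 },
  { id: 3, name: 'Platform', parent_id: 2, level: 2 },
  { id: 4, name: 'Sales', parent_id: 1, level: 1 },
];

const tree = buildTree(sampleRows);
```

Indexing the rows by `id` first means the rows can arrive in any order, so the query does not need an `ORDER BY level` for the tree to assemble correctly.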
# Install
npm install --save-dev kysely-codegen
# Generate types from existing database
npx kysely-codegen --url "postgresql://user:pass@localhost:5432/mydb"
Generated output:
// Generated by kysely-codegen
import type { ColumnType, Generated } from 'kysely';
export interface Database {
users: UsersTable;
posts: PostsTable;
comments: CommentsTable;
}
export interface UsersTable {
id: Generated<number>;
email: string;
name: string | null;
created_at: Generated<Date>;
}
export interface PostsTable {
id: Generated<number>;
user_id: number;
title: string;
content: string;
published: Generated<boolean>;
created_at: Generated<Date>;
}
// Map database types to TypeScript types
interface CustomTypes {
timestamp: Date;
jsonb: unknown;
numeric: string; // Preserve precision
uuid: string;
}
interface ProductTable {
id: ColumnType<string, string | undefined, string>; // SELECT, INSERT, UPDATE types
metadata: ColumnType<Record<string, unknown>, string, string>; // JSON column
price: ColumnType<number, number, number | undefined>; // Numeric
}
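Mapping `numeric` to `string` keeps the exact decimal text the database sent, so the conversion to a number becomes an explicit step. A minimal sketch of converting such a string to integer cents without multiplying a float is shown below. `numericToCents` is a hypothetical helper, and it assumes values with at most two fractional digits.

```typescript
// Convert a decimal string such as "19.99" to integer cents.
// Working on the digits avoids float multiplication, which can be off by a tiny fraction.
// Assumption: at most two fractional digits; anything else is rejected.
function numericToCents(value: string): number {
  const match = /^(-?)(\d+)(?:\.(\d{1,2}))?$/.exec(value.trim());
  if (match === null) {
    throw new Error(`Unsupported numeric value: ${value}`);
  }
  const sign = match[1] === '-' ? -1 : 1;
  const whole = Number(match[2]);
  // Pad the fraction to exactly two digits: "5" -> "50", "" -> "00"
  const fraction = Number(((match[3] ?? '') + '00').slice(0, 2));
  const cents = whole * 100 + fraction;
  if (!Number.isSafeInteger(cents)) {
    throw new Error(`Value out of safe integer range: ${value}`);
  }
  return sign * cents;
}

const priceInCents = numericToCents('19.99');
```

Values with more precision than two decimal places are rejected rather than rounded, so a schema change that widens the column surfaces as an error instead of a silent loss.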
import { Kysely, Migrator, FileMigrationProvider } from 'kysely';
import { promises as fs } from 'fs';
import * as path from 'path';
const migrator = new Migrator({
db,
provider: new FileMigrationProvider({
fs,
path,
migrationFolder: path.join(__dirname, 'migrations'),
}),
});
// Run all pending migrations
async function migrateToLatest() {
const { error, results } = await migrator.migrateToLatest();
results?.forEach((it) => {
if (it.status === 'Success') {
console.log(`Migration "${it.migrationName}" executed successfully`);
} else if (it.status === 'Error') {
console.error(`Migration "${it.migrationName}" failed`);
}
});
if (error) {
console.error('Migration failed:', error);
process.exit(1);
}
}
// Rollback last migration
async function migrateDown() {
const { error, results } = await migrator.migrateDown();
// Handle results...
}
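The "Handle results..." step can be factored into a small pure function that is easy to test without a database. The sketch below is one possible shape, not a Kysely API: `MigrationResultLike` is a local type that mirrors the `migrationName`, `direction`, and `status` fields read above, and `summarizeMigrations` and the sample migration names are hypothetical.

```typescript
// Local structural type mirroring the fields read from each migration result
interface MigrationResultLike {
  migrationName: string;
  direction: 'Up' | 'Down';
  status: 'Success' | 'Error' | 'NotExecuted';
}

interface MigrationSummary {
  succeeded: string[];
  failed: string[];
  notExecuted: string[];
}

// Bucket results by status; an undefined results array is treated as empty
function summarizeMigrations(results: MigrationResultLike[] | undefined): MigrationSummary {
  const summary: MigrationSummary = { succeeded: [], failed: [], notExecuted: [] };
  (results ?? []).forEach((it) => {
    if (it.status === 'Success') {
      summary.succeeded.push(it.migrationName);
    } else if (it.status === 'Error') {
      summary.failed.push(it.migrationName);
    } else {
      summary.notExecuted.push(it.migrationName);
    }
  });
  return summary;
}

// Made-up results: the second migration fails, so the third never runs
const summary = summarizeMigrations([
  { migrationName: '001_create_users', direction: 'Up', status: 'Success' },
  { migrationName: '002_create_posts', direction: 'Up', status: 'Error' },
  { migrationName: '003_add_bio', direction: 'Up', status: 'NotExecuted' },
]);
```

A caller could log `summary.failed` and `summary.notExecuted` before exiting with a non-zero status, which makes it clearer which migrations still need attention.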
// migrations/001_create_users.ts
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable('users')
.addColumn('id', 'serial', (col) => col.primaryKey())
.addColumn('email', 'varchar(255)', (col) => col.notNull().unique())
.addColumn('name', 'varchar(255)')
.addColumn('created_at', 'timestamp', (col) =>
col.defaultTo(sql`CURRENT_TIMESTAMP`).notNull()
)
.execute();
await db.schema
.createIndex('users_email_idx')
.on('users')
.column('email')
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('users').execute();
}
// Add foreign key
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable('posts')
.addColumn('id', 'serial', (col) => col.primaryKey())
.addColumn('user_id', 'integer', (col) =>
col.references('users.id').onDelete('cascade').notNull()
)
.addColumn('title', 'varchar(500)', (col) => col.notNull())
.addColumn('content', 'text')
.execute();
}
// Alter table
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.alterTable('users')
.addColumn('bio', 'text')
.execute();
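await db.schema
.alterTable('users')
// PostgreSQL: change the type with alterColumn + setDataType (modifyColumn targets MySQL-style dialects)
.alterColumn('email', (col) => col.setDataType('varchar(320)'))
.execute();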
}
// Add enum column (PostgreSQL)
export async function up(db: Kysely<any>): Promise<void> {
await sql`CREATE TYPE user_role AS ENUM ('admin', 'user', 'guest')`.execute(db);
await db.schema
.alterTable('users')
.addColumn('role', sql`user_role`, (col) => col.defaultTo('user'))
.execute();
}
// Automatic rollback on error
await db.transaction().execute(async (trx) => {
await trx
.insertInto('users')
.values({ email: 'alice@example.com', name: 'Alice', updated_at: new Date() })
.execute();
await trx
.insertInto('posts')
.values({ user_id: 1, title: 'First Post', content: 'Hello' })
.execute();
});
// Returning values from a transaction (commits on success, rolls back if the callback throws)
const result = await db.transaction().execute(async (trx) => {
const user = await trx
.insertInto('users')
.values({ email: 'bob@example.com', name: 'Bob', updated_at: new Date() })
.returningAll()
.executeTakeFirstOrThrow();
const post = await trx
.insertInto('posts')
.values({
user_id: user.id,
title: 'Bob\'s Post',
content: 'Content',
})
.returningAll()
.executeTakeFirstOrThrow();
return { user, post };
});
import { IsolationLevel } from 'kysely';
// Read committed (the PostgreSQL default)
await db.transaction()
.setIsolationLevel('read committed')
.execute(async (trx) => {
// Transaction logic
});
// Serializable (strongest isolation)
await db.transaction()
.setIsolationLevel('serializable')
.execute(async (trx) => {
const balance = await trx
.selectFrom('accounts')
.select('balance')
.where('id', '=', accountId)
.executeTakeFirstOrThrow();
await trx
.updateTable('accounts')
.set({ balance: balance.balance - amount })
.where('id', '=', accountId)
.execute();
});
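Under serializable isolation the database may abort a transaction when it detects a conflict, and the usual remedy is to run the whole transaction again. A minimal sketch of such a retry wrapper follows. It assumes the driver exposes the SQLSTATE on a `code` property of the thrown error (as `pg` does) and uses `40001`, PostgreSQL's `serialization_failure` code. `withSerializationRetry` and `isSerializationFailure` are hypothetical helpers, not part of Kysely.

```typescript
// PostgreSQL SQLSTATE for serialization_failure
const SERIALIZATION_FAILURE = '40001';

// Assumption: the driver attaches the SQLSTATE as a string `code` property
function isSerializationFailure(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    (err as { code?: unknown }).code === SERIALIZATION_FAILURE
  );
}

// Re-run the whole unit of work when it fails with a serialization error
async function withSerializationRetry<T>(
  run: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await run();
    } catch (err) {
      if (!isSerializationFailure(err)) {
        throw err; // unrelated errors are not retried
      }
      lastError = err;
    }
  }
  throw lastError;
}

// Usage sketch (not executed here):
// await withSerializationRetry(() =>
//   db.transaction().setIsolationLevel('serializable').execute(async (trx) => { /* ... */ })
// );
```

The callback passed to `run` should contain the entire transaction, since the reads that led to the conflict have to be repeated as well.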
import { sql } from 'kysely';
// Raw SQL in SELECT
const result = await db
.selectFrom('users')
.select([
'id',
sql<string>`UPPER(name)`.as('uppercase_name'),
sql<number>`EXTRACT(YEAR FROM created_at)`.as('year_created'),
])
.execute();
// Raw SQL in WHERE
const filtered = await db
.selectFrom('posts')
.selectAll()
.where(sql`LOWER(title)`, 'like', '%typescript%')
.execute();
// Complex raw queries
const custom = await sql<{ total: number; avg_age: number }>`
SELECT
COUNT(*) as total,
AVG(EXTRACT(YEAR FROM age(birth_date))) as avg_age
FROM users
WHERE active = true
`.execute(db);
// Execute arbitrary SQL
const result = await sql`
WITH ranked_posts AS (
SELECT
p.*,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY views DESC) as rank
FROM posts p
)
SELECT * FROM ranked_posts WHERE rank <= 3
`.execute(db);
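// Parameterized raw queries (${email} is sent as a bound parameter, not spliced into the SQL text)
const email = 'alice@example.com';
const { rows } = await sql<User>`
SELECT * FROM users WHERE email = ${email}
`.execute(db);
const user = rows[0]; // undefined when no row matches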
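The reason an interpolated value in a tagged template is safe is that the tag function receives the literal SQL pieces and the values separately. The sketch below is a simplified model of that idea, not Kysely's implementation: `sketchSql` and `CompiledSketch` are hypothetical names, and the `$1`-style placeholders follow PostgreSQL's convention.

```typescript
// Simplified stand-in for a compiled query: SQL text plus a separate parameter list
interface CompiledSketch {
  sql: string;
  parameters: unknown[];
}

// A tag function gets the literal chunks in `strings` and the interpolated values in `values`
function sketchSql(strings: TemplateStringsArray, ...values: unknown[]): CompiledSketch {
  let text = strings[0] ?? '';
  values.forEach((_, i) => {
    // Emit a numbered placeholder instead of the value itself
    text += `$${i + 1}` + (strings[i + 1] ?? '');
  });
  return { sql: text, parameters: values };
}

// A hostile-looking value never becomes part of the SQL text
const hostile = "x' OR '1'='1";
const compiledSketch = sketchSql`SELECT * FROM users WHERE email = ${hostile}`;
```

Because the value travels in `parameters`, the database treats it purely as data. Building the same query with ordinary string concatenation would lose that separation.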
import { sql } from 'kysely';
import { jsonBuildObject, jsonArrayFrom } from 'kysely/helpers/postgres';
// Nest related rows as a JSON array (a correlated subquery, so build it from eb)
const usersWithPosts = await db
.selectFrom('users')
.select((eb) => [
'users.id',
'users.name',
jsonArrayFrom(
eb
.selectFrom('posts')
.select(['posts.id', 'posts.title', 'posts.content'])
.whereRef('posts.user_id', '=', 'users.id')
).as('posts'),
])
.execute();
// Result: { id: 1, name: "Alice", posts: [{ id: 1, title: "...", content: "..." }] }
// Build a JSON object (values must be expressions, so wrap column names in eb.ref)
const nested = await db
.selectFrom('users')
.select((eb) => [
'users.id',
jsonBuildObject({
name: eb.ref('users.name'),
email: eb.ref('users.email'),
postCount: sql<number>`(SELECT COUNT(*) FROM posts WHERE user_id = users.id)`,
}).as('user_data'),
])
.execute();
import { SelectQueryBuilder } from 'kysely';
function paginate<DB, TB extends keyof DB, O>(
query: SelectQueryBuilder<DB, TB, O>,
page: number,
pageSize: number
) {
return query.limit(pageSize).offset((page - 1) * pageSize);
}
// Usage
const page = 2;
const pageSize = 20;
const users = await paginate(
db.selectFrom('users').selectAll(),
page,
pageSize
).execute();
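// With total count
async function paginateWithCount<DB, TB extends keyof DB, O>(
query: SelectQueryBuilder<DB, TB, O>,
page: number,
pageSize: number
) {
const [items, countRow] = await Promise.all([
query.limit(pageSize).offset((page - 1) * pageSize).execute(),
// Count from the same filtered query: drop its select list and ordering first,
// otherwise the aggregate is mixed with non-aggregated columns
query
.clearSelect()
.clearOrderBy()
.select((eb) => eb.fn.countAll<number>().as('count'))
.executeTakeFirstOrThrow(),
]);
// Some drivers (e.g. pg) return COUNT as a string, so normalize it
const total = Number(countRow.count);
return {
items,
total,
page,
pageSize,
totalPages: Math.ceil(total / pageSize),
};
}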
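The offset and total-page arithmetic above assumes a valid page number. A minimal sketch of a helper that validates the inputs and clamps the page into range is shown below. `pageWindow` and `PageWindow` are hypothetical names introduced for illustration.

```typescript
// Hypothetical result shape: values ready to pass to .limit() and .offset()
interface PageWindow {
  page: number;
  pageSize: number;
  limit: number;
  offset: number;
  totalPages: number;
}

// Clamp the requested page into [1, lastPage] and derive limit/offset from it
function pageWindow(requestedPage: number, pageSize: number, total: number): PageWindow {
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be a positive integer');
  }
  const totalPages = Math.ceil(Math.max(total, 0) / pageSize);
  const lastPage = Math.max(totalPages, 1); // an empty result still has a "page 1"
  const page = Math.min(Math.max(Math.trunc(requestedPage) || 1, 1), lastPage);
  return {
    page,
    pageSize,
    limit: pageSize,
    offset: (page - 1) * pageSize,
    totalPages,
  };
}

const secondPage = pageWindow(2, 20, 45);
const pastTheEnd = pageWindow(99, 20, 45);
```

Clamping is a design choice. An API that prefers to signal an out-of-range page could return an empty list or an error instead.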
// GIN index for full-text search
export async function up(db: Kysely<any>): Promise<void> {
await sql`
ALTER TABLE posts
ADD COLUMN search_vector tsvector
GENERATED ALWAYS AS (
to_tsvector('english', coalesce(title, '') || ' ' || coalesce(content, ''))
) STORED
`.execute(db);
await sql`
CREATE INDEX posts_search_idx ON posts USING GIN (search_vector)
`.execute(db);
}
// Full-text search query
const searchResults = await db
.selectFrom('posts')
.selectAll()
.where(
sql`search_vector`,
'@@',
sql`to_tsquery('english', ${query})`
)
.execute();
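`to_tsquery` expects operator syntax (for example `typescript & builder`) and raises an error on free-form input, so the `query` value usually needs preparing first. A minimal sketch of one way to do that follows. `toTsQuery` is a hypothetical helper that assumes ASCII search terms combined with AND. PostgreSQL's `plainto_tsquery` and `websearch_to_tsquery` are alternatives that accept free-form text directly.

```typescript
// Turn free-form user input into an AND-ed tsquery string.
// Assumption: only ASCII letters and digits are kept as search terms.
function toTsQuery(input: string): string | null {
  const terms = input
    .toLowerCase()
    .split(/[^a-z0-9]+/)
    .filter((term) => term.length > 0);
  // Return null for empty input so the caller can skip the search filter entirely
  return terms.length > 0 ? terms.join(' & ') : null;
}

const prepared = toTsQuery('TypeScript  query-builder!');
```

The prepared string is still passed as a bound parameter in the `sql` template, so this step is about producing valid tsquery syntax rather than about escaping.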
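| Feature | Kysely | Drizzle | Prisma |
|---|---|---|---|
| Type Safety | Full (schema → queries) | Full (schema → queries) | Full (generated client) |
| SQL Control | ✅ Raw SQL friendly | ✅ Raw SQL friendly | ❌ Limited |
| Bundle Size | ~50kB | ~30kB | ~500kB+ |
| Migration System | ✅ Built-in | ✅ Built-in | ✅ Powerful CLI |
| Query Performance | ✅ Plain SQL | ✅ Plain SQL | ❌ Slower (abstraction) |
| Schema Definition | TypeScript types | TypeScript schema | Prisma schema |
| Codegen Required | Optional | No | ✅ Required |
| ORM Features | ❌ Query builder only | Partial (relational) | ✅ Full ORM |
| Learning Curve | Medium (SQL knowledge) | Medium | Easy (abstracts SQL) |
| Best For | SQL-first, complex queries | Type-safe schemas | Rapid prototyping |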
✅ Choose Kysely when:
❌ Choose Drizzle when:
❌ Choose Prisma when:
// Prisma
const users = await prisma.user.findMany({
where: { createdAt: { gte: new Date('2024-01-01') } },
include: { posts: true },
});
// Kysely equivalent
const users = await db
.selectFrom('users')
.select((eb) => [
'users.id',
'users.email',
// Correlated subquery: build it from eb so 'users.id' is in scope
jsonArrayFrom(
eb.selectFrom('posts')
.selectAll('posts')
.whereRef('posts.user_id', '=', 'users.id')
).as('posts'),
])
.where('created_at', '>=', new Date('2024-01-01'))
.execute();
Generated, Selectable, Insertable, Updateable
❌ Forgetting to execute queries:
// WRONG - returns query builder, not results
const users = db.selectFrom('users').selectAll();
// CORRECT
const users = await db.selectFrom('users').selectAll().execute();
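A builder chain only describes a query; nothing reaches the database until `execute()` (or a sibling such as `executeTakeFirst()`) is awaited. The sketch below makes that visible by compiling a query without any connection, using Kysely's `DummyDriver` together with the PostgreSQL adapter, introspector, and query compiler. `SketchDatabase` and `offlineDb` are hypothetical names, and the `kysely` package is assumed to be installed.

```typescript
import {
  DummyDriver,
  Kysely,
  PostgresAdapter,
  PostgresIntrospector,
  PostgresQueryCompiler,
} from 'kysely';
import type { Generated } from 'kysely';

// Hypothetical minimal schema for this sketch
interface SketchDatabase {
  users: {
    id: Generated<number>;
    email: string;
  };
}

// An instance that can build and compile queries but never talks to a database
const offlineDb = new Kysely<SketchDatabase>({
  dialect: {
    createAdapter: () => new PostgresAdapter(),
    createDriver: () => new DummyDriver(),
    createIntrospector: (db) => new PostgresIntrospector(db),
    createQueryCompiler: () => new PostgresQueryCompiler(),
  },
});

// Still just a description of a query: no await, no execute()
const builder = offlineDb
  .selectFrom('users')
  .select(['id', 'email'])
  .where('id', '=', 1);

// compile() returns the SQL text and the bound parameters without running anything
const compiledQuery = builder.compile();
console.log(compiledQuery.sql);
console.log(compiledQuery.parameters);
```

Logging the compiled SQL this way is also a convenient check on what a complex builder chain will send before pointing it at a real database.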
❌ Not handling null from LEFT JOIN:
// TypeScript knows posts.title can be null from LEFT JOIN
const result = await db
.selectFrom('users')
.leftJoin('posts', 'posts.user_id', 'users.id')
.select(['users.name', 'posts.title'])
.execute();
// posts.title type: string | null
❌ Missing Generated for auto-increment columns:
// WRONG - TypeScript will require 'id' in INSERT
interface UserTable {
id: number; // Bad!
}
// CORRECT
interface UserTable {
id: Generated<number>; // INSERT doesn't require id
}
When using Kysely, consider these complementary skills:
// Kysely leverages advanced TypeScript features
import { Kysely, Generated, ColumnType } from 'kysely';
// Database interface with Generated types
interface Database {
users: {
id: Generated<number>; // Auto-generated by database
email: string;
created_at: ColumnType<Date, string | undefined, never>;
// ColumnType<SelectType, InsertType, UpdateType>
};
}
// Type inference in queries
const db = new Kysely<Database>({ /* config */ });
// Full type safety - TypeScript knows return type
const users = await db
.selectFrom('users')
.select(['id', 'email'])
.where('created_at', '>', new Date('2025-01-01'))
.execute();
// Type: Array<{ id: number; email: string }>
// Conditional types for dynamic queries
type SelectFields<T> = {
[K in keyof T]: T[K] extends ColumnType<infer S, any, any> ? S : T[K];
};
Safe Migration Principles:
Kysely Migration Example:
// migrations/001_add_full_name.ts
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
// Phase 1: Add new column (nullable initially)
await db.schema
.alterTable('users')
.addColumn('full_name', 'varchar(255)')
.execute();
// Phase 2: Backfill data
await db
.updateTable('users')
.set({
full_name: sql`concat(first_name, ' ', last_name)`
})
.execute();
// Phase 3: Make required (separate migration recommended)
// await db.schema
// .alterTable('users')
// .alterColumn('full_name', (col) => col.setNotNull())
// .execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema
.alterTable('users')
.dropColumn('full_name')
.execute();
}
Common Safe Migrations:
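// Add index (on PostgreSQL a plain CREATE INDEX blocks writes while it builds;
// CREATE INDEX CONCURRENTLY needs raw SQL and cannot run inside a transaction)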
await db.schema
.createIndex('idx_users_email')
.on('users')
.column('email')
.execute();
// Rename column (multi-phase approach)
// Phase 1: Add new column
await db.schema
.alterTable('users')
.addColumn('email_address', 'varchar(255)')
.execute();
// Phase 2: Copy data
await db
.updateTable('users')
.set({ email_address: sql`email` })
.execute();
// Phase 3: Drop old column (after deploy)
// await db.schema
// .alterTable('users')
// .dropColumn('email')
// .execute();
// Change column type (add new, migrate, drop old)
await db.schema
.alterTable('products')
.addColumn('price_cents', 'integer')
.execute();
await db
.updateTable('products')
.set({ price_cents: sql`cast(price * 100 as integer)` })
.execute();
Running Migrations:
// migrate.ts
import { Kysely, Migrator, FileMigrationProvider } from 'kysely';
import { promises as fs } from 'fs';
import path from 'path';
const migrator = new Migrator({
db,
provider: new FileMigrationProvider({
fs,
path,
migrationFolder: path.join(__dirname, 'migrations'),
}),
});
// Migrate to latest
const { error, results } = await migrator.migrateToLatest();
// Migrate up/down
await migrator.migrateUp();
await migrator.migrateDown();
// List all migrations (pending ones have no executedAt)
const migrations = await migrator.getMigrations();
[Full TypeScript patterns and migration workflows available in respective skills if deployed together]