golang-database-patterns by bobmatnyc/claude-mpm-skills
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill golang-database-patterns

Go 的数据库生态系统为 SQL 数据库集成提供了多层抽象。从标准库的 database/sql,到增强库 sqlx,再到针对 PostgreSQL 优化的 pgx,开发者可以根据性能和易用性需求选择合适的工具。
主要特性:
在以下场景激活此技能:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
┌─────────────────────────────────────┐
│ 你使用什么数据库? │
└──────────────┬──────────────────────┘
│
┌──────────┴──────────┐
│ │
PostgreSQL 其他 SQL 数据库
│ │
▼ ▼
┌─────────────────┐ 使用 database/sql
│ 需要最大性能? │ + sqlx 以获得便利性
└─────┬───────────┘
│
┌──┴──┐
是 否
│ │
pgx sqlx + pq 驱动
在以下情况使用 database/sql:
在以下情况使用 sqlx:
在以下情况使用 pgx:
核心概念:
package main
import (
"context"
"database/sql"
"time"
_ "github.com/lib/pq" // PostgreSQL 驱动
)
// setupDB opens a PostgreSQL pool through database/sql, applies the pool
// limits, and verifies connectivity with a 5-second ping.
//
// If the ping fails the pool is closed before returning, so a failed setup
// does not leak the *sql.DB.
func setupDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}

	// Connection pool configuration.
	db.SetMaxOpenConns(25)                 // max open connections
	db.SetMaxIdleConns(5)                  // max idle connections
	db.SetConnMaxLifetime(5 * time.Minute) // max connection lifetime
	db.SetConnMaxIdleTime(1 * time.Minute) // max connection idle time

	// Verify the connection.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		// Best-effort cleanup; the ping error is the one worth reporting.
		_ = db.Close()
		return nil, err
	}
	return db, nil
}
关键模式:
// GetUserByID loads a single user by primary key using database/sql.
// It returns ErrUserNotFound when no row matches id and wraps any other
// query or scan failure.
func GetUserByID(ctx context.Context, db *sql.DB, id int) (*User, error) {
	const query = `SELECT id, name, email, created_at FROM users WHERE id = $1`

	user := new(User)
	row := db.QueryRowContext(ctx, query, id)
	switch err := row.Scan(&user.ID, &user.Name, &user.Email, &user.CreatedAt); {
	case err == sql.ErrNoRows:
		return nil, ErrUserNotFound // custom sentinel error
	case err != nil:
		return nil, fmt.Errorf("query user: %w", err)
	}
	return user, nil
}
// ListActiveUsers returns every user whose active column is true.
func ListActiveUsers(ctx context.Context, db *sql.DB) ([]User, error) {
	const query = `SELECT id, name, email, created_at FROM users WHERE active = true`

	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("query users: %w", err)
	}
	// CRITICAL: always close rows.
	defer rows.Close()

	var users []User
	for rows.Next() {
		var u User
		err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.CreatedAt)
		if err != nil {
			return nil, fmt.Errorf("scan user: %w", err)
		}
		users = append(users, u)
	}
	// Next returning false can mean an error, not only the end of the rows.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate users: %w", err)
	}
	return users, nil
}
安装:
go get github.com/jmoiron/sqlx
核心特性:
package main
import (
"context"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
// User is a row of the users table; the `db` tags name the column each
// field is mapped to.
type User struct {
ID int `db:"id"`
Name string `db:"name"`
Email string `db:"email"`
CreatedAt time.Time `db:"created_at"`
}
// GetUserByID fetches one user by id into a struct with sqlx.
//
// It returns ErrUserNotFound when no row matches. On any other failure it
// returns a nil *User with the error, so callers never receive a
// half-populated struct alongside a non-nil error.
func GetUserByID(ctx context.Context, db *sqlx.DB, id int) (*User, error) {
	var user User
	query := `SELECT id, name, email, created_at FROM users WHERE id = $1`
	err := db.GetContext(ctx, &user, query, id)
	if err == sql.ErrNoRows {
		return nil, ErrUserNotFound
	}
	if err != nil {
		return nil, err
	}
	return &user, nil
}
// ListUsers selects at most limit users directly into a slice of structs.
func ListUsers(ctx context.Context, db *sqlx.DB, limit int) ([]User, error) {
	const query = `SELECT id, name, email, created_at FROM users LIMIT $1`

	var users []User
	if err := db.SelectContext(ctx, &users, query, limit); err != nil {
		return users, err
	}
	return users, nil
}
// FindUsersByName runs a named-parameter query that matches users whose name
// starts with name. The prepared named statement is closed before returning.
func FindUsersByName(ctx context.Context, db *sqlx.DB, name string) ([]User, error) {
	const query = `SELECT * FROM users WHERE name LIKE :name || '%'`

	stmt, err := db.PrepareNamedContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer stmt.Close()

	// The map key binds to the :name placeholder in the query.
	params := map[string]interface{}{"name": name}

	var users []User
	err = stmt.SelectContext(ctx, &users, params)
	return users, err
}
// GetUsersByIDs loads all users whose id is in ids, using sqlx.In to expand
// the slice into an IN (...) list.
//
// An empty ids slice returns (nil, nil) without touching the database:
// sqlx.In reports an error for empty slices, and "IN ()" is not valid SQL.
func GetUsersByIDs(ctx context.Context, db *sqlx.DB, ids []int) ([]User, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	var users []User
	query, args, err := sqlx.In(`SELECT * FROM users WHERE id IN (?)`, ids)
	if err != nil {
		return nil, err
	}
	// Rebind converts the ? placeholders to the driver's style
	// (PostgreSQL: $1, $2, ...; MySQL: ?, ?, ...).
	query = db.Rebind(query)
	err = db.SelectContext(ctx, &users, query, args...)
	return users, err
}
安装:
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
连接池设置:
package main
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
// setupPgxPool parses dsn, applies the pool tuning below, creates a pgxpool
// pool and verifies connectivity with a ping.
//
// If the ping fails the pool is closed before returning, so a failed setup
// does not leak the pool.
func setupPgxPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
	config, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, fmt.Errorf("parse config: %w", err)
	}

	// Connection pool tuning.
	config.MaxConns = 25
	config.MinConns = 5
	config.MaxConnLifetime = 1 * time.Hour
	config.MaxConnIdleTime = 30 * time.Minute
	config.HealthCheckPeriod = 1 * time.Minute

	pool, err := pgxpool.NewWithConfig(ctx, config)
	if err != nil {
		return nil, fmt.Errorf("create pool: %w", err)
	}

	// Verify connectivity.
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("ping: %w", err)
	}
	return pool, nil
}
查询模式:
// GetUser loads one user by id through a pgx pool.
//
// It returns ErrUserNotFound when no row matches. On any other failure it
// returns a nil *User with the error, so callers never receive a
// half-populated struct alongside a non-nil error.
func GetUser(ctx context.Context, pool *pgxpool.Pool, id int) (*User, error) {
	var user User
	query := `SELECT id, name, email, created_at FROM users WHERE id = $1`
	err := pool.QueryRow(ctx, query, id).Scan(
		&user.ID, &user.Name, &user.Email, &user.CreatedAt,
	)
	if err == pgx.ErrNoRows {
		return nil, ErrUserNotFound
	}
	if err != nil {
		return nil, err
	}
	return &user, nil
}
// BatchInsertUsers queues one INSERT per user into a pgx batch, sends the
// batch, and then reads one result per queued statement. The first failing
// statement aborts the loop and is returned wrapped.
func BatchInsertUsers(ctx context.Context, pool *pgxpool.Pool, users []User) error {
	const query = `INSERT INTO users (name, email) VALUES ($1, $2)`

	var batch pgx.Batch
	for i := range users {
		batch.Queue(query, users[i].Name, users[i].Email)
	}

	results := pool.SendBatch(ctx, &batch)
	defer results.Close()

	// One Exec call per queued statement.
	for i := 0; i < len(users); i++ {
		if _, err := results.Exec(); err != nil {
			return fmt.Errorf("batch insert: %w", err)
		}
	}
	return nil
}
// BulkCopyUsers bulk-loads users with pgx's CopyFrom, writing the name and
// email columns. The source page describes this as about 10x faster than
// INSERT.
func BulkCopyUsers(ctx context.Context, pool *pgxpool.Pool, users []User) error {
	table := pgx.Identifier{"users"}
	columns := []string{"name", "email"}

	// Row i is produced on demand from users[i], in the same order as columns.
	source := pgx.CopyFromSlice(len(users), func(i int) ([]interface{}, error) {
		u := users[i]
		return []interface{}{u.Name, u.Email}, nil
	})

	_, err := pool.CopyFrom(ctx, table, columns, source)
	return err
}
package repository
import (
"context"
"database/sql"
)
// UserRepository defines the data access interface for users.
// Every method takes a context as its first argument.
type UserRepository interface {
// Create stores user.
Create(ctx context.Context, user *User) error
// GetByID looks a user up by id.
GetByID(ctx context.Context, id int) (*User, error)
// GetByEmail looks a user up by email.
GetByEmail(ctx context.Context, email string) (*User, error)
// Update persists changes to user.
Update(ctx context.Context, user *User) error
// Delete removes the user with the given id.
Delete(ctx context.Context, id int) error
// List returns the users selected by filters.
List(ctx context.Context, filters ListFilters) ([]User, error)
}
// PostgresUserRepository implements UserRepository on top of an sqlx handle.
type PostgresUserRepository struct {
db *sqlx.DB
}
// NewPostgresUserRepository returns a repository that uses db for all queries.
func NewPostgresUserRepository(db *sqlx.DB) *PostgresUserRepository {
return &PostgresUserRepository{db: db}
}
// Create inserts user and writes the generated id and created_at values
// (taken from the RETURNING clause) back into user.
func (r *PostgresUserRepository) Create(ctx context.Context, user *User) error {
	const query = `
INSERT INTO users (name, email, password_hash)
VALUES ($1, $2, $3)
RETURNING id, created_at
`
	row := r.db.QueryRowContext(ctx, query, user.Name, user.Email, user.PasswordHash)
	if err := row.Scan(&user.ID, &user.CreatedAt); err != nil {
		return fmt.Errorf("insert user: %w", err)
	}
	return nil
}
// GetByID returns the user with the given id, ErrUserNotFound when no row
// matches, or a wrapped error for any other failure.
func (r *PostgresUserRepository) GetByID(ctx context.Context, id int) (*User, error) {
	const query = `SELECT id, name, email, created_at, updated_at FROM users WHERE id = $1`

	user := new(User)
	switch err := r.db.GetContext(ctx, user, query, id); {
	case err == sql.ErrNoRows:
		return nil, ErrUserNotFound
	case err != nil:
		return nil, fmt.Errorf("get user: %w", err)
	}
	return user, nil
}
// Update writes user's name and email, sets updated_at to NOW(), and copies
// the new updated_at (from the RETURNING clause) back into user.
//
// It returns ErrUserNotFound when no row has user.ID. Any other failure is
// wrapped with "update user", matching the other repository methods.
func (r *PostgresUserRepository) Update(ctx context.Context, user *User) error {
	query := `
UPDATE users
SET name = $1, email = $2, updated_at = NOW()
WHERE id = $3
RETURNING updated_at
`
	err := r.db.QueryRowContext(
		ctx, query,
		user.Name, user.Email, user.ID,
	).Scan(&user.UpdatedAt)
	// RETURNING yields no row when the WHERE clause matched nothing.
	if err == sql.ErrNoRows {
		return ErrUserNotFound
	}
	if err != nil {
		return fmt.Errorf("update user: %w", err)
	}
	return nil
}
// Delete removes the user with the given id and returns ErrUserNotFound when
// the statement affected no rows.
func (r *PostgresUserRepository) Delete(ctx context.Context, id int) error {
	res, err := r.db.ExecContext(ctx, `DELETE FROM users WHERE id = $1`, id)
	if err != nil {
		return fmt.Errorf("delete user: %w", err)
	}

	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return err
	case affected == 0:
		return ErrUserNotFound
	}
	return nil
}
package repository_test
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// MockUserRepository is a testify-based mock used in tests; it embeds
// mock.Mock to record calls and serve configured return values.
type MockUserRepository struct {
mock.Mock
}
// GetByID records the call and returns whatever the test configured for it.
// A nil first return value is passed through as a nil *User rather than
// being type-asserted.
func (m *MockUserRepository) GetByID(ctx context.Context, id int) (*User, error) {
	ret := m.Called(ctx, id)
	first := ret.Get(0)
	if first == nil {
		return nil, ret.Error(1)
	}
	return first.(*User), ret.Error(1)
}
func TestUserService_GetUser(t *testing.T) {
mockRepo := new(MockUserRepository)
service := NewUserService(mockRepo)
expectedUser := &User{ID: 1, Name: "Alice", Email: "alice@example.com"}
mockRepo.On("GetByID", mock.Anything, 1).Return(expectedUser, nil)
user, err := service.GetUser(context.Background(), 1)
assert.NoError(t, err)
assert.Equal(t, expectedUser, user)
mockRepo.AssertExpectations(t)
}
// UpdateWithHistory updates a user's name and email and records the new
// values in user_history, both inside one transaction.
//
// If the UPDATE matches no row, the transaction is rolled back and an error
// wrapping sql.ErrNoRows is returned, so no history row is written for a
// user that does not exist.
func (r *PostgresUserRepository) UpdateWithHistory(ctx context.Context, user *User) error {
	tx, err := r.db.BeginTxx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	defer tx.Rollback() // safe to call even after commit

	// Update the user.
	query := `UPDATE users SET name = $1, email = $2 WHERE id = $3`
	res, err := tx.ExecContext(ctx, query, user.Name, user.Email, user.ID)
	if err != nil {
		return fmt.Errorf("update user: %w", err)
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("update user: %w", err)
	}
	if affected == 0 {
		return fmt.Errorf("update user: %w", sql.ErrNoRows)
	}

	// Insert the history record.
	historyQuery := `INSERT INTO user_history (user_id, name, email, changed_at) VALUES ($1, $2, $3, NOW())`
	_, err = tx.ExecContext(ctx, historyQuery, user.ID, user.Name, user.Email)
	if err != nil {
		return fmt.Errorf("insert history: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit tx: %w", err)
	}
	return nil
}
// TransferBalance moves amount from fromID's account to toID's account
// inside one SERIALIZABLE transaction.
//
// The debit is conditional (balance >= amount), so it matches zero rows when
// the sender is missing or has insufficient funds. Both updates therefore
// check that a row was actually changed; otherwise the function returns an
// error and the deferred Rollback undoes the transaction. Without these
// checks the receiver would be credited even though nothing was debited.
//
// amount must be positive; a negative amount would reverse the direction of
// the transfer.
//
// NOTE(review): float64 cannot represent most decimal amounts exactly;
// consider integer minor units or a decimal type for real money.
func (r *PostgresUserRepository) TransferBalance(ctx context.Context, fromID, toID int, amount float64) error {
	if !(amount > 0) { // written this way so NaN is rejected too
		return fmt.Errorf("transfer balance: amount must be positive, got %v", amount)
	}

	// Use serializable isolation for financial transactions.
	txOpts := &sql.TxOptions{
		Isolation: sql.LevelSerializable,
		ReadOnly:  false,
	}
	tx, err := r.db.BeginTxx(ctx, txOpts)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Deduct from the sender.
	res, err := tx.ExecContext(ctx,
		`UPDATE accounts SET balance = balance - $1 WHERE user_id = $2 AND balance >= $1`,
		amount, fromID,
	)
	if err != nil {
		return fmt.Errorf("deduct balance: %w", err)
	}
	debited, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("deduct balance: %w", err)
	}
	if debited == 0 {
		return fmt.Errorf("deduct balance: insufficient funds or no account for user %d", fromID)
	}

	// Add to the receiver.
	res, err = tx.ExecContext(ctx,
		`UPDATE accounts SET balance = balance + $1 WHERE user_id = $2`,
		amount, toID,
	)
	if err != nil {
		return fmt.Errorf("add balance: %w", err)
	}
	credited, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("add balance: %w", err)
	}
	if credited == 0 {
		return fmt.Errorf("add balance: no account for user %d", toID)
	}

	return tx.Commit()
}
// WithRetry calls fn up to maxRetries times, retrying only when fn fails
// with a PostgreSQL serialization failure (SQLSTATE 40001).
//
// Between attempts it waits with exponential backoff (100ms, 200ms, 400ms,
// ...). The wait is abandoned as soon as ctx is done, in which case
// ctx.Err() is returned. No wait happens after the final attempt.
//
// Returns nil on the first success, the error itself for any non-retryable
// failure, and a "max retries exceeded" error wrapping the last
// serialization failure once the attempts are used up.
func WithRetry(ctx context.Context, maxRetries int, fn func() error) error {
	backoff := 100 * time.Millisecond
	var lastErr error

	for attempt := 0; attempt < maxRetries; attempt++ {
		err := fn()
		if err == nil {
			return nil
		}

		// Only serialization errors (PostgreSQL error code 40001) are retried.
		var pgErr *pgconn.PgError
		if !errors.As(err, &pgErr) || pgErr.Code != "40001" {
			return err // non-retryable error
		}
		lastErr = err

		if attempt == maxRetries-1 {
			break // out of attempts; do not wait for nothing
		}

		// Exponential backoff that also honours context cancellation.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		backoff *= 2
	}

	if lastErr == nil {
		// maxRetries <= 0: fn was never called.
		return fmt.Errorf("max retries exceeded")
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}
// Usage: run TransferBalance through WithRetry with at most 3 attempts.
err := WithRetry(ctx, 3, func() error {
return r.TransferBalance(ctx, fromID, toID, amount)
})
┌─────────────────────────────────────┐
│ 迁移工具选择 │
└──────────────┬──────────────────────┘
│
┌──────────┴──────────┐
│ │
简单 SQL 复杂逻辑
迁移 (需要 Go 代码)
│ │
▼ ▼
golang-migrate goose
(仅 SQL) (Go + SQL 迁移)
在以下情况使用 golang-migrate:
在以下情况使用 goose:
在以下情况使用 sql-migrate:
安装:
# CLI 工具
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest
# 库
go get -u github.com/golang-migrate/migrate/v4
go get -u github.com/golang-migrate/migrate/v4/database/postgres
go get -u github.com/golang-migrate/migrate/v4/source/file
迁移文件:
# 创建迁移
migrate create -ext sql -dir migrations -seq create_users_table
# 生成:
# migrations/000001_create_users_table.up.sql
# migrations/000001_create_users_table.down.sql
000001_create_users_table.up.sql:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_users_email ON users(email);
000001_create_users_table.down.sql:
DROP INDEX IF EXISTS idx_users_email;
DROP TABLE IF EXISTS users;
程序化迁移:
package main
import (
"fmt"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
)
// runMigrations applies all pending "up" migrations found under
// migrationsPath (read through a file:// source URL) to the database at
// databaseURL, then prints the resulting version and dirty flag.
//
// migrate.ErrNoChange from Up is treated as success. Every other failure is
// returned wrapped with the step that produced it.
func runMigrations(databaseURL, migrationsPath string) error {
	m, err := migrate.New(
		fmt.Sprintf("file://%s", migrationsPath),
		databaseURL,
	)
	if err != nil {
		return fmt.Errorf("create migrate instance: %w", err)
	}
	defer m.Close()

	if err := m.Up(); err != nil && err != migrate.ErrNoChange {
		return fmt.Errorf("run migrations: %w", err)
	}

	version, dirty, err := m.Version()
	if err != nil {
		return fmt.Errorf("read migration version: %w", err)
	}
	fmt.Printf("Migration complete. Version: %d, Dirty: %v\n", version, dirty)
	return nil
}
CLI 用法:
# 应用所有 up 迁移
migrate -path migrations -database "postgres://user:pass@localhost:5432/db?sslmode=disable" up
# 回滚一个迁移
migrate -path migrations -database $DATABASE_URL down 1
# 跳转到特定版本
migrate -path migrations -database $DATABASE_URL goto 5
# 检查当前版本
migrate -path migrations -database $DATABASE_URL version
// User is a users row in which phone, age and updated_at may be NULL; those
// columns use the database/sql Null* wrapper types.
type User struct {
ID int `db:"id"`
Name string `db:"name"`
Email string `db:"email"`
Phone sql.NullString `db:"phone"` // Nullable string
Age sql.NullInt64 `db:"age"` // Nullable int
UpdatedAt sql.NullTime `db:"updated_at"` // Nullable timestamp
}
// GetUser loads the user with the given id, including its nullable columns,
// and prints the phone number when one is set.
//
// Columns are listed explicitly: sqlx reports an error for any result column
// that has no matching struct field, so SELECT * fails as soon as the table
// has a column (such as password_hash or created_at) that this User struct
// does not declare.
func (r *PostgresUserRepository) GetUser(ctx context.Context, id int) (*User, error) {
	const query = `SELECT id, name, email, phone, age, updated_at FROM users WHERE id = $1`

	var user User
	if err := r.db.GetContext(ctx, &user, query, id); err != nil {
		return nil, err
	}

	// Access a nullable field: String is only meaningful when Valid is true.
	if user.Phone.Valid {
		fmt.Println("Phone:", user.Phone.String)
	}
	return &user, nil
}
// UpdatePhone sets the phone column of the given user. A nil phone stores
// SQL NULL; a non-nil phone stores the pointed-to string.
func (r *PostgresUserRepository) UpdatePhone(ctx context.Context, userID int, phone *string) error {
	// The zero value has Valid == false, which is written as NULL.
	value := sql.NullString{}
	if phone != nil {
		value.String, value.Valid = *phone, true
	}

	const query = `UPDATE users SET phone = $1 WHERE id = $2`
	_, err := r.db.ExecContext(ctx, query, value, userID)
	return err
}
// NullString is a nullable string with JSON support: it wraps
// sql.NullString and encodes to and from JSON null when it is not valid.
type NullString struct {
	sql.NullString
}

// MarshalJSON encodes an invalid NullString as JSON null and a valid one as
// a JSON string.
func (ns NullString) MarshalJSON() ([]byte, error) {
	if !ns.Valid {
		return []byte("null"), nil
	}
	return json.Marshal(ns.String)
}

// UnmarshalJSON decodes JSON null into an invalid, empty NullString and a
// JSON string into a valid one. Any other JSON value is an error and leaves
// the receiver unchanged.
func (ns *NullString) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		// Clear String as well so a reused value does not keep stale text.
		ns.String = ""
		ns.Valid = false
		return nil
	}
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	ns.String = s
	ns.Valid = true
	return nil
}
错误做法:
// GetUsersWithPosts is the ANTI-PATTERN version, kept as written for
// illustration: it issues one query for the users and then one more query
// per user for that user's posts (N+1 queries). It also discards the error
// returned by every SelectContext call.
func GetUsersWithPosts(ctx context.Context, db *sqlx.DB) ([]UserWithPosts, error) {
var users []User
db.SelectContext(ctx, &users, `SELECT * FROM users`)
for i, user := range users {
var posts []Post
// N+1: one query per user!
db.SelectContext(ctx, &posts, `SELECT * FROM posts WHERE user_id = $1`, user.ID)
users[i].Posts = posts
}
return users, nil
}
正确做法:
// GetUsersWithPosts loads every user together with their posts in a single
// LEFT JOIN query instead of one query per user.
//
// Users are returned in the order the query produced them (ORDER BY u.id).
// Users without posts are included with an empty Posts slice. Scan and
// iteration errors are returned instead of being silently dropped.
func GetUsersWithPosts(ctx context.Context, db *sqlx.DB) ([]UserWithPosts, error) {
	// Single query using a JOIN.
	query := `
SELECT u.id, u.name, p.id as post_id, p.title, p.content
FROM users u
LEFT JOIN posts p ON p.user_id = u.id
ORDER BY u.id
`
	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// result keeps first-seen (query) order; position maps a user id to its
	// index in result so later rows for the same user can append posts.
	result := make([]UserWithPosts, 0)
	position := make(map[int]int)

	for rows.Next() {
		var userID int
		var userName string
		// The post columns are NULL for users without posts (LEFT JOIN).
		var postID sql.NullInt64
		var title, content sql.NullString

		if err := rows.Scan(&userID, &userName, &postID, &title, &content); err != nil {
			return nil, err
		}

		i, seen := position[userID]
		if !seen {
			i = len(result)
			position[userID] = i
			result = append(result, UserWithPosts{ID: userID, Name: userName})
		}

		if postID.Valid {
			result[i].Posts = append(result[i].Posts, Post{
				ID:      int(postID.Int64),
				Title:   title.String,
				Content: content.String,
			})
		}
	}
	// Next returning false can mean an error, not only the end of the rows.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
错误做法:
// Anti-pattern: the pool is left unconfigured and the Open error is discarded.
db, _ := sql.Open("postgres", dsn)
// Uses the defaults: unlimited connections, no timeouts
正确做法:
// NOTE(review): the error from sql.Open is discarded here for brevity; real
// code should check it.
db, _ := sql.Open("postgres", dsn)
// Production-ready pool settings
db.SetMaxOpenConns(25) // Limit total connections
db.SetMaxIdleConns(5) // Limit idle connections
db.SetConnMaxLifetime(5 * time.Minute) // Recycle old connections
db.SetConnMaxIdleTime(1 * time.Minute) // Close idle connections
错误做法:
// SlowQuery is the ANTI-PATTERN version: it calls db.Query, which takes no
// context. The rest of the body is elided in the source.
func SlowQuery(db *sql.DB) error {
// No context - the query runs to completion even if the client disconnects
rows, err := db.Query("SELECT * FROM huge_table")
// ...
}
正确做法:
// SlowQuery is the recommended version: the caller's context is passed to
// QueryContext. The rest of the body is elided in the source.
func SlowQuery(ctx context.Context, db *sql.DB) error {
// Context cancellation propagates to the database
rows, err := db.QueryContext(ctx, "SELECT * FROM huge_table")
// If ctx is cancelled, the query is terminated
}
错误做法:
// GetUsers is the ANTI-PATTERN version: it never closes rows and discards
// the error from db.Query. The loop body is elided in the source.
func GetUsers(db *sql.DB) ([]User, error) {
rows, _ := db.Query("SELECT * FROM users")
// Missing rows.Close() - connection leak!
var users []User
for rows.Next() {
// ...
}
return users, nil
}
正确做法:
// GetUsers is the recommended version: it checks the query error, defers
// rows.Close(), and returns rows.Err() so iteration failures reach the
// caller. The loop body is elided in the source.
func GetUsers(db *sql.DB) ([]User, error) {
rows, err := db.Query("SELECT * FROM users")
if err != nil {
return nil, err
}
defer rows.Close() // CRITICAL: always defer Close
var users []User
for rows.Next() {
// ...
}
return users, rows.Err() // Check for iteration errors
}
错误做法:
// FindUser is the ANTI-PATTERN version: it builds the SQL text by formatting
// email straight into the query string (SQL injection). The rest of the body
// is elided in the source.
func FindUser(db *sql.DB, email string) (*User, error) {
// Never concatenate user input into SQL!
query := fmt.Sprintf("SELECT * FROM users WHERE email = '%s'", email)
// Vulnerable to: ' OR '1'='1
row := db.QueryRow(query)
// ...
}
正确做法:
// FindUser is the recommended version: email is passed as the $1 query
// parameter instead of being spliced into the SQL text. The rest of the body
// is elided in the source.
func FindUser(db *sql.DB, email string) (*User, error) {
// Use a parameterized query
query := "SELECT * FROM users WHERE email = $1"
row := db.QueryRow(query, email) // Safe
// ...
}
错误做法:
// UpdateUser is the ANTI-PATTERN version: the errors from Begin, Exec and
// Commit are all discarded and nil is always returned.
func UpdateUser(db *sql.DB, user *User) error {
tx, _ := db.Begin()
tx.Exec("UPDATE users SET name = $1 WHERE id = $2", user.Name, user.ID)
tx.Commit() // Error ignored - the data may not have been committed!
return nil
}
正确做法:
// UpdateUser sets the name of the user identified by user.ID inside a
// transaction and reports Begin, Exec and Commit failures to the caller.
func UpdateUser(db *sql.DB, user *User) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Rolls back if Commit is not reached.
	defer tx.Rollback()

	const stmt = "UPDATE users SET name = $1 WHERE id = $2"
	if _, err = tx.Exec(stmt, user.Name, user.ID); err != nil {
		return err
	}
	return tx.Commit() // check the commit error
}
// OptimizeDatabasePool applies the four pool settings held in config to db.
func OptimizeDatabasePool(db *sql.DB, config PoolConfig) {
// MaxOpenConns: total connections (in use + idle)
// Rule of thumb: (CPU cores * 2) + disk spindles
// Cloud databases usually limit connections (e.g. AWS RDS: 100-5000)
db.SetMaxOpenConns(config.MaxOpen) // Example: 25 for a small app
// MaxIdleConns: idle connections kept ready for reuse
// Should be lower than MaxOpenConns
// Too low: frequent reconnects (slow)
// Too high: wasted resources
db.SetMaxIdleConns(config.MaxIdle) // Example: 5-10
// ConnMaxLifetime: maximum lifetime of any connection
// Prevents stale connections behind load balancers
// Recommended: 5-15 minutes
db.SetConnMaxLifetime(config.MaxLifetime)
// ConnMaxIdleTime: close idle connections after this duration
// Saves resources during low-traffic periods
// Recommended: 1-5 minutes
db.SetConnMaxIdleTime(config.MaxIdleTime)
}
// PoolConfig groups the connection-pool settings that OptimizeDatabasePool
// passes to the corresponding *sql.DB setters.
type PoolConfig struct {
MaxOpen int // passed to SetMaxOpenConns
MaxIdle int // passed to SetMaxIdleConns
MaxLifetime time.Duration // passed to SetConnMaxLifetime
MaxIdleTime time.Duration // passed to SetConnMaxIdleTime
}
// Example configurations
var (
// Development: low resource usage
DevConfig = PoolConfig{
MaxOpen: 10,
MaxIdle: 2,
MaxLifetime: 10 * time.Minute,
MaxIdleTime: 2 * time.Minute,
}
// Production: high throughput
ProdConfig = PoolConfig{
MaxOpen: 25,
MaxIdle: 10,
MaxLifetime: 5 * time.Minute,
MaxIdleTime: 1 * time.Minute,
}
// High-traffic API: maximum performance
HighTrafficConfig = PoolConfig{
MaxOpen: 50,
MaxIdle: 20,
MaxLifetime: 5 * time.Minute,
MaxIdleTime: 30 * time.Second,
}
)
// MonitorConnectionPool prints a snapshot of db's pool statistics to
// standard output, followed by a hint when the wait count exceeds 100 or
// the number of idle closures exceeds 1000.
func MonitorConnectionPool(db *sql.DB) {
	s := db.Stats()

	fmt.Print("Connection Pool Stats:\n")
	fmt.Printf(" Open Connections: %d\n", s.OpenConnections)
	fmt.Printf(" In Use: %d\n", s.InUse)
	fmt.Printf(" Idle: %d\n", s.Idle)
	fmt.Printf(" Wait Count: %d\n", s.WaitCount)       // queries that waited for a connection
	fmt.Printf(" Wait Duration: %s\n", s.WaitDuration) // total time spent waiting
	fmt.Printf(" Max Idle Closed: %d\n", s.MaxIdleClosed)
	fmt.Printf(" Max Lifetime Closed: %d\n", s.MaxLifetimeClosed)

	// Many waits suggest that more connections are needed.
	tooManyWaits := s.WaitCount > 100
	if tooManyWaits {
		fmt.Println("WARNING: High wait count - consider increasing MaxOpenConns")
	}

	// Many idle closures suggest that the pool is too large.
	tooManyIdleClosed := s.MaxIdleClosed > 1000
	if tooManyIdleClosed {
		fmt.Println("INFO: Many idle closures - consider reducing MaxIdleConns")
	}
}
安装:
go get github.com/DATA-DOG/go-sqlmock
示例:
package repository_test
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
)
// TestGetUserByID runs the repository's GetByID against a sqlmock-backed
// database and checks the name and email of the returned user.
func TestGetUserByID(t *testing.T) {
// Create the mock database
db, mock, err := sqlmock.New()
assert.NoError(t, err)
defer db.Close()
sqlxDB := sqlx.NewDb(db, "postgres")
repo := NewPostgresUserRepository(sqlxDB)
// Expected query and result
rows := sqlmock.NewRows([]string{"id", "name", "email"}).
AddRow(1, "Alice", "alice@example.com")
mock.ExpectQuery("SELECT (.+) FROM users WHERE id = \\$1").
WithArgs(1).
WillReturnRows(rows)
// Execute
user, err := repo.GetByID(context.Background(), 1)
// Assert
assert.NoError(t, err)
assert.Equal(t, "Alice", user.Name)
assert.Equal(t, "alice@example.com", user.Email)
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestGetUserByID_NotFound(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
defer db.Close()
sqlxDB := sqlx.NewDb(db, "postgres")
repo := NewPostgresUserRepository(sqlxDB)
mock.ExpectQuery
Go's database ecosystem provides multiple layers of abstraction for SQL database integration. From the standard library's database/sql to enhanced libraries like sqlx and PostgreSQL-optimized pgx, developers can choose the right tool for their performance and ergonomics needs.
Key Features:
Activate this skill when:
┌─────────────────────────────────────┐
│ What database are you using? │
└──────────────┬──────────────────────┘
│
┌──────────┴──────────┐
│ │
PostgreSQL Other SQL DB
│ │
▼ ▼
┌─────────────────┐ Use database/sql
│ Need max perf? │ + sqlx for convenience
└─────┬───────────┘
│
┌──┴──┐
Yes No
│ │
pgx sqlx + pq driver
Use database/sql when:
Use sqlx when:
Use pgx when:
Core Concepts:
package main
import (
"context"
"database/sql"
"time"
_ "github.com/lib/pq" // PostgreSQL driver
)
// setupDB opens a PostgreSQL pool through database/sql, applies the pool
// limits, and verifies connectivity with a 5-second ping.
//
// If the ping fails the pool is closed before returning, so a failed setup
// does not leak the *sql.DB.
func setupDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}

	// Connection pooling configuration.
	db.SetMaxOpenConns(25)                 // max open connections
	db.SetMaxIdleConns(5)                  // max idle connections
	db.SetConnMaxLifetime(5 * time.Minute) // max connection lifetime
	db.SetConnMaxIdleTime(1 * time.Minute) // max idle time

	// Verify the connection.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		// Best-effort cleanup; the ping error is the one worth reporting.
		_ = db.Close()
		return nil, err
	}
	return db, nil
}
Key Patterns:
// GetUserByID loads a single user by primary key using database/sql.
// It returns ErrUserNotFound when no row matches id and wraps any other
// query or scan failure.
func GetUserByID(ctx context.Context, db *sql.DB, id int) (*User, error) {
	const query = `SELECT id, name, email, created_at FROM users WHERE id = $1`

	user := new(User)
	row := db.QueryRowContext(ctx, query, id)
	switch err := row.Scan(&user.ID, &user.Name, &user.Email, &user.CreatedAt); {
	case err == sql.ErrNoRows:
		return nil, ErrUserNotFound // custom sentinel error
	case err != nil:
		return nil, fmt.Errorf("query user: %w", err)
	}
	return user, nil
}
// ListActiveUsers returns every user whose active column is true.
func ListActiveUsers(ctx context.Context, db *sql.DB) ([]User, error) {
	const query = `SELECT id, name, email, created_at FROM users WHERE active = true`

	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("query users: %w", err)
	}
	// CRITICAL: always close rows.
	defer rows.Close()

	var users []User
	for rows.Next() {
		var u User
		err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.CreatedAt)
		if err != nil {
			return nil, fmt.Errorf("scan user: %w", err)
		}
		users = append(users, u)
	}
	// Next returning false can mean an error, not only the end of the rows.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate users: %w", err)
	}
	return users, nil
}
Installation:
go get github.com/jmoiron/sqlx
Core Features:
package main
import (
"context"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
// User is a row of the users table; the `db` tags name the column each
// field is mapped to.
type User struct {
ID int `db:"id"`
Name string `db:"name"`
Email string `db:"email"`
CreatedAt time.Time `db:"created_at"`
}
// GetUserByID fetches one user by id into a struct with sqlx.
//
// It returns ErrUserNotFound when no row matches. On any other failure it
// returns a nil *User with the error, so callers never receive a
// half-populated struct alongside a non-nil error.
func GetUserByID(ctx context.Context, db *sqlx.DB, id int) (*User, error) {
	var user User
	query := `SELECT id, name, email, created_at FROM users WHERE id = $1`
	err := db.GetContext(ctx, &user, query, id)
	if err == sql.ErrNoRows {
		return nil, ErrUserNotFound
	}
	if err != nil {
		return nil, err
	}
	return &user, nil
}
// ListUsers selects at most limit users directly into a slice of structs.
func ListUsers(ctx context.Context, db *sqlx.DB, limit int) ([]User, error) {
	const query = `SELECT id, name, email, created_at FROM users LIMIT $1`

	var users []User
	if err := db.SelectContext(ctx, &users, query, limit); err != nil {
		return users, err
	}
	return users, nil
}
// FindUsersByName runs a named-parameter query that matches users whose name
// starts with name. The prepared named statement is closed before returning.
func FindUsersByName(ctx context.Context, db *sqlx.DB, name string) ([]User, error) {
	const query = `SELECT * FROM users WHERE name LIKE :name || '%'`

	stmt, err := db.PrepareNamedContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer stmt.Close()

	// The map key binds to the :name placeholder in the query.
	params := map[string]interface{}{"name": name}

	var users []User
	err = stmt.SelectContext(ctx, &users, params)
	return users, err
}
// GetUsersByIDs loads all users whose id is in ids, using sqlx.In to expand
// the slice into an IN (...) list.
//
// An empty ids slice returns (nil, nil) without touching the database:
// sqlx.In reports an error for empty slices, and "IN ()" is not valid SQL.
func GetUsersByIDs(ctx context.Context, db *sqlx.DB, ids []int) ([]User, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	var users []User
	query, args, err := sqlx.In(`SELECT * FROM users WHERE id IN (?)`, ids)
	if err != nil {
		return nil, err
	}
	// Rebind converts the ? placeholders to the driver's style
	// (PostgreSQL: $1, $2, ...; MySQL: ?, ?, ...).
	query = db.Rebind(query)
	err = db.SelectContext(ctx, &users, query, args...)
	return users, err
}
Installation:
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
Connection Pool Setup:
package main
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
// setupPgxPool parses dsn, applies the pool tuning below, creates a pgxpool
// pool and verifies connectivity with a ping.
//
// If the ping fails the pool is closed before returning, so a failed setup
// does not leak the pool.
func setupPgxPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
	config, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, fmt.Errorf("parse config: %w", err)
	}

	// Connection pool tuning.
	config.MaxConns = 25
	config.MinConns = 5
	config.MaxConnLifetime = 1 * time.Hour
	config.MaxConnIdleTime = 30 * time.Minute
	config.HealthCheckPeriod = 1 * time.Minute

	pool, err := pgxpool.NewWithConfig(ctx, config)
	if err != nil {
		return nil, fmt.Errorf("create pool: %w", err)
	}

	// Verify connectivity.
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("ping: %w", err)
	}
	return pool, nil
}
Query Patterns:
// GetUser loads one user by id through a pgx pool.
//
// It returns ErrUserNotFound when no row matches. On any other failure it
// returns a nil *User with the error, so callers never receive a
// half-populated struct alongside a non-nil error.
func GetUser(ctx context.Context, pool *pgxpool.Pool, id int) (*User, error) {
	var user User
	query := `SELECT id, name, email, created_at FROM users WHERE id = $1`
	err := pool.QueryRow(ctx, query, id).Scan(
		&user.ID, &user.Name, &user.Email, &user.CreatedAt,
	)
	if err == pgx.ErrNoRows {
		return nil, ErrUserNotFound
	}
	if err != nil {
		return nil, err
	}
	return &user, nil
}
// BatchInsertUsers queues one INSERT per user into a pgx batch, sends the
// batch, and then reads one result per queued statement. The first failing
// statement aborts the loop and is returned wrapped.
func BatchInsertUsers(ctx context.Context, pool *pgxpool.Pool, users []User) error {
	const query = `INSERT INTO users (name, email) VALUES ($1, $2)`

	var batch pgx.Batch
	for i := range users {
		batch.Queue(query, users[i].Name, users[i].Email)
	}

	results := pool.SendBatch(ctx, &batch)
	defer results.Close()

	// One Exec call per queued statement.
	for i := 0; i < len(users); i++ {
		if _, err := results.Exec(); err != nil {
			return fmt.Errorf("batch insert: %w", err)
		}
	}
	return nil
}
// BulkCopyUsers bulk-loads users with pgx's CopyFrom, writing the name and
// email columns. The source page describes this as about 10x faster than
// INSERT.
func BulkCopyUsers(ctx context.Context, pool *pgxpool.Pool, users []User) error {
	table := pgx.Identifier{"users"}
	columns := []string{"name", "email"}

	// Row i is produced on demand from users[i], in the same order as columns.
	source := pgx.CopyFromSlice(len(users), func(i int) ([]interface{}, error) {
		u := users[i]
		return []interface{}{u.Name, u.Email}, nil
	})

	_, err := pool.CopyFrom(ctx, table, columns, source)
	return err
}
package repository
import (
"context"
"database/sql"
)
// UserRepository defines the data access interface for users.
// Every method takes a context as its first argument.
type UserRepository interface {
// Create stores user.
Create(ctx context.Context, user *User) error
// GetByID looks a user up by id.
GetByID(ctx context.Context, id int) (*User, error)
// GetByEmail looks a user up by email.
GetByEmail(ctx context.Context, email string) (*User, error)
// Update persists changes to user.
Update(ctx context.Context, user *User) error
// Delete removes the user with the given id.
Delete(ctx context.Context, id int) error
// List returns the users selected by filters.
List(ctx context.Context, filters ListFilters) ([]User, error)
}
// PostgresUserRepository implements UserRepository on top of an sqlx handle.
type PostgresUserRepository struct {
db *sqlx.DB
}
// NewPostgresUserRepository returns a repository that uses db for all queries.
func NewPostgresUserRepository(db *sqlx.DB) *PostgresUserRepository {
return &PostgresUserRepository{db: db}
}
// Create inserts user and writes the generated id and created_at values
// (taken from the RETURNING clause) back into user.
func (r *PostgresUserRepository) Create(ctx context.Context, user *User) error {
	const query = `
INSERT INTO users (name, email, password_hash)
VALUES ($1, $2, $3)
RETURNING id, created_at
`
	row := r.db.QueryRowContext(ctx, query, user.Name, user.Email, user.PasswordHash)
	if err := row.Scan(&user.ID, &user.CreatedAt); err != nil {
		return fmt.Errorf("insert user: %w", err)
	}
	return nil
}
// GetByID returns the user with the given id, ErrUserNotFound when no row
// matches, or a wrapped error for any other failure.
func (r *PostgresUserRepository) GetByID(ctx context.Context, id int) (*User, error) {
	const query = `SELECT id, name, email, created_at, updated_at FROM users WHERE id = $1`

	user := new(User)
	switch err := r.db.GetContext(ctx, user, query, id); {
	case err == sql.ErrNoRows:
		return nil, ErrUserNotFound
	case err != nil:
		return nil, fmt.Errorf("get user: %w", err)
	}
	return user, nil
}
// Update writes user's name and email, sets updated_at to NOW(), and copies
// the new updated_at (from the RETURNING clause) back into user.
//
// It returns ErrUserNotFound when no row has user.ID. Any other failure is
// wrapped with "update user", matching the other repository methods.
func (r *PostgresUserRepository) Update(ctx context.Context, user *User) error {
	query := `
UPDATE users
SET name = $1, email = $2, updated_at = NOW()
WHERE id = $3
RETURNING updated_at
`
	err := r.db.QueryRowContext(
		ctx, query,
		user.Name, user.Email, user.ID,
	).Scan(&user.UpdatedAt)
	// RETURNING yields no row when the WHERE clause matched nothing.
	if err == sql.ErrNoRows {
		return ErrUserNotFound
	}
	if err != nil {
		return fmt.Errorf("update user: %w", err)
	}
	return nil
}
// Delete removes the user with the given id and returns ErrUserNotFound when
// the statement affected no rows.
func (r *PostgresUserRepository) Delete(ctx context.Context, id int) error {
	res, err := r.db.ExecContext(ctx, `DELETE FROM users WHERE id = $1`, id)
	if err != nil {
		return fmt.Errorf("delete user: %w", err)
	}

	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return err
	case affected == 0:
		return ErrUserNotFound
	}
	return nil
}
package repository_test
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// MockUserRepository is a testify-based mock used in tests; it embeds
// mock.Mock to record calls and serve configured return values.
type MockUserRepository struct {
mock.Mock
}
// GetByID records the call and returns whatever the test configured for it.
// A nil first return value is passed through as a nil *User rather than
// being type-asserted.
func (m *MockUserRepository) GetByID(ctx context.Context, id int) (*User, error) {
	ret := m.Called(ctx, id)
	first := ret.Get(0)
	if first == nil {
		return nil, ret.Error(1)
	}
	return first.(*User), ret.Error(1)
}
func TestUserService_GetUser(t *testing.T) {
mockRepo := new(MockUserRepository)
service := NewUserService(mockRepo)
expectedUser := &User{ID: 1, Name: "Alice", Email: "alice@example.com"}
mockRepo.On("GetByID", mock.Anything, 1).Return(expectedUser, nil)
user, err := service.GetUser(context.Background(), 1)
assert.NoError(t, err)
assert.Equal(t, expectedUser, user)
mockRepo.AssertExpectations(t)
}
// UpdateWithHistory updates a user's name and email and records the new
// values in user_history, both inside one transaction.
//
// If the UPDATE matches no row, the transaction is rolled back and an error
// wrapping sql.ErrNoRows is returned, so no history row is written for a
// user that does not exist.
func (r *PostgresUserRepository) UpdateWithHistory(ctx context.Context, user *User) error {
	tx, err := r.db.BeginTxx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	defer tx.Rollback() // safe to call even after commit

	// Update the user.
	query := `UPDATE users SET name = $1, email = $2 WHERE id = $3`
	res, err := tx.ExecContext(ctx, query, user.Name, user.Email, user.ID)
	if err != nil {
		return fmt.Errorf("update user: %w", err)
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("update user: %w", err)
	}
	if affected == 0 {
		return fmt.Errorf("update user: %w", sql.ErrNoRows)
	}

	// Insert the history record.
	historyQuery := `INSERT INTO user_history (user_id, name, email, changed_at) VALUES ($1, $2, $3, NOW())`
	_, err = tx.ExecContext(ctx, historyQuery, user.ID, user.Name, user.Email)
	if err != nil {
		return fmt.Errorf("insert history: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit tx: %w", err)
	}
	return nil
}
// TransferBalance moves amount from fromID's account to toID's account
// inside one SERIALIZABLE transaction.
//
// The debit is conditional (balance >= amount), so it matches zero rows when
// the sender is missing or has insufficient funds. Both updates therefore
// check that a row was actually changed; otherwise the function returns an
// error and the deferred Rollback undoes the transaction. Without these
// checks the receiver would be credited even though nothing was debited.
//
// amount must be positive; a negative amount would reverse the direction of
// the transfer.
//
// NOTE(review): float64 cannot represent most decimal amounts exactly;
// consider integer minor units or a decimal type for real money.
func (r *PostgresUserRepository) TransferBalance(ctx context.Context, fromID, toID int, amount float64) error {
	if !(amount > 0) { // written this way so NaN is rejected too
		return fmt.Errorf("transfer balance: amount must be positive, got %v", amount)
	}

	// Use serializable isolation for financial transactions.
	txOpts := &sql.TxOptions{
		Isolation: sql.LevelSerializable,
		ReadOnly:  false,
	}
	tx, err := r.db.BeginTxx(ctx, txOpts)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Deduct from the sender.
	res, err := tx.ExecContext(ctx,
		`UPDATE accounts SET balance = balance - $1 WHERE user_id = $2 AND balance >= $1`,
		amount, fromID,
	)
	if err != nil {
		return fmt.Errorf("deduct balance: %w", err)
	}
	debited, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("deduct balance: %w", err)
	}
	if debited == 0 {
		return fmt.Errorf("deduct balance: insufficient funds or no account for user %d", fromID)
	}

	// Add to the receiver.
	res, err = tx.ExecContext(ctx,
		`UPDATE accounts SET balance = balance + $1 WHERE user_id = $2`,
		amount, toID,
	)
	if err != nil {
		return fmt.Errorf("add balance: %w", err)
	}
	credited, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("add balance: %w", err)
	}
	if credited == 0 {
		return fmt.Errorf("add balance: no account for user %d", toID)
	}

	return tx.Commit()
}
// WithRetry calls fn up to maxRetries times, retrying only when fn fails
// with a PostgreSQL serialization failure (SQLSTATE 40001).
//
// Between attempts it waits with exponential backoff (100ms, 200ms, 400ms,
// ...). The wait is abandoned as soon as ctx is done, in which case
// ctx.Err() is returned. No wait happens after the final attempt.
//
// Returns nil on the first success, the error itself for any non-retryable
// failure, and a "max retries exceeded" error wrapping the last
// serialization failure once the attempts are used up.
func WithRetry(ctx context.Context, maxRetries int, fn func() error) error {
	backoff := 100 * time.Millisecond
	var lastErr error

	for attempt := 0; attempt < maxRetries; attempt++ {
		err := fn()
		if err == nil {
			return nil
		}

		// Only serialization errors (PostgreSQL error code 40001) are retried.
		var pgErr *pgconn.PgError
		if !errors.As(err, &pgErr) || pgErr.Code != "40001" {
			return err // non-retryable error
		}
		lastErr = err

		if attempt == maxRetries-1 {
			break // out of attempts; do not wait for nothing
		}

		// Exponential backoff that also honours context cancellation.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		backoff *= 2
	}

	if lastErr == nil {
		// maxRetries <= 0: fn was never called.
		return fmt.Errorf("max retries exceeded")
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}
// Usage: run TransferBalance through WithRetry with at most 3 attempts.
err := WithRetry(ctx, 3, func() error {
return r.TransferBalance(ctx, fromID, toID, amount)
})
┌─────────────────────────────────────┐
│ Migration tool selection │
└──────────────┬──────────────────────┘
│
┌──────────┴──────────┐
│ │
Simple SQL Complex logic
migrations (Go code needed)
│ │
▼ ▼
golang-migrate goose
(SQL only) (Go + SQL migrations)
Use golang-migrate when:
Use goose when:
Use sql-migrate when:
Installation:
# CLI tool
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest
# Library
go get -u github.com/golang-migrate/migrate/v4
go get -u github.com/golang-migrate/migrate/v4/database/postgres
go get -u github.com/golang-migrate/migrate/v4/source/file
Migration Files:
# Create migration
migrate create -ext sql -dir migrations -seq create_users_table
# Generates:
# migrations/000001_create_users_table.up.sql
# migrations/000001_create_users_table.down.sql
000001_create_users_table.up.sql:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_users_email ON users(email);
000001_create_users_table.down.sql:
DROP INDEX IF EXISTS idx_users_email;
DROP TABLE IF EXISTS users;
Programmatic Migration:
package main
import (
"fmt"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
)
func runMigrations(databaseURL, migrationsPath string) error {
m, err := migrate.New(
fmt.Sprintf("file://%s", migrationsPath),
databaseURL,
)
if err != nil {
return fmt.Errorf("create migrate instance: %w", err)
}
defer m.Close()
if err := m.Up(); err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("run migrations: %w", err)
}
version, dirty, err := m.Version()
if err != nil {
return err
}
fmt.Printf("Migration complete. Version: %d, Dirty: %v\n", version, dirty)
return nil
}
CLI Usage:
# Apply all pending up migrations
migrate -path migrations -database "postgres://user:pass@localhost:5432/db?sslmode=disable" up
# Roll back the most recent migration
# (quote the variable: connection URLs contain characters such as ? and & that the shell would otherwise interpret)
migrate -path migrations -database "$DATABASE_URL" down 1
# Migrate up or down to a specific version
migrate -path migrations -database "$DATABASE_URL" goto 5
# Print the currently applied version
migrate -path migrations -database "$DATABASE_URL" version
// User is a row of the users table. The db tags name the column each field is
// scanned from; the sql.Null* fields can represent a SQL NULL (Valid == false).
type User struct {
	ID    int    `db:"id"`
	Name  string `db:"name"`
	Email string `db:"email"`

	// Optional columns.
	Phone     sql.NullString `db:"phone"`      // NULL when no phone is stored
	Age       sql.NullInt64  `db:"age"`        // NULL when no age is stored
	UpdatedAt sql.NullTime   `db:"updated_at"` // NULL when no timestamp is stored
}
// GetUser loads the user with the given id.
//
// The columns are listed explicitly instead of using SELECT *, so the query
// keeps matching the fields of User when the table has, or later gains, other
// columns. The error from the query is returned unchanged.
func (r *PostgresUserRepository) GetUser(ctx context.Context, id int) (*User, error) {
	const query = `SELECT id, name, email, phone, age, updated_at FROM users WHERE id = $1`

	var user User
	if err := r.db.GetContext(ctx, &user, query, id); err != nil {
		return nil, err
	}

	// Access nullable fields: String is only meaningful when Valid is true.
	if user.Phone.Valid {
		fmt.Println("Phone:", user.Phone.String)
	}
	return &user, nil
}
// UpdatePhone stores phone for the user with the given userID.
// A nil phone leaves the argument's Valid flag false, so SQL writes NULL.
func (r *PostgresUserRepository) UpdatePhone(ctx context.Context, userID int, phone *string) error {
	const query = `UPDATE users SET phone = $1 WHERE id = $2`

	// The zero value already has Valid == false.
	value := sql.NullString{}
	if phone != nil {
		value.String, value.Valid = *phone, true
	}

	_, err := r.db.ExecContext(ctx, query, value, userID)
	return err
}
// NullString is a nullable string with custom JSON marshaling: an invalid
// (NULL) value is encoded as JSON null and a valid one as a plain JSON string.
type NullString struct {
	sql.NullString
}

// MarshalJSON returns null when the value is not valid and the JSON encoding
// of the string otherwise.
func (ns NullString) MarshalJSON() ([]byte, error) {
	if !ns.Valid {
		return []byte("null"), nil
	}
	return json.Marshal(ns.String)
}

// UnmarshalJSON accepts JSON null or a JSON string.
//
// null resets the receiver to the empty, invalid state, so a reused value does
// not keep the string from an earlier decode. Any other input must decode as
// a string; on a decode error the receiver is left untouched.
func (ns *NullString) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		ns.String, ns.Valid = "", false
		return nil
	}
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	ns.String, ns.Valid = s, true
	return nil
}
Wrong:
// GetUsersWithPosts (anti-pattern): loads all users, then runs one additional
// posts query per user inside the loop. The results of both SelectContext
// calls are discarded, so query errors go unnoticed.
func GetUsersWithPosts(ctx context.Context, db *sqlx.DB) ([]UserWithPosts, error) {
var users []User
db.SelectContext(ctx, &users, `SELECT * FROM users`)
for i, user := range users {
var posts []Post
// N+1: One query per user!
db.SelectContext(ctx, &posts, `SELECT * FROM posts WHERE user_id = $1`, user.ID)
users[i].Posts = posts
}
return users, nil
}
Correct:
// GetUsersWithPosts returns every user together with that user's posts using
// a single LEFT JOIN query instead of one query per user.
//
// Users without posts are included with no Posts. The result keeps the
// ORDER BY u.id order of the query. Scan errors and errors that end the row
// iteration early are returned instead of being silently dropped.
func GetUsersWithPosts(ctx context.Context, db *sqlx.DB) ([]UserWithPosts, error) {
	// Single query with JOIN
	query := `
		SELECT u.id, u.name, p.id as post_id, p.title, p.content
		FROM users u
		LEFT JOIN posts p ON p.user_id = u.id
		ORDER BY u.id
	`
	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// position maps a user ID to its index in result, so rows for the same
	// user are merged while the slice preserves the order the rows arrive in.
	position := make(map[int]int)
	result := make([]UserWithPosts, 0)
	for rows.Next() {
		var (
			userID         int
			userName       string
			postID         sql.NullInt64 // NULL when the user has no posts (LEFT JOIN)
			title, content sql.NullString
		)
		if err := rows.Scan(&userID, &userName, &postID, &title, &content); err != nil {
			return nil, err
		}

		idx, seen := position[userID]
		if !seen {
			idx = len(result)
			position[userID] = idx
			result = append(result, UserWithPosts{ID: userID, Name: userName})
		}
		if postID.Valid {
			result[idx].Posts = append(result[idx].Posts, Post{
				ID:      int(postID.Int64),
				Title:   title.String,
				Content: content.String,
			})
		}
	}
	// Next returns false both at the end of the data and on failure; only Err tells them apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
Wrong:
// Anti-pattern: no pool setting is applied after Open, and the Open error is discarded.
db, _ := sql.Open("postgres", dsn)
// Uses defaults: unlimited connections, no timeouts
Correct:
// NOTE(review): the sql.Open error is discarded here, presumably to keep the
// snippet short; check it in real code before configuring the pool.
db, _ := sql.Open("postgres", dsn)
// Production-ready pool settings
db.SetMaxOpenConns(25) // Limit total connections
db.SetMaxIdleConns(5) // Limit idle connections
db.SetConnMaxLifetime(5 * time.Minute) // Recycle old connections
db.SetConnMaxIdleTime(1 * time.Minute) // Close idle connections
Wrong:
// SlowQuery (anti-pattern) takes no context and issues the query with db.Query.
func SlowQuery(db *sql.DB) error {
// No context - query runs until completion even if client disconnects
rows, err := db.Query("SELECT * FROM huge_table")
// ... (error check and row handling elided in this example)
}
Correct:
// SlowQuery runs a long query bound to ctx, so cancelling ctx (for example
// when the client disconnects) cancels the query as well.
//
// It returns the query error, or the error that ended row iteration, if any.
func SlowQuery(ctx context.Context, db *sql.DB) error {
	// Context cancellation propagates to database
	rows, err := db.QueryContext(ctx, "SELECT * FROM huge_table")
	if err != nil {
		return err
	}
	defer rows.Close() // Always release the underlying connection

	for rows.Next() {
		// Process the current row here.
	}
	// If ctx is canceled, the query is terminated and the reason is reported here.
	return rows.Err()
}
Wrong:
// GetUsers (anti-pattern): discards the Query error and never closes rows.
func GetUsers(db *sql.DB) ([]User, error) {
rows, _ := db.Query("SELECT * FROM users")
// Missing rows.Close() - connection leak!
var users []User
for rows.Next() {
// ... (scanning elided in this example)
}
return users, nil
}
Correct:
// GetUsers queries all users, closing rows on every return path and reporting
// any error that ended the iteration.
// NOTE(review): this uses db.Query rather than QueryContext, so the query is
// not bound to a caller-supplied context - confirm that is intended.
func GetUsers(db *sql.DB) ([]User, error) {
rows, err := db.Query("SELECT * FROM users")
if err != nil {
return nil, err
}
defer rows.Close() // CRITICAL: Always defer Close
var users []User
for rows.Next() {
// ... (scanning into users elided in this example)
}
return users, rows.Err() // Check for iteration errors
}
Wrong:
// FindUser (anti-pattern) formats the caller-supplied email directly into the SQL text.
func FindUser(db *sql.DB, email string) (*User, error) {
// NEVER concatenate user input into SQL!
query := fmt.Sprintf("SELECT * FROM users WHERE email = '%s'", email)
// Vulnerable to injection, e.g. an email value of: ' OR '1'='1
row := db.QueryRow(query)
// ... (scanning of row elided in this example)
}
Correct:
// FindUser looks a user up by email with a parameterized query.
func FindUser(db *sql.DB, email string) (*User, error) {
// Use parameterized queries: email is passed as the $1 argument, not formatted into the SQL text
query := "SELECT * FROM users WHERE email = $1"
row := db.QueryRow(query, email) // Safe
// ... (scanning of row elided in this example)
}
Wrong:
// UpdateUser (anti-pattern): discards the errors from Begin, Exec and Commit
// and always reports success.
func UpdateUser(db *sql.DB, user *User) error {
tx, _ := db.Begin()
tx.Exec("UPDATE users SET name = $1 WHERE id = $2", user.Name, user.ID)
tx.Commit() // Ignores errors - data may not be committed!
return nil
}
Correct:
// UpdateUser writes user.Name for the row identified by user.ID inside a
// transaction and returns the first error from Begin, Exec or Commit.
func UpdateUser(db *sql.DB, user *User) error {
	const stmt = "UPDATE users SET name = $1 WHERE id = $2"

	tx, beginErr := db.Begin()
	if beginErr != nil {
		return beginErr
	}
	// Deferred unconditionally so every early return below rolls back.
	defer tx.Rollback()

	if _, execErr := tx.Exec(stmt, user.Name, user.ID); execErr != nil {
		return execErr
	}

	// The commit error is the function's result, so a failed commit is reported.
	return tx.Commit()
}
// OptimizeDatabasePool applies the four limits in config to db's connection pool.
func OptimizeDatabasePool(db *sql.DB, config PoolConfig) {
	var (
		maxOpen     = config.MaxOpen
		maxIdle     = config.MaxIdle
		maxLifetime = config.MaxLifetime
		maxIdleTime = config.MaxIdleTime
	)

	// Total connections, in use plus idle (example: 25 for a small app).
	// Rule of thumb: (CPU cores * 2) + disk spindles. Cloud databases often
	// impose their own connection limit (e.g. AWS RDS: 100-5000).
	db.SetMaxOpenConns(maxOpen)

	// Idle connections kept ready for reuse (example: 5-10); should be lower
	// than the open limit. Too low causes frequent, slow reconnections; too
	// high wastes resources.
	db.SetMaxIdleConns(maxIdle)

	// Maximum age of any connection; prevents stale connections to load
	// balancers. Recommended: 5-15 minutes.
	db.SetConnMaxLifetime(maxLifetime)

	// How long a connection may sit idle before it is closed; saves resources
	// during low traffic. Recommended: 1-5 minutes.
	db.SetConnMaxIdleTime(maxIdleTime)
}
// PoolConfig bundles the connection-pool limits that OptimizeDatabasePool
// passes to the matching *sql.DB setters.
type PoolConfig struct {
	// Connection counts.
	MaxOpen int // passed to SetMaxOpenConns
	MaxIdle int // passed to SetMaxIdleConns

	// Connection ages.
	MaxLifetime time.Duration // passed to SetConnMaxLifetime
	MaxIdleTime time.Duration // passed to SetConnMaxIdleTime
}
// Example pool configurations for different workloads.
var (
	// DevConfig keeps resource usage low for development.
	DevConfig = PoolConfig{
		MaxOpen:     10,
		MaxIdle:     2,
		MaxLifetime: 10 * time.Minute,
		MaxIdleTime: 2 * time.Minute,
	}

	// ProdConfig targets high throughput in production.
	ProdConfig = PoolConfig{
		MaxOpen:     25,
		MaxIdle:     10,
		MaxLifetime: 5 * time.Minute,
		MaxIdleTime: 1 * time.Minute,
	}

	// HighTrafficConfig targets maximum performance for a high-traffic API.
	HighTrafficConfig = PoolConfig{
		MaxOpen:     50,
		MaxIdle:     20,
		MaxLifetime: 5 * time.Minute,
		MaxIdleTime: 30 * time.Second,
	}
)
// MonitorConnectionPool prints a snapshot of db's connection-pool statistics
// to standard output, followed by tuning hints when a counter is high.
//
// WaitCount and MaxIdleClosed are compared against fixed thresholds (100 and
// 1000) on the values reported by db.Stats().
func MonitorConnectionPool(db *sql.DB) {
	stats := db.Stats()
	fmt.Printf("Connection Pool Stats:\n")
	fmt.Printf(" Open Connections: %d\n", stats.OpenConnections)
	fmt.Printf(" In Use: %d\n", stats.InUse)
	fmt.Printf(" Idle: %d\n", stats.Idle)
	fmt.Printf(" Wait Count: %d\n", stats.WaitCount)                  // Connections waited for
	fmt.Printf(" Wait Duration: %s\n", stats.WaitDuration)            // Total wait time
	fmt.Printf(" Max Idle Closed: %d\n", stats.MaxIdleClosed)         // Closed due to the SetMaxIdleConns limit
	fmt.Printf(" Max Lifetime Closed: %d\n", stats.MaxLifetimeClosed) // Closed due to SetConnMaxLifetime

	// Alert if too many waits (need more connections)
	if stats.WaitCount > 100 {
		fmt.Println("WARNING: High wait count - consider increasing MaxOpenConns")
	}

	// MaxIdleClosed counts connections closed because of the SetMaxIdleConns
	// limit. Many such closures mean connections are being thrown away and
	// reopened, so the idle limit is too low, not too high.
	if stats.MaxIdleClosed > 1000 {
		fmt.Println("INFO: Many idle closures - consider increasing MaxIdleConns")
	}
}
Installation:
# Add go-sqlmock to the module's dependencies
go get github.com/DATA-DOG/go-sqlmock
Example:
package repository_test
import (
	"context"
	"database/sql"
	"testing"

	"github.com/DATA-DOG/go-sqlmock"
	"github.com/jmoiron/sqlx"
	"github.com/stretchr/testify/assert"
)
// TestGetUserByID checks that GetByID returns the user built from the mocked
// row and that the expected query was actually issued.
func TestGetUserByID(t *testing.T) {
	// Create mock database. Stop here on failure: db would be unusable and
	// the deferred Close below must not run on it.
	db, mock, err := sqlmock.New()
	if !assert.NoError(t, err) {
		return
	}
	defer db.Close()

	sqlxDB := sqlx.NewDb(db, "postgres")
	repo := NewPostgresUserRepository(sqlxDB)

	// Expected query and result
	rows := sqlmock.NewRows([]string{"id", "name", "email"}).
		AddRow(1, "Alice", "alice@example.com")
	mock.ExpectQuery("SELECT (.+) FROM users WHERE id = \\$1").
		WithArgs(1).
		WillReturnRows(rows)

	// Execute
	user, err := repo.GetByID(context.Background(), 1)

	// Assert. Guard before touching user's fields so a failed lookup is
	// reported as a test failure rather than a nil-pointer panic.
	if !assert.NoError(t, err) || !assert.NotNil(t, user) {
		return
	}
	assert.Equal(t, "Alice", user.Name)
	assert.Equal(t, "alice@example.com", user.Email)
	assert.NoError(t, mock.ExpectationsWereMet())
}
// TestGetUserByID_NotFound checks that GetByID returns a nil user and
// ErrUserNotFound when the query fails with sql.ErrNoRows.
func TestGetUserByID_NotFound(t *testing.T) {
	// Stop here on failure: db would be unusable and the deferred Close
	// below must not run on it.
	db, mock, err := sqlmock.New()
	if !assert.NoError(t, err) {
		return
	}
	defer db.Close()

	sqlxDB := sqlx.NewDb(db, "postgres")
	repo := NewPostgresUserRepository(sqlxDB)

	mock.ExpectQuery("SELECT (.+) FROM users WHERE id = \\$1").
		WithArgs(999).
		WillReturnError(sql.ErrNoRows)

	user, err := repo.GetByID(context.Background(), 999)

	assert.Nil(t, user)
	assert.ErrorIs(t, err, ErrUserNotFound)
	assert.NoError(t, mock.ExpectationsWereMet())
}
//go:build integration
// +build integration

package repository_test
import (
"context"
"testing"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
// UserRepositoryIntegrationSuite is a testify suite holding the database
// handle and the repository shared by the integration tests.
type UserRepositoryIntegrationSuite struct {
	suite.Suite

	db   *sqlx.DB                // test database connection
	repo *PostgresUserRepository // repository under test
}
// SetupSuite connects to the test database and builds the repository on top
// of that connection. A connection failure fails the suite immediately.
func (s *UserRepositoryIntegrationSuite) SetupSuite() {
	const dsn = "postgres://test:test@localhost/testdb?sslmode=disable"

	conn, connectErr := sqlx.Connect("postgres", dsn)
	s.Require().NoError(connectErr)

	s.db = conn
	s.repo = NewPostgresUserRepository(conn)
}
// TearDownSuite closes the test database connection and reports a failed
// Close as a test failure.
func (s *UserRepositoryIntegrationSuite) TearDownSuite() {
	// s.db is only assigned once SetupSuite has connected successfully;
	// there is nothing to close (and nothing safe to call) before that.
	if s.db == nil {
		return
	}
	s.NoError(s.db.Close())
}
// SetupTest empties the users table and restarts its identity sequence, so
// each test begins from a clean database. A failing TRUNCATE aborts the test.
func (s *UserRepositoryIntegrationSuite) SetupTest() {
	const resetUsers = "TRUNCATE users RESTART IDENTITY CASCADE"

	_, truncateErr := s.db.Exec(resetUsers)
	s.Require().NoError(truncateErr)
}
// TestCreateUser checks that Create succeeds and leaves non-zero ID and
// CreatedAt values on the user it was given.
func (s *UserRepositoryIntegrationSuite) TestCreateUser() {
	alice := &User{Name: "Alice", Email: "alice@example.com"}

	createErr := s.repo.Create(context.Background(), alice)

	s.NoError(createErr)
	s.NotZero(alice.ID)
	s.NotZero(alice.CreatedAt)
}
// TestGetUserByID inserts a user and checks that GetByID returns the same
// name and email for the inserted user's ID.
func (s *UserRepositoryIntegrationSuite) TestGetUserByID() {
	ctx := context.Background()

	// Insert test data. Require stops the test if the insert fails, since
	// the lookup below would otherwise run against a missing row.
	user := &User{Name: "Bob", Email: "bob@example.com"}
	s.Require().NoError(s.repo.Create(ctx, user))

	// Test retrieval. Require guards the field accesses below against a nil
	// result.
	retrieved, err := s.repo.GetByID(ctx, user.ID)
	s.Require().NoError(err)
	s.Require().NotNil(retrieved)
	s.Equal(user.Name, retrieved.Name)
	s.Equal(user.Email, retrieved.Email)
}
func TestUserRepositoryIntegration(t *testing.T) {
suite.Run(t, new(UserRepositoryIntegrationSuite))
}
Run integration tests:
# Default run: files guarded by the integration build tag are not compiled, so those tests are skipped
go test ./...
# Enable the integration build tag: runs the integration tests in addition to the regular ones
go test -tags=integration ./...
Go database patterns prioritize simplicity, type safety, and performance:
Library Selection:
- database/sql for portability
- sqlx for convenience and reduced boilerplate
- pgx for PostgreSQL-specific high-performance applications

Core Patterns:
Migration Strategy:
- golang-migrate for version-controlled schema evolution

Avoid Common Pitfalls:
Testing:
By following these patterns, you'll build robust, performant, and maintainable database layers in Go.
Weekly Installs
115
Repository
GitHub Stars
20
First Seen
Jan 23, 2026
Security Audits
Gen Agent Trust HubFailSocketPassSnykFail
Installed on
opencode92
gemini-cli91
codex87
github-copilot87
claude-code81
cursor73
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
116,600 周安装
Skill Creator 技能创建指南:为 Claude AI 构建模块化专业技能的完整教程
660 周安装
gstack工作流助手:Claude Code专家团队,AI驱动开发规划、代码评审与自动化测试
664 周安装
find-skills 技能查找工具:快速发现并安装智能体技能,扩展AI能力
669 周安装
专业转化文案撰写指南:提升营销效果与SEO优化的文案技巧
656 周安装
WordPress Elementor 页面编辑与模板管理指南:WP-CLI 与浏览器自动化
674 周安装
Skill Forge 技能开发指南:Claude AI 专用技能创建与优化工作流程
668 周安装