ml-data-pipeline-architecture by terrylica/cc-skills

npx skills add https://github.com/terrylica/cc-skills --skill ml-data-pipeline-architecture
Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.
ADR: 2026-01-22-polars-preference-hook (efficiency preferences framework)
Note: A PreToolUse hook enforces the Polars preference. To use Pandas, add `# polars-exception: <reason>` at the top of the file.
Use this skill when:
Dataset size?
├─ < 1M rows → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows → Use Polars (required for memory efficiency)
Operations?
├─ Simple transforms → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins → Polars with lazy evaluation
└─ Streaming/chunked → Polars scan_* functions
Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source → Arrow stream → Polars (optimal)
# BAD: 3 memory copies
import pandas as pd
import torch

df = pd.read_sql(query, conn)    # Copy 1: DB → pandas
X = df[features].values          # Copy 2: pandas → numpy
tensor = torch.from_numpy(X)     # Copy 3: numpy → tensor
# Peak memory: 3x data size
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch
client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars") # Arrow in DB memory
df = pl.from_arrow(arrow_table) # Zero-copy view
X = df.select(features).to_numpy() # Single allocation
tensor = torch.from_numpy(X) # View (no copy)
# Peak memory: 1.2x data size
def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    arrow_table = client.query_arrow(query)  # clickhouse-connect sets the Arrow format itself
    return pl.from_arrow(arrow_table)
# Usage
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
# Polars has native ClickHouse support (see pola.rs for version requirements)
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db",
)
# For reproducible batch processing, export once to Parquet.
# Note: INTO OUTFILE writes on the client side, so run the export via the
# clickhouse-client CLI rather than the HTTP client:
#   clickhouse-client --query "SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet"
df = pl.scan_parquet("data.parquet")  # Lazy, memory-mapped
from torch.utils.data import TensorDataset, DataLoader
# Accept both pandas and polars
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()
    return (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float(),
    )
X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)
class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)
        x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y
# Define transformations lazily (no computation yet)
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        pl.col("close").pct_change().alias("returns"),
        pl.col("volume").log().alias("log_volume"),
    ])
    .select(["timestamp"] + features + [target])  # keep timestamp for the split below
)

# Execute only when needed
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
# Process file in chunks (never loads the full file).
# Note: a Polars LazyFrame has no iter_batches(); stream record batches
# through PyArrow and wrap each one in Polars.
import pyarrow.parquet as pq

def process_large_file(path: str, chunk_size: int = 100_000):
    parquet_file = pq.ParquetFile(path)
    for batch in parquet_file.iter_batches(batch_size=chunk_size):
        chunk = pl.from_arrow(batch)  # zero-copy per record batch
        features = compute_features(chunk)
        yield features.to_numpy()
from pydantic import BaseModel, field_validator

class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v):
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )
| Operation | Pandas | Polars | Speedup |
|---|---|---|---|
| Read CSV (1GB) | 45s | 4s | 11x |
| Filter rows | 2.1s | 0.4s | 5x |
| Group-by agg | 3.8s | 0.3s | 13x |
| Sort | 5.2s | 0.4s | 13x |
| Memory peak | 10GB | 2.5GB | 4x |
Benchmark: 50M rows, 20 columns, MacBook M2
Migration checklist:
- Add `polars = "<version>"` to dependencies (see PyPI for the current release)
- Add `query_arrow()` in the data client
- Add a `pl.from_pandas()` wrapper at the trainer entry point
- Update `prepare_sequences()` to accept both types
- Use `pl.scan_*` for loading; call `.collect()` only right before `.to_numpy()`

# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas() # Why?
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect() # Full load
filtered = df.filter(...) # After the fact
# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()
# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")

# GOOD: Stream in record batches via Arrow
import pyarrow.parquet as pq

for batch in pq.ParquetFile("huge_file.parquet").iter_batches():
    process(pl.from_arrow(batch))
| Issue | Cause | Solution |
|---|---|---|
| Memory spike during load | Collecting too early | Use lazy evaluation, call collect() only when needed |
| Arrow conversion fails | Unsupported data type | Check for object columns, convert to native types |
| ClickHouse connection error | Wrong port or credentials | Verify host:8123 (HTTP) or host:9000 (native) |
| Zero-copy not working | Intermediate pandas conversion | Remove to_pandas() calls, stay in Arrow/Polars |
| Polars hook blocking code | Pandas used without exception | Add # polars-exception: reason comment at file top |
| Slow group-by operations | Using pandas for large datasets | Migrate to Polars for 5-10x speedup |
| Schema validation failure | Column names are case-sensitive | Verify exact column names from the source |
| PyTorch DataLoader OOM | Loading the full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Parquet scan performance | Not using predicate pushdown | Add filters before collect() for lazy evaluation |
| Type mismatch in tensor | Float64 vs Float32 mismatch | Explicitly cast with .cast(pl.Float32) before numpy |

Weekly Installs: 61
Repository: terrylica/cc-skills
GitHub Stars: 26
First Seen: Feb 7, 2026
Security Audits: Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on: gemini-cli (59), opencode (59), github-copilot (58), codex (58), kimi-cli (58), amp (58)