cocoindex by davila7/claude-code-templates
npx skills add https://github.com/davila7/claude-code-templates --skill cocoindex
CocoIndex is an ultra-performant real-time data transformation framework for AI with incremental processing. This skill enables building indexing flows that extract data from sources, apply transformations (chunking, embedding, LLM extraction), and export to targets (vector databases, graph databases, relational databases).
Core capabilities:
Key features:
For detailed documentation: https://cocoindex.io/docs/ Search documentation: https://cocoindex.io/docs/search?q=url%20encoded%20keyword
Use when users request:
Ask clarifying questions to understand:
Data source:
Transformations:
Target:
Guide user to add CocoIndex with appropriate extras to their project based on their needs:
Required dependency:
cocoindex - Core functionality, CLI, and most built-in functions
Optional extras (add as needed):
cocoindex[embeddings] - For SentenceTransformer embeddings (when using SentenceTransformerEmbed)
cocoindex[colpali] - For ColPali image/document embeddings (when using ColPaliEmbedImage or ColPaliEmbedQuery)
cocoindex[lancedb] - For LanceDB target (when exporting to LanceDB)
cocoindex[embeddings,lancedb] - Multiple extras can be combined
What's included:
embeddings extra: SentenceTransformers library for local embedding models
colpali extra: ColPali engine for multimodal document/image embeddings
lancedb extra: LanceDB client library for LanceDB vector database support
Users can install using their preferred package manager (pip, uv, poetry, etc.) or add to pyproject.toml.
For installation details: https://cocoindex.io/docs/getting_started/installation
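For example, with pip (pick extras to match the flow; the quotes keep the brackets safe in shells like zsh):

```shell
pip install "cocoindex[embeddings,lancedb]"
```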
Check existing environment first:
Check if COCOINDEX_DATABASE_URL exists in environment variables
postgres://cocoindex:cocoindex@localhost/cocoindex
For flows requiring LLM APIs (embeddings, extraction):
Guide the user to create a .env file:
# Database connection (required - internal storage)
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
# LLM API keys (add the ones you need)
OPENAI_API_KEY=sk-... # For OpenAI (generation + embeddings)
ANTHROPIC_API_KEY=sk-ant-... # For Anthropic (generation only)
GOOGLE_API_KEY=... # For Gemini (generation + embeddings)
VOYAGE_API_KEY=pa-... # For Voyage (embeddings only)
# Ollama requires no API key (local)
For more LLM options: https://cocoindex.io/docs/ai/llm
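Before calling cocoindex.init(), it can help to fail fast when required variables are missing. A minimal stdlib sketch (the REQUIRED list is an assumption; extend it with the keys your flow actually needs):

```python
import os

REQUIRED = ["COCOINDEX_DATABASE_URL"]  # add e.g. "OPENAI_API_KEY" if the flow uses OpenAI

def missing_env(required: list[str], env=os.environ) -> list[str]:
    # Return the names of required variables that are absent or empty
    return [name for name in required if not env.get(name)]

# Example against a fake environment
print(missing_env(REQUIRED, env={}))
# → ['COCOINDEX_DATABASE_URL']
```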
Create basic project structure:
# main.py
from dotenv import load_dotenv
import cocoindex
@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# Flow definition here
pass
if __name__ == "__main__":
load_dotenv()
cocoindex.init()
my_flow.update()
Follow this structure:
@cocoindex.flow_def(name="DescriptiveName")
def flow_name(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# 1. Import source data
data_scope["source_name"] = flow_builder.add_source(
cocoindex.sources.SourceType(...)
)
# 2. Create collector(s) for outputs
collector = data_scope.add_collector()
# 3. Transform data (iterate through rows)
with data_scope["source_name"].row() as item:
# Apply transformations
item["new_field"] = item["existing_field"].transform(
cocoindex.functions.FunctionName(...)
)
...
# Nested iteration (e.g., chunks within documents)
with item["nested_table"].row() as nested_item:
# More transformations
nested_item["embedding"] = nested_item["text"].transform(...)
# Collect data for export
collector.collect(
field1=nested_item["field1"],
field2=item["field2"],
generated_id=cocoindex.GeneratedField.UUID
)
# 4. Export to target
collector.export(
"target_name",
cocoindex.targets.TargetType(...),
primary_key_fields=["field1"],
vector_indexes=[...] # If needed
)
Key principles:
Use .row() to iterate through table data
Assign transform results to row fields: item["new_field"] = item["existing_field"].transform(...), NOT local variables like new_field = item["existing_field"].transform(...)
Common mistakes to avoid:
❌ Wrong: Using local variables for transformations
with data_scope["files"].row() as file:
summary = file["content"].transform(...) # ❌ Local variable
summaries_collector.collect(filename=file["filename"], summary=summary)
✅ Correct: Assigning to row fields
with data_scope["files"].row() as file:
file["summary"] = file["content"].transform(...) # ✅ Field assignment
summaries_collector.collect(filename=file["filename"], summary=file["summary"])
❌ Wrong: Creating unnecessary dataclasses to mirror flow fields
from dataclasses import dataclass
@dataclass
class FileSummary: # ❌ Unnecessary - CocoIndex manages fields automatically
filename: str
summary: str
embedding: list[float]
# This dataclass is never used in the flow!
IMPORTANT: The patterns listed below are common starting points, but you cannot exhaustively enumerate all possible scenarios. When user requirements don't match existing patterns:
Common starting patterns (use references for detailed examples):
For text embedding: Load references/flow_patterns.md and refer to "Pattern 1: Simple Text Embedding"
For code embedding: Load references/flow_patterns.md and refer to "Pattern 2: Code Embedding with Language Detection"
For LLM extraction + knowledge graph: Load references/flow_patterns.md and refer to "Pattern 3: LLM-based Extraction to Knowledge Graph"
For live updates: Load references/flow_patterns.md and refer to "Pattern 4: Live Updates with Refresh Interval"
For custom functions: Load references/flow_patterns.md and refer to "Pattern 5: Custom Transform Function"
For reusable query logic: Load references/flow_patterns.md and refer to "Pattern 6: Transform Flow for Reusable Logic"
For concurrency control: Load references/flow_patterns.md and refer to "Pattern 7: Concurrency Control"
Example of pattern composition:
If a user asks to "index images from S3, generate captions with a vision API, and store in Qdrant", combine:
No single pattern covers this exact scenario, but the building blocks are composable.
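A pseudocode sketch of that composition (the caption_image function is hypothetical, and field names like content and filename are assumptions about what the S3 source exposes):

```python
import cocoindex

@cocoindex.op.function(cache=True, behavior_version=1)
def caption_image(img_bytes: bytes) -> str:
    """Hypothetical custom function: call a vision API and return a caption."""
    ...

@cocoindex.flow_def(name="ImageCaptionIndex")
def image_caption_flow(flow_builder, data_scope):
    # Source: Amazon S3
    data_scope["images"] = flow_builder.add_source(
        cocoindex.sources.AmazonS3(bucket="my-bucket", prefix="images/")
    )
    collector = data_scope.add_collector()
    with data_scope["images"].row() as image:
        # Pattern 5: custom transform function (vision captioning)
        image["caption"] = image["content"].transform(caption_image)
        collector.collect(
            filename=image["filename"],
            caption=image["caption"],
            id=cocoindex.GeneratedField.UUID,
        )
    # Target: Qdrant
    collector.export(
        "image_captions",
        cocoindex.targets.Qdrant(collection_name="image_captions"),
        primary_key_fields=["id"],
    )
```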
Guide user through testing:
# 1. Run with setup
cocoindex update --setup -f main # -f forces setup without confirmation prompts
# 2. Start a server and redirect users to CocoInsight
cocoindex server -ci main
# Then open CocoInsight at https://cocoindex.io/cocoinsight
CocoIndex has a type system independent of programming languages. All data types are determined at flow definition time, making schemas clear and predictable.
IMPORTANT: When to define types:
Type annotation requirements:
Arguments: Any, dict[str, Any], or omitted annotations are fine; the engine already knows the types
Why specific return types matter: Custom function return types let CocoIndex infer field types throughout the flow without processing real data. This enables creating proper target schemas (e.g., vector indexes with fixed dimensions).
Common type categories:
Primitive types: str, int, float, bool, bytes, datetime.date, datetime.datetime, uuid.UUID
Vector types (embeddings): specify the dimension in the return type if you plan to export vectors to targets, as most targets require a fixed vector dimension
cocoindex.Vector[cocoindex.Float32, typing.Literal[768]] - 768-dim float32 vector (recommended); list[float] without dimension also works
Struct types: dataclass, NamedTuple, or Pydantic model; use a specific struct (e.g., Person), not dict[str, Any] or Any
Table types: dict[K, V] where K = key type (primitive or frozen struct) and V = Struct type; list[R] where R = Struct type; not dict[Any, Any] or list[Any]
Json type: cocoindex.Json for unstructured/dynamic data
Optional types: T | None for nullable values
Examples:
from dataclasses import dataclass
from typing import Any, Literal
import cocoindex
@dataclass
class Person:
name: str
age: int
# ✅ Vector with dimension (recommended for vector search)
@cocoindex.op.function(behavior_version=1)
def embed_text(text: str) -> cocoindex.Vector[cocoindex.Float32, Literal[768]]:
"""Generate 768-dim embedding - dimension needed for vector index."""
# ... embedding logic ...
return embedding # numpy array or list of 768 floats
# ✅ Struct return type, relaxed argument
@cocoindex.op.function(behavior_version=1)
def process_person(person: dict[str, Any]) -> Person:
"""Argument can be dict[str, Any], return must be specific Struct."""
return Person(name=person["name"], age=person["age"])
# ✅ LTable return type
@cocoindex.op.function(behavior_version=1)
def filter_people(people: list[Any]) -> list[Person]:
"""Return type specifies list of specific Struct."""
return [p for p in people if p.age >= 18]
# ❌ Wrong: dict[str, str] is not a valid specific CocoIndex type
# @cocoindex.op.function(...)
# def bad_example(person: Person) -> dict[str, str]:
# return {"name": person.name}
For comprehensive data types documentation: https://cocoindex.io/docs/core/data_types
When users need custom transformation logic, create custom functions.
Use standalone function when:
Use spec+executor when:
@cocoindex.op.function(behavior_version=1)
def my_function(input_arg: str, optional_arg: int | None = None) -> dict:
"""
Function description.
Args:
input_arg: Description
optional_arg: Optional description
"""
# Transformation logic
return {"result": f"processed-{input_arg}"}
Requirements:
Decorate with @cocoindex.op.function()
Add cache=True for expensive ops, with behavior_version (required with cache)
# 1. Define configuration spec
class MyFunction(cocoindex.op.FunctionSpec):
"""Configuration for MyFunction."""
model_name: str
threshold: float = 0.5
# 2. Define executor
@cocoindex.op.executor_class(cache=True, behavior_version=1)
class MyFunctionExecutor:
spec: MyFunction # Required: link to spec
model = None # Instance variables for state
def prepare(self) -> None:
"""Optional: run once before execution."""
# Load model, setup connections, etc.
self.model = load_model(self.spec.model_name)
def __call__(self, text: str) -> dict:
"""Required: execute for each data row."""
# Use self.spec for configuration
# Use self.model for loaded resources
result = self.model.process(text)
return {"result": result}
When to enable cache:
Important: Increment behavior_version when function logic changes to invalidate cache.
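As intuition (a plain-Python analogy, not CocoIndex internals): the version acts like part of the cache key, so bumping it leaves results computed under older logic unreachable.

```python
cache: dict = {}
BEHAVIOR_VERSION = 1  # bump to 2 when the transformation logic changes

def cached_transform(text: str) -> str:
    # The cache key includes the behavior version, so entries written
    # under an older version are never returned after a bump.
    key = (BEHAVIOR_VERSION, text)
    if key not in cache:
        cache[key] = text.upper()  # stand-in for an expensive transformation
    return cache[key]

print(cached_transform("hello"))  # → HELLO
```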
For detailed examples and patterns, load references/custom_functions.md.
For more on custom functions: https://cocoindex.io/docs/custom_ops/custom_functions
Setup flow (create resources):
cocoindex setup main
One-time update:
cocoindex update main
# With auto-setup
cocoindex update --setup main
# Force reset everything before setup and update
cocoindex update --reset main
Live update (continuous monitoring):
cocoindex update main.py -L
# Requires refresh_interval on source or source-specific change capture
Drop flow (remove all resources):
cocoindex drop main.py
Inspect flow:
cocoindex show main.py:FlowName
Test without side effects:
cocoindex evaluate main.py:FlowName --output-dir ./test_output
For complete CLI reference, load references/cli_operations.md.
For CLI documentation: https://cocoindex.io/docs/core/cli
Basic setup:
from dotenv import load_dotenv
import cocoindex
load_dotenv()
cocoindex.init()
@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder, data_scope):
# ... flow definition ...
pass
One-time update:
stats = my_flow.update()
print(f"Processed {stats.total_rows} rows")
# Async
stats = await my_flow.update_async()
Live update:
# As context manager
with cocoindex.FlowLiveUpdater(my_flow) as updater:
# Updater runs in background
# Your application logic here
pass
# Manual control
updater = cocoindex.FlowLiveUpdater(
my_flow,
cocoindex.FlowLiveUpdaterOptions(
live_mode=True,
print_stats=True
)
)
updater.start()
# ... application logic ...
updater.wait()
Setup/drop:
my_flow.setup(report_to_stdout=True)
my_flow.drop(report_to_stdout=True)
cocoindex.setup_all_flows()
cocoindex.drop_all_flows()
Query with transform flows:
@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[list[float]]:
return text.transform(
cocoindex.functions.SentenceTransformerEmbed(model="...")
)
# Use in flow for indexing
doc["embedding"] = text_to_embedding(doc["content"])
# Use for querying
query_embedding = text_to_embedding.eval("search query")
For complete API reference and patterns, load references/api_operations.md.
For API documentation: https://cocoindex.io/docs/core/flow_methods
SplitRecursively - Chunk text intelligently
doc["chunks"] = doc["content"].transform(
cocoindex.functions.SplitRecursively(),
language="markdown", # or "python", "javascript", etc.
chunk_size=2000,
chunk_overlap=500
)
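To build intuition for how chunk_size and chunk_overlap interact, here is a naive fixed-window splitter (SplitRecursively itself is smarter: it prefers structural boundaries like headings and paragraphs):

```python
def naive_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    # Slide a fixed window; each chunk starts chunk_size - chunk_overlap
    # characters after the previous one, so consecutive chunks share
    # chunk_overlap characters.
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size]
            for i in range(0, max(len(text) - chunk_overlap, 1), step)]

print(naive_chunks("abcdefghij", chunk_size=4, chunk_overlap=2))
# → ['abcd', 'cdef', 'efgh', 'ghij']
```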
ParseJson - Parse JSON strings
data = json_string.transform(cocoindex.functions.ParseJson())
DetectProgrammingLanguage - Detect language from filename
file["language"] = file["filename"].transform(
cocoindex.functions.DetectProgrammingLanguage()
)
SentenceTransformerEmbed - Local embedding model
# Requires: cocoindex[embeddings]
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.SentenceTransformerEmbed(
model="sentence-transformers/all-MiniLM-L6-v2"
)
)
EmbedText - LLM API embeddings
This is the recommended way to generate embeddings using LLM APIs (OpenAI, Voyage, etc.).
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.EmbedText(
api_type=cocoindex.LlmApiType.OPENAI,
model="text-embedding-3-small",
)
)
ColPaliEmbedImage - Multimodal image embeddings
# Requires: cocoindex[colpali]
image["embedding"] = image["img_bytes"].transform(
cocoindex.functions.ColPaliEmbedImage(model="vidore/colpali-v1.2")
)
ExtractByLlm - Extract structured data with LLM
This is the recommended way to use LLMs for extraction and summarization tasks. It supports both structured outputs (dataclasses, Pydantic models) and simple text outputs (str).
import dataclasses
# For structured extraction
@dataclasses.dataclass
class ProductInfo:
name: str
price: float
category: str
item["product_info"] = item["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=ProductInfo,
instruction="Extract product information"
)
)
# For text summarization/generation
file["summary"] = file["content"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=str,
instruction="Summarize this document in one paragraph"
)
)
Browse all sources: https://cocoindex.io/docs/sources/ Browse all targets: https://cocoindex.io/docs/targets/
LocalFile:
cocoindex.sources.LocalFile(
path="documents",
included_patterns=["*.md", "*.txt"],
excluded_patterns=["**/.*", "node_modules"]
)
AmazonS3:
cocoindex.sources.AmazonS3(
bucket="my-bucket",
prefix="documents/",
aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)
Postgres:
cocoindex.sources.Postgres(
connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
query="SELECT id, content FROM documents"
)
Postgres (with vector support):
collector.export(
"target_name",
cocoindex.targets.Postgres(),
primary_key_fields=["id"],
vector_indexes=[
cocoindex.VectorIndexDef(
field_name="embedding",
metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
)
]
)
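For intuition on the COSINE_SIMILARITY metric (a plain-Python illustration, not CocoIndex code): it ranks vectors by the angle between them, ignoring magnitude, which suits embeddings.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # dot(a, b) / (|a| * |b|): 1.0 for same direction, 0.0 for orthogonal
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(round(cosine_similarity([1.0, 0.0], [2.0, 0.0]), 3))  # → 1.0
print(round(cosine_similarity([1.0, 0.0], [0.0, 1.0]), 3))  # → 0.0
```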
Qdrant:
collector.export(
"target_name",
cocoindex.targets.Qdrant(collection_name="my_collection"),
primary_key_fields=["id"]
)
LanceDB:
# Requires: cocoindex[lancedb]
collector.export(
"target_name",
cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
primary_key_fields=["id"]
)
Neo4j (nodes):
collector.export(
"nodes",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Nodes(label="Entity")
),
primary_key_fields=["id"]
)
Neo4j (relationships):
collector.export(
"relationships",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Relationships(
rel_type="RELATES_TO",
source=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
),
target=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
)
)
),
primary_key_fields=["id"]
)
Troubleshooting tips:
Flow not found: run cocoindex show main.py; pass --app-dir if not in the project root
Database errors: verify .env has COCOINDEX_DATABASE_URL; test the connection with psql $COCOINDEX_DATABASE_URL; use --env-file to specify a custom location
Schema out of date: run cocoindex setup main.py, or rebuild with cocoindex drop main.py && cocoindex setup main.py
Live updates not triggering: add refresh_interval to the source
Throughput control: tune max_inflight_rows, max_inflight_bytes, or set a global limit in .env: COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS
This skill includes comprehensive reference documentation for common patterns and operations:
Load these references when users need:
For comprehensive documentation: https://cocoindex.io/docs/ Search specific topics: https://cocoindex.io/docs/search?q=url%20encoded%20keyword
Weekly Installs: 160
GitHub Stars: 23.4K
First Seen: Jan 21, 2026
Security Audits: Gen Agent Trust Hub Pass, Socket Pass, Snyk Fail
Installed on: claude-code (133), opencode (130), gemini-cli (125), cursor (123), codex (113), antigravity (112)