elasticsearch-file-ingest by elastic/agent-skills
npx skills add https://github.com/elastic/agent-skills --skill elasticsearch-file-ingest
基于流式处理的大型数据文件(NDJSON、CSV、Parquet、Arrow IPC)导入与转换到 Elasticsearch。
--file 支持通配符,可一次匹配多个文件(例如 logs/*.json)。此技能是自包含的:scripts/ 文件夹和 package.json 位于此技能目录中,请从此目录运行所有命令;引用位于其他位置的数据文件时,请使用绝对路径。
首次使用前,请安装依赖项:
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
npm install
Elasticsearch 连接通过环境变量配置。当提供 CLI 标志 --node、--api-key、--username 和 --password 时,它们会覆盖环境变量。
export ELASTICSEARCH_CLOUD_ID="deployment-name:base64encodedcloudid"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="changeme"
对于本地开发和测试,使用 start-local 通过 Docker 或 Podman 快速启动 Elasticsearch 和 Kibana:
curl -fsSL https://elastic.co/start-local | sh
安装完成后,加载生成的 .env 文件:
source elastic-start-local/.env
export ELASTICSEARCH_URL="$ES_LOCAL_URL"
export ELASTICSEARCH_API_KEY="$ES_LOCAL_API_KEY"
export ELASTICSEARCH_INSECURE="true"
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index
# NDJSON
cat /absolute/path/to/data.ndjson | node scripts/ingest.js --stdin --target my-index
# CSV
cat /absolute/path/to/data.csv | node scripts/ingest.js --stdin --source-format csv --target my-index
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --target users
node scripts/ingest.js --file /absolute/path/to/users.parquet --source-format parquet --target users
node scripts/ingest.js --file /absolute/path/to/users.arrow --source-format arrow --target users
# csv-options.json
# {
# "columns": true,
# "delimiter": ";",
# "trim": true
# }
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --csv-options csv-options.json --target users
使用 --infer-mappings 时,不要与 --source-format csv 结合使用。推断功能会将原始样本发送到 Elasticsearch 的 _text_structure/find_structure 端点,该端点会返回映射和一个包含 CSV 处理器的导入管道。如果同时设置了 --source-format csv,CSV 将在客户端和服务器端进行解析,导致索引为空。让 --infer-mappings 处理所有事情:
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --target users
# infer-options.json
# {
# "sampleBytes": 200000,
# "lines_to_sample": 2000
# }
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --infer-mappings-options infer-options.json --target users
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --transform transform.js
node scripts/ingest.js --source-index old-index --target new-index
node scripts/ingest.js --source-index logs \
--node https://es8.example.com:9200 --api-key es8-key \
--target new-logs \
--target-node https://es9.example.com:9200 --target-api-key es9-key
--target <index> # 目标索引名称
--file <path> # 源文件(支持通配符,例如 logs/*.json)
--source-index <name> # 源 Elasticsearch 索引
--stdin # 从 stdin 读取 NDJSON/CSV
--node <url> # ES 节点 URL(默认:http://localhost:9200)
--api-key <key> # API 密钥身份验证
--username <user> # 基本身份验证用户名
--password <pass> # 基本身份验证密码
--target-node <url> # 目标 ES 节点 URL(如果未指定则使用 --node)
--target-api-key <key> # 目标 API 密钥
--target-username <user> # 目标用户名
--target-password <pass> # 目标密码
--mappings <file.json> # 映射文件(重建索引时自动从源复制)
--infer-mappings # 从文件/流推断映射/管道(请勿与 --source-format 结合使用)
--infer-mappings-options <file> # 推断选项(JSON 文件)
--delete-index # 如果目标索引存在则删除
--pipeline <name> # 导入管道名称
--transform <file.js> # 转换函数(导出为 default 或 module.exports)
--query <file.json> # 查询文件,用于过滤源文档
--source-format <fmt> # 源格式:ndjson|csv|parquet|arrow(默认:ndjson)
--csv-options <file> # CSV 解析器选项(JSON 文件)
--skip-header # 跳过第一行(例如 CSV 表头)
--buffer-size <kb> # 缓冲区大小,单位 KB(默认:5120)
--search-size <n> # 重建索引时每次搜索的文档数(默认:100)
--total-docs <n> # 进度条的总文档数(文件/流)
--stall-warn-seconds <n> # 停滞警告阈值(默认:30)
--progress-mode <mode> # 进度输出:auto|line|newline(默认:auto)
--debug-events # 记录暂停/恢复/停滞事件
--quiet # 禁用进度条
转换函数允许您在导入过程中修改文档。创建一个导出转换函数的 JavaScript 文件:
// ES module export (the default style)
export default function transform(doc) {
  // Copy the source document, then attach the derived fields.
  const stamped = {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
  stamped.timestamp = new Date().toISOString();
  return stamped;
}
// 或 CommonJS
module.exports = function transform(doc) {
return {
...doc,
full_name: `${doc.first_name} ${doc.last_name}`,
};
};
返回 null 或 undefined 以跳过文档:
export default function transform(doc) {
  // Drop documents that lack a plausible email address.
  const hasEmail = Boolean(doc.email) && doc.email.includes("@");
  return hasEmail ? doc : null;
}
返回一个数组,从一个源文档创建多个目标文档:
export default function transform(doc) {
  // Fan out: emit one document per hashtag found in the tweet text.
  const tags = doc.text.match(/#\w+/g) || [];
  const docs = [];
  for (const tag of tags) {
    docs.push({
      hashtag: tag,
      tweet_id: doc.id,
      created_at: doc.created_at,
    });
  }
  return docs;
}
重建索引时,映射会自动从源索引复制:
node scripts/ingest.js --source-index old-logs --target new-logs
{
"properties": {
"@timestamp": { "type": "date" },
"message": { "type": "text" },
"user": {
"properties": {
"name": { "type": "keyword" },
"email": { "type": "keyword" }
}
}
}
}
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
在重建索引时使用查询文件过滤源文档:
{
"range": {
"@timestamp": {
"gte": "2024-01-01",
"lt": "2024-02-01"
}
}
}
node scripts/ingest.js \
--source-index logs \
--target filtered-logs \
--query filter.json
切勿在未经用户明确确认的情况下使用 --delete-index 标志或删除现有索引和数据。不要将 --infer-mappings 与 --source-format 结合使用:推断功能会创建一个服务器端导入管道来处理解析(例如 CSV 处理器),若同时使用 --source-format csv,客户端也会进行解析,导致双重解析和空索引。需要自动检测时,请单独使用 --infer-mappings;需要手动控制(已知字段类型的客户端 CSV 解析)时,请使用 --source-format csv 配合显式的 --mappings。以下情况请考虑替代方案:
每周安装次数
157
代码仓库
GitHub 星标数
89
首次出现
11 天前
安全审计
已安装于
cursor137
github-copilot127
opencode126
gemini-cli126
codex126
kimi-cli125
Stream-based ingestion and transformation of large data files (NDJSON, CSV, Parquet, Arrow IPC) into Elasticsearch.
The --file flag supports wildcards for matching multiple files (e.g., logs/*.json). This skill is self-contained: the scripts/ folder and package.json live in this skill's directory. Run all commands from this directory, and use absolute paths when referencing data files located elsewhere.
Before first use, install dependencies:
npm install
Elasticsearch connection is configured via environment variables. The CLI flags --node, --api-key, --username, and --password override environment variables when provided.
export ELASTICSEARCH_CLOUD_ID="deployment-name:base64encodedcloudid"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="changeme"
For local development and testing, use start-local to quickly spin up Elasticsearch and Kibana using Docker or Podman:
curl -fsSL https://elastic.co/start-local | sh
After installation completes, source the generated .env file:
source elastic-start-local/.env
export ELASTICSEARCH_URL="$ES_LOCAL_URL"
export ELASTICSEARCH_API_KEY="$ES_LOCAL_API_KEY"
export ELASTICSEARCH_INSECURE="true"
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index
# NDJSON
cat /absolute/path/to/data.ndjson | node scripts/ingest.js --stdin --target my-index
# CSV
cat /absolute/path/to/data.csv | node scripts/ingest.js --stdin --source-format csv --target my-index
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --target users
node scripts/ingest.js --file /absolute/path/to/users.parquet --source-format parquet --target users
node scripts/ingest.js --file /absolute/path/to/users.arrow --source-format arrow --target users
# csv-options.json
# {
# "columns": true,
# "delimiter": ";",
# "trim": true
# }
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --csv-options csv-options.json --target users
When using --infer-mappings, do not combine it with --source-format csv. Inference sends a raw sample to Elasticsearch's _text_structure/find_structure endpoint, which returns both mappings and an ingest pipeline with a CSV processor. If --source-format csv is also set, CSV is parsed client-side and server-side, resulting in an empty index. Let --infer-mappings handle everything:
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --target users
# infer-options.json
# {
# "sampleBytes": 200000,
# "lines_to_sample": 2000
# }
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --infer-mappings-options infer-options.json --target users
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --transform transform.js
node scripts/ingest.js --source-index old-index --target new-index
node scripts/ingest.js --source-index logs \
--node https://es8.example.com:9200 --api-key es8-key \
--target new-logs \
--target-node https://es9.example.com:9200 --target-api-key es9-key
--target <index> # Target index name
--file <path> # Source file (supports wildcards, e.g., logs/*.json)
--source-index <name> # Source Elasticsearch index
--stdin # Read NDJSON/CSV from stdin
--node <url> # ES node URL (default: http://localhost:9200)
--api-key <key> # API key authentication
--username <user> # Basic auth username
--password <pass> # Basic auth password
--target-node <url> # Target ES node URL (uses --node if not specified)
--target-api-key <key> # Target API key
--target-username <user> # Target username
--target-password <pass> # Target password
--mappings <file.json> # Mappings file (auto-copy from source if reindexing)
--infer-mappings # Infer mappings/pipeline from file/stream (do NOT combine with --source-format)
--infer-mappings-options <file> # Options for inference (JSON file)
--delete-index # Delete target index if exists
--pipeline <name> # Ingest pipeline name
--transform <file.js> # Transform function (export as default or module.exports)
--query <file.json> # Query file to filter source documents
--source-format <fmt> # Source format: ndjson|csv|parquet|arrow (default: ndjson)
--csv-options <file> # CSV parser options (JSON file)
--skip-header # Skip first line (e.g., CSV header)
--buffer-size <kb> # Buffer size in KB (default: 5120)
--search-size <n> # Docs per search when reindexing (default: 100)
--total-docs <n> # Total docs for progress bar (file/stream)
--stall-warn-seconds <n> # Stall warning threshold (default: 30)
--progress-mode <mode> # Progress output: auto|line|newline (default: auto)
--debug-events # Log pause/resume/stall events
--quiet # Disable progress bars
Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function:
// ES modules (default)
export default function transform(doc) {
  // Build the display name first, then merge it with an ingest timestamp.
  const fullName = `${doc.first_name} ${doc.last_name}`;
  return Object.assign({}, doc, {
    full_name: fullName,
    timestamp: new Date().toISOString(),
  });
}
// Or CommonJS
module.exports = function transform(doc) {
return {
...doc,
full_name: `${doc.first_name} ${doc.last_name}`,
};
};
Return null or undefined to skip a document:
export default function transform(doc) {
  // Only keep documents whose email field looks valid.
  if (doc.email && doc.email.includes("@")) {
    return doc;
  }
  return null;
}
Return an array to create multiple target documents from one source:
export default function transform(doc) {
  // Emit one document per hashtag extracted from the tweet text.
  const matches = doc.text.match(/#\w+/g);
  if (!matches) {
    return [];
  }
  return matches.map(function (tag) {
    return {
      hashtag: tag,
      tweet_id: doc.id,
      created_at: doc.created_at,
    };
  });
}
When reindexing, mappings are automatically copied from the source index:
node scripts/ingest.js --source-index old-logs --target new-logs
{
"properties": {
"@timestamp": { "type": "date" },
"message": { "type": "text" },
"user": {
"properties": {
"name": { "type": "keyword" },
"email": { "type": "keyword" }
}
}
}
}
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
Filter source documents during reindexing with a query file:
{
"range": {
"@timestamp": {
"gte": "2024-01-01",
"lt": "2024-02-01"
}
}
}
node scripts/ingest.js \
--source-index logs \
--target filtered-logs \
--query filter.json
Never use the --delete-index flag (or otherwise delete existing indices and data) without explicit user confirmation. Do not combine --infer-mappings with --source-format: inference creates a server-side ingest pipeline that handles parsing (e.g., a CSV processor), and using --source-format csv parses client-side as well, causing double-parsing and an empty index. Use --infer-mappings alone when you want Elasticsearch to detect the format, infer field types, and create an ingest pipeline automatically; use --source-format csv with explicit --mappings when you want client-side CSV parsing with known field types. Consider alternatives for:
Weekly Installs
157
Repository
GitHub Stars
89
First Seen
11 days ago
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Fail
Installed on
cursor137
github-copilot127
opencode126
gemini-cli126
codex126
kimi-cli125
Apify Actor 输出模式生成工具 - 自动化创建 dataset_schema.json 与 output_schema.json
1,000 周安装
SuperDesign - AI 驱动设计助手:自动分析代码库,生成设计草稿与设计系统
3,200 周安装
Flutter原生视图与网页嵌入指南:Android/iOS平台视图集成与Web应用嵌入
3,300 周安装
Python资源管理:上下文管理器自动释放数据库连接、文件句柄和网络套接字
3,300 周安装
Tushare自然语言财经数据工具:股票行情、财务分析、宏观数据一键查询与导出
3,200 周安装
Vercel React 最佳实践指南:45条性能优化规则与代码示例
3,200 周安装
Ralphmode权限配置文件 - 跨平台自动化工作流安全解决方案
3,200 周安装