parallel-processing-patterns by josiahsiegel/claude-plugin-marketplace
npx skills add https://github.com/josiahsiegel/claude-plugin-marketplace --skill parallel-processing-patterns

强制要求:在 Windows 上始终对文件路径使用反斜杠
在 Windows 上使用编辑或写入工具时,您必须在文件路径中使用反斜杠(\),而不是正斜杠(/)。
关于 bash 中并行和并发执行的综合指南,涵盖 GNU Parallel、xargs 并行化、作业控制、工作池以及用于实现最大性能的现代异步模式。
# Debian/Ubuntu
sudo apt-get install parallel
# macOS
brew install parallel
# 从源码安装
wget https://ftp.gnu.org/gnu/parallel/parallel-latest.tar.bz2
tar -xjf parallel-latest.tar.bz2
cd parallel-*
./configure && make && sudo make install
#!/usr/bin/env bash
set -euo pipefail
# 并行处理多个文件
parallel gzip ::: *.txt
# 等同于:
# for f in *.txt; do gzip "$f"; done
# 但这是并行运行的!
# 结合 find 使用 parallel
find . -name "*.jpg" | parallel convert {} -resize 50% resized/{}
# 指定作业数量
parallel -j 8 process_file ::: *.dat
# 从标准输入读取
cat urls.txt | parallel -j 10 wget -q
# 多个输入
parallel echo ::: A B C ::: 1 2 3
# 输出:A 1, A 2, A 3, B 1, B 2, B 3, C 1, C 2, C 3
# 使用 :::+ 进行配对输入
parallel echo ::: A B C :::+ 1 2 3
# 输出:A 1, B 2, C 3
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
#!/usr/bin/env bash
set -euo pipefail
# 从文件输入
parallel -a input.txt process_line
# 多个输入文件
parallel -a file1.txt -a file2.txt 'echo {1} {2}'
# 基于列的输入
cat data.tsv | parallel --colsep '\t' 'echo Name: {1}, Value: {2}'
# 命名列
cat data.csv | parallel --header : --colsep ',' 'echo {name}: {value}'
# 空字符分隔,安全处理特殊字符
find . -name "*.txt" -print0 | parallel -0 wc -l
# 基于行的分块
cat huge_file.txt | parallel --pipe -N1000 'wc -l'
#!/usr/bin/env bash
set -euo pipefail
# {} - 完整输入
parallel echo 'Processing: {}' ::: file1.txt file2.txt
# {.} - 移除扩展名
parallel echo '{.}' ::: file.txt file.csv
# 输出:file, file
# {/} - 基本文件名
parallel echo '{/}' ::: /path/to/file.txt
# 输出:file.txt
# {//} - 目录路径
parallel echo '{//}' ::: /path/to/file.txt
# 输出:/path/to
# {/.} - 不带扩展名的基本文件名
parallel echo '{/.}' ::: /path/to/file.txt
# 输出:file
# {#} - 作业编号(从 1 开始)
parallel echo 'Job {#}: {}' ::: A B C
# {%} - 槽位编号(循环使用的作业槽)
parallel -j 2 'echo "Slot {%}: {}"' ::: A B C D E
# 组合使用
parallel 'convert {} -resize 50% {//}/thumb_{/.}.jpg' ::: *.png
#!/usr/bin/env bash
set -euo pipefail
# 显示进度条
parallel --bar process_item ::: {1..100}
# 带预计完成时间的进度
parallel --progress process_item ::: {1..100}
# 详细输出
parallel --verbose gzip ::: *.txt
# 记录日志到文件
parallel --joblog jobs.log gzip ::: *.txt
# 从上次中断处恢复(跳过已完成的作业)
parallel --joblog jobs.log --resume gzip ::: *.txt
# 结果日志记录
parallel --results results_dir 'echo {1} + {2}' ::: 1 2 3 ::: 4 5 6
# 创建:results_dir/1/4/stdout, results_dir/1/4/stderr 等
#!/usr/bin/env bash
set -euo pipefail
# 基于 CPU 的并行化(核心数)
parallel -j "$(nproc)" process_item ::: {1..1000}
# 留出一些空闲核心
parallel -j '-2' process_item ::: {1..1000} # nproc - 2
# 核心百分比
parallel -j '50%' process_item ::: {1..1000}
# 基于负载的节流
parallel --load 80% process_item ::: {1..1000}
# 基于内存的节流
parallel --memfree 2G process_item ::: {1..1000}
# 速率限制(每秒最大作业数)
parallel -j 4 --delay 0.5 wget ::: url1 url2 url3 url4
# 每个作业的超时时间
parallel --timeout 60 long_process ::: {1..100}
# 重试失败的作业
parallel --retries 3 flaky_process ::: {1..100}
#!/usr/bin/env bash
set -euo pipefail
# 在多个服务器上运行
parallel --sshloginfile servers.txt process_item ::: {1..1000}
# servers.txt 格式:
# 4/server1.example.com (在 server1 上运行 4 个作业)
# 8/server2.example.com (在 server2 上运行 8 个作业)
# : (本地机器)
# 执行前传输文件
parallel --sshloginfile servers.txt --transferfile {} process {} ::: *.dat
# 返回结果
parallel --sshloginfile servers.txt --return {.}.result process {} ::: *.dat
# 传输后清理
parallel --sshloginfile servers.txt --transfer --return {.}.out --cleanup \
'process {} > {.}.out' ::: *.dat
# 环境变量
export MY_VAR="value"
parallel --env MY_VAR --sshloginfile servers.txt 'echo $MY_VAR' ::: A B C
#!/usr/bin/env bash
set -euo pipefail
# 管道模式 - 将标准输入分发到工作进程
cat huge_file.txt | parallel --pipe -N1000 'sort | uniq -c'
# 管道模式的块大小
cat data.bin | parallel --pipe --block 10M 'process_chunk'
# 保持输出顺序
parallel --keep-order 'sleep $((RANDOM % 3)); echo {}' ::: A B C D E
# 分组输出(不混合不同作业的输出)
parallel --group 'for i in 1 2 3; do echo "Job {}: line $i"; done' ::: A B C
# 用作业标识符标记输出
parallel --tag 'echo "output from {}"' ::: A B C
# 顺序输出(按完成顺序输出,但分组)
parallel --ungroup 'echo "Starting {}"; sleep 1; echo "Done {}"' ::: A B C
#!/usr/bin/env bash
set -euo pipefail
# -P 用于并行作业
find . -name "*.txt" | xargs -P 4 -I {} gzip {}
# -n 用于每个命令的项数
echo {1..100} | xargs -n 10 -P 4 echo "Batch:"
# 空字符分隔,安全处理
find . -name "*.txt" -print0 | xargs -0 -P 4 -I {} process {}
# 每个进程多个参数
cat urls.txt | xargs -P 10 -n 5 wget -q
# 限制最大总参数数
echo {1..1000} | xargs -P 4 --max-args=50 echo
#!/usr/bin/env bash
set -euo pipefail
# 使用 sh -c 执行复杂命令
find . -name "*.jpg" -print0 | \
xargs -0 -P 4 -I {} sh -c 'convert "$1" -resize 50% "thumb_$(basename "$1")"' _ {}
# 多个占位符
paste file1.txt file2.txt | \
xargs -P 4 -n 2 sh -c 'diff "$1" "$2" > "diff_$(basename "$1" .txt).patch"' _
# 批量处理
find . -name "*.log" -print0 | \
xargs -0 -P 4 -n 100 tar -czvf logs_batch.tar.gz
# 包含故障处理
find . -name "*.dat" -print0 | \
xargs -0 -P 4 -I {} sh -c 'process "$1" || echo "Failed: $1" >> failures.log' _ {}
#!/usr/bin/env bash
set -euo pipefail
# 跟踪后台作业
declare -a PIDS=()
# 启动作业
for item in {1..10}; do
process_item "$item" &
PIDS+=($!)
done
# 等待所有作业
for pid in "${PIDS[@]}"; do
wait "$pid"
done
echo "All jobs complete"
# 或者等待任意一个完成
wait -n # Bash 4.3+
echo "At least one job complete"
#!/usr/bin/env bash
set -euo pipefail

# Maximum number of concurrent background jobs.
MAX_JOBS=4

# Simple counting semaphore: number of jobs we have launched and not
# yet accounted for.
job_count=0

# run_with_limit CMD [ARGS...]
# Starts CMD in the background, first blocking until fewer than
# MAX_JOBS jobs are running.  Uses `wait -n` (bash 4.3+) to reap one
# finished job per iteration.
run_with_limit() {
  local cmd=("$@")
  # Block while at the limit.
  while ((job_count >= MAX_JOBS)); do
    wait -n 2>/dev/null || true
    # Assignment form: an arithmetic *command* whose value is 0 would
    # return status 1 and trip `set -e`.
    job_count=$((job_count - 1))
  done
  # Launch the new job.
  "${cmd[@]}" &
  # BUG FIX: the original `((job_count++))` evaluates to 0 on the very
  # first call, which returns exit status 1 and aborts the script
  # under `set -e`.
  job_count=$((job_count + 1))
}

# Usage — process_item is a placeholder expected to exist elsewhere.
for item in {1..20}; do
  run_with_limit process_item "$item"
done

# Wait for the remaining jobs.
wait
#!/usr/bin/env bash
set -euo pipefail

MAX_JOBS=4
JOB_FIFO="/tmp/job_pool_$$"

# Create the FIFO that backs the pool of job slots.
mkfifo "$JOB_FIFO"
trap 'rm -f "$JOB_FIFO"' EXIT

# Open the FIFO read/write on fd 3 and seed it with MAX_JOBS newline
# tokens — each token is one available job slot.
exec 3<>"$JOB_FIFO"
for ((i=0; i<MAX_JOBS; i++)); do
  echo >&3
done

# run_with_slot CMD [ARGS...]
# Acquire a slot token (blocking until one is free), run CMD in the
# background, and return the token when the job finishes.
run_with_slot() {
  local cmd=("$@")
  read -u 3 # acquire a slot (blocks if none available; token lands in REPLY)
  {
    "${cmd[@]}"
    echo >&3 # release the slot
  } &
}

# Usage — NOTE(review): process_item is a placeholder expected to be
# defined elsewhere.
for item in {1..20}; do
  run_with_slot process_item "$item"
done

wait
exec 3>&-
#!/usr/bin/env bash
set -euo pipefail

WORK_QUEUE="/tmp/work_queue_$$"
RESULT_QUEUE="/tmp/result_queue_$$"
NUM_WORKERS=4

mkfifo "$WORK_QUEUE" "$RESULT_QUEUE"
trap 'rm -f "$WORK_QUEUE" "$RESULT_QUEUE"' EXIT

# Worker: read tasks line-by-line from stdin until the STOP sentinel,
# emitting one "RESULT:<id>:<task>:<output>" line per task.
worker() {
  local id="$1"
  while read -r task; do
    [[ "$task" == "STOP" ]] && break
    # process_task is a placeholder expected to be defined elsewhere.
    local result
    result=$(process_task "$task" 2>&1)
    echo "RESULT:$id:$task:$result"
  done
}

# Start the worker pool; all workers share the two FIFOs.
for ((i=0; i<NUM_WORKERS; i++)); do
  worker "$i" < "$WORK_QUEUE" > "$RESULT_QUEUE" &
done

# Result collector: append result lines until the DONE sentinel.
# NOTE(review): a function definition followed by `&` backgrounds the
# *definition itself* in a subshell — collect_results is never actually
# invoked here.  Likely intended: define the function first, then run
# `collect_results &`.  Verify before relying on this snippet.
collect_results() {
  while read -r line; do
    [[ "$line" == "DONE" ]] && break
    echo "$line" >> results.txt
  done < "$RESULT_QUEUE"
} &
COLLECTOR_PID=$!

# Producer — one line per task, then one STOP sentinel per worker.
# NOTE(review): TASKS is never populated in this snippet; under
# `set -u` the expansion below fails unless it is filled in elsewhere.
{
  for task in "${TASKS[@]}"; do
    echo "$task"
  done
  # Stop signals for workers
  for ((i=0; i<NUM_WORKERS; i++)); do
    echo "STOP"
  done
} > "$WORK_QUEUE"

# Signal end of results once all workers have exited.
wait # wait for workers
echo "DONE" > "$RESULT_QUEUE"
wait "$COLLECTOR_PID"
#!/usr/bin/env bash
set -euo pipefail

# async RESULT_VAR CMD [ARGS...]
# Launch CMD in the background, capturing combined stdout/stderr in a
# temp file and the exit status in "<file>.status".  Records the
# background PID in <RESULT_VAR>_pid and the file path in
# <RESULT_VAR>_file via eval — RESULT_VAR must be a trusted shell
# identifier, never untrusted input.
async() {
  local result_var="$1"
  shift
  local cmd=("$@")
  # Temp file for the captured output.
  local result_file
  result_file=$(mktemp)
  # Run in the background, saving output and exit status.
  {
    if "${cmd[@]}" > "$result_file" 2>&1; then
      echo "0" >> "$result_file.status"
    else
      # $? here is the exit status of the command tested by `if`.
      echo "$?" >> "$result_file.status"
    fi
  } &
  # Record PID and result-file location for await().
  eval "${result_var}_pid=$!"
  eval "${result_var}_file='$result_file'"
}

# await RESULT_VAR
# Block until the matching async job finishes, print its captured
# output, clean up the temp files, and return the job's exit status.
# NOTE(review): the background group exits 0 even when CMD fails (the
# status lives in the .status file), so `wait` here should not trip
# `set -e` — confirm against bash version in use.
await() {
  local result_var="$1"
  local pid_var="${result_var}_pid"
  local file_var="${result_var}_file"
  # Wait for completion.
  wait "${!pid_var}"
  # Emit the captured output.
  cat "${!file_var}"
  local status
  status=$(cat "${!file_var}.status")
  # Cleanup
  rm -f "${!file_var}" "${!file_var}.status"
  return "$status"
}

# Usage
async result1 curl -s "https://api1.example.com/data"
async result2 curl -s "https://api2.example.com/data"
async result3 process_local_data
# Do other work here...
# Collect results (each call blocks until that job completes).
data1=$(await result1)
data2=$(await result2)
data3=$(await result3)
#!/usr/bin/env bash
set -euo pipefail

declare -A TASKS         # task_id -> background PID
declare -A TASK_RESULTS  # task_id -> output file path
TASK_COUNTER=0

# schedule CMD [ARGS...]
# Start CMD in the background, register it under a fresh task id, and
# print that id.
# NOTE(review): capturing the id with id=$(schedule …) would run
# schedule in a subshell and lose the TASKS/TASK_RESULTS updates —
# verify callers invoke it directly in the current shell.
schedule() {
  local cmd=("$@")
  local task_id=$((++TASK_COUNTER))
  local output_file="/tmp/task_${task_id}_$$"
  "${cmd[@]}" > "$output_file" 2>&1 &
  TASKS[$task_id]=$!
  TASK_RESULTS[$task_id]="$output_file"
  echo "$task_id"
}

# is_complete TASK_ID — true once the task's process no longer exists.
is_complete() {
  local task_id="$1"
  ! kill -0 "${TASKS[$task_id]}" 2>/dev/null
}

# get_result TASK_ID — reap the process, print its captured output,
# and delete the output file.
get_result() {
  local task_id="$1"
  wait "${TASKS[$task_id]}" 2>/dev/null || true
  cat "${TASK_RESULTS[$task_id]}"
  rm -f "${TASK_RESULTS[$task_id]}"
}

# Poll all registered tasks until none remain pending, invoking
# on_task_complete for each finished one.
run_event_loop() {
  local pending=("${!TASKS[@]}")
  while ((${#pending[@]} > 0)); do
    local still_pending=()
    for task_id in "${pending[@]}"; do
      if is_complete "$task_id"; then
        local result
        result=$(get_result "$task_id")
        on_task_complete "$task_id" "$result"
      else
        still_pending+=("$task_id")
      fi
    done
    pending=("${still_pending[@]}")
    # Small sleep to avoid busy-waiting.
    ((${#pending[@]} > 0)) && sleep 0.1
  done
}

# Callback invoked with (task_id, result); prints the first 50 chars.
on_task_complete() {
  local task_id="$1"
  local result="$2"
  echo "Task $task_id complete: ${result:0:50}..."
}
#!/usr/bin/env bash
set -euo pipefail
# 扇出:分发工作
fan_out() {
local -n items="$1"
local workers="$2"
local worker_func="$3"
local chunk_size=$(( (${#items[@]} + workers - 1) / workers ))
local pids=()
for ((i=0; i<workers; i++)); do
local start=$((i * chunk_size))
local chunk=("${items[@]:start:chunk_size}")
if ((${#chunk[@]} > 0)); then
$worker_func "${chunk[@]}" &
pids+=($!)
fi
done
# 为 fan_in 返回 PID
echo "${pids[*]}"
}
# 扇入:收集结果
fan_in() {
local -a pids=($1)
local results=()
for pid in "${pids[@]}"; do
wait "$pid"
done
}
# 示例工作函数
process_chunk() {
local items=("$@")
for item in "${items[@]}"; do
echo "Processed: $item"
done
}
# 用法
data=({1..100})
pids=$(fan_out data 4 process_chunk)
fan_in "$pids"
#!/usr/bin/env bash
set -euo pipefail

# parallel_map ARRAY_NAME FUNC [WORKERS]
# Apply FUNC to every element of the named array in parallel, one
# result per line on stdout.
parallel_map() {
  local -n input="$1"
  local map_func="$2"
  local workers="${3:-$(nproc)}"
  # BUG FIX: GNU parallel runs each job in a freshly spawned shell, so
  # a shell function is invisible there unless it is exported.
  if declare -F "$map_func" >/dev/null; then
    export -f "$map_func"
  fi
  printf '%s\n' "${input[@]}" | \
    parallel -j "$workers" "$map_func"
}

# reduce FUNC ACC
# Fold stdin line-by-line: ACC = FUNC(ACC, line); print the result.
reduce() {
  local reduce_func="$1"
  local accumulator="$2"
  while IFS= read -r value; do
    accumulator=$($reduce_func "$accumulator" "$value")
  done
  echo "$accumulator"
}

# Example: sum of squares.
square() { echo $(($1 * $1)); }
add() { echo $(($1 + $2)); }

numbers=({1..100})
sum_of_squares=$(
  parallel_map numbers square 4 | reduce add 0
)
echo "Sum of squares: $sum_of_squares"

# Word-count example: map emits per-chunk counts, reduce merges them.
word_count_map() {
  tr ' ' '\n' | sort | uniq -c
}

word_count_reduce() {
  sort -k2 | awk '{
    if ($2 == prev) { count += $1 }
    else { if (prev) print count, prev; count = $1; prev = $2 }
  } END { if (prev) print count, prev }'
}

# BUG FIX: export the mapper so the shells spawned by parallel see it.
export -f word_count_map
cat large_text.txt | \
  parallel --pipe -N1000 word_count_map | \
  word_count_reduce
#!/usr/bin/env bash
set -euo pipefail
# 以最优批量大小处理
optimal_batch_process() {
local items=("$@")
local batch_size=100
local workers=$(nproc)
printf '%s\n' "${items[@]}" | \
parallel --pipe -N"$batch_size" -j"$workers" '
while IFS= read -r item; do
process_item "$item"
done
'
}
# 基于内存的动态批量大小调整
dynamic_batch() {
local mem_available
mem_available=$(free -m | awk '/^Mem:/ {print $7}')
# 根据可用内存调整批量大小
local batch_size=$((mem_available / 100)) # 每批 100MB
((batch_size < 10)) && batch_size=10
((batch_size > 1000)) && batch_size=1000
parallel --pipe -N"$batch_size" process_batch
}
#!/usr/bin/env bash
set -euo pipefail

# Create a scratch directory on tmpfs (/dev/shm) for fast intermediate
# files and print its path.
# BUG FIX: the original installed `trap ... EXIT` inside this function,
# but the function is always called via command substitution — i.e. in
# a subshell — so the trap fired as soon as the subshell exited and
# deleted the directory before the caller could use it.  Cleanup is
# now owned by the caller.
setup_fast_temp() {
  local tmpdir="/dev/shm/parallel_$$"
  mkdir -p "$tmpdir"
  echo "$tmpdir"
}

# Split the input file into chunks and process them in parallel,
# keeping all intermediate files on tmpfs.
buffered_parallel() {
  local input="$1"
  local tmpdir
  tmpdir=$(setup_fast_temp)
  # Register cleanup in the shell that actually owns the directory.
  # (Expanded now, while $tmpdir is in scope; sets the shell-wide EXIT trap.)
  trap "rm -rf -- '$tmpdir'" EXIT
  # Split input into chunks.
  split -l 1000 "$input" "$tmpdir/chunk_"
  # Process chunks in parallel.
  parallel process_chunk {} ::: "$tmpdir"/chunk_*
  # Combine results (assumes process_chunk writes result_* files — TODO confirm).
  cat "$tmpdir"/result_* > output.txt
}

# Avoid the temp file entirely: stream through parallel's pipe mode.
no_disk_parallel() {
  # Instead of:
  #   command > temp.txt
  #   parallel process ::: temp.txt
  #   rm temp.txt
  # Do this:
  command | parallel --pipe process
}
#!/usr/bin/env bash
set -euo pipefail
# 将工作进程绑定到特定 CPU
cpu_pinned_parallel() {
local num_cpus
num_cpus=$(nproc)
for ((cpu=0; cpu<num_cpus; cpu++)); do
taskset -c "$cpu" process_worker "$cpu" &
done
wait
}
# NUMA 感知处理
numa_parallel() {
local num_nodes
num_nodes=$(numactl --hardware | grep "available:" | awk '{print $2}')
for ((node=0; node<num_nodes; node++)); do
numactl --cpunodebind="$node" --membind="$node" \
process_chunk "$node" &
done
wait
}
#!/usr/bin/env bash
set -euo pipefail

# Map: item -> failure description, filled by parallel_with_retry.
declare -A FAILURES

# parallel_with_retry ITEM...
# Process each item with up to max_retries attempts and linear
# backoff; record permanent failures in FAILURES.
# BUG FIXES vs. original:
#  * The original put `&` after `done`, running the whole (still
#    sequential) loop in a background subshell — no parallelism was
#    gained, and every FAILURES write happened in the subshell and was
#    thrown away, so report_failures never saw anything.
#  * `((retries++))` evaluates to 0 on the first retry, returning
#    status 1 and aborting the script under `set -e`.
parallel_with_retry() {
  local max_retries=3
  local items=("$@")
  for item in "${items[@]}"; do
    local retries=0
    local success=false
    while ((retries < max_retries)) && ! $success; do
      if process_item "$item"; then
        success=true
      else
        retries=$((retries + 1))
        echo "Retry $retries for $item" >&2
        sleep $((retries * 2)) # linear backoff: 2s, 4s, ...
      fi
    done
    if ! $success; then
      FAILURES["$item"]="Failed after $max_retries retries"
    fi
  done
}

# Print all collected failures to stderr; return 1 if there were any.
report_failures() {
  if ((${#FAILURES[@]} > 0)); then
    echo "Failures:" >&2
    for item in "${!FAILURES[@]}"; do
      echo " $item: ${FAILURES[$item]}" >&2
    done
    return 1
  fi
}
#!/usr/bin/env bash
set -euo pipefail

# Global cancellation flag (visible to the parent shell only).
CANCELLED=false
declare -a WORKER_PIDS=()

# Kill all workers and mark the run as cancelled.
# NOTE(review): workers run as child processes, so they never observe
# the parent's CANCELLED=true — it is the kill below that actually
# stops them.  The in-worker flag check only matters if the worker
# loop were run in the foreground.
cancel_all() {
  CANCELLED=true
  for pid in "${WORKER_PIDS[@]}"; do
    kill "$pid" 2>/dev/null || true
  done
}
trap cancel_all SIGINT SIGTERM

# Worker loop: poll for work until cancelled.
# get_next_work / process_work are placeholders defined elsewhere.
cancellable_worker() {
  local id="$1"
  while ! $CANCELLED; do
    # Check for available work.
    if work=$(get_next_work); then
      process_work "$work"
    else
      sleep 0.1
    fi
  done
}

# Start the workers.
# NOTE(review): NUM_WORKERS is not defined in this snippet; under
# `set -u` this loop fails unless it is set elsewhere.
for ((i=0; i<NUM_WORKERS; i++)); do
  cancellable_worker "$i" &
  WORKER_PIDS+=($!)
done

# Wait, tolerating interruption by the signal handler.
wait || true
掌握并行处理,实现高效的多核利用和更快的脚本执行。
每周安装次数
79
仓库
GitHub 星标数
21
首次出现
2026年1月24日
安全审计
已安装于
opencode64
gemini-cli63
codex61
claude-code59
cursor58
github-copilot56
MANDATORY: Always Use Backslashes on Windows for File Paths
When using Edit or Write tools on Windows, you MUST use backslashes (\) in file paths, NOT forward slashes (/).
Comprehensive guide to parallel and concurrent execution in bash, covering GNU Parallel, xargs parallelization, job control, worker pools, and modern async patterns for maximum performance.
# Debian/Ubuntu
sudo apt-get install parallel
# macOS
brew install parallel
# From source
wget https://ftp.gnu.org/gnu/parallel/parallel-latest.tar.bz2
tar -xjf parallel-latest.tar.bz2
cd parallel-*
./configure && make && sudo make install
#!/usr/bin/env bash
set -euo pipefail
# Process multiple files in parallel
parallel gzip ::: *.txt
# Equivalent to:
# for f in *.txt; do gzip "$f"; done
# But runs in parallel!
# Using find with parallel
find . -name "*.jpg" | parallel convert {} -resize 50% resized/{}
# Specify number of jobs
parallel -j 8 process_file ::: *.dat
# From stdin
cat urls.txt | parallel -j 10 wget -q
# Multiple inputs
parallel echo ::: A B C ::: 1 2 3
# Output: A 1, A 2, A 3, B 1, B 2, B 3, C 1, C 2, C 3
# Paired inputs with :::+
parallel echo ::: A B C :::+ 1 2 3
# Output: A 1, B 2, C 3
#!/usr/bin/env bash
set -euo pipefail
# Input from file
parallel -a input.txt process_line
# Multiple input files
parallel -a file1.txt -a file2.txt 'echo {1} {2}'
# Column-based input
cat data.tsv | parallel --colsep '\t' 'echo Name: {1}, Value: {2}'
# Named columns
cat data.csv | parallel --header : --colsep ',' 'echo {name}: {value}'
# Null-delimited for safety with special characters
find . -name "*.txt" -print0 | parallel -0 wc -l
# Line-based chunking
cat huge_file.txt | parallel --pipe -N1000 'wc -l'
#!/usr/bin/env bash
set -euo pipefail
# {} - Full input
parallel echo 'Processing: {}' ::: file1.txt file2.txt
# {.} - Remove extension
parallel echo '{.}' ::: file.txt file.csv
# Output: file, file
# {/} - Basename
parallel echo '{/}' ::: /path/to/file.txt
# Output: file.txt
# {//} - Directory path
parallel echo '{//}' ::: /path/to/file.txt
# Output: /path/to
# {/.} - Basename without extension
parallel echo '{/.}' ::: /path/to/file.txt
# Output: file
# {#} - Job number (1-based)
parallel echo 'Job {#}: {}' ::: A B C
# {%} - Slot number (recycled job slot)
parallel -j 2 'echo "Slot {%}: {}"' ::: A B C D E
# Combined
parallel 'convert {} -resize 50% {//}/thumb_{/.}.jpg' ::: *.png
#!/usr/bin/env bash
set -euo pipefail
# Show progress bar
parallel --bar process_item ::: {1..100}
# Progress with ETA
parallel --progress process_item ::: {1..100}
# Verbose output
parallel --verbose gzip ::: *.txt
# Log to file
parallel --joblog jobs.log gzip ::: *.txt
# Resume from where it left off (skip completed jobs)
parallel --joblog jobs.log --resume gzip ::: *.txt
# Results logging
parallel --results results_dir 'echo {1} + {2}' ::: 1 2 3 ::: 4 5 6
# Creates: results_dir/1/4/stdout, results_dir/1/4/stderr, etc.
#!/usr/bin/env bash
set -euo pipefail
# CPU-based parallelism (number of cores)
parallel -j "$(nproc)" process_item ::: {1..1000}
# Leave some cores free
parallel -j '-2' process_item ::: {1..1000} # nproc - 2
# Percentage of cores
parallel -j '50%' process_item ::: {1..1000}
# Load-based throttling
parallel --load 80% process_item ::: {1..1000}
# Memory-based throttling
parallel --memfree 2G process_item ::: {1..1000}
# Rate limiting (max jobs per second)
parallel -j 4 --delay 0.5 wget ::: url1 url2 url3 url4
# Timeout per job
parallel --timeout 60 long_process ::: {1..100}
# Retry failed jobs
parallel --retries 3 flaky_process ::: {1..100}
#!/usr/bin/env bash
set -euo pipefail
# Run on multiple servers
parallel --sshloginfile servers.txt process_item ::: {1..1000}
# servers.txt format:
# 4/server1.example.com (4 jobs on server1)
# 8/server2.example.com (8 jobs on server2)
# : (local machine)
# Transfer files before execution
parallel --sshloginfile servers.txt --transferfile {} process {} ::: *.dat
# Return results
parallel --sshloginfile servers.txt --return {.}.result process {} ::: *.dat
# Cleanup after transfer
parallel --sshloginfile servers.txt --transfer --return {.}.out --cleanup \
'process {} > {.}.out' ::: *.dat
# Environment variables
export MY_VAR="value"
parallel --env MY_VAR --sshloginfile servers.txt 'echo $MY_VAR' ::: A B C
#!/usr/bin/env bash
set -euo pipefail
# Pipe mode - distribute stdin across workers
cat huge_file.txt | parallel --pipe -N1000 'sort | uniq -c'
# Block size for pipe mode
cat data.bin | parallel --pipe --block 10M 'process_chunk'
# Keep order of output
parallel --keep-order 'sleep $((RANDOM % 3)); echo {}' ::: A B C D E
# Group output (don't mix output from different jobs)
parallel --group 'for i in 1 2 3; do echo "Job {}: line $i"; done' ::: A B C
# Tag output with job identifier
parallel --tag 'echo "output from {}"' ::: A B C
# Sequence output (output as they complete, but grouped)
parallel --ungroup 'echo "Starting {}"; sleep 1; echo "Done {}"' ::: A B C
#!/usr/bin/env bash
set -euo pipefail
# -P for parallel jobs
find . -name "*.txt" | xargs -P 4 -I {} gzip {}
# -n for items per command
echo {1..100} | xargs -n 10 -P 4 echo "Batch:"
# Null-delimited for safety
find . -name "*.txt" -print0 | xargs -0 -P 4 -I {} process {}
# Multiple arguments per process
cat urls.txt | xargs -P 10 -n 5 wget -q
# Limit max total arguments
echo {1..1000} | xargs -P 4 --max-args=50 echo
#!/usr/bin/env bash
set -euo pipefail
# Use sh -c for complex commands
find . -name "*.jpg" -print0 | \
xargs -0 -P 4 -I {} sh -c 'convert "$1" -resize 50% "thumb_$(basename "$1")"' _ {}
# Multiple placeholders
paste file1.txt file2.txt | \
xargs -P 4 -n 2 sh -c 'diff "$1" "$2" > "diff_$(basename "$1" .txt).patch"' _
# Process in batches
find . -name "*.log" -print0 | \
xargs -0 -P 4 -n 100 tar -czvf logs_batch.tar.gz
# With failure handling
find . -name "*.dat" -print0 | \
xargs -0 -P 4 -I {} sh -c 'process "$1" || echo "Failed: $1" >> failures.log' _ {}
#!/usr/bin/env bash
set -euo pipefail
# Track background jobs
declare -a PIDS=()
# Start jobs
for item in {1..10}; do
process_item "$item" &
PIDS+=($!)
done
# Wait for all
for pid in "${PIDS[@]}"; do
wait "$pid"
done
echo "All jobs complete"
# Or wait for any to complete
wait -n # Bash 4.3+
echo "At least one job complete"
#!/usr/bin/env bash
set -euo pipefail

# Maximum number of concurrent background jobs.
MAX_JOBS=4

# Simple counting semaphore: number of jobs launched and not yet
# accounted for.
job_count=0

# run_with_limit CMD [ARGS...]
# Starts CMD in the background, first blocking until fewer than
# MAX_JOBS jobs are running.  Uses `wait -n` (bash 4.3+) to reap one
# finished job per iteration.
run_with_limit() {
  local cmd=("$@")
  # Wait if at limit.
  while ((job_count >= MAX_JOBS)); do
    wait -n 2>/dev/null || true
    # Assignment form: an arithmetic *command* whose value is 0 would
    # return status 1 and trip `set -e`.
    job_count=$((job_count - 1))
  done
  # Start new job.
  "${cmd[@]}" &
  # BUG FIX: the original `((job_count++))` evaluates to 0 on the very
  # first call, which returns exit status 1 and aborts the script
  # under `set -e`.
  job_count=$((job_count + 1))
}

# Usage — process_item is a placeholder expected to exist elsewhere.
for item in {1..20}; do
  run_with_limit process_item "$item"
done

# Wait for remaining jobs.
wait
#!/usr/bin/env bash
set -euo pipefail
MAX_JOBS=4
JOB_FIFO="/tmp/job_pool_$$"
# Create job slots
mkfifo "$JOB_FIFO"
trap 'rm -f "$JOB_FIFO"' EXIT
# Initialize slots
exec 3<>"$JOB_FIFO"
for ((i=0; i<MAX_JOBS; i++)); do
echo >&3
done
# Run with slot
run_with_slot() {
local cmd=("$@")
read -u 3 # Acquire slot (blocks if none available)
{
"${cmd[@]}"
echo >&3 # Release slot
} &
}
# Usage
for item in {1..20}; do
run_with_slot process_item "$item"
done
wait
exec 3>&-
#!/usr/bin/env bash
set -euo pipefail
WORK_QUEUE="/tmp/work_queue_$$"
RESULT_QUEUE="/tmp/result_queue_$$"
NUM_WORKERS=4
mkfifo "$WORK_QUEUE" "$RESULT_QUEUE"
trap 'rm -f "$WORK_QUEUE" "$RESULT_QUEUE"' EXIT
# Worker function
worker() {
local id="$1"
while read -r task; do
[[ "$task" == "STOP" ]] && break
# Process task
local result
result=$(process_task "$task" 2>&1)
echo "RESULT:$id:$task:$result"
done
}
# Start workers
for ((i=0; i<NUM_WORKERS; i++)); do
worker "$i" < "$WORK_QUEUE" > "$RESULT_QUEUE" &
done
# Result collector (background)
collect_results() {
while read -r line; do
[[ "$line" == "DONE" ]] && break
echo "$line" >> results.txt
done < "$RESULT_QUEUE"
} &
COLLECTOR_PID=$!
# Producer - send work
{
for task in "${TASKS[@]}"; do
echo "$task"
done
# Stop signals for workers
for ((i=0; i<NUM_WORKERS; i++)); do
echo "STOP"
done
} > "$WORK_QUEUE"
# Signal end of results
wait # Wait for workers
echo "DONE" > "$RESULT_QUEUE"
wait "$COLLECTOR_PID"
#!/usr/bin/env bash
set -euo pipefail
# Async function wrapper
async() {
local result_var="$1"
shift
local cmd=("$@")
# Create temp file for result
local result_file
result_file=$(mktemp)
# Run in background, save result
{
if "${cmd[@]}" > "$result_file" 2>&1; then
echo "0" >> "$result_file.status"
else
echo "$?" >> "$result_file.status"
fi
} &
# Store PID and result file location
eval "${result_var}_pid=$!"
eval "${result_var}_file='$result_file'"
}
# Await result
await() {
local result_var="$1"
local pid_var="${result_var}_pid"
local file_var="${result_var}_file"
# Wait for completion
wait "${!pid_var}"
# Get result
cat "${!file_var}"
local status
status=$(cat "${!file_var}.status")
# Cleanup
rm -f "${!file_var}" "${!file_var}.status"
return "$status"
}
# Usage
async result1 curl -s "https://api1.example.com/data"
async result2 curl -s "https://api2.example.com/data"
async result3 process_local_data
# Do other work here...
# Get results (blocks until complete)
data1=$(await result1)
data2=$(await result2)
data3=$(await result3)
#!/usr/bin/env bash
set -euo pipefail
declare -A TASKS
declare -A TASK_RESULTS
TASK_COUNTER=0
# Register async task
schedule() {
local cmd=("$@")
local task_id=$((++TASK_COUNTER))
local output_file="/tmp/task_${task_id}_$$"
"${cmd[@]}" > "$output_file" 2>&1 &
TASKS[$task_id]=$!
TASK_RESULTS[$task_id]="$output_file"
echo "$task_id"
}
# Check if task complete
is_complete() {
local task_id="$1"
! kill -0 "${TASKS[$task_id]}" 2>/dev/null
}
# Get task result
get_result() {
local task_id="$1"
wait "${TASKS[$task_id]}" 2>/dev/null || true
cat "${TASK_RESULTS[$task_id]}"
rm -f "${TASK_RESULTS[$task_id]}"
}
# Event loop
run_event_loop() {
local pending=("${!TASKS[@]}")
while ((${#pending[@]} > 0)); do
local still_pending=()
for task_id in "${pending[@]}"; do
if is_complete "$task_id"; then
local result
result=$(get_result "$task_id")
on_task_complete "$task_id" "$result"
else
still_pending+=("$task_id")
fi
done
pending=("${still_pending[@]}")
# Small sleep to prevent busy-waiting
((${#pending[@]} > 0)) && sleep 0.1
done
}
# Callback for completed tasks
on_task_complete() {
local task_id="$1"
local result="$2"
echo "Task $task_id complete: ${result:0:50}..."
}
#!/usr/bin/env bash
set -euo pipefail
# Fan-out: distribute work
fan_out() {
local -n items="$1"
local workers="$2"
local worker_func="$3"
local chunk_size=$(( (${#items[@]} + workers - 1) / workers ))
local pids=()
for ((i=0; i<workers; i++)); do
local start=$((i * chunk_size))
local chunk=("${items[@]:start:chunk_size}")
if ((${#chunk[@]} > 0)); then
$worker_func "${chunk[@]}" &
pids+=($!)
fi
done
# Return PIDs for fan_in
echo "${pids[*]}"
}
# Fan-in: collect results
fan_in() {
local -a pids=($1)
local results=()
for pid in "${pids[@]}"; do
wait "$pid"
done
}
# Example worker
process_chunk() {
local items=("$@")
for item in "${items[@]}"; do
echo "Processed: $item"
done
}
# Usage
data=({1..100})
pids=$(fan_out data 4 process_chunk)
fan_in "$pids"
#!/usr/bin/env bash
set -euo pipefail

# parallel_map ARRAY_NAME FUNC [WORKERS]
# Apply FUNC to every element of the named array in parallel, one
# result per line on stdout.
parallel_map() {
  local -n input="$1"
  local map_func="$2"
  local workers="${3:-$(nproc)}"
  # BUG FIX: GNU parallel runs each job in a freshly spawned shell, so
  # a shell function is invisible there unless it is exported.
  if declare -F "$map_func" >/dev/null; then
    export -f "$map_func"
  fi
  printf '%s\n' "${input[@]}" | \
    parallel -j "$workers" "$map_func"
}

# reduce FUNC ACC
# Fold stdin line-by-line: ACC = FUNC(ACC, line); print the result.
reduce() {
  local reduce_func="$1"
  local accumulator="$2"
  while IFS= read -r value; do
    accumulator=$($reduce_func "$accumulator" "$value")
  done
  echo "$accumulator"
}

# Example: Sum of squares
square() { echo $(($1 * $1)); }
add() { echo $(($1 + $2)); }

numbers=({1..100})
sum_of_squares=$(
  parallel_map numbers square 4 | reduce add 0
)
echo "Sum of squares: $sum_of_squares"

# Word count example: map emits per-chunk counts, reduce merges them.
word_count_map() {
  tr ' ' '\n' | sort | uniq -c
}

word_count_reduce() {
  sort -k2 | awk '{
    if ($2 == prev) { count += $1 }
    else { if (prev) print count, prev; count = $1; prev = $2 }
  } END { if (prev) print count, prev }'
}

# BUG FIX: export the mapper so the shells spawned by parallel see it.
export -f word_count_map
cat large_text.txt | \
  parallel --pipe -N1000 word_count_map | \
  word_count_reduce
#!/usr/bin/env bash
set -euo pipefail
# Process in optimal batch sizes
optimal_batch_process() {
local items=("$@")
local batch_size=100
local workers=$(nproc)
printf '%s\n' "${items[@]}" | \
parallel --pipe -N"$batch_size" -j"$workers" '
while IFS= read -r item; do
process_item "$item"
done
'
}
# Dynamic batch sizing based on memory
dynamic_batch() {
local mem_available
mem_available=$(free -m | awk '/^Mem:/ {print $7}')
# Adjust batch size based on available memory
local batch_size=$((mem_available / 100)) # 100MB per batch
((batch_size < 10)) && batch_size=10
((batch_size > 1000)) && batch_size=1000
parallel --pipe -N"$batch_size" process_batch
}
#!/usr/bin/env bash
set -euo pipefail

# Create a scratch directory on tmpfs (/dev/shm) for fast intermediate
# files and print its path.
# BUG FIX: the original installed `trap ... EXIT` inside this function,
# but the function is always called via command substitution — i.e. in
# a subshell — so the trap fired as soon as the subshell exited and
# deleted the directory before the caller could use it.  Cleanup is
# now owned by the caller.
setup_fast_temp() {
  local tmpdir="/dev/shm/parallel_$$"
  mkdir -p "$tmpdir"
  echo "$tmpdir"
}

# Split the input file into chunks and process them in parallel,
# keeping all intermediate files on tmpfs.
buffered_parallel() {
  local input="$1"
  local tmpdir
  tmpdir=$(setup_fast_temp)
  # Register cleanup in the shell that actually owns the directory.
  # (Expanded now, while $tmpdir is in scope; sets the shell-wide EXIT trap.)
  trap "rm -rf -- '$tmpdir'" EXIT
  # Split input into chunks
  split -l 1000 "$input" "$tmpdir/chunk_"
  # Process chunks in parallel
  parallel process_chunk {} ::: "$tmpdir"/chunk_*
  # Combine results (assumes process_chunk writes result_* files — TODO confirm)
  cat "$tmpdir"/result_* > output.txt
}

# Avoid the temp file entirely: stream through parallel's pipe mode.
no_disk_parallel() {
  # Instead of:
  #   command > temp.txt
  #   parallel process ::: temp.txt
  #   rm temp.txt
  # Do this:
  command | parallel --pipe process
}
#!/usr/bin/env bash
set -euo pipefail
# Pin workers to specific CPUs
cpu_pinned_parallel() {
local num_cpus
num_cpus=$(nproc)
for ((cpu=0; cpu<num_cpus; cpu++)); do
taskset -c "$cpu" process_worker "$cpu" &
done
wait
}
# NUMA-aware processing
numa_parallel() {
local num_nodes
num_nodes=$(numactl --hardware | grep "available:" | awk '{print $2}')
for ((node=0; node<num_nodes; node++)); do
numactl --cpunodebind="$node" --membind="$node" \
process_chunk "$node" &
done
wait
}
#!/usr/bin/env bash
set -euo pipefail

# Map: item -> failure description, filled by parallel_with_retry.
declare -A FAILURES

# parallel_with_retry ITEM...
# Process each item with up to max_retries attempts and linear
# backoff; record permanent failures in FAILURES.
# BUG FIXES vs. original:
#  * The original put `&` after `done`, running the whole (still
#    sequential) loop in a background subshell — no parallelism was
#    gained, and every FAILURES write happened in the subshell and was
#    thrown away, so report_failures never saw anything.
#  * `((retries++))` evaluates to 0 on the first retry, returning
#    status 1 and aborting the script under `set -e`.
parallel_with_retry() {
  local max_retries=3
  local items=("$@")
  for item in "${items[@]}"; do
    local retries=0
    local success=false
    while ((retries < max_retries)) && ! $success; do
      if process_item "$item"; then
        success=true
      else
        retries=$((retries + 1))
        echo "Retry $retries for $item" >&2
        sleep $((retries * 2)) # linear backoff: 2s, 4s, ...
      fi
    done
    if ! $success; then
      FAILURES["$item"]="Failed after $max_retries retries"
    fi
  done
}

# Print all collected failures to stderr; return 1 if there were any.
report_failures() {
  if ((${#FAILURES[@]} > 0)); then
    echo "Failures:" >&2
    for item in "${!FAILURES[@]}"; do
      echo " $item: ${FAILURES[$item]}" >&2
    done
    return 1
  fi
}
#!/usr/bin/env bash
set -euo pipefail
# Global cancellation flag
CANCELLED=false
declare -a WORKER_PIDS=()
cancel_all() {
CANCELLED=true
for pid in "${WORKER_PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
}
trap cancel_all SIGINT SIGTERM
cancellable_worker() {
local id="$1"
while ! $CANCELLED; do
# Check for work
if work=$(get_next_work); then
process_work "$work"
else
sleep 0.1
fi
done
}
# Start workers
for ((i=0; i<NUM_WORKERS; i++)); do
cancellable_worker "$i" &
WORKER_PIDS+=($!)
done
# Wait with interrupt support
wait || true
Master parallel processing for efficient multi-core utilization and faster script execution.
Weekly Installs
79
Repository
GitHub Stars
21
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
opencode64
gemini-cli63
codex61
claude-code59
cursor58
github-copilot56
Skills CLI 使用指南:AI Agent 技能包管理器安装与管理教程
44,900 周安装
Google Docs API 技能:自动化文档创建、编辑与管理 | 集成指南
138 周安装
Java 21+ 专家技能:现代 Java 开发、Spring Boot 3.x、JVM 性能优化与云原生架构
138 周安装
React 前端开发专家技能:最佳实践、性能优化与 TypeScript 集成指南
141 周安装
Boss直聘职位搜索技能 - AI自动化爬取招聘信息,支持薪资、公司规模筛选
146 周安装
Tailwind CSS UI重构指南:基于《重构UI》的52条最佳实践与代码规范
140 周安装
Conductor 状态监控工具 - 实时查看项目进度、轨道任务与阻塞项
144 周安装