Medical Imaging Pipelines by aurabx/skills
npx skills add https://github.com/aurabx/skills --skill 'Medical Imaging Pipelines'
生成端到端医学影像数据处理流程的代码:包括数据导入、格式转换、预处理、数据集准备和导出。涵盖从原始 DICOM 文件到机器学习就绪数据集、研究导出和自动化处理工作流的完整路径。
Python 3.8+
核心库:pydicom、numpy
格式转换:SimpleITK 或 nibabel(用于 NIfTI)、pillow(用于 PNG/JPEG)
机器学习准备:scikit-image、scipy
可选:h5py(HDF5)、pandas(元数据)、tqdm(进度条)
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
pip install pydicom numpy SimpleITK nibabel pillow scikit-image scipy h5py pandas tqdm pylibjpeg pylibjpeg-libjpeg
from pathlib import Path
import pydicom
import numpy as np
from PIL import Image
def dicom_dir_to_pngs(input_dir: str, output_dir: str):
    """Convert a directory of DICOM files to 8-bit PNG images.

    Recursively scans *input_dir*; files that cannot be parsed as DICOM
    are reported and skipped.

    Args:
        input_dir: Directory scanned recursively for DICOM files.
        output_dir: Directory for the output PNGs (created if missing).
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    for dcm_file in sorted(input_path.rglob("*")):
        if dcm_file.is_dir():
            continue
        try:
            ds = pydicom.dcmread(str(dcm_file))
            pixels = ds.pixel_array.astype(float)
            # Normalize to the 0-255 range.
            lo, hi = pixels.min(), pixels.max()
            if hi != lo:
                pixels = (pixels - lo) / (hi - lo) * 255
            else:
                # Constant image: map to 0. The original cast the raw
                # value straight to uint8, which wraps modulo 256.
                pixels = np.zeros_like(pixels)
            pixels = pixels.astype(np.uint8)
            img = Image.fromarray(pixels)
            img.save(output_path / f"{dcm_file.stem}.png")
        except Exception as e:
            print(f"Skipped {dcm_file.name}: {e}")
# Example usage: convert a whole study directory to PNGs (runs on import).
dicom_dir_to_pngs("/data/dicom_study", "/data/png_output")
NIfTI 是神经影像研究(脑部 MRI、fMRI)的标准格式。使用 SimpleITK 进行稳健的转换,可保留空间元数据。
import SimpleITK as sitk
from pathlib import Path
def dicom_series_to_nifti(dicom_dir: str, output_path: str):
    """Convert one DICOM series to a single NIfTI file.

    Args:
        dicom_dir: Directory containing the DICOM files of ONE series.
        output_path: Destination .nii.gz path.

    Raises:
        ValueError: If GDCM finds no DICOM series in *dicom_dir*.
    """
    reader = sitk.ImageSeriesReader()
    sorted_files = reader.GetGDCMSeriesFileNames(dicom_dir)
    if not sorted_files:
        raise ValueError(f"No DICOM series found in {dicom_dir}")
    reader.SetFileNames(sorted_files)
    reader.MetaDataDictionaryArrayUpdateOn()
    reader.LoadPrivateTagsOn()
    image = reader.Execute()
    # SimpleITK carries orientation and spacing into the NIfTI header.
    sitk.WriteImage(image, output_path)
    print(f"Written: {output_path}")
    print(f" Size: {image.GetSize()}")
    print(f" Spacing: {image.GetSpacing()}")
    print(f" Origin: {image.GetOrigin()}")
    print(f" Direction: {image.GetDirection()}")
def dicom_dir_to_nifti_all_series(dicom_dir: str, output_dir: str):
    """Convert every series found in *dicom_dir* to its own NIfTI file."""
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    reader = sitk.ImageSeriesReader()
    for idx, series_id in enumerate(reader.GetGDCMSeriesIDs(dicom_dir)):
        dicom_files = reader.GetGDCMSeriesFileNames(dicom_dir, series_id)
        reader.SetFileNames(dicom_files)
        # File name combines a running index with a series-UID prefix.
        nifti_path = output_path / f"series_{idx:03d}_{series_id[:8]}.nii.gz"
        sitk.WriteImage(reader.Execute(), str(nifti_path))
        print(f"Series {idx}: {len(dicom_files)} files -> {nifti_path.name}")
import nibabel as nib
import pydicom
import numpy as np
from pathlib import Path
def dicom_to_nifti_manual(dicom_dir: str, output_path: str):
    """Manual DICOM-to-NIfTI conversion with full control over the affine.

    Args:
        dicom_dir: Directory containing the slices of one series.
        output_path: Destination NIfTI path.

    Raises:
        ValueError: If no readable DICOM slices are found.
    """
    # Read every parseable DICOM file; silently skip anything else.
    slices = []
    for f in Path(dicom_dir).glob("*"):
        try:
            slices.append(pydicom.dcmread(str(f)))
        except Exception:
            # Narrowed from a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit.
            continue
    if not slices:
        # The original fell through to slices[0] and raised IndexError.
        raise ValueError(f"No readable DICOM slices in {dicom_dir}")
    # Sort by the Z component of ImagePositionPatient so the volume is
    # assembled in spatial order.
    slices.sort(key=lambda s: float(s.ImagePositionPatient[2]))
    # Stack into a 3D volume, slices along the last axis.
    volume = np.stack([s.pixel_array for s in slices], axis=-1)
    # Apply rescale (e.g. CT Hounsfield units). Default the intercept to 0
    # so a header with a slope but no intercept no longer raises.
    if hasattr(slices[0], "RescaleSlope"):
        slope = float(slices[0].RescaleSlope)
        intercept = float(getattr(slices[0], "RescaleIntercept", 0.0))
        volume = volume * slope + intercept
    # Build the affine from the first slice's geometry tags.
    ds = slices[0]
    pos = [float(x) for x in ds.ImagePositionPatient]
    orient = [float(x) for x in ds.ImageOrientationPatient]
    spacing = [float(x) for x in ds.PixelSpacing]
    # Slice spacing from the distance between the first two slices,
    # falling back to SliceThickness for a single slice.
    if len(slices) > 1:
        pos2 = [float(x) for x in slices[1].ImagePositionPatient]
        slice_spacing = np.sqrt(sum((a - b) ** 2 for a, b in zip(pos, pos2)))
    else:
        slice_spacing = float(getattr(ds, "SliceThickness", 1.0))
    # Direction cosines of rows, columns, and the slice normal.
    row_cosine = np.array(orient[:3])
    col_cosine = np.array(orient[3:])
    slice_cosine = np.cross(row_cosine, col_cosine)
    # 4x4 affine: each column is a direction cosine scaled by its spacing.
    # PixelSpacing is (row spacing, column spacing), hence the index swap.
    affine = np.eye(4)
    affine[:3, 0] = row_cosine * spacing[1]
    affine[:3, 1] = col_cosine * spacing[0]
    affine[:3, 2] = slice_cosine * slice_spacing
    affine[:3, 3] = pos
    nib.save(nib.Nifti1Image(volume.astype(np.float32), affine), output_path)
import pydicom
import numpy as np
from PIL import Image
from pathlib import Path
def dicom_to_png(dicom_path: str, output_path: str,
                 window_center: float = None,
                 window_width: float = None):
    """Convert a single DICOM file to PNG with optional windowing.

    Args:
        dicom_path: Path to the DICOM file.
        output_path: Destination PNG path.
        window_center: Window level; defaults to the DICOM WindowCenter
            tag, or the midpoint of the pixel range if absent.
        window_width: Window width; defaults to the DICOM WindowWidth
            tag, or the full pixel range if absent.
    """
    ds = pydicom.dcmread(dicom_path)
    pixels = ds.pixel_array.astype(np.float64)
    # Apply rescale slope/intercept (CT Hounsfield units).
    slope = float(getattr(ds, "RescaleSlope", 1))
    intercept = float(getattr(ds, "RescaleIntercept", 0))
    pixels = pixels * slope + intercept
    # Resolve window/level. Unwrap MultiValue BEFORE calling float():
    # float() on a MultiValue raises TypeError, so the original
    # convert-then-unwrap order broke on multi-valued window tags.
    if window_center is None:
        window_center = getattr(ds, "WindowCenter",
                                (pixels.max() + pixels.min()) / 2)
        if isinstance(window_center, pydicom.multival.MultiValue):
            window_center = window_center[0]
        window_center = float(window_center)
    if window_width is None:
        window_width = getattr(ds, "WindowWidth",
                               pixels.max() - pixels.min())
        if isinstance(window_width, pydicom.multival.MultiValue):
            window_width = window_width[0]
        window_width = float(window_width)
    img_min = window_center - window_width / 2
    img_max = window_center + window_width / 2
    pixels = np.clip(pixels, img_min, img_max)
    if img_max > img_min:
        pixels = ((pixels - img_min) / (img_max - img_min) * 255)
    else:
        # Zero-width window (constant image): avoid division by zero.
        pixels = np.zeros_like(pixels)
    pixels = pixels.astype(np.uint8)
    # MONOCHROME1 stores inverted intensities (lowest value = white).
    if getattr(ds, "PhotometricInterpretation", "") == "MONOCHROME1":
        pixels = 255 - pixels
    img = Image.fromarray(pixels)
    img.save(output_path)
import h5py
import pydicom
import numpy as np
from pathlib import Path
from tqdm import tqdm
def build_hdf5_dataset(dicom_dirs: dict[str, str], output_path: str,
                       include_metadata: bool = True):
    """
    Build an HDF5 dataset from DICOM directories.

    Args:
        dicom_dirs: Mapping of {label: dicom_directory_path}
            e.g., {"train/positive": "/data/pos", "train/negative": "/data/neg"}
        output_path: Path for the output .h5 file
        include_metadata: Whether to store DICOM metadata alongside pixel data
    """
    import json  # hoisted out of the per-group loop

    with h5py.File(output_path, "w") as hf:
        for label, dicom_dir in dicom_dirs.items():
            files = sorted(Path(dicom_dir).rglob("*.dcm"))
            if not files:
                # The original crashed on files[0] for an empty directory.
                print(f"Warning: no .dcm files under {dicom_dir}, skipping {label}")
                continue
            group = hf.create_group(label)
            # Determine the image shape from the first file.
            ds = pydicom.dcmread(str(files[0]))
            rows, cols = ds.Rows, ds.Columns
            # Pre-allocate a chunked, gzip-compressed dataset.
            images = group.create_dataset(
                "images",
                shape=(len(files), rows, cols),
                dtype=np.float32,
                chunks=(1, rows, cols),
                compression="gzip",
                compression_opts=4,
            )
            metadata_items = []
            for i, f in enumerate(tqdm(files, desc=label)):
                ds = pydicom.dcmread(str(f))
                pixels = ds.pixel_array.astype(np.float32)
                # Rescale to physical units (e.g. Hounsfield for CT).
                slope = float(getattr(ds, "RescaleSlope", 1))
                intercept = float(getattr(ds, "RescaleIntercept", 0))
                images[i] = pixels * slope + intercept
                if include_metadata:
                    metadata_items.append({
                        "file": f.name,
                        "patient_id": str(getattr(ds, "PatientID", "")),
                        "modality": str(getattr(ds, "Modality", "")),
                        "study_date": str(getattr(ds, "StudyDate", "")),
                        "spacing": [float(x) for x in getattr(ds, "PixelSpacing", [1, 1])],
                    })
            if include_metadata and metadata_items:
                group.attrs["metadata"] = json.dumps(metadata_items)
    print(f"Dataset saved: {output_path}")
    print(f" Groups: {list(dicom_dirs.keys())}")
def normalize_ct(volume: np.ndarray,
                 hu_min: float = -1000,
                 hu_max: float = 400) -> np.ndarray:
    """Clip CT Hounsfield units to [hu_min, hu_max] and scale to [0, 1]."""
    clipped = np.clip(volume, hu_min, hu_max)
    scaled = (clipped - hu_min) / (hu_max - hu_min)
    return scaled.astype(np.float32)
def normalize_mri(volume: np.ndarray,
                  percentile_lower: float = 1,
                  percentile_upper: float = 99) -> np.ndarray:
    """Percentile-based normalization for MRI (handles intensity inhomogeneity).

    Percentiles are computed over strictly positive voxels (background is
    typically 0), then the whole volume is clipped and scaled to [0, 1].

    Args:
        volume: Input intensity volume.
        percentile_lower: Percentile mapped to 0.
        percentile_upper: Percentile mapped to 1.

    Returns:
        float32 volume scaled to [0, 1].

    Raises:
        ValueError: If the volume contains no positive voxels.
    """
    foreground = volume[volume > 0]
    if foreground.size == 0:
        # np.percentile on an empty array fails with an opaque error;
        # fail loudly with a clear message instead.
        raise ValueError("normalize_mri: volume has no positive voxels")
    p_low = np.percentile(foreground, percentile_lower)
    p_high = np.percentile(foreground, percentile_upper)
    volume = np.clip(volume, p_low, p_high)
    if p_high > p_low:
        volume = (volume - p_low) / (p_high - p_low)
    else:
        # Constant foreground: the original divided by zero (NaNs).
        volume = np.zeros_like(volume, dtype=np.float64)
    return volume.astype(np.float32)
def z_score_normalize(volume: np.ndarray,
                      mask: np.ndarray = None) -> np.ndarray:
    """Z-score normalization (zero mean, unit variance).

    When *mask* is given, statistics are computed only over voxels where
    mask > 0, but the whole volume is transformed.
    """
    stats_region = volume if mask is None else volume[mask > 0]
    mean = stats_region.mean()
    std = stats_region.std()
    # Epsilon keeps a constant volume from dividing by zero.
    return ((volume - mean) / (std + 1e-8)).astype(np.float32)
import SimpleITK as sitk
def resample_volume(image: sitk.Image,
                    new_spacing: tuple = (1.0, 1.0, 1.0),
                    interpolator=sitk.sitkLinear) -> sitk.Image:
    """Resample a 3D volume to the requested (typically isotropic) spacing."""
    old_spacing = image.GetSpacing()
    old_size = image.GetSize()
    # Keep the physical extent constant: size scales inversely with spacing.
    target_size = [
        int(round(sz * sp / nsp))
        for sz, sp, nsp in zip(old_size, old_spacing, new_spacing)
    ]
    resampler = sitk.ResampleImageFilter()
    resampler.SetOutputSpacing(new_spacing)
    resampler.SetSize(target_size)
    resampler.SetOutputDirection(image.GetDirection())
    resampler.SetOutputOrigin(image.GetOrigin())
    resampler.SetInterpolator(interpolator)
    resampler.SetDefaultPixelValue(0)
    return resampler.Execute(image)
def resize_2d(pixels: np.ndarray, target_size: tuple = (256, 256)) -> np.ndarray:
    """Resize a 2D image to *target_size* = (rows, cols) using PIL.

    PIL's Image.resize expects (width, height) = (cols, rows), so the
    tuple is swapped before the call and the returned array has shape
    exactly *target_size*. (Passing the tuple straight through, as
    before, transposed the dimensions for non-square targets.)
    """
    from PIL import Image
    img = Image.fromarray(pixels)
    img = img.resize((target_size[1], target_size[0]), Image.Resampling.BILINEAR)
    return np.array(img)
def center_crop_3d(volume: np.ndarray, crop_size: tuple) -> np.ndarray:
    """Center crop a 3D volume to *crop_size* = (depth, height, width).

    Offsets are clamped at 0, so a dimension already smaller than the
    crop is returned whole rather than raising.
    """
    starts = [max(0, (have - want) // 2)
              for have, want in zip(volume.shape, crop_size)]
    sd, sh, sw = starts
    cd, ch, cw = crop_size
    return volume[sd:sd + cd, sh:sh + ch, sw:sw + cw]
def pad_to_size(volume: np.ndarray, target_size: tuple,
                pad_value: float = 0) -> np.ndarray:
    """Symmetrically pad a volume up to *target_size*.

    Axes already at or above the target size are left untouched.
    """
    widths = []
    for have, want in zip(volume.shape, target_size):
        extra = max(0, want - have)
        before = extra // 2
        widths.append((before, extra - before))
    return np.pad(volume, widths, mode="constant", constant_values=pad_value)
import pandas as pd
import pydicom
from pathlib import Path
from tqdm import tqdm
def build_manifest(dicom_dir: str, output_csv: str = "manifest.csv") -> pd.DataFrame:
    """
    Scan a DICOM directory and build a metadata manifest.

    Headers are read with stop_before_pixels=True so scanning stays fast
    even for large files; unreadable files are skipped.

    Returns a DataFrame with one row per DICOM file.
    """
    records = []
    dicom_path = Path(dicom_dir)
    for f in tqdm(list(dicom_path.rglob("*")), desc="Scanning"):
        if f.is_dir():
            continue
        try:
            ds = pydicom.dcmread(str(f), stop_before_pixels=True)
            records.append({
                "filepath": str(f.relative_to(dicom_path)),
                "patient_id": str(getattr(ds, "PatientID", "")),
                "patient_name": str(getattr(ds, "PatientName", "")),
                "study_uid": str(getattr(ds, "StudyInstanceUID", "")),
                "series_uid": str(getattr(ds, "SeriesInstanceUID", "")),
                "sop_uid": str(getattr(ds, "SOPInstanceUID", "")),
                "modality": str(getattr(ds, "Modality", "")),
                "study_date": str(getattr(ds, "StudyDate", "")),
                "study_description": str(getattr(ds, "StudyDescription", "")),
                "series_description": str(getattr(ds, "SeriesDescription", "")),
                "instance_number": int(getattr(ds, "InstanceNumber", 0)),
                "rows": int(getattr(ds, "Rows", 0)),
                "columns": int(getattr(ds, "Columns", 0)),
                "pixel_spacing": str(getattr(ds, "PixelSpacing", "")),
                "slice_thickness": str(getattr(ds, "SliceThickness", "")),
                "manufacturer": str(getattr(ds, "Manufacturer", "")),
            })
        except Exception:
            continue
    df = pd.DataFrame(records)
    df.to_csv(output_csv, index=False)
    if df.empty:
        # An empty DataFrame has no columns, so the summary line below
        # used to raise KeyError on df['study_uid']; report and return.
        print(f"Manifest: 0 files found under {dicom_dir}")
        return df
    print(f"Manifest: {len(df)} files, {df['study_uid'].nunique()} studies, "
          f"{df['patient_id'].nunique()} patients")
    return df
def dataset_statistics(manifest: pd.DataFrame):
    """Print summary statistics for a DICOM dataset.

    Args:
        manifest: DataFrame produced by build_manifest(); must contain the
            columns patient_id, study_uid, series_uid, modality, study_date,
            rows, columns and manufacturer. Assumes a non-empty manifest.
    """
    print(f"Total files: {len(manifest)}")
    print(f"Patients: {manifest['patient_id'].nunique()}")
    print(f"Studies: {manifest['study_uid'].nunique()}")
    print(f"Series: {manifest['series_uid'].nunique()}")
    print(f"\nModality distribution:")
    print(manifest['modality'].value_counts().to_string())
    # StudyDate is stored as string (YYYYMMDD), so min/max sort lexically,
    # which matches chronological order for this format.
    print(f"\nDate range: {manifest['study_date'].min()} - {manifest['study_date'].max()}")
    print(f"\nImage sizes:")
    print(f" Rows: {manifest['rows'].min()}-{manifest['rows'].max()}")
    print(f" Cols: {manifest['columns'].min()}-{manifest['columns'].max()}")
    print(f"\nManufacturers:")
    print(manifest['manufacturer'].value_counts().to_string())
"""
端到端流程:DICOM CT 研究 -> 去标识化、归一化的 NIfTI 体数据
"""
from pathlib import Path
import pydicom
import numpy as np
import SimpleITK as sitk
from tqdm import tqdm
import json
class CTResearchPipeline:
    """Pipeline for processing CT DICOM data for research.

    Each study directory is read series by series; every series is
    resampled to `target_spacing`, clipped to `hu_window` and scaled to
    [0, 1], then written as NIfTI with a JSON metadata sidecar.
    """
    def __init__(self, output_dir: str, target_spacing: tuple = (1.0, 1.0, 1.0),
                 hu_window: tuple = (-1000, 400)):
        # output_dir: root holding the volumes/ and metadata/ subdirectories.
        # target_spacing: desired voxel spacing (x, y, z), presumably in mm.
        # hu_window: (min, max) Hounsfield range kept before [0, 1] scaling.
        self.output_dir = Path(output_dir)
        self.target_spacing = target_spacing
        self.hu_min, self.hu_max = hu_window
        self.manifest = []  # one metadata dict per exported series
        # Create the output directory structure up front.
        (self.output_dir / "volumes").mkdir(parents=True, exist_ok=True)
        (self.output_dir / "metadata").mkdir(parents=True, exist_ok=True)
    def process_study(self, dicom_dir: str, subject_id: str):
        """Process a single DICOM study directory (may contain several series).

        Args:
            dicom_dir: Directory holding one study's DICOM files.
            subject_id: De-identified label used in output file names.
        """
        print(f"Processing {subject_id}...")
        # Step 1: discover all series in the directory via GDCM.
        reader = sitk.ImageSeriesReader()
        series_ids = reader.GetGDCMSeriesIDs(dicom_dir)
        if not series_ids:
            print(f" No DICOM series found in {dicom_dir}")
            return
        for series_idx, series_id in enumerate(series_ids):
            file_names = reader.GetGDCMSeriesFileNames(dicom_dir, series_id)
            reader.SetFileNames(file_names)
            reader.MetaDataDictionaryArrayUpdateOn()
            image = reader.Execute()
            # Step 2: resample to the target spacing.
            image = self._resample(image)
            # Step 3: convert to numpy for normalization.
            volume = sitk.GetArrayFromImage(image)  # shape: (Z, Y, X)
            # Step 4: clip HU to the window and scale to [0, 1].
            volume = np.clip(volume, self.hu_min, self.hu_max)
            volume = ((volume - self.hu_min) /
                      (self.hu_max - self.hu_min)).astype(np.float32)
            # Step 5: save as NIfTI; CopyInformation keeps spacing/origin/direction.
            output_image = sitk.GetImageFromArray(volume)
            output_image.CopyInformation(image)
            nifti_name = f"{subject_id}_series{series_idx:02d}.nii.gz"
            nifti_path = self.output_dir / "volumes" / nifti_name
            sitk.WriteImage(output_image, str(nifti_path))
            # Step 6: write the JSON sidecar and record it in the manifest.
            metadata = self._extract_metadata(reader, file_names[0],
                                              subject_id, series_idx)
            metadata["output_file"] = nifti_name
            metadata["volume_shape"] = list(volume.shape)
            metadata["spacing"] = list(image.GetSpacing())
            meta_path = (self.output_dir / "metadata" /
                         f"{subject_id}_series{series_idx:02d}.json")
            with open(meta_path, "w") as f:
                json.dump(metadata, f, indent=2)
            self.manifest.append(metadata)
            print(f" Series {series_idx}: {volume.shape} -> {nifti_name}")
    def _resample(self, image: sitk.Image) -> sitk.Image:
        """Resample *image* to self.target_spacing, preserving physical extent."""
        original_spacing = image.GetSpacing()
        original_size = image.GetSize()
        new_size = [
            int(round(osz * ospc / nspc))
            for osz, ospc, nspc in zip(original_size, original_spacing,
                                       self.target_spacing)
        ]
        resampler = sitk.ResampleImageFilter()
        resampler.SetOutputSpacing(self.target_spacing)
        resampler.SetSize(new_size)
        resampler.SetOutputDirection(image.GetDirection())
        resampler.SetOutputOrigin(image.GetOrigin())
        resampler.SetInterpolator(sitk.sitkLinear)
        # -1000 HU (air) for voxels falling outside the original volume.
        resampler.SetDefaultPixelValue(-1000)
        return resampler.Execute(image)
    def _extract_metadata(self, reader, first_file: str,
                          subject_id: str, series_idx: int) -> dict:
        """Read non-identifying series metadata from the first slice header."""
        ds = pydicom.dcmread(first_file, stop_before_pixels=True)
        return {
            "subject_id": subject_id,
            "series_index": series_idx,
            "modality": str(getattr(ds, "Modality", "")),
            "study_description": str(getattr(ds, "StudyDescription", "")),
            "series_description": str(getattr(ds, "SeriesDescription", "")),
            "manufacturer": str(getattr(ds, "Manufacturer", "")),
            "slice_thickness": float(getattr(ds, "SliceThickness", 0)),
            "kvp": float(getattr(ds, "KVP", 0)),
        }
    def save_manifest(self):
        """Write the accumulated manifest to manifest.json and print a summary."""
        manifest_path = self.output_dir / "manifest.json"
        with open(manifest_path, "w") as f:
            json.dump(self.manifest, f, indent=2)
        print(f"\nManifest saved: {manifest_path}")
        print(f"Total subjects: {len(set(m['subject_id'] for m in self.manifest))}")
        print(f"Total volumes: {len(self.manifest)}")
# Usage: process three raw studies into normalized NIfTI volumes.
pipeline = CTResearchPipeline(
    output_dir="/data/processed",
    target_spacing=(1.0, 1.0, 1.0),
    hu_window=(-1000, 400),
)
# Process each study, then write the combined manifest.
study_dirs = {
    "SUBJ-001": "/data/raw/patient_001",
    "SUBJ-002": "/data/raw/patient_002",
    "SUBJ-003": "/data/raw/patient_003",
}
for subject_id, dicom_dir in study_dirs.items():
    pipeline.process_study(dicom_dir, subject_id)
pipeline.save_manifest()
import torch
from torch.utils.data import Dataset
import nibabel as nib
import numpy as np
import json
from pathlib import Path
class MedicalImageDataset(Dataset):
    """PyTorch dataset over volumes produced by the processing pipeline.

    Reads manifest.json from *data_dir* and serves one NIfTI volume per
    entry as a (1, D, H, W) float32 tensor together with its metadata.
    """
    def __init__(self, data_dir: str, transform=None,
                 target_key: str = None):
        self.data_dir = Path(data_dir)
        self.transform = transform
        # Load the manifest written by the processing pipeline.
        manifest_file = self.data_dir / "manifest.json"
        with open(manifest_file) as f:
            self.manifest = json.load(f)
        # Optional manifest key whose value is returned as the label.
        self.target_key = target_key
    def __len__(self):
        return len(self.manifest)
    def __getitem__(self, idx):
        entry = self.manifest[idx]
        volume_file = self.data_dir / "volumes" / entry["output_file"]
        # Load the volume and add a channel axis: (D, H, W) -> (1, D, H, W).
        nifti = nib.load(str(volume_file))
        volume = nifti.get_fdata().astype(np.float32)
        volume = np.expand_dims(volume, axis=0)
        if self.transform:
            volume = self.transform(volume)
        sample = {
            "image": torch.from_numpy(volume),
            "subject_id": entry["subject_id"],
            "metadata": entry,
        }
        if self.target_key and self.target_key in entry:
            sample["label"] = entry[self.target_key]
        return sample
# Usage: wrap the processed directory in a DataLoader and fetch one batch.
dataset = MedicalImageDataset("/data/processed")
dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=4, shuffle=True, num_workers=2,
)
for batch in dataloader:
    images = batch["image"]  # (B, 1, D, H, W)
    print(f"Batch shape: {images.shape}")
    break
按 ImagePositionPatient[2] 或 InstanceNumber 对切片进行排序;未排序的切片会产生混乱的体数据。PixelSpacing 是平面内的(行,列)间距;切片间距必须根据 ImagePositionPatient 的差异或 SpacingBetweenSlices(不太可靠)计算。使用 pixel * slope + intercept 来获取亨氏单位。读取压缩的 DICOM 需要安装 pylibjpeg 包,否则 pixel_array 会出现难以理解的错误。
每周安装量
0
代码仓库
GitHub 星标数
5
首次出现
1970年1月1日
安全审计
Generates code for end-to-end medical imaging data pipelines: ingestion, format conversion, preprocessing, dataset preparation, and export. Covers the common path from raw DICOM files to ML-ready datasets, research exports, and automated processing workflows.
Python 3.8+
Core: pydicom, numpy
Conversion: SimpleITK or nibabel (for NIfTI), pillow (for PNG/JPEG)
ML prep: scikit-image, scipy
Optional: h5py (HDF5), pandas (metadata), tqdm (progress bars)
pip install pydicom numpy SimpleITK nibabel pillow scikit-image scipy h5py pandas tqdm pylibjpeg pylibjpeg-libjpeg
from pathlib import Path
import pydicom
import numpy as np
from PIL import Image
def dicom_dir_to_pngs(input_dir: str, output_dir: str):
    """Convert a directory of DICOM files to 8-bit PNG images.

    Recursively scans *input_dir*; files that cannot be parsed as DICOM
    are reported and skipped.

    Args:
        input_dir: Directory scanned recursively for DICOM files.
        output_dir: Directory for the output PNGs (created if missing).
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    for dcm_file in sorted(input_path.rglob("*")):
        if dcm_file.is_dir():
            continue
        try:
            ds = pydicom.dcmread(str(dcm_file))
            pixels = ds.pixel_array.astype(float)
            # Normalize to the 0-255 range.
            lo, hi = pixels.min(), pixels.max()
            if hi != lo:
                pixels = (pixels - lo) / (hi - lo) * 255
            else:
                # Constant image: map to 0. The original cast the raw
                # value straight to uint8, which wraps modulo 256.
                pixels = np.zeros_like(pixels)
            pixels = pixels.astype(np.uint8)
            img = Image.fromarray(pixels)
            img.save(output_path / f"{dcm_file.stem}.png")
        except Exception as e:
            print(f"Skipped {dcm_file.name}: {e}")
# Example usage: convert a whole study directory to PNGs (runs on import).
dicom_dir_to_pngs("/data/dicom_study", "/data/png_output")
NIfTI is the standard format for neuroimaging research (brain MRI, fMRI). Use SimpleITK for robust conversion that preserves spatial metadata.
import SimpleITK as sitk
from pathlib import Path
def dicom_series_to_nifti(dicom_dir: str, output_path: str):
    """Convert one DICOM series to a single NIfTI file.

    Args:
        dicom_dir: Directory containing the DICOM files of ONE series.
        output_path: Destination .nii.gz path.

    Raises:
        ValueError: If GDCM finds no DICOM series in *dicom_dir*.
    """
    reader = sitk.ImageSeriesReader()
    sorted_files = reader.GetGDCMSeriesFileNames(dicom_dir)
    if not sorted_files:
        raise ValueError(f"No DICOM series found in {dicom_dir}")
    reader.SetFileNames(sorted_files)
    reader.MetaDataDictionaryArrayUpdateOn()
    reader.LoadPrivateTagsOn()
    image = reader.Execute()
    # SimpleITK carries orientation and spacing into the NIfTI header.
    sitk.WriteImage(image, output_path)
    print(f"Written: {output_path}")
    print(f" Size: {image.GetSize()}")
    print(f" Spacing: {image.GetSpacing()}")
    print(f" Origin: {image.GetOrigin()}")
    print(f" Direction: {image.GetDirection()}")
def dicom_dir_to_nifti_all_series(dicom_dir: str, output_dir: str):
    """Convert every series found in *dicom_dir* to its own NIfTI file."""
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    reader = sitk.ImageSeriesReader()
    for idx, series_id in enumerate(reader.GetGDCMSeriesIDs(dicom_dir)):
        dicom_files = reader.GetGDCMSeriesFileNames(dicom_dir, series_id)
        reader.SetFileNames(dicom_files)
        # File name combines a running index with a series-UID prefix.
        nifti_path = output_path / f"series_{idx:03d}_{series_id[:8]}.nii.gz"
        sitk.WriteImage(reader.Execute(), str(nifti_path))
        print(f"Series {idx}: {len(dicom_files)} files -> {nifti_path.name}")
import nibabel as nib
import pydicom
import numpy as np
from pathlib import Path
def dicom_to_nifti_manual(dicom_dir: str, output_path: str):
    """Manual DICOM-to-NIfTI conversion with full control over the affine.

    Args:
        dicom_dir: Directory containing the slices of one series.
        output_path: Destination NIfTI path.

    Raises:
        ValueError: If no readable DICOM slices are found.
    """
    # Read every parseable DICOM file; silently skip anything else.
    slices = []
    for f in Path(dicom_dir).glob("*"):
        try:
            slices.append(pydicom.dcmread(str(f)))
        except Exception:
            # Narrowed from a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit.
            continue
    if not slices:
        # The original fell through to slices[0] and raised IndexError.
        raise ValueError(f"No readable DICOM slices in {dicom_dir}")
    # Sort by the Z component of ImagePositionPatient so the volume is
    # assembled in spatial order.
    slices.sort(key=lambda s: float(s.ImagePositionPatient[2]))
    # Stack into a 3D volume, slices along the last axis.
    volume = np.stack([s.pixel_array for s in slices], axis=-1)
    # Apply rescale (e.g. CT Hounsfield units). Default the intercept to 0
    # so a header with a slope but no intercept no longer raises.
    if hasattr(slices[0], "RescaleSlope"):
        slope = float(slices[0].RescaleSlope)
        intercept = float(getattr(slices[0], "RescaleIntercept", 0.0))
        volume = volume * slope + intercept
    # Build the affine from the first slice's geometry tags.
    ds = slices[0]
    pos = [float(x) for x in ds.ImagePositionPatient]
    orient = [float(x) for x in ds.ImageOrientationPatient]
    spacing = [float(x) for x in ds.PixelSpacing]
    # Slice spacing from the distance between the first two slices,
    # falling back to SliceThickness for a single slice.
    if len(slices) > 1:
        pos2 = [float(x) for x in slices[1].ImagePositionPatient]
        slice_spacing = np.sqrt(sum((a - b) ** 2 for a, b in zip(pos, pos2)))
    else:
        slice_spacing = float(getattr(ds, "SliceThickness", 1.0))
    # Direction cosines of rows, columns, and the slice normal.
    row_cosine = np.array(orient[:3])
    col_cosine = np.array(orient[3:])
    slice_cosine = np.cross(row_cosine, col_cosine)
    # 4x4 affine: each column is a direction cosine scaled by its spacing.
    # PixelSpacing is (row spacing, column spacing), hence the index swap.
    affine = np.eye(4)
    affine[:3, 0] = row_cosine * spacing[1]
    affine[:3, 1] = col_cosine * spacing[0]
    affine[:3, 2] = slice_cosine * slice_spacing
    affine[:3, 3] = pos
    nib.save(nib.Nifti1Image(volume.astype(np.float32), affine), output_path)
import pydicom
import numpy as np
from PIL import Image
from pathlib import Path
def dicom_to_png(dicom_path: str, output_path: str,
                 window_center: float = None,
                 window_width: float = None):
    """Convert a single DICOM file to PNG with optional windowing.

    Args:
        dicom_path: Path to the DICOM file.
        output_path: Destination PNG path.
        window_center: Window level; defaults to the DICOM WindowCenter
            tag, or the midpoint of the pixel range if absent.
        window_width: Window width; defaults to the DICOM WindowWidth
            tag, or the full pixel range if absent.
    """
    ds = pydicom.dcmread(dicom_path)
    pixels = ds.pixel_array.astype(np.float64)
    # Apply rescale slope/intercept (CT Hounsfield units).
    slope = float(getattr(ds, "RescaleSlope", 1))
    intercept = float(getattr(ds, "RescaleIntercept", 0))
    pixels = pixels * slope + intercept
    # Resolve window/level. Unwrap MultiValue BEFORE calling float():
    # float() on a MultiValue raises TypeError, so the original
    # convert-then-unwrap order broke on multi-valued window tags.
    if window_center is None:
        window_center = getattr(ds, "WindowCenter",
                                (pixels.max() + pixels.min()) / 2)
        if isinstance(window_center, pydicom.multival.MultiValue):
            window_center = window_center[0]
        window_center = float(window_center)
    if window_width is None:
        window_width = getattr(ds, "WindowWidth",
                               pixels.max() - pixels.min())
        if isinstance(window_width, pydicom.multival.MultiValue):
            window_width = window_width[0]
        window_width = float(window_width)
    img_min = window_center - window_width / 2
    img_max = window_center + window_width / 2
    pixels = np.clip(pixels, img_min, img_max)
    if img_max > img_min:
        pixels = ((pixels - img_min) / (img_max - img_min) * 255)
    else:
        # Zero-width window (constant image): avoid division by zero.
        pixels = np.zeros_like(pixels)
    pixels = pixels.astype(np.uint8)
    # MONOCHROME1 stores inverted intensities (lowest value = white).
    if getattr(ds, "PhotometricInterpretation", "") == "MONOCHROME1":
        pixels = 255 - pixels
    img = Image.fromarray(pixels)
    img.save(output_path)
import h5py
import pydicom
import numpy as np
from pathlib import Path
from tqdm import tqdm
def build_hdf5_dataset(dicom_dirs: dict[str, str], output_path: str,
                       include_metadata: bool = True):
    """
    Build an HDF5 dataset from DICOM directories.

    Args:
        dicom_dirs: Mapping of {label: dicom_directory_path}
            e.g., {"train/positive": "/data/pos", "train/negative": "/data/neg"}
        output_path: Path for the output .h5 file
        include_metadata: Whether to store DICOM metadata alongside pixel data
    """
    import json  # hoisted out of the per-group loop

    with h5py.File(output_path, "w") as hf:
        for label, dicom_dir in dicom_dirs.items():
            files = sorted(Path(dicom_dir).rglob("*.dcm"))
            if not files:
                # The original crashed on files[0] for an empty directory.
                print(f"Warning: no .dcm files under {dicom_dir}, skipping {label}")
                continue
            group = hf.create_group(label)
            # Determine the image shape from the first file.
            ds = pydicom.dcmread(str(files[0]))
            rows, cols = ds.Rows, ds.Columns
            # Pre-allocate a chunked, gzip-compressed dataset.
            images = group.create_dataset(
                "images",
                shape=(len(files), rows, cols),
                dtype=np.float32,
                chunks=(1, rows, cols),
                compression="gzip",
                compression_opts=4,
            )
            metadata_items = []
            for i, f in enumerate(tqdm(files, desc=label)):
                ds = pydicom.dcmread(str(f))
                pixels = ds.pixel_array.astype(np.float32)
                # Rescale to physical units (e.g. Hounsfield for CT).
                slope = float(getattr(ds, "RescaleSlope", 1))
                intercept = float(getattr(ds, "RescaleIntercept", 0))
                images[i] = pixels * slope + intercept
                if include_metadata:
                    metadata_items.append({
                        "file": f.name,
                        "patient_id": str(getattr(ds, "PatientID", "")),
                        "modality": str(getattr(ds, "Modality", "")),
                        "study_date": str(getattr(ds, "StudyDate", "")),
                        "spacing": [float(x) for x in getattr(ds, "PixelSpacing", [1, 1])],
                    })
            if include_metadata and metadata_items:
                group.attrs["metadata"] = json.dumps(metadata_items)
    print(f"Dataset saved: {output_path}")
    print(f" Groups: {list(dicom_dirs.keys())}")
def normalize_ct(volume: np.ndarray,
                 hu_min: float = -1000,
                 hu_max: float = 400) -> np.ndarray:
    """Clip CT Hounsfield units to [hu_min, hu_max] and scale to [0, 1]."""
    clipped = np.clip(volume, hu_min, hu_max)
    scaled = (clipped - hu_min) / (hu_max - hu_min)
    return scaled.astype(np.float32)
def normalize_mri(volume: np.ndarray,
                  percentile_lower: float = 1,
                  percentile_upper: float = 99) -> np.ndarray:
    """Percentile-based normalization for MRI (handles intensity inhomogeneity).

    Percentiles are computed over strictly positive voxels (background is
    typically 0), then the whole volume is clipped and scaled to [0, 1].

    Args:
        volume: Input intensity volume.
        percentile_lower: Percentile mapped to 0.
        percentile_upper: Percentile mapped to 1.

    Returns:
        float32 volume scaled to [0, 1].

    Raises:
        ValueError: If the volume contains no positive voxels.
    """
    foreground = volume[volume > 0]
    if foreground.size == 0:
        # np.percentile on an empty array fails with an opaque error;
        # fail loudly with a clear message instead.
        raise ValueError("normalize_mri: volume has no positive voxels")
    p_low = np.percentile(foreground, percentile_lower)
    p_high = np.percentile(foreground, percentile_upper)
    volume = np.clip(volume, p_low, p_high)
    if p_high > p_low:
        volume = (volume - p_low) / (p_high - p_low)
    else:
        # Constant foreground: the original divided by zero (NaNs).
        volume = np.zeros_like(volume, dtype=np.float64)
    return volume.astype(np.float32)
def z_score_normalize(volume: np.ndarray,
                      mask: np.ndarray = None) -> np.ndarray:
    """Z-score normalization (zero mean, unit variance).

    When *mask* is given, statistics are computed only over voxels where
    mask > 0, but the whole volume is transformed.
    """
    stats_region = volume if mask is None else volume[mask > 0]
    mean = stats_region.mean()
    std = stats_region.std()
    # Epsilon keeps a constant volume from dividing by zero.
    return ((volume - mean) / (std + 1e-8)).astype(np.float32)
import SimpleITK as sitk
def resample_volume(image: sitk.Image,
                    new_spacing: tuple = (1.0, 1.0, 1.0),
                    interpolator=sitk.sitkLinear) -> sitk.Image:
    """Resample a 3D volume to the requested (typically isotropic) spacing."""
    old_spacing = image.GetSpacing()
    old_size = image.GetSize()
    # Keep the physical extent constant: size scales inversely with spacing.
    target_size = [
        int(round(sz * sp / nsp))
        for sz, sp, nsp in zip(old_size, old_spacing, new_spacing)
    ]
    resampler = sitk.ResampleImageFilter()
    resampler.SetOutputSpacing(new_spacing)
    resampler.SetSize(target_size)
    resampler.SetOutputDirection(image.GetDirection())
    resampler.SetOutputOrigin(image.GetOrigin())
    resampler.SetInterpolator(interpolator)
    resampler.SetDefaultPixelValue(0)
    return resampler.Execute(image)
def resize_2d(pixels: np.ndarray, target_size: tuple = (256, 256)) -> np.ndarray:
    """Resize a 2D image to *target_size* = (rows, cols) using PIL.

    PIL's Image.resize expects (width, height) = (cols, rows), so the
    tuple is swapped before the call and the returned array has shape
    exactly *target_size*. (Passing the tuple straight through, as
    before, transposed the dimensions for non-square targets.)
    """
    from PIL import Image
    img = Image.fromarray(pixels)
    img = img.resize((target_size[1], target_size[0]), Image.Resampling.BILINEAR)
    return np.array(img)
def center_crop_3d(volume: np.ndarray, crop_size: tuple) -> np.ndarray:
    """Center crop a 3D volume to *crop_size* = (depth, height, width).

    Offsets are clamped at 0, so a dimension already smaller than the
    crop is returned whole rather than raising.
    """
    starts = [max(0, (have - want) // 2)
              for have, want in zip(volume.shape, crop_size)]
    sd, sh, sw = starts
    cd, ch, cw = crop_size
    return volume[sd:sd + cd, sh:sh + ch, sw:sw + cw]
def pad_to_size(volume: np.ndarray, target_size: tuple,
                pad_value: float = 0) -> np.ndarray:
    """Symmetrically pad a volume up to *target_size*.

    Axes already at or above the target size are left untouched.
    """
    widths = []
    for have, want in zip(volume.shape, target_size):
        extra = max(0, want - have)
        before = extra // 2
        widths.append((before, extra - before))
    return np.pad(volume, widths, mode="constant", constant_values=pad_value)
import pandas as pd
import pydicom
from pathlib import Path
from tqdm import tqdm
def build_manifest(dicom_dir: str, output_csv: str = "manifest.csv") -> pd.DataFrame:
    """
    Scan a DICOM directory and build a metadata manifest.

    Headers are read with stop_before_pixels=True so scanning stays fast
    even for large files; unreadable files are skipped.

    Returns a DataFrame with one row per DICOM file.
    """
    records = []
    dicom_path = Path(dicom_dir)
    for f in tqdm(list(dicom_path.rglob("*")), desc="Scanning"):
        if f.is_dir():
            continue
        try:
            ds = pydicom.dcmread(str(f), stop_before_pixels=True)
            records.append({
                "filepath": str(f.relative_to(dicom_path)),
                "patient_id": str(getattr(ds, "PatientID", "")),
                "patient_name": str(getattr(ds, "PatientName", "")),
                "study_uid": str(getattr(ds, "StudyInstanceUID", "")),
                "series_uid": str(getattr(ds, "SeriesInstanceUID", "")),
                "sop_uid": str(getattr(ds, "SOPInstanceUID", "")),
                "modality": str(getattr(ds, "Modality", "")),
                "study_date": str(getattr(ds, "StudyDate", "")),
                "study_description": str(getattr(ds, "StudyDescription", "")),
                "series_description": str(getattr(ds, "SeriesDescription", "")),
                "instance_number": int(getattr(ds, "InstanceNumber", 0)),
                "rows": int(getattr(ds, "Rows", 0)),
                "columns": int(getattr(ds, "Columns", 0)),
                "pixel_spacing": str(getattr(ds, "PixelSpacing", "")),
                "slice_thickness": str(getattr(ds, "SliceThickness", "")),
                "manufacturer": str(getattr(ds, "Manufacturer", "")),
            })
        except Exception:
            continue
    df = pd.DataFrame(records)
    df.to_csv(output_csv, index=False)
    if df.empty:
        # An empty DataFrame has no columns, so the summary line below
        # used to raise KeyError on df['study_uid']; report and return.
        print(f"Manifest: 0 files found under {dicom_dir}")
        return df
    print(f"Manifest: {len(df)} files, {df['study_uid'].nunique()} studies, "
          f"{df['patient_id'].nunique()} patients")
    return df
def dataset_statistics(manifest: pd.DataFrame):
    """Print summary statistics for a DICOM dataset.

    Args:
        manifest: DataFrame produced by build_manifest(); must contain the
            columns patient_id, study_uid, series_uid, modality, study_date,
            rows, columns and manufacturer. Assumes a non-empty manifest.
    """
    print(f"Total files: {len(manifest)}")
    print(f"Patients: {manifest['patient_id'].nunique()}")
    print(f"Studies: {manifest['study_uid'].nunique()}")
    print(f"Series: {manifest['series_uid'].nunique()}")
    print(f"\nModality distribution:")
    print(manifest['modality'].value_counts().to_string())
    # StudyDate is stored as string (YYYYMMDD), so min/max sort lexically,
    # which matches chronological order for this format.
    print(f"\nDate range: {manifest['study_date'].min()} - {manifest['study_date'].max()}")
    print(f"\nImage sizes:")
    print(f" Rows: {manifest['rows'].min()}-{manifest['rows'].max()}")
    print(f" Cols: {manifest['columns'].min()}-{manifest['columns'].max()}")
    print(f"\nManufacturers:")
    print(manifest['manufacturer'].value_counts().to_string())
"""
End-to-end pipeline: DICOM CT studies -> de-identified, normalized NIfTI volumes
"""
from pathlib import Path
import pydicom
import numpy as np
import SimpleITK as sitk
from tqdm import tqdm
import json
class CTResearchPipeline:
"""Pipeline for processing CT DICOM data for research."""
def __init__(self, output_dir: str, target_spacing: tuple = (1.0, 1.0, 1.0),
hu_window: tuple = (-1000, 400)):
self.output_dir = Path(output_dir)
self.target_spacing = target_spacing
self.hu_min, self.hu_max = hu_window
self.manifest = []
# Create output structure
(self.output_dir / "volumes").mkdir(parents=True, exist_ok=True)
(self.output_dir / "metadata").mkdir(parents=True, exist_ok=True)
def process_study(self, dicom_dir: str, subject_id: str):
"""Process a single DICOM study directory."""
print(f"Processing {subject_id}...")
# Step 1: Read DICOM series
reader = sitk.ImageSeriesReader()
series_ids = reader.GetGDCMSeriesIDs(dicom_dir)
if not series_ids:
print(f" No DICOM series found in {dicom_dir}")
return
for series_idx, series_id in enumerate(series_ids):
file_names = reader.GetGDCMSeriesFileNames(dicom_dir, series_id)
reader.SetFileNames(file_names)
reader.MetaDataDictionaryArrayUpdateOn()
image = reader.Execute()
# Step 2: Resample to target spacing
image = self._resample(image)
# Step 3: Convert to numpy for normalization
volume = sitk.GetArrayFromImage(image) # shape: (Z, Y, X)
# Step 4: Normalize HU values
volume = np.clip(volume, self.hu_min, self.hu_max)
volume = ((volume - self.hu_min) /
(self.hu_max - self.hu_min)).astype(np.float32)
# Step 5: Save as NIfTI
output_image = sitk.GetImageFromArray(volume)
output_image.CopyInformation(image)
nifti_name = f"{subject_id}_series{series_idx:02d}.nii.gz"
nifti_path = self.output_dir / "volumes" / nifti_name
sitk.WriteImage(output_image, str(nifti_path))
# Step 6: Save metadata
metadata = self._extract_metadata(reader, file_names[0],
subject_id, series_idx)
metadata["output_file"] = nifti_name
metadata["volume_shape"] = list(volume.shape)
metadata["spacing"] = list(image.GetSpacing())
meta_path = (self.output_dir / "metadata" /
f"{subject_id}_series{series_idx:02d}.json")
with open(meta_path, "w") as f:
json.dump(metadata, f, indent=2)
self.manifest.append(metadata)
print(f" Series {series_idx}: {volume.shape} -> {nifti_name}")
def _resample(self, image: sitk.Image) -> sitk.Image:
    """Resample ``image`` onto ``self.target_spacing`` with linear
    interpolation, preserving the input's origin and direction.

    Out-of-bounds voxels are filled with -1000 (air in Hounsfield units).
    """
    src_spacing = image.GetSpacing()
    src_size = image.GetSize()
    # Keep the physical extent constant: new_n = old_n * old_spc / new_spc.
    out_size = []
    for n_vox, spc, target in zip(src_size, src_spacing, self.target_spacing):
        out_size.append(int(round(n_vox * spc / target)))
    resampler = sitk.ResampleImageFilter()
    resampler.SetOutputSpacing(self.target_spacing)
    resampler.SetSize(out_size)
    resampler.SetOutputDirection(image.GetDirection())
    resampler.SetOutputOrigin(image.GetOrigin())
    resampler.SetInterpolator(sitk.sitkLinear)
    resampler.SetDefaultPixelValue(-1000)  # pad with air for CT
    return resampler.Execute(image)
def _extract_metadata(self, reader, first_file: str,
                      subject_id: str, series_idx: int) -> dict:
    """Pull a small set of acquisition tags from the first slice.

    Args:
        reader: The SimpleITK series reader (unused here; kept so the
            call signature in ``process_study`` stays unchanged).
        first_file: Path of one DICOM file from the series.
        subject_id: De-identified subject label.
        series_idx: Zero-based index of the series within the study.

    Returns:
        JSON-serializable dict of basic study/series metadata.
    """

    def _as_float(value, default: float = 0.0) -> float:
        # DICOM tags are frequently present but empty (None or ""),
        # in which case float() raises — fall back to the default
        # instead of crashing the whole study.
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    ds = pydicom.dcmread(first_file, stop_before_pixels=True)
    return {
        "subject_id": subject_id,
        "series_index": series_idx,
        "modality": str(getattr(ds, "Modality", "")),
        "study_description": str(getattr(ds, "StudyDescription", "")),
        "series_description": str(getattr(ds, "SeriesDescription", "")),
        "manufacturer": str(getattr(ds, "Manufacturer", "")),
        "slice_thickness": _as_float(getattr(ds, "SliceThickness", 0)),
        "kvp": _as_float(getattr(ds, "KVP", 0)),
    }
def save_manifest(self):
    """Write the accumulated per-series metadata to ``manifest.json``
    in the output directory and print a short processing summary.
    """
    manifest_path = self.output_dir / "manifest.json"
    with open(manifest_path, "w") as fh:
        json.dump(self.manifest, fh, indent=2)
    unique_subjects = {entry["subject_id"] for entry in self.manifest}
    print(f"\nManifest saved: {manifest_path}")
    print(f"Total subjects: {len(unique_subjects)}")
    print(f"Total volumes: {len(self.manifest)}")
# Usage: run the full pipeline over a batch of studies, then persist
# the combined manifest for downstream dataset loading.
pipeline = CTResearchPipeline(
    output_dir="/data/processed",
    target_spacing=(1.0, 1.0, 1.0),
    hu_window=(-1000, 400),
)
# Process each study: de-identified subject ID -> raw DICOM directory.
study_dirs = {
    "SUBJ-001": "/data/raw/patient_001",
    "SUBJ-002": "/data/raw/patient_002",
    "SUBJ-003": "/data/raw/patient_003",
}
for subject_id, dicom_dir in study_dirs.items():
    pipeline.process_study(dicom_dir, subject_id)
pipeline.save_manifest()
import json
from pathlib import Path
from typing import Optional

import nibabel as nib
import numpy as np
import torch
from torch.utils.data import Dataset
class MedicalImageDataset(Dataset):
    """PyTorch dataset over volumes produced by the processing pipeline.

    Expects ``data_dir`` to contain ``manifest.json`` plus a ``volumes/``
    subdirectory of ``.nii.gz`` files, as written by the pipeline above.

    Args:
        data_dir: Root directory of the processed dataset.
        transform: Optional callable applied to the (1, D, H, W) numpy
            volume before conversion to a tensor.
        target_key: Optional manifest key whose value is exposed as
            ``sample["label"]`` when present in an entry.
    """

    def __init__(self, data_dir: str, transform=None,
                 target_key: Optional[str] = None):
        self.data_dir = Path(data_dir)
        self.transform = transform
        # The manifest lists one entry per processed volume.
        with open(self.data_dir / "manifest.json") as f:
            self.manifest = json.load(f)
        # Optional: key into manifest entries used as the training label.
        self.target_key = target_key

    def __len__(self) -> int:
        """Number of volumes listed in the manifest."""
        return len(self.manifest)

    def __getitem__(self, idx: int) -> dict:
        """Load one volume and return it together with its metadata.

        Returns:
            Dict with "image" (float32 tensor, shape (1, D, H, W)),
            "subject_id", "metadata", and — when ``target_key`` is set
            and present in the entry — "label".
        """
        entry = self.manifest[idx]
        nifti_path = self.data_dir / "volumes" / entry["output_file"]
        # Load volume; get_fdata() yields float64, so downcast to float32.
        img = nib.load(str(nifti_path))
        volume = img.get_fdata().astype(np.float32)
        # Add channel dimension: (D, H, W) -> (1, D, H, W)
        volume = np.expand_dims(volume, axis=0)
        if self.transform:
            volume = self.transform(volume)
        sample = {
            "image": torch.from_numpy(volume),
            "subject_id": entry["subject_id"],
            "metadata": entry,
        }
        if self.target_key and self.target_key in entry:
            sample["label"] = entry[self.target_key]
        return sample
# Usage: wrap the processed dataset in a DataLoader and inspect one batch.
dataset = MedicalImageDataset("/data/processed")
dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=4, shuffle=True, num_workers=2,
)
for batch in dataloader:
    images = batch["image"]  # (B, 1, D, H, W)
    print(f"Batch shape: {images.shape}")
    break
Sort slices by ImagePositionPatient[2] or InstanceNumber before stacking into a volume — unsorted slices produce garbled volumes. PixelSpacing gives only the in-plane (row, column) spacing; slice spacing must be computed from differences in ImagePositionPatient, or from SpacingBetweenSlices (less reliable). For CT, apply pixel * slope + intercept (RescaleSlope / RescaleIntercept) to obtain Hounsfield units. Weekly Installs
0
Repository
GitHub Stars
5
First Seen
Jan 1, 1970
Security Audits
超能力技能使用指南:AI助手技能调用优先级与工作流程详解
45,100 周安装
Docnify自动化:通过Rube MCP和Composio工具包实现文档操作自动化
1 周安装
Docmosis自动化集成指南:通过Rube MCP与Composio实现文档生成自动化
1 周安装
Dictionary API自动化教程:通过Rube MCP和Composio实现词典API操作自动化
1 周安装
detrack-automation:自动化追踪技能,集成Claude AI提升开发效率
1 周安装
Demio自动化工具包:通过Rube MCP和Composio实现Demio操作自动化
1 周安装
Deel自动化工具:通过Rube MCP与Composio实现HR与薪资操作自动化
1 周安装
Install the pylibjpeg packages (pylibjpeg, pylibjpeg-libjpeg) before processing JPEG-compressed DICOM files, or you'll get cryptic errors from pixel_array.