healthcare-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill healthcare-expert
为医疗健康系统、医学信息学、法规遵从性(HIPAA)以及健康数据标准(HL7、FHIR)提供专家指导。
from fhirclient import client
from fhirclient.models import patient, observation, medication
from datetime import datetime
# FHIR Client setup
# Connection settings handed to FHIRClient below: an application identifier
# and the base URL of the FHIR server (example.com is a placeholder host;
# the "/r4" suffix presumably selects a FHIR R4 endpoint -- confirm).
settings = {
'app_id': 'my_healthcare_app',
'api_base': 'https://fhir.example.com/r4'
}
# Module-level client; the functions in this file send their requests
# through smart.server.
smart = client.FHIRClient(settings=settings)
# Patient resource
def create_patient(first_name, last_name, gender, birth_date):
    """Create a FHIR Patient resource on the configured server.

    The resource is built from a JSON dictionary so that fhirclient
    converts the nested structures into its own model classes. Assigning
    plain dicts or strings to the model attributes fails fhirclient's
    type validation when the resource is serialized for the request.

    Args:
        first_name: Given name of the patient.
        last_name: Family name of the patient.
        gender: One of 'male', 'female', 'other', 'unknown'.
        birth_date: A ``date`` or ``datetime``; only the calendar date
            is sent.

    Returns:
        Whatever ``Patient.create`` returns for ``smart.server``.
    """
    # Patient.birthDate is a FHIR `date`, so drop any time-of-day part.
    if isinstance(birth_date, datetime):
        birth_date = birth_date.date()

    p = patient.Patient({
        'name': [{
            'use': 'official',
            'family': last_name,
            'given': [first_name],
        }],
        'gender': gender,
        'birthDate': birth_date.isoformat(),
    })
    return p.create(smart.server)
# Observation resource (vital signs)
def create_vital_signs_observation(patient_id, code, value, unit, display=None):
    """Create a vital-signs Observation for a patient on the configured server.

    The resource is built from a JSON dictionary so that fhirclient
    converts the nested structures into its own model classes; plain
    dicts assigned to model attributes fail its type validation.

    Args:
        patient_id: Logical id of the Patient the observation refers to.
        code: LOINC code of the measurement, e.g. '8867-4' for heart rate.
        value: Numeric measurement value.
        unit: Unit string, used both as the display unit and as the
            UCUM code.
        display: Optional human-readable label for ``code``. It is
            omitted when not given, because a fixed label such as
            'Heart rate' would mislabel every other code.

    Returns:
        Whatever ``Observation.create`` returns for ``smart.server``.
    """
    coding = {'system': 'http://loinc.org', 'code': code}
    if display is not None:
        coding['display'] = display

    obs = observation.Observation({
        'status': 'final',
        'category': [{
            'coding': [{
                'system': 'http://terminology.hl7.org/CodeSystem/observation-category',
                'code': 'vital-signs',
                'display': 'Vital Signs',
            }],
        }],
        'code': {'coding': [coding]},
        'subject': {'reference': f'Patient/{patient_id}'},
        # A FHIR dateTime that includes a time must carry a UTC offset,
        # so attach the local timezone to the timestamp.
        'effectiveDateTime': datetime.now().astimezone().isoformat(),
        'valueQuantity': {
            'value': value,
            'unit': unit,
            'system': 'http://unitsofmeasure.org',
            'code': unit,
        },
    })
    return obs.create(smart.server)
# Search patients
def search_patients(family_name=None, given_name=None):
    """Search for patients by family and/or given name.

    All criteria are collected first and handed to ``Patient.where`` in a
    single call, instead of chaining ``.where`` on the returned search
    object.

    Args:
        family_name: Family name to match; ignored when empty or None.
        given_name: Given name to match; ignored when empty or None.

    Returns:
        The result of ``perform`` on the search against ``smart.server``.
    """
    criteria = {}
    if family_name:
        criteria['family'] = family_name
    if given_name:
        criteria['given'] = given_name
    return patient.Patient.where(struct=criteria).perform(smart.server)
# Get patient observations
def get_patient_observations(patient_id, category=None):
    """Retrieve the observations recorded for one patient.

    All criteria are collected first and handed to ``Observation.where``
    in a single call, instead of chaining ``.where`` on the returned
    search object.

    Args:
        patient_id: Value for the ``patient`` search parameter.
        category: Optional observation category to filter on; ignored
            when empty or None.

    Returns:
        The result of ``perform`` on the search against ``smart.server``.
    """
    criteria = {'patient': patient_id}
    if category:
        criteria['category'] = category
    return observation.Observation.where(struct=criteria).perform(smart.server)
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
import hl7
# Parse HL7 message
def parse_hl7_message(message_text):
    """Parse an HL7 v2 message and pull out its type and patient fields.

    Args:
        message_text: Raw HL7 v2 message text.

    Returns:
        A dict with ``message_type`` (string form of MSH field 9) and
        ``patient`` (string forms of PID fields 3, 5, 7 and 8 under the
        keys ``patient_id``, ``name``, ``dob`` and ``gender``).

    Errors raised by the hl7 library or by the field lookups propagate
    to the caller.
    """
    parsed = hl7.parse(message_text)

    # Read the message type before touching PID.
    message_type = str(parsed.segment('MSH')[9])

    pid_segment = parsed.segment('PID')
    # (result key, PID field index) pairs, in output order.
    pid_layout = (
        ('patient_id', 3),
        ('name', 5),
        ('dob', 7),
        ('gender', 8),
    )
    patient_info = {key: str(pid_segment[index]) for key, index in pid_layout}

    return {'message_type': message_type, 'patient': patient_info}
# Create ADT^A01 message (Patient Admission)
def create_admission_message(patient_id, patient_name, dob, gender,
                             message_control_id=None):
    """Create an HL7 v2.5 ADT^A01 (patient admission) message.

    The message is assembled as pipe-delimited text. The first positional
    argument of the hl7 container classes is the separator, not the
    segment name, so building the message with ``hl7.Message("MSH", ...)``
    / ``hl7.Segment("PID", ...)`` does not yield valid HL7.

    Args:
        patient_id: Patient identifier, placed in PID-3.
        patient_name: Patient name, placed in PID-5. Component
            separators such as ``^`` are passed through unchanged.
        dob: Date of birth, placed in PID-7.
        gender: Administrative sex, placed in PID-8.
        message_control_id: Value for MSH-10. When omitted, a random
            20-character id is generated so messages are distinguishable.

    Returns:
        The message as a string with MSH, PID and PV1 segments, each
        terminated by a carriage return.

    Raises:
        ValueError: If a supplied value contains a carriage return or
            line feed, which would inject an extra segment.
    """
    import uuid  # local import: only needed to mint a control id

    def _field(value):
        # Keep caller data from breaking the field/segment structure.
        text = str(value)
        if '\r' in text or '\n' in text:
            raise ValueError("HL7 field values must not contain line breaks")
        return text.replace('|', '\\F\\')

    if message_control_id is None:
        message_control_id = uuid.uuid4().hex[:20]

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    segments = [
        # "MSH|" supplies MSH-1 (the field separator) implicitly.
        ["MSH", "^~\\&", "SENDING_APP", "SENDING_FACILITY",
         "RECEIVING_APP", "RECEIVING_FACILITY", timestamp, "",
         "ADT^A01", _field(message_control_id), "P", "2.5"],
        ["PID", "", "", _field(patient_id), "",
         _field(patient_name), "", _field(dob), _field(gender)],
        # Patient visit: class "I", location "ER"; trailing empty fields
        # carry no information and are omitted.
        ["PV1", "", "I", "ER"],
    ]
    return "".join("|".join(segment) + "\r" for segment in segments)
# Validate HL7 message
def validate_hl7_message(message_text):
    """Validate the basic structure of an HL7 message.

    Args:
        message_text: Raw HL7 message text.

    Returns:
        A ``(is_valid, description)`` tuple. This function does not
        raise: any failure is reported through the tuple.
    """
    try:
        parsed = hl7.parse(message_text)

        # A missing segment surfaces as KeyError from segment(), so
        # report it as such rather than as a generic parsing error.
        try:
            msh = parsed.segment('MSH')
        except KeyError:
            return False, "Missing MSH segment"
        if not msh:
            return False, "Missing MSH segment"

        # Verify the header carries enough fields.
        if len(msh) < 12:
            return False, "Invalid MSH segment"

        return True, "Valid HL7 message"
    except Exception as e:
        # Validation boundary: report instead of propagating.
        return False, f"Parsing error: {str(e)}"
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import hashlib
import logging
from datetime import datetime
class HIPAACompliantLogger:
    """Audit logger that writes PHI access events to a file.

    All instances share the ``'hipaa_audit'`` logger. A file handler is
    attached only if that logger does not already write to the same
    file, so creating the logger more than once does not duplicate
    every audit line.
    """

    def __init__(self, log_file):
        """Attach a file handler for ``log_file`` to the audit logger.

        Args:
            log_file: Path of the audit log file.
        """
        import os  # local import: only used to normalise the path

        self.logger = logging.getLogger('hipaa_audit')
        self.logger.setLevel(logging.INFO)

        # FileHandler stores its target as an absolute path in baseFilename.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    @staticmethod
    def _clean(value):
        """Return ``value`` as text with CR/LF escaped.

        A raw line break inside a field would let a caller forge what
        looks like a separate audit entry.
        """
        return str(value).replace('\r', '\\r').replace('\n', '\\n')

    def log_access(self, user_id, patient_id, action, phi_accessed):
        """Record that a user accessed (or tried to access) PHI."""
        clean = self._clean
        self.logger.info(
            f"USER:{clean(user_id)} | PATIENT:{clean(patient_id)} | "
            f"ACTION:{clean(action)} | PHI:{clean(phi_accessed)}"
        )

    def log_modification(self, user_id, resource_type, resource_id, changes):
        """Record a data modification made by a user."""
        clean = self._clean
        self.logger.info(
            f"USER:{clean(user_id)} | "
            f"MODIFIED:{clean(resource_type)}/{clean(resource_id)} | "
            f"CHANGES:{clean(changes)}"
        )

    def log_disclosure(self, user_id, patient_id, recipient, purpose):
        """Record a disclosure of PHI to a recipient."""
        clean = self._clean
        self.logger.info(
            f"DISCLOSURE | USER:{clean(user_id)} | PATIENT:{clean(patient_id)} | "
            f"TO:{clean(recipient)} | PURPOSE:{clean(purpose)}"
        )
class PHIEncryption:
    """Symmetric encryption helpers for Protected Health Information."""

    def __init__(self, master_key):
        """Create the cipher.

        Args:
            master_key: Key passed straight to ``Fernet``.
        """
        self.fernet = Fernet(master_key)

    def encrypt_phi(self, data):
        """Encrypt PHI.

        Args:
            data: Text or bytes; text is encoded before encryption.

        Returns:
            The token produced by ``Fernet.encrypt``.
        """
        if isinstance(data, str):
            data = data.encode()
        return self.fernet.encrypt(data)

    def decrypt_phi(self, encrypted_data):
        """Decrypt a token produced by ``encrypt_phi`` and return text."""
        decrypted = self.fernet.decrypt(encrypted_data)
        return decrypted.decode()

    @staticmethod
    def hash_identifier(identifier, secret=None):
        """Hash a patient identifier for de-identification.

        Args:
            identifier: Identifier text to hash.
            secret: Optional key (text or bytes). When given, the result
                is an HMAC-SHA256 keyed with it. Identifiers such as
                record numbers come from a small, guessable space, so an
                unkeyed digest can be reversed by hashing candidates;
                supply a secret for real de-identification.

        Returns:
            Hex digest string. Without ``secret`` this is the plain
            SHA-256 of the identifier.
        """
        payload = identifier.encode()
        if secret is None:
            return hashlib.sha256(payload).hexdigest()

        import hmac  # local import: only the keyed path needs it

        if isinstance(secret, str):
            secret = secret.encode()
        return hmac.new(secret, payload, hashlib.sha256).hexdigest()
class HIPAAAccessControl:
    """Role-based access control.

    ``ROLES`` maps each role name to the actions it may perform. The
    special permission ``'read_own'`` allows ``'read'`` only on the
    user's own patient record.
    """

    ROLES = {
        'physician': ['read', 'write', 'prescribe'],
        'nurse': ['read', 'write'],
        'administrative': ['read'],
        'patient': ['read_own']
    }

    def __init__(self, user_role):
        """Bind the controller to a role; unknown roles get no permissions."""
        self.role = user_role
        # Copy so changes to one instance never leak into the shared table.
        self.permissions = list(self.ROLES.get(user_role, []))

    def can_access(self, action, patient_id, user_patient_id=None):
        """Check whether the role may perform ``action`` on a patient.

        Args:
            action: Requested action, e.g. 'read', 'write', 'prescribe'.
            patient_id: Id of the patient whose data is targeted.
            user_patient_id: The requesting user's own patient id, used
                only for the ``'read_own'`` rule.

        Returns:
            True if access is allowed, False otherwise.
        """
        if action in self.permissions:
            return True
        if action == 'read' and 'read_own' in self.permissions:
            # Without a known own id nothing can match; in particular
            # None == None must not grant access.
            return user_patient_id is not None and patient_id == user_patient_id
        return False

    def require_permission(self, action):
        """Return a decorator that enforces ``action`` before each call.

        The wrapped function raises ``PermissionError`` when the role
        lacks ``action``; otherwise it runs unchanged.
        """
        import functools  # local import: only the decorator needs it

        def decorator(func):
            @functools.wraps(func)  # keep the wrapped name and docstring
            def wrapper(*args, **kwargs):
                if action not in self.permissions:
                    raise PermissionError(
                        f"Role '{self.role}' lacks permission: {action}"
                    )
                return func(*args, **kwargs)
            return wrapper
        return decorator
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class Patient:
    """Demographic and administrative record for a single patient.

    All twelve fields are required and positional, in the order declared.
    """

    # Identifiers
    patient_id: str
    mrn: str  # medical record number

    # Demographics
    first_name: str
    last_name: str
    dob: datetime
    gender: str
    ssn: Optional[str]  # held in encrypted form

    # Contact details
    address: dict
    phone: str
    email: str
    emergency_contact: dict

    # Coverage
    insurance: dict
@dataclass
class Encounter:
    """One clinical encounter between a patient and a provider.

    All ten fields are required and positional, in the order declared.
    """

    # Who / when / what kind
    encounter_id: str
    patient_id: str
    encounter_date: datetime
    encounter_type: str  # 'inpatient', 'outpatient' or 'emergency'
    chief_complaint: str

    # Where and by whom
    provider_id: str
    facility_id: str

    # Coding and narrative
    diagnosis_codes: List[str]  # ICD-10 codes
    procedure_codes: List[str]  # CPT codes
    notes: str
@dataclass
class Medication:
    """A medication order for one patient.

    All ten fields are required and positional, in the order declared.
    """

    # Order identity
    medication_id: str
    patient_id: str

    # What is given and how
    drug_name: str
    dosage: str
    frequency: str
    route: str  # e.g. 'oral', 'IV'

    # Validity window; end_date may be None
    start_date: datetime
    end_date: Optional[datetime]

    # Provenance
    prescriber_id: str
    pharmacy_notes: str
class EHRSystem:
    """Electronic Health Record facade over storage, audit, access and crypto.

    Every operation checks permission first. Denied attempts are written
    to the audit log in all methods, so failed access is recorded the
    same way everywhere.

    NOTE(review): ``can_access`` is always called without
    ``user_patient_id``, so a role holding only 'read_own' can never read
    through this class -- confirm whether that is intended.
    """

    def __init__(self, db, logger, access_control, encryption):
        """Store the collaborators.

        Args:
            db: Storage backend providing ``get_patient``,
                ``save_encounter``, ``get_active_medications`` and
                ``save_medication``.
            logger: Audit logger providing ``log_access`` and
                ``log_modification``.
            access_control: Object providing ``can_access``.
            encryption: Object providing ``encrypt_phi`` and
                ``decrypt_phi``.
        """
        self.db = db
        self.logger = logger
        self.access_control = access_control
        self.encryption = encryption

    def _deny(self, user_id, patient_id, resource, message):
        """Audit a denied attempt, then raise ``PermissionError(message)``."""
        self.logger.log_access(user_id, patient_id, 'DENIED', resource)
        raise PermissionError(message)

    def get_patient_record(self, user_id, patient_id):
        """Retrieve a patient record, with audit logging.

        Returns:
            The record from the db with ``ssn`` decrypted, or None when
            the db returns None for ``patient_id``.

        Raises:
            PermissionError: If 'read' access is not allowed.
        """
        if not self.access_control.can_access('read', patient_id):
            self._deny(user_id, patient_id, 'patient_record', "Access denied")

        self.logger.log_access(
            user_id, patient_id, 'READ', 'patient_record'
        )

        patient = self.db.get_patient(patient_id)
        if patient is None:
            # Unknown patient: hand the miss back instead of failing on .ssn.
            return None
        if patient.ssn:
            # The object returned by the db is updated in place.
            patient.ssn = self.encryption.decrypt_phi(patient.ssn)
        return patient

    def create_encounter(self, user_id, encounter: "Encounter"):
        """Store a clinical encounter with its notes encrypted.

        ``encounter.notes`` is replaced in place by the encrypted value
        before saving, and the same object is returned.

        Raises:
            PermissionError: If 'write' access is not allowed.
        """
        if not self.access_control.can_access('write', encounter.patient_id):
            self._deny(
                user_id, encounter.patient_id, 'encounter',
                "Cannot create encounter"
            )

        if encounter.notes:
            encounter.notes = self.encryption.encrypt_phi(encounter.notes)

        self.db.save_encounter(encounter)
        self.logger.log_modification(
            user_id, 'encounter', encounter.encounter_id, 'created'
        )
        return encounter

    def get_patient_medications(self, user_id, patient_id):
        """Return the patient's active medications, with audit logging.

        Raises:
            PermissionError: If 'read' access is not allowed.
        """
        if not self.access_control.can_access('read', patient_id):
            self._deny(user_id, patient_id, 'medications', "Access denied")

        self.logger.log_access(
            user_id, patient_id, 'READ', 'medications'
        )
        return self.db.get_active_medications(patient_id)

    def prescribe_medication(self, user_id, medication: "Medication"):
        """Prescribe a medication after an interaction check.

        Returns:
            ``{'status': 'warning', 'interactions': [...]}`` without
            saving when interactions are reported; otherwise
            ``{'status': 'success', 'medication_id': ...}`` after saving.

        Raises:
            PermissionError: If 'prescribe' (or the 'read' needed to
                fetch active medications) is not allowed.
        """
        if not self.access_control.can_access('prescribe', medication.patient_id):
            self._deny(
                user_id, medication.patient_id, 'medication',
                "Cannot prescribe medication"
            )

        active_meds = self.get_patient_medications(user_id, medication.patient_id)
        interactions = self.check_drug_interactions(medication, active_meds)
        if interactions:
            return {'status': 'warning', 'interactions': interactions}

        self.db.save_medication(medication)
        self.logger.log_modification(
            user_id, 'medication', medication.medication_id, 'prescribed'
        )
        return {'status': 'success', 'medication_id': medication.medication_id}

    def check_drug_interactions(self, new_med, existing_meds):
        """Check for drug-drug interactions.

        Placeholder: no interaction database is consulted, so this
        always returns an empty list. It must be integrated with a real
        interaction source before its result is relied on.
        """
        interactions = []
        return interactions
❌ 存储未加密的受保护健康信息
❌ 没有审计日志记录
❌ 访问控制不足
❌ 使用专有格式
❌ 没有数据备份策略
❌ 忽视互操作性标准
❌ 弱身份验证
每周安装量
114
代码仓库
GitHub 星标数
11
首次出现
2026年1月24日
安全审计
安装于
opencode: 105
codex: 104
gemini-cli: 100
cursor: 98
github-copilot: 97
amp: 93
Expert guidance for healthcare systems, medical informatics, regulatory compliance (HIPAA), and health data standards (HL7, FHIR).
from fhirclient import client
from fhirclient.models import patient, observation, medication
from datetime import datetime
# FHIR Client setup
# Connection settings handed to FHIRClient below: an application identifier
# and the base URL of the FHIR server (example.com is a placeholder host;
# the "/r4" suffix presumably selects a FHIR R4 endpoint -- confirm).
settings = {
'app_id': 'my_healthcare_app',
'api_base': 'https://fhir.example.com/r4'
}
# Module-level client; the functions in this file send their requests
# through smart.server.
smart = client.FHIRClient(settings=settings)
# Patient resource
def create_patient(first_name, last_name, gender, birth_date):
    """Create a FHIR Patient resource on the configured server.

    The resource is built from a JSON dictionary so that fhirclient
    converts the nested structures into its own model classes. Assigning
    plain dicts or strings to the model attributes fails fhirclient's
    type validation when the resource is serialized for the request.

    Args:
        first_name: Given name of the patient.
        last_name: Family name of the patient.
        gender: One of 'male', 'female', 'other', 'unknown'.
        birth_date: A ``date`` or ``datetime``; only the calendar date
            is sent.

    Returns:
        Whatever ``Patient.create`` returns for ``smart.server``.
    """
    # Patient.birthDate is a FHIR `date`, so drop any time-of-day part.
    if isinstance(birth_date, datetime):
        birth_date = birth_date.date()

    p = patient.Patient({
        'name': [{
            'use': 'official',
            'family': last_name,
            'given': [first_name],
        }],
        'gender': gender,
        'birthDate': birth_date.isoformat(),
    })
    return p.create(smart.server)
# Observation resource (vital signs)
def create_vital_signs_observation(patient_id, code, value, unit, display=None):
    """Create a vital-signs Observation for a patient on the configured server.

    The resource is built from a JSON dictionary so that fhirclient
    converts the nested structures into its own model classes; plain
    dicts assigned to model attributes fail its type validation.

    Args:
        patient_id: Logical id of the Patient the observation refers to.
        code: LOINC code of the measurement, e.g. '8867-4' for heart rate.
        value: Numeric measurement value.
        unit: Unit string, used both as the display unit and as the
            UCUM code.
        display: Optional human-readable label for ``code``. It is
            omitted when not given, because a fixed label such as
            'Heart rate' would mislabel every other code.

    Returns:
        Whatever ``Observation.create`` returns for ``smart.server``.
    """
    coding = {'system': 'http://loinc.org', 'code': code}
    if display is not None:
        coding['display'] = display

    obs = observation.Observation({
        'status': 'final',
        'category': [{
            'coding': [{
                'system': 'http://terminology.hl7.org/CodeSystem/observation-category',
                'code': 'vital-signs',
                'display': 'Vital Signs',
            }],
        }],
        'code': {'coding': [coding]},
        'subject': {'reference': f'Patient/{patient_id}'},
        # A FHIR dateTime that includes a time must carry a UTC offset,
        # so attach the local timezone to the timestamp.
        'effectiveDateTime': datetime.now().astimezone().isoformat(),
        'valueQuantity': {
            'value': value,
            'unit': unit,
            'system': 'http://unitsofmeasure.org',
            'code': unit,
        },
    })
    return obs.create(smart.server)
# Search patients
def search_patients(family_name=None, given_name=None):
    """Search for patients by family and/or given name.

    All criteria are collected first and handed to ``Patient.where`` in a
    single call, instead of chaining ``.where`` on the returned search
    object.

    Args:
        family_name: Family name to match; ignored when empty or None.
        given_name: Given name to match; ignored when empty or None.

    Returns:
        The result of ``perform`` on the search against ``smart.server``.
    """
    criteria = {}
    if family_name:
        criteria['family'] = family_name
    if given_name:
        criteria['given'] = given_name
    return patient.Patient.where(struct=criteria).perform(smart.server)
# Get patient observations
def get_patient_observations(patient_id, category=None):
    """Retrieve the observations recorded for one patient.

    All criteria are collected first and handed to ``Observation.where``
    in a single call, instead of chaining ``.where`` on the returned
    search object.

    Args:
        patient_id: Value for the ``patient`` search parameter.
        category: Optional observation category to filter on; ignored
            when empty or None.

    Returns:
        The result of ``perform`` on the search against ``smart.server``.
    """
    criteria = {'patient': patient_id}
    if category:
        criteria['category'] = category
    return observation.Observation.where(struct=criteria).perform(smart.server)
import hl7
# Parse HL7 message
def parse_hl7_message(message_text):
    """Parse an HL7 v2 message and pull out its type and patient fields.

    Args:
        message_text: Raw HL7 v2 message text.

    Returns:
        A dict with ``message_type`` (string form of MSH field 9) and
        ``patient`` (string forms of PID fields 3, 5, 7 and 8 under the
        keys ``patient_id``, ``name``, ``dob`` and ``gender``).

    Errors raised by the hl7 library or by the field lookups propagate
    to the caller.
    """
    parsed = hl7.parse(message_text)

    # Read the message type before touching PID.
    message_type = str(parsed.segment('MSH')[9])

    pid_segment = parsed.segment('PID')
    # (result key, PID field index) pairs, in output order.
    pid_layout = (
        ('patient_id', 3),
        ('name', 5),
        ('dob', 7),
        ('gender', 8),
    )
    patient_info = {key: str(pid_segment[index]) for key, index in pid_layout}

    return {'message_type': message_type, 'patient': patient_info}
# Create ADT^A01 message (Patient Admission)
def create_admission_message(patient_id, patient_name, dob, gender,
                             message_control_id=None):
    """Create an HL7 v2.5 ADT^A01 (patient admission) message.

    The message is assembled as pipe-delimited text. The first positional
    argument of the hl7 container classes is the separator, not the
    segment name, so building the message with ``hl7.Message("MSH", ...)``
    / ``hl7.Segment("PID", ...)`` does not yield valid HL7.

    Args:
        patient_id: Patient identifier, placed in PID-3.
        patient_name: Patient name, placed in PID-5. Component
            separators such as ``^`` are passed through unchanged.
        dob: Date of birth, placed in PID-7.
        gender: Administrative sex, placed in PID-8.
        message_control_id: Value for MSH-10. When omitted, a random
            20-character id is generated so messages are distinguishable.

    Returns:
        The message as a string with MSH, PID and PV1 segments, each
        terminated by a carriage return.

    Raises:
        ValueError: If a supplied value contains a carriage return or
            line feed, which would inject an extra segment.
    """
    import uuid  # local import: only needed to mint a control id

    def _field(value):
        # Keep caller data from breaking the field/segment structure.
        text = str(value)
        if '\r' in text or '\n' in text:
            raise ValueError("HL7 field values must not contain line breaks")
        return text.replace('|', '\\F\\')

    if message_control_id is None:
        message_control_id = uuid.uuid4().hex[:20]

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    segments = [
        # "MSH|" supplies MSH-1 (the field separator) implicitly.
        ["MSH", "^~\\&", "SENDING_APP", "SENDING_FACILITY",
         "RECEIVING_APP", "RECEIVING_FACILITY", timestamp, "",
         "ADT^A01", _field(message_control_id), "P", "2.5"],
        ["PID", "", "", _field(patient_id), "",
         _field(patient_name), "", _field(dob), _field(gender)],
        # Patient visit: class "I", location "ER"; trailing empty fields
        # carry no information and are omitted.
        ["PV1", "", "I", "ER"],
    ]
    return "".join("|".join(segment) + "\r" for segment in segments)
# Validate HL7 message
def validate_hl7_message(message_text):
    """Validate the basic structure of an HL7 message.

    Args:
        message_text: Raw HL7 message text.

    Returns:
        A ``(is_valid, description)`` tuple. This function does not
        raise: any failure is reported through the tuple.
    """
    try:
        parsed = hl7.parse(message_text)

        # A missing segment surfaces as KeyError from segment(), so
        # report it as such rather than as a generic parsing error.
        try:
            msh = parsed.segment('MSH')
        except KeyError:
            return False, "Missing MSH segment"
        if not msh:
            return False, "Missing MSH segment"

        # Verify the header carries enough fields.
        if len(msh) < 12:
            return False, "Invalid MSH segment"

        return True, "Valid HL7 message"
    except Exception as e:
        # Validation boundary: report instead of propagating.
        return False, f"Parsing error: {str(e)}"
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import hashlib
import logging
from datetime import datetime
class HIPAACompliantLogger:
    """Audit logger that writes PHI access events to a file.

    All instances share the ``'hipaa_audit'`` logger. A file handler is
    attached only if that logger does not already write to the same
    file, so creating the logger more than once does not duplicate
    every audit line.
    """

    def __init__(self, log_file):
        """Attach a file handler for ``log_file`` to the audit logger.

        Args:
            log_file: Path of the audit log file.
        """
        import os  # local import: only used to normalise the path

        self.logger = logging.getLogger('hipaa_audit')
        self.logger.setLevel(logging.INFO)

        # FileHandler stores its target as an absolute path in baseFilename.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    @staticmethod
    def _clean(value):
        """Return ``value`` as text with CR/LF escaped.

        A raw line break inside a field would let a caller forge what
        looks like a separate audit entry.
        """
        return str(value).replace('\r', '\\r').replace('\n', '\\n')

    def log_access(self, user_id, patient_id, action, phi_accessed):
        """Record that a user accessed (or tried to access) PHI."""
        clean = self._clean
        self.logger.info(
            f"USER:{clean(user_id)} | PATIENT:{clean(patient_id)} | "
            f"ACTION:{clean(action)} | PHI:{clean(phi_accessed)}"
        )

    def log_modification(self, user_id, resource_type, resource_id, changes):
        """Record a data modification made by a user."""
        clean = self._clean
        self.logger.info(
            f"USER:{clean(user_id)} | "
            f"MODIFIED:{clean(resource_type)}/{clean(resource_id)} | "
            f"CHANGES:{clean(changes)}"
        )

    def log_disclosure(self, user_id, patient_id, recipient, purpose):
        """Record a disclosure of PHI to a recipient."""
        clean = self._clean
        self.logger.info(
            f"DISCLOSURE | USER:{clean(user_id)} | PATIENT:{clean(patient_id)} | "
            f"TO:{clean(recipient)} | PURPOSE:{clean(purpose)}"
        )
class PHIEncryption:
    """Symmetric encryption helpers for Protected Health Information."""

    def __init__(self, master_key):
        """Create the cipher.

        Args:
            master_key: Key passed straight to ``Fernet``.
        """
        self.fernet = Fernet(master_key)

    def encrypt_phi(self, data):
        """Encrypt PHI.

        Args:
            data: Text or bytes; text is encoded before encryption.

        Returns:
            The token produced by ``Fernet.encrypt``.
        """
        if isinstance(data, str):
            data = data.encode()
        return self.fernet.encrypt(data)

    def decrypt_phi(self, encrypted_data):
        """Decrypt a token produced by ``encrypt_phi`` and return text."""
        decrypted = self.fernet.decrypt(encrypted_data)
        return decrypted.decode()

    @staticmethod
    def hash_identifier(identifier, secret=None):
        """Hash a patient identifier for de-identification.

        Args:
            identifier: Identifier text to hash.
            secret: Optional key (text or bytes). When given, the result
                is an HMAC-SHA256 keyed with it. Identifiers such as
                record numbers come from a small, guessable space, so an
                unkeyed digest can be reversed by hashing candidates;
                supply a secret for real de-identification.

        Returns:
            Hex digest string. Without ``secret`` this is the plain
            SHA-256 of the identifier.
        """
        payload = identifier.encode()
        if secret is None:
            return hashlib.sha256(payload).hexdigest()

        import hmac  # local import: only the keyed path needs it

        if isinstance(secret, str):
            secret = secret.encode()
        return hmac.new(secret, payload, hashlib.sha256).hexdigest()
class HIPAAAccessControl:
    """Role-based access control.

    ``ROLES`` maps each role name to the actions it may perform. The
    special permission ``'read_own'`` allows ``'read'`` only on the
    user's own patient record.
    """

    ROLES = {
        'physician': ['read', 'write', 'prescribe'],
        'nurse': ['read', 'write'],
        'administrative': ['read'],
        'patient': ['read_own']
    }

    def __init__(self, user_role):
        """Bind the controller to a role; unknown roles get no permissions."""
        self.role = user_role
        # Copy so changes to one instance never leak into the shared table.
        self.permissions = list(self.ROLES.get(user_role, []))

    def can_access(self, action, patient_id, user_patient_id=None):
        """Check whether the role may perform ``action`` on a patient.

        Args:
            action: Requested action, e.g. 'read', 'write', 'prescribe'.
            patient_id: Id of the patient whose data is targeted.
            user_patient_id: The requesting user's own patient id, used
                only for the ``'read_own'`` rule.

        Returns:
            True if access is allowed, False otherwise.
        """
        if action in self.permissions:
            return True
        if action == 'read' and 'read_own' in self.permissions:
            # Without a known own id nothing can match; in particular
            # None == None must not grant access.
            return user_patient_id is not None and patient_id == user_patient_id
        return False

    def require_permission(self, action):
        """Return a decorator that enforces ``action`` before each call.

        The wrapped function raises ``PermissionError`` when the role
        lacks ``action``; otherwise it runs unchanged.
        """
        import functools  # local import: only the decorator needs it

        def decorator(func):
            @functools.wraps(func)  # keep the wrapped name and docstring
            def wrapper(*args, **kwargs):
                if action not in self.permissions:
                    raise PermissionError(
                        f"Role '{self.role}' lacks permission: {action}"
                    )
                return func(*args, **kwargs)
            return wrapper
        return decorator
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class Patient:
    """Demographic and administrative record for a single patient.

    All twelve fields are required and positional, in the order declared.
    """

    # Identifiers
    patient_id: str
    mrn: str  # medical record number

    # Demographics
    first_name: str
    last_name: str
    dob: datetime
    gender: str
    ssn: Optional[str]  # held in encrypted form

    # Contact details
    address: dict
    phone: str
    email: str
    emergency_contact: dict

    # Coverage
    insurance: dict
@dataclass
class Encounter:
    """One clinical encounter between a patient and a provider.

    All ten fields are required and positional, in the order declared.
    """

    # Who / when / what kind
    encounter_id: str
    patient_id: str
    encounter_date: datetime
    encounter_type: str  # 'inpatient', 'outpatient' or 'emergency'
    chief_complaint: str

    # Where and by whom
    provider_id: str
    facility_id: str

    # Coding and narrative
    diagnosis_codes: List[str]  # ICD-10 codes
    procedure_codes: List[str]  # CPT codes
    notes: str
@dataclass
class Medication:
    """A medication order for one patient.

    All ten fields are required and positional, in the order declared.
    """

    # Order identity
    medication_id: str
    patient_id: str

    # What is given and how
    drug_name: str
    dosage: str
    frequency: str
    route: str  # e.g. 'oral', 'IV'

    # Validity window; end_date may be None
    start_date: datetime
    end_date: Optional[datetime]

    # Provenance
    prescriber_id: str
    pharmacy_notes: str
class EHRSystem:
    """Electronic Health Record facade over storage, audit, access and crypto.

    Every operation checks permission first. Denied attempts are written
    to the audit log in all methods, so failed access is recorded the
    same way everywhere.

    NOTE(review): ``can_access`` is always called without
    ``user_patient_id``, so a role holding only 'read_own' can never read
    through this class -- confirm whether that is intended.
    """

    def __init__(self, db, logger, access_control, encryption):
        """Store the collaborators.

        Args:
            db: Storage backend providing ``get_patient``,
                ``save_encounter``, ``get_active_medications`` and
                ``save_medication``.
            logger: Audit logger providing ``log_access`` and
                ``log_modification``.
            access_control: Object providing ``can_access``.
            encryption: Object providing ``encrypt_phi`` and
                ``decrypt_phi``.
        """
        self.db = db
        self.logger = logger
        self.access_control = access_control
        self.encryption = encryption

    def _deny(self, user_id, patient_id, resource, message):
        """Audit a denied attempt, then raise ``PermissionError(message)``."""
        self.logger.log_access(user_id, patient_id, 'DENIED', resource)
        raise PermissionError(message)

    def get_patient_record(self, user_id, patient_id):
        """Retrieve a patient record, with audit logging.

        Returns:
            The record from the db with ``ssn`` decrypted, or None when
            the db returns None for ``patient_id``.

        Raises:
            PermissionError: If 'read' access is not allowed.
        """
        if not self.access_control.can_access('read', patient_id):
            self._deny(user_id, patient_id, 'patient_record', "Access denied")

        self.logger.log_access(
            user_id, patient_id, 'READ', 'patient_record'
        )

        patient = self.db.get_patient(patient_id)
        if patient is None:
            # Unknown patient: hand the miss back instead of failing on .ssn.
            return None
        if patient.ssn:
            # The object returned by the db is updated in place.
            patient.ssn = self.encryption.decrypt_phi(patient.ssn)
        return patient

    def create_encounter(self, user_id, encounter: "Encounter"):
        """Store a clinical encounter with its notes encrypted.

        ``encounter.notes`` is replaced in place by the encrypted value
        before saving, and the same object is returned.

        Raises:
            PermissionError: If 'write' access is not allowed.
        """
        if not self.access_control.can_access('write', encounter.patient_id):
            self._deny(
                user_id, encounter.patient_id, 'encounter',
                "Cannot create encounter"
            )

        if encounter.notes:
            encounter.notes = self.encryption.encrypt_phi(encounter.notes)

        self.db.save_encounter(encounter)
        self.logger.log_modification(
            user_id, 'encounter', encounter.encounter_id, 'created'
        )
        return encounter

    def get_patient_medications(self, user_id, patient_id):
        """Return the patient's active medications, with audit logging.

        Raises:
            PermissionError: If 'read' access is not allowed.
        """
        if not self.access_control.can_access('read', patient_id):
            self._deny(user_id, patient_id, 'medications', "Access denied")

        self.logger.log_access(
            user_id, patient_id, 'READ', 'medications'
        )
        return self.db.get_active_medications(patient_id)

    def prescribe_medication(self, user_id, medication: "Medication"):
        """Prescribe a medication after an interaction check.

        Returns:
            ``{'status': 'warning', 'interactions': [...]}`` without
            saving when interactions are reported; otherwise
            ``{'status': 'success', 'medication_id': ...}`` after saving.

        Raises:
            PermissionError: If 'prescribe' (or the 'read' needed to
                fetch active medications) is not allowed.
        """
        if not self.access_control.can_access('prescribe', medication.patient_id):
            self._deny(
                user_id, medication.patient_id, 'medication',
                "Cannot prescribe medication"
            )

        active_meds = self.get_patient_medications(user_id, medication.patient_id)
        interactions = self.check_drug_interactions(medication, active_meds)
        if interactions:
            return {'status': 'warning', 'interactions': interactions}

        self.db.save_medication(medication)
        self.logger.log_modification(
            user_id, 'medication', medication.medication_id, 'prescribed'
        )
        return {'status': 'success', 'medication_id': medication.medication_id}

    def check_drug_interactions(self, new_med, existing_meds):
        """Check for drug-drug interactions.

        Placeholder: no interaction database is consulted, so this
        always returns an empty list. It must be integrated with a real
        interaction source before its result is relied on.
        """
        interactions = []
        return interactions
❌ Storing PHI unencrypted
❌ No audit logging
❌ Inadequate access controls
❌ Using proprietary formats
❌ No data backup strategy
❌ Ignoring interoperability standards
❌ Weak authentication
Weekly Installs
114
Repository
GitHub Stars
11
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust Hub: Fail · Socket: Pass · Snyk: Pass
Installed on
opencode: 105
codex: 104
gemini-cli: 100
cursor: 98
github-copilot: 97
amp: 93
供应商协议状态检查工具 - vendor-check | 合同管理、法律合规与供应商关系自动化
456 周安装
NativeWind v4 Expo 配置指南:React Native Tailwind CSS 样式库集成教程
139 周安装
企业风险管理专家 | 金融风险建模与合规框架 | 定量风险分析解决方案
137 周安装
2026内容策略与GEO优化指南:定位、信任建设与AI时代营销自动化
138 周安装
使用 bslib 构建现代化 Shiny 应用:Bootstrap 5 仪表盘与 UI 组件库
142 周安装
brand.yml 教程:为 Shiny 和 Quarto 创建品牌配置文件,实现统一品牌形象
146 周安装
Nansen Search:区块链数据搜索工具,支持代币和实体查询
138 周安装