放射治疗安全联锁系统¶
安全联锁概述¶
安全联锁系统是放射治疗设备的关键安全组件,通过硬件和软件的多层保护机制,防止意外照射和过量辐射。
联锁类型¶
1. 硬件联锁¶
class HardwareInterlocks:
"""硬件联锁系统"""
def __init__(self):
self.interlocks = {
"door_interlock": DoorInterlock(),
"beam_stopper": BeamStopperInterlock(),
"emergency_stop": EmergencyStopInterlock(),
"collision_sensor": CollisionSensorInterlock(),
"patient_monitoring": PatientMonitoringInterlock()
}
def check_all_interlocks(self):
"""检查所有联锁"""
status = {}
for name, interlock in self.interlocks.items():
status[name] = interlock.is_safe()
return all(status.values()), status
def monitor_interlocks_during_treatment(self):
"""治疗期间监控联锁"""
while self.treatment_active:
safe, status = self.check_all_interlocks()
if not safe:
# 立即终止束流
self.terminate_beam()
# 记录事件
self.log_interlock_event(status)
# 通知操作员
self.alert_operator(status)
break
time.sleep(0.01) # 100Hz监控频率
2. 软件联锁¶
class SoftwareInterlocks:
"""软件联锁系统"""
def __init__(self):
self.checks = [
self.verify_patient_identity,
self.verify_treatment_plan,
self.verify_machine_parameters,
self.verify_dose_limits,
self.verify_beam_geometry,
self.verify_imaging_alignment
]
def pre_treatment_verification(self, patient, plan):
"""治疗前验证"""
results = {}
for check in self.checks:
try:
result = check(patient, plan)
results[check.__name__] = result
except Exception as e:
results[check.__name__] = False
self.log_error(f"{check.__name__} 失败: {str(e)}")
if not all(results.values()):
raise PreTreatmentVerificationError(
f"验证失败: {[k for k, v in results.items() if not v]}"
)
return True
def verify_dose_limits(self, patient, plan):
"""验证剂量限制"""
# 检查单次剂量
if plan.fraction_dose > self.max_fraction_dose:
raise DoseLimitError("单次剂量超过限制")
# 检查累积剂量
delivered_dose = self.get_delivered_dose(patient)
total_planned_dose = delivered_dose + plan.remaining_dose
if total_planned_dose > plan.prescription_dose * 1.05:
raise DoseLimitError("累积剂量超过处方剂量5%")
# 检查危及器官剂量
for oar in plan.organs_at_risk:
oar_dose = plan.get_oar_dose(oar)
if oar_dose > self.get_oar_limit(oar.name):
raise DoseLimitError(f"{oar.name}剂量超过限制")
return True
剂量监控¶
实时剂量监控¶
class DoseMonitoringSystem:
"""剂量监控系统"""
def __init__(self):
self.primary_monitor = IonizationChamber("primary")
self.secondary_monitor = IonizationChamber("secondary")
self.dose_rate_limit = 600 # MU/min
def monitor_dose_delivery(self, planned_mu):
"""监控剂量输出"""
delivered_mu = 0
start_time = time.time()
while delivered_mu < planned_mu:
# 读取主监测电离室
primary_reading = self.primary_monitor.read()
# 读取次监测电离室
secondary_reading = self.secondary_monitor.read()
# 交叉验证
if not self.cross_check(primary_reading, secondary_reading):
self.terminate_beam()
raise MonitorDisagreementError("主次监测电离室读数不一致")
# 更新已输出MU
delivered_mu = primary_reading
# 检查剂量率
elapsed_time = time.time() - start_time
current_dose_rate = delivered_mu / (elapsed_time / 60)
if current_dose_rate > self.dose_rate_limit:
self.terminate_beam()
raise DoseRateError("剂量率超过限制")
# 检查是否超过计划剂量
if delivered_mu > planned_mu * 1.02:
self.terminate_beam()
raise OverdoseError("输出剂量超过计划剂量2%")
time.sleep(0.01)
return delivered_mu
def cross_check(self, primary, secondary, tolerance=0.03):
"""交叉检查主次监测电离室"""
if abs(primary - secondary) / primary > tolerance:
return False
return True
碰撞检测¶
几何碰撞检测¶
class CollisionDetection:
"""碰撞检测系统"""
def __init__(self):
self.gantry_model = GantryGeometryModel()
self.patient_model = PatientGeometryModel()
self.couch_model = CouchGeometryModel()
self.safety_margin_mm = 50
def check_collision_risk(self, gantry_angle, couch_angle, couch_position):
"""检查碰撞风险"""
# 更新几何模型
self.gantry_model.set_angle(gantry_angle)
self.couch_model.set_angle(couch_angle)
self.couch_model.set_position(couch_position)
# 检查机架与患者
gantry_patient_distance = self.calculate_min_distance(
self.gantry_model, self.patient_model
)
if gantry_patient_distance < self.safety_margin_mm:
return CollisionRisk.HIGH, "机架接近患者"
# 检查机架与治疗床
gantry_couch_distance = self.calculate_min_distance(
self.gantry_model, self.couch_model
)
if gantry_couch_distance < self.safety_margin_mm:
return CollisionRisk.HIGH, "机架接近治疗床"
# 检查治疗床与墙壁
couch_wall_distance = self.calculate_distance_to_walls(
self.couch_model
)
if couch_wall_distance < self.safety_margin_mm:
return CollisionRisk.MEDIUM, "治疗床接近墙壁"
return CollisionRisk.NONE, "无碰撞风险"
def predict_collision_along_path(self, start_state, end_state):
"""预测运动路径上的碰撞"""
# 插值生成路径
path = self.interpolate_path(start_state, end_state, num_steps=100)
for step in path:
risk, message = self.check_collision_risk(
step.gantry_angle,
step.couch_angle,
step.couch_position
)
if risk != CollisionRisk.NONE:
return True, step, message
return False, None, "路径安全"
紧急停止¶
紧急停止系统¶
class EmergencyStopSystem:
"""紧急停止系统"""
def __init__(self):
self.estop_buttons = [
EmergencyButton("console"),
EmergencyButton("treatment_room_1"),
EmergencyButton("treatment_room_2"),
EmergencyButton("pendant")
]
self.response_time_ms = 100 # 最大响应时间
def monitor_emergency_buttons(self):
"""监控紧急停止按钮"""
while True:
for button in self.estop_buttons:
if button.is_pressed():
timestamp = time.time()
# 立即终止所有运动和束流
self.emergency_shutdown()
# 记录事件
self.log_emergency_stop(button.location, timestamp)
# 等待复位
self.wait_for_reset()
break
time.sleep(0.001) # 1kHz轮询
def emergency_shutdown(self):
"""紧急关闭"""
# 1. 终止束流(最高优先级)
self.terminate_beam_immediately()
# 2. 停止所有运动
self.stop_all_motion()
# 3. 关闭调制器
self.disable_modulator()
# 4. 激活束流阻挡器
self.activate_beam_stopper()
# 5. 设置系统状态
self.set_system_state(SystemState.EMERGENCY_STOP)
# 6. 通知所有子系统
self.broadcast_emergency_stop()
患者安全监控¶
生命体征监控¶
class PatientSafetyMonitoring:
"""患者安全监控"""
def __init__(self):
self.video_monitor = VideoMonitoringSystem()
self.audio_monitor = AudioMonitoringSystem()
self.motion_detector = MotionDetectionSystem()
def monitor_patient_during_treatment(self):
"""治疗期间监控患者"""
while self.treatment_active:
# 视频监控
video_frame = self.video_monitor.get_frame()
if self.detect_patient_distress(video_frame):
self.pause_treatment("检测到患者异常")
# 音频监控
audio_signal = self.audio_monitor.get_signal()
if self.detect_patient_call(audio_signal):
self.pause_treatment("患者呼叫")
# 运动检测
motion_level = self.motion_detector.measure_motion()
if motion_level > self.motion_threshold:
self.pause_treatment("患者运动超过阈值")
time.sleep(0.1) # 10Hz监控
def detect_patient_distress(self, video_frame):
"""检测患者异常"""
# 使用计算机视觉检测异常姿势或运动
pose = self.estimate_pose(video_frame)
if self.is_abnormal_pose(pose):
return True
return False
相关资源¶
参考标准¶
- IEC 60601-2-1: 医用电子加速器安全标准
- IEC 62083: 放射治疗计划系统安全要求
- IAEA Safety Report Series No. 17: 放射治疗事故教训
💬 讨论区
欢迎在这里分享您的想法、提出问题或参与讨论。需要 GitHub 账号登录。