隐私与合规¶
学习目标¶
通过本文档的学习,你将能够:
- 理解核心概念和原理
- 掌握实际应用方法
- 了解最佳实践和注意事项
前置知识¶
在学习本文档之前,建议你已经掌握:
- 基础的嵌入式系统知识
- C/C++编程基础
- 相关领域的基本概念
概述¶
医疗数据的隐私保护和法规合规是医疗设备云平台的首要任务。违反隐私法规可能导致巨额罚款、法律诉讼和声誉损失。本文将介绍主要的医疗数据隐私法规及其技术实现。
1. 主要法规概述¶
1.1 HIPAA (Health Insurance Portability and Accountability Act)¶
适用范围: 美国
核心要求:
- 隐私规则: 保护个人健康信息(PHI)的使用和披露
- 安全规则: 要求实施物理、技术和管理保障措施
- 违规通知规则: 数据泄露时必须通知受影响的个人
关键概念:
- PHI (Protected Health Information): 受保护的健康信息
- BAA (Business Associate Agreement): 业务伙伴协议
- Minimum Necessary: 最小必要原则
1.2 GDPR (General Data Protection Regulation)¶
适用范围: 欧盟及处理欧盟居民数据的组织
核心原则:
- 合法性、公平性和透明度: 数据处理必须合法、公平且透明
- 目的限制: 只能用于明确的合法目的
- 数据最小化: 只收集必要的数据
- 准确性: 确保数据准确并及时更新
- 存储限制: 不得超过必要期限存储
- 完整性和保密性: 确保数据安全
个人权利:
- 访问权
- 更正权
- 删除权(被遗忘权)
- 限制处理权
- 数据可携带权
- 反对权
1.3 中国《数据安全法》和《个人信息保护法》¶
适用范围: 中国境内及处理中国公民数据的组织
核心要求:
- 数据分类分级: 根据重要程度进行分类分级保护
- 数据本地化: 重要数据和个人信息需在境内存储
- 跨境传输限制: 出境需要安全评估
- 安全保护义务: 采取技术和管理措施保护数据安全
2. 数据加密¶
2.1 传输加密 (Encryption in Transit)¶
TLS/SSL配置¶
from flask import Flask
import ssl
app = Flask(__name__)

# Configure TLS.
# PROTOCOL_TLS_SERVER negotiates the highest version both peers support; the
# explicit floor rejects anything older than TLS 1.2 while still allowing
# TLS 1.3. (PROTOCOL_TLSv1_2 is deprecated and pins the connection to exactly
# TLS 1.2.)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.load_cert_chain('cert.pem', 'key.pem')

# Only allow strong (forward-secret, AEAD) cipher suites.
context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')

if __name__ == '__main__':
    app.run(ssl_context=context, host='0.0.0.0', port=443)
HTTPS请求验证¶
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class SecureAPIClient:
    """HTTPS client for the patient API with certificate verification.

    Args:
        base_url: Root URL of the API; endpoint paths are appended to it.
        cert_path: Optional path to a CA bundle used to verify the server.
            When omitted, the default (system) certificates are used.
    """

    def __init__(self, base_url, cert_path=None):
        self.base_url = base_url
        self.session = requests.Session()

        # Retry policy for transient failures on HTTPS connections.
        # NOTE(review): urllib3's default retry method list does not include
        # POST, so status-based retries may not apply to post_patient_data;
        # confirm whether that is intended.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

        # Certificate verification: custom CA bundle if given, otherwise the
        # default trust store.
        self.session.verify = cert_path if cert_path else True

    def post_patient_data(self, data, token):
        """Send patient data to the API over the verified HTTPS session.

        Args:
            data: JSON-serializable payload.
            token: Bearer token placed in the Authorization header.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # Imported here because the snippet's import list does not provide
        # uuid, which the request-id header needs.
        import uuid

        headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
            'X-Request-ID': str(uuid.uuid4())
        }
        response = self.session.post(
            f'{self.base_url}/api/v1/patients',
            json=data,
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        return response.json()
2.2 静态加密 (Encryption at Rest)¶
数据库加密¶
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os


class EncryptionManager:
    """Field-level encryption helper built on Fernet.

    The Fernet key is derived from a master key with PBKDF2-HMAC-SHA256.

    Args:
        master_key: Master secret (str or bytes). When omitted, it is read
            from the ``ENCRYPTION_MASTER_KEY`` environment variable.
        salt: Optional KDF salt (bytes). Defaults to the fixed salt this class
            has always used so existing ciphertexts stay readable; real
            deployments should supply a random, stored salt.

    Raises:
        ValueError: If no master key is supplied and the environment variable
            is unset or empty.
    """

    _DEFAULT_SALT = b'medical_device_salt'

    def __init__(self, master_key=None, salt=None):
        if master_key is None:
            # Fall back to the environment (or a key-management service).
            master_key = os.environ.get('ENCRYPTION_MASTER_KEY')
        if not master_key:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(
                'No master key provided and ENCRYPTION_MASTER_KEY is not set'
            )
        if isinstance(master_key, str):
            master_key = master_key.encode()
        self.master_key = master_key
        self.salt = self._DEFAULT_SALT if salt is None else salt
        self.cipher = self._create_cipher()

    def _create_cipher(self):
        """Derive a 32-byte key with PBKDF2 and wrap it in a Fernet cipher."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
        return Fernet(key)

    def encrypt_field(self, plaintext):
        """Encrypt one field value.

        Args:
            plaintext: str or bytes to encrypt; ``None`` passes through.

        Returns:
            The Fernet token, base64-encoded again and returned as str, or
            ``None`` when the input is ``None``.
        """
        if plaintext is None:
            return None
        if isinstance(plaintext, str):
            plaintext = plaintext.encode()
        encrypted = self.cipher.encrypt(plaintext)
        # The extra base64 layer is kept so previously stored values decrypt.
        return base64.b64encode(encrypted).decode()

    def decrypt_field(self, ciphertext):
        """Decrypt a value produced by ``encrypt_field``.

        Args:
            ciphertext: Base64 text as returned by ``encrypt_field``; ``None``
                passes through.

        Returns:
            The decrypted value decoded to str, or ``None``.
        """
        if ciphertext is None:
            return None
        token = base64.b64decode(ciphertext)
        return self.cipher.decrypt(token).decode()
# SQLAlchemy模型中使用加密
from sqlalchemy import Column, String, TypeDecorator
class EncryptedString(TypeDecorator):
    """String column type whose values are encrypted on write and decrypted on read."""

    impl = String
    cache_ok = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Each column type instance owns its own EncryptionManager.
        self.encryption_manager = EncryptionManager()

    def process_bind_param(self, value, dialect):
        """Encrypt the value before it is bound to a statement; None is kept."""
        if value is None:
            return value
        return self.encryption_manager.encrypt_field(value)

    def process_result_value(self, value, dialect):
        """Decrypt the value loaded from a result row; None is kept."""
        if value is None:
            return value
        return self.encryption_manager.decrypt_field(value)
# 使用示例
# Usage example: a model whose sensitive columns use EncryptedString.
class Patient(Base):
    __tablename__ = 'patients'
    patient_id = Column(UUID(as_uuid=True), primary_key=True)
    # Stored as a plain String, unique and required.
    mrn = Column(String(50), unique=True, nullable=False)
    # Sensitive fields are stored encrypted
    ssn = Column(EncryptedString(100))  # social security number
    phone = Column(EncryptedString(50))
    email = Column(EncryptedString(100))
    address = Column(EncryptedString(500))
AWS KMS集成¶
import boto3
import base64
class KMSEncryptionManager:
    """Encrypt and decrypt data with an AWS KMS key.

    Small values are encrypted directly by KMS; large payloads use envelope
    encryption (a KMS-generated data key plus Fernet).

    Args:
        key_id: KMS key id or ARN used for encryption.
        region: AWS region of the KMS client.
    """

    def __init__(self, key_id, region='us-east-1'):
        self.kms_client = boto3.client('kms', region_name=region)
        self.key_id = key_id

    def encrypt_data(self, plaintext):
        """Encrypt str/bytes with KMS and return the ciphertext as base64 text."""
        if isinstance(plaintext, str):
            plaintext = plaintext.encode()
        response = self.kms_client.encrypt(
            KeyId=self.key_id,
            Plaintext=plaintext
        )
        return base64.b64encode(response['CiphertextBlob']).decode()

    def _decrypt_bytes(self, ciphertext):
        """Decrypt base64 ciphertext with KMS and return the raw bytes."""
        response = self.kms_client.decrypt(
            CiphertextBlob=base64.b64decode(ciphertext)
        )
        return response['Plaintext']

    def decrypt_data(self, ciphertext):
        """Decrypt base64 ciphertext with KMS and return the plaintext as str."""
        return self._decrypt_bytes(ciphertext).decode()

    def generate_data_key(self):
        """Request a new AES-256 data key from KMS.

        Returns:
            dict with ``plaintext_key`` (raw bytes) and ``encrypted_key``
            (base64 text of the KMS-encrypted key).
        """
        response = self.kms_client.generate_data_key(
            KeyId=self.key_id,
            KeySpec='AES_256'
        )
        return {
            'plaintext_key': response['Plaintext'],
            'encrypted_key': base64.b64encode(response['CiphertextBlob']).decode()
        }

    def encrypt_large_data(self, data):
        """Encrypt a large payload using envelope encryption.

        1. Generate a data key via KMS.
        2. Encrypt the payload locally with that data key.
        3. Return the ciphertext together with the KMS-encrypted data key.

        Args:
            data: str or bytes payload.

        Returns:
            dict with ``encrypted_data`` and ``encrypted_key`` (both base64 text).
        """
        # Imported locally: this snippet's import list does not provide Fernet.
        from cryptography.fernet import Fernet

        data_key = self.generate_data_key()
        cipher = Fernet(base64.urlsafe_b64encode(data_key['plaintext_key']))
        payload = data.encode() if isinstance(data, str) else data
        encrypted_data = cipher.encrypt(payload)
        return {
            'encrypted_data': base64.b64encode(encrypted_data).decode(),
            'encrypted_key': data_key['encrypted_key']
        }

    def decrypt_large_data(self, encrypted_data, encrypted_key, as_text=True):
        """Decrypt a payload produced by ``encrypt_large_data``.

        Args:
            encrypted_data: Base64 text of the encrypted payload.
            encrypted_key: Base64 text of the KMS-encrypted data key.
            as_text: When True (default) the result is decoded to str; pass
                False to get raw bytes back for binary payloads.

        Returns:
            The decrypted payload as str, or bytes when ``as_text`` is False.
        """
        from cryptography.fernet import Fernet

        # The data key is random binary data, so it must stay as bytes:
        # decoding it to text fails for keys that are not valid UTF-8.
        plaintext_key = self._decrypt_bytes(encrypted_key)
        cipher = Fernet(base64.urlsafe_b64encode(plaintext_key))
        decrypted_data = cipher.decrypt(base64.b64decode(encrypted_data))
        return decrypted_data.decode() if as_text else decrypted_data
# 使用示例
# Usage example: create the manager for a specific KMS key.
kms = KMSEncryptionManager(key_id='arn:aws:kms:us-east-1:123456789:key/abc-123')
# Encrypt a small value directly with KMS
encrypted_ssn = kms.encrypt_data('123-45-6789')
# Encrypt a large payload (e.g. a medical image) with envelope encryption
with open('medical_image.dcm', 'rb') as f:
    image_data = f.read()
encrypted_image = kms.encrypt_large_data(image_data)
3. 访问控制¶
3.1 基于角色的访问控制 (RBAC)¶
from enum import Enum
from functools import wraps
from flask import request, jsonify
class Role(Enum):
    """User roles known to the platform."""
    ADMIN = 'admin'
    DOCTOR = 'doctor'
    NURSE = 'nurse'
    PATIENT = 'patient'
    DEVICE_OPERATOR = 'device_operator'


class Permission(Enum):
    """Permissions that can be granted to a role."""
    READ_PATIENT_DATA = 'read:patient_data'
    WRITE_PATIENT_DATA = 'write:patient_data'
    READ_DEVICE_DATA = 'read:device_data'
    WRITE_DEVICE_DATA = 'write:device_data'
    MANAGE_USERS = 'manage:users'
    MANAGE_DEVICES = 'manage:devices'
    VIEW_AUDIT_LOGS = 'view:audit_logs'


# Role -> granted permissions
ROLE_PERMISSIONS = {
    Role.ADMIN: [
        Permission.READ_PATIENT_DATA,
        Permission.WRITE_PATIENT_DATA,
        Permission.READ_DEVICE_DATA,
        Permission.WRITE_DEVICE_DATA,
        Permission.MANAGE_USERS,
        Permission.MANAGE_DEVICES,
        Permission.VIEW_AUDIT_LOGS
    ],
    Role.DOCTOR: [
        Permission.READ_PATIENT_DATA,
        Permission.WRITE_PATIENT_DATA,
        Permission.READ_DEVICE_DATA
    ],
    Role.NURSE: [
        Permission.READ_PATIENT_DATA,
        Permission.READ_DEVICE_DATA,
        Permission.WRITE_DEVICE_DATA
    ],
    Role.PATIENT: [
        Permission.READ_PATIENT_DATA  # own data only; enforced by the endpoint
    ],
    Role.DEVICE_OPERATOR: [
        Permission.READ_DEVICE_DATA,
        Permission.WRITE_DEVICE_DATA,
        Permission.MANAGE_DEVICES
    ]
}


class AccessControl:
    """Role-based permission checks and a Flask view decorator."""

    @staticmethod
    def has_permission(user_role, required_permission):
        """Return True if the role grants the permission.

        Args:
            user_role: Role value (e.g. ``'doctor'``) or a ``Role`` member.
            required_permission: ``Permission`` member to check.

        Returns:
            bool. Unknown or missing roles are denied rather than raising, so
            a token with a bad ``role`` claim yields 403 instead of a 500.
        """
        try:
            role = Role(user_role)
        except ValueError:
            return False
        return required_permission in ROLE_PERMISSIONS.get(role, [])

    @staticmethod
    def require_permission(permission):
        """Decorator factory: the wrapped view requires ``permission``.

        The wrapper responds 401 when the bearer token does not verify and 403
        when the token's role lacks the permission.
        """
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                # The user's role comes from the verified JWT.
                token = request.headers.get('Authorization', '').replace('Bearer ', '')
                user_data = verify_jwt_token(token)
                if not user_data:
                    return jsonify({'error': 'Invalid token'}), 401
                user_role = user_data.get('role')
                if not AccessControl.has_permission(user_role, permission):
                    return jsonify({'error': 'Insufficient permissions'}), 403
                return f(*args, **kwargs)
            return decorated_function
        return decorator
# 使用示例
@app.route('/api/v1/patients/<patient_id>', methods=['GET'])
@AccessControl.require_permission(Permission.READ_PATIENT_DATA)
def get_patient(patient_id):
    """Return one patient's record as JSON.

    On top of the permission decorator, a caller with the patient role may
    only fetch the record whose id matches the token's user_id.
    """
    auth_header = request.headers.get('Authorization', '')
    claims = verify_jwt_token(auth_header.replace('Bearer ', ''))

    is_patient = claims['role'] == Role.PATIENT.value
    if is_patient and claims['user_id'] != patient_id:
        return jsonify({'error': 'Access denied'}), 403

    return jsonify(get_patient_from_db(patient_id))
@app.route('/api/v1/devices/<device_id>/data', methods=['POST'])
@AccessControl.require_permission(Permission.WRITE_DEVICE_DATA)
def post_device_data(device_id):
    """Store the JSON body of the request as data for the given device."""
    payload = request.json
    save_device_data(device_id, payload)
    body = jsonify({'status': 'success'})
    return body, 201
3.2 基于属性的访问控制 (ABAC)¶
from typing import Dict, Any
class ABACPolicy:
    """Ordered list of policies; the first matching policy decides."""

    def __init__(self):
        self.policies = []

    def add_policy(self, policy):
        """Append a policy; evaluation order is insertion order."""
        self.policies += [policy]

    def evaluate(self, subject: Dict, resource: Dict, action: str, environment: Dict) -> bool:
        """Decide an access request.

        Args:
            subject: Attributes of the requesting user.
            resource: Attributes of the data being accessed.
            action: Operation name (read, write, delete).
            environment: Contextual attributes (time, location, ...).

        Returns:
            True only when the first matching policy has effect 'allow';
            False when it has another effect or when nothing matches.
        """
        first_match = next(
            (
                candidate
                for candidate in self.policies
                if candidate.matches(subject, resource, action, environment)
            ),
            None,
        )
        if first_match is None:
            # Default deny
            return False
        return first_match.effect == 'allow'
class Policy:
    """A named rule: an effect plus the conditions that must all hold."""

    def __init__(self, name, effect, conditions):
        self.name, self.effect, self.conditions = name, effect, conditions  # effect: 'allow' or 'deny'

    def matches(self, subject, resource, action, environment):
        """Return True when every condition accepts the request.

        Conditions are called in order and evaluation stops at the first one
        that rejects; a policy without conditions matches everything.
        """
        return all(
            check(subject, resource, action, environment)
            for check in self.conditions
        )
# 定义策略条件
def doctor_can_read_assigned_patients(subject, resource, action, environment):
    """Condition: a doctor reading data of a patient assigned to them."""
    is_doctor_reading = action == 'read' and subject.get('role') == 'doctor'
    if not is_doctor_reading:
        return False
    return resource.get('patient_id') in subject.get('assigned_patients', [])
def only_during_business_hours(subject, resource, action, environment):
    """Condition: the request happens during business hours.

    The hour is taken from ``environment['timestamp']`` when that is a
    datetime, which makes the decision reproducible for the request being
    evaluated; otherwise the current local time is used.

    Returns:
        True when the hour is between 8 and 18 inclusive (so 18:59 passes).
    """
    from datetime import datetime

    moment = (environment or {}).get('timestamp')
    if not isinstance(moment, datetime):
        moment = datetime.now()
    return 8 <= moment.hour <= 18
def same_department_access(subject, resource, action, environment):
    """Condition: subject and resource belong to the same department.

    A subject without a department never matches; otherwise two missing
    values would compare equal (None == None) and grant access.
    """
    department = subject.get('department')
    if department is None:
        return False
    return department == resource.get('department')
# 创建ABAC策略引擎
# The example builds a timestamp below; the snippet otherwise only imports
# datetime inside only_during_business_hours, so bring it into scope here.
from datetime import datetime

# Create the ABAC policy engine
abac = ABACPolicy()

# Register policies (evaluated in insertion order)
abac.add_policy(Policy(
    name='DoctorReadAssignedPatients',
    effect='allow',
    conditions=[doctor_can_read_assigned_patients]
))
abac.add_policy(Policy(
    name='BusinessHoursAccess',
    effect='allow',
    conditions=[only_during_business_hours, same_department_access]
))

# Usage example
subject = {
    'user_id': 'doctor-001',
    'role': 'doctor',
    'department': 'cardiology',
    'assigned_patients': ['patient-123', 'patient-456']
}
resource = {
    'patient_id': 'patient-123',
    'department': 'cardiology',
    'data_type': 'medical_record'
}
action = 'read'
environment = {'timestamp': datetime.now()}

if abac.evaluate(subject, resource, action, environment):
    print("Access granted")
else:
    print("Access denied")
3.3 OAuth 2.0 和 JWT¶
import jwt
from datetime import datetime, timedelta
from flask import request, jsonify
class AuthenticationManager:
    """Issue, verify and refresh JWTs for the platform API.

    Args:
        secret_key: Key used to sign and verify tokens.
        algorithm: JWT signing algorithm (default ``'HS256'``).
    """

    ISSUER = 'medical-device-platform'
    AUDIENCE = 'medical-api'

    def __init__(self, secret_key, algorithm='HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm

    def generate_token(self, user_id, role, permissions, expires_in=3600):
        """Create a signed token.

        Args:
            user_id: Value of the ``user_id`` claim.
            role: Value of the ``role`` claim.
            permissions: Value of the ``permissions`` claim.
            expires_in: Lifetime in seconds from now.

        Returns:
            The encoded JWT.
        """
        issued_at = datetime.utcnow()
        payload = {
            'user_id': user_id,
            'role': role,
            'permissions': permissions,
            'iat': issued_at,
            'exp': issued_at + timedelta(seconds=expires_in),
            'iss': self.ISSUER,
            'aud': self.AUDIENCE
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token):
        """Validate signature, expiry, audience and issuer.

        Returns:
            The decoded payload.

        Raises:
            ValueError: ``'Token has expired'`` or ``'Invalid token'``.
        """
        try:
            return jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                audience=self.AUDIENCE,
                issuer=self.ISSUER
            )
        except jwt.ExpiredSignatureError:
            raise ValueError('Token has expired')
        except jwt.InvalidTokenError:
            raise ValueError('Invalid token')

    def refresh_token(self, token):
        """Exchange a (possibly expired) token for a fresh one.

        Signature, audience and issuer are still verified; only the expiry
        check is skipped.

        Returns:
            A new token carrying the same user_id, role and permissions.

        Raises:
            ValueError: ``'Invalid token'`` if the old token does not verify.
        """
        try:
            # The tokens issued here carry an 'aud' claim, so the expected
            # audience must be supplied or decoding rejects every token.
            # NOTE(review): an expired token can be refreshed indefinitely;
            # consider bounding how long after 'exp' a refresh is accepted.
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                audience=self.AUDIENCE,
                issuer=self.ISSUER,
                options={'verify_exp': False}
            )
        except jwt.InvalidTokenError:
            raise ValueError('Invalid token')
        return self.generate_token(
            user_id=payload['user_id'],
            role=payload['role'],
            permissions=payload['permissions']
        )
# OAuth 2.0 授权服务器
class OAuth2Server:
    """Minimal OAuth 2.0 authorization-code flow backed by in-memory state.

    Args:
        secret_key: Signing key handed to AuthenticationManager. The default
            is a placeholder and must be replaced in any real deployment.
    """

    def __init__(self, secret_key='your-secret-key'):
        self.auth_manager = AuthenticationManager(secret_key=secret_key)
        self.authorization_codes = {}  # temporary storage for issued codes

    def authorize(self, client_id, redirect_uri, scope, state):
        """Authorization endpoint: issue a code and build the redirect URL.

        Returns:
            ``redirect_uri`` with URL-encoded ``code`` and ``state`` query
            parameters appended.
        """
        import secrets
        from urllib.parse import urlencode

        code = secrets.token_urlsafe(32)
        self.authorization_codes[code] = {
            'client_id': client_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'expires_at': datetime.utcnow() + timedelta(minutes=10)
        }
        # Encode the parameters so a state containing '&', '=' or spaces
        # cannot corrupt or inject query parameters, and extend an existing
        # query string instead of adding a second '?'.
        separator = '&' if '?' in redirect_uri else '?'
        query = urlencode({'code': code, 'state': state})
        return f"{redirect_uri}{separator}{query}"

    def token(self, grant_type, code, client_id, client_secret, redirect_uri):
        """Token endpoint: exchange an authorization code for tokens.

        Returns:
            On success a dict with access_token, token_type, expires_in,
            refresh_token and scope; on failure an ``(error_dict, status)``
            tuple.
        """
        # NOTE(review): client_secret is accepted but never checked; no client
        # registry is visible here to validate it against.
        if grant_type != 'authorization_code':
            return {'error': 'unsupported_grant_type'}, 400

        auth_code_data = self.authorization_codes.get(code)
        if not auth_code_data:
            return {'error': 'invalid_grant'}, 400
        if auth_code_data['client_id'] != client_id:
            return {'error': 'invalid_client'}, 401
        if auth_code_data['redirect_uri'] != redirect_uri:
            return {'error': 'invalid_grant'}, 400
        if datetime.utcnow() > auth_code_data['expires_at']:
            # Drop the expired code so it does not linger in memory.
            del self.authorization_codes[code]
            return {'error': 'expired_token'}, 400

        # Codes are single-use.
        del self.authorization_codes[code]

        # NOTE(review): user, role and permissions are hard-coded example
        # values; the code record stores no user identity.
        access_token = self.auth_manager.generate_token(
            user_id='user-123',
            role='doctor',
            permissions=['read:patient_data'],
            expires_in=3600
        )
        refresh_token = self.auth_manager.generate_token(
            user_id='user-123',
            role='doctor',
            permissions=['read:patient_data'],
            expires_in=86400  # 24 hours
        )
        return {
            'access_token': access_token,
            'token_type': 'Bearer',
            'expires_in': 3600,
            'refresh_token': refresh_token,
            'scope': auth_code_data['scope']
        }
4. 审计日志¶
4.1 审计日志系统¶
from datetime import datetime
import json
import hashlib
class AuditLogger:
    """Write tamper-evident audit entries to a storage backend.

    Args:
        log_storage: Object providing ``store(entry)`` and ``query(filters)``.
        hmac_key: Optional secret (str or bytes). When given, entry hashes are
            HMAC-SHA256 under this key, so someone who can edit an entry
            cannot recompute a valid hash without it. Without a key a plain
            SHA-256 digest is used.
    """

    def __init__(self, log_storage, hmac_key=None):
        self.log_storage = log_storage
        self.hmac_key = hmac_key.encode() if isinstance(hmac_key, str) else hmac_key

    def _seal_and_store(self, log_entry):
        """Attach the integrity hash, persist the entry and return it."""
        log_entry['hash'] = self._calculate_hash(log_entry)
        self.log_storage.store(log_entry)
        return log_entry

    def log_access(self, user_id, resource_type, resource_id, action, result, metadata=None):
        """Record a resource access.

        Args:
            result: Outcome label such as 'success' or 'denied'.
            metadata: Optional dict; ``ip_address``, ``user_agent``,
                ``session_id`` and, when present, ``error`` are recorded.

        Returns:
            The stored entry, including its ``hash``.
        """
        metadata = metadata or {}
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'resource_type': resource_type,
            'resource_id': resource_id,
            'action': action,
            'result': result,
            'ip_address': metadata.get('ip_address'),
            'user_agent': metadata.get('user_agent'),
            'session_id': metadata.get('session_id')
        }
        # Keep failure details instead of silently dropping them; the key is
        # only added when supplied so other entries keep their shape.
        if 'error' in metadata:
            log_entry['error'] = metadata['error']
        return self._seal_and_store(log_entry)

    def log_data_modification(self, user_id, table_name, record_id, operation, old_value, new_value):
        """Record an insert/update/delete with its old and new values."""
        return self._seal_and_store({
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'table_name': table_name,
            'record_id': record_id,
            'operation': operation,  # 'insert', 'update', 'delete'
            'old_value': old_value,
            'new_value': new_value
        })

    def log_authentication(self, user_id, auth_method, result, metadata=None):
        """Record an authentication attempt.

        Args:
            auth_method: For example 'password', 'mfa' or 'sso'.
            result: 'success' or 'failed'.
            metadata: Optional dict; ``ip_address`` and ``failure_reason``
                are recorded.
        """
        metadata = metadata or {}
        return self._seal_and_store({
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'auth_method': auth_method,
            'result': result,
            'ip_address': metadata.get('ip_address'),
            'failure_reason': metadata.get('failure_reason')
        })

    def _calculate_hash(self, log_entry):
        """Return the hex digest of the entry, ignoring any ``hash`` field."""
        import hmac

        entry_copy = {k: v for k, v in log_entry.items() if k != 'hash'}
        # default=str lets values such as datetimes be hashed instead of
        # raising; JSON-native entries serialize exactly as before.
        entry_json = json.dumps(entry_copy, sort_keys=True, default=str).encode()
        key = getattr(self, 'hmac_key', None)
        if key:
            return hmac.new(key, entry_json, hashlib.sha256).hexdigest()
        return hashlib.sha256(entry_json).hexdigest()

    def verify_log_integrity(self, log_entry):
        """Return True when the entry's stored hash matches its content."""
        import hmac

        stored_hash = log_entry.get('hash')
        if not isinstance(stored_hash, str):
            return False
        try:
            # Constant-time comparison of the two digests.
            return hmac.compare_digest(stored_hash, self._calculate_hash(log_entry))
        except TypeError:
            # compare_digest rejects non-ASCII text; such a hash is invalid.
            return False

    def query_logs(self, filters):
        """Delegate a query to the storage backend."""
        return self.log_storage.query(filters)
# CloudWatch Logs集成
import boto3
class CloudWatchLogStorage:
    """Audit-log storage backend on top of CloudWatch Logs.

    Args:
        log_group_name: Log group that receives the entries.
        log_stream_name: Log stream inside the group.
        region: AWS region of the Logs client.
    """

    # Query statuses after which polling stops.
    _TERMINAL_STATUSES = frozenset({'Complete', 'Failed', 'Cancelled', 'Timeout', 'Unknown'})
    # Filter keys that become equality conditions in the Insights query.
    _FILTERABLE_FIELDS = ('user_id', 'resource_type', 'action')
    _DEFAULT_WINDOW_SECONDS = 7 * 24 * 3600

    def __init__(self, log_group_name, log_stream_name, region='us-east-1'):
        self.client = boto3.client('logs', region_name=region)
        self.log_group_name = log_group_name
        self.log_stream_name = log_stream_name
        # Create the group and stream if they do not exist yet.
        self._ensure_log_group_exists()
        self._ensure_log_stream_exists()

    def _ensure_log_group_exists(self):
        """Create the log group, ignoring 'already exists'."""
        try:
            self.client.create_log_group(logGroupName=self.log_group_name)
        except self.client.exceptions.ResourceAlreadyExistsException:
            pass

    def _ensure_log_stream_exists(self):
        """Create the log stream, ignoring 'already exists'."""
        try:
            self.client.create_log_stream(
                logGroupName=self.log_group_name,
                logStreamName=self.log_stream_name
            )
        except self.client.exceptions.ResourceAlreadyExistsException:
            pass

    def store(self, log_entry):
        """Write one entry to the log stream as a JSON message."""
        import time

        self.client.put_log_events(
            logGroupName=self.log_group_name,
            logStreamName=self.log_stream_name,
            logEvents=[
                {
                    # time.time() is true epoch time. Calling .timestamp() on
                    # a naive utcnow() would be read as local time and be off
                    # by the host's UTC offset.
                    'timestamp': int(time.time() * 1000),
                    'message': json.dumps(log_entry, default=str)
                }
            ]
        )

    def query(self, filters, max_wait_seconds=60):
        """Run a Logs Insights query and return its result rows.

        Args:
            filters: dict; ``user_id``, ``resource_type`` and ``action``
                become equality filters, and ``start_date`` / ``end_date``
                (datetime, date, ISO string or epoch seconds) bound the time
                range. The range defaults to the last 7 days.
            max_wait_seconds: Upper bound on polling for completion.

        Returns:
            The ``results`` list of the final query response.

        Raises:
            TimeoutError: If the query is not finished in time.
            ValueError: If a filter value contains characters that could
                alter the query.
        """
        import time

        now = time.time()
        query_string = self._build_query_string(filters)
        response = self.client.start_query(
            logGroupName=self.log_group_name,
            startTime=self._to_epoch_seconds(
                filters.get('start_date'), default=now - self._DEFAULT_WINDOW_SECONDS
            ),
            endTime=self._to_epoch_seconds(filters.get('end_date'), default=now),
            queryString=query_string
        )
        query_id = response['queryId']

        # Poll until a terminal status, but never forever.
        deadline = time.monotonic() + max_wait_seconds
        while True:
            result = self.client.get_query_results(queryId=query_id)
            if result['status'] in self._TERMINAL_STATUSES:
                break
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f'CloudWatch query {query_id} did not finish within {max_wait_seconds}s'
                )
            time.sleep(1)
        return result.get('results', [])

    @staticmethod
    def _to_epoch_seconds(value, default):
        """Convert a time bound to integer epoch seconds.

        Naive datetimes are interpreted as UTC; ``None`` yields ``default``.
        """
        from datetime import date, datetime, time as dt_time, timezone

        if value is None:
            return int(default)
        if isinstance(value, (int, float)):
            return int(value)
        if isinstance(value, str):
            value = datetime.fromisoformat(value)
        if isinstance(value, date) and not isinstance(value, datetime):
            value = datetime.combine(value, dt_time.min)
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return int(value.timestamp())

    @staticmethod
    def _checked_literal(field, value):
        """Return ``value`` as text, rejecting characters that could escape the quoted literal."""
        text = str(value)
        if any(ch in text for ch in ("'", '"', '\\', '\n', '\r', '|')):
            raise ValueError(f'Unsafe characters in filter value for {field!r}')
        return text

    def _build_query_string(self, filters):
        """Build the Logs Insights query text for the given filters."""
        conditions = [
            f"{field} = '{self._checked_literal(field, filters[field])}'"
            for field in self._FILTERABLE_FIELDS
            if field in filters
        ]
        lines = ['fields @timestamp, user_id, resource_type, resource_id, action, result']
        if conditions:
            # With no conditions the filter stage is simply omitted.
            lines.append('| filter ' + ' and '.join(conditions))
        lines.append('| sort @timestamp desc')
        lines.append('| limit 100')
        return '\n'.join(lines)
# 使用示例
# Usage example: back the audit logger with a CloudWatch log stream.
log_storage = CloudWatchLogStorage(
    log_group_name='/medical-platform/audit-logs',
    log_stream_name='api-access'
)
# All audit entries written through this logger go to the storage above.
audit_logger = AuditLogger(log_storage)
# 记录访问日志
@app.route('/api/v1/patients/<patient_id>', methods=['GET'])
def get_patient(patient_id):
    """Return a patient record, writing an audit entry for every outcome."""
    user_id = get_current_user_id()

    def record(outcome, details):
        # Single place that writes the audit entry for this request.
        audit_logger.log_access(
            user_id=user_id,
            resource_type='patient',
            resource_id=patient_id,
            action='read',
            result=outcome,
            metadata=details
        )

    try:
        # Permission check: denied attempts are audited before answering 403.
        if not has_permission(user_id, 'read:patient_data'):
            record('denied', {
                'ip_address': request.remote_addr,
                'user_agent': request.user_agent.string
            })
            return jsonify({'error': 'Access denied'}), 403

        patient = get_patient_from_db(patient_id)

        # Successful reads are audited as well.
        record('success', {
            'ip_address': request.remote_addr,
            'user_agent': request.user_agent.string
        })
        return jsonify(patient)
    except Exception as e:
        # Audit the failure, then let the error propagate unchanged.
        record('error', {
            'ip_address': request.remote_addr,
            'error': str(e)
        })
        raise
4.2 审计报告生成¶
from datetime import datetime, timedelta
import pandas as pd
class AuditReportGenerator:
    """Build access and compliance reports from audit log entries.

    Args:
        audit_logger: Object whose ``query_logs(filters)`` returns a list of
            log-entry dicts.
    """

    def __init__(self, audit_logger):
        self.audit_logger = audit_logger

    @staticmethod
    def _column(df, name):
        """Return column ``name``, or an empty Series when it is absent."""
        if name in df.columns:
            return df[name]
        return pd.Series(dtype='object')

    def generate_access_report(self, start_date, end_date, user_id=None):
        """Summarize accesses in a period.

        Args:
            start_date: Start of the period, passed to the log query.
            end_date: End of the period, passed to the log query.
            user_id: Optional user to restrict the report to.

        Returns:
            dict with total/successful/denied counts, unique user count, the
            ten most accessed resource ids and accesses per hour of day.
            Every value is zero or empty when no logs match.
        """
        filters = {
            'start_date': start_date,
            'end_date': end_date
        }
        if user_id:
            filters['user_id'] = user_id
        df = pd.DataFrame(self.audit_logger.query_logs(filters))

        results = self._column(df, 'result')
        # Log timestamps are ISO strings; parse them before using .dt, and
        # drop anything that cannot be parsed.
        hours = pd.to_datetime(self._column(df, 'timestamp'), errors='coerce').dt.hour.dropna()
        resource_counts = self._column(df, 'resource_id').value_counts().head(10)
        hour_counts = hours.astype(int).value_counts().sort_index()

        return {
            'total_accesses': len(df),
            'successful_accesses': int((results == 'success').sum()),
            'denied_accesses': int((results == 'denied').sum()),
            'unique_users': int(self._column(df, 'user_id').nunique()),
            'most_accessed_resources': {k: int(v) for k, v in resource_counts.items()},
            'access_by_hour': {int(h): int(c) for h, c in hour_counts.items()}
        }

    def generate_compliance_report(self, start_date, end_date):
        """Run the compliance checks over the logs of a period.

        Returns:
            dict with one entry per check: audit_log_completeness,
            access_control_violations, encryption_compliance and
            data_retention_compliance.
        """
        logs = self.audit_logger.query_logs({
            'start_date': start_date,
            'end_date': end_date
        })
        df = pd.DataFrame(logs)
        return {
            'audit_log_completeness': self._check_log_completeness(df),
            'access_control_violations': self._check_access_violations(df),
            'encryption_compliance': self._check_encryption_compliance(df),
            'data_retention_compliance': self._check_retention_compliance(df)
        }

    def _check_log_completeness(self, df):
        """Report, per required field, how many entries are missing it."""
        required_fields = ['timestamp', 'user_id', 'resource_type', 'action', 'result']
        total = len(df)
        completeness = {}
        for field in required_fields:
            # A column that is absent altogether is missing from every entry.
            missing = int(df[field].isna().sum()) if field in df.columns else total
            completeness[field] = {
                'total': total,
                'missing': missing,
                # Avoid dividing by zero when there are no entries at all.
                'completeness_rate': (total - missing) / total * 100 if total else 0.0
            }
        return completeness

    def _check_access_violations(self, df):
        """Count denied accesses overall, per user and per resource type."""
        denied_mask = self._column(df, 'result') == 'denied'
        violations = df[denied_mask] if 'result' in df.columns else df.iloc[0:0]
        by_user = self._column(violations, 'user_id').value_counts()
        by_resource = self._column(violations, 'resource_type').value_counts()
        return {
            'total_violations': len(violations),
            'violations_by_user': {k: int(v) for k, v in by_user.items()},
            'violations_by_resource': {k: int(v) for k, v in by_resource.items()}
        }

    def _check_encryption_compliance(self, df):
        """Placeholder: this check was referenced but never implemented.

        Returns an explicit 'not_evaluated' marker so the compliance report
        can be generated; replace with a real check once encryption metadata
        is available.
        """
        return {
            'status': 'not_evaluated',
            'reason': 'Audit log entries carry no encryption metadata'
        }

    def _check_retention_compliance(self, df):
        """Placeholder: this check was referenced but never implemented.

        Returns an explicit 'not_evaluated' marker; replace with a real check
        against the configured retention policy.
        """
        return {
            'status': 'not_evaluated',
            'reason': 'Retention policy is not derivable from audit log entries'
        }
5. 数据匿名化和去标识化¶
5.1 数据脱敏技术¶
import hashlib
import random
import string
from faker import Faker
class DataAnonymizer:
    """Masking, generalization and k-anonymity helpers for patient data.

    Args:
        hash_salt: Optional salt mixed into pseudonyms. Defaults to the fixed
            value this class has always used so existing pseudonyms stay
            stable; deployments should supply their own secret salt.
    """

    _DEFAULT_HASH_SALT = 'medical_platform_salt'

    def __init__(self, hash_salt=None):
        self.faker = Faker()
        self.hash_salt = self._DEFAULT_HASH_SALT if hash_salt is None else hash_salt

    def anonymize_patient_data(self, patient_data):
        """Return a copy of the record with identifiers masked or generalized.

        Direct identifiers (name, ssn, email, phone, address) are
        pseudonymized or masked; quasi-identifiers (date_of_birth, zip_code)
        are generalized. Missing fields become ``None`` instead of raising.
        Other keys are copied unchanged.
        """
        anonymized = patient_data.copy()
        # Direct identifiers
        anonymized['name'] = self._pseudonymize(patient_data.get('name'))
        anonymized['ssn'] = self._mask_ssn(patient_data.get('ssn'))
        anonymized['email'] = self._mask_email(patient_data.get('email'))
        anonymized['phone'] = self._mask_phone(patient_data.get('phone'))
        anonymized['address'] = self._generalize_address(patient_data.get('address'))
        # Quasi-identifiers
        anonymized['date_of_birth'] = self._generalize_date(patient_data.get('date_of_birth'))
        anonymized['zip_code'] = self._generalize_zip_code(patient_data.get('zip_code'))
        return anonymized

    def _pseudonymize(self, value):
        """Map a value to a stable 16-hex-character pseudonym (salted SHA-256)."""
        if not value:
            return None
        hash_input = f"{value}{self.hash_salt}"
        return hashlib.sha256(hash_input.encode()).hexdigest()[:16]

    def _mask_ssn(self, ssn):
        """Keep only the last four characters: XXX-XX-1234."""
        if not ssn:
            return None
        return f"XXX-XX-{str(ssn)[-4:]}"

    def _mask_email(self, email):
        """Mask the local part except its first character: j***@example.com.

        Input without a usable local part and domain is fully masked as
        ``'***'`` rather than raising.
        """
        if not email:
            return None
        username, at, domain = str(email).rpartition('@')
        if not (at and username and domain):
            return '***'
        return f"{username[0]}{'*' * (len(username) - 1)}@{domain}"

    def _mask_phone(self, phone):
        """Keep only the last four characters: (XXX) XXX-1234."""
        if not phone:
            return None
        return f"(XXX) XXX-{str(phone)[-4:]}"

    def _generalize_address(self, address):
        """Reduce an address to "City, ST".

        Assumes the format "123 Main St, Springfield, IL 62701"; returns
        ``None`` when there are fewer than two comma-separated parts.
        """
        if not address:
            return None
        parts = [part.strip() for part in address.split(',')]
        if len(parts) < 2:
            return None
        return f"{parts[-2]}, {parts[-1][:2]}"

    def _generalize_date(self, date_str):
        """Keep only the year; assumes a value whose text starts with YYYY."""
        if not date_str:
            return None
        return str(date_str)[:4]

    def _generalize_zip_code(self, zip_code):
        """Keep the first three characters and pad with XX."""
        if not zip_code:
            return None
        return f"{str(zip_code)[:3]}XX"

    def k_anonymize(self, dataframe, quasi_identifiers, k=5):
        """Drop rows whose quasi-identifier combination occurs fewer than k times.

        Returns:
            A DataFrame holding only groups with at least ``k`` rows.
        """
        return dataframe.groupby(quasi_identifiers).filter(lambda group: len(group) >= k)

    def l_diversity(self, dataframe, quasi_identifiers, sensitive_attribute, l=2):
        """Drop groups with fewer than l distinct sensitive values.

        Returns:
            A DataFrame holding only equivalence classes in which
            ``sensitive_attribute`` takes at least ``l`` different values.
        """
        return dataframe.groupby(quasi_identifiers).filter(
            lambda group: group[sensitive_attribute].nunique() >= l
        )
# 使用示例
# Usage example: anonymize one sample patient record and print the result.
anonymizer = DataAnonymizer()
# Sample record holding both direct identifiers and quasi-identifiers.
patient_data = {
    'patient_id': 'P12345',
    'name': 'John Smith',
    'ssn': '123-45-6789',
    'email': 'john.smith@example.com',
    'phone': '(555) 123-4567',
    'address': '123 Main St, Springfield, IL 62701',
    'date_of_birth': '1980-05-15',
    'zip_code': '62701',
    'diagnosis': 'Hypertension'
}
anonymized_data = anonymizer.anonymize_patient_data(patient_data)
print(anonymized_data)
5.2 差分隐私¶
import numpy as np
class DifferentialPrivacy:
    """Laplace-mechanism helpers for count, sum and average queries.

    Args:
        epsilon: Privacy budget; smaller values give stronger privacy and
            more noise. Must be positive.
        rng: Optional object with a ``laplace(loc, scale)`` method (for
            example ``numpy.random.default_rng(seed)``) for reproducible
            noise. Defaults to the global ``numpy.random``.

    Raises:
        ValueError: If ``epsilon`` is not positive.
    """

    def __init__(self, epsilon=1.0, rng=None):
        if not epsilon > 0:
            raise ValueError('epsilon must be positive')
        self.epsilon = epsilon
        self.rng = rng

    def add_laplace_noise(self, true_value, sensitivity):
        """Return ``true_value`` plus Laplace noise of scale sensitivity/epsilon.

        Args:
            sensitivity: Largest change one record can cause in the query
                result.
        """
        scale = sensitivity / self.epsilon
        source = getattr(self, 'rng', None) or np.random
        return true_value + source.laplace(0, scale)

    def noisy_count(self, count):
        """Noisy count query (sensitivity 1).

        The result is rounded to the nearest integer and clamped at zero;
        plain truncation would bias results and allow negative counts.
        """
        noisy = self.add_laplace_noise(count, sensitivity=1)
        return max(0, int(round(float(noisy))))

    def noisy_sum(self, sum_value, max_value):
        """Noisy sum query; the sensitivity is ``max_value``."""
        return self.add_laplace_noise(sum_value, sensitivity=max_value)

    def noisy_average(self, values, min_value, max_value):
        """Noisy mean of ``values`` assumed to lie in [min_value, max_value].

        Values are clipped to the declared bounds first, because the
        sensitivity ``(max_value - min_value) / n`` only holds for bounded
        data.

        Raises:
            ValueError: If ``values`` is empty.
        """
        count = len(values)
        if count == 0:
            raise ValueError('values must not be empty')
        bounded = np.clip(np.asarray(values, dtype=float), min_value, max_value)
        sensitivity = (max_value - min_value) / count
        return self.add_laplace_noise(float(np.mean(bounded)), sensitivity)
# 使用示例
# Usage example
dp = DifferentialPrivacy(epsilon=0.5)
# Query the number of patients with noise added
true_patient_count = 1000
noisy_count = dp.noisy_count(true_patient_count)
print(f"True count: {true_patient_count}, Noisy count: {noisy_count}")
# Query the average heart rate with noise added
heart_rates = [70, 75, 80, 72, 78, 85, 68, 73, 77, 82]
noisy_avg = dp.noisy_average(heart_rates, min_value=40, max_value=200)
print(f"True average: {np.mean(heart_rates):.2f}, Noisy average: {noisy_avg:.2f}")
6. 合规检查清单¶
6.1 HIPAA合规检查¶
class HIPAAComplianceChecker:
    """Collect pass/fail results for a set of configuration checks."""

    def __init__(self):
        self.checks = []

    def _record(self, title, passed, details):
        """Build one result dict, remember it and hand it back."""
        outcome = {
            'check': title,
            'status': 'pass' if passed else 'fail',
            'details': details
        }
        self.checks.append(outcome)
        return outcome

    def check_encryption_at_rest(self, storage_config):
        """Pass when ``encryption_enabled`` is truthy in the storage config."""
        return self._record(
            'Encryption at Rest',
            storage_config.get('encryption_enabled'),
            storage_config
        )

    def check_encryption_in_transit(self, api_config):
        """Pass when ``tls_enabled`` is truthy in the API config."""
        return self._record(
            'Encryption in Transit',
            api_config.get('tls_enabled'),
            api_config
        )

    def check_access_controls(self, iam_config):
        """Pass only when both RBAC and MFA are enabled."""
        rbac_on = iam_config.get('rbac_enabled', False)
        mfa_on = iam_config.get('mfa_enabled', False)
        return self._record(
            'Access Controls',
            rbac_on and mfa_on,
            {'rbac_enabled': rbac_on, 'mfa_enabled': mfa_on}
        )

    def check_audit_logging(self, logging_config):
        """Pass when ``audit_enabled`` is truthy in the logging config."""
        return self._record(
            'Audit Logging',
            logging_config.get('audit_enabled'),
            logging_config
        )

    def check_backup_and_recovery(self, backup_config):
        """Pass only when backups and a disaster recovery plan both exist."""
        backups_on = backup_config.get('backup_enabled', False)
        dr_plan = backup_config.get('disaster_recovery_plan', False)
        return self._record(
            'Backup and Recovery',
            backups_on and dr_plan,
            backup_config
        )

    def generate_report(self):
        """Summarize all recorded checks; the rate is 0 when none were run."""
        total = len(self.checks)
        passed = len([item for item in self.checks if item['status'] == 'pass'])
        if total > 0:
            rate = passed / total * 100
        else:
            rate = 0
        return {
            'total_checks': total,
            'passed_checks': passed,
            'failed_checks': total - passed,
            'compliance_rate': rate,
            'checks': self.checks
        }
# 使用示例
# Usage example
checker = HIPAAComplianceChecker()
# Run the checks; each call records its result on the checker
checker.check_encryption_at_rest({'encryption_enabled': True, 'algorithm': 'AES-256'})
checker.check_encryption_in_transit({'tls_enabled': True, 'tls_version': '1.3'})
checker.check_access_controls({'rbac_enabled': True, 'mfa_enabled': True})
checker.check_audit_logging({'audit_enabled': True, 'retention_days': 365})
checker.check_backup_and_recovery({'backup_enabled': True, 'disaster_recovery_plan': True})
# Generate the report
report = checker.generate_report()
print(f"Compliance Rate: {report['compliance_rate']:.2f}%")
6.2 GDPR合规检查¶
class GDPRComplianceManager:
    """Registry of processing activities and handlers for data-subject requests."""

    def __init__(self):
        self.data_registry = {}

    @staticmethod
    def _utc_now_iso():
        """Current UTC time as an ISO-8601 string (naive, no offset suffix)."""
        # Imported locally: this snippet has no datetime import of its own.
        from datetime import datetime
        return datetime.utcnow().isoformat()

    def register_data_processing(self, purpose, legal_basis, data_categories, retention_period):
        """Register a processing activity.

        Args:
            purpose: Why the data is processed.
            legal_basis: For example consent, contract or legal_obligation.
            data_categories: Categories of data involved.
            retention_period: How long the data is kept.

        Returns:
            The generated processing id (a UUID string).
        """
        # Imported locally: this snippet has no uuid import of its own.
        import uuid

        processing_id = str(uuid.uuid4())
        self.data_registry[processing_id] = {
            'purpose': purpose,
            'legal_basis': legal_basis,
            'data_categories': data_categories,
            'retention_period': retention_period,
            'registered_at': self._utc_now_iso()
        }
        return processing_id

    def handle_data_subject_request(self, request_type, subject_id):
        """Dispatch a data-subject request to its handler.

        Args:
            request_type: 'access', 'rectification', 'erasure' or 'portability'.
            subject_id: Identifier of the data subject.

        Raises:
            ValueError: For an unknown request type.
            NotImplementedError: For handlers this class does not implement.
        """
        handlers = {
            'access': self._handle_access_request,
            'rectification': self._handle_rectification_request,
            'erasure': self._handle_erasure_request,
            'portability': self._handle_portability_request,
        }
        handler = handlers.get(request_type)
        if handler is None:
            raise ValueError(f"Unknown request type: {request_type}")
        return handler(subject_id)

    def _handle_access_request(self, subject_id):
        """Access request: return a copy of the subject's personal data."""
        data = {
            'patient_data': get_patient_data(subject_id),
            'measurements': get_measurements(subject_id),
            'audit_logs': get_audit_logs(subject_id)
        }
        return {
            'request_type': 'access',
            'subject_id': subject_id,
            'data': data,
            'generated_at': self._utc_now_iso()
        }

    def _handle_rectification_request(self, subject_id):
        """Rectification request: not implemented.

        The dispatcher referenced this handler but it was never defined.
        Rectification needs the corrected values, which this interface does
        not carry, so fail explicitly instead of with an AttributeError.
        """
        raise NotImplementedError(
            'Rectification requests require corrected values and are not implemented'
        )

    def _has_legal_retention_requirement(self, subject_id):
        """Whether the subject's data must be retained by law: not implemented.

        The erasure handler referenced this check but it was never defined.
        It raises rather than guessing, because a wrong answer either deletes
        data that must be kept or refuses a lawful erasure.
        """
        raise NotImplementedError(
            'Legal retention check must be provided before erasure can be handled'
        )

    def _handle_erasure_request(self, subject_id):
        """Erasure request: delete personal data unless it must be retained."""
        if self._has_legal_retention_requirement(subject_id):
            return {
                'request_type': 'erasure',
                'status': 'denied',
                'reason': 'Legal retention requirement'
            }
        delete_patient_data(subject_id)
        delete_measurements(subject_id)
        anonymize_audit_logs(subject_id)
        return {
            'request_type': 'erasure',
            'status': 'completed',
            'subject_id': subject_id,
            'completed_at': self._utc_now_iso()
        }

    def _handle_portability_request(self, subject_id):
        """Portability request: export the data in a machine-readable format."""
        data = {
            'patient_data': get_patient_data(subject_id),
            'measurements': get_measurements(subject_id)
        }
        # Convert to a standard format (FHIR).
        fhir_bundle = convert_to_fhir(data)
        return {
            'request_type': 'portability',
            'subject_id': subject_id,
            'format': 'FHIR',
            'data': fhir_bundle,
            'generated_at': self._utc_now_iso()
        }

    def check_consent(self, subject_id, purpose):
        """Return True when a valid consent exists for the purpose.

        A consent is invalid when it is missing, withdrawn, or past its
        ``expiry_date`` (an ISO-8601 string).
        """
        from datetime import datetime, timezone

        consent = get_consent_record(subject_id, purpose)
        if not consent:
            return False
        if consent.get('withdrawn'):
            return False
        expiry = consent.get('expiry_date')
        if expiry:
            expires_at = datetime.fromisoformat(expiry)
            # Compare like with like: aware expiry against aware now, naive
            # expiry against naive UTC now; mixing the two raises TypeError.
            now = datetime.now(timezone.utc) if expires_at.tzinfo else datetime.utcnow()
            if expires_at < now:
                return False
        return True
7. 安全最佳实践¶
关键要点¶
- 默认安全: 系统设计时就考虑安全性
- 最小权限: 只授予必要的权限
- 纵深防御: 多层安全措施
- 持续监控: 实时监控和告警
- 定期审计: 定期进行安全审计
- 员工培训: 定期进行安全意识培训
- 事件响应: 制定并演练事件响应计划
安全检查清单¶
- 所有数据传输使用TLS 1.2+
- 敏感数据静态加密
- 实施强密码策略
- 启用多因素认证(MFA)
- 实施基于角色的访问控制(RBAC)
- 记录所有访问和修改操作
- 定期备份数据
- 测试灾难恢复计划
- 定期更新和打补丁
- 进行安全漏洞扫描
- 实施入侵检测系统(IDS)
- 制定数据泄露响应计划
总结¶
隐私保护和合规是医疗设备云平台的基础。需要:
- 了解法规: 深入理解HIPAA、GDPR等法规要求
- 技术实现: 实施加密、访问控制、审计日志等技术措施
- 流程管理: 建立数据处理流程和事件响应流程
- 持续改进: 定期审查和更新安全措施
💬 讨论区
欢迎在这里分享您的想法、提出问题或参与讨论。需要 GitHub 账号登录。