Luke a Pro

Luke Sun

Developer & Marketer

🇺🇦
EN||

构建安全系统:综合应用

| , 20 minutes reading.

1. 为什么要关心这个问题?

你已经学习了构建块:

  • 对称加密(AES-GCM、ChaCha20)
  • 非对称加密(RSA、ECC)
  • 哈希和 MAC(SHA-256、HMAC)
  • 数字签名(ECDSA、EdDSA)
  • TLS、证书、密钥管理

但知道这些部件并不意味着你能构建安全系统。这最后一篇文章展示如何正确地组合它们。

2. 安全系统设计原则

纵深防御

┌─────────────────────────────────────────────────────────────────┐
│                        纵深防御                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   网络层       → 防火墙、TLS、网络隔离                           │
│        ↓                                                        │
│   传输层       → TLS 1.3、证书固定                               │
│        ↓                                                        │
│   应用层       → 输入验证、输出编码                              │
│        ↓                                                        │
│   数据层       → 静态加密、字段级加密                            │
│        ↓                                                        │
│   访问层       → 认证、授权、审计                                │
│                                                                 │
│   每一层防御不同的威胁                                           │
│   一层被突破不会危及整个系统                                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

最小权限原则

# 错误:一个密钥做所有事情
master_key = load_key()
encrypt_user_data(master_key, data)
encrypt_logs(master_key, logs)
sign_tokens(master_key, token)

# 正确:具有最小权限的独立密钥
class KeyRing:
    def __init__(self, kms):
        self.kms = kms

    def get_user_data_key(self, user_id: str) -> bytes:
        """每个用户的加密密钥"""
        return self.kms.derive_key(f"user-data:{user_id}")

    def get_log_encryption_key(self) -> bytes:
        """用于日志加密的独立密钥"""
        return self.kms.derive_key("log-encryption")

    def get_token_signing_key(self) -> bytes:
        """用于令牌签名的独立密钥"""
        return self.kms.derive_key("token-signing")

安全失败

# 错误:开放失败
def check_access(token):
    try:
        claims = verify_token(token)
        return claims.get('authorized', True)  # 默认授权!
    except Exception:
        return True  # 错误时允许!

# 正确:安全失败
def check_access(token):
    try:
        claims = verify_token(token)
        if not claims.get('authorized'):
            raise AuthorizationError("未授权")
        return claims
    except Exception as e:
        log.warning(f"访问检查失败: {e}")
        raise AuthorizationError("访问被拒绝")  # 错误时拒绝

3. 设计端到端加密系统

架构概述

┌──────────────────────────────────────────────────────────────────────────┐
│                        端到端加密消息系统                                  │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   客户端 A                   服务器                    客户端 B           │
│   ────────                   ──────                    ────────          │
│                                                                          │
│   [私钥 A]               [公钥存储]                  [私钥 B]             │
│   [公钥 A]  ─────────>   [公钥 A] <─────────         [公钥 B]             │
│                          [公钥 B]                                        │
│                                                                          │
│   Encrypt(PubKey_B,     仅中继加密              Decrypt(PrivKey_B,       │
│           message)  ───> 消息 ───────────────>  ciphertext)              │
│                                                                          │
│   服务器无法读取消息(没有私钥访问权限)                                   │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

实现

from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes, serialization
import os
import json

class E2EClient:
    """端到端加密消息客户端"""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.identity_key = x25519.X25519PrivateKey.generate()
        self.public_key = self.identity_key.public_key()

    def get_public_key_bytes(self) -> bytes:
        """导出公钥用于服务器存储"""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

    def encrypt_message(self, recipient_public_key: bytes, message: str) -> dict:
        """为收件人加密消息"""
        # 加载收件人的公钥
        recipient_key = x25519.X25519PublicKey.from_public_bytes(recipient_public_key)

        # 生成临时密钥对以实现前向保密
        ephemeral_private = x25519.X25519PrivateKey.generate()
        ephemeral_public = ephemeral_private.public_key()

        # 派生共享密钥
        shared_secret = ephemeral_private.exchange(recipient_key)

        # 使用 HKDF 派生加密密钥
        encryption_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b"e2e-message-encryption"
        ).derive(shared_secret)

        # 加密消息
        nonce = os.urandom(12)
        aesgcm = AESGCM(encryption_key)
        ciphertext = aesgcm.encrypt(nonce, message.encode(), None)

        return {
            'ephemeral_public': ephemeral_public.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ).hex(),
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex(),
            'sender': self.user_id
        }

    def decrypt_message(self, encrypted_message: dict) -> str:
        """解密发送给我们的消息"""
        # 加载发送者的临时公钥
        ephemeral_public = x25519.X25519PublicKey.from_public_bytes(
            bytes.fromhex(encrypted_message['ephemeral_public'])
        )

        # 派生共享密钥
        shared_secret = self.identity_key.exchange(ephemeral_public)

        # 派生解密密钥
        decryption_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b"e2e-message-encryption"
        ).derive(shared_secret)

        # 解密消息
        aesgcm = AESGCM(decryption_key)
        plaintext = aesgcm.decrypt(
            bytes.fromhex(encrypted_message['nonce']),
            bytes.fromhex(encrypted_message['ciphertext']),
            None
        )

        return plaintext.decode()


class E2EServer:
    """中继加密消息的服务器(无法读取消息)"""

    def __init__(self):
        self.public_keys = {}  # user_id -> public_key_bytes
        self.messages = {}     # user_id -> [encrypted_messages]

    def register_user(self, user_id: str, public_key: bytes):
        """存储用户的公钥"""
        self.public_keys[user_id] = public_key
        self.messages[user_id] = []

    def get_public_key(self, user_id: str) -> bytes:
        """获取用户的公钥用于加密"""
        return self.public_keys.get(user_id)

    def send_message(self, recipient_id: str, encrypted_message: dict):
        """为收件人存储加密消息"""
        # 服务器只能看到:发送者、收件人、时间戳、大小
        # 服务器无法读取消息内容
        self.messages[recipient_id].append(encrypted_message)

    def get_messages(self, user_id: str) -> list:
        """获取用户的待处理消息"""
        messages = self.messages.get(user_id, [])
        self.messages[user_id] = []  # 检索后清除
        return messages


# 使用示例
def demo_e2e():
    # 设置
    server = E2EServer()
    alice = E2EClient("alice")
    bob = E2EClient("bob")

    # 注册公钥
    server.register_user("alice", alice.get_public_key_bytes())
    server.register_user("bob", bob.get_public_key_bytes())

    # Alice 发送消息给 Bob
    bob_pubkey = server.get_public_key("bob")
    encrypted = alice.encrypt_message(bob_pubkey, "你好 Bob!这是秘密。")
    server.send_message("bob", encrypted)

    # Bob 接收并解密
    messages = server.get_messages("bob")
    for msg in messages:
        plaintext = bob.decrypt_message(msg)
        print(f"Bob 收到来自 {msg['sender']} 的消息: {plaintext}")

4. 安全 API 设计

认证层

import hmac
import hashlib
import time
from functools import wraps

class SecureAPI:
    """具有多种认证方法的安全 API"""

    def __init__(self, secret_key: bytes):
        self.secret_key = secret_key
        self.token_expiry = 3600  # 1 小时

    def create_access_token(self, user_id: str, permissions: list) -> str:
        """创建签名的访问令牌"""
        import json
        import base64

        payload = {
            'user_id': user_id,
            'permissions': permissions,
            'issued_at': int(time.time()),
            'expires_at': int(time.time()) + self.token_expiry
        }

        payload_json = json.dumps(payload, sort_keys=True)
        payload_b64 = base64.urlsafe_b64encode(payload_json.encode()).decode()

        signature = hmac.new(
            self.secret_key,
            payload_b64.encode(),
            hashlib.sha256
        ).hexdigest()

        return f"{payload_b64}.{signature}"

    def verify_token(self, token: str) -> dict:
        """验证并解码访问令牌"""
        import json
        import base64

        try:
            payload_b64, signature = token.rsplit('.', 1)

            # 验证签名
            expected_sig = hmac.new(
                self.secret_key,
                payload_b64.encode(),
                hashlib.sha256
            ).hexdigest()

            if not hmac.compare_digest(expected_sig, signature):
                raise ValueError("无效签名")

            # 解码负载
            payload = json.loads(base64.urlsafe_b64decode(payload_b64))

            # 检查过期
            if time.time() > payload['expires_at']:
                raise ValueError("令牌已过期")

            return payload

        except Exception as e:
            raise ValueError(f"令牌验证失败: {e}")

    def require_permission(self, permission: str):
        """装饰器,要求特定权限"""
        def decorator(func):
            @wraps(func)
            def wrapper(request, *args, **kwargs):
                token = request.headers.get('Authorization', '').replace('Bearer ', '')
                if not token:
                    raise PermissionError("未提供令牌")

                payload = self.verify_token(token)
                if permission not in payload.get('permissions', []):
                    raise PermissionError(f"缺少权限: {permission}")

                request.user = payload
                return func(request, *args, **kwargs)
            return wrapper
        return decorator

API 请求签名

import hmac
import hashlib
import time
import urllib.parse

class SignedAPIClient:
    """签名所有 API 请求的客户端"""

    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret.encode()

    def sign_request(self, method: str, path: str, body: str = "",
                     query_params: dict = None) -> dict:
        """为请求生成认证头"""
        timestamp = str(int(time.time()))
        nonce = os.urandom(16).hex()

        # 规范请求字符串
        query_string = urllib.parse.urlencode(sorted(query_params.items())) if query_params else ""
        body_hash = hashlib.sha256(body.encode()).hexdigest()

        canonical = f"{method}\n{path}\n{query_string}\n{timestamp}\n{nonce}\n{body_hash}"

        # 签名
        signature = hmac.new(
            self.api_secret,
            canonical.encode(),
            hashlib.sha256
        ).hexdigest()

        return {
            'X-API-Key': self.api_key,
            'X-Timestamp': timestamp,
            'X-Nonce': nonce,
            'X-Signature': signature
        }


class SignedAPIServer:
    """验证签名请求的服务器"""

    def __init__(self, secrets: dict):
        self.secrets = secrets  # api_key -> api_secret
        self.used_nonces = set()  # 防止重放攻击
        self.max_clock_skew = 300  # 5 分钟

    def verify_request(self, method: str, path: str, headers: dict,
                      body: str = "", query_params: dict = None) -> str:
        """验证请求签名,如果有效则返回 user_id"""

        # 提取认证头
        api_key = headers.get('X-API-Key')
        timestamp = headers.get('X-Timestamp')
        nonce = headers.get('X-Nonce')
        signature = headers.get('X-Signature')

        if not all([api_key, timestamp, nonce, signature]):
            raise ValueError("缺少认证头")

        # 检查时间戳(防止旧请求重放)
        request_time = int(timestamp)
        if abs(time.time() - request_time) > self.max_clock_skew:
            raise ValueError("请求时间戳与服务器时间相差太远")

        # 检查 nonce(防止重放攻击)
        if nonce in self.used_nonces:
            raise ValueError("Nonce 已使用")
        self.used_nonces.add(nonce)

        # 获取 API 密钥
        api_secret = self.secrets.get(api_key)
        if not api_secret:
            raise ValueError("未知的 API 密钥")

        # 验证签名
        query_string = urllib.parse.urlencode(sorted(query_params.items())) if query_params else ""
        body_hash = hashlib.sha256(body.encode()).hexdigest()
        canonical = f"{method}\n{path}\n{query_string}\n{timestamp}\n{nonce}\n{body_hash}"

        expected_sig = hmac.new(
            api_secret.encode(),
            canonical.encode(),
            hashlib.sha256
        ).hexdigest()

        if not hmac.compare_digest(expected_sig, signature):
            raise ValueError("无效签名")

        return api_key  # 返回已认证的身份

5. 数据加密模式

字段级加密

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import json
import os

class FieldEncryption:
    """加密记录中的特定字段"""

    def __init__(self, kms):
        self.kms = kms
        self.sensitive_fields = {'ssn', 'credit_card', 'password_hash', 'private_data'}

    def encrypt_record(self, record: dict, context: str) -> dict:
        """加密记录中的敏感字段"""
        encrypted = record.copy()
        encrypted['_encrypted_fields'] = {}

        for field in self.sensitive_fields:
            if field in record:
                # 获取字段特定的密钥
                key = self.kms.derive_key(f"field:{field}:{context}")

                # 加密
                nonce = os.urandom(12)
                aesgcm = AESGCM(key)
                value_bytes = json.dumps(record[field]).encode()
                ciphertext = aesgcm.encrypt(nonce, value_bytes, field.encode())

                # 存储加密值
                encrypted['_encrypted_fields'][field] = {
                    'nonce': nonce.hex(),
                    'ciphertext': ciphertext.hex()
                }
                encrypted[field] = "[已加密]"

        return encrypted

    def decrypt_record(self, encrypted: dict, context: str) -> dict:
        """解密记录中的敏感字段"""
        record = encrypted.copy()
        encrypted_fields = record.pop('_encrypted_fields', {})

        for field, enc_data in encrypted_fields.items():
            key = self.kms.derive_key(f"field:{field}:{context}")

            aesgcm = AESGCM(key)
            plaintext = aesgcm.decrypt(
                bytes.fromhex(enc_data['nonce']),
                bytes.fromhex(enc_data['ciphertext']),
                field.encode()
            )

            record[field] = json.loads(plaintext)

        return record

可搜索加密

import hmac
import hashlib

class SearchableEncryption:
    """允许搜索加密数据而不解密所有内容"""

    def __init__(self, search_key: bytes, encryption_key: bytes):
        self.search_key = search_key
        self.encryption_key = encryption_key

    def create_search_token(self, value: str) -> str:
        """创建用于搜索的确定性令牌"""
        # 规范化值以实现一致匹配
        normalized = value.lower().strip()

        # 创建搜索令牌(确定性、单向)
        token = hmac.new(
            self.search_key,
            normalized.encode(),
            hashlib.sha256
        ).hexdigest()

        return token

    def encrypt_with_search(self, value: str) -> dict:
        """加密值同时启用搜索"""
        # 创建搜索令牌用于索引
        search_token = self.create_search_token(value)

        # 加密实际值(非确定性)
        nonce = os.urandom(12)
        aesgcm = AESGCM(self.encryption_key)
        ciphertext = aesgcm.encrypt(nonce, value.encode(), None)

        return {
            'search_token': search_token,
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex()
        }

    def search(self, query: str, encrypted_records: list) -> list:
        """搜索加密记录"""
        search_token = self.create_search_token(query)

        # 在搜索令牌上匹配(服务器可以做这个)
        matches = [
            r for r in encrypted_records
            if r.get('search_token') == search_token
        ]

        # 解密匹配项(客户端做这个)
        results = []
        for match in matches:
            aesgcm = AESGCM(self.encryption_key)
            plaintext = aesgcm.decrypt(
                bytes.fromhex(match['nonce']),
                bytes.fromhex(match['ciphertext']),
                None
            )
            results.append(plaintext.decode())

        return results

6. 安全检查清单

部署前

┌─────────────────────────────────────────────────────────────────┐
│                      安全部署检查清单                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ [ ] 启用 TLS 1.3,最低 TLS 1.2                                  │
│ [ ] 所有密钥在环境变量或密钥管理器中                             │
│ [ ] 代码中没有硬编码的密钥、令牌或密码                           │
│ [ ] .gitignore 包含 .env、*.pem、*.key 文件                     │
│ [ ] 密码使用 Argon2id 或 bcrypt 哈希                            │
│ [ ] 加密密钥按计划轮换                                          │
│ [ ] 安全相关操作的审计日志                                      │
│ [ ] 认证端点的速率限制                                          │
│ [ ] 所有用户输入的输入验证                                      │
│ [ ] 强制 HTTPS(HSTS 头)                                       │
│ [ ] 配置安全头(CSP、X-Frame-Options 等)                       │
│ [ ] 检查依赖项的漏洞                                            │
│ [ ] 错误消息不泄露敏感信息                                      │
│ [ ] 会话管理安全(httpOnly、secure cookies)                    │
│ [ ] 启用 CSRF 保护                                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

持续安全

class SecurityMonitor:
    """持续安全监控"""

    def __init__(self, alert_service):
        self.alerts = alert_service

    def check_certificate_expiry(self, cert_path: str, warn_days: int = 30):
        """在证书过期前发出警报"""
        from cryptography import x509
        from datetime import datetime, timedelta

        with open(cert_path, 'rb') as f:
            cert = x509.load_pem_x509_certificate(f.read())

        days_until_expiry = (cert.not_valid_after_utc - datetime.utcnow()).days

        if days_until_expiry < warn_days:
            self.alerts.send(
                level='warning',
                message=f"证书将在 {days_until_expiry} 天后过期"
            )

    def check_failed_auth_rate(self, window_minutes: int = 5, threshold: int = 100):
        """警报可疑的认证失败率"""
        failures = self.get_auth_failures(window_minutes)

        if failures > threshold:
            self.alerts.send(
                level='critical',
                message=f"高认证失败率: {window_minutes} 分钟内 {failures} 次"
            )

    def check_key_age(self, key_metadata: dict, max_age_days: int = 90):
        """当密钥需要轮换时发出警报"""
        from datetime import datetime

        created = datetime.fromisoformat(key_metadata['created_at'])
        age_days = (datetime.now() - created).days

        if age_days > max_age_days:
            self.alerts.send(
                level='warning',
                message=f"密钥 {key_metadata['name']} 已有 {age_days} 天"
            )

7. 要避免的常见错误

错误 1:通过隐蔽实现安全

# 错误:依赖隐蔽
def "secure"_encrypt(data):
    # "没人会发现我的自定义算法"
    result = ""
    for char in data:
        result += chr(ord(char) + 3)  # ROT3 不是加密!
    return result

# 正确:使用经过验证的算法
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

def secure_encrypt(key: bytes, data: bytes) -> tuple[bytes, bytes]:
    nonce = os.urandom(12)
    aesgcm = AESGCM(key)
    return nonce, aesgcm.encrypt(nonce, data, None)

错误 2:随机性不足

import random
import secrets

# 错误:可预测的随机性
def bad_token():
    return ''.join(random.choices('abcdef0123456789', k=32))

# 正确:密码学安全的随机性
def good_token():
    return secrets.token_hex(16)

# 错误:用时间做种子
random.seed(int(time.time()))  # 可预测的!

# 正确:系统熵
secure_random = secrets.SystemRandom()

错误 3:记录敏感数据

import logging

# 错误:记录敏感数据
def authenticate_bad(username, password):
    logging.info(f"认证尝试: user={username}, pass={password}")  # 不!

# 正确:安全日志
def authenticate_good(username, password):
    logging.info(f"认证尝试: user={username}")
    # 密码永远不记录

# 正确:编辑敏感字段
def safe_log(data: dict) -> dict:
    """创建数据副本并编辑敏感字段"""
    sensitive = {'password', 'token', 'secret', 'key', 'ssn', 'credit_card'}
    return {
        k: '[已编辑]' if k.lower() in sensitive else v
        for k, v in data.items()
    }

logging.info(f"请求: {safe_log(request_data)}")

8. 本章小结

三点要记住:

  1. 纵深防御。 不要依赖任何单一的安全措施。分层防御:TLS 用于传输、加密用于存储、认证用于访问、审计用于检测。

  2. 使用经过验证的解决方案。 不要发明密码学。使用 TLS 1.3、AES-GCM、Argon2、已建立的库。安全社区已经解决了大多数问题。

  3. 安全失败,记录一切。 当出现问题时,默认拒绝访问。记录所有安全相关事件,以便你可以检测和调查攻击。

9. 系列总结

恭喜!你已经完成了开发者加密系列。

你现在理解了:

  • 对称和非对称加密如何工作
  • 何时使用 AES-GCM vs ChaCha20 vs RSA vs ECC
  • 为什么我们哈希密码以及如何正确做
  • TLS 如何保护传输中的数据
  • 如何管理密码学密钥
  • 如何构建安全系统

记住: 密码学是工具,不是解决方案。安全是关于理解威胁、做出好的设计决策,以及保持警惕。

继续学习。保持对漏洞的更新。当有疑问时,咨询安全专业人士。

祝你构建安全系统好运!