Luke a Pro

Luke Sun

Developer & Marketer

🇺🇦
EN||

密钥管理:密码学最难的部分

| , 20 minutes reading.

1. 为什么要关心这个问题?

你已经实现了 AES-256-GCM 加密。你的数据受到最强密码之一的保护。但密钥在哪里?

# 你的安全加密
key = b"my-super-secret-key-12345678901"  # 🔥 硬编码!
ciphertext = encrypt(key, data)

那个硬编码的密钥使你的加密变成了表演。任何能访问你代码的人都能访问你所有的加密数据。

密钥管理是大多数密码系统失败的地方。让我们来解决这个问题。

2. 密钥生命周期

┌─────────────────────────────────────────────────────────────────┐
│                        密钥生命周期                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  生成 → 分发 → 存储 → 使用 → 轮换 → 销毁
│   │      │      │      │      │      │
│   ▼      ▼      ▼      ▼      ▼      ▼
│  安全   安全   安全   最小化  定期   安全
│  随机   通道   位置   暴露   计划   删除
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

每个阶段都有自己的风险和要求。

3. 密钥生成

正确的方式

import os
import secrets

# 正确:使用密码学安全的随机性
key_256 = secrets.token_bytes(32)  # 256 位
key_128 = secrets.token_bytes(16)  # 128 位

# 或使用 os.urandom(等效)
key = os.urandom(32)

# 对于特定算法,使用它们的密钥生成
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = AESGCM.generate_key(bit_length=256)

from cryptography.hazmat.primitives.asymmetric import ed25519
private_key = ed25519.Ed25519PrivateKey.generate()

错误的方式

import random
import hashlib

# 错误:使用 random 模块(不是密码学安全的)
key = bytes([random.randint(0, 255) for _ in range(32)])

# 错误:从可预测的来源派生
key = hashlib.sha256(b"password").digest()

# 错误:使用时间戳
key = hashlib.sha256(str(time.time()).encode()).digest()

# 错误:使用用户名或其他可预测数据
key = hashlib.sha256(username.encode()).digest()

从密码派生密钥

from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
import os

def derive_key_from_password(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
    """从密码派生加密密钥"""
    if salt is None:
        salt = os.urandom(16)

    # scrypt 是内存困难的(首选)
    kdf = Scrypt(
        salt=salt,
        length=32,
        n=2**17,
        r=8,
        p=1
    )
    key = kdf.derive(password.encode())

    return key, salt

# 使用
password = "用户的密码短语"
key, salt = derive_key_from_password(password)
# 将盐值与加密数据一起存储
# 永远不要存储密码或派生的密钥

4. 密钥存储

存储选项(从最差到最好)

┌─────────────────────────────────────────────────────────────────┐
│ 选项                │ 安全性 │ 使用场景                         │
├─────────────────────┼────────┼────────────────────────────────┤
│ 源代码中硬编码      │ ✗✗✗    │ 永远不要                        │
│ 配置文件            │ ✗✗     │ 仅开发环境                      │
│ 环境变量            │ ✗      │ 简单部署                        │
│ 加密文件            │ ○      │ 当 HSM 不可用时                 │
│ 密钥管理器          │ ✓      │ 云部署                          │
│ HSM/KMS             │ ✓✓     │ 生产环境、合规                  │
│ 硬件密钥            │ ✓✓✓    │ 最高安全性                      │
└─────────────────────────────────────────────────────────────────┘

环境变量

import os

# 比硬编码略有改进
def get_encryption_key():
    key_hex = os.environ.get('ENCRYPTION_KEY')
    if not key_hex:
        raise ValueError("ENCRYPTION_KEY 环境变量未设置")
    return bytes.fromhex(key_hex)

# 在环境中设置(不是在代码中!)
# export ENCRYPTION_KEY=$(python -c "import secrets; print(secrets.token_hex(32))")

加密密钥文件

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
import os
import json

class KeyFile:
    """在加密文件中存储加密密钥"""

    def __init__(self, filepath: str, master_password: str):
        self.filepath = filepath
        self.master_key = self._derive_master_key(master_password)

    def _derive_master_key(self, password: str) -> bytes:
        # 在生产中,单独存储盐值或在文件头中
        salt = b"static-salt-for-demo"  # 生产中使用随机盐值!
        kdf = Scrypt(salt=salt, length=32, n=2**17, r=8, p=1)
        return kdf.derive(password.encode())

    def store_key(self, key_name: str, key_value: bytes):
        """在加密文件中存储密钥"""
        keys = self._load_all()
        keys[key_name] = key_value.hex()
        self._save_all(keys)

    def get_key(self, key_name: str) -> bytes:
        """从加密文件中检索密钥"""
        keys = self._load_all()
        if key_name not in keys:
            raise KeyError(f"密钥 '{key_name}' 未找到")
        return bytes.fromhex(keys[key_name])

    def _load_all(self) -> dict:
        if not os.path.exists(self.filepath):
            return {}

        with open(self.filepath, 'rb') as f:
            encrypted = f.read()

        fernet = Fernet(self._to_fernet_key(self.master_key))
        decrypted = fernet.decrypt(encrypted)
        return json.loads(decrypted)

    def _save_all(self, keys: dict):
        data = json.dumps(keys).encode()
        fernet = Fernet(self._to_fernet_key(self.master_key))
        encrypted = fernet.encrypt(data)

        with open(self.filepath, 'wb') as f:
            f.write(encrypted)

    def _to_fernet_key(self, key: bytes) -> bytes:
        import base64
        return base64.urlsafe_b64encode(key)

# 使用
keyfile = KeyFile("/secure/keys.enc", os.environ['KEY_FILE_PASSWORD'])
keyfile.store_key("database_key", os.urandom(32))
db_key = keyfile.get_key("database_key")

云 KMS 集成

# AWS KMS
import boto3

class AWSKeyManager:
    def __init__(self, key_id: str):
        self.kms = boto3.client('kms')
        self.key_id = key_id

    def encrypt(self, plaintext: bytes) -> bytes:
        response = self.kms.encrypt(
            KeyId=self.key_id,
            Plaintext=plaintext
        )
        return response['CiphertextBlob']

    def decrypt(self, ciphertext: bytes) -> bytes:
        response = self.kms.decrypt(
            KeyId=self.key_id,
            CiphertextBlob=ciphertext
        )
        return response['Plaintext']

    def generate_data_key(self) -> tuple[bytes, bytes]:
        """生成由 KMS 加密的数据密钥"""
        response = self.kms.generate_data_key(
            KeyId=self.key_id,
            KeySpec='AES_256'
        )
        # 返回明文密钥供立即使用
        # 存储加密密钥供以后检索
        return response['Plaintext'], response['CiphertextBlob']

# 使用
km = AWSKeyManager('alias/my-master-key')
plaintext_key, encrypted_key = km.generate_data_key()
# 使用 plaintext_key 进行加密
# 将 encrypted_key 与密文一起存储
# 从内存中丢弃 plaintext_key
# Google Cloud KMS
from google.cloud import kms

class GCPKeyManager:
    def __init__(self, project_id: str, location: str, keyring: str, key_name: str):
        self.client = kms.KeyManagementServiceClient()
        self.key_path = self.client.crypto_key_path(
            project_id, location, keyring, key_name
        )

    def encrypt(self, plaintext: bytes) -> bytes:
        response = self.client.encrypt(
            request={'name': self.key_path, 'plaintext': plaintext}
        )
        return response.ciphertext

    def decrypt(self, ciphertext: bytes) -> bytes:
        response = self.client.decrypt(
            request={'name': self.key_path, 'ciphertext': ciphertext}
        )
        return response.plaintext

5. 密钥层次结构

信封加密

主密钥(在 HSM/KMS 中)

    ├── 解密 → 数据加密密钥 1(加密的)
    │                 └── 加密 → 数据集 A

    ├── 解密 → 数据加密密钥 2(加密的)
    │                 └── 加密 → 数据集 B

    └── 解密 → 数据加密密钥 3(加密的)
                      └── 加密 → 数据集 C

好处:
- 主密钥永远不离开 HSM
- 数据密钥可以独立轮换
- 数据密钥与加密数据一起存储(加密形式)
- 泄露的数据密钥只影响一个数据集

实现

import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

class EnvelopeEncryption:
    """使用 KMS 保护的主密钥进行信封加密"""

    def __init__(self, kms_client):
        self.kms = kms_client

    def encrypt(self, plaintext: bytes) -> dict:
        """使用信封加密加密数据"""
        # 为此加密生成新的数据密钥
        dek_plaintext, dek_encrypted = self.kms.generate_data_key()

        # 用 DEK 加密数据
        nonce = os.urandom(12)
        aesgcm = AESGCM(dek_plaintext)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)

        # 从内存中清除 DEK(Python 限制,但尽力而为)
        del dek_plaintext

        return {
            'encrypted_dek': dek_encrypted,
            'nonce': nonce,
            'ciphertext': ciphertext
        }

    def decrypt(self, encrypted_data: dict) -> bytes:
        """使用信封加密解密数据"""
        # 使用 KMS 解密 DEK
        dek = self.kms.decrypt(encrypted_data['encrypted_dek'])

        # 用 DEK 解密数据
        aesgcm = AESGCM(dek)
        plaintext = aesgcm.decrypt(
            encrypted_data['nonce'],
            encrypted_data['ciphertext'],
            None
        )

        # 从内存中清除 DEK
        del dek

        return plaintext

6. 密钥轮换

为什么要轮换密钥?

密钥轮换的原因:
1. 限制潜在泄露的暴露
2. 合规要求(PCI-DSS 等)
3. 人员变动(有密钥访问权限的人离开)
4. 算法更新(SHA-1 → SHA-256)
5. 密钥材料耗尽(对于好的密码理论上的问题)

轮换策略:
- 基于时间:每 N 天/月轮换
- 基于事件:在安全事件时轮换
- 基于使用:在 N 次加密后轮换

轮换实现

from datetime import datetime, timedelta
import json

class RotatingKeyManager:
    """管理带版本控制的密钥轮换"""

    def __init__(self, storage, kms):
        self.storage = storage
        self.kms = kms

    def get_current_key(self, key_name: str) -> tuple[bytes, int]:
        """获取当前活动密钥及其版本"""
        metadata = self._get_metadata(key_name)
        current_version = metadata['current_version']
        encrypted_key = metadata['versions'][str(current_version)]['key']
        key = self.kms.decrypt(encrypted_key)
        return key, current_version

    def get_key_by_version(self, key_name: str, version: int) -> bytes:
        """获取特定密钥版本用于解密"""
        metadata = self._get_metadata(key_name)
        version_data = metadata['versions'].get(str(version))
        if not version_data:
            raise KeyError(f"密钥版本 {version} 未找到")
        return self.kms.decrypt(version_data['key'])

    def rotate_key(self, key_name: str) -> int:
        """创建新密钥版本并设为当前"""
        metadata = self._get_metadata(key_name)

        # 生成新密钥
        new_key = os.urandom(32)
        encrypted_key = self.kms.encrypt(new_key)

        # 创建新版本
        new_version = metadata['current_version'] + 1
        metadata['versions'][str(new_version)] = {
            'key': encrypted_key,
            'created_at': datetime.now().isoformat(),
            'status': 'active'
        }

        # 更新当前版本
        metadata['current_version'] = new_version
        metadata['versions'][str(new_version - 1)]['status'] = 'decrypt-only'

        self._save_metadata(key_name, metadata)

        return new_version

    def encrypt_with_rotation(self, key_name: str, plaintext: bytes) -> dict:
        """使用当前密钥加密,包含版本供以后解密"""
        key, version = self.get_current_key(key_name)

        nonce = os.urandom(12)
        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)

        return {
            'version': version,
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex()
        }

    def decrypt_with_rotation(self, key_name: str, encrypted_data: dict) -> bytes:
        """使用正确的密钥版本解密"""
        version = encrypted_data['version']
        key = self.get_key_by_version(key_name, version)

        aesgcm = AESGCM(key)
        return aesgcm.decrypt(
            bytes.fromhex(encrypted_data['nonce']),
            bytes.fromhex(encrypted_data['ciphertext']),
            None
        )

    def _get_metadata(self, key_name: str) -> dict:
        return self.storage.get(f"key_metadata:{key_name}") or {
            'current_version': 0,
            'versions': {}
        }

    def _save_metadata(self, key_name: str, metadata: dict):
        self.storage.set(f"key_metadata:{key_name}", metadata)

轮换后重新加密

def re_encrypt_all_data(key_manager, data_store, key_name: str):
    """用新的当前密钥重新加密所有数据"""
    current_key, current_version = key_manager.get_current_key(key_name)

    for record_id in data_store.list_all():
        record = data_store.get(record_id)

        # 如果已经使用当前密钥版本则跳过
        if record['key_version'] == current_version:
            continue

        # 用旧密钥解密
        old_key = key_manager.get_key_by_version(key_name, record['key_version'])
        plaintext = decrypt(old_key, record['ciphertext'], record['nonce'])

        # 用新密钥重新加密
        new_ciphertext, new_nonce = encrypt(current_key, plaintext)

        # 更新记录
        data_store.update(record_id, {
            'ciphertext': new_ciphertext,
            'nonce': new_nonce,
            'key_version': current_version
        })

7. 密钥销毁

安全密钥删除

import ctypes
import os

def secure_zero(data: bytearray):
    """安全地将内存清零(Python 中尽力而为)"""
    # 获取 bytearray 缓冲区的地址
    addr = id(data) + 32  # Python 对象头偏移
    size = len(data)

    # 用零覆盖
    ctypes.memset(addr, 0, size)

def secure_delete_key(key: bytes) -> None:
    """尽力安全删除密钥材料"""
    # 转换为可变 bytearray
    key_array = bytearray(key)

    # 多次覆盖
    for _ in range(3):
        secure_zero(key_array)
        for i in range(len(key_array)):
            key_array[i] = os.urandom(1)[0]
        secure_zero(key_array)

    # 清除引用
    del key_array

# 注意:Python 的内存管理使真正的安全删除很困难
# 对于关键应用,使用具有更好内存控制的语言
# 或将密钥保存在 HSM 中,它们永远不会离开安全硬件

密钥销毁策略

from datetime import datetime, timedelta

class KeyDestructionPolicy:
    """管理密钥生命周期和销毁"""

    def __init__(self, key_manager, archive_storage):
        self.key_manager = key_manager
        self.archive = archive_storage

    def schedule_destruction(self, key_name: str, version: int,
                            days_until_destruction: int = 90):
        """安排密钥版本进行销毁"""
        destruction_date = datetime.now() + timedelta(days=days_until_destruction)

        self.key_manager.update_version_status(
            key_name, version,
            status='pending-destruction',
            destruction_date=destruction_date.isoformat()
        )

    def execute_pending_destructions(self):
        """销毁已过销毁日期的密钥"""
        pending = self.key_manager.get_pending_destructions()

        for key_info in pending:
            if datetime.now() >= datetime.fromisoformat(key_info['destruction_date']):
                # 归档元数据(不是密钥本身!)
                self.archive.store({
                    'key_name': key_info['key_name'],
                    'version': key_info['version'],
                    'destroyed_at': datetime.now().isoformat(),
                    'created_at': key_info['created_at']
                })

                # 销毁密钥
                self.key_manager.destroy_key_version(
                    key_info['key_name'],
                    key_info['version']
                )

8. 密钥安全最佳实践

访问控制

from enum import Enum
from functools import wraps

class KeyPermission(Enum):
    ENCRYPT = 'encrypt'
    DECRYPT = 'decrypt'
    ROTATE = 'rotate'
    DESTROY = 'destroy'
    ADMIN = 'admin'

class KeyAccessControl:
    """控制谁可以对密钥做什么"""

    def __init__(self):
        self.permissions = {}  # key_name -> {user_id -> set(permissions)}

    def grant(self, key_name: str, user_id: str, permission: KeyPermission):
        if key_name not in self.permissions:
            self.permissions[key_name] = {}
        if user_id not in self.permissions[key_name]:
            self.permissions[key_name][user_id] = set()
        self.permissions[key_name][user_id].add(permission)

    def check(self, key_name: str, user_id: str, permission: KeyPermission) -> bool:
        user_perms = self.permissions.get(key_name, {}).get(user_id, set())
        return permission in user_perms or KeyPermission.ADMIN in user_perms

    def require(self, key_name: str, permission: KeyPermission):
        """装饰器,要求函数有特定权限"""
        def decorator(func):
            @wraps(func)
            def wrapper(self, user_id: str, *args, **kwargs):
                if not self.acl.check(key_name, user_id, permission):
                    raise PermissionError(
                        f"用户 {user_id} 缺少对 {key_name}{permission.value} 权限"
                    )
                return func(self, user_id, *args, **kwargs)
            return wrapper
        return decorator

审计日志

from datetime import datetime
import json

class KeyAuditLog:
    """记录所有密钥操作以供审计"""

    def __init__(self, storage):
        self.storage = storage

    def log(self, event_type: str, key_name: str, user_id: str,
            details: dict = None, success: bool = True):
        entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'key_name': key_name,
            'user_id': user_id,
            'success': success,
            'details': details or {}
        }

        self.storage.append('key_audit_log', entry)

    def log_key_access(self, key_name: str, user_id: str, operation: str):
        self.log('key_access', key_name, user_id, {'operation': operation})

    def log_key_rotation(self, key_name: str, user_id: str,
                         old_version: int, new_version: int):
        self.log('key_rotation', key_name, user_id, {
            'old_version': old_version,
            'new_version': new_version
        })

    def log_key_destruction(self, key_name: str, user_id: str, version: int):
        self.log('key_destruction', key_name, user_id, {'version': version})

9. 常见错误

错误 1:密钥在源代码控制中

# 错误:代码中的密钥
SECRET_KEY = "aGVsbG8gd29ybGQK"

# 错误:配置文件中的密钥会被提交
# config.yaml:
# encryption_key: "aGVsbG8gd29ybGQK"

# 正确:从环境或密钥管理器获取密钥
SECRET_KEY = os.environ.get('SECRET_KEY')

# 检查你的 .gitignore!
# .env 文件永远不应该被提交

错误 2:所有事情使用同一个密钥

# 错误:一个密钥用于所有目的
MASTER_KEY = os.environ['KEY']
encrypt_data(MASTER_KEY, data)
sign_token(MASTER_KEY, token)
encrypt_session(MASTER_KEY, session)

# 正确:为每个目的派生单独的密钥
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes

def derive_purpose_key(master_key: bytes, purpose: str) -> bytes:
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=purpose.encode()
    )
    return hkdf.derive(master_key)

encryption_key = derive_purpose_key(master_key, "data-encryption")
signing_key = derive_purpose_key(master_key, "token-signing")
session_key = derive_purpose_key(master_key, "session-encryption")

错误 3:泄露后不轮换

# 当检测到泄露时:
def incident_response(key_manager, key_name: str):
    # 1. 立即轮换到新密钥
    new_version = key_manager.rotate_key(key_name)

    # 2. 将旧版本标记为已泄露(不仅仅是仅解密)
    key_manager.mark_compromised(key_name, new_version - 1)

    # 3. 开始重新加密所有数据(优先处理敏感数据)
    schedule_reencryption(key_name, priority='critical')

    # 4. 审计谁有权访问已泄露的密钥
    generate_access_report(key_name, new_version - 1)

    # 5. 通知安全团队
    alert_security_team(key_name, 'key_compromise')

10. 本章小结

三点要记住:

  1. 密钥需要完整的生命周期。 生成、存储、分发、轮换和销毁——每个阶段都需要仔细的安全考虑。尽可能使用 HSM/KMS。

  2. 使用信封加密。 用 KMS 中的主密钥保护数据密钥。这限制了暴露、启用密钥轮换,并将主密钥保留在硬件中。

  3. 定期轮换密钥,在任何疑似泄露后立即轮换。 将密钥版本与加密数据一起包含,这样你可以用旧密钥解密同时用新密钥加密。

11. 下一步

我们已经涵盖了密码学构建块和密钥管理。现在是时候把所有东西放在一起了。

在最后一篇文章中:构建安全系统——应用我们学到的一切来设计端到端加密系统、安全 API 和纵深防御。