Luke a Pro

Luke Sun

Developer & Marketer

🇺🇦
EN||

金鑰管理:密碼學最難的部分

| , 20 minutes reading.

1. 為什麼要關心這個問題?

你已經實現了 AES-256-GCM 加密。你的資料受到最強密碼之一的保護。但金鑰在哪裡?

# 你的安全加密
key = b"my-super-secret-key-12345678901"  # 🔥 硬編碼!
ciphertext = encrypt(key, data)

那個硬編碼的金鑰使你的加密變成了表演。任何能存取你程式碼的人都能存取你所有的加密資料。

金鑰管理是大多數密碼系統失敗的地方。讓我們來解決這個問題。

2. 金鑰生命週期

┌─────────────────────────────────────────────────────────────────┐
│                        金鑰生命週期                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  產生 → 分發 → 儲存 → 使用 → 輪換 → 銷毀
│   │      │      │      │      │      │
│   ▼      ▼      ▼      ▼      ▼      ▼
│  安全   安全   安全   最小化  定期   安全
│  隨機   通道   位置   暴露   計劃   刪除
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

每個階段都有自己的風險和要求。

3. 金鑰產生

正確的方式

import os
import secrets

# 正確:使用密碼學安全的隨機性
key_256 = secrets.token_bytes(32)  # 256 位元
key_128 = secrets.token_bytes(16)  # 128 位元

# 或使用 os.urandom(等效)
key = os.urandom(32)

# 對於特定演算法,使用它們的金鑰產生
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = AESGCM.generate_key(bit_length=256)

from cryptography.hazmat.primitives.asymmetric import ed25519
private_key = ed25519.Ed25519PrivateKey.generate()

錯誤的方式

import random
import hashlib

# 錯誤:使用 random 模組(不是密碼學安全的)
key = bytes([random.randint(0, 255) for _ in range(32)])

# 錯誤:從可預測的來源衍生
key = hashlib.sha256(b"password").digest()

# 錯誤:使用時間戳記
key = hashlib.sha256(str(time.time()).encode()).digest()

# 錯誤:使用使用者名稱或其他可預測資料
key = hashlib.sha256(username.encode()).digest()

從密碼衍生金鑰

from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
import os

def derive_key_from_password(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
    """從密碼衍生加密金鑰"""
    if salt is None:
        salt = os.urandom(16)

    # scrypt 是記憶體困難的(首選)
    kdf = Scrypt(
        salt=salt,
        length=32,
        n=2**17,
        r=8,
        p=1
    )
    key = kdf.derive(password.encode())

    return key, salt

# 使用
password = "使用者的密碼短語"
key, salt = derive_key_from_password(password)
# 將鹽值與加密資料一起儲存
# 永遠不要儲存密碼或衍生的金鑰

4. 金鑰儲存

儲存選項(從最差到最好)

┌─────────────────────────────────────────────────────────────────┐
│ 選項                │ 安全性 │ 使用場景                         │
├─────────────────────┼────────┼────────────────────────────────┤
│ 原始碼中硬編碼      │ ✗✗✗    │ 永遠不要                        │
│ 設定檔              │ ✗✗     │ 僅開發環境                      │
│ 環境變數            │ ✗      │ 簡單部署                        │
│ 加密檔案            │ ○      │ 當 HSM 不可用時                 │
│ 金鑰管理器          │ ✓      │ 雲端部署                        │
│ HSM/KMS             │ ✓✓     │ 生產環境、合規                  │
│ 硬體金鑰            │ ✓✓✓    │ 最高安全性                      │
└─────────────────────────────────────────────────────────────────┘

環境變數

import os

# 比硬編碼略有改進
def get_encryption_key():
    key_hex = os.environ.get('ENCRYPTION_KEY')
    if not key_hex:
        raise ValueError("ENCRYPTION_KEY 環境變數未設定")
    return bytes.fromhex(key_hex)

# 在環境中設定(不是在程式碼中!)
# export ENCRYPTION_KEY=$(python -c "import secrets; print(secrets.token_hex(32))")

加密金鑰檔案

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
import os
import json

class KeyFile:
    """在加密檔案中儲存加密金鑰"""

    def __init__(self, filepath: str, master_password: str):
        self.filepath = filepath
        self.master_key = self._derive_master_key(master_password)

    def _derive_master_key(self, password: str) -> bytes:
        # 在生產中,單獨儲存鹽值或在檔案頭中
        salt = b"static-salt-for-demo"  # 生產中使用隨機鹽值!
        kdf = Scrypt(salt=salt, length=32, n=2**17, r=8, p=1)
        return kdf.derive(password.encode())

    def store_key(self, key_name: str, key_value: bytes):
        """在加密檔案中儲存金鑰"""
        keys = self._load_all()
        keys[key_name] = key_value.hex()
        self._save_all(keys)

    def get_key(self, key_name: str) -> bytes:
        """從加密檔案中擷取金鑰"""
        keys = self._load_all()
        if key_name not in keys:
            raise KeyError(f"金鑰 '{key_name}' 未找到")
        return bytes.fromhex(keys[key_name])

    def _load_all(self) -> dict:
        if not os.path.exists(self.filepath):
            return {}

        with open(self.filepath, 'rb') as f:
            encrypted = f.read()

        fernet = Fernet(self._to_fernet_key(self.master_key))
        decrypted = fernet.decrypt(encrypted)
        return json.loads(decrypted)

    def _save_all(self, keys: dict):
        data = json.dumps(keys).encode()
        fernet = Fernet(self._to_fernet_key(self.master_key))
        encrypted = fernet.encrypt(data)

        with open(self.filepath, 'wb') as f:
            f.write(encrypted)

    def _to_fernet_key(self, key: bytes) -> bytes:
        import base64
        return base64.urlsafe_b64encode(key)

# 使用
keyfile = KeyFile("/secure/keys.enc", os.environ['KEY_FILE_PASSWORD'])
keyfile.store_key("database_key", os.urandom(32))
db_key = keyfile.get_key("database_key")

雲端 KMS 整合

# AWS KMS
import boto3

class AWSKeyManager:
    def __init__(self, key_id: str):
        self.kms = boto3.client('kms')
        self.key_id = key_id

    def encrypt(self, plaintext: bytes) -> bytes:
        response = self.kms.encrypt(
            KeyId=self.key_id,
            Plaintext=plaintext
        )
        return response['CiphertextBlob']

    def decrypt(self, ciphertext: bytes) -> bytes:
        response = self.kms.decrypt(
            KeyId=self.key_id,
            CiphertextBlob=ciphertext
        )
        return response['Plaintext']

    def generate_data_key(self) -> tuple[bytes, bytes]:
        """產生由 KMS 加密的資料金鑰"""
        response = self.kms.generate_data_key(
            KeyId=self.key_id,
            KeySpec='AES_256'
        )
        # 回傳明文金鑰供立即使用
        # 儲存加密金鑰供以後擷取
        return response['Plaintext'], response['CiphertextBlob']

# 使用
km = AWSKeyManager('alias/my-master-key')
plaintext_key, encrypted_key = km.generate_data_key()
# 使用 plaintext_key 進行加密
# 將 encrypted_key 與密文一起儲存
# 從記憶體中丟棄 plaintext_key
# Google Cloud KMS
from google.cloud import kms

class GCPKeyManager:
    def __init__(self, project_id: str, location: str, keyring: str, key_name: str):
        self.client = kms.KeyManagementServiceClient()
        self.key_path = self.client.crypto_key_path(
            project_id, location, keyring, key_name
        )

    def encrypt(self, plaintext: bytes) -> bytes:
        response = self.client.encrypt(
            request={'name': self.key_path, 'plaintext': plaintext}
        )
        return response.ciphertext

    def decrypt(self, ciphertext: bytes) -> bytes:
        response = self.client.decrypt(
            request={'name': self.key_path, 'ciphertext': ciphertext}
        )
        return response.plaintext

5. 金鑰層次結構

信封加密

主金鑰(在 HSM/KMS 中)

    ├── 解密 → 資料加密金鑰 1(加密的)
    │                 └── 加密 → 資料集 A

    ├── 解密 → 資料加密金鑰 2(加密的)
    │                 └── 加密 → 資料集 B

    └── 解密 → 資料加密金鑰 3(加密的)
                      └── 加密 → 資料集 C

好處:
- 主金鑰永遠不離開 HSM
- 資料金鑰可以獨立輪換
- 資料金鑰與加密資料一起儲存(加密形式)
- 洩露的資料金鑰只影響一個資料集

實現

import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

class EnvelopeEncryption:
    """使用 KMS 保護的主金鑰進行信封加密"""

    def __init__(self, kms_client):
        self.kms = kms_client

    def encrypt(self, plaintext: bytes) -> dict:
        """使用信封加密加密資料"""
        # 為此加密產生新的資料金鑰
        dek_plaintext, dek_encrypted = self.kms.generate_data_key()

        # 用 DEK 加密資料
        nonce = os.urandom(12)
        aesgcm = AESGCM(dek_plaintext)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)

        # 從記憶體中清除 DEK(Python 限制,但盡力而為)
        del dek_plaintext

        return {
            'encrypted_dek': dek_encrypted,
            'nonce': nonce,
            'ciphertext': ciphertext
        }

    def decrypt(self, encrypted_data: dict) -> bytes:
        """使用信封加密解密資料"""
        # 使用 KMS 解密 DEK
        dek = self.kms.decrypt(encrypted_data['encrypted_dek'])

        # 用 DEK 解密資料
        aesgcm = AESGCM(dek)
        plaintext = aesgcm.decrypt(
            encrypted_data['nonce'],
            encrypted_data['ciphertext'],
            None
        )

        # 從記憶體中清除 DEK
        del dek

        return plaintext

6. 金鑰輪換

為什麼要輪換金鑰?

金鑰輪換的原因:
1. 限制潛在洩露的暴露
2. 合規要求(PCI-DSS 等)
3. 人員變動(有金鑰存取權限的人離開)
4. 演算法更新(SHA-1 → SHA-256)
5. 金鑰材料耗盡(對於好的密碼理論上的問題)

輪換策略:
- 基於時間:每 N 天/月輪換
- 基於事件:在安全事件時輪換
- 基於使用:在 N 次加密後輪換

輪換實現

from datetime import datetime, timedelta
import json

class RotatingKeyManager:
    """管理帶版本控制的金鑰輪換"""

    def __init__(self, storage, kms):
        self.storage = storage
        self.kms = kms

    def get_current_key(self, key_name: str) -> tuple[bytes, int]:
        """獲取當前活動金鑰及其版本"""
        metadata = self._get_metadata(key_name)
        current_version = metadata['current_version']
        encrypted_key = metadata['versions'][str(current_version)]['key']
        key = self.kms.decrypt(encrypted_key)
        return key, current_version

    def get_key_by_version(self, key_name: str, version: int) -> bytes:
        """獲取特定金鑰版本用於解密"""
        metadata = self._get_metadata(key_name)
        version_data = metadata['versions'].get(str(version))
        if not version_data:
            raise KeyError(f"金鑰版本 {version} 未找到")
        return self.kms.decrypt(version_data['key'])

    def rotate_key(self, key_name: str) -> int:
        """建立新金鑰版本並設為當前"""
        metadata = self._get_metadata(key_name)

        # 產生新金鑰
        new_key = os.urandom(32)
        encrypted_key = self.kms.encrypt(new_key)

        # 建立新版本
        new_version = metadata['current_version'] + 1
        metadata['versions'][str(new_version)] = {
            'key': encrypted_key,
            'created_at': datetime.now().isoformat(),
            'status': 'active'
        }

        # 更新當前版本
        metadata['current_version'] = new_version
        metadata['versions'][str(new_version - 1)]['status'] = 'decrypt-only'

        self._save_metadata(key_name, metadata)

        return new_version

    def encrypt_with_rotation(self, key_name: str, plaintext: bytes) -> dict:
        """使用當前金鑰加密,包含版本供以後解密"""
        key, version = self.get_current_key(key_name)

        nonce = os.urandom(12)
        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)

        return {
            'version': version,
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex()
        }

    def decrypt_with_rotation(self, key_name: str, encrypted_data: dict) -> bytes:
        """使用正確的金鑰版本解密"""
        version = encrypted_data['version']
        key = self.get_key_by_version(key_name, version)

        aesgcm = AESGCM(key)
        return aesgcm.decrypt(
            bytes.fromhex(encrypted_data['nonce']),
            bytes.fromhex(encrypted_data['ciphertext']),
            None
        )

    def _get_metadata(self, key_name: str) -> dict:
        return self.storage.get(f"key_metadata:{key_name}") or {
            'current_version': 0,
            'versions': {}
        }

    def _save_metadata(self, key_name: str, metadata: dict):
        self.storage.set(f"key_metadata:{key_name}", metadata)

輪換後重新加密

def re_encrypt_all_data(key_manager, data_store, key_name: str):
    """用新的當前金鑰重新加密所有資料"""
    current_key, current_version = key_manager.get_current_key(key_name)

    for record_id in data_store.list_all():
        record = data_store.get(record_id)

        # 如果已經使用當前金鑰版本則跳過
        if record['key_version'] == current_version:
            continue

        # 用舊金鑰解密
        old_key = key_manager.get_key_by_version(key_name, record['key_version'])
        plaintext = decrypt(old_key, record['ciphertext'], record['nonce'])

        # 用新金鑰重新加密
        new_ciphertext, new_nonce = encrypt(current_key, plaintext)

        # 更新記錄
        data_store.update(record_id, {
            'ciphertext': new_ciphertext,
            'nonce': new_nonce,
            'key_version': current_version
        })

7. 金鑰銷毀

安全金鑰刪除

import ctypes
import os

def secure_zero(data: bytearray):
    """安全地將記憶體清零(Python 中盡力而為)"""
    # 獲取 bytearray 緩衝區的位址
    addr = id(data) + 32  # Python 物件頭偏移
    size = len(data)

    # 用零覆蓋
    ctypes.memset(addr, 0, size)

def secure_delete_key(key: bytes) -> None:
    """盡力安全刪除金鑰材料"""
    # 轉換為可變 bytearray
    key_array = bytearray(key)

    # 多次覆蓋
    for _ in range(3):
        secure_zero(key_array)
        for i in range(len(key_array)):
            key_array[i] = os.urandom(1)[0]
        secure_zero(key_array)

    # 清除參照
    del key_array

# 注意:Python 的記憶體管理使真正的安全刪除很困難
# 對於關鍵應用,使用具有更好記憶體控制的語言
# 或將金鑰保存在 HSM 中,它們永遠不會離開安全硬體

金鑰銷毀策略

from datetime import datetime, timedelta

class KeyDestructionPolicy:
    """管理金鑰生命週期和銷毀"""

    def __init__(self, key_manager, archive_storage):
        self.key_manager = key_manager
        self.archive = archive_storage

    def schedule_destruction(self, key_name: str, version: int,
                            days_until_destruction: int = 90):
        """安排金鑰版本進行銷毀"""
        destruction_date = datetime.now() + timedelta(days=days_until_destruction)

        self.key_manager.update_version_status(
            key_name, version,
            status='pending-destruction',
            destruction_date=destruction_date.isoformat()
        )

    def execute_pending_destructions(self):
        """銷毀已過銷毀日期的金鑰"""
        pending = self.key_manager.get_pending_destructions()

        for key_info in pending:
            if datetime.now() >= datetime.fromisoformat(key_info['destruction_date']):
                # 歸檔中繼資料(不是金鑰本身!)
                self.archive.store({
                    'key_name': key_info['key_name'],
                    'version': key_info['version'],
                    'destroyed_at': datetime.now().isoformat(),
                    'created_at': key_info['created_at']
                })

                # 銷毀金鑰
                self.key_manager.destroy_key_version(
                    key_info['key_name'],
                    key_info['version']
                )

8. 金鑰安全最佳實踐

存取控制

from enum import Enum
from functools import wraps

class KeyPermission(Enum):
    ENCRYPT = 'encrypt'
    DECRYPT = 'decrypt'
    ROTATE = 'rotate'
    DESTROY = 'destroy'
    ADMIN = 'admin'

class KeyAccessControl:
    """控制誰可以對金鑰做什麼"""

    def __init__(self):
        self.permissions = {}  # key_name -> {user_id -> set(permissions)}

    def grant(self, key_name: str, user_id: str, permission: KeyPermission):
        if key_name not in self.permissions:
            self.permissions[key_name] = {}
        if user_id not in self.permissions[key_name]:
            self.permissions[key_name][user_id] = set()
        self.permissions[key_name][user_id].add(permission)

    def check(self, key_name: str, user_id: str, permission: KeyPermission) -> bool:
        user_perms = self.permissions.get(key_name, {}).get(user_id, set())
        return permission in user_perms or KeyPermission.ADMIN in user_perms

    def require(self, key_name: str, permission: KeyPermission):
        """裝飾器,要求函數有特定權限"""
        def decorator(func):
            @wraps(func)
            def wrapper(self, user_id: str, *args, **kwargs):
                if not self.acl.check(key_name, user_id, permission):
                    raise PermissionError(
                        f"使用者 {user_id} 缺少對 {key_name}{permission.value} 權限"
                    )
                return func(self, user_id, *args, **kwargs)
            return wrapper
        return decorator

稽核日誌

from datetime import datetime
import json

class KeyAuditLog:
    """記錄所有金鑰操作以供稽核"""

    def __init__(self, storage):
        self.storage = storage

    def log(self, event_type: str, key_name: str, user_id: str,
            details: dict = None, success: bool = True):
        entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'key_name': key_name,
            'user_id': user_id,
            'success': success,
            'details': details or {}
        }

        self.storage.append('key_audit_log', entry)

    def log_key_access(self, key_name: str, user_id: str, operation: str):
        self.log('key_access', key_name, user_id, {'operation': operation})

    def log_key_rotation(self, key_name: str, user_id: str,
                         old_version: int, new_version: int):
        self.log('key_rotation', key_name, user_id, {
            'old_version': old_version,
            'new_version': new_version
        })

    def log_key_destruction(self, key_name: str, user_id: str, version: int):
        self.log('key_destruction', key_name, user_id, {'version': version})

9. 常見錯誤

錯誤 1:金鑰在原始碼控制中

# 錯誤:程式碼中的金鑰
SECRET_KEY = "aGVsbG8gd29ybGQK"

# 錯誤:設定檔中的金鑰會被提交
# config.yaml:
# encryption_key: "aGVsbG8gd29ybGQK"

# 正確:從環境或金鑰管理器獲取金鑰
SECRET_KEY = os.environ.get('SECRET_KEY')

# 檢查你的 .gitignore!
# .env 檔案永遠不應該被提交

錯誤 2:所有事情使用同一個金鑰

# 錯誤:一個金鑰用於所有目的
MASTER_KEY = os.environ['KEY']
encrypt_data(MASTER_KEY, data)
sign_token(MASTER_KEY, token)
encrypt_session(MASTER_KEY, session)

# 正確:為每個目的衍生單獨的金鑰
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes

def derive_purpose_key(master_key: bytes, purpose: str) -> bytes:
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=purpose.encode()
    )
    return hkdf.derive(master_key)

encryption_key = derive_purpose_key(master_key, "data-encryption")
signing_key = derive_purpose_key(master_key, "token-signing")
session_key = derive_purpose_key(master_key, "session-encryption")

錯誤 3:洩露後不輪換

# 當偵測到洩露時:
def incident_response(key_manager, key_name: str):
    # 1. 立即輪換到新金鑰
    new_version = key_manager.rotate_key(key_name)

    # 2. 將舊版本標記為已洩露(不僅僅是僅解密)
    key_manager.mark_compromised(key_name, new_version - 1)

    # 3. 開始重新加密所有資料(優先處理敏感資料)
    schedule_reencryption(key_name, priority='critical')

    # 4. 稽核誰有權存取已洩露的金鑰
    generate_access_report(key_name, new_version - 1)

    # 5. 通知安全團隊
    alert_security_team(key_name, 'key_compromise')

10. 本章小結

三點要記住:

  1. 金鑰需要完整的生命週期。 產生、儲存、分發、輪換和銷毀——每個階段都需要仔細的安全考慮。盡可能使用 HSM/KMS。

  2. 使用信封加密。 用 KMS 中的主金鑰保護資料金鑰。這限制了暴露、啟用金鑰輪換,並將主金鑰保留在硬體中。

  3. 定期輪換金鑰,在任何疑似洩露後立即輪換。 將金鑰版本與加密資料一起包含,這樣你可以用舊金鑰解密同時用新金鑰加密。

11. 下一步

我們已經涵蓋了密碼學建構區塊和金鑰管理。現在是時候把所有東西放在一起了。

在最後一篇文章中:建構安全系統——應用我們學到的一切來設計端到端加密系統、安全 API 和縱深防禦。