Luke a Pro

Luke Sun

Developer & Marketer

🇺🇦
EN||

建構安全系統:綜合應用

| , 20 minutes reading.

1. 為什麼要關心這個問題?

你已經學習了建構區塊:

  • 對稱加密(AES-GCM、ChaCha20)
  • 非對稱加密(RSA、ECC)
  • 雜湊和 MAC(SHA-256、HMAC)
  • 數位簽章(ECDSA、EdDSA)
  • TLS、憑證、金鑰管理

但知道這些部件並不意味著你能建構安全系統。這最後一篇文章展示如何正確地組合它們。

2. 安全系統設計原則

縱深防禦

┌─────────────────────────────────────────────────────────────────┐
│                        縱深防禦                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   網路層       → 防火牆、TLS、網路隔離                           │
│        ↓                                                        │
│   傳輸層       → TLS 1.3、憑證固定                               │
│        ↓                                                        │
│   應用層       → 輸入驗證、輸出編碼                              │
│        ↓                                                        │
│   資料層       → 靜態加密、欄位級加密                            │
│        ↓                                                        │
│   存取層       → 認證、授權、稽核                                │
│                                                                 │
│   每一層防禦不同的威脅                                           │
│   一層被突破不會危及整個系統                                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

最小權限原則

# 錯誤:一個金鑰做所有事情
master_key = load_key()
encrypt_user_data(master_key, data)
encrypt_logs(master_key, logs)
sign_tokens(master_key, token)

# 正確:具有最小權限的獨立金鑰
class KeyRing:
    def __init__(self, kms):
        self.kms = kms

    def get_user_data_key(self, user_id: str) -> bytes:
        """每個使用者的加密金鑰"""
        return self.kms.derive_key(f"user-data:{user_id}")

    def get_log_encryption_key(self) -> bytes:
        """用於日誌加密的獨立金鑰"""
        return self.kms.derive_key("log-encryption")

    def get_token_signing_key(self) -> bytes:
        """用於權杖簽章的獨立金鑰"""
        return self.kms.derive_key("token-signing")

安全失敗

# 錯誤:開放失敗
def check_access(token):
    try:
        claims = verify_token(token)
        return claims.get('authorized', True)  # 預設授權!
    except Exception:
        return True  # 錯誤時允許!

# 正確:安全失敗
def check_access(token):
    try:
        claims = verify_token(token)
        if not claims.get('authorized'):
            raise AuthorizationError("未授權")
        return claims
    except Exception as e:
        log.warning(f"存取檢查失敗: {e}")
        raise AuthorizationError("存取被拒絕")  # 錯誤時拒絕

3. 設計端到端加密系統

架構概述

┌──────────────────────────────────────────────────────────────────────────┐
│                        端到端加密訊息系統                                  │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   客戶端 A                   伺服器                    客戶端 B           │
│   ────────                   ──────                    ────────          │
│                                                                          │
│   [私鑰 A]               [公鑰儲存]                  [私鑰 B]             │
│   [公鑰 A]  ─────────>   [公鑰 A] <─────────         [公鑰 B]             │
│                          [公鑰 B]                                        │
│                                                                          │
│   Encrypt(PubKey_B,     僅中繼加密              Decrypt(PrivKey_B,       │
│           message)  ───> 訊息 ───────────────>  ciphertext)              │
│                                                                          │
│   伺服器無法讀取訊息(沒有私鑰存取權限)                                   │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

實現

from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes, serialization
import os
import json

class E2EClient:
    """端到端加密訊息客戶端"""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.identity_key = x25519.X25519PrivateKey.generate()
        self.public_key = self.identity_key.public_key()

    def get_public_key_bytes(self) -> bytes:
        """匯出公鑰用於伺服器儲存"""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

    def encrypt_message(self, recipient_public_key: bytes, message: str) -> dict:
        """為收件人加密訊息"""
        # 載入收件人的公鑰
        recipient_key = x25519.X25519PublicKey.from_public_bytes(recipient_public_key)

        # 產生臨時金鑰對以實現前向保密
        ephemeral_private = x25519.X25519PrivateKey.generate()
        ephemeral_public = ephemeral_private.public_key()

        # 衍生共享金鑰
        shared_secret = ephemeral_private.exchange(recipient_key)

        # 使用 HKDF 衍生加密金鑰
        encryption_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b"e2e-message-encryption"
        ).derive(shared_secret)

        # 加密訊息
        nonce = os.urandom(12)
        aesgcm = AESGCM(encryption_key)
        ciphertext = aesgcm.encrypt(nonce, message.encode(), None)

        return {
            'ephemeral_public': ephemeral_public.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ).hex(),
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex(),
            'sender': self.user_id
        }

    def decrypt_message(self, encrypted_message: dict) -> str:
        """解密發送給我們的訊息"""
        # 載入發送者的臨時公鑰
        ephemeral_public = x25519.X25519PublicKey.from_public_bytes(
            bytes.fromhex(encrypted_message['ephemeral_public'])
        )

        # 衍生共享金鑰
        shared_secret = self.identity_key.exchange(ephemeral_public)

        # 衍生解密金鑰
        decryption_key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b"e2e-message-encryption"
        ).derive(shared_secret)

        # 解密訊息
        aesgcm = AESGCM(decryption_key)
        plaintext = aesgcm.decrypt(
            bytes.fromhex(encrypted_message['nonce']),
            bytes.fromhex(encrypted_message['ciphertext']),
            None
        )

        return plaintext.decode()


class E2EServer:
    """中繼加密訊息的伺服器(無法讀取訊息)"""

    def __init__(self):
        self.public_keys = {}  # user_id -> public_key_bytes
        self.messages = {}     # user_id -> [encrypted_messages]

    def register_user(self, user_id: str, public_key: bytes):
        """儲存使用者的公鑰"""
        self.public_keys[user_id] = public_key
        self.messages[user_id] = []

    def get_public_key(self, user_id: str) -> bytes:
        """獲取使用者的公鑰用於加密"""
        return self.public_keys.get(user_id)

    def send_message(self, recipient_id: str, encrypted_message: dict):
        """為收件人儲存加密訊息"""
        # 伺服器只能看到:發送者、收件人、時間戳記、大小
        # 伺服器無法讀取訊息內容
        self.messages[recipient_id].append(encrypted_message)

    def get_messages(self, user_id: str) -> list:
        """獲取使用者的待處理訊息"""
        messages = self.messages.get(user_id, [])
        self.messages[user_id] = []  # 擷取後清除
        return messages


# 使用範例
def demo_e2e():
    # 設定
    server = E2EServer()
    alice = E2EClient("alice")
    bob = E2EClient("bob")

    # 註冊公鑰
    server.register_user("alice", alice.get_public_key_bytes())
    server.register_user("bob", bob.get_public_key_bytes())

    # Alice 發送訊息給 Bob
    bob_pubkey = server.get_public_key("bob")
    encrypted = alice.encrypt_message(bob_pubkey, "你好 Bob!這是秘密。")
    server.send_message("bob", encrypted)

    # Bob 接收並解密
    messages = server.get_messages("bob")
    for msg in messages:
        plaintext = bob.decrypt_message(msg)
        print(f"Bob 收到來自 {msg['sender']} 的訊息: {plaintext}")

4. 安全 API 設計

認證層

import hmac
import hashlib
import time
from functools import wraps

class SecureAPI:
    """具有多種認證方法的安全 API"""

    def __init__(self, secret_key: bytes):
        self.secret_key = secret_key
        self.token_expiry = 3600  # 1 小時

    def create_access_token(self, user_id: str, permissions: list) -> str:
        """建立簽章的存取權杖"""
        import json
        import base64

        payload = {
            'user_id': user_id,
            'permissions': permissions,
            'issued_at': int(time.time()),
            'expires_at': int(time.time()) + self.token_expiry
        }

        payload_json = json.dumps(payload, sort_keys=True)
        payload_b64 = base64.urlsafe_b64encode(payload_json.encode()).decode()

        signature = hmac.new(
            self.secret_key,
            payload_b64.encode(),
            hashlib.sha256
        ).hexdigest()

        return f"{payload_b64}.{signature}"

    def verify_token(self, token: str) -> dict:
        """驗證並解碼存取權杖"""
        import json
        import base64

        try:
            payload_b64, signature = token.rsplit('.', 1)

            # 驗證簽章
            expected_sig = hmac.new(
                self.secret_key,
                payload_b64.encode(),
                hashlib.sha256
            ).hexdigest()

            if not hmac.compare_digest(expected_sig, signature):
                raise ValueError("無效簽章")

            # 解碼負載
            payload = json.loads(base64.urlsafe_b64decode(payload_b64))

            # 檢查過期
            if time.time() > payload['expires_at']:
                raise ValueError("權杖已過期")

            return payload

        except Exception as e:
            raise ValueError(f"權杖驗證失敗: {e}")

    def require_permission(self, permission: str):
        """裝飾器,要求特定權限"""
        def decorator(func):
            @wraps(func)
            def wrapper(request, *args, **kwargs):
                token = request.headers.get('Authorization', '').replace('Bearer ', '')
                if not token:
                    raise PermissionError("未提供權杖")

                payload = self.verify_token(token)
                if permission not in payload.get('permissions', []):
                    raise PermissionError(f"缺少權限: {permission}")

                request.user = payload
                return func(request, *args, **kwargs)
            return wrapper
        return decorator

API 請求簽章

import hmac
import hashlib
import time
import urllib.parse

class SignedAPIClient:
    """簽章所有 API 請求的客戶端"""

    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret.encode()

    def sign_request(self, method: str, path: str, body: str = "",
                     query_params: dict = None) -> dict:
        """為請求產生認證標頭"""
        timestamp = str(int(time.time()))
        nonce = os.urandom(16).hex()

        # 規範請求字串
        query_string = urllib.parse.urlencode(sorted(query_params.items())) if query_params else ""
        body_hash = hashlib.sha256(body.encode()).hexdigest()

        canonical = f"{method}\n{path}\n{query_string}\n{timestamp}\n{nonce}\n{body_hash}"

        # 簽章
        signature = hmac.new(
            self.api_secret,
            canonical.encode(),
            hashlib.sha256
        ).hexdigest()

        return {
            'X-API-Key': self.api_key,
            'X-Timestamp': timestamp,
            'X-Nonce': nonce,
            'X-Signature': signature
        }


class SignedAPIServer:
    """驗證簽章請求的伺服器"""

    def __init__(self, secrets: dict):
        self.secrets = secrets  # api_key -> api_secret
        self.used_nonces = set()  # 防止重放攻擊
        self.max_clock_skew = 300  # 5 分鐘

    def verify_request(self, method: str, path: str, headers: dict,
                      body: str = "", query_params: dict = None) -> str:
        """驗證請求簽章,如果有效則回傳 user_id"""

        # 擷取認證標頭
        api_key = headers.get('X-API-Key')
        timestamp = headers.get('X-Timestamp')
        nonce = headers.get('X-Nonce')
        signature = headers.get('X-Signature')

        if not all([api_key, timestamp, nonce, signature]):
            raise ValueError("缺少認證標頭")

        # 檢查時間戳記(防止舊請求重放)
        request_time = int(timestamp)
        if abs(time.time() - request_time) > self.max_clock_skew:
            raise ValueError("請求時間戳記與伺服器時間相差太遠")

        # 檢查 nonce(防止重放攻擊)
        if nonce in self.used_nonces:
            raise ValueError("Nonce 已使用")
        self.used_nonces.add(nonce)

        # 獲取 API 金鑰
        api_secret = self.secrets.get(api_key)
        if not api_secret:
            raise ValueError("未知的 API 金鑰")

        # 驗證簽章
        query_string = urllib.parse.urlencode(sorted(query_params.items())) if query_params else ""
        body_hash = hashlib.sha256(body.encode()).hexdigest()
        canonical = f"{method}\n{path}\n{query_string}\n{timestamp}\n{nonce}\n{body_hash}"

        expected_sig = hmac.new(
            api_secret.encode(),
            canonical.encode(),
            hashlib.sha256
        ).hexdigest()

        if not hmac.compare_digest(expected_sig, signature):
            raise ValueError("無效簽章")

        return api_key  # 回傳已認證的身份

5. 資料加密模式

欄位級加密

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import json
import os

class FieldEncryption:
    """加密記錄中的特定欄位"""

    def __init__(self, kms):
        self.kms = kms
        self.sensitive_fields = {'ssn', 'credit_card', 'password_hash', 'private_data'}

    def encrypt_record(self, record: dict, context: str) -> dict:
        """加密記錄中的敏感欄位"""
        encrypted = record.copy()
        encrypted['_encrypted_fields'] = {}

        for field in self.sensitive_fields:
            if field in record:
                # 獲取欄位特定的金鑰
                key = self.kms.derive_key(f"field:{field}:{context}")

                # 加密
                nonce = os.urandom(12)
                aesgcm = AESGCM(key)
                value_bytes = json.dumps(record[field]).encode()
                ciphertext = aesgcm.encrypt(nonce, value_bytes, field.encode())

                # 儲存加密值
                encrypted['_encrypted_fields'][field] = {
                    'nonce': nonce.hex(),
                    'ciphertext': ciphertext.hex()
                }
                encrypted[field] = "[已加密]"

        return encrypted

    def decrypt_record(self, encrypted: dict, context: str) -> dict:
        """解密記錄中的敏感欄位"""
        record = encrypted.copy()
        encrypted_fields = record.pop('_encrypted_fields', {})

        for field, enc_data in encrypted_fields.items():
            key = self.kms.derive_key(f"field:{field}:{context}")

            aesgcm = AESGCM(key)
            plaintext = aesgcm.decrypt(
                bytes.fromhex(enc_data['nonce']),
                bytes.fromhex(enc_data['ciphertext']),
                field.encode()
            )

            record[field] = json.loads(plaintext)

        return record

可搜尋加密

import hmac
import hashlib

class SearchableEncryption:
    """允許搜尋加密資料而不解密所有內容"""

    def __init__(self, search_key: bytes, encryption_key: bytes):
        self.search_key = search_key
        self.encryption_key = encryption_key

    def create_search_token(self, value: str) -> str:
        """建立用於搜尋的確定性權杖"""
        # 規範化值以實現一致匹配
        normalized = value.lower().strip()

        # 建立搜尋權杖(確定性、單向)
        token = hmac.new(
            self.search_key,
            normalized.encode(),
            hashlib.sha256
        ).hexdigest()

        return token

    def encrypt_with_search(self, value: str) -> dict:
        """加密值同時啟用搜尋"""
        # 建立搜尋權杖用於索引
        search_token = self.create_search_token(value)

        # 加密實際值(非確定性)
        nonce = os.urandom(12)
        aesgcm = AESGCM(self.encryption_key)
        ciphertext = aesgcm.encrypt(nonce, value.encode(), None)

        return {
            'search_token': search_token,
            'nonce': nonce.hex(),
            'ciphertext': ciphertext.hex()
        }

    def search(self, query: str, encrypted_records: list) -> list:
        """搜尋加密記錄"""
        search_token = self.create_search_token(query)

        # 在搜尋權杖上匹配(伺服器可以做這個)
        matches = [
            r for r in encrypted_records
            if r.get('search_token') == search_token
        ]

        # 解密匹配項(客戶端做這個)
        results = []
        for match in matches:
            aesgcm = AESGCM(self.encryption_key)
            plaintext = aesgcm.decrypt(
                bytes.fromhex(match['nonce']),
                bytes.fromhex(match['ciphertext']),
                None
            )
            results.append(plaintext.decode())

        return results

6. 安全檢查清單

部署前

┌─────────────────────────────────────────────────────────────────┐
│                      安全部署檢查清單                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ [ ] 啟用 TLS 1.3,最低 TLS 1.2                                  │
│ [ ] 所有金鑰在環境變數或金鑰管理器中                             │
│ [ ] 程式碼中沒有硬編碼的金鑰、權杖或密碼                         │
│ [ ] .gitignore 包含 .env、*.pem、*.key 檔案                     │
│ [ ] 密碼使用 Argon2id 或 bcrypt 雜湊                            │
│ [ ] 加密金鑰按計劃輪換                                          │
│ [ ] 安全相關操作的稽核日誌                                      │
│ [ ] 認證端點的速率限制                                          │
│ [ ] 所有使用者輸入的輸入驗證                                    │
│ [ ] 強制 HTTPS(HSTS 標頭)                                     │
│ [ ] 配置安全標頭(CSP、X-Frame-Options 等)                     │
│ [ ] 檢查相依套件的漏洞                                          │
│ [ ] 錯誤訊息不洩露敏感資訊                                      │
│ [ ] 會話管理安全(httpOnly、secure cookies)                    │
│ [ ] 啟用 CSRF 保護                                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

持續安全

class SecurityMonitor:
    """持續安全監控"""

    def __init__(self, alert_service):
        self.alerts = alert_service

    def check_certificate_expiry(self, cert_path: str, warn_days: int = 30):
        """在憑證過期前發出警報"""
        from cryptography import x509
        from datetime import datetime, timedelta

        with open(cert_path, 'rb') as f:
            cert = x509.load_pem_x509_certificate(f.read())

        days_until_expiry = (cert.not_valid_after_utc - datetime.utcnow()).days

        if days_until_expiry < warn_days:
            self.alerts.send(
                level='warning',
                message=f"憑證將在 {days_until_expiry} 天後過期"
            )

    def check_failed_auth_rate(self, window_minutes: int = 5, threshold: int = 100):
        """警報可疑的認證失敗率"""
        failures = self.get_auth_failures(window_minutes)

        if failures > threshold:
            self.alerts.send(
                level='critical',
                message=f"高認證失敗率: {window_minutes} 分鐘內 {failures} 次"
            )

    def check_key_age(self, key_metadata: dict, max_age_days: int = 90):
        """當金鑰需要輪換時發出警報"""
        from datetime import datetime

        created = datetime.fromisoformat(key_metadata['created_at'])
        age_days = (datetime.now() - created).days

        if age_days > max_age_days:
            self.alerts.send(
                level='warning',
                message=f"金鑰 {key_metadata['name']} 已有 {age_days} 天"
            )

7. 要避免的常見錯誤

錯誤 1:透過隱蔽實現安全

# 錯誤:依賴隱蔽
def "secure"_encrypt(data):
    # "沒人會發現我的自訂演算法"
    result = ""
    for char in data:
        result += chr(ord(char) + 3)  # ROT3 不是加密!
    return result

# 正確:使用經過驗證的演算法
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

def secure_encrypt(key: bytes, data: bytes) -> tuple[bytes, bytes]:
    nonce = os.urandom(12)
    aesgcm = AESGCM(key)
    return nonce, aesgcm.encrypt(nonce, data, None)

錯誤 2:隨機性不足

import random
import secrets

# 錯誤:可預測的隨機性
def bad_token():
    return ''.join(random.choices('abcdef0123456789', k=32))

# 正確:密碼學安全的隨機性
def good_token():
    return secrets.token_hex(16)

# 錯誤:用時間做種子
random.seed(int(time.time()))  # 可預測的!

# 正確:系統熵
secure_random = secrets.SystemRandom()

錯誤 3:記錄敏感資料

import logging

# 錯誤:記錄敏感資料
def authenticate_bad(username, password):
    logging.info(f"認證嘗試: user={username}, pass={password}")  # 不!

# 正確:安全日誌
def authenticate_good(username, password):
    logging.info(f"認證嘗試: user={username}")
    # 密碼永遠不記錄

# 正確:編輯敏感欄位
def safe_log(data: dict) -> dict:
    """建立資料副本並編輯敏感欄位"""
    sensitive = {'password', 'token', 'secret', 'key', 'ssn', 'credit_card'}
    return {
        k: '[已編輯]' if k.lower() in sensitive else v
        for k, v in data.items()
    }

logging.info(f"請求: {safe_log(request_data)}")

8. 本章小結

三點要記住:

  1. 縱深防禦。 不要依賴任何單一的安全措施。分層防禦:TLS 用於傳輸、加密用於儲存、認證用於存取、稽核用於偵測。

  2. 使用經過驗證的解決方案。 不要發明密碼學。使用 TLS 1.3、AES-GCM、Argon2、已建立的函式庫。安全社群已經解決了大多數問題。

  3. 安全失敗,記錄一切。 當出現問題時,預設拒絕存取。記錄所有安全相關事件,以便你可以偵測和調查攻擊。

9. 系列總結

恭喜!你已經完成了開發者加密系列。

你現在理解了:

  • 對稱和非對稱加密如何運作
  • 何時使用 AES-GCM vs ChaCha20 vs RSA vs ECC
  • 為什麼我們雜湊密碼以及如何正確做
  • TLS 如何保護傳輸中的資料
  • 如何管理密碼學金鑰
  • 如何建構安全系統

記住: 密碼學是工具,不是解決方案。安全是關於理解威脅、做出好的設計決策,以及保持警惕。

繼續學習。保持對漏洞的更新。當有疑問時,諮詢安全專業人士。

祝你建構安全系統好運!