MPC 錢包部署配置完整指南:從環境建置到生產環境的工程實踐

MPC錢包部署配置是以太坊錢包安全實踐中最關鍵的環節之一。本文提供從開發環境建置到生產環境部署的完整指南,涵蓋分散式金鑰生成配置、閾值簽名參數設定、MPC節點網路架構、安全審計要點、以及常見部署陷阱的規避策略。

MPC 錢包部署配置完整指南:從環境建置到生產環境的工程實踐

執行摘要

多方計算(Multi-Party Computation,簡稱 MPC)錢包的部署配置是以太坊錢包安全實踐中最關鍵的環節之一。與傳統單私鑰錢包不同,MPC 錢包涉及複雜的密碼學協議、分散式金鑰生成、閾值簽名計算等多個技術環節,每個環節的配置正確性都直接關係到資產安全。本文提供從開發環境建置到生產環境部署的完整指南,涵蓋分散式金鑰生成配置、閾值簽名參數設定、MPC 節點網路架構、安全審計要點、以及常見部署陷阱的規避策略。截至 2026 年第一季度,MPC 錢包技術已相當成熟,正確的配置方法對於保護數位資產至關重要。

第一章:部署前規劃與環境需求

1.1 硬體與網路需求分析

MPC 錢包的部署需要精心規劃的硬體與網路基礎設施。根據不同的安全等級和交易量要求,配置會有所不同。

開發與測試環境

開發測試環境主要用於功能驗證和集成測試,硬體需求相對適中:

預生產環境

預生產環境用於正式上線前的全面測試:

生產環境

生產環境需要最高等級的可靠性與安全性:

1.2 軟體依賴與版本選擇

MPC 錢包依賴多個關鍵軟體組件,版本選擇需要特別謹慎:

作業系統

生產環境推薦使用經過安全加固的 Linux 發行版:

密碼學庫

選擇經過充分審計的密碼學庫至關重要:

區塊鏈交互庫

1.3 安全性規劃

部署前的安全性規劃是成功部署的基礎:

威脅模型分析

需要識別並緩解以下威脅:

  1. 外部攻擊:網路入侵、DDoS攻擊
  2. 內部威脅:惡意員工、未授權訪問
  3. 技術故障:硬體故障、軟體Bug、網路中斷
  4. 供應鏈攻擊:依賴庫被植入後門

安全邊界定義

清晰定義安全邊界:

第二章:分散式金鑰生成配置

2.1 DKG 協議參數設定

分散式金鑰生成(Distributed Key Generation,簡稱 DKG)是 MPC 錢包部署的第一個關鍵步驟。參數的正確選擇直接影響安全性和可用性。

閾值參數選擇

閾值參數 (t, n) 決定了安全性和可用性之間的平衡:

選擇考量因素:

# 閾值選擇決策示例
def choose_threshold(total_nodes: int, risk_tolerance: str) -> tuple:
    """
    根據風險承受能力和節點數量選擇閾值
    
    參數:
        total_nodes: 總節點數量
        risk_tolerance: 風險承受水平 ('low', 'medium', 'high')
    
    返回:
        (threshold, n) 元組
    """
    
    if risk_tolerance == 'low':
        # 低風險容忍:需要大多數節點同意
        threshold = (total_nodes // 2) + 1
    elif risk_tolerance == 'medium':
        # 中等風險容忍:允許少數節點故障
        threshold = (total_nodes // 3) * 2
    else:
        # 高風險容忍:最小化不便
        threshold = 2
    
    return (threshold, total_nodes)

# 典型配置示例
configs = {
    "enterprise": {"n": 5, "t": 3, "description": "高安全性,企業使用"},
    "balanced": {"n": 3, "t": 2, "description": "平衡安全與可用性"},
    "max_availability": {"n": 4, "t": 2, "description": "最大化可用性"}
}

2.2 密鑰分片生成流程

密鑰分片的生成需要在安全的環境中進行:

準備階段

# 1. 創建專用用戶
sudo useradd -m -s /bin/bash mpc-operator

# 2. 創建工作目錄
sudo mkdir -p /opt/mpc-wallet/{config,keys,logs,data}
sudo chown -R mpc-operator:mpc-operator /opt/mpc-wallet

# 3. 安裝依賴
apt-get update
apt-get install -y build-essential cmake libssl-dev libgmp-dev

# 4. 克隆並編譯密碼學庫
git clone https://github.com/supranational/bls.git
cd bls
make

分片生成配置

# config/dkg-config.yaml
dkg:
  # 曲線選擇
  curve: "BLS12-381"
  
  # 閾值參數
  threshold: 3
  total_parties: 5
  
  # 參與者配置
  participants:
    - id: 1
      endpoint: "https://node1.example.com:9001"
      public_ip: "203.0.113.1"
    - id: 2
      endpoint: "https://node2.example.com:9001"
      public_ip: "203.0.113.2"
    - id: 3
      endpoint: "https://node3.example.com:9001"
      public_ip: "203.0.113.3"
    - id: 4
      endpoint: "https://node4.example.com:9001"
      public_ip: "203.0.113.4"
    - id: 5
      endpoint: "https://node5.example.com:9001"
      public_ip: "203.0.113.5"
  
  # 安全參數
  security:
    # 隨機種子來源
    entropy_source: "/dev/urandom"
    min_entropy_bits: 256
    
    # 通信加密
    tls_enabled: true
    tls_cert_path: "/opt/mpc-wallet/certs/node.crt"
    tls_key_path: "/opt/mpc-wallet/certs/node.key"

2.3 DKG 執行與驗證

DKG 的執行需要嚴格的流程控制:

初始化腳本

#!/usr/bin/env python3
"""
DKG 初始化腳本
"""

import os
import json
import asyncio
from dataclasses import dataclass
from typing import List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class DKGNode:
    node_id: int
    endpoint: str
    private_key_share: Optional[bytes] = None
    public_key: Optional[bytes] = None
    
class DKGCoordinator:
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = json.load(f)
        
        self.nodes = [
            DKGNode(
                node_id=p['id'],
                endpoint=p['endpoint']
            )
            for p in self.config['dkg']['participants']
        ]
        
        self.threshold = self.config['dkg']['threshold']
        
    async def run_dkg(self) -> dict:
        """執行分散式金鑰生成"""
        
        logger.info("Starting DKG protocol...")
        
        # Phase 1: 參與者註冊
        await self._phase_registration()
        
        # Phase 2: 承諾分發
        await self._phase_commitment()
        
        # Phase 3: 分片計算
        await self._phase_share_computation()
        
        # Phase 4: 驗證與最終確定
        await self._phase_verification()
        
        logger.info("DKG completed successfully")
        
        return self._generate_final_keys()
    
    async def _phase_registration(self):
        """Phase 1: 參與者註冊"""
        logger.info("Phase 1: Participant registration")
        
        # 每個節點向協調者註冊
        for node in self.nodes:
            response = await self._send_request(
                node.endpoint,
                "/dkg/register",
                {"node_id": node.node_id}
            )
            
            if not response.get("success"):
                raise DKGError(f"Node {node.node_id} registration failed")
        
        logger.info(f"Registered {len(self.nodes)} participants")
    
    async def _phase_commitment(self):
        """Phase 2: 承諾分發"""
        logger.info("Phase 2: Commitment distribution")
        
        # 生成並分發承諾
        commitments = {}
        for node in self.nodes:
            commitment = await self._generate_commitment(node)
            commitments[node.node_id] = commitment
        
        # 廣播承諾給所有參與者
        for node in self.nodes:
            await self._broadcast(
                node.endpoint,
                "/dkg/commitments",
                commitments
            )
    
    async def _phase_share_computation(self):
        """Phase 3: 分片計算"""
        logger.info("Phase 3: Share computation")
        
        # 每個節點計算自己的分片
        for node in self.nodes:
            result = await self._send_request(
                node.endpoint,
                "/dkg/compute",
                {"threshold": self.threshold}
            )
            
            node.private_key_share = result["share"]
            node.public_key = result["public_key"]
    
    async def _phase_verification(self):
        """Phase 4: 驗證"""
        logger.info("Phase 4: Verification")
        
        # 驗證每個節點的分片
        for node in self.nodes:
            is_valid = await self._verify_share(node)
            
            if not is_valid:
                raise DKGError(
                    f"Share verification failed for node {node.node_id}"
                )
        
        logger.info("All shares verified successfully")
    
    def _generate_final_keys(self) -> dict:
        """生成最終的金鑰材料"""
        
        # 聚合公鑰
        public_key = self._aggregate_public_keys([n.public_key for n in self.nodes])
        
        return {
            "public_key": public_key.hex(),
            "threshold": self.threshold,
            "total_parties": len(self.nodes),
            "created_at": asyncio.get_event_loop().time()
        }

# 執行 DKG
async def main():
    coordinator = DKGCoordinator("/opt/mpc-wallet/config/dkg-config.yaml")
    keys = await coordinator.run_dkg()
    
    # 保存公鑰(不保存私鑰分片)
    with open("/opt/mpc-wallet/keys/public_key.json", "w") as f:
        json.dump(keys, f, indent=2)
    
    logger.info(f"Public key: {keys['public_key']}")

if __name__ == "__main__":
    asyncio.run(main())

2.4 密鑰分片的安全存儲

私鑰分片的安全存儲是 MPC 錢包安全的核心:

分片加密配置

# config/storage-config.yaml
storage:
  # 加密配置
  encryption:
    enabled: true
    algorithm: "AES-256-GCM"
    
    # 金鑰派生
    key_derivation:
      function: "Argon2id"
      memory: 65536  # 64 MB
      iterations: 3
      parallelism: 4
      salt_length: 32
    
  # 硬體安全模組配置
  hsm:
    enabled: true
    provider: "yubihsm"
    connection:
      host: "127.0.0.1"
      port: 12345
    auth:
      key_id: 1
      password_env: "YUBIHSM_AUTHKEY_PASSWORD"
    
  # 備份配置
  backup:
    enabled: true
    encrypted: true
    destinations:
      - type: "s3"
        bucket: "mpc-backups-prod"
        region: "us-east-1"
        prefix: "shards/"
      - type: "file"
        path: "/backup/encrypted-shards"
        retention_days: 90

第三章:閾值簽名配置

3.1 簽名參數詳解

閾值簽名的參數配置直接影響安全性和性能:

基礎參數配置

# config/signature-config.yaml
signature:
  # 簽名方案
  scheme: "BLS-TSS"
  
  # 曲線參數
  curve: "BLS12-381"
  hash_to_curve: "try_and_increment"
  
  # 閾值參數
  threshold: 3
  total_participants: 5
  
  # 消息格式
  message:
    domain_separator: "MPC-WALLET-v1"
    chain_id: 1  # Ethereum mainnet
  
  # 重放保護
  replay_protection:
    enabled: true
    nonce_type: "sequential"  # 或 "timestamp"
    max_age_seconds: 3600

3.2 簽名流程配置

簽名流程需要考慮多種異常情況:

# mpc/signing.py

import asyncio
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)

class SigningState(Enum):
    IDLE = "idle"
    COLLECTING = "collecting"
    SIGNING = "signing"
    BROADCASTING = "broadcasting"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class SigningRequest:
    request_id: str
    transaction_hash: bytes
    signer_ids: List[int]
    threshold: int
    deadline: float
    state: SigningState
    partial_signatures: Dict[int, bytes]
    final_signature: Optional[bytes] = None

class ThresholdSigner:
    def __init__(self, config: dict):
        self.config = config
        self.my_node_id = config['node']['id']
        self.threshold = config['signature']['threshold']
        self.timeout = config['signature'].get('timeout_seconds', 30)
        self.max_retries = config['signature'].get('max_retries', 3)
        
        self.pending_requests: Dict[str, SigningRequest] = {}
        
    async def sign_transaction(
        self,
        tx_hash: bytes,
        required_signers: List[int]
    ) -> Optional[bytes]:
        """執行閾值簽名"""
        
        request_id = self._generate_request_id(tx_hash)
        
        # 創建簽名請求
        request = SigningRequest(
            request_id=request_id,
            transaction_hash=tx_hash,
            signer_ids=required_signers,
            threshold=self.threshold,
            deadline=asyncio.get_event_loop().time() + self.timeout,
            state=SigningState.COLLECTING,
            partial_signatures={}
        )
        
        self.pending_requests[request_id] = request
        
        try:
            # 計算自己的部分簽名
            my_partial_sig = await self._compute_partial_signature(tx_hash)
            
            # 廣播部分簽名
            await self._broadcast_partial_signature(request_id, my_partial_sig)
            
            # 等待其他部分簽名
            completed_request = await self._wait_for_signatures(request)
            
            # 聚合最終簽名
            final_sig = await self._aggregate_signatures(completed_request)
            
            completed_request.state = SigningState.COMPLETED
            completed_request.final_signature = final_sig
            
            return final_sig
            
        except SigningError as e:
            logger.error(f"Signing failed: {e}")
            request.state = SigningState.FAILED
            raise
            
        finally:
            # 清理請求
            if request_id in self.pending_requests:
                del self.pending_requests[request_id]
    
    async def _compute_partial_signature(self, message: bytes) -> bytes:
        """計算部分簽名"""
        
        # 獲取自己的私鑰分片
        private_share = await self._load_private_share()
        
        # 計算部分簽名
        # 在實際實現中使用密碼學庫
        partial_sig = self._bls_sign(
            message,
            private_share,
            self.my_node_id
        )
        
        return partial_sig
    
    async def _broadcast_partial_signature(
        self,
        request_id: str,
        partial_sig: bytes
    ):
        """廣播部分簽名給其他節點"""
        
        for node_id in self.pending_requests[request_id].signer_ids:
            if node_id != self.my_node_id:
                await self._send_to_node(
                    node_id,
                    "/signing/partial",
                    {
                        "request_id": request_id,
                        "node_id": self.my_node_id,
                        "partial_signature": partial_sig.hex()
                    }
                )
    
    async def _wait_for_signatures(
        self,
        request: SigningRequest
    ) -> SigningRequest:
        """等待收集足夠的部分簽名"""
        
        required_count = request.threshold
        
        while len(request.partial_signatures) < required_count:
            # 檢查超時
            if asyncio.get_event_loop().time() > request.deadline:
                raise SigningError("Signing timeout")
            
            # 等待部分簽名到來
            await asyncio.sleep(0.1)
            
            # 處理收到的部分簽名
            await self._process_incoming_partial_sigs(request)
        
        return request
    
    async def _aggregate_signatures(
        self,
        request: SigningRequest
    ) -> bytes:
        """聚合部分簽名"""
        
        partial_sigs = [
            request.partial_signatures[node_id]
            for node_id in request.partial_signatures.keys()
        ]
        
        # 聚合簽名
        final_sig = self._bls_aggregate(partial_sigs)
        
        return final_sig

3.3 錯誤處理與重試機制

網路故障和節點故障是常見情況,需要健壯的錯誤處理:

# mpc/error_handling.py

import asyncio
import logging
from typing import Callable, TypeVar, Optional
from enum import Enum

T = TypeVar('T')

logger = logging.getLogger(__name__)

class RetryPolicy(Enum):
    IMMEDIATE = "immediate"
    LINEAR_BACKOFF = "linear"
    EXPONENTIAL_BACKOFF = "exponential"

async def with_retry(
    func: Callable[..., T],
    *args,
    max_attempts: int = 3,
    policy: RetryPolicy = RetryPolicy.EXPONENTIAL_BACKOFF,
    base_delay: float = 1.0,
    **kwargs
) -> T:
    """
    帶重試的異步函數包裝器
    """
    
    last_exception = None
    
    for attempt in range(max_attempts):
        try:
            return await func(*args, **kwargs)
            
        except Exception as e:
            last_exception = e
            logger.warning(
                f"Attempt {attempt + 1}/{max_attempts} failed: {e}"
            )
            
            if attempt < max_attempts - 1:
                # 計算延遲
                if policy == RetryPolicy.IMMEDIATE:
                    delay = 0
                elif policy == RetryPolicy.LINEAR_BACKOFF:
                    delay = base_delay * (attempt + 1)
                else:  # EXPONENTIAL
                    delay = base_delay * (2 ** attempt)
                
                logger.info(f"Retrying in {delay}s...")
                await asyncio.sleep(delay)
    
    # 所有重試都失敗
    raise last_exception

class CircuitBreaker:
    """熔斷器模式實現"""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half_open"
    
    async def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        """通過熔斷器執行函數"""
        
        if self.state == "open":
            # 檢查是否可以進入半開狀態
            if asyncio.get_event_loop().time() - self.last_failure_time \
                    > self.recovery_timeout:
                self.state = "half_open"
                logger.info("Circuit breaker entering half-open state")
            else:
                raise CircuitOpenError()
        
        try:
            result = await func(*args, **kwargs)
            
            if self.state == "half_open":
                # 成功,重置熔斷器
                self._reset()
            
            return result
            
        except self.expected_exception as e:
            self._record_failure()
            raise
    
    def _record_failure(self):
        """記錄失敗"""
        
        self.failure_count += 1
        self.last_failure_time = asyncio.get_event_loop().time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            logger.warning("Circuit breaker opened")
    
    def _reset(self):
        """重置熔斷器"""
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"
        logger.info("Circuit breaker closed")

第四章:節點網路架構

4.1 網路拓撲設計

MPC 節點的網路架構需要平衡安全性、延遲和可用性:

星型拓撲

                    ┌─────────────┐
                    │   中心節點   │
                    │ (協調者)    │
                    └──────┬──────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
    ┌────▼────┐       ┌────▼────┐       ┌────▼────┐
    │ 節點 1  │       │ 節點 2  │       │ 節點 3  │
    └─────────┘       └─────────┘       └─────────┘

優點:簡單、易於管理、延遲低

缺點:中心節點故障影響整個系統

網狀拓撲

    ┌─────────┐      ┌─────────┐
    │ 節點 1  │──────│ 節點 2  │
    └────┬────┘      └────┬────┘
         │                │
    ┌────┴────┐      ┌────┴────┐
    │ 節點 3  │──────│ 節點 4  │
    └─────────┘      └─────────┘

優點:高冗餘、去中心化

缺點:配置複雜、延遲可能較高

混合拓撲(推薦):

                    ┌─────────────┐
                    │   負載均衡  │
                    └──────┬──────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
    ┌────▼────┐       ┌────▼────┐       ┌────▼────┐
    │ 節點 1  │◄─────►│ 節點 2  │◄─────►│ 節點 3  │
    └─────────┘       └─────────┘       └─────────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           │
                    ┌──────▼──────┐
                    │   備份節點   │
                    └─────────────┘

4.2 安全網路配置

網路安全是 MPC 錢包部署的關鍵:

防火牆規則

#!/bin/bash
# setup_firewall.sh

# 清除現有規則
iptables -F
iptables -X

# 默認策略:拒絕所有進入連接
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT

# 允許本地回環
iptables -A INPUT -i lo -j ACCEPT

# 允許已建立的連接
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT

# 允許 SSH(限制來源 IP)
iptables -A INPUT -p tcp --dport 22 -s 10.0.0.0/8 -j ACCEPT

# 允許 MPC 節點之間的通信
# 假設 MPC 節點在 10.0.1.0/24 網段
iptables -A INPUT -p tcp --dport 9001 -s 10.0.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 9002 -s 10.0.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 9003 -s 10.0.1.0/24 -j ACCEPT

# 允許健康檢查
iptables -A INPUT -p tcp --dport 8080 -s 10.0.0.0/8 -j ACCEPT

# 記錄被阻止的連接
iptables -A INPUT -m limit --limit 5/min -j LOG --log-prefix "IPTABLES-DROPED: "

# 保存規則
iptables-save > /etc/iptables/rules.v4

TLS 配置

# config/tls-config.yaml
tls:
  enabled: true
  
  # 服務器證書
  server:
    cert: "/opt/mpc-wallet/certs/server.crt"
    key: "/opt/mpc-wallet/certs/server.key"
    min_version: "TLSv1.3"
    
    # 密碼套件
    cipher_suites:
      - "TLS_AES_256_GCM_SHA384"
      - "TLS_CHACHA20_POLY1305_SHA256"
      - "TLS_AES_128_GCM_SHA256"
    
    # 客戶端證書認證
    client_auth:
      enabled: true
      ca_cert: "/opt/mpc-wallet/certs/ca.crt"
      verify_client: "require"
  
  # 客戶端配置
  client:
    verify_server: true
    server_ca_cert: "/opt/mpc-wallet/certs/ca.crt"

4.3 節點間通信協議

節點間通信需要高效且安全的消息傳遞:

# mpc/transport.py

import asyncio
import json
import logging
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum
import ssl
import websockets

logger = logging.getLogger(__name__)

class MessageType(Enum):
    DKG_INITIATE = "dkg_initiate"
    DKG_COMMITMENT = "dkg_commitment"
    DKG_SHARE = "dkg_share"
    SIGN_REQUEST = "sign_request"
    SIGN_PARTIAL = "sign_partial"
    SIGN_COMPLETE = "sign_complete"
    HEALTH_CHECK = "health_check"

@dataclass
class Message:
    type: MessageType
    sender_id: int
    receiver_id: Optional[int]
    payload: Dict[str, Any]
    message_id: str
    timestamp: float

class MPCTransport:
    def __init__(self, node_id: int, config: dict):
        self.node_id = node_id
        self.config = config
        self.connections: Dict[int, websockets.WebSocketServerProtocol] = {}
        self.handlers: Dict[MessageType, Callable] = {}
        
    async def start_server(self):
        """啟動 WebSocket 服務器"""
        
        ssl_context = self._create_ssl_context()
        
        server = await websockets.serve(
            self._handle_connection,
            self.config['host'],
            self.config['port'],
            ssl=ssl_context,
            ping_interval=20,
            ping_timeout=10
        )
        
        logger.info(f"MPC server started on {self.config['host']}:{self.config['port']}")
        
        return server
    
    async def _handle_connection(
        self,
        websocket: websockets.WebSocketServerProtocol,
        path: str
    ):
        """處理新的連接"""
        
        try:
            # 驗證客戶端證書
            if not await self._authenticate_client(websocket):
                await websocket.close(4001, "Authentication failed")
                return
            
            # 接收消息
            async for raw_message in websocket:
                await self._process_message(websocket, raw_message)
                
        except websockets.exceptions.ConnectionClosed:
            logger.info("Connection closed")
            
        except Exception as e:
            logger.error(f"Connection error: {e}")
    
    async def _process_message(
        self,
        websocket: websockets.WebSocketServerProtocol,
        raw_message: str
    ):
        """處理收到的消息"""
        
        try:
            data = json.loads(raw_message)
            message = Message(
                type=MessageType(data['type']),
                sender_id=data['sender_id'],
                receiver_id=data.get('receiver_id'),
                payload=data['payload'],
                message_id=data['message_id'],
                timestamp=data['timestamp']
            )
            
            # 調用相應的處理器
            handler = self.handlers.get(message.type)
            if handler:
                await handler(message)
            else:
                logger.warning(f"No handler for message type: {message.type}")
                
        except json.JSONDecodeError:
            logger.error("Invalid JSON message")
            
        except Exception as e:
            logger.error(f"Message processing error: {e}")
    
    async def send_to_node(
        self,
        node_id: int,
        message: Message
    ):
        """發送消息給指定節點"""
        
        if node_id not in self.connections:
            await self._connect_to_node(node_id)
        
        websocket = self.connections[node_id]
        await websocket.send(json.dumps({
            'type': message.type.value,
            'sender_id': message.sender_id,
            'receiver_id': message.receiver_id,
            'payload': message.payload,
            'message_id': message.message_id,
            'timestamp': message.timestamp
        }))
    
    async def broadcast(
        self,
        message: Message,
        exclude_nodes: Optional[list] = None
    ):
        """廣播消息給所有連接的節點"""
        
        exclude_nodes = exclude_nodes or []
        
        for node_id, websocket in self.connections.items():
            if node_id not in exclude_nodes:
                try:
                    await websocket.send(json.dumps({
                        'type': message.type.value,
                        'sender_id': message.sender_id,
                        'payload': message.payload,
                        'message_id': message.message_id,
                        'timestamp': message.timestamp
                    }))
                except Exception as e:
                    logger.error(f"Failed to send to node {node_id}: {e}")

第五章:監控與運維

5.1 監控指標體系

全面的監控是確保 MPC 錢包穩定運行的關鍵:

關鍵效能指標(KPI)

# monitoring/metrics.py

from prometheus_client import Counter, Histogram, Gauge
import time

# 交易指標
transactions_total = Counter(
    'mpc_transactions_total',
    'Total number of MPC transactions',
    ['status', 'type']
)

transaction_duration = Histogram(
    'mpc_transaction_duration_seconds',
    'MPC transaction duration',
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)

# 簽名指標
signatures_partial_total = Counter(
    'mpc_signatures_partial_total',
    'Total partial signatures',
    ['node_id', 'status']
)

signature_aggregation_duration = Histogram(
    'mpc_signature_aggregation_duration_seconds',
    'Time to aggregate signatures'
)

# 節點健康指標
node_status = Gauge(
    'mpc_node_status',
    'Node status (1=healthy, 0=unhealthy)',
    ['node_id']
)

node_latency = Histogram(
    'mpc_node_latency_seconds',
    'Latency to other nodes',
    ['remote_node_id']
)

# 錯誤指標
errors_total = Counter(
    'mpc_errors_total',
    'Total errors',
    ['error_type', 'severity']
)

# 資金指標
wallet_balance = Gauge(
    'mpc_wallet_balance_wei',
    'Wallet balance in wei',
    ['token']
)

pending_transactions = Gauge(
    'mpc_pending_transactions',
    'Number of pending transactions'
)

日誌配置

# config/logging-config.yaml
logging:
  version: 1
  disable_existing_loggers: false
  
  formatters:
    json:
      format: '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "module": "%(module)s", "message": "%(message)s"}'
    
    standard:
      format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  
  handlers:
    console:
      class: logging.StreamHandler
      level: INFO
      formatter: standard
      stream: ext://sys.stdout
    
    file:
      class: logging.handlers.RotatingFileHandler
      level: DEBUG
      formatter: json
      filename: /opt/mpc-wallet/logs/mpc.log
      maxBytes: 104857600  # 100MB
      backupCount: 10
    
    error_file:
      class: logging.handlers.RotatingFileHandler
      level: ERROR
      formatter: json
      filename: /opt/mpc-wallet/logs/error.log
      maxBytes: 52428800  # 50MB
      backupCount: 5
    
    syslog:
      class: logging.handlers.SysLogHandler
      level: WARNING
      formatter: json
      address: ['localhost', 514]
  
  loggers:
    mpc:
      level: DEBUG
      handlers: [console, file, error_file]
      propagate: false
    
    mpc.security:
      level: INFO
      handlers: [console, file, syslog]
      propagate: false
  
  root:
    level: INFO
    handlers: [console, file]

5.2 警報與應變計畫

警報配置

# monitoring/alerts.py

from dataclasses import dataclass
from typing import List, Callable, Optional
from enum import Enum
import asyncio

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    name: str
    condition: Callable[[], bool]
    severity: AlertSeverity
    message: str
    cooldown_seconds: int = 300

class AlertManager:
    def __init__(self):
        self.rules: List[AlertRule] = []
        self.handlers: List[Callable] = []
        self.last_alert_time: dict = {}
        
    def add_rule(self, rule: AlertRule):
        """添加警報規則"""
        self.rules.append(rule)
    
    def add_handler(self, handler: Callable):
        """添加警報處理器"""
        self.handlers.append(handler)
    
    async def check_rules(self):
        """檢查所有警報規則"""
        
        for rule in self.rules:
            # 冷卻期檢查
            if rule.name in self.last_alert_time:
                elapsed = time.time() - self.last_alert_time[rule.name]
                if elapsed < rule.cooldown_seconds:
                    continue
            
            # 檢查條件
            try:
                if rule.condition():
                    await self._trigger_alert(rule)
            except Exception as e:
                logging.error(f"Error checking rule {rule.name}: {e}")
    
    async def _trigger_alert(self, rule: AlertRule):
        """觸發警報"""
        
        logging.log(
            rule.severity == AlertSeverity.CRITICAL and logging.ERROR or logging.WARNING,
            f"ALERT: {rule.name} - {rule.message}"
        )
        
        # 記錄觸發時間
        self.last_alert_time[rule.name] = time.time()
        
        # 調用所有處理器
        for handler in self.handlers:
            try:
                await handler(rule)
            except Exception as e:
                logging.error(f"Alert handler error: {e}")

# 預定義警報規則
def create_standard_alerts(metrics) -> List[AlertRule]:
    """創建標準警報規則"""
    
    return [
        AlertRule(
            name="high_error_rate",
            condition=lambda: metrics.error_rate() > 0.05,
            severity=AlertSeverity.WARNING,
            message="Error rate exceeds 5%",
            cooldown_seconds=300
        ),
        AlertRule(
            name="node_offline",
            condition=lambda: not metrics.all_nodes_healthy(),
            severity=AlertSeverity.CRITICAL,
            message="One or more nodes are offline",
            cooldown_seconds=60
        ),
        AlertRule(
            name="high_latency",
            condition=lambda: metrics.avg_latency() > 5.0,
            severity=AlertSeverity.WARNING,
            message="Average latency exceeds 5 seconds",
            cooldown_seconds=600
        ),
        AlertRule(
            name="pending_transactions_backlog",
            condition=lambda: metrics.pending_count() > 100,
            severity=AlertSeverity.CRITICAL,
            message="Transaction backlog detected",
            cooldown_seconds=120
        ),
        AlertRule(
            name="low_balance",
            condition=lambda: metrics.balance_ratio() < 0.1,
            severity=AlertSeverity.WARNING,
            message="Wallet balance is low",
            cooldown_seconds=3600
        )
    ]

結論

MPC 錢包的部署配置是一個複雜但至關重要的工程任務。從分散式金鑰生成到閾值簽名,每個環節都需要精心的規劃和嚴格的安全措施。

本文提供的指南涵蓋了從開發環境到生產部署的完整流程,包括:硬體與網路需求分析、DKG 協議配置、閾值簽名參數設定、節點網路架構設計、安全配置,以及監控與運維最佳實踐。

遵循這些指導原則,組織可以成功部署安全、可靠的 MPC 錢包基礎設施。正確的配置不僅能保護數位資產,還能確保系統的高可用性和可擴展性,支撐業務的持續增長。

延伸閱讀與來源

這篇文章對您有幫助嗎?

評論

發表評論

注意:由於這是靜態網站,您的評論將儲存在本地瀏覽器中,不會公開顯示。

目前尚無評論,成為第一個發表評論的人吧!