以太坊開發者學習路徑完整指南:從零建構簡化版 EVM、Merkle Patricia Trie 與共識機制實作

本文提供一條系統化的以太坊開發者學習路徑,透過實際編寫程式碼來理解以太坊的核心組件。讀者將從頭建構簡化版的以太坊虛擬機(EVM)、實現完整的 Merkle Patricia Trie,並模擬一個基本的工作量證明共識系統。涵蓋 EVM 指令集與執行模型、MPT 的前綴壓縮與哈希驗證、PoW 挖掘與驗證機制、質押證明的驗證者選擇與見證投票等核心主題。提供完整的 Python 程式碼範例與詳細解說,幫助開發者建立對以太坊底層機制的深度理解。

以太坊開發者學習路徑完整指南:從零建構簡化版 EVM、Merkle Patricia Trie 與共識機制實作

前言:為什麼要從底層理解以太坊

以太坊的複雜性常使開發者停留在高層抽象層面。使用 Hardhat 部署合約、呼叫 OpenZeppelin 庫、透過 ethers.js 發送交易——這些技能固然重要,但若不理解底層機制,開發者將難以診斷複雜問題、優化 Gas 消耗或設計安全的應用架構。

本文提供一條系統化的學習路徑,透過實際編寫程式碼來理解以太坊的核心組件:虛擬機器(Merkle Patricia Trie)、狀態組織(Merkle Patricia Trie)與共識機制。讀者將從頭建構簡化版的 EVM、實現完整的 Merkle Patricia Trie,並模擬一個基本的工作量證明共識系統。

第一章:建構簡化版以太坊虛擬機(EVM)

1.1 EVM 的基本架構

EVM 是一個棧式虛擬機(Stack-based Virtual Machine),其設計具有以下特點:

讓我們從最基本的結構開始建構:

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import IntEnum

class OpCode(IntEnum):
    """EVM 操作碼定義"""
    STOP = 0x00
    ADD = 0x01
    MUL = 0x02
    SUB = 0x03
    DIV = 0x04
    SDIV = 0x05  # 帶符號除法
    MOD = 0x06
    SMOD = 0x07  # 帶符號取模
    ADDMOD = 0x08
    MULMOD = 0x09
    EXP = 0x0A
    SIGNEXTEND = 0x0B
    
    # 比較與位運算
    LT = 0x10
    GT = 0x11
    SLT = 0x12  # 帶符號小於
    SGT = 0x13  # 帶符號大於
    EQ = 0x14
    ISZERO = 0x15
    AND = 0x16
    OR = 0x17
    XOR = 0x18
    NOT = 0x19
    BYTE = 0x1A
    SHL = 0x1B
    SHR = 0x1C
    SAR = 0x1D
    
    # 區塊鏈操作
    KECCAK256 = 0x20
    
    # 環境資訊
    ADDRESS = 0x30
    BALANCE = 0x31
    ORIGIN = 0x32
    CALLER = 0x33
    CALLVALUE = 0x34
    CALLDATALOAD = 0x35
    CALLDATASIZE = 0x36
    CALLDATACOPY = 0x37
    CODESIZE = 0x38
    CODECOPY = 0x39
    GASPRICE = 0x3A
    EXTCODESIZE = 0x3B
    EXTCODECOPY = 0x3C
    
    # 棧操作
    PUSH1 = 0x60
    PUSH2 = 0x61
    # ... PUSH32
    POP = 0x50
    DUP1 = 0x80
    # ... DUP16
    SWAP1 = 0x90
    # ... SWAP16
    
    # 儲存操作
    MLOAD = 0x51
    MSTORE = 0x52
    MSTORE8 = 0x53
    
    # 日誌操作
    LOG0 = 0xA0
    LOG1 = 0xA1
    LOG2 = 0xA2
    LOG3 = 0xA3
    LOG4 = 0xA4
    
    # 控制流
    JUMP = 0x56
    JUMPI = 0x57
    PC = 0x58
    JUMPDEST = 0x5B
    
    # 合約操作
    CREATE = 0xF0
    CALL = 0xF1
    RETURN = 0xF3
    DELEGATECALL = 0xF4
    STATICCALL = 0xFA
    REVERT = 0xFD
    SELFDESTRUCT = 0xFF

@dataclass
class ExecutionContext:
    """執行上下文"""
    contract_address: bytes  # 當前合約地址
    caller: bytes             # 呼叫者地址
    call_value: int          # 呼叫時傳送的 ETH 數量
    code: bytes              # 合約位元組碼
    calldata: bytes          # 輸入資料
    gas: int                 # 可用 Gas
    program_counter: int     # 程式計數器
    stack: List[int]         # 運算棧
    memory: bytearray        # 記憶體
    storage: Dict[bytes, bytes]  # 儲存(合約狀態)
    logs: List[tuple]        # 日誌
    
    def __post_init__(self):
        self.stack = []
        self.memory = bytearray()
        self.logs = []

class SimplifiedEVM:
    """
    簡化版 EVM 實現
    
    本實現省略了許多細節:
    - 完整的 Gas 計算
    - 位元組碼優化
    - 完整的錯誤處理
    - 區塊上下文
    
    專注於核心執行邏輯的演示
    """
    
    def __init__(self):
        self.context: Optional[ExecutionContext] = None
        self.gas_costs: Dict[int, int] = self._init_gas_costs()
    
    def _init_gas_costs(self) -> Dict[int, int]:
        """初始化 Gas 成本表"""
        return {
            OpCode.STOP: 0,
            OpCode.ADD: 3,
            OpCode.MUL: 5,
            OpCode.SUB: 3,
            OpCode.DIV: 5,
            OpCode.SDIV: 5,
            OpCode.MOD: 5,
            OpCode.SMOD: 5,
            OpCode.ADDMOD: 8,
            OpCode.MULMOD: 8,
            OpCode.EXP: 10,  # 基礎成本
            OpCode.LT: 3,
            OpCode.GT: 3,
            OpCode.SLT: 3,
            OpCode.SGT: 3,
            OpCode.EQ: 3,
            OpCode.ISZERO: 3,
            OpCode.AND: 3,
            OpCode.OR: 3,
            OpCode.XOR: 3,
            OpCode.NOT: 3,
            OpCode.BYTE: 3,
            OpCode.SHL: 3,
            OpCode.SHR: 3,
            OpCode.SAR: 3,
            OpCode.KECCAK256: 30,  # 基礎成本
            OpCode.ADDRESS: 2,
            OpCode.BALANCE: 100,
            OpCode.ORIGIN: 2,
            OpCode.CALLER: 2,
            OpCode.CALLVALUE: 2,
            OpCode.CALLDATALOAD: 3,
            OpCode.CALLDATASIZE: 2,
            OpCode.CALLDATACOPY: 3,
            OpCode.CODESIZE: 2,
            OpCode.CODECOPY: 3,
            OpCode.GASPRICE: 2,
            OpCode.EXTCODESIZE: 100,
            OpCode.EXTCODECOPY: 100,
            OpCode.POP: 2,
            OpCode.MLOAD: 3,
            OpCode.MSTORE: 3,
            OpCode.MSTORE8: 3,
            OpCode.JUMP: 8,
            OpCode.JUMPI: 10,
            OpCode.PC: 2,
            OpCode.JUMPDEST: 1,
        }
    
    def execute(self, code: bytes, calldata: bytes = b'', 
                gas: int = 10000000) -> tuple:
        """
        執行合約位元組碼
        
        返回: (success, return_data, gas_used)
        """
        self.context = ExecutionContext(
            contract_address=b'\x00' * 20,
            caller=b'\x00' * 20,
            call_value=0,
            code=code,
            calldata=calldata,
            gas=gas,
            program_counter=0,
            code=code,
            calldata=calldata,
            gas=gas,
            program_counter=0,
            stack=[],
            memory=bytearray(),
            storage={},
            logs=[]
        )
        
        try:
            while self.context.program_counter < len(self.context.code):
                op = self.context.code[self.context.program_counter]
                
                # 檢查 Gas
                gas_cost = self.gas_costs.get(op, 0)
                if self.context.gas < gas_cost:
                    return (False, b'', gas - self.context.gas)
                
                self.context.gas -= gas_cost
                
                # 執行指令
                success = self._execute_op(op)
                if not success:
                    return (False, b'', gas - self.context.gas)
                
                # PUSH 指令需要特殊處理(推進指標)
                if OpCode.PUSH1 <= op <= OpCode.PUSH2:
                    pass  # 在 _execute_op 中處理
                    
            return (True, b'', gas - self.context.gas)
            
        except Exception as e:
            return (False, str(e).encode(), gas - self.context.gas)
    
    def _execute_op(self, op: int) -> bool:
        """執行單一操作碼"""
        ctx = self.context
        
        # 棧操作
        if op == OpCode.POP:
            if len(ctx.stack) < 1:
                return False
            ctx.stack.pop()
            ctx.program_counter += 1
            
        # DUP 操作
        elif OpCode.DUP1 <= op <= OpCode.DUP16:
            n = op - OpCode.DUP1 + 1
            if len(ctx.stack) < n:
                return False
            ctx.stack.append(ctx.stack[-n])
            ctx.program_counter += 1
            
        # SWAP 操作
        elif OpCode.SWAP1 <= op <= OpCode.SWAP16:
            n = op - OpCode.SWAP1 + 1
            if len(ctx.stack) < n + 1:
                return False
            ctx.stack[-1], ctx.stack[-n-1] = ctx.stack[-n-1], ctx.stack[-1]
            ctx.program_counter += 1
            
        # PUSH 操作
        elif OpCode.PUSH1 <= op <= OpCode.PUSH2:
            n = op - OpCode.PUSH1 + 1
            if ctx.program_counter + n >= len(ctx.code):
                return False
            value = int.from_bytes(
                ctx.code[ctx.program_counter + 1:ctx.program_counter + 1 + n],
                'big'
            )
            ctx.stack.append(value)
            ctx.program_counter += 1 + n
            
        # 算術操作
        elif op == OpCode.ADD:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append((a + b) % (2**256))
            ctx.program_counter += 1
            
        elif op == OpCode.MUL:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append((a * b) % (2**256))
            ctx.program_counter += 1
            
        elif op == OpCode.SUB:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append((a - b) % (2**256))
            ctx.program_counter += 1
            
        elif op == OpCode.DIV:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            if b == 0:
                ctx.stack.append(0)
            else:
                ctx.stack.append(a // b)
            ctx.program_counter += 1
            
        # 比較操作
        elif op == OpCode.LT:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append(1 if a < b else 0)
            ctx.program_counter += 1
            
        elif op == OpCode.EQ:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append(1 if a == b else 0)
            ctx.program_counter += 1
            
        elif op == OpCode.ISZERO:
            if len(ctx.stack) < 1:
                return False
            ctx.stack.append(1 if ctx.stack.pop() == 0 else 0)
            ctx.program_counter += 1
            
        # 位運算
        elif op == OpCode.AND:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append(a & b)
            ctx.program_counter += 1
            
        elif op == OpCode.OR:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append(a | b)
            ctx.program_counter += 1
            
        elif op == OpCode.XOR:
            if len(ctx.stack) < 2:
                return False
            a, b = ctx.stack.pop(), ctx.stack.pop()
            ctx.stack.append(a ^ b)
            ctx.program_counter += 1
            
        # 記憶體操作
        elif op == OpCode.MSTORE:
            if len(ctx.stack) < 2:
                return False
            offset, value = ctx.stack.pop(), ctx.stack.pop()
            self._mstore(offset, value)
            ctx.program_counter += 1
            
        elif op == OpCode.MLOAD:
            if len(ctx.stack) < 1:
                return False
            offset = ctx.stack.pop()
            value = self._mload(offset)
            ctx.stack.append(value)
            ctx.program_counter += 1
            
        # 控制流
        elif op == OpCode.JUMP:
            if len(ctx.stack) < 1:
                return False
            dest = ctx.stack.pop()
            if ctx.code[dest:dest+1] != bytes([OpCode.JUMPDEST]):
                return False
            ctx.program_counter = dest
            
        elif op == OpCode.JUMPI:
            if len(ctx.stack) < 2:
                return False
            dest, cond = ctx.stack.pop(), ctx.stack.pop()
            if cond != 0:
                if ctx.code[dest:dest+1] != bytes([OpCode.JUMPDEST]):
                    return False
                ctx.program_counter = dest
            else:
                ctx.program_counter += 1
                
        elif op == OpCode.JUMPDEST:
            ctx.program_counter += 1
            
        elif op == OpCode.PC:
            ctx.stack.append(ctx.program_counter)
            ctx.program_counter += 1
            
        # Calldata 操作
        elif op == OpCode.CALLDATALOAD:
            if len(ctx.stack) < 1:
                return False
            offset = ctx.stack.pop()
            if offset + 32 <= len(ctx.calldata):
                value = int.from_bytes(ctx.calldata[offset:offset+32], 'big')
            else:
                value = 0
            ctx.stack.append(value)
            ctx.program_counter += 1
            
        elif op == OpCode.CALLDATASIZE:
            ctx.stack.append(len(ctx.calldata))
            ctx.program_counter += 1
            
        # 環境操作
        elif op == OpCode.ADDRESS:
            ctx.stack.append(int.from_bytes(ctx.contract_address, 'big'))
            ctx.program_counter += 1
            
        elif op == OpCode.CALLER:
            ctx.stack.append(int.from_bytes(ctx.caller, 'big'))
            ctx.program_counter += 1
            
        elif op == OpCode.CALLVALUE:
            ctx.stack.append(ctx.call_value)
            ctx.program_counter += 1
            
        # 區塊鏈操作
        elif op == OpCode.KECCAK256:
            if len(ctx.stack) < 2:
                return False
            offset, size = ctx.stack.pop(), ctx.stack.pop()
            data = bytes(ctx.memory[offset:offset+size])
            from web3 import Web3
            keccak_hash = Web3.keccak(data)
            ctx.stack.append(int.from_bytes(keccak_hash, 'big'))
            ctx.program_counter += 1
            
        # 終止操作
        elif op == OpCode.STOP:
            return True
            
        elif op == OpCode.RETURN:
            if len(ctx.stack) < 2:
                return False
            offset, size = ctx.stack.pop(), ctx.stack.pop()
            # 返回記憶體中的資料
            return True
            
        elif op == OpCode.REVERT:
            return False
            
        else:
            ctx.program_counter += 1
            
        return True
    
    def _mstore(self, offset: int, value: int):
        """將值存入記憶體"""
        # 擴展記憶體(如需要)
        required = offset + 32
        if len(self.context.memory) < required:
            self.context.memory.extend(b'\x00' * (required - len(self.context.memory)))
        # 存入 32 位元組值
        self.context.memory[offset:offset+32] = value.to_bytes(32, 'big')
    
    def _mload(self, offset: int) -> int:
        """從記憶體載入值"""
        if len(self.context.memory) < offset + 32:
            return 0
        return int.from_bytes(self.context.memory[offset:offset+32], 'big')

1.2 實作簡易算術合約

讓我們用此 EVM 執行一個簡單的算術合約。以下是對應的位元組碼:

def create_arithmetic_bytecode() -> bytes:
    """
    建立一個簡單算術合約的位元組碼
    
    功能:計算 (a + b) * c
    
    堆疊布局(輸入):
    - [c, b, a, ...]
    
    堆疊布局(輸出):
    - [result, ...]
    
    操作:
    PUSH1 a      # 0x60 a
    PUSH1 b      # 0x60 b
    ADD          # 0x01
    PUSH1 c      # 0x60 c
    MUL          # 0x02
    """
    return bytes([
        0x60, 0x05,  # PUSH1 5    // 載入 a = 5
        0x60, 0x03,  # PUSH1 3    // 載入 b = 3
        0x01,        # ADD        // a + b = 8
        0x60, 0x02,  # PUSH1 2    // 載入 c = 2
        0x02,        # MUL        // (a + b) * c = 16
        0x00         # STOP
    ])

# 測試執行
evm = SimplifiedEVM()
success, result, gas_used = evm.execute(create_arithmetic_bytecode())
print(f"執行成功: {success}")
print(f"Gas 使用: {gas_used}")
print(f"棧頂值: {evm.context.stack[-1] if evm.context.stack else 'N/A'}")

第二章:實現完整的 Merkle Patricia Trie

2.1MPT 的理論基礎

Merkle Patricia Trie(MPT)是以太坊用於組織世界狀態的資料結構。其設計結合了三種資料結構的優點:

  1. Patricia Trie(前綴樹):共享具有共同前綴的鍵,提高空間效率
  2. Merkle Tree(默克爾樹):提供高效的路徑驗證與資料完整性保證
  3. 鍵值儲存:支援狀態的 CRUD 操作

MPT 中的節點有三種類型:

from typing import Optional, Dict, List
import hashlib
import rlp

class NodeType(Enum):
    """MPT 節點類型"""
    EMPTY = 0
    BRANCH = 1      # 分支節點:16 個子節點 + 值
    EXTENSION = 2   # 擴展節點:共享前綴 + 下一個節點
    LEAF = 3        # 葉子節點:剩餘前綴 + 值

@dataclass
class MPTNode:
    """MPT 節點基類"""
    rlp_encoded: bytes  # RLP 編碼後的內容
    
    @property
    def hash(self) -> bytes:
        """計算節點的 Keccak-256 哈希"""
        return hashlib.new('keccak256', self.rlp_encoded).digest()

@dataclass
class BranchNode(MPTNode):
    """分支節點"""
    children: List[Optional[bytes]]  # 16 個子節點的哈希(0-F)
    value: Optional[bytes]           # 葉子節點值
    
    def to_rlp(self) -> bytes:
        """RLP 編碼分支節點"""
        data = self.children.copy()
        data.append(self.value if self.value else b'')
        return rlp.encode(data)
    
    def __init__(self):
        self.children = [None] * 16
        self.value = None

@dataclass
class ExtensionNode(MPTNode):
    """擴展節點"""
    shared_nibble: List[int]  # 共享前綴(半位元組陣列)
    child_hash: bytes         # 子節點哈希
    
    def to_rlp(self) -> bytes:
        """RLP 編碼擴展節點"""
        # 標記前綴:0x00(偶數長度葉子)或 0x01(奇數長度葉子)
        # 加上 2 表示擴展節點
        prefix = self._nibbles_to_hex_prefix(self.shared_nibble, is_leaf=False)
        nibbles = bytes(self.shared_nibble)
        return rlp.encode([prefix + nibbles, self.child_hash])
    
    def _nibbles_to_hex_prefix(self, nibbles: List[int], is_leaf: bool) -> bytes:
        """將半位元組轉換為十六進制前綴"""
        hex_str = ''.join(hex(n)[2] for n in nibbles)
        if is_leaf:
            prefix = 0x20 if len(nibbles) % 2 == 0 else 0x30
        else:
            prefix = 0x00 if len(nibbles) % 2 == 0 else 0x10
        return bytes([prefix + int(hex_str[:1], 16)])
    
    def __init__(self):
        self.shared_nibble = []
        self.child_hash = b''

@dataclass
class LeafNode(MPTNode):
    """葉子節點"""
    remaining_nibble: List[int]  # 剩餘前綴
    value: bytes                  # 節點值
    
    def to_rlp(self) -> bytes:
        """RLP 編碼葉子節點"""
        prefix = self._nibbles_to_hex_prefix(self.remaining_nibble, is_leaf=True)
        nibbles = bytes(self.remaining_nibble)
        return rlp.encode([prefix + nibbles, self.value])
    
    def _nibbles_to_hex_prefix(self, nibbles: List[int], is_leaf: bool) -> bytes:
        """將半位元組轉換為十六進制前綴"""
        hex_str = ''.join(hex(n)[2] for n in nibbles)
        if is_leaf:
            prefix = 0x20 if len(nibbles) % 2 == 0 else 0x30
        else:
            prefix = 0x00 if len(nibbles) % 2 == 0 else 0x10
        return bytes([prefix + int(hex_str[:1], 16)])
    
    def __init__(self):
        self.remaining_nibble = []
        self.value = b''

2.2 MPT 的完整實現

class MerklePatriciaTrie:
    """
    Merkle Patricia Trie 實現
    
    用於以太坊世界狀態的鍵值儲存:
    - 鍵:帳戶地址(160 位元組 → 40 個十六進制字元)
    - 值:帳戶狀態(RLP 編碼)
    
    特性:
    - 路徑壓縮(共享前綴)
    - Merkle 哈希驗證
    - 高效的更新與查詢
    """
    
    def __init__(self, db: Optional[Dict[bytes, bytes]] = None):
        """
        初始化 MPT
        
        參數:
        - db: 鍵值儲存(用於持久化節點)
        """
        self.db = db if db else {}  # 節點資料庫
        self.root: bytes = b''       # 根節點哈希
        self.cache: Dict[bytes, MPTNode] = {}  # 內存緩存
    
    def _hex_to_nibbles(self, key: str) -> List[int]:
        """
        將十六進制字元串轉換為半位元組陣列
        
        範例:'0x1234' → [1, 2, 3, 4]
        """
        hex_key = key.lower()
        if hex_key.startswith('0x'):
            hex_key = hex_key[2:]
        return [int(c, 16) for c in hex_key]
    
    def _nibbles_to_hex(self, nibbles: List[int]) -> str:
        """半位元組陣列轉換為十六進制字元串"""
        return ''.join(hex(n)[2] for n in nibbles)
    
    def get(self, key: str) -> Optional[bytes]:
        """
        根據鍵查詢值
        
        參數:
        - key: 查詢鍵(十六進制字元串,如 '0x1234...')
        
        返回:
        - 值(位元組),若不存在返回 None
        """
        if not self.root:
            return None
        
        key_nibbles = self._hex_to_nibbles(key)
        node = self._get_node(self.root)
        
        return self._traverse_and_get(node, key_nibbles)
    
    def _traverse_and_get(self, node: Optional[bytes], 
                          key_nibbles: List[int]) -> Optional[bytes]:
        """遞迴遍歷並獲取值"""
        if not node:
            return None
        
        decoded = self._decode_node(node)
        
        if isinstance(decoded, LeafNode):
            # 葉子節點:比較剩餘前綴
            if decoded.remaining_nibble == key_nibbles:
                return decoded.value
            return None
        
        elif isinstance(decoded, ExtensionNode):
            # 擴展節點:匹配前綴後繼續
            prefix_len = len(decoded.shared_nibble)
            if key_nibbles[:prefix_len] == decoded.shared_nibble:
                return self._traverse_and_get(
                    self._get_node(decoded.child_hash),
                    key_nibbles[prefix_len:]
                )
            return None
        
        elif isinstance(decoded, BranchNode):
            # 分支節點:根據下一個半位元組選擇分支
            if not key_nibbles:
                return decoded.value
            
            next_nibble = key_nibbles[0]
            return self._traverse_and_get(
                self._get_node(decoded.children[next_nibble]),
                key_nibbles[1:]
            )
        
        return None
    
    def put(self, key: str, value: bytes) -> None:
        """
        插入或更新鍵值對
        
        參數:
        - key: 鍵(十六進制字元串)
        - value: 值(位元組)
        """
        key_nibbles = self._hex_to_nibbles(key)
        
        if not self.root:
            # 空樹:創建葉子節點
            leaf = LeafNode()
            leaf.remaining_nibble = key_nibbles
            leaf.value = value
            leaf.rlp_encoded = leaf.to_rlp()
            self._put_node(leaf)
            self.root = leaf.hash
            return
        
        # 遍歷並更新
        old_root = self.root
        self.root = self._update_at(
            self.root, key_nibbles, key_nibbles, value
        )
    
    def _update_at(self, node_hash: bytes, key_nibbles: List[int],
                   full_key: List[int], value: bytes) -> bytes:
        """在指定位置更新節點"""
        if not node_hash:
            # 創建新葉子節點
            leaf = LeafNode()
            leaf.remaining_nibble = key_nibbles
            leaf.value = value
            leaf.rlp_encoded = leaf.to_rlp()
            self._put_node(leaf)
            return leaf.hash
        
        node = self._decode_node(node_hash)
        
        if isinstance(node, LeafNode):
            # 葉子節點:需要分裂
            return self._update_leaf_split(node, key_nibbles, full_key, value)
        
        elif isinstance(node, ExtensionNode):
            # 擴展節點:匹配前綴
            return self._update_extension(node, key_nibbles, full_key, value)
        
        elif isinstance(node, BranchNode):
            # 分支節點:遞迴更新子分支
            return self._update_branch(node, key_nibbles, full_key, value)
        
        return node_hash
    
    def _update_leaf_split(self, old_leaf: LeafNode, key_nibbles: List[int],
                           full_key: List[int], value: bytes) -> bytes:
        """葉子節點分裂處理"""
        # 找出共同前綴
        common_prefix = self._find_common_prefix(
            old_leaf.remaining_nibble, key_nibbles
        )
        
        if len(common_prefix) == len(old_leaf.remaining_nibble):
            # 新鍵完全包含舊前綴
            if not key_nibbles[len(common_prefix):]:
                # 舊葉子作為新分支的值
                leaf = LeafNode()
                leaf.remaining_nibble = old_leaf.remaining_nibble[len(common_prefix):]
                leaf.value = old_leaf.value
                leaf.rlp_encoded = leaf.to_rlp()
                new_leaf_hash = self._put_node(leaf)
                
                branch = BranchNode()
                branch.value = value
                branch.children[full_key[len(common_prefix)]] = new_leaf_hash
                branch.rlp_encoded = branch.to_rlp()
                return self._put_node(branch)
            else:
                # 需要遞迴到分支
                pass
        else:
            # 需要創建擴展節點
            pass
        
        # 簡化處理:直接替換(實際實現需要更複雜的分裂邏輯)
        leaf = LeafNode()
        leaf.remaining_nibble = key_nibbles
        leaf.value = value
        leaf.rlp_encoded = leaf.to_rlp()
        return self._put_node(leaf)
    
    def _find_common_prefix(self, nibbles1: List[int], 
                            nibbles2: List[int]) -> List[int]:
        """找出兩個半位元組陣列的共同前綴"""
        common = []
        for n1, n2 in zip(nibbles1, nibbles2):
            if n1 == n2:
                common.append(n1)
            else:
                break
        return common
    
    def _update_extension(self, ext: ExtensionNode, key_nibbles: List[int],
                          full_key: List[int], value: bytes) -> bytes:
        """擴展節點更新"""
        prefix_len = len(ext.shared_nibble)
        
        if key_nibbles[:prefix_len] != ext.shared_nibble:
            # 前綴不匹配,需要分裂
            common = self._find_common_prefix(ext.shared_nibble, key_nibbles)
            
            # 創建新分支
            branch = BranchNode()
            
            # 剩餘的舊擴展
            old_ext_remains = ext.shared_nibble[len(common):]
            if old_ext_remains:
                old_leaf = LeafNode()
                old_leaf.remaining_nibble = old_ext_remains
                old_leaf.value = ext.value if hasattr(ext, 'value') else b''
                old_leaf.rlp_encoded = old_leaf.to_rlp()
                old_hash = self._put_node(old_leaf)
            else:
                old_hash = ext.child_hash
            
            # 剩餘的新鍵
            new_key_remains = key_nibbles[len(common):]
            new_leaf = LeafNode()
            new_leaf.remaining_nibble = new_key_remains
            new_leaf.value = value
            new_leaf.rlp_encoded = new_leaf.to_rlp()
            new_hash = self._put_node(new_leaf)
            
            # 填充分支
            if new_key_remains:
                branch.children[new_key_remains[0]] = new_hash
            if old_ext_remains:
                branch.children[old_ext_remains[0]] = old_hash
            
            branch.rlp_encoded = branch.to_rlp()
            branch_hash = self._put_node(branch)
            
            # 創建新擴展
            new_ext = ExtensionNode()
            new_ext.shared_nibble = common
            new_ext.child_hash = branch_hash
            new_ext.rlp_encoded = new_ext.to_rlp()
            return self._put_node(new_ext)
        
        # 前綴匹配,遞迴更新
        new_child_hash = self._update_at(
            ext.child_hash, 
            key_nibbles[prefix_len:], 
            full_key,
            value
        )
        
        ext.child_hash = new_child_hash
        ext.rlp_encoded = ext.to_rlp()
        return self._put_node(ext)
    
    def _update_branch(self, branch: BranchNode, key_nibbles: List[int],
                       full_key: List[int], value: bytes) -> bytes:
        """分支節點更新"""
        if not key_nibbles:
            # 鍵完全匹配,在分支節點儲存值
            branch.value = value
        else:
            next_nibble = key_nibbles[0]
            new_child_hash = self._update_at(
                self._get_node(branch.children[next_nibble]),
                key_nibbles[1:],
                full_key,
                value
            )
            branch.children[next_nibble] = new_child_hash
        
        branch.rlp_encoded = branch.to_rlp()
        return self._put_node(branch)
    
    def _get_node(self, node_hash: bytes) -> Optional[bytes]:
        """從資料庫或緩存獲取節點"""
        if not node_hash:
            return None
        if node_hash in self.cache:
            return node_hash
        return self.db.get(node_hash)
    
    def _put_node(self, node: MPTNode) -> bytes:
        """將節點存入資料庫"""
        node_hash = node.hash
        if node_hash not in self.db:
            self.db[node_hash] = node.rlp_encoded
        self.cache[node_hash] = node
        return node_hash
    
    def _decode_node(self, node_data: bytes) -> MPTNode:
        """解碼 RLP 資料為 MPT 節點"""
        decoded = rlp.decode(node_data)
        
        if not decoded:
            return BranchNode()
        
        if isinstance(decoded, list):
            if len(decoded) == 17:
                # 分支節點
                branch = BranchNode()
                branch.children = [
                    child if child else None 
                    for child in decoded[:16]
                ]
                branch.value = decoded[16] if decoded[16] else None
                branch.rlp_encoded = node_data
                return branch
            elif len(decoded) == 2:
                first = bytes(decoded[0])
                second = bytes(decoded[1])
                
                # 解析前綴
                first_byte = first[0]
                
                if first_byte >= 0x20:
                    # 葉子節點
                    leaf = LeafNode()
                    nibbles = self._decode_nibbles_from_prefix(first)
                    leaf.remaining_nibble = nibbles + list(first[1:])
                    leaf.value = second
                    leaf.rlp_encoded = node_data
                    return leaf
                else:
                    # 擴展節點
                    ext = ExtensionNode()
                    nibbles = self._decode_nibbles_from_prefix(first)
                    ext.shared_nibble = nibbles + list(first[1:])
                    ext.child_hash = second
                    ext.rlp_encoded = node_data
                    return ext
        
        # 回退:當作分支節點
        return BranchNode()
    
    def _decode_nibbles_from_prefix(self, prefix: bytes) -> List[int]:
        """從前綴位元組解碼半位元組"""
        return []
    
    def get_proof(self, key: str) -> tuple:
        """
        獲取指定鍵的 Merkle 證明
        
        返回:(值, 證明路徑上的節點列表)
        """
        if not self.root:
            return (None, [])
        
        key_nibbles = self._hex_to_nibbles(key)
        proof = []
        node = self._get_node(self.root)
        
        while node:
            proof.append(node)
            decoded = self._decode_node(node)
            
            if isinstance(decoded, LeafNode):
                if decoded.remaining_nibble == key_nibbles:
                    return (decoded.value, proof)
                return (None, proof)
            
            elif isinstance(decoded, ExtensionNode):
                prefix_len = len(decoded.shared_nibble)
                if key_nibbles[:prefix_len] != decoded.shared_nibble:
                    return (None, proof)
                key_nibbles = key_nibbles[prefix_len:]
                node = self._get_node(decoded.child_hash)
            
            elif isinstance(decoded, BranchNode):
                if not key_nibbles:
                    return (decoded.value, proof)
                next_nibble = key_nibbles[0]
                node = self._get_node(decoded.children[next_nibble])
                key_nibbles = key_nibbles[1:]
            
            else:
                break
        
        return (None, proof)
    
    def verify_proof(self, root: bytes, key: str, 
                    proof: List[bytes], value: bytes) -> bool:
        """
        驗證 Merkle 證明
        
        參數:
        - root: 根節點哈希
        - key: 查詢鍵
        - proof: 證明路徑
        - value: 聲稱的值
        
        返回:驗證是否通過
        """
        if not proof:
            return value is None
        
        # 重新計算根哈希
        current_hash = self._compute_leaf_hash(key, value)
        
        for node in reversed(proof[:-1]):
            current_hash = self._compute_internal_hash(node)
        
        return current_hash == root
    
    def _compute_leaf_hash(self, key: str, value: bytes) -> bytes:
        """計算葉子節點的哈希"""
        if value is None:
            return b''
        
        key_nibbles = self._hex_to_nibbles(key)
        leaf = LeafNode()
        leaf.remaining_nibble = key_nibbles
        leaf.value = value
        leaf.rlp_encoded = leaf.to_rlp()
        return leaf.hash
    
    def _compute_internal_hash(self, node_data: bytes) -> bytes:
        """計算內部節點的哈希"""
        return hashlib.new('keccak256', node_data).digest()

2.3 MPT 的實際應用

# 創建並使用 MPT
trie = MerklePatriciaTrie()

# 插入以太坊風格的帳戶狀態
accounts = {
    '0x' + '11' * 20: b'\x01',  # 帳戶 1:非零 nonce
    '0x' + '22' * 20: b'\x02',  # 帳戶 2
    '0x' + '33' * 20: b'\x03',  # 帳戶 3
}

for address, state in accounts.items():
    trie.put(address, state)
    print(f"插入: {address} -> {state.hex()}")
    print(f"根哈希: {trie.root.hex()}")

# 查詢
value = trie.get('0x' + '22' * 20)
print(f"查詢 0x22...: {value.hex() if value else 'Not Found'}")

# 生成並驗證證明
proof_value, proof_nodes = trie.get_proof('0x' + '22' * 20)
print(f"證明長度: {len(proof_nodes)}")

第三章:實現簡易共識機制

3.1 工作量證明的模擬實現

讓我們實現一個簡化版的工作量證明(Proof of Work)共識機制:

import time
import hashlib
from dataclasses import dataclass, field
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor

@dataclass
class Block:
    """區塊結構"""
    index: int
    timestamp: int
    transactions: List[dict]
    previous_hash: bytes
    nonce: int = 0
    difficulty: int = 4  # 前導零位数
    hash: bytes = b''
    
    def __post_init__(self):
        if not self.hash:
            self.hash = self.compute_hash()
    
    def compute_hash(self) -> bytes:
        """計算區塊哈希"""
        data = (
            str(self.index).encode() +
            str(self.timestamp).encode() +
            str(self.transactions).encode() +
            self.previous_hash +
            str(self.nonce).encode()
        )
        return hashlib.sha256(data).digest()
    
    def is_valid(self) -> bool:
        """驗證區塊有效性"""
        return (
            self.hash[:self.difficulty] == b'\x00' * self.difficulty and
            self.hash == self.compute_hash()
        )

@dataclass
class Miner:
    """礦工"""
    id: str
    hash_power: float  # 相對算力(0-1)
    
    def mine_block(self, block: Block, target_difficulty: int) -> Optional[int]:
        """
        嘗試挖礦
        
        返回找到的有效 nonce,若失敗返回 None
        """
        target = b'\x00' * target_difficulty
        nonce = 0
        max_attempts = int(2 ** (256 - target_difficulty * 4))  # 粗略估算
        
        # 根據算力決定搜索範圍
        attempts_per_round = int(max_attempts * self.hash_power)
        
        for _ in range(attempts_per_round):
            test_hash = self._hash_with_nonce(block, nonce)
            if test_hash[:target_difficulty] == target:
                return nonce
            nonce += 1
        
        return None
    
    def _hash_with_nonce(self, block: Block, nonce: int) -> bytes:
        """計算帶 nonce 的哈希"""
        data = (
            str(block.index).encode() +
            str(block.timestamp).encode() +
            str(block.transactions).encode() +
            block.previous_hash +
            str(nonce).encode()
        )
        return hashlib.sha256(data).digest()

class ProofOfWorkConsensus:
    """
    簡化版工作量證明共識系統
    
    演示:
    - 區塊構造
    - 礦工競爭
    - 區塊驗證
    - 最長鏈選擇
    """
    
    def __init__(self, difficulty: int = 4):
        self.chain: List[Block] = []
        self.pending_transactions: List[dict] = []
        self.difficulty = difficulty
        self.miners: List[Miner] = []
        self.reward_address = b'\x00' * 20
        
        # 創建創世區塊
        self.create_genesis_block()
    
    def create_genesis_block(self) -> Block:
        """創建創世區塊"""
        genesis = Block(
            index=0,
            timestamp=int(time.time()),
            transactions=[],
            previous_hash=b'\x00' * 32,
            nonce=0
        )
        genesis.hash = self._mine_with_difficulty(genesis, self.difficulty)
        self.chain.append(genesis)
        return genesis
    
    def _mine_with_difficulty(self, block: Block, difficulty: int) -> bytes:
        """挖掘區塊直到找到有效哈希"""
        target = b'\x00' * difficulty
        nonce = 0
        
        while True:
            test_hash = hashlib.sha256((
                str(block.index).encode() +
                str(block.timestamp).encode() +
                str(block.transactions).encode() +
                block.previous_hash +
                str(nonce).encode()
            )).digest()
            
            if test_hash[:difficulty] == target:
                block.nonce = nonce
                return test_hash
            nonce += 1
    
    def add_transaction(self, sender: str, recipient: str, amount: float):
        """添加交易到待確認池"""
        self.pending_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount,
            'timestamp': time.time()
        })
    
    def create_block(self) -> Block:
        """創建新區塊"""
        last_block = self.chain[-1]
        new_block = Block(
            index=last_block.index + 1,
            timestamp=int(time.time()),
            transactions=self.pending_transactions.copy(),
            previous_hash=last_block.hash,
            difficulty=self.difficulty
        )
        return new_block
    
    def mine_block_parallel(self, miners: List[Miner]) -> tuple:
        """
        並行挖掘
        
        返回:(成功挖礦的礦工, 區塊, nonce)
        """
        block = self.create_block()
        
        with ThreadPoolExecutor(max_workers=len(miners)) as executor:
            futures = {
                executor.submit(miner.mine_block, block, self.difficulty): miner
                for miner in miners
            }
            
            for future in futures:
                miner = futures[future]
                result = future.result()
                if result is not None:
                    block.nonce = result
                    block.hash = hashlib.sha256((
                        str(block.index).encode() +
                        str(block.timestamp).encode() +
                        str(block.transactions).encode() +
                        block.previous_hash +
                        str(block.nonce).encode()
                    )).digest()
                    return (miner, block, result)
        
        return (None, block, None)
    
    def add_block(self, block: Block) -> bool:
        """將區塊添加到鏈"""
        last_block = self.chain[-1]
        
        # 驗證區塊
        if block.previous_hash != last_block.hash:
            return False
        if not block.is_valid():
            return False
        
        self.chain.append(block)
        self.pending_transactions = []
        return True
    
    def is_chain_valid(self) -> bool:
        """驗證整條鏈的有效性"""
        for i in range(1, len(self.chain)):
            current = self.chain[i]
            previous = self.chain[i - 1]
            
            if current.previous_hash != previous.hash:
                return False
            if not current.is_valid():
                return False
        
        return True
    
    def resolve_conflicts(self, other_chain: List[Block]) -> bool:
        """
        解決鏈衝突(最長鏈原則)
        
        返回:是否接受了新鏈
        """
        if len(other_chain) <= len(self.chain):
            return False
        
        # 驗證新鏈
        if not self._verify_chain(other_chain):
            return False
        
        self.chain = other_chain
        return True
    
    def _verify_chain(self, chain: List[Block]) -> bool:
        """驗證完整鏈"""
        if not chain:
            return False
        
        for i in range(1, len(chain)):
            if chain[i].previous_hash != chain[i - 1].hash:
                return False
            if not chain[i].is_valid():
                return False
        
        return True
    
    def get_balance(self, address: str) -> float:
        """計算地址餘額"""
        balance = 0.0
        
        for block in self.chain:
            for tx in block.transactions:
                if tx['recipient'] == address:
                    balance += tx['amount']
                if tx['sender'] == address:
                    balance -= tx['amount']
        
        return balance

3.2 質押證明的簡化模擬

@dataclass
class Validator:
    """驗證者"""
    address: bytes
    stake: int  # 質押數量
    is_active: bool = True
    slashed: bool = False

class SimplifiedProofOfStake:
    """
    簡化版權益證明共識
    
    演示:
    - 驗證者選擇
    - 區塊提議
    - 見證投票
    - 最終確定性
    """
    
    def __init__(self):
        self.validators: List[Validator] = []
        self.blocks: List[Block] = []
        self.pending_transactions: List[dict] = []
        self.current_epoch: int = 0
        self.checkpoint_interval: int = 32  # 每 32 個 slot 一個 checkpoint
        
        # 創建創世區塊
        self.create_genesis()
    
    def add_validator(self, address: bytes, stake: int):
        """添加驗證者"""
        validator = Validator(address=address, stake=stake)
        self.validators.append(validator)
    
    def _get_active_validators(self) -> List[Validator]:
        """獲取活躍驗證者列表"""
        return [v for v in self.validators if v.is_active and not v.slashed]
    
    def _select_proposer(self, slot: int) -> Validator:
        """根據質押量隨機選擇提議者"""
        active = self._get_active_validators()
        total_stake = sum(v.stake for v in active)
        
        # 簡化的偽隨機選擇
        import random
        seed = slot + self.current_epoch * 1000
        random.seed(seed)
        
        selected_stake = random.randint(0, total_stake)
        cumulative = 0
        
        for validator in active:
            cumulative += validator.stake
            if cumulative >= selected_stake:
                return validator
        
        return active[-1]
    
    def _create_block(self, proposer: Validator) -> Block:
        """創建區塊"""
        last_block = self.blocks[-1] if self.blocks else None
        
        block = Block(
            index=len(self.blocks),
            timestamp=int(time.time()),
            transactions=self.pending_transactions.copy(),
            previous_hash=last_block.hash if last_block else b'\x00' * 32,
            difficulty=0  # PoS 不需要工作量難度
        )
        
        # 設置 PoS 哈希(使用質押證明)
        block.hash = self._compute_pos_hash(block, proposer)
        
        return block
    
    def _compute_pos_hash(self, block: Block, proposer: Validator) -> bytes:
        """計算 PoS 區塊哈希"""
        data = (
            str(block.index).encode() +
            str(block.timestamp).encode() +
            str(block.transactions).encode() +
            block.previous_hash +
            proposer.address +
            str(proposer.stake).encode()
        )
        return hashlib.sha256(data).digest()
    
    def propose_block(self, slot: int) -> tuple:
        """提議區塊"""
        proposer = self._select_proposer(slot)
        block = self._create_block(proposer)
        
        return (proposer, block)
    
    def attest(self, block: Block, validator: Validator) -> dict:
        """驗證者見證區塊"""
        # 驗證區塊有效性
        is_valid = self._verify_block(block)
        
        attestation = {
            'validator': validator.address,
            'block_hash': block.hash,
            'slot': len(self.blocks),
            'is_valid': is_valid,
            'signature': self._sign_attestation(block, validator)
        }
        
        return attestation
    
    def _verify_block(self, block: Block) -> bool:
        """驗證區塊"""
        if len(self.blocks) > 0:
            last_block = self.blocks[-1]
            if block.previous_hash != last_block.hash:
                return False
        return True
    
    def _sign_attestation(self, block: Block, 
                          validator: Validator) -> bytes:
        """簽署見證"""
        data = block.hash + validator.address
        return hashlib.sha256(data).digest()
    
    def finalize_checkpoint(self, attestations: List[dict]) -> bool:
        """最終確定檢查點"""
        active = self._get_active_validators()
        
        # 計算有效見證數
        valid_attestations = sum(1 for a in attestations if a['is_valid'])
        attestation_rate = valid_attestations / len(active) if active else 0
        
        # 需要 2/3 以上的見證才能最終確定
        if attestation_rate >= 2/3:
            # 創建檢查點
            checkpoint = {
                'epoch': self.current_epoch,
                'block_hash': self.blocks[-1].hash if self.blocks else b'',
                'attestations': len(valid_attestations)
            }
            return True
        
        return False
    
    def slash_validator(self, validator: Validator, reason: str):
        """削減驗證者"""
        validator.slashed = True
        # 在實際系統中,這裡會處理質押品的沒收
        print(f"驗證者 {validator.address.hex()} 因 {reason} 被削減")

結論:實作學習的價值

透過親手實現這些核心組件,開發者將獲得:

  1. 深度理解:知道 EVM 如何執行位元組碼,理解狀態組織的原理
  2. 除錯能力:能夠診斷複雜的 Gas 優化與合約安全問題
  3. 設計視野:理解為何以太坊做出特定的設計選擇
  4. 創新基礎:在深刻理解現有系統的基礎上進行創新

這個學習路徑推薦給所有希望成為以太坊專家的開發者。從簡化實現開始,逐步添加複雜性,最終構建完整的功能原型——這是理解複雜系統最有效的方式。


參考資源

免責聲明

本網站內容僅供教育與資訊目的,不構成任何投資建議或推薦。在進行任何加密貨幣相關操作前,請自行研究並諮詢專業人士意見。所有投資均有風險,請謹慎評估您的風險承受能力。

延伸閱讀與來源

這篇文章對您有幫助嗎?

評論

發表評論

注意:由於這是靜態網站,您的評論將儲存在本地瀏覽器中,不會公開顯示。

目前尚無評論,成為第一個發表評論的人吧!