以太坊開發者學習路徑完整指南:從零建構簡化版 EVM、Merkle Patricia Trie 與共識機制實作
本文提供一條系統化的以太坊開發者學習路徑,透過實際編寫程式碼來理解以太坊的核心組件。讀者將從頭建構簡化版的以太坊虛擬機(EVM)、實現完整的 Merkle Patricia Trie,並模擬一個基本的工作量證明共識系統。涵蓋 EVM 指令集與執行模型、MPT 的前綴壓縮與哈希驗證、PoW 挖掘與驗證機制、質押證明的驗證者選擇與見證投票等核心主題。提供完整的 Python 程式碼範例與詳細解說,幫助開發者建立對以太坊底層機制的深度理解。
以太坊開發者學習路徑完整指南:從零建構簡化版 EVM、Merkle Patricia Trie 與共識機制實作
前言:為什麼要從底層理解以太坊
以太坊的複雜性常使開發者停留在高層抽象層面。使用 Hardhat 部署合約、呼叫 OpenZeppelin 庫、透過 ethers.js 發送交易——這些技能固然重要,但若不理解底層機制,開發者將難以診斷複雜問題、優化 Gas 消耗或設計安全的應用架構。
本文提供一條系統化的學習路徑,透過實際編寫程式碼來理解以太坊的核心組件:虛擬機器(Merkle Patricia Trie)、狀態組織(Merkle Patricia Trie)與共識機制。讀者將從頭建構簡化版的 EVM、實現完整的 Merkle Patricia Trie,並模擬一個基本的工作量證明共識系統。
第一章:建構簡化版以太坊虛擬機(EVM)
1.1 EVM 的基本架構
EVM 是一個棧式虛擬機(Stack-based Virtual Machine),其設計具有以下特點:
- 基於 256 位元詞(Word)的操作
- 指令集圖靈完整
- 確定性執行確保所有節點得到相同結果
- Gas 機制防止無限迴圈與資源濫用
讓我們從最基本的結構開始建構:
from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import IntEnum
class OpCode(IntEnum):
"""EVM 操作碼定義"""
STOP = 0x00
ADD = 0x01
MUL = 0x02
SUB = 0x03
DIV = 0x04
SDIV = 0x05 # 帶符號除法
MOD = 0x06
SMOD = 0x07 # 帶符號取模
ADDMOD = 0x08
MULMOD = 0x09
EXP = 0x0A
SIGNEXTEND = 0x0B
# 比較與位運算
LT = 0x10
GT = 0x11
SLT = 0x12 # 帶符號小於
SGT = 0x13 # 帶符號大於
EQ = 0x14
ISZERO = 0x15
AND = 0x16
OR = 0x17
XOR = 0x18
NOT = 0x19
BYTE = 0x1A
SHL = 0x1B
SHR = 0x1C
SAR = 0x1D
# 區塊鏈操作
KECCAK256 = 0x20
# 環境資訊
ADDRESS = 0x30
BALANCE = 0x31
ORIGIN = 0x32
CALLER = 0x33
CALLVALUE = 0x34
CALLDATALOAD = 0x35
CALLDATASIZE = 0x36
CALLDATACOPY = 0x37
CODESIZE = 0x38
CODECOPY = 0x39
GASPRICE = 0x3A
EXTCODESIZE = 0x3B
EXTCODECOPY = 0x3C
# 棧操作
PUSH1 = 0x60
PUSH2 = 0x61
# ... PUSH32
POP = 0x50
DUP1 = 0x80
# ... DUP16
SWAP1 = 0x90
# ... SWAP16
# 儲存操作
MLOAD = 0x51
MSTORE = 0x52
MSTORE8 = 0x53
# 日誌操作
LOG0 = 0xA0
LOG1 = 0xA1
LOG2 = 0xA2
LOG3 = 0xA3
LOG4 = 0xA4
# 控制流
JUMP = 0x56
JUMPI = 0x57
PC = 0x58
JUMPDEST = 0x5B
# 合約操作
CREATE = 0xF0
CALL = 0xF1
RETURN = 0xF3
DELEGATECALL = 0xF4
STATICCALL = 0xFA
REVERT = 0xFD
SELFDESTRUCT = 0xFF
@dataclass
class ExecutionContext:
"""執行上下文"""
contract_address: bytes # 當前合約地址
caller: bytes # 呼叫者地址
call_value: int # 呼叫時傳送的 ETH 數量
code: bytes # 合約位元組碼
calldata: bytes # 輸入資料
gas: int # 可用 Gas
program_counter: int # 程式計數器
stack: List[int] # 運算棧
memory: bytearray # 記憶體
storage: Dict[bytes, bytes] # 儲存(合約狀態)
logs: List[tuple] # 日誌
def __post_init__(self):
self.stack = []
self.memory = bytearray()
self.logs = []
class SimplifiedEVM:
"""
簡化版 EVM 實現
本實現省略了許多細節:
- 完整的 Gas 計算
- 位元組碼優化
- 完整的錯誤處理
- 區塊上下文
專注於核心執行邏輯的演示
"""
def __init__(self):
self.context: Optional[ExecutionContext] = None
self.gas_costs: Dict[int, int] = self._init_gas_costs()
def _init_gas_costs(self) -> Dict[int, int]:
"""初始化 Gas 成本表"""
return {
OpCode.STOP: 0,
OpCode.ADD: 3,
OpCode.MUL: 5,
OpCode.SUB: 3,
OpCode.DIV: 5,
OpCode.SDIV: 5,
OpCode.MOD: 5,
OpCode.SMOD: 5,
OpCode.ADDMOD: 8,
OpCode.MULMOD: 8,
OpCode.EXP: 10, # 基礎成本
OpCode.LT: 3,
OpCode.GT: 3,
OpCode.SLT: 3,
OpCode.SGT: 3,
OpCode.EQ: 3,
OpCode.ISZERO: 3,
OpCode.AND: 3,
OpCode.OR: 3,
OpCode.XOR: 3,
OpCode.NOT: 3,
OpCode.BYTE: 3,
OpCode.SHL: 3,
OpCode.SHR: 3,
OpCode.SAR: 3,
OpCode.KECCAK256: 30, # 基礎成本
OpCode.ADDRESS: 2,
OpCode.BALANCE: 100,
OpCode.ORIGIN: 2,
OpCode.CALLER: 2,
OpCode.CALLVALUE: 2,
OpCode.CALLDATALOAD: 3,
OpCode.CALLDATASIZE: 2,
OpCode.CALLDATACOPY: 3,
OpCode.CODESIZE: 2,
OpCode.CODECOPY: 3,
OpCode.GASPRICE: 2,
OpCode.EXTCODESIZE: 100,
OpCode.EXTCODECOPY: 100,
OpCode.POP: 2,
OpCode.MLOAD: 3,
OpCode.MSTORE: 3,
OpCode.MSTORE8: 3,
OpCode.JUMP: 8,
OpCode.JUMPI: 10,
OpCode.PC: 2,
OpCode.JUMPDEST: 1,
}
def execute(self, code: bytes, calldata: bytes = b'',
gas: int = 10000000) -> tuple:
"""
執行合約位元組碼
返回: (success, return_data, gas_used)
"""
self.context = ExecutionContext(
contract_address=b'\x00' * 20,
caller=b'\x00' * 20,
call_value=0,
code=code,
calldata=calldata,
gas=gas,
program_counter=0,
code=code,
calldata=calldata,
gas=gas,
program_counter=0,
stack=[],
memory=bytearray(),
storage={},
logs=[]
)
try:
while self.context.program_counter < len(self.context.code):
op = self.context.code[self.context.program_counter]
# 檢查 Gas
gas_cost = self.gas_costs.get(op, 0)
if self.context.gas < gas_cost:
return (False, b'', gas - self.context.gas)
self.context.gas -= gas_cost
# 執行指令
success = self._execute_op(op)
if not success:
return (False, b'', gas - self.context.gas)
# PUSH 指令需要特殊處理(推進指標)
if OpCode.PUSH1 <= op <= OpCode.PUSH2:
pass # 在 _execute_op 中處理
return (True, b'', gas - self.context.gas)
except Exception as e:
return (False, str(e).encode(), gas - self.context.gas)
def _execute_op(self, op: int) -> bool:
"""執行單一操作碼"""
ctx = self.context
# 棧操作
if op == OpCode.POP:
if len(ctx.stack) < 1:
return False
ctx.stack.pop()
ctx.program_counter += 1
# DUP 操作
elif OpCode.DUP1 <= op <= OpCode.DUP16:
n = op - OpCode.DUP1 + 1
if len(ctx.stack) < n:
return False
ctx.stack.append(ctx.stack[-n])
ctx.program_counter += 1
# SWAP 操作
elif OpCode.SWAP1 <= op <= OpCode.SWAP16:
n = op - OpCode.SWAP1 + 1
if len(ctx.stack) < n + 1:
return False
ctx.stack[-1], ctx.stack[-n-1] = ctx.stack[-n-1], ctx.stack[-1]
ctx.program_counter += 1
# PUSH 操作
elif OpCode.PUSH1 <= op <= OpCode.PUSH2:
n = op - OpCode.PUSH1 + 1
if ctx.program_counter + n >= len(ctx.code):
return False
value = int.from_bytes(
ctx.code[ctx.program_counter + 1:ctx.program_counter + 1 + n],
'big'
)
ctx.stack.append(value)
ctx.program_counter += 1 + n
# 算術操作
elif op == OpCode.ADD:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append((a + b) % (2**256))
ctx.program_counter += 1
elif op == OpCode.MUL:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append((a * b) % (2**256))
ctx.program_counter += 1
elif op == OpCode.SUB:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append((a - b) % (2**256))
ctx.program_counter += 1
elif op == OpCode.DIV:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
if b == 0:
ctx.stack.append(0)
else:
ctx.stack.append(a // b)
ctx.program_counter += 1
# 比較操作
elif op == OpCode.LT:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append(1 if a < b else 0)
ctx.program_counter += 1
elif op == OpCode.EQ:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append(1 if a == b else 0)
ctx.program_counter += 1
elif op == OpCode.ISZERO:
if len(ctx.stack) < 1:
return False
ctx.stack.append(1 if ctx.stack.pop() == 0 else 0)
ctx.program_counter += 1
# 位運算
elif op == OpCode.AND:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append(a & b)
ctx.program_counter += 1
elif op == OpCode.OR:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append(a | b)
ctx.program_counter += 1
elif op == OpCode.XOR:
if len(ctx.stack) < 2:
return False
a, b = ctx.stack.pop(), ctx.stack.pop()
ctx.stack.append(a ^ b)
ctx.program_counter += 1
# 記憶體操作
elif op == OpCode.MSTORE:
if len(ctx.stack) < 2:
return False
offset, value = ctx.stack.pop(), ctx.stack.pop()
self._mstore(offset, value)
ctx.program_counter += 1
elif op == OpCode.MLOAD:
if len(ctx.stack) < 1:
return False
offset = ctx.stack.pop()
value = self._mload(offset)
ctx.stack.append(value)
ctx.program_counter += 1
# 控制流
elif op == OpCode.JUMP:
if len(ctx.stack) < 1:
return False
dest = ctx.stack.pop()
if ctx.code[dest:dest+1] != bytes([OpCode.JUMPDEST]):
return False
ctx.program_counter = dest
elif op == OpCode.JUMPI:
if len(ctx.stack) < 2:
return False
dest, cond = ctx.stack.pop(), ctx.stack.pop()
if cond != 0:
if ctx.code[dest:dest+1] != bytes([OpCode.JUMPDEST]):
return False
ctx.program_counter = dest
else:
ctx.program_counter += 1
elif op == OpCode.JUMPDEST:
ctx.program_counter += 1
elif op == OpCode.PC:
ctx.stack.append(ctx.program_counter)
ctx.program_counter += 1
# Calldata 操作
elif op == OpCode.CALLDATALOAD:
if len(ctx.stack) < 1:
return False
offset = ctx.stack.pop()
if offset + 32 <= len(ctx.calldata):
value = int.from_bytes(ctx.calldata[offset:offset+32], 'big')
else:
value = 0
ctx.stack.append(value)
ctx.program_counter += 1
elif op == OpCode.CALLDATASIZE:
ctx.stack.append(len(ctx.calldata))
ctx.program_counter += 1
# 環境操作
elif op == OpCode.ADDRESS:
ctx.stack.append(int.from_bytes(ctx.contract_address, 'big'))
ctx.program_counter += 1
elif op == OpCode.CALLER:
ctx.stack.append(int.from_bytes(ctx.caller, 'big'))
ctx.program_counter += 1
elif op == OpCode.CALLVALUE:
ctx.stack.append(ctx.call_value)
ctx.program_counter += 1
# 區塊鏈操作
elif op == OpCode.KECCAK256:
if len(ctx.stack) < 2:
return False
offset, size = ctx.stack.pop(), ctx.stack.pop()
data = bytes(ctx.memory[offset:offset+size])
from web3 import Web3
keccak_hash = Web3.keccak(data)
ctx.stack.append(int.from_bytes(keccak_hash, 'big'))
ctx.program_counter += 1
# 終止操作
elif op == OpCode.STOP:
return True
elif op == OpCode.RETURN:
if len(ctx.stack) < 2:
return False
offset, size = ctx.stack.pop(), ctx.stack.pop()
# 返回記憶體中的資料
return True
elif op == OpCode.REVERT:
return False
else:
ctx.program_counter += 1
return True
def _mstore(self, offset: int, value: int):
"""將值存入記憶體"""
# 擴展記憶體(如需要)
required = offset + 32
if len(self.context.memory) < required:
self.context.memory.extend(b'\x00' * (required - len(self.context.memory)))
# 存入 32 位元組值
self.context.memory[offset:offset+32] = value.to_bytes(32, 'big')
def _mload(self, offset: int) -> int:
"""從記憶體載入值"""
if len(self.context.memory) < offset + 32:
return 0
return int.from_bytes(self.context.memory[offset:offset+32], 'big')
1.2 實作簡易算術合約
讓我們用此 EVM 執行一個簡單的算術合約。以下是對應的位元組碼:
def create_arithmetic_bytecode() -> bytes:
"""
建立一個簡單算術合約的位元組碼
功能:計算 (a + b) * c
堆疊布局(輸入):
- [c, b, a, ...]
堆疊布局(輸出):
- [result, ...]
操作:
PUSH1 a # 0x60 a
PUSH1 b # 0x60 b
ADD # 0x01
PUSH1 c # 0x60 c
MUL # 0x02
"""
return bytes([
0x60, 0x05, # PUSH1 5 // 載入 a = 5
0x60, 0x03, # PUSH1 3 // 載入 b = 3
0x01, # ADD // a + b = 8
0x60, 0x02, # PUSH1 2 // 載入 c = 2
0x02, # MUL // (a + b) * c = 16
0x00 # STOP
])
# 測試執行
evm = SimplifiedEVM()
success, result, gas_used = evm.execute(create_arithmetic_bytecode())
print(f"執行成功: {success}")
print(f"Gas 使用: {gas_used}")
print(f"棧頂值: {evm.context.stack[-1] if evm.context.stack else 'N/A'}")
第二章:實現完整的 Merkle Patricia Trie
2.1MPT 的理論基礎
Merkle Patricia Trie(MPT)是以太坊用於組織世界狀態的資料結構。其設計結合了三種資料結構的優點:
- Patricia Trie(前綴樹):共享具有共同前綴的鍵,提高空間效率
- Merkle Tree(默克爾樹):提供高效的路徑驗證與資料完整性保證
- 鍵值儲存:支援狀態的 CRUD 操作
MPT 中的節點有三種類型:
from typing import Optional, Dict, List
import hashlib
import rlp
class NodeType(Enum):
"""MPT 節點類型"""
EMPTY = 0
BRANCH = 1 # 分支節點:16 個子節點 + 值
EXTENSION = 2 # 擴展節點:共享前綴 + 下一個節點
LEAF = 3 # 葉子節點:剩餘前綴 + 值
@dataclass
class MPTNode:
"""MPT 節點基類"""
rlp_encoded: bytes # RLP 編碼後的內容
@property
def hash(self) -> bytes:
"""計算節點的 Keccak-256 哈希"""
return hashlib.new('keccak256', self.rlp_encoded).digest()
@dataclass
class BranchNode(MPTNode):
"""分支節點"""
children: List[Optional[bytes]] # 16 個子節點的哈希(0-F)
value: Optional[bytes] # 葉子節點值
def to_rlp(self) -> bytes:
"""RLP 編碼分支節點"""
data = self.children.copy()
data.append(self.value if self.value else b'')
return rlp.encode(data)
def __init__(self):
self.children = [None] * 16
self.value = None
@dataclass
class ExtensionNode(MPTNode):
"""擴展節點"""
shared_nibble: List[int] # 共享前綴(半位元組陣列)
child_hash: bytes # 子節點哈希
def to_rlp(self) -> bytes:
"""RLP 編碼擴展節點"""
# 標記前綴:0x00(偶數長度葉子)或 0x01(奇數長度葉子)
# 加上 2 表示擴展節點
prefix = self._nibbles_to_hex_prefix(self.shared_nibble, is_leaf=False)
nibbles = bytes(self.shared_nibble)
return rlp.encode([prefix + nibbles, self.child_hash])
def _nibbles_to_hex_prefix(self, nibbles: List[int], is_leaf: bool) -> bytes:
"""將半位元組轉換為十六進制前綴"""
hex_str = ''.join(hex(n)[2] for n in nibbles)
if is_leaf:
prefix = 0x20 if len(nibbles) % 2 == 0 else 0x30
else:
prefix = 0x00 if len(nibbles) % 2 == 0 else 0x10
return bytes([prefix + int(hex_str[:1], 16)])
def __init__(self):
self.shared_nibble = []
self.child_hash = b''
@dataclass
class LeafNode(MPTNode):
"""葉子節點"""
remaining_nibble: List[int] # 剩餘前綴
value: bytes # 節點值
def to_rlp(self) -> bytes:
"""RLP 編碼葉子節點"""
prefix = self._nibbles_to_hex_prefix(self.remaining_nibble, is_leaf=True)
nibbles = bytes(self.remaining_nibble)
return rlp.encode([prefix + nibbles, self.value])
def _nibbles_to_hex_prefix(self, nibbles: List[int], is_leaf: bool) -> bytes:
"""將半位元組轉換為十六進制前綴"""
hex_str = ''.join(hex(n)[2] for n in nibbles)
if is_leaf:
prefix = 0x20 if len(nibbles) % 2 == 0 else 0x30
else:
prefix = 0x00 if len(nibbles) % 2 == 0 else 0x10
return bytes([prefix + int(hex_str[:1], 16)])
def __init__(self):
self.remaining_nibble = []
self.value = b''
2.2 MPT 的完整實現
class MerklePatriciaTrie:
"""
Merkle Patricia Trie 實現
用於以太坊世界狀態的鍵值儲存:
- 鍵:帳戶地址(160 位元組 → 40 個十六進制字元)
- 值:帳戶狀態(RLP 編碼)
特性:
- 路徑壓縮(共享前綴)
- Merkle 哈希驗證
- 高效的更新與查詢
"""
def __init__(self, db: Optional[Dict[bytes, bytes]] = None):
"""
初始化 MPT
參數:
- db: 鍵值儲存(用於持久化節點)
"""
self.db = db if db else {} # 節點資料庫
self.root: bytes = b'' # 根節點哈希
self.cache: Dict[bytes, MPTNode] = {} # 內存緩存
def _hex_to_nibbles(self, key: str) -> List[int]:
"""
將十六進制字元串轉換為半位元組陣列
範例:'0x1234' → [1, 2, 3, 4]
"""
hex_key = key.lower()
if hex_key.startswith('0x'):
hex_key = hex_key[2:]
return [int(c, 16) for c in hex_key]
def _nibbles_to_hex(self, nibbles: List[int]) -> str:
"""半位元組陣列轉換為十六進制字元串"""
return ''.join(hex(n)[2] for n in nibbles)
def get(self, key: str) -> Optional[bytes]:
"""
根據鍵查詢值
參數:
- key: 查詢鍵(十六進制字元串,如 '0x1234...')
返回:
- 值(位元組),若不存在返回 None
"""
if not self.root:
return None
key_nibbles = self._hex_to_nibbles(key)
node = self._get_node(self.root)
return self._traverse_and_get(node, key_nibbles)
def _traverse_and_get(self, node: Optional[bytes],
key_nibbles: List[int]) -> Optional[bytes]:
"""遞迴遍歷並獲取值"""
if not node:
return None
decoded = self._decode_node(node)
if isinstance(decoded, LeafNode):
# 葉子節點:比較剩餘前綴
if decoded.remaining_nibble == key_nibbles:
return decoded.value
return None
elif isinstance(decoded, ExtensionNode):
# 擴展節點:匹配前綴後繼續
prefix_len = len(decoded.shared_nibble)
if key_nibbles[:prefix_len] == decoded.shared_nibble:
return self._traverse_and_get(
self._get_node(decoded.child_hash),
key_nibbles[prefix_len:]
)
return None
elif isinstance(decoded, BranchNode):
# 分支節點:根據下一個半位元組選擇分支
if not key_nibbles:
return decoded.value
next_nibble = key_nibbles[0]
return self._traverse_and_get(
self._get_node(decoded.children[next_nibble]),
key_nibbles[1:]
)
return None
def put(self, key: str, value: bytes) -> None:
"""
插入或更新鍵值對
參數:
- key: 鍵(十六進制字元串)
- value: 值(位元組)
"""
key_nibbles = self._hex_to_nibbles(key)
if not self.root:
# 空樹:創建葉子節點
leaf = LeafNode()
leaf.remaining_nibble = key_nibbles
leaf.value = value
leaf.rlp_encoded = leaf.to_rlp()
self._put_node(leaf)
self.root = leaf.hash
return
# 遍歷並更新
old_root = self.root
self.root = self._update_at(
self.root, key_nibbles, key_nibbles, value
)
def _update_at(self, node_hash: bytes, key_nibbles: List[int],
full_key: List[int], value: bytes) -> bytes:
"""在指定位置更新節點"""
if not node_hash:
# 創建新葉子節點
leaf = LeafNode()
leaf.remaining_nibble = key_nibbles
leaf.value = value
leaf.rlp_encoded = leaf.to_rlp()
self._put_node(leaf)
return leaf.hash
node = self._decode_node(node_hash)
if isinstance(node, LeafNode):
# 葉子節點:需要分裂
return self._update_leaf_split(node, key_nibbles, full_key, value)
elif isinstance(node, ExtensionNode):
# 擴展節點:匹配前綴
return self._update_extension(node, key_nibbles, full_key, value)
elif isinstance(node, BranchNode):
# 分支節點:遞迴更新子分支
return self._update_branch(node, key_nibbles, full_key, value)
return node_hash
def _update_leaf_split(self, old_leaf: LeafNode, key_nibbles: List[int],
full_key: List[int], value: bytes) -> bytes:
"""葉子節點分裂處理"""
# 找出共同前綴
common_prefix = self._find_common_prefix(
old_leaf.remaining_nibble, key_nibbles
)
if len(common_prefix) == len(old_leaf.remaining_nibble):
# 新鍵完全包含舊前綴
if not key_nibbles[len(common_prefix):]:
# 舊葉子作為新分支的值
leaf = LeafNode()
leaf.remaining_nibble = old_leaf.remaining_nibble[len(common_prefix):]
leaf.value = old_leaf.value
leaf.rlp_encoded = leaf.to_rlp()
new_leaf_hash = self._put_node(leaf)
branch = BranchNode()
branch.value = value
branch.children[full_key[len(common_prefix)]] = new_leaf_hash
branch.rlp_encoded = branch.to_rlp()
return self._put_node(branch)
else:
# 需要遞迴到分支
pass
else:
# 需要創建擴展節點
pass
# 簡化處理:直接替換(實際實現需要更複雜的分裂邏輯)
leaf = LeafNode()
leaf.remaining_nibble = key_nibbles
leaf.value = value
leaf.rlp_encoded = leaf.to_rlp()
return self._put_node(leaf)
def _find_common_prefix(self, nibbles1: List[int],
nibbles2: List[int]) -> List[int]:
"""找出兩個半位元組陣列的共同前綴"""
common = []
for n1, n2 in zip(nibbles1, nibbles2):
if n1 == n2:
common.append(n1)
else:
break
return common
def _update_extension(self, ext: ExtensionNode, key_nibbles: List[int],
full_key: List[int], value: bytes) -> bytes:
"""擴展節點更新"""
prefix_len = len(ext.shared_nibble)
if key_nibbles[:prefix_len] != ext.shared_nibble:
# 前綴不匹配,需要分裂
common = self._find_common_prefix(ext.shared_nibble, key_nibbles)
# 創建新分支
branch = BranchNode()
# 剩餘的舊擴展
old_ext_remains = ext.shared_nibble[len(common):]
if old_ext_remains:
old_leaf = LeafNode()
old_leaf.remaining_nibble = old_ext_remains
old_leaf.value = ext.value if hasattr(ext, 'value') else b''
old_leaf.rlp_encoded = old_leaf.to_rlp()
old_hash = self._put_node(old_leaf)
else:
old_hash = ext.child_hash
# 剩餘的新鍵
new_key_remains = key_nibbles[len(common):]
new_leaf = LeafNode()
new_leaf.remaining_nibble = new_key_remains
new_leaf.value = value
new_leaf.rlp_encoded = new_leaf.to_rlp()
new_hash = self._put_node(new_leaf)
# 填充分支
if new_key_remains:
branch.children[new_key_remains[0]] = new_hash
if old_ext_remains:
branch.children[old_ext_remains[0]] = old_hash
branch.rlp_encoded = branch.to_rlp()
branch_hash = self._put_node(branch)
# 創建新擴展
new_ext = ExtensionNode()
new_ext.shared_nibble = common
new_ext.child_hash = branch_hash
new_ext.rlp_encoded = new_ext.to_rlp()
return self._put_node(new_ext)
# 前綴匹配,遞迴更新
new_child_hash = self._update_at(
ext.child_hash,
key_nibbles[prefix_len:],
full_key,
value
)
ext.child_hash = new_child_hash
ext.rlp_encoded = ext.to_rlp()
return self._put_node(ext)
def _update_branch(self, branch: BranchNode, key_nibbles: List[int],
full_key: List[int], value: bytes) -> bytes:
"""分支節點更新"""
if not key_nibbles:
# 鍵完全匹配,在分支節點儲存值
branch.value = value
else:
next_nibble = key_nibbles[0]
new_child_hash = self._update_at(
self._get_node(branch.children[next_nibble]),
key_nibbles[1:],
full_key,
value
)
branch.children[next_nibble] = new_child_hash
branch.rlp_encoded = branch.to_rlp()
return self._put_node(branch)
def _get_node(self, node_hash: bytes) -> Optional[bytes]:
"""從資料庫或緩存獲取節點"""
if not node_hash:
return None
if node_hash in self.cache:
return node_hash
return self.db.get(node_hash)
def _put_node(self, node: MPTNode) -> bytes:
"""將節點存入資料庫"""
node_hash = node.hash
if node_hash not in self.db:
self.db[node_hash] = node.rlp_encoded
self.cache[node_hash] = node
return node_hash
def _decode_node(self, node_data: bytes) -> MPTNode:
"""解碼 RLP 資料為 MPT 節點"""
decoded = rlp.decode(node_data)
if not decoded:
return BranchNode()
if isinstance(decoded, list):
if len(decoded) == 17:
# 分支節點
branch = BranchNode()
branch.children = [
child if child else None
for child in decoded[:16]
]
branch.value = decoded[16] if decoded[16] else None
branch.rlp_encoded = node_data
return branch
elif len(decoded) == 2:
first = bytes(decoded[0])
second = bytes(decoded[1])
# 解析前綴
first_byte = first[0]
if first_byte >= 0x20:
# 葉子節點
leaf = LeafNode()
nibbles = self._decode_nibbles_from_prefix(first)
leaf.remaining_nibble = nibbles + list(first[1:])
leaf.value = second
leaf.rlp_encoded = node_data
return leaf
else:
# 擴展節點
ext = ExtensionNode()
nibbles = self._decode_nibbles_from_prefix(first)
ext.shared_nibble = nibbles + list(first[1:])
ext.child_hash = second
ext.rlp_encoded = node_data
return ext
# 回退:當作分支節點
return BranchNode()
def _decode_nibbles_from_prefix(self, prefix: bytes) -> List[int]:
"""從前綴位元組解碼半位元組"""
return []
def get_proof(self, key: str) -> tuple:
"""
獲取指定鍵的 Merkle 證明
返回:(值, 證明路徑上的節點列表)
"""
if not self.root:
return (None, [])
key_nibbles = self._hex_to_nibbles(key)
proof = []
node = self._get_node(self.root)
while node:
proof.append(node)
decoded = self._decode_node(node)
if isinstance(decoded, LeafNode):
if decoded.remaining_nibble == key_nibbles:
return (decoded.value, proof)
return (None, proof)
elif isinstance(decoded, ExtensionNode):
prefix_len = len(decoded.shared_nibble)
if key_nibbles[:prefix_len] != decoded.shared_nibble:
return (None, proof)
key_nibbles = key_nibbles[prefix_len:]
node = self._get_node(decoded.child_hash)
elif isinstance(decoded, BranchNode):
if not key_nibbles:
return (decoded.value, proof)
next_nibble = key_nibbles[0]
node = self._get_node(decoded.children[next_nibble])
key_nibbles = key_nibbles[1:]
else:
break
return (None, proof)
def verify_proof(self, root: bytes, key: str,
proof: List[bytes], value: bytes) -> bool:
"""
驗證 Merkle 證明
參數:
- root: 根節點哈希
- key: 查詢鍵
- proof: 證明路徑
- value: 聲稱的值
返回:驗證是否通過
"""
if not proof:
return value is None
# 重新計算根哈希
current_hash = self._compute_leaf_hash(key, value)
for node in reversed(proof[:-1]):
current_hash = self._compute_internal_hash(node)
return current_hash == root
def _compute_leaf_hash(self, key: str, value: bytes) -> bytes:
"""計算葉子節點的哈希"""
if value is None:
return b''
key_nibbles = self._hex_to_nibbles(key)
leaf = LeafNode()
leaf.remaining_nibble = key_nibbles
leaf.value = value
leaf.rlp_encoded = leaf.to_rlp()
return leaf.hash
def _compute_internal_hash(self, node_data: bytes) -> bytes:
"""計算內部節點的哈希"""
return hashlib.new('keccak256', node_data).digest()
2.3 MPT 的實際應用
# 創建並使用 MPT
trie = MerklePatriciaTrie()
# 插入以太坊風格的帳戶狀態
accounts = {
'0x' + '11' * 20: b'\x01', # 帳戶 1:非零 nonce
'0x' + '22' * 20: b'\x02', # 帳戶 2
'0x' + '33' * 20: b'\x03', # 帳戶 3
}
for address, state in accounts.items():
trie.put(address, state)
print(f"插入: {address} -> {state.hex()}")
print(f"根哈希: {trie.root.hex()}")
# 查詢
value = trie.get('0x' + '22' * 20)
print(f"查詢 0x22...: {value.hex() if value else 'Not Found'}")
# 生成並驗證證明
proof_value, proof_nodes = trie.get_proof('0x' + '22' * 20)
print(f"證明長度: {len(proof_nodes)}")
第三章:實現簡易共識機制
3.1 工作量證明的模擬實現
讓我們實現一個簡化版的工作量證明(Proof of Work)共識機制:
import time
import hashlib
from dataclasses import dataclass, field
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor
@dataclass
class Block:
"""區塊結構"""
index: int
timestamp: int
transactions: List[dict]
previous_hash: bytes
nonce: int = 0
difficulty: int = 4 # 前導零位数
hash: bytes = b''
def __post_init__(self):
if not self.hash:
self.hash = self.compute_hash()
def compute_hash(self) -> bytes:
"""計算區塊哈希"""
data = (
str(self.index).encode() +
str(self.timestamp).encode() +
str(self.transactions).encode() +
self.previous_hash +
str(self.nonce).encode()
)
return hashlib.sha256(data).digest()
def is_valid(self) -> bool:
"""驗證區塊有效性"""
return (
self.hash[:self.difficulty] == b'\x00' * self.difficulty and
self.hash == self.compute_hash()
)
@dataclass
class Miner:
"""礦工"""
id: str
hash_power: float # 相對算力(0-1)
def mine_block(self, block: Block, target_difficulty: int) -> Optional[int]:
"""
嘗試挖礦
返回找到的有效 nonce,若失敗返回 None
"""
target = b'\x00' * target_difficulty
nonce = 0
max_attempts = int(2 ** (256 - target_difficulty * 4)) # 粗略估算
# 根據算力決定搜索範圍
attempts_per_round = int(max_attempts * self.hash_power)
for _ in range(attempts_per_round):
test_hash = self._hash_with_nonce(block, nonce)
if test_hash[:target_difficulty] == target:
return nonce
nonce += 1
return None
def _hash_with_nonce(self, block: Block, nonce: int) -> bytes:
"""計算帶 nonce 的哈希"""
data = (
str(block.index).encode() +
str(block.timestamp).encode() +
str(block.transactions).encode() +
block.previous_hash +
str(nonce).encode()
)
return hashlib.sha256(data).digest()
class ProofOfWorkConsensus:
"""
簡化版工作量證明共識系統
演示:
- 區塊構造
- 礦工競爭
- 區塊驗證
- 最長鏈選擇
"""
def __init__(self, difficulty: int = 4):
self.chain: List[Block] = []
self.pending_transactions: List[dict] = []
self.difficulty = difficulty
self.miners: List[Miner] = []
self.reward_address = b'\x00' * 20
# 創建創世區塊
self.create_genesis_block()
def create_genesis_block(self) -> Block:
"""創建創世區塊"""
genesis = Block(
index=0,
timestamp=int(time.time()),
transactions=[],
previous_hash=b'\x00' * 32,
nonce=0
)
genesis.hash = self._mine_with_difficulty(genesis, self.difficulty)
self.chain.append(genesis)
return genesis
def _mine_with_difficulty(self, block: Block, difficulty: int) -> bytes:
"""挖掘區塊直到找到有效哈希"""
target = b'\x00' * difficulty
nonce = 0
while True:
test_hash = hashlib.sha256((
str(block.index).encode() +
str(block.timestamp).encode() +
str(block.transactions).encode() +
block.previous_hash +
str(nonce).encode()
)).digest()
if test_hash[:difficulty] == target:
block.nonce = nonce
return test_hash
nonce += 1
def add_transaction(self, sender: str, recipient: str, amount: float):
"""添加交易到待確認池"""
self.pending_transactions.append({
'sender': sender,
'recipient': recipient,
'amount': amount,
'timestamp': time.time()
})
def create_block(self) -> Block:
"""創建新區塊"""
last_block = self.chain[-1]
new_block = Block(
index=last_block.index + 1,
timestamp=int(time.time()),
transactions=self.pending_transactions.copy(),
previous_hash=last_block.hash,
difficulty=self.difficulty
)
return new_block
def mine_block_parallel(self, miners: List[Miner]) -> tuple:
"""
並行挖掘
返回:(成功挖礦的礦工, 區塊, nonce)
"""
block = self.create_block()
with ThreadPoolExecutor(max_workers=len(miners)) as executor:
futures = {
executor.submit(miner.mine_block, block, self.difficulty): miner
for miner in miners
}
for future in futures:
miner = futures[future]
result = future.result()
if result is not None:
block.nonce = result
block.hash = hashlib.sha256((
str(block.index).encode() +
str(block.timestamp).encode() +
str(block.transactions).encode() +
block.previous_hash +
str(block.nonce).encode()
)).digest()
return (miner, block, result)
return (None, block, None)
def add_block(self, block: Block) -> bool:
"""將區塊添加到鏈"""
last_block = self.chain[-1]
# 驗證區塊
if block.previous_hash != last_block.hash:
return False
if not block.is_valid():
return False
self.chain.append(block)
self.pending_transactions = []
return True
def is_chain_valid(self) -> bool:
"""驗證整條鏈的有效性"""
for i in range(1, len(self.chain)):
current = self.chain[i]
previous = self.chain[i - 1]
if current.previous_hash != previous.hash:
return False
if not current.is_valid():
return False
return True
def resolve_conflicts(self, other_chain: List[Block]) -> bool:
"""
解決鏈衝突(最長鏈原則)
返回:是否接受了新鏈
"""
if len(other_chain) <= len(self.chain):
return False
# 驗證新鏈
if not self._verify_chain(other_chain):
return False
self.chain = other_chain
return True
def _verify_chain(self, chain: List[Block]) -> bool:
"""驗證完整鏈"""
if not chain:
return False
for i in range(1, len(chain)):
if chain[i].previous_hash != chain[i - 1].hash:
return False
if not chain[i].is_valid():
return False
return True
def get_balance(self, address: str) -> float:
"""計算地址餘額"""
balance = 0.0
for block in self.chain:
for tx in block.transactions:
if tx['recipient'] == address:
balance += tx['amount']
if tx['sender'] == address:
balance -= tx['amount']
return balance
3.2 質押證明的簡化模擬
@dataclass
class Validator:
"""驗證者"""
address: bytes
stake: int # 質押數量
is_active: bool = True
slashed: bool = False
class SimplifiedProofOfStake:
"""
簡化版權益證明共識
演示:
- 驗證者選擇
- 區塊提議
- 見證投票
- 最終確定性
"""
def __init__(self):
self.validators: List[Validator] = []
self.blocks: List[Block] = []
self.pending_transactions: List[dict] = []
self.current_epoch: int = 0
self.checkpoint_interval: int = 32 # 每 32 個 slot 一個 checkpoint
# 創建創世區塊
self.create_genesis()
def add_validator(self, address: bytes, stake: int):
"""添加驗證者"""
validator = Validator(address=address, stake=stake)
self.validators.append(validator)
def _get_active_validators(self) -> List[Validator]:
"""獲取活躍驗證者列表"""
return [v for v in self.validators if v.is_active and not v.slashed]
def _select_proposer(self, slot: int) -> Validator:
"""根據質押量隨機選擇提議者"""
active = self._get_active_validators()
total_stake = sum(v.stake for v in active)
# 簡化的偽隨機選擇
import random
seed = slot + self.current_epoch * 1000
random.seed(seed)
selected_stake = random.randint(0, total_stake)
cumulative = 0
for validator in active:
cumulative += validator.stake
if cumulative >= selected_stake:
return validator
return active[-1]
def _create_block(self, proposer: Validator) -> Block:
"""創建區塊"""
last_block = self.blocks[-1] if self.blocks else None
block = Block(
index=len(self.blocks),
timestamp=int(time.time()),
transactions=self.pending_transactions.copy(),
previous_hash=last_block.hash if last_block else b'\x00' * 32,
difficulty=0 # PoS 不需要工作量難度
)
# 設置 PoS 哈希(使用質押證明)
block.hash = self._compute_pos_hash(block, proposer)
return block
def _compute_pos_hash(self, block: Block, proposer: Validator) -> bytes:
"""計算 PoS 區塊哈希"""
data = (
str(block.index).encode() +
str(block.timestamp).encode() +
str(block.transactions).encode() +
block.previous_hash +
proposer.address +
str(proposer.stake).encode()
)
return hashlib.sha256(data).digest()
def propose_block(self, slot: int) -> tuple:
"""提議區塊"""
proposer = self._select_proposer(slot)
block = self._create_block(proposer)
return (proposer, block)
def attest(self, block: Block, validator: Validator) -> dict:
"""驗證者見證區塊"""
# 驗證區塊有效性
is_valid = self._verify_block(block)
attestation = {
'validator': validator.address,
'block_hash': block.hash,
'slot': len(self.blocks),
'is_valid': is_valid,
'signature': self._sign_attestation(block, validator)
}
return attestation
def _verify_block(self, block: Block) -> bool:
"""驗證區塊"""
if len(self.blocks) > 0:
last_block = self.blocks[-1]
if block.previous_hash != last_block.hash:
return False
return True
def _sign_attestation(self, block: Block,
validator: Validator) -> bytes:
"""簽署見證"""
data = block.hash + validator.address
return hashlib.sha256(data).digest()
def finalize_checkpoint(self, attestations: List[dict]) -> bool:
"""最終確定檢查點"""
active = self._get_active_validators()
# 計算有效見證數
valid_attestations = sum(1 for a in attestations if a['is_valid'])
attestation_rate = valid_attestations / len(active) if active else 0
# 需要 2/3 以上的見證才能最終確定
if attestation_rate >= 2/3:
# 創建檢查點
checkpoint = {
'epoch': self.current_epoch,
'block_hash': self.blocks[-1].hash if self.blocks else b'',
'attestations': len(valid_attestations)
}
return True
return False
def slash_validator(self, validator: Validator, reason: str):
"""削減驗證者"""
validator.slashed = True
# 在實際系統中,這裡會處理質押品的沒收
print(f"驗證者 {validator.address.hex()} 因 {reason} 被削減")
結論:實作學習的價值
透過親手實現這些核心組件,開發者將獲得:
- 深度理解:知道 EVM 如何執行位元組碼,理解狀態組織的原理
- 除錯能力:能夠診斷複雜的 Gas 優化與合約安全問題
- 設計視野:理解為何以太坊做出特定的設計選擇
- 創新基礎:在深刻理解現有系統的基礎上進行創新
這個學習路徑推薦給所有希望成為以太坊專家的開發者。從簡化實現開始,逐步添加複雜性,最終構建完整的功能原型——這是理解複雜系統最有效的方式。
參考資源
- Ethereum Yellow Paper - Gavin Wood
- EVM Illustrated - taken
- Merkle Patricia Trie Specification - ethereum.org
- Casper the Friendly Finality Gadget - Vitalik et al.
- Go-Ethereum (geth) Source Code
免責聲明
本網站內容僅供教育與資訊目的,不構成任何投資建議或推薦。在進行任何加密貨幣相關操作前,請自行研究並諮詢專業人士意見。所有投資均有風險,請謹慎評估您的風險承受能力。
相關文章
- 以太坊升級歷史與 The Merge 技術演進完整指南 — 深入分析以太坊的升級歷史,特別聚焦於 2022 年最重要的「合併」(The Merge)升級,從技術層面詳細比較 PoW 與 PoS 機制的差異。我們涵蓋每個升級的時間節點、技術變更、對開發者與用戶的實際影響,以及後續升級對生態系統的影響。
- 以太坊核心協議完整技術分析:從共識機制到狀態管理 — 本文提供一份全面且深入的以太坊核心協議技術分析,涵蓋共識機制、Casper FFG、LMD Ghost、EVM 架構、Gas 計算、狀態管理等技術層面。我們從密碼學基礎出發,逐步構建對以太坊整體架構的系統性理解,提供關鍵計算公式與數值推導,並深入分析 Layer 2 擴展方案和 MEV 基礎設施。截至 2026 年第一季度,以太坊網路質押總量超過 3,400 萬 ETH,驗證者數量突破 100 萬,本技術分析將幫助讀者理解這些數據背後的工程原理。
- 以太坊共識機制深度技術分析:Attestation、Casper FFG 與 CBC 設計原理完整指南 — 本文深入剖析以太坊 PoS 共識機制的核心技術,包括 Attestation 證明機制的詳細流程、Casper FFG 與 CBC 的設計差異比較、罰沒機制、經濟激勵模型,以及 Single Slot Finality 未來演進方向。透過完整的技術分析,幫助讀者建立對以太坊共識層的深入理解。
- 以太坊核心協議基礎完整指南:從理論到實作的深度技術分析 — 本文提供以太坊核心協議的完整技術指南,涵蓋共識層、執行層、智慧合約部署、EVM 等核心元件的技術原理與實作細節。援引以太坊白皮書(Buterin, 2014)、黃皮書(Wood, 2014-2023)、Gasper 論文(Buterin et al., 2020)等正式學術文獻強化內容的學術嚴謹性。包含 Gasper 共識機制的數學定義、LMD-GHOST 分叉選擇規則、MPT 狀態管理、EIP-1559 費用燃燒機制、驗證者質押經濟學等完整技術分析。
- 以太坊 PoS 共識密碼學完整指南:BLS 簽章聚合、VDF 隨機數、BFT 容錯模型數學推導 — 本文深入分析以太坊 PoS 共識機制的密碼學基礎,包括 BLS 簽章聚合技術的數學原理與效率分析、VDF 可驗證延遲函數的設計與實現、RANDAO 混洗機制、以及共識安全性分析。特別聚焦於 BFT 共識容錯模型的數學推導,包括 PBFT 協議的安全性證明、Casper FFG 的容錯分析、LMD-GHOST 的安全模型、以及經濟激勵的數學模型。提供完整的數學推導與 2026 Q1 最新驗證者數據。
延伸閱讀與來源
- Ethereum.org Developers 官方開發者入口與技術文件
- EIPs 以太坊改進提案完整列表
- Solidity 文檔 智慧合約程式語言官方規格
- EVM 代碼庫 EVM 實作的核心參考
- Alethio EVM 分析 EVM 行為的正規驗證
這篇文章對您有幫助嗎?
請告訴我們如何改進:
評論
發表評論
注意:由於這是靜態網站,您的評論將儲存在本地瀏覽器中,不會公開顯示。
目前尚無評論,成為第一個發表評論的人吧!