以太坊即時數據整合開發完整指南:從 API 串接到實際應用的工程實踐
在以太坊開發中,即時數據的獲取與處理是構建高效 DApp 的核心能力。本指南從工程師視角出發,深入探討以太坊生態系統中各類即時數據的獲取方式,提供完整的 API 整合範例。我們涵蓋 RPC 節點服務整合、CoinGecko 價格 API、Gas 費用預測、The Graph 子圖查詢、DeFi 協議數據聚合等主題,並展示如何構建一個實際的即時數據儀表板。每個章節都包含可運作的程式碼範例與最佳實踐建議。
以太坊即時數據整合開發完整指南:從 API 串接到實際應用的工程實踐
概述
在以太坊開發中,即時數據的獲取與處理是構建高效 DApp 的核心能力。無論是顯示即時 ETH 價格、追蹤 Gas 費用、監控合約狀態,還是分析鏈上活動,都需要可靠的數據來源。本指南從工程師視角出發,深入探討以太坊生態系統中各類即時數據的獲取方式,提供完整的 API 整合範例,並展示如何構建一個實際的即時數據儀表板。
一、以太坊即時數據生態概述
1.1 數據類型分類
以太坊生態中的即時數據可分為以下幾大類型:
鏈上數據
- 區塊與交易資訊
- 帳戶餘額與交易歷史
- 合約狀態與事件日誌
- Token 轉移與餘額
協議層數據
- Gas 費用預測與即時費用
- 區塊空間利用率
- 網路延遲與區塊時間
市場數據
- ETH 價格與交易量
- DeFi 協議 TVL
- 穩定幣市值與流通量
節點數據
- 驗證者分布與質押量
- RPC 節點健康狀態
- P2P 網路拓撲
1.2 主要數據提供商生態
主流 RPC 與數據服務提供商:
| 類別 | 服務商 | 特色 | 免費額度 |
|-----|-------|------|---------|
| RPC | Infura | 最廣泛使用,企業級 | 100k req/day |
| RPC | Alchemy | 高可靠性,免費 tier 強 | 10M compute units |
| RPC | QuickNode | 全球分布,高效能 | 50k req/day |
| 數據 | Etherscan | 區塊瀏覽器龍頭 | 5 req/sec |
| 數據 | Dune Analytics | SQL 查詢強大 | 免費查詢 |
| 數據 | The Graph | GraphQL 子圖 | 100k queries |
| 數據 | CoinGecko | 價格數據 | 10k req/min |
| 數據 | Coingecko API | 即時價格 | 10-30 req/min |
二、以太坊 RPC 節點服務整合
2.1 RPC API 基礎詳解
以太坊 JSON-RPC 是與以太坊節點通信的標準接口。理解 RPC 方法是獲取鏈上數據的基礎。
常用 RPC 方法分類
// 區塊與狀態查詢
eth_blockNumber // 取得最新區塊編號
eth_getBlockByNumber // 依區塊編號取得區塊詳情
eth_getTransactionByHash // 依交易雜湊取得交易詳情
eth_getTransactionReceipt // 取得交易收據
eth_call // 執行合約讀取調用
eth_getBalance // 取得帳戶餘額
eth_getCode // 取得帳戶代碼
// 事件與日誌
eth_getLogs // 取得區塊日誌
eth_newFilter // 建立事件過濾器
eth_getFilterChanges // 取得過濾器變更
// 狀態與估計
eth_estimateGas // 估算 Gas 用量
eth_gasPrice // 取得當前 Gas 價格
eth_feeHistory // 取得歷史費用數據
2.2 程式碼實作:基礎 RPC 客戶端
以下是使用 Python 整合以太坊 RPC 的完整範例:
import requests
import json
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from decimal import Decimal
import time
@dataclass
class BlockData:
number: int
hash: str
parent_hash: str
timestamp: int
gas_used: int
gas_limit: int
transactions: List[str]
miner: str
difficulty: int
@dataclass
class TransactionData:
hash: str
block_number: int
from_address: str
to_address: str
value: int
gas: int
gas_price: int
nonce: int
input_data: str
class EthereumRPCClient:
"""以太坊 RPC 客戶端封裝"""
def __init__(self, rpc_url: str, api_key: Optional[str] = None):
"""
初始化 RPC 客戶端
Args:
rpc_url: RPC 端點 URL
api_key: API Key(如需要)
"""
self.rpc_url = rpc_url
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json'
})
def _make_request(self, method: str, params: List[Any] = None) -> Any:
"""發送 RPC 請求"""
payload = {
'jsonrpc': '2.0',
'method': method,
'params': params or [],
'id': 1
}
response = self.session.post(self.rpc_url, json=payload)
result = response.json()
if 'error' in result:
raise Exception(f"RPC Error: {result['error']}")
return result.get('result')
def get_block_number(self) -> int:
"""取得最新區塊編號"""
return int(self._make_request('eth_blockNumber'), 16)
def get_block_by_number(self, block_number: int, full_transactions: bool = False) -> BlockData:
"""
依區塊編號取得區塊詳情
Args:
block_number: 區塊編號
full_transactions: 是否包含完整交易資料
"""
hex_number = hex(block_number)
params = [hex_number, full_transactions]
block = self._make_request('eth_getBlockByNumber', params)
return BlockData(
number=int(block['number'], 16),
hash=block['hash'],
parent_hash=block['parentHash'],
timestamp=int(block['timestamp'], 16),
gas_used=int(block['gasUsed'], 16),
gas_limit=int(block['gasLimit'], 16),
transactions=[tx['hash'] for tx in block['transactions']] if full_transactions else block['transactions'],
miner=block['miner'],
difficulty=int(block['totalDifficulty'], 16) if 'totalDifficulty' in block else 0
)
def get_balance(self, address: str, block: str = 'latest') -> Decimal:
"""
取得帳戶 ETH 餘額
Args:
address: 以太坊地址
block: 區塊標識(latest, earliest, pending 或區塊編號)
Returns:
以 ETH 為單位的餘額
"""
params = [address, block]
wei_balance = int(self._make_request('eth_getBalance', params), 16)
return Decimal(wei_balance) / Decimal(10**18)
def get_transaction(self, tx_hash: str) -> TransactionData:
"""依交易雜湊取得交易詳情"""
tx = self._make_request('eth_getTransactionByHash', [tx_hash])
return TransactionData(
hash=tx['hash'],
block_number=int(tx['blockNumber'], 16) if tx['blockNumber'] else 0,
from_address=tx['from'],
to_address=tx['to'] or '',
value=int(tx['value'], 16),
gas=int(tx['gas'], 16),
gas_price=int(tx['gasPrice'], 16),
nonce=int(tx['nonce'], 16),
input_data=tx['input']
)
def get_gas_price(self) -> Dict[str, int]:
"""
取得當前 Gas 價格
Returns:
包含 low, medium, high 的字典(以 Wei 為單位)
"""
# 取得 pending 區塊的費用歷史
latest_block = self.get_block_number()
hex_blocks = hex(latest_block - 10)
try:
fee_history = self._make_request('eth_feeHistory', [
'0x5', # 最近 5 個區塊
hex_blocks,
[10, 50, 90] # 百分位數
])
base_fee_per_gas = int(fee_history['baseFeePerGas'][-1], 16)
priority_fee_per_gas = fee_history['reward']
return {
'low': int(priority_fee_per_gas[0], 16),
'medium': int(priority_fee_per_gas[1], 16),
'high': int(priority_fee_per_gas[2], 16),
'base_fee': base_fee_per_gas
}
except Exception:
# 備用方案:直接獲取 gas price
gas_price = int(self._make_request('eth_gasPrice'), 16)
return {
'low': gas_price // 2,
'medium': gas_price,
'high': gas_price * 2,
'base_fee': gas_price
}
def estimate_gas(self, tx_params: Dict[str, Any]) -> int:
"""
估算交易 Gas 用量
Args:
tx_params: 交易參數,包含 to, value, data 等
"""
return int(self._make_request('eth_estimateGas', [tx_params]), 16)
def get_token_balance(self, token_address: str, wallet_address: str, block: str = 'latest') -> Dict[str, Any]:
"""
取得 ERC-20 代幣餘額
使用 ERC-20 standard balanceOf 方法
"""
# ERC-20 balanceOf 方法簽名
method_id = '0x70a08231'
# wallet address padding to 32 bytes
padded_wallet = wallet_address[2:].zfill(64)
data = method_id + padded_wallet
params = [{
'to': token_address,
'data': data
}, block]
result = int(self._make_request('eth_call', params), 16)
return {
'raw': result,
'decimals': 18, # 應從合約獲取
'formatted': result / (10**18)
}
# 使用範例
def main():
# 使用 Infura RPC
rpc_url = "https://mainnet.infura.io/v3/YOUR_PROJECT_ID"
client = EthereumRPCClient(rpc_url)
# 取得最新區塊
block_number = client.get_block_number()
print(f"最新區塊編號: {block_number}")
# 取得區塊詳情
block = client.get_block_by_number(block_number)
print(f"區塊時間戳: {block.timestamp}")
print(f"Gas 使用量: {block.gas_used}/{block.gas_limit}")
# 取得餘額
balance = client.get_balance("0x742d35Cc6634C0532925a3b844Bc9e7595f0eB5e")
print(f"ETH 餘額: {balance}")
# 取得 Gas 價格
gas_prices = client.get_gas_price()
print(f"Gas 價格 (Gwei): low={gas_prices['low']/1e9}, "
f"medium={gas_prices['medium']/1e9}, high={gas_prices['high']/1e9}")
if __name__ == "__main__":
main()
2.3 批次查詢與效能優化
在處理大量數據時,需要注意請求效率優化:
import asyncio
import aiohttp
from typing import List, Dict, Any
import json
class BatchRPCClient:
"""批次 RPC 客戶端,支援並發請求"""
def __init__(self, rpc_url: str, max_concurrent: int = 10):
self.rpc_url = rpc_url
self.semaphore = asyncio.Semaphore(max_concurrent)
async def _make_request(self, session: aiohttp.ClientSession,
method: str, params: List[Any]) -> Dict:
"""發送單個請求"""
async with self.semaphore:
payload = {
'jsonrpc': '2.0',
'method': method,
'params': params,
'id': 1
}
async with session.post(self.rpc_url, json=payload) as response:
return await response.json()
async def batch_get_balances(self, addresses: List[str]) -> Dict[str, Dict]:
"""
批次取得多個地址餘額
這比串行請求快 10-100 倍
"""
async with aiohttp.ClientSession() as session:
tasks = [
self._make_request(session, 'eth_getBalance', [addr, 'latest'])
for addr in addresses
]
results = await asyncio.gather(*tasks)
return {
addr: {
'wei': int(res['result'], 16),
'eth': int(res['result'], 16) / 1e18
}
for addr, res in zip(addresses, results)
}
async def batch_get_token_balances(self,
token_address: str,
addresses: List[str]) -> Dict[str, int]:
"""批次取得 ERC-20 代幣餘額"""
method_id = '0x70a08231'
async with aiohttp.ClientSession() as session:
tasks = []
for addr in addresses:
padded_addr = addr[2:].zfill(64)
data = method_id + padded_addr
params = [{'to': token_address, 'data': data}, 'latest']
tasks.append(
self._make_request(session, 'eth_call', params)
)
results = await asyncio.gather(*tasks)
return {
addr: int(res['result'], 16)
for addr, res in zip(addresses, results)
}
async def get_block_transactions(self,
block_numbers: List[int]) -> Dict[int, List[str]]:
"""批次取得多個區塊的交易"""
async with aiohttp.ClientSession() as session:
tasks = [
self._make_request(session, 'eth_getBlockByNumber',
[hex(num), False])
for num in block_numbers
]
results = await asyncio.gather(*tasks)
return {
num: [tx['hash'] for tx in res['result']['transactions']]
for num, res in zip(block_numbers, results)
}
# 使用範例:批次查詢
async def batch_example():
client = BatchRPCClient("https://mainnet.infura.io/v3/YOUR_PROJECT_ID")
# 批次查詢 100 個地址的餘額
addresses = [f"0x{'0' * 40}"жалей" for _ in range(100)] # 示例地址
balances = await client.batch_get_balances(addresses)
print(f"查詢了 {len(balances)} 個地址")
# 批次查詢某代幣餘額
usdc_address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" # USDC
token_balances = await client.batch_get_token_balances(
usdc_address,
addresses[:50]
)
print(f"代幣餘額查詢完成")
# 執行
asyncio.run(batch_example())
三、即時價格數據 API 整合
3.1 CoinGecko API 實作
CoinGecko 提供免費的加密貨幣價格 API,是獲取即時價格數據的首選:
import requests
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from decimal import Decimal
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class PriceData:
"""價格數據結構"""
symbol: str
usd: Decimal
usd_24h_change: Decimal
usd_24h_vol: Decimal
usd_market_cap: Decimal
last_updated: str
class CoinGeckoAPI:
"""CoinGecko API 封裝"""
BASE_URL = "https://api.coingecko.com/api/v3"
RATE_LIMIT = 10 # 免費版 10-30 calls/minute
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key
self.session = requests.Session()
self.last_request_time = 0
if api_key:
self.session.headers['x-cg-demo-api-key'] = api_key
def _rate_limit_wait(self):
"""速率限制"""
elapsed = time.time() - self.last_request_time
if elapsed < (60 / self.RATE_LIMIT):
time.sleep((60 / self.RATE_LIMIT) - elapsed)
self.last_request_time = time.time()
def get_price(self,
ids: List[str],
vs_currencies: List[str] = ['usd'],
include_24hr_change: bool = True,
include_24hr_vol: bool = True,
include_market_cap: bool = True) -> Dict[str, Any]:
"""
取得加密貨幣價格
Args:
ids: CoinGecko 代幣 ID(如 ethereum, bitcoin)
vs_currencies: 報價貨幣
include_24hr_change: 包含 24 小時漲跌幅
include_24hr_vol: 包含 24 小時交易量
include_market_cap: 包含市值
Returns:
價格數據字典
"""
self._rate_limit_wait()
endpoint = f"{self.BASE_URL}/simple/price"
params = {
'ids': ','.join(ids),
'vs_currencies': ','.join(vs_currencies),
'include_24hr_change': str(include_24hr_change).lower(),
'include_24hr_vol': str(include_24hr_vol).lower(),
'include_market_cap': str(include_market_cap).lower()
}
response = self.session.get(endpoint, params=params)
response.raise_for_status()
return response.json()
def get_eth_price(self) -> PriceData:
"""取得 ETH 即時價格"""
data = self.get_price(
ids=['ethereum'],
vs_currencies=['usd'],
include_24hr_change=True,
include_24hr_vol=True,
include_market_cap=True
)['ethereum']
return PriceData(
symbol='ETH',
usd=Decimal(str(data['usd'])),
usd_24h_change=Decimal(str(data.get('usd_24h_change', 0))),
usd_24h_vol=Decimal(str(data.get('usd_24h_vol', 0))),
usd_market_cap=Decimal(str(data.get('usd_market_cap', 0))),
last_updated=time.strftime('%Y-%m-%d %H:%M:%S')
)
def get_token_prices(self, contract_addresses: List[str]) -> Dict[str, Any]:
"""
依合約地址取得代幣價格
適用於 ERC-20 代幣
"""
self._rate_limit_wait()
endpoint = f"{self.BASE_URL}/simple/token_price/ethereum"
params = {
'contract_addresses': ','.join(contract_addresses),
'vs_currencies': 'usd'
}
response = self.session.get(endpoint, params=params)
return response.json()
def get_market_chart(self,
coin_id: str,
vs_currency: str = 'usd',
days: int = 7) -> Dict[str, List[List[float]]]:
"""
取得市場歷史數據
Args:
coin_id: 代幣 ID
vs_currency: 報價貨幣
days: 天數(1, 7, 30, 90, 365, max)
Returns:
價格歷史陣列
"""
self._rate_limit_wait()
endpoint = f"{self.BASE_URL}/coins/{coin_id}/market_chart"
params = {
'vs_currency': vs_currency,
'days': days
}
response = self.session.get(endpoint, params=params)
response.raise_for_status()
return response.json()
class PriceMonitor:
"""價格監控器"""
def __init__(self, api: CoinGeckoAPI, alert_threshold_percent: float = 5.0):
self.api = api
self.alert_threshold = alert_threshold_percent
self.last_price = None
async def start_monitoring(self, interval_seconds: int = 60):
"""開始監控價格"""
import asyncio
while True:
try:
current_price = self.api.get_eth_price()
if self.last_price:
change_percent = abs(
(current_price.usd - self.last_price.usd)
/ self.last_price.usd * 100
)
if change_percent > self.alert_threshold:
logger.warning(
f"價格變動 alert: ETH ${current_price.usd} "
f"({change_percent:.2f}% 變動)"
)
self.last_price = current_price
logger.info(f"ETH Price: ${current_price.usd}")
except Exception as e:
logger.error(f"Price fetch error: {e}")
await asyncio.sleep(interval_seconds)
# 使用範例
def price_example():
api = CoinGeckoAPI()
# 取得 ETH 價格
eth_price = api.get_eth_price()
print(f"ETH 價格: ${eth_price.usd}")
print(f"24小時變動: {eth_price.usd_24h_change}%")
print(f"24小時交易量: ${eth_price.usd_24h_vol:,.0f}")
# 取得多個代幣價格
prices = api.get_price(
ids=['ethereum', 'bitcoin', 'usd-coin', 'tether'],
include_24hr_change=True
)
print("\n多代幣報價:")
for token, data in prices.items():
print(f" {token}: ${data['usd']} ({data.get('usd_24h_change', 0):.2f}%)")
# 取得歷史數據
chart = api.get_market_chart('ethereum', days=30)
prices_history = chart['prices']
print(f"\n近 30 天價格範圍:")
prices_only = [p[1] for p in prices_history]
print(f" 最高: ${max(prices_only):.2f}")
print(f" 最低: ${min(prices_only):.2f}")
3.2 即時 Gas 費用預測
Gas 費用的預測對於優化交易成本至關重要:
import requests
from typing import Dict, List, Optional
from dataclasses import dataclass
import time
@dataclass
class GasRecommendation:
"""Gas 推薦數據"""
slow: int # Gwei
standard: int # Gwei
fast: int # Gwei
base_fee: int # Gwei
priority_fee: int # Gwei
class GasTracker:
"""Gas 費用追蹤器"""
def __init__(self, rpc_url: str):
self.rpc_url = rpc_url
self.session = requests.Session()
def _rpc_call(self, method: str, params: List = None) -> any:
"""發送 RPC 請求"""
payload = {
'jsonrpc': '2.0',
'method': method,
'params': params or [],
'id': 1
}
response = self.session.post(self.rpc_url, json=payload)
return response.json()['result']
def get_current_gas(self) -> GasRecommendation:
"""
取得當前 Gas 推薦
使用 eth_feeHistory 計算
"""
latest_block = int(self._rpc_call('eth_blockNumber'), 16)
# 取得最近 10 個區塊的費用歷史
fee_history = self._rpc_call('eth_feeHistory', [
'0xa', # 10 blocks
hex(latest_block - 10),
[20, 50, 80] # 百分位數
])
base_fee = int(fee_history['baseFeePerGas'][-1], 16)
rewards = [int(r, 16) for r in fee_history['reward']]
# 計算推薦值
return GasRecommendation(
slow=rewards[0] + base_fee,
standard=rewards[1] + base_fee,
fast=rewards[2] + base_fee,
base_fee=base_fee,
priority_fee=rewards[1]
)
def predict_future_gas(self, blocks_ahead: int = 3) -> Dict[str, int]:
"""
預測未來區塊的 Gas 費用
基於區塊空間供需模型
"""
latest_block = int(self._rpc_call('eth_blockNumber'), 16)
# 取得最近區塊的 Gas 使用情況
recent_gas = []
for i in range(10):
block_hex = hex(latest_block - i)
block = self._rpc_call('eth_getBlockByNumber', [block_hex, False])
recent_gas.append(int(block['gasUsed'], 16))
avg_gas = sum(recent_gas) / len(recent_gas)
target_gas = 15_000_000 # target gas per block
# 簡單線性預測模型
utilization = avg_gas / target_gas
current_fee = self.get_current_gas()
# 根據供需預測
if utilization > 0.8:
# 高擁堵,費用會上升
multiplier = 1 + (utilization - 0.8) * 0.5
elif utilization < 0.5:
# 低利用率,費用會下降
multiplier = 1 - (0.5 - utilization) * 0.3
else:
multiplier = 1.0
return {
'predicted_standard': int(current_fee.standard * multiplier),
'predicted_fast': int(current_fee.fast * multiplier),
'current_utilization': utilization,
'recommended_wait_blocks': 3 if utilization > 0.9 else 1
}
def get_gas_oracle_data(self) -> Dict:
"""
從 Gas Oracle 取得推薦數據
部分錢包和服務提供自己的 Oracle
"""
# EIP-1559 後錢包常用的推薦來源
# 這裡展示如何從多個來源聚合數據
sources = {
'eth_gas_station': self._get_eth_gas_station(),
'rpc': self.get_current_gas(),
'prediction': self.predict_future_gas()
}
# 聚合推薦
return {
'slow': sources['rpc'].slow,
'standard': sources['rpc'].standard,
'fast': sources['rpc'].fast,
'prediction': sources['prediction'],
'sources': sources
}
def _get_eth_gas_station(self) -> Optional[Dict]:
"""ETH Gas Station API"""
try:
response = self.session.get(
'https://ethgasstation.info/api/ethgasAPI.json'
)
data = response.json()
return {
'slow': int(data['safeLow'] * 1e9),
'standard': int(data['average'] * 1e9),
'fast': int(data['fast'] * 1e9)
}
except:
return None
# 使用範例
def gas_example():
tracker = GasTracker("https://mainnet.infura.io/v3/YOUR_PROJECT_ID")
# 取得當前 Gas
gas = tracker.get_current_gas()
print(f"當前 Gas 推薦 (Gwei):")
print(f" Slow: {gas.slow / 1e9:.2f}")
print(f" Standard: {gas.standard / 1e9:.2f}")
print(f" Fast: {gas.fast / 1e9:.2f}")
print(f" Base Fee: {gas.base_fee / 1e9:.2f}")
# 預測未來
prediction = tracker.predict_future_gas()
print(f"\n費用預測:")
print(f" 預測 Standard: {prediction['predicted_standard'] / 1e9:.2f} Gwei")
print(f" 當前利用率: {prediction['current_utilization']:.1%}")
四、The Graph 子圖查詢
4.1 The Graph 基礎與使用
The Graph 是一個用於查詢區塊鏈數據的分散式協定,透過 GraphQL 提供高效的數據查詢:
import requests
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PoolData:
"""Uniswap 池子數據"""
id: str
token0_symbol: str
token1_symbol: str
token0_price: float
token1_price: float
volume_token0: float
volume_token1: float
tvl: float
fee_tier: int
class TheGraphClient:
"""The Graph 子圖查詢客戶端"""
# 常用子圖端點
SUBGRAPH_URLS = {
'uniswap_v3': 'https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3',
'aave_v3': 'https://api.thegraph.com/subgraphs/name/aave/aave-v3-ethereum',
'compound': 'https://api.thegraph.com/subgraphs/name/compound-finance/compound-v3-ethereum',
'lido': 'https://api.thegraph.com/subgraphs/name/lido/lido-ethereum',
}
def __init__(self, subgraph_name: str = 'uniswap_v3'):
self.endpoint = self.SUBGRAPH_URLS.get(subgraph_name)
self.session = requests.Session()
def query(self, query: str, variables: Dict = None) -> Dict:
"""執行 GraphQL 查詢"""
payload = {
'query': query,
'variables': variables or {}
}
response = self.session.post(self.endpoint, json=payload)
data = response.json()
if 'errors' in data:
raise Exception(f"GraphQL Error: {data['errors']}")
return data['data']
def get_uniswap_top_pools(self, limit: int = 10) -> List[PoolData]:
"""取得 Uniswap V3 熱門池子"""
query = """
query GetTopPools($limit: Int!) {
pools(
first: $limit
orderBy: totalValueLockedUSD
orderDirection: desc
where: { volumeUSD_gt: 1000 }
) {
id
token0 {
symbol
}
token1 {
symbol
}
token0Price
token1Price
volumeToken0
volumeToken1
totalValueLockedUSD
feeTier
}
}
"""
data = self.query(query, {'limit': limit})
return [
PoolData(
id=pool['id'],
token0_symbol=pool['token0']['symbol'],
token1_symbol=pool['token1']['symbol'],
token0_price=float(pool['token0Price']),
token1_price=float(pool['token1Price']),
volume_token0=float(pool['volumeToken0']),
volume_token1=float(pool['volumeToken1']),
tvl=float(pool['totalValueLockedUSD']),
fee_tier=int(pool['feeTier'])
)
for pool in data['pools']
]
def get_token_swaps(self, pool_id: str, limit: int = 100) -> List[Dict]:
"""取得特定池子的近期交易"""
query = """
query GetSwaps($poolId: String!, $limit: Int!) {
swaps(
first: $limit
orderBy: timestamp
orderDirection: desc
where: { pool: $poolId }
) {
id
timestamp
sender
recipient
amount0
amount1
amountUSD
tick
}
}
"""
data = self.query(query, {'poolId': pool_id, 'limit': limit})
return data['swaps']
def get_aave_supply_borrow_rates(self, asset: str) -> Dict:
"""取得 Aave V3 借貸利率"""
query = """
query GetRates($asset: String!) {
reserves(where: { symbol: $asset }) {
symbol
liquidityRate
variableBorrowRate
stableBorrowRate
totalDeposits
totalBorrows
availableLiquidity
utilizationRate
}
}
"""
data = self.query(query, {'asset': asset})
if data['reserves']:
reserve = data['reserves'][0]
return {
'supply_rate': float(reserve['liquidityRate']) / 1e27,
'variable_borrow_rate': float(reserve['variableBorrowRate']) / 1e27,
'stable_borrow_rate': float(reserve['stableBorrowRate']) / 1e27,
'total_deposits': float(reserve['totalDeposits']),
'total_borrows': float(reserve['totalBorrows']),
'utilization': float(reserve['utilizationRate']) / 1e27
}
return {}
def get_lido_staking_stats(self) -> Dict:
"""取得 Lido 質押統計"""
query = """
query GetLidoStats {
lidoExtractors {
id
totalStaked
totalRewards
stETH Holders
}
validators(where: { status: "Active" }) {
count
}
}
"""
data = self.query(query)
return {
'total_staked': data['lidoExtractors'][0]['totalStaked'],
'total_rewards': data['lidoExtractors'][0]['totalRewards'],
'active_validators': data['validators'][0]['count']
}
# 使用範例
def the_graph_example():
# Uniswap V3 查詢
client = TheGraphClient('uniswap_v3')
pools = client.get_uniswap_top_pools(5)
print("Uniswap V3 熱門池子:")
for pool in pools:
print(f" {pool.token0_symbol}/{pool.token1_symbol}: "
f"TVL ${pool.tvl:,.0f}")
# Aave V3 利率查詢
aave_client = TheGraphClient('aave_v3')
rates = aave_client.get_aave_supply_borrow_rates('ETH')
print("\nAave ETH 利率:")
print(f" 存款利率: {rates['supply_rate']:.2%}")
print(f" 浮動借款利率: {rates['variable_borrow_rate']:.2%}")
print(f" 利用率: {rates['utilization']:.1%}")
五、DeFi 協議即時數據整合
5.1 多協議數據聚合器
以下是一個整合多個 DeFi 協議數據的完整範例:
from typing import Dict, List, Optional
from dataclasses import dataclass
from decimal import Decimal
import asyncio
import aiohttp
from abc import ABC, abstractmethod
@dataclass
class ProtocolStats:
"""協議統計數據"""
name: str
tvl: Decimal
daily_volume: Decimal
total_users: int
asset_count: int
@dataclass
class MarketData:
"""市場數據"""
eth_price: Decimal
gas_price_gwei: Decimal
total_defi_tvl: Decimal
timestamp: int
class ProtocolDataSource(ABC):
"""協議數據源抽象類"""
@abstractmethod
async def get_stats(self) -> ProtocolStats:
pass
class AaveDataSource(ProtocolDataSource):
"""Aave V3 數據源"""
def __init__(self, rpc_url: str):
self.rpc_url = rpc_url
async def get_stats(self) -> ProtocolStats:
# Aave V3 Pool 合約地址
pool_address = "0x87870Bca3F3fD6335C3FbdC83E7a82f43aa5B2"
# 透過 RPC 查詢 TVL
# 這裡使用 The Graph 獲取更完整的數據
async with aiohttp.ClientSession() as session:
query = """
{
reserves {
totalDeposits
totalBorrows
}
}
"""
async with session.post(
'https://api.thegraph.com/subgraphs/name/aave/aave-v3-ethereum',
json={'query': query}
) as resp:
data = await resp.json()
total_deposits = sum(
float(r['totalDeposits']) for r in data['data']['reserves']
)
return ProtocolStats(
name='Aave V3',
tvl=Decimal(str(total_deposits)),
daily_volume=Decimal('0'), # 需要歷史數據
total_users=0,
asset_count=len(data['data']['reserves'])
)
class UniswapDataSource(ProtocolDataSource):
"""Uniswap 數據源"""
async def get_stats(self) -> ProtocolStats:
async with aiohttp.ClientSession() as session:
query = """
{
factories(first: 1) {
totalValueLockedUSD
totalVolumeUSD
poolCount
}
}
"""
async with session.post(
'https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3',
json={'query': query}
) as resp:
data = await resp.json()
factory = data['data']['factories'][0]
return ProtocolStats(
name='Uniswap V3',
tvl=Decimal(factory['totalValueLockedUSD']),
daily_volume=Decimal(factory['totalVolumeUSD']),
total_users=0,
asset_count=int(factory['poolCount'])
)
class DefiAggregator:
"""DeFi 數據聚合器"""
def __init__(self, rpc_url: str):
self.protocols: List[ProtocolDataSource] = [
AaveDataSource(rpc_url),
UniswapDataSource(rpc_url),
]
self.price_api = None # 初始化價格 API
async def get_all_protocol_stats(self) -> List[ProtocolStats]:
"""取得所有協議統計"""
tasks = [p.get_stats() for p in self.protocols]
return await asyncio.gather(*tasks)
async def get_market_overview(self) -> MarketData:
"""取得市場總覽"""
# 並行獲取各項數據
price_task = self._get_eth_price()
gas_task = self._get_gas_price()
tvl_task = self._get_total_defi_tvl()
eth_price, gas_price, total_tvl = await asyncio.gather(
price_task, gas_task, tvl_task
)
return MarketData(
eth_price=eth_price,
gas_price_gwei=gas_price,
total_defi_tvl=total_tvl,
timestamp=int(asyncio.get_event_loop().time())
)
async def _get_eth_price(self) -> Decimal:
"""取得 ETH 價格"""
async with aiohttp.ClientSession() as session:
async with session.get(
'https://api.coingecko.com/api/v3/simple/price'
'?ids=ethereum&vs_currencies=usd'
) as resp:
data = await resp.json()
return Decimal(str(data['ethereum']['usd']))
async def _get_gas_price(self) -> Decimal:
"""取得 Gas 價格"""
async with aiohttp.ClientSession() as session:
# 使用 ETH Gas Station
async with session.get(
'https://ethgasstation.info/api/ethgasAPI.json'
) as resp:
data = await resp.json()
return Decimal(data['average'])
async def _get_total_defi_tvl(self) -> Decimal:
"""計算總 DeFi TVL"""
# 聚合多個協議的 TVL
stats = await self.get_all_protocol_stats()
return sum(s.tvl for s in stats)
# 使用範例
async def defi_aggregator_example():
aggregator = DefiAggregator("https://mainnet.infura.io/v3/YOUR_KEY")
# 取得市場總覽
market = await aggregator.get_market_overview()
print(f"市場總覽:")
print(f" ETH 價格: ${market.eth_price}")
print(f" Gas 價格: {market.gas_price_gwei} Gwei")
print(f" 總 DeFi TVL: ${market.total_defi_tvl:,.0f}")
# 取得各協議數據
protocol_stats = await aggregator.get_all_protocol_stats()
print(f"\n協議 TVL:")
for stats in protocol_stats:
print(f" {stats.name}: ${stats.tvl:,.0f}")
六、實戰:構建即時數據儀表板
6.1 架構設計
一個完整的即時數據儀表板需要整合多種數據源:
┌─────────────────────────────────────────────────────────────────┐
│ 數據層 │
├────────────────┬─────────────────┬────────────────────────────┤
│ 區塊鏈 RPC │ 價格 API │ The Graph │
│ (Infura) │ (CoinGecko) │ (Subgraphs) │
└───────┬────────┴────────┬────────┴────────────┬───────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway │
│ (速率限制、緩存、聚合) │
└───────────────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 應用層 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 價格監控 │ │ Gas 追蹤 │ │ DeFi 儀表板 │ │
│ │ 服務 │ │ 服務 │ │ 服務 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└───────────────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 展示層 │
│ (Web UI / API / WebSocket) │
└─────────────────────────────────────────────────────────────────┘
6.2 後端服務實現
from fastapi import FastAPI, HTTPException, WebSocket, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Optional
import asyncio
import json
from datetime import datetime
app = FastAPI(title="以太坊即時數據 API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 數據緩存
_cache = {
'eth_price': {'value': None, 'timestamp': 0},
'gas_price': {'value': None, 'timestamp': 0},
'market_data': {'value': None, 'timestamp': 0}
}
CACHE_TTL = 60 # 60 秒緩存
class PriceResponse(BaseModel):
"""價格響應模型"""
eth_usd: float
change_24h: float
timestamp: str
class GasResponse(BaseModel):
"""Gas 費用響應"""
slow: float
standard: float
fast: float
base_fee: float
timestamp: str
class MarketResponse(BaseModel):
"""市場數據響應"""
eth_price: float
gas_price: float
total_defi_tvl: float
top_pools: List[Dict]
timestamp: str
def _is_cache_valid(key: str) -> bool:
"""檢查緩存是否有效"""
if key not in _cache:
return False
age = datetime.now().timestamp() - _cache[key]['timestamp']
return age < CACHE_TTL
@app.get("/api/price", response_model=PriceResponse)
async def get_price():
"""取得 ETH 價格"""
if _is_cache_valid('eth_price'):
return _cache['eth_price']['value']
# 獲取新數據
api = CoinGeckoAPI()
price_data = api.get_eth_price()
response eth_usd=float(price_data = PriceResponse(
.usd),
change_24h=float(price_data.usd_24h_change),
timestamp=price_data.last_updated
)
_cache['eth_price'] = {
'value': response,
'timestamp': datetime.now().timestamp()
}
return response
@app.get("/api/gas", response_model=GasResponse)
async def get_gas():
"""取得 Gas 費用"""
if _is_cache_valid('gas_price'):
return _cache['gas_price']['value']
tracker = GasTracker("https://mainnet.infura.io/v3/YOUR_KEY")
gas_data = tracker.get_current_gas()
response = GasResponse(
slow=gas_data.slow / 1e9,
standard=gas_data.standard / 1e9,
fast=gas_data.fast / 1e9,
base_fee=gas_data.base_fee / 1e9,
timestamp=datetime.now().isoformat()
)
_cache['gas_price'] = {
'value': response,
'timestamp': datetime.now().timestamp()
}
return response
@app.get("/api/market", response_model=MarketResponse)
async def get_market():
"""取得市場綜合數據"""
if _is_cache_valid('market_data'):
return _cache['market_data']['value']
# 獲取各項數據
api = CoinGeckoAPI()
eth_price = api.get_eth_price()
tracker = GasTracker("https://mainnet.infura.io/v3/YOUR_KEY")
gas = tracker.get_current_gas()
# 獲取 DeFi TVL
graph = TheGraphClient('uniswap_v3')
pools = graph.get_uniswap_top_pools(5)
response = MarketResponse(
eth_price=float(eth_price.usd),
gas_price=gas.standard / 1e9,
total_defi_tvl=sum(p.tvl for p in pools) * 2, # 估算
top_pools=[
{
'pair': f"{p.token0_symbol}/{p.token1_symbol}",
'tvl': p.tvl
}
for p in pools
],
timestamp=datetime.now().isoformat()
)
_cache['market_data'] = {
'value': response,
'timestamp': datetime.now().timestamp()
}
return response
# WebSocket 即時更新
@app.websocket("/ws/price")
async def websocket_price(websocket: WebSocket):
"""WebSocket 即時價格更新"""
await websocket.accept()
try:
while True:
api = CoinGeckoAPI()
price = api.get_eth_price()
await websocket.send_json({
'type': 'price',
'data': {
'eth_usd': float(price.usd),
'change_24h': float(price.usd_24h_change),
'timestamp': price.last_updated
}
})
await asyncio.sleep(10) # 每 10 秒更新
except Exception as e:
await websocket.close()
# 後台任務:定時更新緩存
async def update_cache_periodically():
"""定時更新緩存"""
while True:
await asyncio.sleep(30)
try:
# 更新所有緩存
api = CoinGeckoAPI()
price_data = api.get_eth_price()
_cache['eth_price'] = {
'value': PriceResponse(
eth_usd=float(price_data.usd),
change_24h=float(price_data.usd_24h_change),
timestamp=price_data.last_updated
),
'timestamp': datetime.now().timestamp()
}
except Exception as e:
print(f"Cache update error: {e}")
@app.on_event("startup")
async def startup_event():
"""啟動時的背景任務"""
asyncio.create_task(update_cache_periodically())
# 啟動服務
# uvicorn main:app --reload --port 8000
七、最佳實踐與效能優化
7.1 數據獲取策略
class OptimizedDataFetcher:
"""優化的數據獲取器"""
def __init__(self):
self.cache = {}
self.rate_limiter = asyncio.Semaphore(10)
async def fetch_with_cache(self,
key: str,
fetch_func,
ttl: int = 60):
"""帶緩存的獲取"""
now = asyncio.get_event_loop().time()
if key in self.cache:
data, timestamp = self.cache[key]
if now - timestamp < ttl:
return data
# 獲取新數據
async with self.rate_limiter:
data = await fetch_func()
self.cache[key] = (data, now)
return data
def batch_fetch(self, items: List[Dict], fetch_func) -> List:
"""批次獲取"""
return asyncio.gather(*[
fetch_func(item) for item in items
])
7.2 錯誤處理與重試
import asyncio
from functools import wraps
def retry_on_error(max_retries: int = 3, delay: float = 1.0):
"""錯誤重試裝飾器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(delay * (attempt + 1))
return wrapper
return decorator
@retry_on_error(max_retries=3, delay=1.0)
async def safe_api_call(api_func):
"""安全的 API 調用"""
return await api_func()
結論
本指南涵蓋了以太坊即時數據獲取的各個面向,從基礎的 RPC 調用到複雜的多協議數據聚合。這些技術是構建高效以太坊應用的基礎,建議開發者根據實際需求選擇合適的數據源,並遵循以下原則:
- 選擇合適的數據源:免費 API 適合開發和測試,生產環境應考慮付費服務
- 實作緩存機制:減少 API 調用頻率,提昇回應速度
- 做好錯誤處理:網路請求總是可能失敗,需要完善的降級策略
- 遵守速率限制:尊重 API 提供商的使用限制,避免被封禁
- 保持數據新鮮:根據應用場景選擇合適的刷新頻率
透過這些實踐,可以構建出穩定、高效的以太坊數據應用。
相關文章
- 以太坊虛擬機(EVM)深度技術分析:Opcode、執行模型與狀態轉換的數學原理 — 以太坊虛擬機(EVM)是以太坊智能合約運行的核心環境,被譽為「世界電腦」。本文從計算機科學和密碼學的角度,深入剖析 EVM 的架構設計、Opcode 操作機制、執行模型、以及狀態轉換的數學原理,提供完整的技術細節和工程視角,包括詳細的 Gas 消耗模型和實際的優化策略。
- ERC-4626 Tokenized Vault 完整實現指南:從標準規範到生產級合約 — 本文深入探討 ERC-4626 標準的技術細節,提供完整的生產級合約實現。內容涵蓋標準接口定義、資產與份額轉換的數學模型、收益策略整合、費用機制設計,並提供可直接部署的 Solidity 代碼範例。通過本指南,開發者可以構建安全可靠的代幣化 vault 系統。
- 以太坊智能合約開發實戰:從基礎到 DeFi 協議完整代碼範例指南 — 本文提供以太坊智能合約開發的完整實戰指南,透過可直接運行的 Solidity 代碼範例,幫助開發者從理論走向實踐。內容涵蓋基礎合約開發、借貸協議實作、AMM 機制實現、以及中文圈特有的應用場景(台灣交易所整合、香港監管合規、Singapore MAS 牌照申請)。本指南假設讀者具備基本的程式設計基礎,熟悉 JavaScript 或 Python 等語言,並對區塊鏈概念有基本理解。
- EigenLayer 再質押風險模擬與量化分析:從理論到實踐的完整框架 — 本文深入探討 EigenLayer 再質押協議的風險評估框架與量化分析方法。我們提供完整的質押收益率計算模型、風險調整後收益評估、Monte Carlo 模擬框架,以及 Solidity 智能合約風險示例代碼。通過實際可運行的 Python 程式碼和詳細的風險指標解讀,幫助投資者和開發者系統性地評估和管理再質押風險,做出更明智的質押決策。
- AI Agent 與以太坊整合深度技術分析:2024-2026 年區塊鏈驅動的自主智能系統完整指南 — AI Agent 與區塊鏈技術的結合正在重新定義數位系統的可能性。本文深入分析 AI Agent 的技術架構、與以太坊的多種整合模式、主要應用場景和安全考量。從自動化 DeFi 策略到智能投資組合管理,我們提供全面的技術分析和實踐指南。
延伸閱讀與來源
- Ethereum.org Developers 官方開發者入口與技術文件
- EIPs 以太坊改進提案
這篇文章對您有幫助嗎?
請告訴我們如何改進:
評論
發表評論
注意:由於這是靜態網站,您的評論將儲存在本地瀏覽器中,不會公開顯示。
目前尚無評論,成為第一個發表評論的人吧!