以太坊即時數據整合開發完整指南:從 API 串接到實際應用的工程實踐

在以太坊開發中,即時數據的獲取與處理是構建高效 DApp 的核心能力。本指南從工程師視角出發,深入探討以太坊生態系統中各類即時數據的獲取方式,提供完整的 API 整合範例。我們涵蓋 RPC 節點服務整合、CoinGecko 價格 API、Gas 費用預測、The Graph 子圖查詢、DeFi 協議數據聚合等主題,並展示如何構建一個實際的即時數據儀表板。每個章節都包含可運作的程式碼範例與最佳實踐建議。

以太坊即時數據整合開發完整指南:從 API 串接到實際應用的工程實踐

概述

在以太坊開發中,即時數據的獲取與處理是構建高效 DApp 的核心能力。無論是顯示即時 ETH 價格、追蹤 Gas 費用、監控合約狀態,還是分析鏈上活動,都需要可靠的數據來源。本指南從工程師視角出發,深入探討以太坊生態系統中各類即時數據的獲取方式,提供完整的 API 整合範例,並展示如何構建一個實際的即時數據儀表板。

一、以太坊即時數據生態概述

1.1 數據類型分類

以太坊生態中的即時數據可分為以下幾大類型:

鏈上數據

協議層數據

市場數據

節點數據

1.2 主要數據提供商生態

主流 RPC 與數據服務提供商:

| 類別 | 服務商 | 特色 | 免費額度 |
|-----|-------|------|---------|
| RPC | Infura | 最廣泛使用,企業級 | 100k req/day |
| RPC | Alchemy | 高可靠性,免費 tier 強 | 10M compute units |
| RPC | QuickNode | 全球分布,高效能 | 50k req/day |
| 數據 | Etherscan | 區塊瀏覽器龍頭 | 5 req/sec |
| 數據 | Dune Analytics | SQL 查詢強大 | 免費查詢 |
| 數據 | The Graph |  GraphQL 子圖 | 100k queries |
| 數據 | CoinGecko | 價格數據 | 10k req/min |
| 數據 | Coingecko API | 即時價格 | 10-30 req/min |

二、以太坊 RPC 節點服務整合

2.1 RPC API 基礎詳解

以太坊 JSON-RPC 是與以太坊節點通信的標準接口。理解 RPC 方法是獲取鏈上數據的基礎。

常用 RPC 方法分類

// 區塊與狀態查詢
eth_blockNumber              // 取得最新區塊編號
eth_getBlockByNumber        // 依區塊編號取得區塊詳情
eth_getTransactionByHash     // 依交易雜湊取得交易詳情
eth_getTransactionReceipt   // 取得交易收據
eth_call                    // 執行合約讀取調用
eth_getBalance              // 取得帳戶餘額
eth_getCode                 // 取得帳戶代碼

// 事件與日誌
eth_getLogs                 // 取得區塊日誌
eth_newFilter               // 建立事件過濾器
eth_getFilterChanges       // 取得過濾器變更

// 狀態與估計
eth_estimateGas            // 估算 Gas 用量
eth_gasPrice               // 取得當前 Gas 價格
eth_feeHistory             // 取得歷史費用數據

2.2 程式碼實作:基礎 RPC 客戶端

以下是使用 Python 整合以太坊 RPC 的完整範例:

import requests
import json
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from decimal import Decimal
import time

@dataclass
class BlockData:
    number: int
    hash: str
    parent_hash: str
    timestamp: int
    gas_used: int
    gas_limit: int
    transactions: List[str]
    miner: str
    difficulty: int

@dataclass
class TransactionData:
    hash: str
    block_number: int
    from_address: str
    to_address: str
    value: int
    gas: int
    gas_price: int
    nonce: int
    input_data: str

class EthereumRPCClient:
    """以太坊 RPC 客戶端封裝"""
    
    def __init__(self, rpc_url: str, api_key: Optional[str] = None):
        """
        初始化 RPC 客戶端
        
        Args:
            rpc_url: RPC 端點 URL
            api_key: API Key(如需要)
        """
        self.rpc_url = rpc_url
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json'
        })
    
    def _make_request(self, method: str, params: List[Any] = None) -> Any:
        """發送 RPC 請求"""
        payload = {
            'jsonrpc': '2.0',
            'method': method,
            'params': params or [],
            'id': 1
        }
        
        response = self.session.post(self.rpc_url, json=payload)
        result = response.json()
        
        if 'error' in result:
            raise Exception(f"RPC Error: {result['error']}")
        
        return result.get('result')
    
    def get_block_number(self) -> int:
        """取得最新區塊編號"""
        return int(self._make_request('eth_blockNumber'), 16)
    
    def get_block_by_number(self, block_number: int, full_transactions: bool = False) -> BlockData:
        """
        依區塊編號取得區塊詳情
        
        Args:
            block_number: 區塊編號
            full_transactions: 是否包含完整交易資料
        """
        hex_number = hex(block_number)
        params = [hex_number, full_transactions]
        block = self._make_request('eth_getBlockByNumber', params)
        
        return BlockData(
            number=int(block['number'], 16),
            hash=block['hash'],
            parent_hash=block['parentHash'],
            timestamp=int(block['timestamp'], 16),
            gas_used=int(block['gasUsed'], 16),
            gas_limit=int(block['gasLimit'], 16),
            transactions=[tx['hash'] for tx in block['transactions']] if full_transactions else block['transactions'],
            miner=block['miner'],
            difficulty=int(block['totalDifficulty'], 16) if 'totalDifficulty' in block else 0
        )
    
    def get_balance(self, address: str, block: str = 'latest') -> Decimal:
        """
        取得帳戶 ETH 餘額
        
        Args:
            address: 以太坊地址
            block: 區塊標識(latest, earliest, pending 或區塊編號)
        
        Returns:
            以 ETH 為單位的餘額
        """
        params = [address, block]
        wei_balance = int(self._make_request('eth_getBalance', params), 16)
        return Decimal(wei_balance) / Decimal(10**18)
    
    def get_transaction(self, tx_hash: str) -> TransactionData:
        """依交易雜湊取得交易詳情"""
        tx = self._make_request('eth_getTransactionByHash', [tx_hash])
        
        return TransactionData(
            hash=tx['hash'],
            block_number=int(tx['blockNumber'], 16) if tx['blockNumber'] else 0,
            from_address=tx['from'],
            to_address=tx['to'] or '',
            value=int(tx['value'], 16),
            gas=int(tx['gas'], 16),
            gas_price=int(tx['gasPrice'], 16),
            nonce=int(tx['nonce'], 16),
            input_data=tx['input']
        )
    
    def get_gas_price(self) -> Dict[str, int]:
        """
        取得當前 Gas 價格
        
        Returns:
            包含 low, medium, high 的字典(以 Wei 為單位)
        """
        # 取得 pending 區塊的費用歷史
        latest_block = self.get_block_number()
        hex_blocks = hex(latest_block - 10)
        
        try:
            fee_history = self._make_request('eth_feeHistory', [
                '0x5',  # 最近 5 個區塊
                hex_blocks,
                [10, 50, 90]  # 百分位數
            ])
            
            base_fee_per_gas = int(fee_history['baseFeePerGas'][-1], 16)
            priority_fee_per_gas = fee_history['reward']
            
            return {
                'low': int(priority_fee_per_gas[0], 16),
                'medium': int(priority_fee_per_gas[1], 16),
                'high': int(priority_fee_per_gas[2], 16),
                'base_fee': base_fee_per_gas
            }
        except Exception:
            # 備用方案:直接獲取 gas price
            gas_price = int(self._make_request('eth_gasPrice'), 16)
            return {
                'low': gas_price // 2,
                'medium': gas_price,
                'high': gas_price * 2,
                'base_fee': gas_price
            }
    
    def estimate_gas(self, tx_params: Dict[str, Any]) -> int:
        """
        估算交易 Gas 用量
        
        Args:
            tx_params: 交易參數,包含 to, value, data 等
        """
        return int(self._make_request('eth_estimateGas', [tx_params]), 16)
    
    def get_token_balance(self, token_address: str, wallet_address: str, block: str = 'latest') -> Dict[str, Any]:
        """
        取得 ERC-20 代幣餘額
        
        使用 ERC-20 standard balanceOf 方法
        """
        # ERC-20 balanceOf 方法簽名
        method_id = '0x70a08231'
        # wallet address padding to 32 bytes
        padded_wallet = wallet_address[2:].zfill(64)
        data = method_id + padded_wallet
        
        params = [{
            'to': token_address,
            'data': data
        }, block]
        
        result = int(self._make_request('eth_call', params), 16)
        
        return {
            'raw': result,
            'decimals': 18,  # 應從合約獲取
            'formatted': result / (10**18)
        }


# 使用範例
def main():
    # 使用 Infura RPC
    rpc_url = "https://mainnet.infura.io/v3/YOUR_PROJECT_ID"
    client = EthereumRPCClient(rpc_url)
    
    # 取得最新區塊
    block_number = client.get_block_number()
    print(f"最新區塊編號: {block_number}")
    
    # 取得區塊詳情
    block = client.get_block_by_number(block_number)
    print(f"區塊時間戳: {block.timestamp}")
    print(f"Gas 使用量: {block.gas_used}/{block.gas_limit}")
    
    # 取得餘額
    balance = client.get_balance("0x742d35Cc6634C0532925a3b844Bc9e7595f0eB5e")
    print(f"ETH 餘額: {balance}")
    
    # 取得 Gas 價格
    gas_prices = client.get_gas_price()
    print(f"Gas 價格 (Gwei): low={gas_prices['low']/1e9}, "
          f"medium={gas_prices['medium']/1e9}, high={gas_prices['high']/1e9}")


if __name__ == "__main__":
    main()

2.3 批次查詢與效能優化

在處理大量數據時,需要注意請求效率優化:

import asyncio
import aiohttp
from typing import List, Dict, Any
import json

class BatchRPCClient:
    """批次 RPC 客戶端,支援並發請求"""
    
    def __init__(self, rpc_url: str, max_concurrent: int = 10):
        self.rpc_url = rpc_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def _make_request(self, session: aiohttp.ClientSession, 
                          method: str, params: List[Any]) -> Dict:
        """發送單個請求"""
        async with self.semaphore:
            payload = {
                'jsonrpc': '2.0',
                'method': method,
                'params': params,
                'id': 1
            }
            
            async with session.post(self.rpc_url, json=payload) as response:
                return await response.json()
    
    async def batch_get_balances(self, addresses: List[str]) -> Dict[str, Dict]:
        """
        批次取得多個地址餘額
        
        這比串行請求快 10-100 倍
        """
        async with aiohttp.ClientSession() as session:
            tasks = [
                self._make_request(session, 'eth_getBalance', [addr, 'latest'])
                for addr in addresses
            ]
            
            results = await asyncio.gather(*tasks)
            
            return {
                addr: {
                    'wei': int(res['result'], 16),
                    'eth': int(res['result'], 16) / 1e18
                }
                for addr, res in zip(addresses, results)
            }
    
    async def batch_get_token_balances(self, 
                                       token_address: str,
                                       addresses: List[str]) -> Dict[str, int]:
        """批次取得 ERC-20 代幣餘額"""
        method_id = '0x70a08231'
        
        async with aiohttp.ClientSession() as session:
            tasks = []
            for addr in addresses:
                padded_addr = addr[2:].zfill(64)
                data = method_id + padded_addr
                params = [{'to': token_address, 'data': data}, 'latest']
                tasks.append(
                    self._make_request(session, 'eth_call', params)
                )
            
            results = await asyncio.gather(*tasks)
            
            return {
                addr: int(res['result'], 16)
                for addr, res in zip(addresses, results)
            }
    
    async def get_block_transactions(self, 
                                    block_numbers: List[int]) -> Dict[int, List[str]]:
        """批次取得多個區塊的交易"""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self._make_request(session, 'eth_getBlockByNumber', 
                                 [hex(num), False])
                for num in block_numbers
            ]
            
            results = await asyncio.gather(*tasks)
            
            return {
                num: [tx['hash'] for tx in res['result']['transactions']]
                for num, res in zip(block_numbers, results)
            }


# 使用範例:批次查詢
async def batch_example():
    client = BatchRPCClient("https://mainnet.infura.io/v3/YOUR_PROJECT_ID")
    
    # 批次查詢 100 個地址的餘額
    addresses = [f"0x{'0' * 40}"жалей" for _ in range(100)]  # 示例地址
    balances = await client.batch_get_balances(addresses)
    
    print(f"查詢了 {len(balances)} 個地址")
    
    # 批次查詢某代幣餘額
    usdc_address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # USDC
    token_balances = await client.batch_get_token_balances(
        usdc_address, 
        addresses[:50]
    )
    
    print(f"代幣餘額查詢完成")


# 執行
asyncio.run(batch_example())

三、即時價格數據 API 整合

3.1 CoinGecko API 實作

CoinGecko 提供免費的加密貨幣價格 API,是獲取即時價格數據的首選:

import requests
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from decimal import Decimal
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PriceData:
    """價格數據結構"""
    symbol: str
    usd: Decimal
    usd_24h_change: Decimal
    usd_24h_vol: Decimal
    usd_market_cap: Decimal
    last_updated: str

class CoinGeckoAPI:
    """CoinGecko API 封裝"""
    
    BASE_URL = "https://api.coingecko.com/api/v3"
    RATE_LIMIT = 10  # 免費版 10-30 calls/minute
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        self.session = requests.Session()
        self.last_request_time = 0
        
        if api_key:
            self.session.headers['x-cg-demo-api-key'] = api_key
    
    def _rate_limit_wait(self):
        """速率限制"""
        elapsed = time.time() - self.last_request_time
        if elapsed < (60 / self.RATE_LIMIT):
            time.sleep((60 / self.RATE_LIMIT) - elapsed)
        self.last_request_time = time.time()
    
    def get_price(self, 
                 ids: List[str],
                 vs_currencies: List[str] = ['usd'],
                 include_24hr_change: bool = True,
                 include_24hr_vol: bool = True,
                 include_market_cap: bool = True) -> Dict[str, Any]:
        """
        取得加密貨幣價格
        
        Args:
            ids: CoinGecko 代幣 ID(如 ethereum, bitcoin)
            vs_currencies: 報價貨幣
            include_24hr_change: 包含 24 小時漲跌幅
            include_24hr_vol: 包含 24 小時交易量
            include_market_cap: 包含市值
        
        Returns:
            價格數據字典
        """
        self._rate_limit_wait()
        
        endpoint = f"{self.BASE_URL}/simple/price"
        params = {
            'ids': ','.join(ids),
            'vs_currencies': ','.join(vs_currencies),
            'include_24hr_change': str(include_24hr_change).lower(),
            'include_24hr_vol': str(include_24hr_vol).lower(),
            'include_market_cap': str(include_market_cap).lower()
        }
        
        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        
        return response.json()
    
    def get_eth_price(self) -> PriceData:
        """取得 ETH 即時價格"""
        data = self.get_price(
            ids=['ethereum'],
            vs_currencies=['usd'],
            include_24hr_change=True,
            include_24hr_vol=True,
            include_market_cap=True
        )['ethereum']
        
        return PriceData(
            symbol='ETH',
            usd=Decimal(str(data['usd'])),
            usd_24h_change=Decimal(str(data.get('usd_24h_change', 0))),
            usd_24h_vol=Decimal(str(data.get('usd_24h_vol', 0))),
            usd_market_cap=Decimal(str(data.get('usd_market_cap', 0))),
            last_updated=time.strftime('%Y-%m-%d %H:%M:%S')
        )
    
    def get_token_prices(self, contract_addresses: List[str]) -> Dict[str, Any]:
        """
        依合約地址取得代幣價格
        
        適用於 ERC-20 代幣
        """
        self._rate_limit_wait()
        
        endpoint = f"{self.BASE_URL}/simple/token_price/ethereum"
        params = {
            'contract_addresses': ','.join(contract_addresses),
            'vs_currencies': 'usd'
        }
        
        response = self.session.get(endpoint, params=params)
        
        return response.json()
    
    def get_market_chart(self, 
                        coin_id: str,
                        vs_currency: str = 'usd',
                        days: int = 7) -> Dict[str, List[List[float]]]:
        """
        取得市場歷史數據
        
        Args:
            coin_id: 代幣 ID
            vs_currency: 報價貨幣
            days: 天數(1, 7, 30, 90, 365, max)
        
        Returns:
            價格歷史陣列
        """
        self._rate_limit_wait()
        
        endpoint = f"{self.BASE_URL}/coins/{coin_id}/market_chart"
        params = {
            'vs_currency': vs_currency,
            'days': days
        }
        
        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        
        return response.json()


class PriceMonitor:
    """價格監控器"""
    
    def __init__(self, api: CoinGeckoAPI, alert_threshold_percent: float = 5.0):
        self.api = api
        self.alert_threshold = alert_threshold_percent
        self.last_price = None
    
    async def start_monitoring(self, interval_seconds: int = 60):
        """開始監控價格"""
        import asyncio
        
        while True:
            try:
                current_price = self.api.get_eth_price()
                
                if self.last_price:
                    change_percent = abs(
                        (current_price.usd - self.last_price.usd) 
                        / self.last_price.usd * 100
                    )
                    
                    if change_percent > self.alert_threshold:
                        logger.warning(
                            f"價格變動 alert: ETH ${current_price.usd} "
                            f"({change_percent:.2f}% 變動)"
                        )
                
                self.last_price = current_price
                logger.info(f"ETH Price: ${current_price.usd}")
                
            except Exception as e:
                logger.error(f"Price fetch error: {e}")
            
            await asyncio.sleep(interval_seconds)


# 使用範例
def price_example():
    api = CoinGeckoAPI()
    
    # 取得 ETH 價格
    eth_price = api.get_eth_price()
    print(f"ETH 價格: ${eth_price.usd}")
    print(f"24小時變動: {eth_price.usd_24h_change}%")
    print(f"24小時交易量: ${eth_price.usd_24h_vol:,.0f}")
    
    # 取得多個代幣價格
    prices = api.get_price(
        ids=['ethereum', 'bitcoin', 'usd-coin', 'tether'],
        include_24hr_change=True
    )
    
    print("\n多代幣報價:")
    for token, data in prices.items():
        print(f"  {token}: ${data['usd']} ({data.get('usd_24h_change', 0):.2f}%)")
    
    # 取得歷史數據
    chart = api.get_market_chart('ethereum', days=30)
    prices_history = chart['prices']
    
    print(f"\n近 30 天價格範圍:")
    prices_only = [p[1] for p in prices_history]
    print(f"  最高: ${max(prices_only):.2f}")
    print(f"  最低: ${min(prices_only):.2f}")

3.2 即時 Gas 費用預測

Gas 費用的預測對於優化交易成本至關重要:

import requests
from typing import Dict, List, Optional
from dataclasses import dataclass
import time

@dataclass
class GasRecommendation:
    """Gas 推薦數據"""
    slow: int      # Gwei
    standard: int  # Gwei
    fast: int      # Gwei
    base_fee: int  # Gwei
    priority_fee: int  # Gwei

class GasTracker:
    """Gas 費用追蹤器"""
    
    def __init__(self, rpc_url: str):
        self.rpc_url = rpc_url
        self.session = requests.Session()
    
    def _rpc_call(self, method: str, params: List = None) -> any:
        """發送 RPC 請求"""
        payload = {
            'jsonrpc': '2.0',
            'method': method,
            'params': params or [],
            'id': 1
        }
        
        response = self.session.post(self.rpc_url, json=payload)
        return response.json()['result']
    
    def get_current_gas(self) -> GasRecommendation:
        """
        取得當前 Gas 推薦
        
        使用 eth_feeHistory 計算
        """
        latest_block = int(self._rpc_call('eth_blockNumber'), 16)
        
        # 取得最近 10 個區塊的費用歷史
        fee_history = self._rpc_call('eth_feeHistory', [
            '0xa',  # 10 blocks
            hex(latest_block - 10),
            [20, 50, 80]  # 百分位數
        ])
        
        base_fee = int(fee_history['baseFeePerGas'][-1], 16)
        rewards = [int(r, 16) for r in fee_history['reward']]
        
        # 計算推薦值
        return GasRecommendation(
            slow=rewards[0] + base_fee,
            standard=rewards[1] + base_fee,
            fast=rewards[2] + base_fee,
            base_fee=base_fee,
            priority_fee=rewards[1]
        )
    
    def predict_future_gas(self, blocks_ahead: int = 3) -> Dict[str, int]:
        """
        預測未來區塊的 Gas 費用
        
        基於區塊空間供需模型
        """
        latest_block = int(self._rpc_call('eth_blockNumber'), 16)
        
        # 取得最近區塊的 Gas 使用情況
        recent_gas = []
        for i in range(10):
            block_hex = hex(latest_block - i)
            block = self._rpc_call('eth_getBlockByNumber', [block_hex, False])
            recent_gas.append(int(block['gasUsed'], 16))
        
        avg_gas = sum(recent_gas) / len(recent_gas)
        target_gas = 15_000_000  # target gas per block
        
        # 簡單線性預測模型
        utilization = avg_gas / target_gas
        
        current_fee = self.get_current_gas()
        
        # 根據供需預測
        if utilization > 0.8:
            # 高擁堵,費用會上升
            multiplier = 1 + (utilization - 0.8) * 0.5
        elif utilization < 0.5:
            # 低利用率,費用會下降
            multiplier = 1 - (0.5 - utilization) * 0.3
        else:
            multiplier = 1.0
        
        return {
            'predicted_standard': int(current_fee.standard * multiplier),
            'predicted_fast': int(current_fee.fast * multiplier),
            'current_utilization': utilization,
            'recommended_wait_blocks': 3 if utilization > 0.9 else 1
        }
    
    def get_gas_oracle_data(self) -> Dict:
        """
        從 Gas Oracle 取得推薦數據
        
        部分錢包和服務提供自己的 Oracle
        """
        # EIP-1559 後錢包常用的推薦來源
        # 這裡展示如何從多個來源聚合數據
        
        sources = {
            'eth_gas_station': self._get_eth_gas_station(),
            'rpc': self.get_current_gas(),
            'prediction': self.predict_future_gas()
        }
        
        # 聚合推薦
        return {
            'slow': sources['rpc'].slow,
            'standard': sources['rpc'].standard,
            'fast': sources['rpc'].fast,
            'prediction': sources['prediction'],
            'sources': sources
        }
    
    def _get_eth_gas_station(self) -> Optional[Dict]:
        """ETH Gas Station API"""
        try:
            response = self.session.get(
                'https://ethgasstation.info/api/ethgasAPI.json'
            )
            data = response.json()
            
            return {
                'slow': int(data['safeLow'] * 1e9),
                'standard': int(data['average'] * 1e9),
                'fast': int(data['fast'] * 1e9)
            }
        except:
            return None


# 使用範例
def gas_example():
    tracker = GasTracker("https://mainnet.infura.io/v3/YOUR_PROJECT_ID")
    
    # 取得當前 Gas
    gas = tracker.get_current_gas()
    print(f"當前 Gas 推薦 (Gwei):")
    print(f"  Slow: {gas.slow / 1e9:.2f}")
    print(f"  Standard: {gas.standard / 1e9:.2f}")
    print(f"  Fast: {gas.fast / 1e9:.2f}")
    print(f"  Base Fee: {gas.base_fee / 1e9:.2f}")
    
    # 預測未來
    prediction = tracker.predict_future_gas()
    print(f"\n費用預測:")
    print(f"  預測 Standard: {prediction['predicted_standard'] / 1e9:.2f} Gwei")
    print(f"  當前利用率: {prediction['current_utilization']:.1%}")

四、The Graph 子圖查詢

4.1 The Graph 基礎與使用

The Graph 是一個用於查詢區塊鏈數據的分散式協定,透過 GraphQL 提供高效的數據查詢:

import requests
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class PoolData:
    """Uniswap 池子數據"""
    id: str
    token0_symbol: str
    token1_symbol: str
    token0_price: float
    token1_price: float
    volume_token0: float
    volume_token1: float
    tvl: float
    fee_tier: int

class TheGraphClient:
    """The Graph 子圖查詢客戶端"""
    
    # 常用子圖端點
    SUBGRAPH_URLS = {
        'uniswap_v3': 'https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3',
        'aave_v3': 'https://api.thegraph.com/subgraphs/name/aave/aave-v3-ethereum',
        'compound': 'https://api.thegraph.com/subgraphs/name/compound-finance/compound-v3-ethereum',
        'lido': 'https://api.thegraph.com/subgraphs/name/lido/lido-ethereum',
    }
    
    def __init__(self, subgraph_name: str = 'uniswap_v3'):
        self.endpoint = self.SUBGRAPH_URLS.get(subgraph_name)
        self.session = requests.Session()
    
    def query(self, query: str, variables: Dict = None) -> Dict:
        """執行 GraphQL 查詢"""
        payload = {
            'query': query,
            'variables': variables or {}
        }
        
        response = self.session.post(self.endpoint, json=payload)
        data = response.json()
        
        if 'errors' in data:
            raise Exception(f"GraphQL Error: {data['errors']}")
        
        return data['data']
    
    def get_uniswap_top_pools(self, limit: int = 10) -> List[PoolData]:
        """取得 Uniswap V3 熱門池子"""
        query = """
        query GetTopPools($limit: Int!) {
            pools(
                first: $limit
                orderBy: totalValueLockedUSD
                orderDirection: desc
                where: { volumeUSD_gt: 1000 }
            ) {
                id
                token0 {
                    symbol
                }
                token1 {
                    symbol
                }
                token0Price
                token1Price
                volumeToken0
                volumeToken1
                totalValueLockedUSD
                feeTier
            }
        }
        """
        
        data = self.query(query, {'limit': limit})
        
        return [
            PoolData(
                id=pool['id'],
                token0_symbol=pool['token0']['symbol'],
                token1_symbol=pool['token1']['symbol'],
                token0_price=float(pool['token0Price']),
                token1_price=float(pool['token1Price']),
                volume_token0=float(pool['volumeToken0']),
                volume_token1=float(pool['volumeToken1']),
                tvl=float(pool['totalValueLockedUSD']),
                fee_tier=int(pool['feeTier'])
            )
            for pool in data['pools']
        ]
    
    def get_token_swaps(self, pool_id: str, limit: int = 100) -> List[Dict]:
        """取得特定池子的近期交易"""
        query = """
        query GetSwaps($poolId: String!, $limit: Int!) {
            swaps(
                first: $limit
                orderBy: timestamp
                orderDirection: desc
                where: { pool: $poolId }
            ) {
                id
                timestamp
                sender
                recipient
                amount0
                amount1
                amountUSD
                tick
            }
        }
        """
        
        data = self.query(query, {'poolId': pool_id, 'limit': limit})
        return data['swaps']
    
    def get_aave_supply_borrow_rates(self, asset: str) -> Dict:
        """取得 Aave V3 借貸利率"""
        query = """
        query GetRates($asset: String!) {
            reserves(where: { symbol: $asset }) {
                symbol
                liquidityRate
                variableBorrowRate
                stableBorrowRate
                totalDeposits
                totalBorrows
                availableLiquidity
                utilizationRate
            }
        }
        """
        
        data = self.query(query, {'asset': asset})
        
        if data['reserves']:
            reserve = data['reserves'][0]
            return {
                'supply_rate': float(reserve['liquidityRate']) / 1e27,
                'variable_borrow_rate': float(reserve['variableBorrowRate']) / 1e27,
                'stable_borrow_rate': float(reserve['stableBorrowRate']) / 1e27,
                'total_deposits': float(reserve['totalDeposits']),
                'total_borrows': float(reserve['totalBorrows']),
                'utilization': float(reserve['utilizationRate']) / 1e27
            }
        
        return {}
    
    def get_lido_staking_stats(self) -> Dict:
        """取得 Lido 質押統計"""
        query = """
        query GetLidoStats {
            lidoExtractors {
                id
                totalStaked
                totalRewards
                stETH Holders
            }
            validators(where: { status: "Active" }) {
                count
            }
        }
        """
        
        data = self.query(query)
        
        return {
            'total_staked': data['lidoExtractors'][0]['totalStaked'],
            'total_rewards': data['lidoExtractors'][0]['totalRewards'],
            'active_validators': data['validators'][0]['count']
        }


# 使用範例
def the_graph_example():
    # Uniswap V3 查詢
    client = TheGraphClient('uniswap_v3')
    
    pools = client.get_uniswap_top_pools(5)
    print("Uniswap V3 熱門池子:")
    for pool in pools:
        print(f"  {pool.token0_symbol}/{pool.token1_symbol}: "
              f"TVL ${pool.tvl:,.0f}")
    
    # Aave V3 利率查詢
    aave_client = TheGraphClient('aave_v3')
    rates = aave_client.get_aave_supply_borrow_rates('ETH')
    
    print("\nAave ETH 利率:")
    print(f"  存款利率: {rates['supply_rate']:.2%}")
    print(f"  浮動借款利率: {rates['variable_borrow_rate']:.2%}")
    print(f"  利用率: {rates['utilization']:.1%}")

五、DeFi 協議即時數據整合

5.1 多協議數據聚合器

以下是一個整合多個 DeFi 協議數據的完整範例:

from typing import Dict, List, Optional
from dataclasses import dataclass
from decimal import Decimal
import asyncio
import aiohttp
from abc import ABC, abstractmethod

@dataclass
class ProtocolStats:
    """協議統計數據"""
    name: str
    tvl: Decimal
    daily_volume: Decimal
    total_users: int
    asset_count: int

@dataclass
class MarketData:
    """市場數據"""
    eth_price: Decimal
    gas_price_gwei: Decimal
    total_defi_tvl: Decimal
    timestamp: int

class ProtocolDataSource(ABC):
    """協議數據源抽象類"""
    
    @abstractmethod
    async def get_stats(self) -> ProtocolStats:
        pass

class AaveDataSource(ProtocolDataSource):
    """Aave V3 數據源"""
    
    def __init__(self, rpc_url: str):
        self.rpc_url = rpc_url
    
    async def get_stats(self) -> ProtocolStats:
        # Aave V3 Pool 合約地址
        pool_address = "0x87870Bca3F3fD6335C3FbdC83E7a82f43aa5B2"
        
        # 透過 RPC 查詢 TVL
        # 這裡使用 The Graph 獲取更完整的數據
        async with aiohttp.ClientSession() as session:
            query = """
            {
                reserves {
                    totalDeposits
                    totalBorrows
                }
            }
            """
            
            async with session.post(
                'https://api.thegraph.com/subgraphs/name/aave/aave-v3-ethereum',
                json={'query': query}
            ) as resp:
                data = await resp.json()
        
        total_deposits = sum(
            float(r['totalDeposits']) for r in data['data']['reserves']
        )
        
        return ProtocolStats(
            name='Aave V3',
            tvl=Decimal(str(total_deposits)),
            daily_volume=Decimal('0'),  # 需要歷史數據
            total_users=0,
            asset_count=len(data['data']['reserves'])
        )

class UniswapDataSource(ProtocolDataSource):
    """Uniswap 數據源"""
    
    async def get_stats(self) -> ProtocolStats:
        async with aiohttp.ClientSession() as session:
            query = """
            {
                factories(first: 1) {
                    totalValueLockedUSD
                    totalVolumeUSD
                    poolCount
                }
            }
            """
            
            async with session.post(
                'https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3',
                json={'query': query}
            ) as resp:
                data = await resp.json()
        
        factory = data['data']['factories'][0]
        
        return ProtocolStats(
            name='Uniswap V3',
            tvl=Decimal(factory['totalValueLockedUSD']),
            daily_volume=Decimal(factory['totalVolumeUSD']),
            total_users=0,
            asset_count=int(factory['poolCount'])
        )

class DefiAggregator:
    """DeFi 數據聚合器"""
    
    def __init__(self, rpc_url: str):
        self.protocols: List[ProtocolDataSource] = [
            AaveDataSource(rpc_url),
            UniswapDataSource(rpc_url),
        ]
        self.price_api = None  # 初始化價格 API
    
    async def get_all_protocol_stats(self) -> List[ProtocolStats]:
        """取得所有協議統計"""
        tasks = [p.get_stats() for p in self.protocols]
        return await asyncio.gather(*tasks)
    
    async def get_market_overview(self) -> MarketData:
        """取得市場總覽"""
        # 並行獲取各項數據
        price_task = self._get_eth_price()
        gas_task = self._get_gas_price()
        tvl_task = self._get_total_defi_tvl()
        
        eth_price, gas_price, total_tvl = await asyncio.gather(
            price_task, gas_task, tvl_task
        )
        
        return MarketData(
            eth_price=eth_price,
            gas_price_gwei=gas_price,
            total_defi_tvl=total_tvl,
            timestamp=int(asyncio.get_event_loop().time())
        )
    
    async def _get_eth_price(self) -> Decimal:
        """取得 ETH 價格"""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                'https://api.coingecko.com/api/v3/simple/price'
                '?ids=ethereum&vs_currencies=usd'
            ) as resp:
                data = await resp.json()
        
        return Decimal(str(data['ethereum']['usd']))
    
    async def _get_gas_price(self) -> Decimal:
        """取得 Gas 價格"""
        async with aiohttp.ClientSession() as session:
            # 使用 ETH Gas Station
            async with session.get(
                'https://ethgasstation.info/api/ethgasAPI.json'
            ) as resp:
                data = await resp.json()
        
        return Decimal(data['average'])
    
    async def _get_total_defi_tvl(self) -> Decimal:
        """計算總 DeFi TVL"""
        # 聚合多個協議的 TVL
        stats = await self.get_all_protocol_stats()
        
        return sum(s.tvl for s in stats)


# 使用範例
async def defi_aggregator_example():
    aggregator = DefiAggregator("https://mainnet.infura.io/v3/YOUR_KEY")
    
    # 取得市場總覽
    market = await aggregator.get_market_overview()
    print(f"市場總覽:")
    print(f"  ETH 價格: ${market.eth_price}")
    print(f"  Gas 價格: {market.gas_price_gwei} Gwei")
    print(f"  總 DeFi TVL: ${market.total_defi_tvl:,.0f}")
    
    # 取得各協議數據
    protocol_stats = await aggregator.get_all_protocol_stats()
    
    print(f"\n協議 TVL:")
    for stats in protocol_stats:
        print(f"  {stats.name}: ${stats.tvl:,.0f}")

六、實戰:構建即時數據儀表板

6.1 架構設計

一個完整的即時數據儀表板需要整合多種數據源:

┌─────────────────────────────────────────────────────────────────┐
│                        數據層                                    │
├────────────────┬─────────────────┬────────────────────────────┤
│  區塊鏈 RPC    │  價格 API       │  The Graph                 │
│  (Infura)     │  (CoinGecko)   │  (Subgraphs)               │
└───────┬────────┴────────┬────────┴────────────┬───────────────┘
        │                 │                     │
        ▼                 ▼                     ▼
┌─────────────────────────────────────────────────────────────────┐
│                      API Gateway                                 │
│                  (速率限制、緩存、聚合)                           │
└───────────────────────────┬─────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                     應用層                                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │  價格監控    │  │  Gas 追蹤   │  │  DeFi 儀表板 │              │
│  │  服務       │  │  服務       │  │  服務        │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
└───────────────────────────┬─────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                      展示層                                        │
│              (Web UI / API / WebSocket)                         │
└─────────────────────────────────────────────────────────────────┘

6.2 後端服務實現

from fastapi import FastAPI, HTTPException, WebSocket, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Optional
import asyncio
import json
from datetime import datetime

app = FastAPI(title="以太坊即時數據 API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 數據緩存
_cache = {
    'eth_price': {'value': None, 'timestamp': 0},
    'gas_price': {'value': None, 'timestamp': 0},
    'market_data': {'value': None, 'timestamp': 0}
}
CACHE_TTL = 60  # 60 秒緩存

class PriceResponse(BaseModel):
    """價格響應模型"""
    eth_usd: float
    change_24h: float
    timestamp: str

class GasResponse(BaseModel):
    """Gas 費用響應"""
    slow: float
    standard: float
    fast: float
    base_fee: float
    timestamp: str

class MarketResponse(BaseModel):
    """市場數據響應"""
    eth_price: float
    gas_price: float
    total_defi_tvl: float
    top_pools: List[Dict]
    timestamp: str


def _is_cache_valid(key: str) -> bool:
    """檢查緩存是否有效"""
    if key not in _cache:
        return False
    
    age = datetime.now().timestamp() - _cache[key]['timestamp']
    return age < CACHE_TTL


@app.get("/api/price", response_model=PriceResponse)
async def get_price():
    """取得 ETH 價格"""
    if _is_cache_valid('eth_price'):
        return _cache['eth_price']['value']
    
    # 獲取新數據
    api = CoinGeckoAPI()
    price_data = api.get_eth_price()
    
    response        eth_usd=float(price_data = PriceResponse(
.usd),
        change_24h=float(price_data.usd_24h_change),
        timestamp=price_data.last_updated
    )
    
    _cache['eth_price'] = {
        'value': response,
        'timestamp': datetime.now().timestamp()
    }
    
    return response


@app.get("/api/gas", response_model=GasResponse)
async def get_gas():
    """取得 Gas 費用"""
    if _is_cache_valid('gas_price'):
        return _cache['gas_price']['value']
    
    tracker = GasTracker("https://mainnet.infura.io/v3/YOUR_KEY")
    gas_data = tracker.get_current_gas()
    
    response = GasResponse(
        slow=gas_data.slow / 1e9,
        standard=gas_data.standard / 1e9,
        fast=gas_data.fast / 1e9,
        base_fee=gas_data.base_fee / 1e9,
        timestamp=datetime.now().isoformat()
    )
    
    _cache['gas_price'] = {
        'value': response,
        'timestamp': datetime.now().timestamp()
    }
    
    return response


@app.get("/api/market", response_model=MarketResponse)
async def get_market():
    """取得市場綜合數據"""
    if _is_cache_valid('market_data'):
        return _cache['market_data']['value']
    
    # 獲取各項數據
    api = CoinGeckoAPI()
    eth_price = api.get_eth_price()
    
    tracker = GasTracker("https://mainnet.infura.io/v3/YOUR_KEY")
    gas = tracker.get_current_gas()
    
    # 獲取 DeFi TVL
    graph = TheGraphClient('uniswap_v3')
    pools = graph.get_uniswap_top_pools(5)
    
    response = MarketResponse(
        eth_price=float(eth_price.usd),
        gas_price=gas.standard / 1e9,
        total_defi_tvl=sum(p.tvl for p in pools) * 2,  # 估算
        top_pools=[
            {
                'pair': f"{p.token0_symbol}/{p.token1_symbol}",
                'tvl': p.tvl
            }
            for p in pools
        ],
        timestamp=datetime.now().isoformat()
    )
    
    _cache['market_data'] = {
        'value': response,
        'timestamp': datetime.now().timestamp()
    }
    
    return response


# WebSocket 即時更新
@app.websocket("/ws/price")
async def websocket_price(websocket: WebSocket):
    """WebSocket 即時價格更新"""
    await websocket.accept()
    
    try:
        while True:
            api = CoinGeckoAPI()
            price = api.get_eth_price()
            
            await websocket.send_json({
                'type': 'price',
                'data': {
                    'eth_usd': float(price.usd),
                    'change_24h': float(price.usd_24h_change),
                    'timestamp': price.last_updated
                }
            })
            
            await asyncio.sleep(10)  # 每 10 秒更新
            
    except Exception as e:
        await websocket.close()


# 後台任務:定時更新緩存
async def update_cache_periodically():
    """定時更新緩存"""
    while True:
        await asyncio.sleep(30)
        
        try:
            # 更新所有緩存
            api = CoinGeckoAPI()
            price_data = api.get_eth_price()
            
            _cache['eth_price'] = {
                'value': PriceResponse(
                    eth_usd=float(price_data.usd),
                    change_24h=float(price_data.usd_24h_change),
                    timestamp=price_data.last_updated
                ),
                'timestamp': datetime.now().timestamp()
            }
            
        except Exception as e:
            print(f"Cache update error: {e}")


@app.on_event("startup")
async def startup_event():
    """啟動時的背景任務"""
    asyncio.create_task(update_cache_periodically())


# 啟動服務
# uvicorn main:app --reload --port 8000

七、最佳實踐與效能優化

7.1 數據獲取策略

class OptimizedDataFetcher:
    """優化的數據獲取器"""
    
    def __init__(self):
        self.cache = {}
        self.rate_limiter = asyncio.Semaphore(10)
    
    async def fetch_with_cache(self, 
                               key: str, 
                               fetch_func, 
                               ttl: int = 60):
        """帶緩存的獲取"""
        now = asyncio.get_event_loop().time()
        
        if key in self.cache:
            data, timestamp = self.cache[key]
            if now - timestamp < ttl:
                return data
        
        # 獲取新數據
        async with self.rate_limiter:
            data = await fetch_func()
            self.cache[key] = (data, now)
            return data
    
    def batch_fetch(self, items: List[Dict], fetch_func) -> List:
        """批次獲取"""
        return asyncio.gather(*[
            fetch_func(item) for item in items
        ])

7.2 錯誤處理與重試

import asyncio
from functools import wraps

def retry_on_error(max_retries: int = 3, delay: float = 1.0):
    """錯誤重試裝飾器"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    await asyncio.sleep(delay * (attempt + 1))
        return wrapper
    return decorator

@retry_on_error(max_retries=3, delay=1.0)
async def safe_api_call(api_func):
    """安全的 API 調用"""
    return await api_func()

結論

本指南涵蓋了以太坊即時數據獲取的各個面向,從基礎的 RPC 調用到複雜的多協議數據聚合。這些技術是構建高效以太坊應用的基礎,建議開發者根據實際需求選擇合適的數據源,並遵循以下原則:

  1. 選擇合適的數據源:免費 API 適合開發和測試,生產環境應考慮付費服務
  2. 實作緩存機制:減少 API 調用頻率,提昇回應速度
  3. 做好錯誤處理:網路請求總是可能失敗,需要完善的降級策略
  4. 遵守速率限制:尊重 API 提供商的使用限制,避免被封禁
  5. 保持數據新鮮:根據應用場景選擇合適的刷新頻率

透過這些實踐,可以構建出穩定、高效的以太坊數據應用。

延伸閱讀與來源

這篇文章對您有幫助嗎?

評論

發表評論

注意:由於這是靜態網站,您的評論將儲存在本地瀏覽器中,不會公開顯示。

目前尚無評論,成為第一個發表評論的人吧!