前言
在贵金属行情分析、量化交易、价格监控等场景中,稳定、低延迟的实时行情数据至关重要。本文基于 Python + 国际暗金贵金属 API,从零搭建一套完整可运行的实时行情监控系统,包含 WebSocket 长连接、心跳保活、自动断线重连、多品种订阅、数据解析与实时展示等功能,代码简洁稳定,可直接用于生产环境二次开发。
一、系统功能
本监控系统实现以下核心能力:
- WebSocket 长连接订阅,实时推送贵金属行情
- 多品种同时监控,支持批量订阅
- 10 秒心跳保活,避免连接断开
- 自动重连机制,7×24 小时稳定运行
- 实时解析最新价、开盘价、涨跌额、涨跌幅、买卖盘等
- 结构化输出,便于扩展入库、可视化、预警
二、环境准备
安装依赖:
pip install websockets requests
三、完整代码(直接运行)
import asyncio
import websockets
import json
import time
class MetalMarketMonitor:
def __init__(self):
# WebSocket 连接地址
self.ws_url = "ws://39.107.99.235/ws"
# 订阅品种(可自行修改/添加)
self.subscribe_codes = "btcusdt,ethusdt,fx_sgbpusd"
# 心跳间隔(秒)
self.heartbeat_interval = 10
def get_timestamp(self):
"""获取10位时间戳"""
return int(time.time())
def parse_market_data(self, data):
"""解析行情数据并输出监控信息"""
try:
body = data.get("body", {})
if not body:
return
code = body.get("StockCode", "")
price = body.get("Price", 0)
open_price = body.get("Open", 0)
last_close = body.get("LastClose", 0)
high = body.get("High", 0)
low = body.get("Low", 0)
diff = body.get("Diff", 0)
diff_rate = body.get("DiffRate", 0)
time_str = body.get("Time", "")
# 控制台输出监控信息
print(f"===== 品种:{code} | 时间:{time_str} =====")
print(f"最新价:{price}")
print(f"开盘:{open_price} | 昨收:{last_close}")
print(f"最高:{high} | 最低:{low}")
print(f"涨跌:{diff:.2f} | 涨跌幅:{diff_rate:.2f}%")
print("-" * 60)
except Exception as e:
pass
async def heartbeat(self, websocket):
"""心跳保活任务"""
while True:
await asyncio.sleep(self.heartbeat_interval)
ping_data = {"ping": self.get_timestamp()}
await websocket.send(json.dumps(ping_data))
async def run_monitor(self):
"""启动行情监控系统"""
while True:
try:
async with websockets.connect(self.ws_url) as websocket:
print("✅ 行情监控已连接")
# 发送订阅
sub_msg = {"Key": self.subscribe_codes}
await websocket.send(json.dumps(sub_msg))
print(f"✅ 已订阅品种:{self.subscribe_codes}")
# 启动心跳
asyncio.create_task(self.heartbeat(websocket))
# 接收并解析数据
while True:
response = await websocket.recv()
data = json.loads(response)
self.parse_market_data(data)
except Exception as e:
print(f"❌ 连接断开,3秒后重连...")
await asyncio.sleep(3)
if __name__ == "__main__":
monitor = MetalMarketMonitor()
asyncio.run(monitor.run_monitor())
四、使用说明
- 修改
self.subscribe_codes可添加 / 删除监控品种 - 直接运行脚本即可启动实时监控
- 系统自动重连、自动心跳,无需人工维护
- 可扩展:价格预警、数据入库、前端看板、策略触发
五、注意事项
- 同一 IP 建议连接数不超过 6 个
- 避免重复订阅同一品种,减少资源占用
- 生产环境可使用 nohup 或 supervisor 托管运行
六、扩展方向
- 价格预警:涨跌超阈值自动提醒
- 数据持久化:存入 MySQL/Redis/InfluxDB
- Web 看板:对接 Flask/Django 做可视化页面
- 策略交易:结合行情实现自动化策略
总结
本文基于 Python 与国际暗金贵金属 API,实现了一套生产级实时行情监控系统。代码结构清晰、稳定可靠、易于扩展,可满足贵金属行情展示、量化分析、价格监控等多种业务场景,适合开发者快速落地使用。
相关资源:
官方文档和API参考:http://39.107.99.235:1008/market