Python 实战:基于国际暗金,贵金属 API 开发实时行情监控系统

前言

在贵金属行情分析、量化交易、价格监控等场景中,稳定、低延迟的实时行情数据至关重要。本文基于 Python + 国际暗金贵金属 API,从零搭建一套完整可运行的实时行情监控系统,包含 WebSocket 长连接、心跳保活、自动断线重连、多品种订阅、数据解析与实时展示等功能,代码简洁稳定,可直接用于生产环境二次开发。


一、系统功能

本监控系统实现以下核心能力:

  1. WebSocket 长连接订阅,实时推送贵金属行情
  2. 多品种同时监控,支持批量订阅
  3. 10 秒心跳保活,避免连接断开
  4. 自动重连机制,7×24 小时稳定运行
  5. 实时解析最新价、开盘价、涨跌额、涨跌幅、买卖盘等
  6. 结构化输出,便于扩展入库、可视化、预警

二、环境准备

安装依赖:

pip install websockets requests

三、完整代码(直接运行)

import asyncio
import websockets
import json
import time

class MetalMarketMonitor:
    def __init__(self):
        # WebSocket 连接地址
        self.ws_url = "ws://39.107.99.235/ws"
        # 订阅品种(可自行修改/添加)
        self.subscribe_codes = "btcusdt,ethusdt,fx_sgbpusd"
        # 心跳间隔(秒)
        self.heartbeat_interval = 10

    def get_timestamp(self):
        """获取10位时间戳"""
        return int(time.time())

    def parse_market_data(self, data):
        """解析行情数据并输出监控信息"""
        try:
            body = data.get("body", {})
            if not body:
                return

            code = body.get("StockCode", "")
            price = body.get("Price", 0)
            open_price = body.get("Open", 0)
            last_close = body.get("LastClose", 0)
            high = body.get("High", 0)
            low = body.get("Low", 0)
            diff = body.get("Diff", 0)
            diff_rate = body.get("DiffRate", 0)
            time_str = body.get("Time", "")

            # 控制台输出监控信息
            print(f"===== 品种:{code} | 时间:{time_str} =====")
            print(f"最新价:{price}")
            print(f"开盘:{open_price} | 昨收:{last_close}")
            print(f"最高:{high} | 最低:{low}")
            print(f"涨跌:{diff:.2f} | 涨跌幅:{diff_rate:.2f}%")
            print("-" * 60)

        except Exception as e:
            pass

    async def heartbeat(self, websocket):
        """心跳保活任务"""
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            ping_data = {"ping": self.get_timestamp()}
            await websocket.send(json.dumps(ping_data))

    async def run_monitor(self):
        """启动行情监控系统"""
        while True:
            try:
                async with websockets.connect(self.ws_url) as websocket:
                    print("✅ 行情监控已连接")

                    # 发送订阅
                    sub_msg = {"Key": self.subscribe_codes}
                    await websocket.send(json.dumps(sub_msg))
                    print(f"✅ 已订阅品种:{self.subscribe_codes}")

                    # 启动心跳
                    asyncio.create_task(self.heartbeat(websocket))

                    # 接收并解析数据
                    while True:
                        response = await websocket.recv()
                        data = json.loads(response)
                        self.parse_market_data(data)

            except Exception as e:
                print(f"❌ 连接断开,3秒后重连...")
                await asyncio.sleep(3)

if __name__ == "__main__":
    monitor = MetalMarketMonitor()
    asyncio.run(monitor.run_monitor())

四、使用说明

  1. 修改 self.subscribe_codes 可添加 / 删除监控品种
  2. 直接运行脚本即可启动实时监控
  3. 系统自动重连、自动心跳,无需人工维护
  4. 可扩展:价格预警、数据入库、前端看板、策略触发

五、注意事项

  1. 同一 IP 建议连接数不超过 6 个
  2. 避免重复订阅同一品种,减少资源占用
  3. 生产环境可使用 nohup 或 supervisor 托管运行

六、扩展方向

  • 价格预警:涨跌超阈值自动提醒
  • 数据持久化:存入 MySQL/Redis/InfluxDB
  • Web 看板:对接 Flask/Django 做可视化页面
  • 策略交易:结合行情实现自动化策略

总结

本文基于 Python 与国际暗金贵金属 API,实现了一套生产级实时行情监控系统。代码结构清晰、稳定可靠、易于扩展,可满足贵金属行情展示、量化分析、价格监控等多种业务场景,适合开发者快速落地使用。


相关资源:

官方文档和API参考:http://39.107.99.235:1008/market

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部