EPUSDT 后台接入 OKX 欧易实时汇率教程
先说明
这篇教程只做一件事:把 EPUSDT 的 CNY -> USDT 汇率改成 OKX 欧易实时汇率,并且让新订单直接按实时汇率出金额。
当前 v1.0.0 需要先记住这几点:
- 后台
汇率 API 地址不能直接填 OKX 官方地址,epusdt 只认{"cny":{"usdt":0.14636}}这种返回格式。 - 后台
强制 USDT 汇率现在已经可以直接填写并保存,而且当这个值> 0时,程序会优先使用它;只有它小于等于0时,才会回退到汇率 API 地址。 - 如果你只是手动填一个固定汇率,可以直接在后台改,不一定非要跑这篇教程里的本机服务。
- 这版脚本的意义不是“解锁灰色输入框”,而是自动抓 OKX 实时汇率,同时更新
rate.forced_usdt_rate,并补齐cny.json、usd.json和其它币种汇率,避免新订单长期吃固定值,也顺手解决TRON -> TRX切换时报系统错误的问题。 - 这套做法依赖服务器本机能访问
www.okx.com和cdn.jsdelivr.net,并且系统里有python3。这份脚本只用标准库,不需要额外pip install依赖。
下面这套做法适合想要长期自动同步 OKX 实时汇率的人:本机服务一边提供 epusdt 能识别的 JSON,一边自动登录 epusdt 后台 API,把 rate.forced_usdt_rate 同步成 OKX 实时值。
步骤 1:创建目录
在服务器里新建目录 /opt/epusdt-okx-rate,然后进入这个目录。
mkdir -p /opt/epusdt-okx-rate cd /opt/epusdt-okx-rate
步骤 2:新建 server.py
#!/usr/bin/env python3
import json
import logging
import os
import re
import sqlite3
import threading
import time
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from typing import Dict, Optional, Tuple
HOST = "127.0.0.1"
PORT = 18089
CACHE_TTL_SECONDS = 10
REFRESH_INTERVAL_SECONDS = 10
EPUSDT_DB_PATH = os.getenv("EPUSDT_DB_PATH", "/www/wwwroot/epusdt/epusdt.db")
EPUSDT_ADMIN_API_BASE = os.getenv("EPUSDT_ADMIN_API_BASE", "http://127.0.0.1:8000/admin/api/v1")
EPUSDT_ADMIN_USERNAME = os.getenv("EPUSDT_ADMIN_USERNAME", "")
EPUSDT_ADMIN_PASSWORD = os.getenv("EPUSDT_ADMIN_PASSWORD", "")
USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
)
OKX_EXCHANGE_URL = "https://www.okx.com/zh-hans/exchange/usdt-to-cny"
OKX_CONVERT_URL = "https://www.okx.com/zh-hans/convert/cny-to-usdt"
OKX_QUOTED_PRICE_URL = (
"https://www.okx.com/priapi/v3/b2c/deposit/quotedPrice"
"?baseCurrency=USDT"eCurrency=CNY"
)
PUBLIC_CURRENCY_API_URL = (
"https://cdn.jsdelivr.net/npm/@fawazahmed0/currency-api@latest/v1/currencies/{base}.json"
)
PRODUCT_PRICE_RE = re.compile(r'"price":"([0-9]+(?:\.[0-9]+)?)","priceCurrency":"CNY"')
DIRECT_RATE_RE_LIST = (
re.compile(r'当前 1 CNY 可兑换 ([0-9]+(?:\.[0-9]+)?) USDT'),
re.compile(r'CNY/USDT 今天的兑换率为 ([0-9]+(?:\.[0-9]+)?) USDT'),
)
_cache_lock = threading.Lock()
_cache = {"ts": 0.0, "rate": None, "source": None}
_public_cache_lock = threading.Lock()
_public_cache: Dict[str, dict] = {}
_admin_token_lock = threading.Lock()
_admin_token: Optional[str] = None
def format_float(value: float) -> str:
return f"{value:.8f}".rstrip("0").rstrip(".")
def sync_forced_usdt_rate_via_db(value: str, source: str, cny_to_usdt_rate: float) -> None:
conn = sqlite3.connect(EPUSDT_DB_PATH, timeout=5)
try:
conn.execute("PRAGMA busy_timeout = 5000")
conn.execute(
"""
INSERT INTO settings
("group", "key", "value", "type", "description", "created_at", "updated_at", "deleted_at")
VALUES
('rate', 'rate.forced_usdt_rate', ?, 'string', '强制USDT汇率', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL)
ON CONFLICT("key") DO UPDATE SET
"value" = excluded."value",
"type" = 'string',
"description" = '强制USDT汇率',
"updated_at" = CURRENT_TIMESTAMP,
"deleted_at" = NULL
""",
(value,),
)
conn.commit()
logging.warning(
"fallback synced rate.forced_usdt_rate=%s via sqlite from %s (cny->usdt=%s)",
value,
source,
format_float(cny_to_usdt_rate),
)
finally:
conn.close()
def http_get_text(url: str, timeout: int = 8) -> str:
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read().decode("utf-8", errors="ignore")
def fetch_public_currency_payload(base: str) -> dict:
payload = json.loads(http_get_text(PUBLIC_CURRENCY_API_URL.format(base=base)))
data = payload.get(base)
if not isinstance(data, dict):
raise ValueError(f"invalid currency payload for {base}")
return data
def get_public_currency_payload(base: str, force_refresh: bool = False) -> dict:
now = time.time()
with _public_cache_lock:
cached = _public_cache.get(base)
if (
cached is not None
and not force_refresh
and now - cached["ts"] < CACHE_TTL_SECONDS
):
return dict(cached["data"])
data = fetch_public_currency_payload(base)
with _public_cache_lock:
_public_cache[base] = {"ts": now, "data": dict(data)}
return dict(data)
def http_request_json(
url: str,
method: str = "GET",
payload: Optional[dict] = None,
headers: Optional[Dict[str, str]] = None,
timeout: int = 8,
) -> dict:
body = None
request_headers = {"User-Agent": USER_AGENT}
if headers:
request_headers.update(headers)
if payload is not None:
body = json.dumps(payload, ensure_ascii=True, separators=(",", ":")).encode()
request_headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=request_headers, method=method)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
def get_admin_token(force_refresh: bool = False) -> str:
global _admin_token
if not EPUSDT_ADMIN_USERNAME or not EPUSDT_ADMIN_PASSWORD:
raise ValueError("missing EPUSDT_ADMIN_USERNAME / EPUSDT_ADMIN_PASSWORD")
with _admin_token_lock:
if _admin_token and not force_refresh:
return _admin_token
payload = {
"username": EPUSDT_ADMIN_USERNAME,
"password": EPUSDT_ADMIN_PASSWORD,
}
resp = http_request_json(
f"{EPUSDT_ADMIN_API_BASE}/auth/login",
method="POST",
payload=payload,
)
token = ((resp.get("data") or {}).get("token") or "").strip()
if not token:
raise ValueError(f"admin login failed: {resp}")
_admin_token = token
return token
def sync_forced_usdt_rate(cny_to_usdt_rate: float, source: str) -> None:
if cny_to_usdt_rate <= 0:
raise ValueError("invalid cny->usdt rate")
usdt_to_cny_rate = 1.0 / cny_to_usdt_rate
value = format_float(usdt_to_cny_rate)
payload = {
"items": [
{
"group": "rate",
"key": "rate.forced_usdt_rate",
"value": value,
"type": "string",
}
]
}
for force_refresh in (False, True):
try:
token = get_admin_token(force_refresh=force_refresh)
resp = http_request_json(
f"{EPUSDT_ADMIN_API_BASE}/settings",
method="PUT",
payload=payload,
headers={"Authorization": f"Bearer {token}"},
)
results = resp.get("data") or []
if not results or not results[0].get("ok"):
raise ValueError(f"settings upsert failed: {resp}")
logging.info(
"synced rate.forced_usdt_rate=%s via admin api from %s (cny->usdt=%s)",
value,
source,
format_float(cny_to_usdt_rate),
)
return
except Exception: # noqa: BLE001
if force_refresh:
logging.exception("sync via admin api failed, falling back to sqlite")
sync_forced_usdt_rate_via_db(value, source, cny_to_usdt_rate)
def fetch_rate_from_exchange_page() -> Tuple[float, str]:
html = http_get_text(OKX_EXCHANGE_URL)
match = PRODUCT_PRICE_RE.search(html)
if match:
price_cny_per_usdt = float(match.group(1))
if price_cny_per_usdt > 0:
return round(1.0 / price_cny_per_usdt, 12), "okx-exchange-page"
for regex in DIRECT_RATE_RE_LIST:
match = regex.search(html)
if match:
return round(float(match.group(1)), 12), "okx-exchange-page-faq"
raise ValueError("failed to parse OKX exchange page")
def fetch_rate_from_convert_page() -> Tuple[float, str]:
html = http_get_text(OKX_CONVERT_URL)
match = PRODUCT_PRICE_RE.search(html)
if match:
price_cny_per_usdt = float(match.group(1))
if price_cny_per_usdt > 0:
return round(1.0 / price_cny_per_usdt, 12), "okx-convert-page"
for regex in DIRECT_RATE_RE_LIST:
match = regex.search(html)
if match:
return round(float(match.group(1)), 12), "okx-convert-page-faq"
raise ValueError("failed to parse OKX convert page")
def fetch_rate_from_quote_api() -> Tuple[float, str]:
payload = json.loads(http_get_text(OKX_QUOTED_PRICE_URL))
items = payload.get("data") or []
if not items:
raise ValueError("quotedPrice returned empty data")
price_cny_per_usdt = float(items[0]["price"])
if price_cny_per_usdt <= 0:
raise ValueError("quotedPrice returned invalid price")
return round(1.0 / price_cny_per_usdt, 12), "okx-quotedPrice"
def fetch_okx_cny_to_usdt_rate() -> Tuple[float, str]:
errors = []
for fetcher in (
fetch_rate_from_exchange_page,
fetch_rate_from_convert_page,
fetch_rate_from_quote_api,
):
try:
return fetcher()
except Exception as exc: # noqa: BLE001
errors.append(f"{fetcher.__name__}: {exc}")
raise RuntimeError("; ".join(errors))
def get_okx_cny_to_usdt_rate(force_refresh: bool = False) -> Tuple[float, str]:
now = time.time()
with _cache_lock:
if (
not force_refresh
and _cache["rate"] is not None
and now - _cache["ts"] < CACHE_TTL_SECONDS
):
return _cache["rate"], _cache["source"]
rate, source = fetch_okx_cny_to_usdt_rate()
sync_forced_usdt_rate(rate, source)
_cache["ts"] = now
_cache["rate"] = rate
_cache["source"] = source
return rate, source
def refresh_loop() -> None:
while True:
try:
get_okx_cny_to_usdt_rate(force_refresh=True)
except Exception: # noqa: BLE001
logging.exception("background refresh failed")
time.sleep(REFRESH_INTERVAL_SECONDS)
class Handler(BaseHTTPRequestHandler):
server_version = "epusdt-okx-rate/1.0"
def do_GET(self) -> None: # noqa: N802
parsed = urllib.parse.urlparse(self.path)
force_refresh = urllib.parse.parse_qs(parsed.query).get("refresh") == ["1"]
if parsed.path == "/healthz":
self.write_json(200, {"ok": True})
return
if parsed.path == "/":
self.write_json(
200,
{
"service": "epusdt okx rate adapter",
"usage": {
"cny": "GET /cny.json",
"healthz": "GET /healthz",
},
},
)
return
if not parsed.path.endswith(".json"):
self.write_json(404, {"error": "not found"})
return
base = parsed.path.rsplit("/", 1)[-1][:-5].strip().lower()
if not base:
self.write_json(404, {"error": "not found"})
return
try:
if base == "cny":
rate, source = get_okx_cny_to_usdt_rate(force_refresh=force_refresh)
data = get_public_currency_payload("cny", force_refresh=force_refresh)
# Keep USDT pegged to the OKX-derived rate used by epusdt.
data["usdt"] = rate
self.write_json(200, {"cny": data}, {"X-Rate-Source": source})
return
if base == "usd":
data = get_public_currency_payload("usd", force_refresh=force_refresh)
data["usdt"] = 1
self.write_json(200, {"usd": data})
return
self.write_json(200, {base: get_public_currency_payload(base, force_refresh=force_refresh)})
except Exception as exc: # noqa: BLE001
logging.exception("failed to serve %s", parsed.path)
self.write_json(500, {"error": str(exc)})
def log_message(self, fmt: str, *args) -> None:
logging.info("%s - %s", self.address_string(), fmt % args)
def write_json(
self, status: int, payload: dict, extra_headers: Optional[Dict[str, str]] = None
) -> None:
body = json.dumps(payload, ensure_ascii=True, separators=(",", ":")).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store")
if extra_headers:
for key, value in extra_headers.items():
self.send_header(key, value)
self.end_headers()
self.wfile.write(body)
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
threading.Thread(target=refresh_loop, daemon=True).start()
server = ThreadingHTTPServer((HOST, PORT), Handler)
logging.info("listening on http://%s:%s", HOST, PORT)
server.serve_forever()
if __name__ == "__main__":
main()
步骤 3:新建 systemd 服务文件
新建文件 /etc/systemd/system/epusdt-okx-rate.service,把下面内容整段复制进去保存。
[Unit] Description=EPUSDT OKX Rate Adapter After=network-online.target Wants=network-online.target [Service] Type=simple WorkingDirectory=/opt/epusdt-okx-rate Environment=PYTHONUNBUFFERED=1 Environment=EPUSDT_DB_PATH=/www/wwwroot/epusdt/epusdt.db Environment=EPUSDT_ADMIN_API_BASE=http://127.0.0.1:8000/admin/api/v1 Environment=EPUSDT_ADMIN_USERNAME=你的后台账号 Environment=EPUSDT_ADMIN_PASSWORD=你的后台密码 ExecStart=/usr/bin/python3 /opt/epusdt-okx-rate/server.py Restart=always RestartSec=2 [Install] WantedBy=multi-user.target
EPUSDT_DB_PATH 默认写的是 /www/wwwroot/epusdt/epusdt.db。如果你的 epusdt 不是部署在这个目录,或者数据库文件名改过,把上面这行环境变量改成你自己的实际数据库路径。
EPUSDT_ADMIN_API_BASE 填 epusdt 程序本机实际监听的后台 API 地址,常见是 http://127.0.0.1:8000/admin/api/v1。如果你的 epusdt 不是这个端口,就改成你自己的本机地址。
EPUSDT_ADMIN_USERNAME 和 EPUSDT_ADMIN_PASSWORD 改成你自己的 epusdt 后台登录账号密码。
步骤 4:启动服务并先检查自动同步
文件保存好后,再执行下面命令。
systemctl daemon-reload systemctl enable --now epusdt-okx-rate.service curl http://127.0.0.1:18089/cny.json curl http://127.0.0.1:18089/trx.json journalctl -u epusdt-okx-rate.service -n 20 --no-pager
这里必须同时满足下面几个结果再继续:
curl http://127.0.0.1:18089/cny.json要返回包含usdt的 JSONcurl http://127.0.0.1:18089/trx.json不能是空对象- 日志里要看到
synced rate.forced_usdt_rate=...,最好是带via admin api
如果日志里只有 fallback synced ... via sqlite 或者出现 admin login failed,先检查上一步的后台 API 地址、账号、密码,不要继续往下测。这里的 fallback via sqlite 只能说明脚本把值写进了数据库,不代表运行中的 epusdt 一定已经立即吃到新汇率;这篇教程还是以 via admin api 成功为准。
步骤 5:进入 epusdt 后台填写汇率地址
登录 epusdt 后台,打开 系统配置 -> 汇率配置,把 汇率 API 地址 改成 http://127.0.0.1:18089/ 并保存。

新版 epusdt 后台已经支持手动填写 强制 USDT 汇率。
- 如果你只是想临时写死一个固定汇率,可以直接在后台手动填写并保存。
- 如果你是按本教程部署这套本机服务,就不要再手动长期维护这个值。上面的本机服务会通过后台 API 自动同步;如果你手动填了固定值,后续也可能被自动同步覆盖。
- 如果你想单独测试
汇率 API 地址是否生效,先把强制 USDT 汇率改成0再保存。只要这个值大于0,程序就会优先使用它,不会走 API。
不要把 OKX 官方接口原地址直接填到 汇率 API 地址 这里,这个版本的 epusdt 识别不了官方返回格式。
步骤 6:确认新订单已经按实时汇率出金额
正常情况下,这套做法不需要手动反复改 强制 USDT 汇率,也不用再靠重启 epusdt 去吃数据库值。前面的服务会直接通过后台 API 同步运行中的汇率配置。
最后按下面规则检查:
- 重新新建一笔订单测试,不要看旧订单。旧订单金额不会自动变。
- 订单金额要能跟着当前 OKX 实时汇率变化。
- 如果刚好有未支付旧单占用了同一个金额,epusdt 会自动加
0.01防撞单,这是正常现象,不是汇率错了。 - 切到
TRON -> TRX不应再报系统错误。
如果你按完上面步骤后,日志里已经出现 via admin api,curl http://127.0.0.1:18089/trx.json 也不是空对象,并且新订单金额能跟着 OKX 实时变化,这篇教程就完成了。