"wifi_ssid": "Mi-------.4G",
"wifi_password": "76-------0",
"phone_number": "电话号",
"url": "",
"domain_name" : "aws.okx.com"
# main.py 监测资金费率
import requests # 导入requests库,用于发送HTTP请求
import json # 导入json库,用于处理JSON数据
import time # 导入time库,用于时间相关操作
from machine import UART # 从machine模块导入UART类,用于串口通信
from wificonnect import wifi_connect # 导入wificonnect模块的wifi_connect函数
# 初始化UART串口,用于与电话模块通信
uart1 = UART(2, baudrate=115200, tx=16, rx=17, timeout=10)
# 加载配置文件
def load_config():
with open('config.json', 'r') as f:
return json.load(f)
# 读取配置文件内容
config = load_config()
PHONE_NUMBER = config['phone_number'] # 从配置文件中获取电话号码
URL = config['url'] # 从配置文件中获取股票API的URL
domain_name = config['domain_name'] # 定义目标服务器要求的域名
# 发送命令到电话模块的函数
def send_cmd(command):
print(f"Sending command: {command}")
uart1.write(command + '\r\n') # 通过串口发送命令
# 测试电话模块是否响应的函数
def ping():
send_cmd('AT') # 发送AT命令,用于测试电话模块是否在线
# 拨打电话的函数
def call_phone(phnum=PHONE_NUMBER):
if len(phnum) != 11:
print("Wrong phone number, not 11 digits long")
print(f"Making a call to: {phnum}")
send_cmd(f'ATD{phnum};') # 发送拨号命令
print("Waiting for the phone to ring...")
start_time = time.time() # 记录开始拨打电话的时间
# 等待电话响铃
while time.time() - start_time < 30:
line = uart1.read().decode('utf-8').strip()
if "RING" in line:
print("Phone is ringing...")
ring_start_time = time.time() # 记录电话响铃开始的时间
time.sleep(0.1) # 短暂等待,避免过度占用CPU
# 检查是否在30秒内响铃
if "RING" in line:
ring_end_time = time.time()
ring_duration = ring_end_time - ring_start_time
print(f"Phone rang for {ring_duration:.2f} seconds")
print("Phone did not ring within 30 seconds.")
# 等待30秒后挂断电话
time.sleep(max(30 - (time.time() - start_time), 0)) # 确保至少等待30秒
off_call() # 挂断电话
print("Call ended after 30 seconds of ringing.")
# 挂断电话的函数
def off_call():
send_cmd('AT+CHUP') # 发送挂断命令
print("Phone call ended")
# 获取资金费率的函数
def get_funding_rate():
headers = {
'Host': domain_name # 设置请求头中的 Host 字段为目标域名
response = requests.get(URL, headers=headers) # 发送 GET 请求到指定的 URL,并附加自定义头部
print(f"Status Code: {response.status_code}") # 打印响应的状态码
print(f"Response Text: {response.text}") # 打印响应的文本内容
print('-' * 10)
if response.status_code == 200:
data = response.json() # 将响应文本转换为 JSON 对象
if data.get("code") == "0" and data.get("data"):
funding_rate = float(data["data"][0]["fundingRate"]) # 从 JSON 数据中提取 fundingRate
return funding_rate
return None
# 监控资金费率的函数
def monitor_funding_rate():
while True:
funding_rate = get_funding_rate() # 请求 funding rate
if funding_rate is not None:
print(f"Funding Rate: {funding_rate}") # 打印 fundingRate 的值
if funding_rate < 0 or funding_rate >= 0.0006:
call_phone() # 如果 fundingRate 负数或>= 0.0006,拨打电话
print("Failed to retrieve funding rate.") # 处理获取失败的情况
time.sleep(300) # 暂停 5 分钟(300 秒)
# 主程序入口
if __name__ == "__main__":
print("Connecting to Wi-Fi...") # 打印连接Wi-Fi的提示信息
if wifi_connect(): # 尝试连接Wi-Fi
print("Wi-Fi connected. Monitoring funding rates...") # Wi-Fi连接成功,开始监控资金费率
ping() # 测试电话模块是否在线
monitor_funding_rate() # 开始监控资金费率
print("Wi-Fi connection failed.") # Wi-Fi连接失败
# wificonnect.py
import network
import time
import json
def load_config():
with open('config.json', 'r') as f:
return json.load(f)
def wifi_connect():
config = load_config()
ssid = config['wifi_ssid']
password = config['wifi_password']
wlan = network.WLAN(network.STA_IF)
if not wlan.isconnected():
print(f'Connecting to network {ssid}...')
wlan.connect(ssid, password)
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10:
print("WiFi connection timeout")
return False
print('Network connected:', wlan.ifconfig())
return True
if __name__ == "__main__":