嵌入式开发——4G模块使用场景(btc_ring例子)

时间:2024-11-05 07:11:15

之前提到的

ESP32设备——监测股票交易价格拨打电话提醒(后附代码)_esp32获取股票数据-****博客,我又写了一个新的例子来加深对产品的介绍。

通过请求API(每5分钟请求一次),获取数据,查看资金费率'fundingRate'负数或大于等于0.0006时拨打电话。


步骤也比较简单

给这个板子上传完代码它就自动运行了

相关代码如下

{
    "wifi_ssid": "Mi-------.4G",
    "wifi_password": "76-------0",
    "phone_number": "电话号",
    "url": "https://43.199.52.88/api/v5/public/funding-rate?instId=BTC-USD-SWAP",
    "domain_name" : "aws.okx.com"
}
# main.py 监测资金费率

import requests  # 导入requests库,用于发送HTTP请求
import json  # 导入json库,用于处理JSON数据
import time  # 导入time库,用于时间相关操作
from machine import UART  # 从machine模块导入UART类,用于串口通信
from wificonnect import wifi_connect  # 导入wificonnect模块的wifi_connect函数

# 初始化UART串口,用于与电话模块通信
uart1 = UART(2, baudrate=115200, tx=16, rx=17, timeout=10)

# 加载配置文件
def load_config():
    with open('config.json', 'r') as f:
        return json.load(f)

# 读取配置文件内容
config = load_config()
PHONE_NUMBER = config['phone_number']  # 从配置文件中获取电话号码
URL = config['url']  # 从配置文件中获取股票API的URL
domain_name = config['domain_name']  # 定义目标服务器要求的域名

# 发送命令到电话模块的函数
def send_cmd(command):
    print(f"Sending command: {command}")
    uart1.write(command + '\r\n')  # 通过串口发送命令

# 测试电话模块是否响应的函数
def ping():
    send_cmd('AT')  # 发送AT命令,用于测试电话模块是否在线

# 拨打电话的函数
def call_phone(phnum=PHONE_NUMBER):
    if len(phnum) != 11:
        print("Wrong phone number, not 11 digits long")
        return
    print(f"Making a call to: {phnum}")
    send_cmd(f'ATD{phnum};')  # 发送拨号命令
    print("Waiting for the phone to ring...")

    start_time = time.time()  # 记录开始拨打电话的时间

    # 等待电话响铃
    while time.time() - start_time < 30:
        line = uart1.read().decode('utf-8').strip()
        if "RING" in line:
            print("Phone is ringing...")
            ring_start_time = time.time()  # 记录电话响铃开始的时间
            break
        time.sleep(0.1)  # 短暂等待,避免过度占用CPU

    # 检查是否在30秒内响铃
    if "RING" in line:
        ring_end_time = time.time()
        ring_duration = ring_end_time - ring_start_time
        print(f"Phone rang for {ring_duration:.2f} seconds")
    else:
        print("Phone did not ring within 30 seconds.")

    # 等待30秒后挂断电话
    time.sleep(max(30 - (time.time() - start_time), 0))  # 确保至少等待30秒
    off_call()  # 挂断电话
    print("Call ended after 30 seconds of ringing.")

# 挂断电话的函数
def off_call():
    send_cmd('AT+CHUP')  # 发送挂断命令
    print("Phone call ended")

# 获取资金费率的函数
def get_funding_rate():
    headers = {
        'Host': domain_name  # 设置请求头中的 Host 字段为目标域名
    }
    
    response = requests.get(URL, headers=headers)  # 发送 GET 请求到指定的 URL,并附加自定义头部

    print(f"Status Code: {response.status_code}")  # 打印响应的状态码
    print(f"Response Text: {response.text}")  # 打印响应的文本内容
    print('-' * 10)

    if response.status_code == 200:
        data = response.json()  # 将响应文本转换为 JSON 对象
        if data.get("code") == "0" and data.get("data"):
            funding_rate = float(data["data"][0]["fundingRate"])  # 从 JSON 数据中提取 fundingRate
            return funding_rate
    return None

# 监控资金费率的函数
def monitor_funding_rate():
    while True:
        funding_rate = get_funding_rate()  # 请求 funding rate
        if funding_rate is not None:
            print(f"Funding Rate: {funding_rate}")  # 打印 fundingRate 的值
            if funding_rate < 0 or funding_rate >= 0.0006:
                call_phone()  # 如果 fundingRate 负数或>= 0.0006,拨打电话
        else:
            print("Failed to retrieve funding rate.")  # 处理获取失败的情况
        
        time.sleep(300)  # 暂停 5 分钟(300 秒)

# 主程序入口
if __name__ == "__main__":
    print("Connecting to Wi-Fi...")  # 打印连接Wi-Fi的提示信息
    if wifi_connect():  # 尝试连接Wi-Fi
        print("Wi-Fi connected. Monitoring funding rates...")  # Wi-Fi连接成功,开始监控资金费率
        ping()  # 测试电话模块是否在线
        monitor_funding_rate()  # 开始监控资金费率
    else:
        print("Wi-Fi connection failed.")  # Wi-Fi连接失败
#联网
# wificonnect.py


import network
import time
import json

def load_config():
    with open('config.json', 'r') as f:
        return json.load(f)

def wifi_connect():
    config = load_config()
    ssid = config['wifi_ssid']
    password = config['wifi_password']

    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    if not wlan.isconnected():
        print(f'Connecting to network {ssid}...')
        wlan.connect(ssid, password)
        start_time = time.time()

        while not wlan.isconnected():
            if time.time() - start_time > 10:
                print("WiFi connection timeout")
                return False
            time.sleep(1)

    print('Network connected:', wlan.ifconfig())
    return True

if __name__ == "__main__":
    wifi_connect()