python实现:从superset接口中读取数据,把数据以excel、pdf、图片、csv格式发送到企业微信群

时间:2024-03-23 14:22:26

python实现:从接口中读取数据,把数据以excel、pdf、图片、csv格式发送到企业微信群

接口文檔地址:https://developer.work.weixin.qq.com/document/path/99110

1.發送圖文

1.對接接口

import json
import time
import requests
import time_util

rbt_key = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=key"
headers = {'Content-Type': 'application/json; charset=UTF-8'}
text = '未拉OA提醒'

##toke
def get_superset_token():
    url = "http://hadoop102:8787/api/v1/security/login"
    request_param = {
        "password": "passwd",
        "provider": "db",
        "refresh": True,
        "username": "user"
    }
    headers = {
        "Content-Type": "application/json"
    }
    response = requests.post(url, data=json.dumps(request_param), headers=headers)
    return response.json()["access_token"]


def get_superset_data():
    access_token = get_superset_token()
    # target_date = time_util.convert_to_cst(time.time(), 'Asia/Shanghai').strftime('%Y-%m-%d')
    target_date = '2024-03-21'
    target_user = 'ALLEN.X'
    url = "http://hadoop102:8787/api/v1/chart/data"
    headers = {
        'Authorization': f'Bearer {access_token}',  # 添加正确格式的 Bearer Token
        'Content-Type': 'application/json'
    }
    request_param = {
        "datasource": {
            "id": 94,
            "type": "table"
        },
        "queries": [
            {
                "columns": [
                    "user_name",
                    "日程類型",
                    "标题"
                    , "日程开始时间",
                    "日程结束时间",
                    "是否属于全天日程",
                    "OA状态"
                ],
                "orderby": [
                    [
                        "user_name",
                        True
                    ]
                ],
                "row_limit": 100,
                "order_desc": True,
                "result_format": "string",
                "form_data": "string",
                "where": "user_name = '" + target_user + "' and start_date = '" + target_date + "'"

            }
        ]
    }
    response = requests.post(url, data=json.dumps(request_param), headers=headers)

    if response.status_code == 200:
        return response.json()['result'][0]['data']
    else:
        return 'error'

def send_text(text):  # 添加 result 參數用於傳遞消息內容
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": text + "\n<@allen>"
        }
    }
    requests.post(rbt_key, headers=headers, json=data)

def send_image():  # 添加 result 參數用於傳遞消息內容
    # send_text(text)
    data = {
        "msgtype": "news",
        "news": {
            "articles": [
                {
                    "title": "==未拉OA提醒==",
                    "description": "提醒各位拉OA",
                    "url": "\\\\ip地址\\is-department\\小組\\myoa\\image\\schedule_table.png",
                    "picurl": "https://pic.616pic.com/ys_img/00/99/99/AdcfGUOyJR.jpg"
                }
            ]
        }

    }
    requests.post(rbt_key, headers=headers, json=data)

數據時間處理


from datetime import datetime, timedelta
import pytz


# 時間戳轉美國和中國時區
def convert_to_cst(timestamp, time_zone):
    dt = datetime.fromtimestamp(timestamp, pytz.utc)  # 将时间戳转换为datetime对象(UTC时间)
    pst = dt.astimezone(pytz.timezone(time_zone))  # 转换为美国PST时区时间
    return pst
    # return pst.strftime('%Y-%m-%d %H:%M:%S')  # 格式化为字符串


def convert_to_pst(timestamp):
    dt = datetime.fromtimestamp(timestamp/1000)  # 将时间戳转换为datetime对象(UTC时间)
    pst = dt + timedelta(hours=7)  # 增加8小时,即美国PST时间
    return pst.strftime('%Y-%m-%d %H:%M:%S')  # 格式化为字符串

主程序

import data_util, time_util
import pandas as pd
import matplotlib.pyplot as plt


if __name__ == '__main__':

    data = data_util.get_superset_data()
    if(len(data)>0):
    # 应用函数转换时间戳
        for entry in data:
            entry['日程开始时间'] = time_util.convert_to_pst(entry['日程开始时间'])
            entry['日程结束时间'] = time_util.convert_to_pst(entry['日程结束时间'])

        # 创建DataFrame
        df = pd.DataFrame(data)
        # 如果不为空,则绘制表格
        # 指定中文字体
        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.rcParams['axes.unicode_minus'] = False

        # 保存为图片,并指定dpi参数
        fig, ax = plt.subplots(figsize=(10, 6), dpi=500)
        ax.axis('tight')
        ax.axis('off')
        ax.table(cellText=df.values, colLabels=df.columns, loc='center', cellLoc='center')

        # 自定义保存路径
         save_path = 'image/schedule_table.png'
        print(save_path)
        plt.savefig(save_path)
        # 可选:显示保存成功的提示
        print(f"图片已保存至:{save_path}")

    else:
        print("error")

    data_util.send_image()

2.發送純圖片

主程序

import data_util, time_util
import pandas as pd
import matplotlib.pyplot as plt
import base64
import hashlib
if __name__ == '__main__':

    data = data_util.get_superset_data()
    if (len(data) > 0):
        # 应用函数转换时间戳
        for entry in data:
            entry['日程开始时间'] = time_util.convert_to_pst(entry['日程开始时间'])
            entry['日程结束时间'] = time_util.convert_to_pst(entry['日程结束时间'])

        # 创建DataFrame
        df = pd.DataFrame(data)
        # 如果不为空,则绘制表格
        # 指定中文字体
        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.rcParams['axes.unicode_minus'] = False

        # 保存为图片,并指定dpi参数
        fig, ax = plt.subplots(figsize=(10, 6), dpi=500)
        ax.axis('tight')
        ax.axis('off')
        ax.table(cellText=df.values, colLabels=df.columns, loc='center', cellLoc='center')

        # 自定义保存路径
        save_path = 'image/schedule_table.png'

        # print(save_path)
        plt.savefig(save_path)
        # 可选:显示保存成功的提示
        print(f"图片已保存至:{save_path}")

        # 1. 读取图片文件内容
        with open("image/schedule_table.png", "rb") as f:
            image_data = f.read()

        # 2. 对图片内容进行Base64编码
        base64_image = base64.b64encode(image_data).decode('utf-8')

        # 3. 计算图片内容的MD5值
        md5_hash = hashlib.md5(image_data).hexdigest()

        data_util.send_image(base64_image, md5_hash)
    else:
        print("error")

數據接口


def send_image(data, md5):  # 添加 result 參數用於傳遞消息內容
    # send_text(text)
    data = {
        "msgtype": "image",
        "image": {
            "base64": data,
            "md5": md5
        }
    }
    requests.post(rbt_key, headers=headers, json=data)

3.發送excel,csv,pdf

主程序

import data_util, time_util
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

if __name__ == '__main__':

    data = data_util.get_superset_data()
    df = ''
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    if (len(data) > 0):
        # 应用函数转换时间戳
        for entry in data:
            entry['日程开始时间'] = time_util.convert_to_pst(entry['日程开始时间'])
            entry['日程结束时间'] = time_util.convert_to_pst(entry['日程结束时间'])

            if isinstance(data, dict):
                if data:
                    df = pd.DataFrame.from_records([data])
                else:
                    print("Data is an empty dictionary")
                    df = pd.DataFrame()
            elif isinstance(data, list):
                dfs = [pd.DataFrame.from_records([item]) for item in data if isinstance(item, dict)]
                df = pd.concat(dfs, ignore_index=True)
            else:
                print("Data is not a list or a dictionary")
                df = pd.DataFrame()

        # print(df)
        df.to_excel('media.xlsx', index=False)
        df.to_csv('media.csv', index=False)


            # 创建一个PDF文件
        with PdfPages('media.pdf') as pdf:
            # 将DataFrame绘制成表格
            fig, ax = plt.subplots(figsize=(8.27, 11.69), dpi=400)  # 设置dpi为300,提高图像质量
            ax.axis('tight')
            ax.axis('off')
            ax.table(cellText=df.values, colLabels=df.columns, loc='top', cellLoc='center')
            pdf.savefig(dpi=400)  # 将当前图形保存到PDF,并设置dpi
            plt.close()

        print("PDF文件已生成")

     data = data_util.send_file('media.pdf')
     if data != 'error':
         data_util.get_file(data)

調用接口

def send_file(file):
    # 设置请求参数
    url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key=key&type=file"
    data = {'file': open(file, 'rb')}  # post jason
    response = requests.post(url=url, files=data)  # post 请求上传文件
    if response.status_code == 200:
        return response.json()['media_id']
    else:
        return 'error'


def get_file(media_id):  # 添加 result 參數用於傳遞消息內容
    data = {
        "msgtype": "file",
        "file": {
            "media_id": media_id
        }
    }
    requests.post(rbt_key, headers=headers, json=data)