自动化运维脚本-python

时间:2022-12-21 21:44:36

准备工作

1.创建一个服务,服务名为:playgame

https://pypi.org/project/srvwrapper/

安装srvwrapper

pip install srvwrapper

准备一个死循环的代码 ---play_game.py,代码如下:

import time

while True:
    print("你好")

文件路径:D:\test1\dingding\play_game.py

 

打开管理员DOS窗口,命令如下:

srvwrapper playgame python --arguments "\"D:\test1\dingding\play_game.py""

 这样就创建好了一个playgame服务

 

删除服务命令

sc delete playgame

 

 

上代码

 安装

pip install wmi
pip install pypiwin32

新建文件,dingding.py,代码如下:

import wmi
import os
import requests
import json
import time

time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
monitor_name = {"playgame"}
headers = {"Content-Type": "application/json; charset=utf-8"}
url = "https://oapi.dingtalk.com/robot/..."  # 钉钉机器人地址
monitor_map = {
    "playgame": "net start playgame"
}


while True:
    w1 = wmi.WMI()
    services = w1.Win32_Service()
    r = set()
  # 遍历所有服务
for i in services: if i.State == "Running": r.add(i.Name) monitor_stop = monitor_name - r if monitor_stop: for s in monitor_stop: p_status = "停止" p_name = s data = { "msgtype": "markdown", "markdown": { "title": "监控讯息", "text": "%s \n" % time_now + ">#### 服务名:%s \n\n" % p_name + ">#### 状态:%s \n" % p_status + ">#### 正在尝试启动" }, } send_data = json.dumps(data).encode("utf-8") requests.post(url=url, data=send_data, headers=headers) # 执行系统命令 os.system(monitor_map[s]) w2 = wmi.WMI() s2 = w2.Win32_Service() running = set() for j in s2: if j.State == "Running": running.add(j.Name) if s in running: p_status = "正在运行" data = { "msgtype": "markdown", "markdown": { "title": "监控讯息", "text": "%s \n" % time_now + ">#### 服务名:%s \n\n" % p_name + ">#### 状态:%s \n" % p_status + ">#### 启动成功" }, } send_data = json.dumps(data).encode("utf-8") requests.post(url=url, data=send_data, headers=headers) else: p_status = "停止" data = { "msgtype": "markdown", "markdown": { "title": "监控讯息", "text": "%s \n" % time_now + ">#### 服务名:%s \n\n" % p_name + ">#### 状态:%s \n" % p_status + ">#### 启动失败" }, } send_data = json.dumps(data).encode("utf-8") requests.post(url=url, data=send_data, headers=headers)

 注意:使用管理员来启动dingding.py脚本

 

 

 下面代码是进程监控及自动开启

import psutil

# mem = psutil.virtual_memory()
# mem_dict = {}
# mem_dict["内存总数"] = str(mem.total/1000000000) + 'G'
# mem_dict["已使用内存"] = str(mem.used/1000000000) + 'G'
# mem_dict["可用内存"] = str(mem.free/1000000000) + 'G'
# mem_dict["占比"] = mem.percent


import os
import requests
import json
import time

time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
monitor_name = {"ServiceWrapper.exe"}
headers = {"Content-Type": "application/json; charset=utf-8"}
url = "https://oapi.dingtalk.com/robot/send?access_token=d3d68aaab32efae5ea389949a9b604b080da0b265f5aacf4e0a82e8e97d557b6"
monitor_map = {
    "ServiceWrapper.exe": "net start playgame"
}
while True:
    proc_dict = {}
    proc_name = set()
  # 获取所有进程
    for p in psutil.process_iter(attrs=['pid', 'name']):
        proc_dict[p.info['pid']] = p.info['name']
        proc_name.add(p.info['name'])

    proc_stop = monitor_name - proc_name

    if proc_stop:
        for p2 in proc_stop:
            p_status = "停止"

            p_name = p2
            data = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "监控讯息",
                    "text": "%s \n" % time_now +
                            ">#### 服务名:%s \n\n" % p_name +
                            ">#### 状态:%s \n" % p_status +
                            ">#### 正在尝试启动"
                },
            }

            send_data = json.dumps(data).encode("utf-8")
            requests.post(url=url, data=send_data, headers=headers)
            # 执行系统命令
            os.system(monitor_map[p2])
            proc_name = set()

            for p3 in psutil.process_iter(attrs=['pid', 'name']):
                proc_name.add(p3.info['name'])

            if p2 in proc_name:
                p_status = "启动"
                data = {
                    "msgtype": "markdown",
                    "markdown": {
                        "title": "监控讯息",
                        "text": "%s \n" % time_now +
                                ">#### 服务名:%s \n\n" % p_name +
                                ">#### 状态:%s \n" % p_status +
                                ">#### 启动成功"
                    },
                }
                send_data = json.dumps(data).encode("utf-8")
                requests.post(url=url, data=send_data, headers=headers)
            else:
                p_status = "停止"
                data = {
                    "msgtype": "markdown",
                    "markdown": {
                        "title": "监控讯息",
                        "text": "%s \n" % time_now +
                                ">#### 服务名:%s \n\n" % p_name +
                                ">#### 状态:%s \n" % p_status +
                                ">#### 启动失败"
                    },
                }
                send_data = json.dumps(data).encode("utf-8")
                requests.post(url=url, data=send_data, headers=headers)
        time.sleep(5)