Requirement: monitor several important web sites (multi-threaded). On an access failure, retry twice; start alerting from the third failure.
Logging module
import logging
import logging.config  # logging configuration
LOGCONF_FILENAME = "../etc/logging.conf"
logging.config.fileConfig(LOGCONF_FILENAME)
logger = logging.getLogger('wj')
The logging.conf configuration file (rotates the log daily and automatically deletes expired backups)
###############################################
[loggers]
keys=root,wj

[logger_root]
level=DEBUG
handlers=hand01

[logger_wj]
handlers=hand02
qualname=wj
propagate=0

###############################################
[handlers]
keys=hand01,hand02

[handler_hand01]
class=StreamHandler
level=DEBUG
formatter=formatter02
args=(sys.stdout,)

[handler_hand02]
class=handlers.TimedRotatingFileHandler
level=DEBUG
formatter=formatter02
args=('../log/portal_eastmonitor.log','midnight',1,30)

###############################################
[formatters]
keys=formatter01,formatter02

[formatter_formatter01]
format=%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

[formatter_formatter02]
format=%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s
datefmt=
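For reference, here is a minimal sketch of what the [handler_hand02] entry does when expressed directly in code. The file name, message format, and the 'wj' logger name are taken from the config above; everything else is the standard logging API:

import logging
import logging.handlers

# equivalent of [handler_hand02]: rotate at midnight, keep 30 days of backups
file_handler = logging.handlers.TimedRotatingFileHandler(
    '../log/portal_eastmonitor.log', when='midnight', interval=1, backupCount=30)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s'))

logger = logging.getLogger('wj')
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.propagate = 0  # as in the config: do not also pass records to the root logger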
HTTP access module
import pycurl
import StringIO
import certifi


# initialize a pycurl.Curl object
def initCurl():
    c = pycurl.Curl()
    c.setopt(pycurl.CONNECTTIMEOUT, 10)  # connection timeout
    c.setopt(pycurl.TIMEOUT, 15)         # download timeout
    #c.setopt(pycurl.COOKIEFILE, "tradeh5_cookie")
    #c.setopt(pycurl.COOKIEJAR, "tradeh5_cookie")
    c.setopt(pycurl.FOLLOWLOCATION, 0)
    c.setopt(pycurl.MAXREDIRS, 5)
    c.setopt(pycurl.SSL_VERIFYPEER, 0)
    c.setopt(pycurl.SSL_VERIFYHOST, 0)
    c.setopt(pycurl.CAINFO, certifi.where())
    c.setopt(pycurl.NOSIGNAL, 1)
    return c


# generic HTTP GET wrapper
def GetData(curl, url):
    head = ['Accept:*/*', 'User-Agent:Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)']
    buf = StringIO.StringIO()
    curl.setopt(pycurl.WRITEFUNCTION, buf.write)
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.HTTPHEADER, head)
    curl.perform()
    the_page = buf.getvalue()
    buf.close()
    return the_page
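A quick usage sketch of the two helpers above; the URL is only a placeholder:

c = initCurl()
page = GetData(c, 'https://www.example.com/')
print c.getinfo(pycurl.HTTP_CODE), len(page)   # status code and response size
c.close()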
Main program; url is the user-defined address of each site to monitor
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import pycurl
import ssl
import os
import sys
import re
import threading

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))  # absolute path of the project root
sys.path.append(BASE_DIR)  # add it to the module search path so the modules below can be imported
# portal_dict, sql_prefixs, monitor_ip and pname used below are expected to come from these modules
from modules.log_config import *
from modules.http_func import *
from functools import wraps

local = threading.local()
q = threading.Lock()
event = threading.Event()


def retry(MyException, tries=1, delay=10, total=8):
    """
    :param MyException: exception type that triggers a retry/alert
    :param tries: initial value of the retry counter
    :param delay: interval between retries, in seconds
    :param total: total number of retries
    :return:
    """
    def deco_retry(f):
        @wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay, mtotal = tries, delay, total
            while mtries < mtotal + 1:
                try:
                    return f(*args, **kwargs)
                # Python 2 syntax: unpack the fields carried by the raised exception
                except MyException as (node_id, host_id, response_time, log_time, monitor_ip,
                                       pname, monitor_account, err, desc, region):
                    if mtries < 3:
                        logger.error("retry count: %s, error: %s" % (mtries, err))
                        mtries += 1
                        time.sleep(delay)
                    elif 3 <= mtries <= 7:
                        logger.error("retry count: %s, error: %s" % (mtries, err))
                        time.sleep(delay)
                        mtries += 1
                    else:
                        mtries += 1
            return False
        return f_retry
    return deco_retry


class Portal_monitor(object):
"""
run: 主运行方法
http_access: 访问重要站点页面
update_mysql: 站点页面访问成功后,更新告警数据标志位,设置为恢复告警
""" def __init__(self):
pass def run(self, url):
self.http_access(url) @retry(Exception)
def http_access(self, url):
try:
c = initCurl()
a = threading.currentThread().getName()
req = GetData(c, url)
http_total_time = c.getinfo(pycurl.TOTAL_TIME)
backstatus = c.getinfo(pycurl.HTTP_CODE)
coasttime = str(http_total_time*1000) + 'ms'
if not req:
raise Exception
logger.debug("[%s][%s][%s]:响应时间:%s || 返回状态:%s"
% (portal_dict[url], url, a, coasttime, backstatus))
except Exception as e:
# 异常信息只保留字母和数字,特殊字符去掉
rule = re.compile(r'[^a-zA-z0-9]')
desc = rule.sub(' ', str(e))
err = '[访问:%s异常]%s, %s' % (url, portal_dict[url], desc)
logger.error(err)
log_time = int(time.time())
node_id = '28'
raise Exception(node_id, sql_prefixs[url], '', log_time,
monitor_ip, pname, '', err, portal_dict[url], '0') def run():
    portal = Portal_monitor()
    threads = []
    logger.debug('*************************************************************************************************')
    for url in portal_dict:
        threads.append(threading.Thread(target=portal.run, args=(url,)))  # one worker thread per site
    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()  # wait for all worker threads to finish before the main thread continues
    logger.debug('*************************************************************************************************\n')


def pid():
    pid = os.getpid()
    fp = open("../var/pid", "wt")
    fp.write("%d" % pid)
    fp.close()


if __name__ == '__main__':
    # disable ssl certificate verification
    ssl._create_default_https_context = ssl._create_unverified_context
    pid()
    while True:
        run()
        time.sleep(30)
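Stripped of the project-specific alert fields, the retry pattern above boils down to the sketch below: log the first two failures, alarm from the third one on. send_alarm and check_site are made-up placeholder names, not part of the project's code:

import time
from functools import wraps

def send_alarm(message):
    # placeholder alarm channel; a real deployment would write to a database or call an alert API
    print "ALARM:", message

def simple_retry(exc_type, max_tries=8, delay=10):
    """Retry on exc_type; log the first two failures, alarm from the third one on."""
    def deco(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            tries = 1
            while tries <= max_tries:
                try:
                    return f(*args, **kwargs)
                except exc_type as e:
                    if tries < 3:
                        print "retry %d: %s" % (tries, e)
                    else:
                        send_alarm("%s (attempt %d)" % (e, tries))
                    tries += 1
                    time.sleep(delay)
            return False
        return wrapper
    return deco

@simple_retry(Exception, delay=1)
def check_site(url):
    raise Exception("connect timeout for %s" % url)  # simulated failure

check_site("https://www.example.com/")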
The threading module defines two kinds of locks: threading.Lock and threading.RLock. There is a subtle difference between them, illustrated by comparing the two snippets below:
import threading
lock = threading.Lock()  # Lock object
lock.acquire()
lock.acquire()  # deadlock: the second acquire blocks forever
lock.release()
lock.release()

import threading
rLock = threading.RLock()  # RLock object
rLock.acquire()
rLock.acquire()  # within the same thread, this second acquire does not block
rLock.release()
rLock.release()
The main difference between the two kinds of lock is that an RLock may be acquired multiple times by the same thread, while a Lock does not allow this. Note that with an RLock, acquire and release must come in pairs: if acquire is called n times, release must also be called n times before the lock is actually freed. A sketch of when this matters follows.
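A typical case where RLock matters is a function that already holds the lock calling another function that acquires the same lock; with a plain Lock the nested acquire would deadlock. A small sketch with made-up names:

import threading

counter_lock = threading.RLock()
counter = 0

def increment():
    global counter
    with counter_lock:          # nested acquire: safe because RLock is re-entrant
        counter += 1

def increment_twice():
    with counter_lock:          # the same thread already holds the lock here
        increment()
        increment()

increment_twice()
print counter                   # prints 2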
threading.Condition
A Condition can be thought of as a more advanced lock: it provides more functionality than Lock or RLock and lets us handle complex thread-synchronization problems. threading.Condition maintains a lock object internally (an RLock by default), and a lock object can be passed in as an argument when the Condition is created. Condition also provides acquire and release methods, with the same meaning as a lock's acquire and release; they simply delegate to the corresponding methods of the internal lock. In addition, Condition provides the following methods (note that they may only be called after the lock has been acquired, otherwise a RuntimeError is raised):
Condition.wait([timeout]):
Releases the internally held lock and suspends the thread until it is woken by a notification or the timeout expires (if a timeout argument was given). Execution continues only after the thread has been woken and has re-acquired the lock.
Condition.notify():
Wakes up one suspended thread (if any). Note: notify() does not release the lock it holds.
Condition.notify_all()
Condition.notifyAll()
Wake up all suspended threads (if any). Note: these methods do not release the lock they hold.
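A minimal producer/consumer sketch showing wait and notify in action (the item list and the function names are made up for illustration):

import threading

items = []
cond = threading.Condition()   # wraps an RLock internally

def consumer():
    with cond:                 # acquire the internal lock
        while not items:
            cond.wait()        # release the lock and sleep until notified
        print "consumed:", items.pop(0)

def producer():
    with cond:
        items.append("task")
        cond.notify()          # wake one waiting thread; the lock is released when the with-block exits

t = threading.Thread(target=consumer)
t.start()
producer()
t.join()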