由于公司的系统用的是java版本,开通了企业号打卡之后又没有预算让供应商做数据对接,所以只能自己捣鼓这个,以下是个人设置的一些内容,仅供大家参考
安装python
python的安装,这里就不详细写了,大家可自行度娘或google。
安装第三方库
python安装好之后别忘记配置环境变量!另外,所以的内容都是安装在服务器上的,且服务器需要能够上外网,否则,只能配置在本地,因为需要外网连接微信企业号的接口。这里需要用到几个第三方库:
python的pip命令,一般python安装好之后都会默认有,如果不确定,可输入命令查询,通过cmd进入命令提示符,输入
pip list
如果提示你需要更新,你可以更新,也可以不更新,更新命令其实给到你了python -m pip install --upgrade pip
安装所需要的库
step.1
pip install pymssql
如果安装pymssql出错,提示什么visual c++ 14,则先安装wheel,如不报错则忽略step2、step3
step.2
pip install wheel
step.3
下载pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl
可去这里下载最新版本的。pymssql下载
下载好之后,进入该文件所在的目录,通过pip install安装即可cd d:\
pip install pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl
step.4
pip install requests
至此,所有第三方库都配置好了。
写主程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# !/usr/bin/python
# -*- coding:utf-8 -*-
# @time: 2018/7/26 16:05
# @author: hychen.cc
import json # 因微信企业号返回的格式为json,所以引入json
import requests
import pymssql
import math # 引入数学方法
import time
import datetime
server = 'xx.xx.xx.xx' # 数据库服务器地址
user = 'sa' # 数据库登录名,可以用sa
password = '******' # 数据库用户对应的密码
dbname = 'dbname' # 数据库名称
corp_id = 'xxxxxx' # 微信企业号提供的corp_id
corp_secret = 'xxxxx' # 微信企业号提供的corp_secret
"""
|
因微信接口所需要unix时间戳,所以需要把时间转为为unix时间戳格式
定义时间转换为unix时间方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
"""def datetime_timestamp(dt):
# dt为字符串
# 中间过程,一般都需要将字符串转化为时间数组
time.strptime(dt, '%y-%m-%d %h:%m:%s')
## time.struct_time(tm_year=2018, tm_mon=10, tm_mday=25, tm_hour=10, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=88, tm_isdst=-1)
# 将"2018-10-25 10:00:00"转化为时间戳
s = time.mktime(time.strptime(dt, '%y-%m-%d %h:%m:%s'))
return int(s)
# 定义连接数据库方法
def get_link_server():
connection = pymssql.connect(server, user, password, database=dbname)
if connection:
return connection
else:
raise valueerror('connect dbserver failed.')
"""
|
定义获取用户列表,因为微信企业号一次最大只能获取100个,所以需要转换为列表格式,分批次获取
我这里设置是从db中获取有权限微信打卡的人员(select * from table),换成自己的方式即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
"""
def get_userid_list():
"""
获取用户列表
: return :
"""
conn = get_link_server()
cursor = conn.cursor()
sql = "select * from table"
cursor.execute(sql)
row = cursor.fetchone()
userlist = []
while row:
userlist.append(row[0])
row = cursor.fetchone()
if userlist:
return userlist
else:
raise valueerror('get userlist failed.')
conn.close()
"""
|
获取access_token,因为token有时效(2小时),所以需要存在本地,这样不需要频繁调用,所以我定义了存储过程(sp_getwx_access_token)来判断之前存储的token是否有效,有效的话就不需要重复获取了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
"""
def get_access_token(refresh=false):
"""
获取access token
: return :
"""
if not refresh:
api_access_token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
corp_id, corp_secret)
response = requests.get(api_access_token_url, verify = false)
if response.status_code = = 200 :
rep_dict = json.loads(response.text)
errcode = rep_dict.get( 'errcode' )
if errcode:
raise valueerror( 'get wechat access token failed, errcode=%s.' % errcode)
else :
access_token = rep_dict.get( 'access_token' )
if access_token:
conn = get_link_server()
cursor = conn.cursor()
cursor.execute( 'exec sp_getwx_access_token @access_token=%s' , access_token)
conn.commit()
conn.close()
return access_token
else :
raise valueerror( 'get wechat access token failed.' )
else :
raise valueerror( 'get wechat access token failed.' )
else :
conn = get_link_server()
cursor = conn.cursor()
cursor.execute( "select access_token from wx_accesstoken where id=1" )
access_token = cursor.fetchone()
if access_token:
return access_token[ 0 ]
conn.close()
else :
api_access_token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
corp_id, corp_secret)
response = requests.get(api_access_token_url, verify = false)
if response.status_code = = 200 :
rep_dict = json.loads(response.text)
errcode = rep_dict.get( 'errcode' )
if errcode:
raise valueerror( 'get wechat access token failed, errcode=%s.' % errcode)
else :
access_token = rep_dict.get( 'access_token' )
if access_token:
conn = get_link_server()
cursor = conn.cursor()
cursor.execute( 'exec sp_getwx_access_token @access_token=%s' , access_token)
conn.commit()
conn.close()
return access_token
else :
raise valueerror( 'get wechat access token failed.' )
else :
raise valueerror( 'get wechat access token failed.' )
# 获取微信打卡的json格式
def get_punchcard_info(access_token, opencheckindatatype, starttime, endtime, useridlist):
api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
json_str = json.dumps(
{ 'opencheckindatatype' : opencheckindatatype, 'starttime' : starttime, 'endtime' : endtime, 'useridlist' : useridlist})
response = requests.post(api_punch_card_url, data = json_str, verify = false)
if response.status_code = = 200 :
rep_dic = json.loads(response.text)
errcode = rep_dic.get( 'errcode' )
if errcode = = 42001 :
access_token = get_access_token(true)
api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
json_str = json.dumps(
{ 'opencheckindatatype' : opencheckindatatype, 'starttime' : starttime, 'endtime' : endtime,
'useridlist' : useridlist})
response = requests.post(api_punch_card_url, data = json_str, verify = false)
rep_dic = json.loads(response.text)
errcode = rep_dic.get( 'errcode' )
if errcode:
raise valueerror( 'get punch data failed1, errcode=%s' % errcode)
else :
value_str = rep_dic.get( 'checkindata' )
if value_str:
return value_str
else :
raise valueerror( 'get punch data failed2.' )
elif errcode:
raise valueerror ( 'get punch data failed3, errcode=%s' % errcode)
else :
value_str = rep_dic.get( 'checkindata' )
if value_str:
return value_str
else :
raise valueerror( 'i do not find employee punch data.' )
else :
raise valueerror ( 'get punch data failed5.' )
# 调用接口,获得数据
if __name__ = = '__main__' :
today = datetime.date.today()
oneday = datetime.timedelta(days = 3 ) # days,即获取几天内的
yesterday = today - oneday
starttime = datetime_timestamp(yesterday.strftime( '%y-%m-%d' ) + ' 00:00:00' )
endtime = datetime_timestamp(today.strftime( '%y-%m-%d' ) + ' 23:59:59' )
opencheckindatatype = 3
access_token = get_access_token()
if access_token:
useridlist = get_userid_list()
if useridlist:
step = 100
total = len (useridlist)
n = math.ceil(total / step)
for i in range (n):
# print (useridlist[i*step:(i+1)*step])
punch_card = get_punchcard_info(access_token, opencheckindatatype, starttime, endtime,useridlist[i * step:(i + 1 ) * step])
# print (punch_card)
if punch_card:
conn = get_link_server()
cursor = conn.cursor()
for dic_obj in punch_card:
cursor.execute( 'exec sp_analysispunchcard @json=%s' ,
(json.dumps(dic_obj, ensure_ascii = false)))
# print((json.dumps(dic_obj, ensure_ascii=false))),sp_analysispunchcard把获取到的数据解析后存入数据库中
conn.commit()
conn.close()
print ( 'get punch card successed.' )
else :
raise valueerror( 'no userlist exists' )
|
设置windows计划任务
通过控制面板-管理工具-任务计划程序,右击选择创建基本任务,这里注意的是路径和程序。
程序或脚本:python.exe
添加参数(可选)(a):你的py文件目录
起始于:python目录,如果不知道python安装到哪去了,按照下列cmd命令,输入python后进入python命令查询
1
2
|
import sys
sys.prefix,回车
|
到此,配置完成,可自行右击任务-执行查询效果,或者通过python命令执行py文件
进入到py文件目录
python xxx.py
总结
以上所述是小编给大家介绍的python获取微信企业号打卡数据并生成windows计划任务,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!原文链接:https://blog.csdn.net/henvey1988/article/details/83411665