本文实例讲述了Python实现的微信公众号群发图片与文本消息功能。分享给大家供大家参考,具体如下:
在微信公众号开发中,使用api都要附加access_token内容。因此,首先需要获取access_token。如下:
1
2
3
4
5
6
7
8
9
10
11
|
#获取微信access_token
def get_token():
payload_access_token = {
'grant_type' : 'client_credential' ,
'appid' : 'xxxxxxxxxxxxx' ,
'secret' : 'xxxxxxxxxxxxx'
}
token_url = 'https://api.weixin.qq.com/cgi-bin/token'
r = requests.get(token_url,params = payload_access_token)
dict_result = (r.json())
return dict_result[ 'access_token' ]
|
在群发图片时,需要提供已经上传图片的media_id。注意,群发图片的时候,必须使用接口:https://api.weixin.qq.com/cgi-bin/material/add_material。
1
2
3
4
5
6
7
8
9
10
11
12
|
#获取上传文件的media_ID
#群发图片的时候,必须使用该api提供的media_ID
def get_media_ID(path):
img_url = 'https://api.weixin.qq.com/cgi-bin/material/add_material'
payload_img = {
'access_token' :get_token(),
'type' : 'image'
}
data = { 'media' : open (path, 'rb' )}
r = requests.post(url = img_url,params = payload_img,files = data)
dict = r.json()
return dict [ 'media_id' ]
|
订阅号进行群发,必须通过分组id,首先需要获取所有的用户分组情况。
1
2
3
4
5
6
7
8
9
|
#查询所有用户分组信息
def get_group_id():
url = "https://api.weixin.qq.com/cgi-bin/groups/get"
payload_id = {
'access_token' :get_token()
}
r = requests.get(url = url,params = payload_id)
result = r.json()
return result[ 'groups' ]
|
需要选择一个分组进行群发,在这里我选择第一个有效的分组进行群发(即第一个分组用户数不为0的分组)。
1
2
3
4
5
6
7
8
9
|
#返回第一个有效的group 分组id
def get_first_group_id():
groups = get_group_id()
group_id = 0
for group in groups:
if (group[ 'count' ]! = 0 ):
group_id = group[ 'id' ]
break ;
return group_id
|
下面的代码用于群发文本消息,群发给第一个有效的分组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
def send_txt_to_first_group( str = 'Hello World!' ):
group_id = get_first_group_id()
pay_send_all = {
"filter" :{
"is_to_all" : False ,
"group_id" :group_id
},
"text" :{
"content" : str
},
"msgtype" : "text"
}
url = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token=" + get_token()
#需要指定json编码的时候不会对中文转码为unicode,否则群发的消息会显示为unicode码,不能正确显示
r = requests.post(url = url,data = json.dumps(pay_send_all,ensure_ascii = False ,indent = 2 )) #此处的必须指定此参数
result = r.json()
#根据返回码的内容是否为0判断是否成功
return result[ 'errcode' ] = = 0
|
下面的代码用于群发图片,群发给第一个有效的分组。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
def send_img_to_first_group(path = '/home/fit/Desktop/test.jpg' ):
group_id = get_first_group_id()
pay_send_all = {
"filter" :{
"is_to_all" : False ,
"group_id" :group_id
},
"image" :{
"media_id" :get_media_ID(path)
},
"msgtype" : "image"
}
url = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token=" + get_token()
r = requests.post(url = url,data = json.dumps(pay_send_all))
result = r.json()
#根据返回码的内容是否为0判断是否成功
return result[ 'errcode' ] = = 0
|
以下是所有代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# -*- coding: utf-8 -*-
import requests
#首先获取access_token
import json
#获取微信access_token
def get_token():
payload_access_token = {
'grant_type' : 'client_credential' ,
'appid' : 'xxxxxxxxxx' ,
'secret' : 'xxxxxxxxx'
}
token_url = 'https://api.weixin.qq.com/cgi-bin/token'
r = requests.get(token_url,params = payload_access_token)
dict_result = (r.json())
return dict_result[ 'access_token' ]
#获取上传文件的media_ID
#群发图片的时候,必须使用该api提供的media_ID
def get_media_ID(path):
img_url = 'https://api.weixin.qq.com/cgi-bin/material/add_material'
payload_img = {
'access_token' :get_token(),
'type' : 'image'
}
data = { 'media' : open (path, 'rb' )}
r = requests.post(url = img_url,params = payload_img,files = data)
dict = r.json()
return dict [ 'media_id' ]
#查询所有用户分组信息
def get_group_id():
url = "https://api.weixin.qq.com/cgi-bin/groups/get"
payload_id = {
'access_token' :get_token()
}
r = requests.get(url = url,params = payload_id)
result = r.json()
return result[ 'groups' ]
#返回第一个有效的group 分组id
def get_first_group_id():
groups = get_group_id()
group_id = 0
for group in groups:
if (group[ 'count' ]! = 0 ):
group_id = group[ 'id' ]
break ;
return group_id
def send_img_to_first_group(path = '/home/fit/Desktop/test.jpg' ):
group_id = get_first_group_id()
pay_send_all = {
"filter" :{
"is_to_all" : False ,
"group_id" :group_id
},
"image" :{
"media_id" :get_media_ID(path)
},
"msgtype" : "image"
}
url = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token=" + get_token()
r = requests.post(url = url,data = json.dumps(pay_send_all))
result = r.json()
print result
#根据返回码的内容是否为0判断是否成功
return result[ 'errcode' ] = = 0
def send_txt_to_first_group( str = 'Hello World!' ):
group_id = get_first_group_id()
pay_send_all = {
"filter" :{
"is_to_all" : False ,
"group_id" :group_id
},
"text" :{
"content" : str
},
"msgtype" : "text"
}
url = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token=" + get_token()
#需要指定json编码的时候不会对中文转码为unicode,否则群发的消息会显示为unicode码,不能正确显示
r = requests.post(url = url,data = json.dumps(pay_send_all,ensure_ascii = False ,indent = 2 )) #此处的必须指定此参数
result = r.json()
#根据返回码的内容是否为0判断是否成功
return result[ 'errcode' ] = = 0
if (send_txt_to_first_group( "祝你合家欢乐,幸福美满!" )):
print 'success!'
else :
print 'fail!'
|
附录:在使用微信测试订阅号测试群发图片接口的时候,返回码如下:
1
|
{u 'errcode' : 45028, u 'errmsg' : u 'has no masssend quota hint: [OKvFdA0813ge12]' }
|
这是因为测试订阅号没有群发图文消息的权限,并不是因为接口调用有误。
PS:
作者的github: https://github.com/zhoudayang
希望本文所述对大家Python程序设计有所帮助。