上文,我们简单实现了 使用pyrad 搭建radius对接防火墙挑战认证 但是挑战码只在控制台输出就有点不太合适,我们接下来使用python对接钉钉,及时的把动态码发给钉钉使用
不说废话 上代码
# -*- coding: UTF-8 -*- from pyrad import * import socket import pyrad.host import random import pymysql from dingtalk import * import requests import json BUFSIZE = 1024 KEY = b"testing123456789" CHALLENGE = "666" #挑战码初始为666 appkey=\'xxx\'#钉钉的appkey appsecret=\'xxxx\' #钉钉的appsecret 在钉钉的管理后台这两个参数都能找到 class RadiusServer(pyrad.host.Host): def __init__(self): dict = pyrad.dictionary.Dictionary("dictionary") #从通用的字典使用 pyrad.host.Host.__init__(self, dict=dict) def get_challenge(self): #产生一个4字节的随机挑战码 challenge = "" #初始化挑战码 challenge = challenge + str(chr(random.randint(65,90))) challenge = challenge + str(chr(random.randint(65,90))) challenge = challenge + str(chr(random.randint(65,90))) challenge = challenge + str(chr(random.randint(65,90))) return challenge def get_token(self): url = \'https://oapi.dingtalk.com/gettoken?appkey=\'+appkey+\'&appsecret=\'+appsecret response = requests.get(url=url) result = response.json() errmsg = result[\'errmsg\'] print(\'获取token是否成功:\',errmsg) try: access_token = result[\'access_token\'] except Exception as e: print(e) access_token = \'\' return access_token def check_pass(self, radpkt): global CHALLENGE global access_token1 conn=pymysql.connect( host=\'xxxx\', port=3306, user=\'root\', password=\'xxxx\', db=\'xxxx\',#这都是数据库的参数 按照你实际的情况来 charset=\'utf8\' ) # 拿到游标 cursor=conn.cursor() user=radpkt["User-Name"][0] print(user) password=radpkt["User-Password"][0] print(password) # 执行sql语句 sql=\'select * from test where user = "%s" \'% (user) res=cursor.execute(sql) pwd=cursor.fetchall() cursor.close() conn.close() if radpkt.PwCrypt(CHALLENGE) == radpkt["User-Password"][0]: radpkt.code = packet.AccessAccept CHALLENGE = self.get_challenge() #挑战码使用过后就更换掉 radpkt.AddAttribute("Reply-Message","PASSCODE Accept") print("AccessAccept") elif radpkt.PwCrypt(pwd[0][1]) == radpkt["User-Password"][0]: radpkt.code = packet.AccessChallenge CHALLENGE = self.get_challenge() radpkt.AddAttribute("Reply-Message","Enter Token Code") radpkt.AddAttribute("State",b\'0x1122123231\') #最好这个还是随机生成,好吧我有点偷懒 #下面发送验证码 access_token1 = self.get_token() userid=pwd[0][2] print(\'钉钉id为:\',pwd[0][2]) url = \'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token=\'+access_token1 content = \'你好,VPN验证码为\'+CHALLENGE _neirong = { \'agent_id\':\'xxxx\',#应用id钉钉后台查看 \'userid_list\':userid, \'msg\':{ \'msgtype\':\'text\', \'text\':{\'content\':content}} } neirong = json.dumps(_neirong) response = requests.post(url=url,data=neirong) result = response.json() print(result) else: radpkt.code = packet.AccessReject print("AccessReject") def get_pkt(self, pkt): get_pw = None get_name = None radpkt = self.CreateAuthPacket(packet=pkt) #解析请求报文 #radpkt.code = packet.AccessChallenge radpkt.secret = KEY print("输出相关参数") for key in radpkt.keys(): print(key, radpkt[key]) if key == "User-Password": get_pw = 1 if key == "User-Name": get_name = 1 if 1 == get_pw and 1 == get_name: self.check_pass(radpkt) return radpkt.ReplyPacket() ip_port = (\'\', 1812) server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp协议 server.bind(ip_port) while True: data,client_addr = server.recvfrom(BUFSIZE) srv = RadiusServer() reply = srv.get_pkt(data) server.sendto(reply, client_addr)
不得不说一下 钉钉官方的sdk还是python 2 有点小坑 。但是手册写的很清楚,参考手册还是能搞明白的。