radius挑战认证+对接华为防火墙实现SSLvpn +钉钉 双因子认证2

时间:2024-02-19 16:39:33

上文,我们简单实现了 使用pyrad 搭建radius对接防火墙挑战认证 但是挑战码只在控制台输出就有点不太合适,我们接下来使用python对接钉钉,及时的把动态码发给钉钉使用

不说废话 上代码

# -*- coding: UTF-8 -*-
from pyrad import *
import socket
import pyrad.host
import random
import pymysql
from dingtalk import *
import requests
import json
BUFSIZE = 1024
KEY = b"testing123456789"
CHALLENGE = "666" #挑战码初始为666
appkey=\'xxx\'#钉钉的appkey
appsecret=\'xxxx\' #钉钉的appsecret 在钉钉的管理后台这两个参数都能找到
class RadiusServer(pyrad.host.Host):
    def __init__(self):
        dict = pyrad.dictionary.Dictionary("dictionary") #从通用的字典使用
        pyrad.host.Host.__init__(self, dict=dict)
    def get_challenge(self): #产生一个4字节的随机挑战码
        challenge = ""  #初始化挑战码
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        return challenge
    def get_token(self):
        url = \'https://oapi.dingtalk.com/gettoken?appkey=\'+appkey+\'&appsecret=\'+appsecret
        response = requests.get(url=url)
        result = response.json()
        errmsg = result[\'errmsg\']
        print(\'获取token是否成功:\',errmsg)
        try:
            access_token = result[\'access_token\']
        except Exception as e:
            print(e)
            access_token = \'\'
        return access_token                 
    def check_pass(self, radpkt): 
        global CHALLENGE
        global access_token1
        conn=pymysql.connect(
            host=\'xxxx\',
            port=3306,   
            user=\'root\',
            password=\'xxxx\',
            db=\'xxxx\',#这都是数据库的参数 按照你实际的情况来
            charset=\'utf8\'
        )
         # 拿到游标
        cursor=conn.cursor()
        user=radpkt["User-Name"][0]
        print(user)
        password=radpkt["User-Password"][0]
        print(password)
         # 执行sql语句 
        sql=\'select * from test where user = "%s" \'% (user)
        res=cursor.execute(sql)
        pwd=cursor.fetchall()
        cursor.close()
        conn.close()     
        if radpkt.PwCrypt(CHALLENGE) == radpkt["User-Password"][0]:
            radpkt.code = packet.AccessAccept
            CHALLENGE = self.get_challenge() #挑战码使用过后就更换掉
            radpkt.AddAttribute("Reply-Message","PASSCODE Accept")
            print("AccessAccept")                
        elif radpkt.PwCrypt(pwd[0][1]) == radpkt["User-Password"][0]:
            radpkt.code = packet.AccessChallenge
            CHALLENGE = self.get_challenge()
            radpkt.AddAttribute("Reply-Message","Enter Token Code") 
            radpkt.AddAttribute("State",b\'0x1122123231\') #最好这个还是随机生成,好吧我有点偷懒
            #下面发送验证码
            access_token1 = self.get_token()
            userid=pwd[0][2]
            print(\'钉钉id为:\',pwd[0][2])
            url = \'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token=\'+access_token1
            content = \'你好,VPN验证码为\'+CHALLENGE
            _neirong = {
                     \'agent_id\':\'xxxx\',#应用id钉钉后台查看
                     \'userid_list\':userid,
                     \'msg\':{
                            \'msgtype\':\'text\',
                            \'text\':{\'content\':content}}
                        }
            neirong = json.dumps(_neirong)
            response = requests.post(url=url,data=neirong)
            result = response.json()
            print(result)            
        else:
            radpkt.code = packet.AccessReject
            print("AccessReject")
    def get_pkt(self, pkt):
        get_pw = None
        get_name = None
        radpkt = self.CreateAuthPacket(packet=pkt) #解析请求报文
        #radpkt.code = packet.AccessChallenge
        radpkt.secret = KEY
        print("输出相关参数")
        for key in radpkt.keys():        
            print(key, radpkt[key])
            if key == "User-Password":
                get_pw = 1
            if key == "User-Name":
                get_name = 1       
        if 1 == get_pw and 1 == get_name:
            self.check_pass(radpkt)        
        return radpkt.ReplyPacket()         
ip_port = (\'\', 1812)
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # udp协议
server.bind(ip_port)


while True:
    data,client_addr = server.recvfrom(BUFSIZE)
    srv = RadiusServer()
    reply = srv.get_pkt(data)
    server.sendto(reply, client_addr)    

不得不说一下 钉钉官方的sdk还是python 2 有点小坑  。但是手册写的很清楚,参考手册还是能搞明白的。