CVE漏洞爬虫脚本

时间:2024-07-14 21:46:20

前言:前阵子公司非要把CVE漏洞信息拉倒本地,说方便安全员们查看,其实在莫慌看来,大可不必,90%的CVE漏洞其实不具备现实利用的可能性的(不代表不能被利用),但是你说它没有意义也是不对的,比如将CVE漏洞和资产做好关联,当资产的版本信息刚好触发CVE是不是就能第一时间获得通知了!

      废话不多说,说下CVE爬虫的逻辑,CVE漏洞库是存放在git上面的,它会定时打一个增量包和打一个全量包,我们如果需要历史漏洞信息,可以下载全量包(官网也可以下载全量包),下面给大家展示是我随手写的增量包的漏洞下载,和全量包手工导入数据库的代码(数据库字段代码未放出,相比对各位老哥来说自己写个mysql插入不难),大家随心取用。

# -*- coding: utf-8 -*-
# @File    : cve_spider.py
# 爬取cve数据源:先拼接当天的cve zip包链接,然后解压读取里面的json文件,解析并导入数据库

import datetime
import shutil
import os,json
import requests
import zipfile
from logger import Logger #日志代码,可以去掉相关代码
from cve_pars import get_file_list,cve_details
def search_string_in_file(file_path, target_string):  
    with open(file_path, 'r') as f:  
        for line in f:  
            if target_string in line:  
                return True  
    return False

def main():
    flag =0 #0代表成功1代表失败
    # 计算昨天日期
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    # 获取当天的cve数据源
    url = 'https://github.com/CVEProject/cvelistV5/releases/download/cve_{}_at_end_of_day/{}_delta_CVEs_at_end_of_day.zip'.format(yesterday,yesterday)
    if search_string_in_file('cveflawsurls.txt', url):
        print(f"已经下载过,不再重复下载{url}");
        return
    try:   
        r = requests.get(url, timeout=10)
    except:
        print('请求超时')
        Logger.error(f" error:CVE链接请求超时")
    if r.status_code == 200:
        with open(str(yesterday) + '.zip', 'wb') as f:
            f.write(r.content)
        
        # 解压zip包到data目录
        with zipfile.ZipFile(str(yesterday) + '.zip', 'r') as z:
            z.extractall()
            #z.close()
        #zip文件备份到data目录
        try:
            os.rename(str(yesterday) + '.zip', './data/' + str(yesterday) + '.zip')
        except Exception as e:
            print(e)
            Logger.error(f" error:{e}")
        #解析deltaCves中的json文件
        file_list = get_file_list(r'./deltaCves')
        for file in file_list:
            with open(file, 'rb') as f:
                data = json.load(f)
                cve_details(data)
        #删除deltaCves目录
        try:
            shutil.rmtree('./deltaCves')
        except Exception as e:
            print(e)
            Logger.error(f" error:{e}")
    else:
        print('下载链接失败')
        flag = 1
    if flag == 0:
        #如果没有其他错误将下载的链接用覆盖模式存储cveflawurls.txt里面吗,避免重复爬取
        with open('cveflawsurls.txt', 'w') as f:
            f.write(url)

if __name__ == '__main__':
    os.chdir(os.path.split(os.path.realpath(__file__))[0])
    if not os.path.exists('cveflawsurls.txt'):
        with open('cveflawsurls.txt','w') as f:
            f.write('')
    main()
# -*- coding: utf-8 -*-
# @File    : cve_pars.py
import json,os
from filetomysql import sql #这里换成自己写的数据库插入函数
import time

# 提取并打印关键信息
def cve_details(cve_data):
    cve_id = cve_data['cveMetadata']['cveId']
    cve_stat = cve_data['cveMetadata']['state']
    cve_date = ""  #更新日期
    product_name = "" #受影响产品
    refurls = "" #参考链接
    cve_desc = "" #漏洞描述
    problem_type="" #问题类型
    severity="" #严重程度
    cvss_score = "" #CVSS评分
    print("CVE ID:", cve_id)
    print("状态:", cve_stat)
    
    try:
        cve_date = cve_data['cveMetadata']['dateUpdated'][:10]
        # print("更新日期:", cve_date)
    except:
        pass

    try:
        product_name=cve_data['containers']['cna']['affected'][0]['product']
        # print("受影响产品:", product_name)
    except:
        pass

    try:
        cve_desc =cve_data['containers']['cna']['descriptions'][0]['value']
        # print("漏洞描述",cve_desc)
    except:
        pass

    try:
        for ref in cve_data['containers']['cna']['references']:
            refurls+=ref['url']+"\n"    
        # print("参考链接",refurls)
    except:
        pass

    try:
        problem_type=cve_data['containers']['cna']['problemTypes'][0]['descriptions'][0]['description']
        #print("问题类型:",problem_type)
    except:
        pass

    try:
        severity=cve_data['containers']['cna']['metrics'][0]['cvssV3_1']['baseSeverity']
        #print("严重程度:", severity)
    except:
        pass

    try:
        cvss_score=cve_data['containers']['cna']['metrics'][0]['cvssV3_1']['baseScore']
        #print("CVSS评分:", cvss_score)
    except:
        pass
    sql =NewsSeclet()
    sql.cvevulDB(cve_id,cve_stat,severity,product_name,cve_desc,refurls,problem_type,cvss_score,cve_date)


#获取指定目录下面的文件列表
def get_file_list(path):
    file_list = []
    for root, dirs, files in os.walk(path):
        for file in files:
            file_list.append(os.path.join(root, file))
    return file_list


if  __name__ == "__main__":
    file_list = get_file_list(r'./deltaCves')
    for file in file_list:
        #time.sleep(1)
        with open(file, 'rb') as f:
            data = json.load(f)
            cve_details(data)