国家企业信用信息查询工商数据爬虫

时间:2025-02-11 22:30:38

国家企业信用信息公示系统采用了加速乐和极验的反扒,并且即使拿到了cookie后,获取所有详情也很费劲,每一部分数据都是一个请求,算下来一个公司少说也有三四十个请求才能完整地获取所有数据;

主要有5个cookie(__jsluid_h, __jsl_clearance, JSESSIONID, SECTOKEN, tlb_cookie)是多次请求结果运算得出的。
下面附请求步骤及源码:
1、第一次访问/, 得到响应,HTTP状态码521,响应内容为混淆后的js代码:<script>=('_')+('_')+('j')+('s')+('l')+('_')+('c')+('l')+('e')+('a')+('r')+('a')+('n')+('c')+('e')+('=')+((+true)+'')+......m')+('a')+('x')+('-')+('a')+('g')+('e')+('=')+((2^1)+'')+(6+'')+(~~{}+'')+((+[])+'')+(';')+('p')+('a')+('t')+('h')+('=')+('/');=+</script>,处理字符串删掉=前和;后的代码,使用python js引擎执行该段代码会得到__jsl_clearance的值,__jsluid_h 的值在headers[‘Set-Cookie’]取得。
2。带着第一步算出来的两个cookies第二次访问/, 得到响应,HTTP状态码521,响应内容为混淆后的js代码,使用正则处理后取得我们想要的代码段,加密算法有三种,sha1,sha256和md5,直接怼就行。
3,使用第二步算出的更新后的__jsl_clearance的值和第一步的__jsluid_h 的值第三次访问/, 得到响应,HTTP状态码200,第三次请求的headers里会有我们想要的另外三个cookies:JSESSIONID, SECTOKEN, tlb_cookie,拼接好这5个cookie就可以访问拿数据了。下面附请求脚本,注意:全流程需要使用相同的代理ip,cookie和ip是有绑定的。

ex1 = (r'(?<=cookie=).+(?=;)')
ex2 = (r'(?<=;go\().+(?=\)</script>)')


class GetJslCookie:
    """
    获取jsl_cookie,加速乐cookie与ip绑定
    传入调用类的代理,返回jslcookie和代理(如果代理过期有变化返回新代理,后续流程携带新代理进行请求)
    """

    def __init__(self, in_proxy):
         = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,"
                      "*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
            "Cache-Control": "max-age=0",
            "Upgrade-Insecure-Requests": "1",
            "Referrer": "/",
            "Host": "",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/86.0.4240.193 Safari/537.36"
        }
         = ProxyGenerator()
         = in_proxy

    @staticmethod
    def get_hash(msg, flag):
        """jsl cookie 计算"""
        if flag == 'md5':
            md5 = hashlib.md5()
            (msg)
            return ()
        elif flag == 'sha1':
            sha1 = hashlib.sha1()
            (msg)
            return ()
        elif flag == 'sha256':
            sha256 = hashlib.sha256()
            (msg)
            return ()
        else:
            raise Exception("get_hash flag err")

    def jsl_cookie(self, url):
        """
        需要进行两次请求才能获取到jsl的cookie
        :param url: url
        :return: None 函数中会对['cookie']进行赋值
        """
        for _ in range(5):
            try:
                if ("cookie"):
                    del ['cookie']
                (f'第{_}次,jsl第一次请求,proxy={}')
                rsp = RequestMethod.ip_requests(url, headers=, proxies=)
                if rsp == -1:
                     = .get_proxy()
                    (f"重新获取proxy={}")
                    (() * 2)
                    continue

                first_js_code = (ex1, )[0]
                js_ctx = (
                    "function cookie() {return " + first_js_code + ";}")
                cookie_1 = js_ctx.call("cookie") + ';' + ('Set-Cookie')
                cookie_dict = {}
                for cook in cookie_1.split(';'):
                    if len(('=')) == 2:
                        k, value = ('=')
                        cookie_dict[()] = ()

                ('给第二次请求的headers构造cookie')
                __jsl_clearance = cookie_dict.get('__jsl_clearance')
                __jsluid_h = cookie_dict.get('__jsluid_h')
                ['cookie'] = f'__jsl_clearance={__jsl_clearance}; __jsluid_h={__jsluid_h}'
                (f'第{_}次,jsl第二次请求,proxy={}')
                rsp = RequestMethod.ip_requests(url, headers=, proxies=)
                if rsp == -1:
                     = .get_proxy()
                    (f"重新获取1proxy={}")
                    (() * 2)
                    continue

                if '' in :
                    (" 第二次还是521")
                     = .get_proxy()
                    del ['cookie']
                    continue
                __jsl_clearance = self.get_jsl_clearance(((ex2, )[0]))
                if __jsl_clearance:
                    (f'第{_}次,获取__jsl_clearance成功')
                    return f'__jsl_clearance={__jsl_clearance}; __jsluid_h={__jsluid_h}', 
            except Exception as exp:
                 = .get_proxy()
                (f'获取__jsl_clearance异常--{exp},eline-{exp.__traceback__.tb_lineno}')

    def get_jsl_clearance(self, data):
        """获取jsl_clearance参数"""
        chars = ('chars')
        bts = ('bts')
        c_t = ('ct')
        h_a = ('ha')
        for chars_1 in chars:
            for chars_2 in chars:
                __jsl_clearance = bts[0] + chars_1 + chars_2 + bts[1]
                if self.get_hash(__jsl_clearance.encode('utf-8'), h_a) == c_t:
                    return __jsl_clearance
        return None