Python实现SQL注入检测插件实例代码

时间:2021-07-23 02:41:52

扫描器需要实现的功能思维导图

Python实现SQL注入检测插件实例代码

爬虫编写思路

首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用 python 的set()就可以解决,大概流程是:

  • 输入 url
  • 下载解析出 url
  • url 去重,判断是否为本站
  • 加入到待爬列表
  • 重复循环

sql 判断思路

  • 通过在 url 后面加上and %d=%d或者or not (%d>%d)
  • %d后面的数字是随机可变的
  • 然后搜索网页中特殊关键词,比如:

mysql 中是 sql syntax.*mysql
microsoft sql server 是 warning.*mssql_
microsoft access 是 microsoft access driver
oracle 是 oracle error
ibm db2 是 db2 sql error
sqlite 是 sqlite.exception
...

通过这些关键词就可以判断出所用的数据库

  • 还需要判断一下 waf 之类的东西,有这种东西就直接停止。简单的方法就是用特定的 url 访问,如果出现了像ip banned,fierwall之类的关键词,可以判断出是waf。具体的正则表达式是(?i)(\a|\b)ip\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)
  • 开发准备展开目录

请安装这些库

?
1
2
pip install requests
pip install beautifulsoup4

实验环境是 linux,创建一个code目录,在其中创建一个work文件夹,将其作为工作目录

目录结构

/w8ay.py  // 项目启动主文件
/lib/core // 核心文件存放目录
/lib/core/config.py // 配置文件
/script   // 插件存放
/exp      // exp和poc存放

步骤

sql 检测脚本编写

?
1
2
3
4
5
6
7
8
9
10
dbms_errors = {
  'mysql': (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."),
  "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."),
  "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."),
  "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"),
  "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"),
  "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("),
  "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"),
  "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"),
}

通过正则表达式就可以判断出是哪个数据库了

?
1
2
3
for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]):
  if (re.search(regex,_content)):
    return true

下面是我们测试语句的payload

?
1
boolean_tests = (" and %d=%d", " or not (%d=%d)")

用报错语句返回正确的内容和错误的内容进行对比

?
1
2
3
4
5
6
7
8
9
for test_payload in boolean_tests:
  # right page
  randint = random.randint(1, 255)
  _url = url + test_payload % (randint, randint)
  content["true"] = downloader.get(_url)
  _url = url + test_payload % (randint, randint + 1)
  content["false"] = downloader.get(_url)
  if content["origin"] == content["true"] != content["false"]:
    return "sql found: %" % url

这句

?
1
content["origin"] == content["true"] != content["false"]

意思就是当原始网页等于正确的网页不等于错误的网页内容时,就可以判定这个地址存在注入漏洞

完整代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import re, random
from lib.core import download
def sqlcheck(url):
  if (not url.find("?")): # pseudo-static page
    return false;
  downloader = download.downloader()
  boolean_tests = (" and %d=%d", " or not (%d=%d)")
  dbms_errors = {
    # regular expressions used for dbms recognition based on error message response
    "mysql": (r"sql syntax.*mysql", r"warning.*mysql_.*", r"valid mysql result", r"mysqlclient\."),
    "postgresql": (r"postgresql.*error", r"warning.*\wpg_.*", r"valid postgresql result", r"npgsql\."),
    "microsoft sql server": (r"driver.* sql[\-\_\ ]*server", r"ole db.* sql server", r"(\w|\a)sql server.*driver", r"warning.*mssql_.*", r"(\w|\a)sql server.*[0-9a-fa-f]{8}", r"(?s)exception.*\wsystem\.data\.sqlclient\.", r"(?s)exception.*\wroadhouse\.cms\."),
    "microsoft access": (r"microsoft access driver", r"jet database engine", r"access database engine"),
    "oracle": (r"\bora-[0-9][0-9][0-9][0-9]", r"oracle error", r"oracle.*driver", r"warning.*\woci_.*", r"warning.*\wora_.*"),
    "ibm db2": (r"cli driver.*db2", r"db2 sql error", r"\bdb2_\w+\("),
    "sqlite": (r"sqlite/jdbcdriver", r"sqlite.exception", r"system.data.sqlite.sqliteexception", r"warning.*sqlite_.*", r"warning.*sqlite3::", r"\[sqlite_error\]"),
    "sybase": (r"(?i)warning.*sybase.*", r"sybase message", r"sybase.*server message.*"),
  }
  _url = url + "%29%28%22%27"
  _content = downloader.get(_url)
  for (dbms, regex) in ((dbms, regex) for dbms in dbms_errors for regex in dbms_errors[dbms]):
    if (re.search(regex,_content)):
      return true
  content = {}
  content['origin'] = downloader.get(_url)
  for test_payload in boolean_tests:
    # right page
    randint = random.randint(1, 255)
    _url = url + test_payload % (randint, randint)
    content["true"] = downloader.get(_url)
    _url = url + test_payload % (randint, randint + 1)
    content["false"] = downloader.get(_url)
    if content["origin"] == content["true"] != content["false"]:
      return "sql found: %" % url

将这个文件命名为sqlcheck.py,放在/script目录中。代码的第 4 行作用是查找 url 是否包含?,如果不包含,比方说伪静态页面,可能不太好注入,因此需要过滤掉

爬虫的编写

爬虫的思路上面讲过了,先完成 url 的管理,我们单独将它作为一个类,文件保存在/lib/core/urlmanager.py

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#-*- coding:utf-8 -*-
 
class urlmanager(object):
  def __init__(self):
    self.new_urls = set()
    self.old_urls = set()
    
  def add_new_url(self, url):
    if url is none:
      return
    if url not in self.new_urls and url not in self.old_urls:
      self.new_urls.add(url)
   
  def add_new_urls(self, urls):
    if urls is none or len(urls) == 0:
      return
    for url in urls:
      self.add_new_url(url)
    
  def has_new_url(self):
    return len(self.new_urls) != 0
   
  def get_new_url(self):
    new_url = self.new_urls.pop()
    self.old_urls.add(new_url)
    return new_url

为了方便,我们也将下载功能单独作为一个类使用,文件保存在lib/core/downloader.py

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#-*- coding:utf-8 -*-
import requests
 
class downloader(object):
  def get(self, url):
    r = requests.get(url, timeout = 10)
    if r.status_code != 200:
      return none
    _str = r.text
    return _str
  
  def post(self, url, data):
    r = requests.post(url, data)
    _str = r.text
    return _str
  
  def download(self, url, htmls):
    if url is none:
      return none
    _str = {}
    _str["url"] = url
    try:
      r = requests.get(url, timeout = 10)
      if r.status_code != 200:
        return none
      _str["html"] = r.text
    except exception as e:
      return none
    htmls.append(_str)

特别说明,因为我们要写的爬虫是多线程的,所以类中有个download方法是专门为多线程下载专用的

在lib/core/spider.py中编写爬虫

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#-*- coding:utf-8 -*-
 
from lib.core import downloader, urlmanager
import threading
from urllib import parse
from urllib.parse import urljoin
from bs4 import beautifulsoup
 
class spidermain(object):
  def __init__(self, root, threadnum):
    self.urls = urlmanager.urlmanager()
    self.download = downloader.downloader()
    self.root = root
    self.threadnum = threadnum
  
  def _judge(self, domain, url):
    if (url.find(domain) != -1):
      return true
    return false
  
  def _parse(self, page_url, content):
    if content is none:
      return
    soup = beautifulsoup(content, 'html.parser')
    _news = self._get_new_urls(page_url, soup)
    return _news
    
  def _get_new_urls(self, page_url, soup):
    new_urls = set()
    links = soup.find_all('a')
    for link in links:
      new_url = link.get('href')
      new_full_url = urljoin(page_url, new_url)
      if (self._judge(self.root, new_full_url)):
        new_urls.add(new_full_url)
    return new_urls
    
  def craw(self):
    self.urls.add_new_url(self.root)
    while self.urls.has_new_url():
      _content = []
      th = []
      for i in list(range(self.threadnum)):
        if self.urls.has_new_url() is false:
          break
        new_url = self.urls.get_new_url()
        
        ## sql check
        try:
          if (sqlcheck.sqlcheck(new_url)):
            print("url:%s sqlcheck is valueable" % new_url)
        except:
          pass
            
        print("craw:" + new_url)
        t = threading.thread(target = self.download.download, args = (new_url, _content))
        t.start()
        th.append(t)
      for t in th:
        t.join()
      for _str in _content:
        if _str is none:
          continue
        new_urls = self._parse(new_url, _str["html"])
        self.urls.add_new_urls(new_urls)

爬虫通过调用craw()方法传入一个网址进行爬行,然后采用多线程的方法下载待爬行的网站,下载之后的源码用_parse方法调用beautifulsoup进行解析,之后将解析出的 url 列表丢入 url 管理器,这样循环,最后只要爬完了网页,爬虫就会停止

threading库可以自定义需要开启的线程数,线程开启后,每个线程会得到一个 url 进行下载,然后线程会阻塞,阻塞完毕后线程放行

爬虫和 sql 检查的结合

在lib/core/spider.py文件引用一下from script import sqlcheck,在craw()方法中,取出新的 url 地方调用一下

?
1
2
3
4
5
6
##sql check
try:
  if(sqlcheck.sqlcheck(new_url)):
    print("url:%s sqlcheck is valueable"%new_url)
except:
  pass

用try检测可能出现的异常,绕过它,在文件w8ay.py中进行测试

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#-*- coding:utf-8 -*-
'''
name: w8ayscan
author: mathor
copyright (c) 2019
'''
import sys
from lib.core.spider import spidermain
def main():
  root = "https://wmathor.com"
  threadnum = 50
  w8 = spidermain(root, threadnum)
  w8.craw()
 
if __name__ == "__main__":
  main()

很重要的一点!为了使得lib和script文件夹中的.py文件可以可以被认作是模块,请在lib、lib/core和script文件夹中创建__init__.py文件,文件中什么都不需要写

总结

sql 注入检测通过一些payload使页面出错,判断原始网页,正确网页,错误网页即可检测出是否存在 sql 注入漏洞
通过匹配出 sql 报错出来的信息,可以正则判断所用的数据库

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。

原文链接:https://www.wmathor.com/index.php/archives/1191/