Mind map of the features the scanner needs to implement
Crawler design
First we need a crawler to collect the site's links. The crawler has to keep track of the URLs it has already crawled and the URLs still waiting to be crawled, and deduplicate them; Python's `set()` handles this nicely. The overall flow is (a minimal sketch follows the list):
- Input a URL
- Download the page and parse out the URLs it contains
- Deduplicate the URLs and check whether each one belongs to the same site
- Add the qualifying URLs to the to-crawl list
- Repeat the loop
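A minimal sketch of that loop, assuming a hypothetical `download_and_extract_links()` helper and a `same_site()` predicate (the real, multi-threaded spider is developed later in this article):

```python
# Minimal sketch of the crawl loop (illustration only; the real spider below is multi-threaded).
# download_and_extract_links() and same_site() are hypothetical helpers:
# the first fetches a page and returns the URLs found in it, the second checks the domain.
def crawl(start_url, same_site, download_and_extract_links):
    new_urls, old_urls = {start_url}, set()   # to-crawl / already-crawled
    while new_urls:
        url = new_urls.pop()
        old_urls.add(url)
        for link in download_and_extract_links(url):
            # keep only same-site links we have not seen before
            if same_site(link) and link not in new_urls and link not in old_urls:
                new_urls.add(link)
    return old_urls
```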
SQL injection detection approach
- Append `and %d=%d` or `or not (%d=%d)` to the URL
- The numbers substituted for `%d` are chosen at random
- Then search the page content for database-specific error keywords, for example:
  - MySQL: `SQL syntax.*MySQL`
  - Microsoft SQL Server: `Warning.*mssql_`
  - Microsoft Access: `Microsoft Access Driver`
  - Oracle: `Oracle error`
  - IBM DB2: `DB2 SQL error`
  - SQLite: `SQLite.Exception`
  - ...

  These keywords are enough to identify which database the target is running.
- We also need to detect WAFs and similar protections; if one is present, stop immediately. A simple approach is to request a deliberately suspicious URL and check the response for keywords such as "ip banned" or "firewall". A regular expression that covers this is `(?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)` (a sketch of this check follows).
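A sketch of that WAF check, assuming the scanner sends a deliberately suspicious probe request (the probe path here is made up for illustration):

```python
# Sketch of the WAF check described above. The probe query is a hypothetical example
# of a "suspicious" request; adjust it to whatever your scanner actually sends.
import re
import requests

WAF_RE = re.compile(r"(?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)")

def waf_detected(base_url):
    probe = base_url + "?id=1%20and%201=1"   # hypothetical suspicious probe
    try:
        body = requests.get(probe, timeout=10).text
    except requests.RequestException:
        return False
    return WAF_RE.search(body) is not None
```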
Development setup
Install these libraries:
```bash
pip install requests
pip install beautifulsoup4
```
The experiments are done on Linux. Create a `code` directory, create a `work` folder inside it, and use that as the working directory.
Directory structure
/w8ay.py // main entry point of the project
/lib/core // core modules
/lib/core/config.py // configuration file
/script // plugins
/exp // exploits and PoCs
Steps
Writing the SQL detection script
```python
DBMS_ERRORS = {
    "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
    "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
    "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
    "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
    "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
    "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
    "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
    "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
}
```
With these regular expressions we can tell which database backend produced the error:
```python
for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
    if re.search(regex, _content):
        return True
```
Below are the payloads used for the boolean tests:
```python
boolean_tests = (" and %d=%d", " or not (%d=%d)")
```
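For example, against a hypothetical target `http://example.com/item.php?id=1` with a random value of 83, the scanner would request `http://example.com/item.php?id=1 and 83=83` (which should behave like the original page) and `http://example.com/item.php?id=1 and 83=84` (which should differ if the parameter is injectable).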
We then compare the content returned for the "true" condition with the content returned for the "false" condition (and with the original page):
```python
for test_payload in boolean_tests:
    # right page
    randint = random.randint(1, 255)
    _url = url + test_payload % (randint, randint)
    content["true"] = downloader.get(_url)
    _url = url + test_payload % (randint, randint + 1)
    content["false"] = downloader.get(_url)
    if content["origin"] == content["true"] != content["false"]:
        return "sql found: %s" % url
```
The expression

```python
content["origin"] == content["true"] != content["false"]
```
means that when the original page matches the "true" page but differs from the "false" page, we can conclude that the URL is injectable (see the note on chained comparison below).
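This works because Python chains comparison operators: `a == b != c` is evaluated as `a == b and b != c`, as the small check below illustrates.

```python
# Python chains comparisons: a == b != c  <=>  (a == b) and (b != c)
a, b, c = "page", "page", "other"
assert (a == b != c) == ((a == b) and (b != c))  # original == "true" page and "true" page != "false" page
```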
The complete code:
```python
import re, random
from lib.core.downloader import Downloader


def sqlcheck(url):
    if url.find("?") == -1:  # no query string (e.g. a pseudo-static page), skip it
        return False
    downloader = Downloader()
    boolean_tests = (" and %d=%d", " or not (%d=%d)")
    DBMS_ERRORS = {
        # regular expressions used for DBMS recognition based on error message response
        "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
        "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
        "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
        "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
        "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
        "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
        "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
        "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
    }
    # error-based check: append URL-encoded )("' to break the SQL syntax
    _url = url + "%29%28%22%27"
    _content = downloader.get(_url)
    for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
        if _content and re.search(regex, _content):
            return True
    # boolean-based check: compare the original, "true" and "false" pages
    content = {}
    content["origin"] = downloader.get(url)
    for test_payload in boolean_tests:
        # right page
        randint = random.randint(1, 255)
        _url = url + test_payload % (randint, randint)
        content["true"] = downloader.get(_url)
        _url = url + test_payload % (randint, randint + 1)
        content["false"] = downloader.get(_url)
        if content["origin"] == content["true"] != content["false"]:
            return "sql found: %s" % url
```
Save this file as `sqlcheck.py` in the `/script` directory. The `url.find("?") == -1` check at the top of `sqlcheck()` filters out URLs with no query string, such as pseudo-static pages, which are poor candidates for this kind of injection test. A quick way to try the script on its own is sketched below.
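This is only a hypothetical smoke test; the target URL is a placeholder, and it must be run from the project root so that `lib` and `script` are importable:

```python
# Hypothetical smoke test (run from the project root; the URL is a placeholder).
from script import sqlcheck

result = sqlcheck.sqlcheck("http://example.com/item.php?id=1")
print(result)  # "sql found: ..." or True on a hit, False/None when nothing is detected
```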
Writing the crawler
The crawler's design was described above. We start with URL management, implemented as a standalone class saved in `/lib/core/urlmanager.py`:
```python
#-*- coding:utf-8 -*-

class UrlManager(object):
    def __init__(self):
        self.new_urls = set()   # URLs waiting to be crawled
        self.old_urls = set()   # URLs already crawled

    def add_new_url(self, url):
        if url is None:
            return
        if url not in self.new_urls and url not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    def has_new_url(self):
        return len(self.new_urls) != 0

    def get_new_url(self):
        new_url = self.new_urls.pop()
        self.old_urls.add(new_url)
        return new_url
```
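A minimal standalone illustration of how `UrlManager` is used (the URLs are placeholders):

```python
# Standalone illustration of UrlManager; example URLs are hypothetical.
from lib.core.urlmanager import UrlManager

urls = UrlManager()
urls.add_new_url("https://example.com/")
urls.add_new_urls({"https://example.com/a", "https://example.com/"})  # duplicate is ignored
while urls.has_new_url():
    print("crawling:", urls.get_new_url())
```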
For convenience, the download logic is also wrapped in its own class, saved in `lib/core/downloader.py`:
```python
#-*- coding:utf-8 -*-
import requests


class Downloader(object):
    def get(self, url):
        r = requests.get(url, timeout=10)
        if r.status_code != 200:
            return None
        _str = r.text
        return _str

    def post(self, url, data):
        r = requests.post(url, data)
        _str = r.text
        return _str

    def download(self, url, htmls):
        # used by the multi-threaded crawler: append the result to the shared htmls list
        if url is None:
            return None
        _str = {}
        _str["url"] = url
        try:
            r = requests.get(url, timeout=10)
            if r.status_code != 200:
                return None
            _str["html"] = r.text
        except Exception as e:
            return None
        htmls.append(_str)
```
Note that the crawler we are about to write is multi-threaded, which is why the class has a dedicated `download` method for threaded use: instead of returning the page, it appends the result to a shared list.
Now write the crawler in `lib/core/spider.py`:
```python
#-*- coding:utf-8 -*-
from lib.core import downloader, urlmanager
from script import sqlcheck
import threading
from urllib.parse import urljoin
from bs4 import BeautifulSoup


class SpiderMain(object):
    def __init__(self, root, threadnum):
        self.urls = urlmanager.UrlManager()
        self.download = downloader.Downloader()
        self.root = root
        self.threadnum = threadnum

    def _judge(self, domain, url):
        # keep only URLs that belong to the target domain
        if url.find(domain) != -1:
            return True
        return False

    def _parse(self, page_url, content):
        if content is None:
            return
        soup = BeautifulSoup(content, 'html.parser')
        _news = self._get_new_urls(page_url, soup)
        return _news

    def _get_new_urls(self, page_url, soup):
        new_urls = set()
        links = soup.find_all('a')
        for link in links:
            new_url = link.get('href')
            new_full_url = urljoin(page_url, new_url)
            if self._judge(self.root, new_full_url):
                new_urls.add(new_full_url)
        return new_urls

    def craw(self):
        self.urls.add_new_url(self.root)
        while self.urls.has_new_url():
            _content = []
            th = []
            for i in list(range(self.threadnum)):
                if self.urls.has_new_url() is False:
                    break
                new_url = self.urls.get_new_url()
                ## sql check
                try:
                    if sqlcheck.sqlcheck(new_url):
                        print("url:%s sqlcheck is valuable" % new_url)
                except:
                    pass
                print("craw:" + new_url)
                t = threading.Thread(target=self.download.download, args=(new_url, _content))
                t.start()
                th.append(t)
            for t in th:
                t.join()
            for _str in _content:
                if _str is None:
                    continue
                new_urls = self._parse(_str["url"], _str["html"])
                self.urls.add_new_urls(new_urls)
```
The crawl is started by calling `craw()` on a root URL. Pending URLs are downloaded with multiple threads; each downloaded page is parsed with BeautifulSoup in `_parse`, the extracted URLs are fed back into the URL manager, and the loop continues until no new pages are left, at which point the crawler stops.
The `threading` module lets us choose how many threads to start. In each round, every thread is handed one URL to download; the main loop then blocks on `t.join()` until the whole batch has finished before parsing the results and moving on to the next round.
Combining the crawler with the SQL check
In `lib/core/spider.py`, import the check with `from script import sqlcheck` (already shown in the listing above) and call it inside `craw()` at the point where each new URL is taken from the manager:
```python
## sql check
try:
    if sqlcheck.sqlcheck(new_url):
        print("url:%s sqlcheck is valuable" % new_url)
except:
    pass
```
The `try` block catches any exception the check might raise and simply skips it. Finally, test everything with `w8ay.py`:
```python
#-*- coding:utf-8 -*-
'''
Name: w8ayScan
Author: mathor
Copyright (c) 2019
'''
import sys
from lib.core.spider import SpiderMain


def main():
    root = "https://wmathor.com"
    threadnum = 50
    w8 = SpiderMain(root, threadnum)
    w8.craw()


if __name__ == "__main__":
    main()
```
One important point: so that the `.py` files in the `lib` and `script` folders can be imported as modules, create an empty `__init__.py` file in each of `lib`, `lib/core` and `script`.
Summary
For SQL injection detection, a few payloads are used to perturb the page; comparing the original page, the "true" page and the "false" page tells us whether a URL is injectable.
By matching the error messages that the database leaks, regular expressions can also identify which DBMS is in use.
That's all for this article. I hope it is a useful reference for your study or work; if you have any questions, feel free to leave a comment. Thanks for supporting 服务器之家.
Original article: https://www.wmathor.com/index.php/archives/1191/