需要 Python 3.4+,一个参数用来选择测试搜索服务还是 GAE 服务。测试 GAE 服务的话需要先修改开头的两个变量。从标准输入读取 IP 地址或者 IP 段(形如 192.168.0.0/16)列表,每行一个。可用 IP 输出到标准输出。实时测试结果输出到标准错误。50 线程并发。
checkgoogleip
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
#!/usr/bin/env python3
import sys
from ipaddress import IPv4Network
import http.client as client
from concurrent.futures import ThreadPoolExecutor
import argparse
import ssl
import socket
# Adjust the following lines to match your own setup before running.
# APP_ID and APP_PATH are only used by the GAE check, which requests
# APP_PATH from the host APP_ID + '.appspot.com'.
APP_ID = 'your_id_here'
APP_PATH = '/fetch.py'

# Shared TLS context used for every probe.
# NOTE(review): the protocol is pinned to TLS 1.0; modern OpenSSL builds may
# refuse it -- confirm whether version negotiation should be used instead.
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.verify_mode = ssl.CERT_REQUIRED
# Have the TLS layer match the certificate against the requested host name.
# Without this, http.client (Python >= 3.4.3) takes its check_hostname default
# from the context, i.e. False, and a certificate for any name is accepted.
# Must be set after verify_mode, which has to be CERT_REQUIRED/CERT_OPTIONAL.
context.check_hostname = True
try:
    context.load_verify_locations('/etc/ssl/certs/ca-certificates.crt')
except FileNotFoundError:
    # The bundle path above is distribution specific; fall back to the
    # platform's default trust store when it does not exist.
    context.load_default_certs()
class HTTPSConnection(client.HTTPSConnection):
    """HTTPS connection that connects to one address but names another host.

    The positional/keyword arguments are forwarded unchanged to
    ``http.client.HTTPSConnection``. The extra keyword-only ``hostname`` is
    the name used for SNI and for certificate matching instead of the
    connection's own ``host`` (which may be a bare IP address).
    """

    def __init__(self, *args, hostname=None, **kwargs):
        # Stored before the base initializer runs; falls back to self.host in
        # connect() when left as None.
        self._hostname = hostname
        super().__init__(*args, **kwargs)

    def connect(self):
        """Open the socket and wrap it in TLS using the overridden host name.

        Raises whatever the socket/TLS layer raises; when the explicit
        host-name match fails the socket is shut down and closed before the
        exception propagates.
        """
        # Deliberately skip client.HTTPSConnection.connect() and run the next
        # connect() in the MRO, so the TLS wrapping below can use our own
        # server name instead of self.host.
        super(client.HTTPSConnection, self).connect()

        if self._tunnel_host:
            server_hostname = self._tunnel_host
        else:
            server_hostname = self._hostname or self.host
        sni_hostname = server_hostname if ssl.HAS_SNI else None

        self.sock = self._context.wrap_socket(
            self.sock, server_hostname=sni_hostname)

        # When the context checks host names itself there is nothing left to
        # do. Otherwise fall back to an explicit match, but only where the
        # interpreter still provides the pieces: both the private
        # ``_check_hostname`` attribute and ``ssl.match_hostname`` are gone
        # in Python 3.12+, and touching them unguarded raises AttributeError.
        if self._context.check_hostname:
            return
        match_hostname = getattr(ssl, 'match_hostname', None)
        if not getattr(self, '_check_hostname', False) or match_hostname is None:
            return
        try:
            match_hostname(self.sock.getpeercert(), server_hostname)
        except Exception:
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
def check_ip_p(ip, func):
    """Print ``ip`` on standard output when ``func(ip)`` is truthy.

    ``func`` is the checker to apply (one of the ``check_for_*`` functions).
    The line is flushed immediately so results appear as soon as they are
    known.
    """
    if func(ip):
        print(ip, flush=True)
def check_for_gae(ip):
    """Return the result of probing ``ip`` as the configured App Engine app.

    The request targets host ``APP_ID + '.appspot.com'`` and path
    ``APP_PATH``; see ``_check`` for the meaning of the return value.
    """
    return _check(APP_ID + '.appspot.com', APP_PATH, ip)
def check_for_search(ip):
    """Return the result of probing ``ip`` as ``www.google.com`` at ``/``."""
    return _check('www.google.com', '/', ip)
def _check(host, path, ip):
    """Probe ``ip`` over HTTPS as ``host`` and report whether it answers.

    Sends ``GET path`` with a ``Host: host`` header to ``ip`` (5 second
    timeout) using the module-level TLS ``context``. Up to two attempts are
    made; a certificate error is never retried.

    Returns True when the response status is below 400, False otherwise.
    Progress (``GOOD:``, ``RE :``, ``BAD :``, ``WARN:``) is written to
    standard error. ``KeyboardInterrupt`` is not an ``Exception`` subclass
    and therefore propagates to the caller.
    """
    # ``chance`` is the number of retries left: 1 on the first attempt,
    # 0 on the last one.
    for chance in range(1, -1, -1):
        conn = None
        try:
            conn = HTTPSConnection(
                ip, timeout=5,
                context=context,
                hostname=host,
            )
            conn.request('GET', path, headers={
                'Host': host,
            })
            response = conn.getresponse()
            if response.status >= 400:
                raise Exception('HTTP Error %s %s' % (
                    response.status, response.reason))
            print('GOOD:', ip, file=sys.stderr)
            return True
        except Exception as e:
            if isinstance(e, ssl.CertificateError):
                print('WARN: %s is not Google\'s!' % ip, file=sys.stderr)
                # A wrong certificate will not change on retry.
                chance = 0
            if chance == 0:
                print('BAD :', ip, e, file=sys.stderr)
                return False
            else:
                print('RE :', ip, e, file=sys.stderr)
        finally:
            # Always release the socket; otherwise a large scan leaks one
            # file descriptor per probed address until garbage collection.
            if conn is not None:
                conn.close()
def main():
    """Read addresses from standard input and check each one concurrently.

    The single command-line argument selects the service to test
    (``search`` or ``gae``). Every input line is either one address or an
    IPv4 network in CIDR form, whose host addresses are all checked. Checks
    run on a pool of 50 worker threads. Usable addresses are printed to
    standard output by ``check_ip_p``; everything else, including the final
    summary, goes to standard error so that standard output stays a clean
    list of addresses.
    """
    parser = argparse.ArgumentParser(description='Check Google IPs')
    parser.add_argument('service', choices=['search', 'gae'],
                        help='service to check')
    args = parser.parse_args()
    checkers = {
        'search': check_for_search,
        'gae': check_for_gae,
    }
    func = checkers[args.service]

    count = 0
    with ThreadPoolExecutor(max_workers=50) as executor:
        for line in sys.stdin:
            target = line.strip()
            if not target:
                # Blank lines would otherwise be probed as the empty host.
                continue
            if '/' in target:
                try:
                    # strict=False accepts entries with host bits set
                    # (e.g. 192.168.0.1/16) instead of raising.
                    network = IPv4Network(target, strict=False)
                except ValueError as e:
                    # One malformed entry must not abort the whole run.
                    print('SKIP:', target, e, file=sys.stderr)
                    continue
                for ip in network.hosts():
                    executor.submit(check_ip_p, str(ip), func)
                    count += 1
            else:
                executor.submit(check_ip_p, target, func)
                count += 1
    # Leaving the ``with`` block waits for all submitted checks to finish.
    print('%d IP checked.' % count, file=sys.stderr)
# Run the checker only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|