1.F-NAScan和RAScan简介
F-NAScan是一款基于Python2的端口扫描器,工具可从github上直接搜索F-NAScan找到,此工具只支持对ip进行扫描,不支持域名扫描。RAScan也基于python2,可以从github上直接下载。
F-NAScan的使用是通过命令行获取参数,例如,
python2 F-NAScan.py -h 10.111.1
同样,RAScan的使用方式如下,填写起始ip和结束ip的参数。
python2 RAScan.py 192.168.1.1 192.168.254.254 -t 20
2.F-NAScan源码分析
首先,在F-NAScan工程下有一个server_info.ini,内容如下,包含了常用端口及其对应的服务。
#server_info.ini
ftp|21|^220.*?ftp|^220-
ssh|22|^ssh-
telnet|23|^\xff[\xfa-\xfe]|^\x54\x65\x6c
smtp|25|^220.*?smtp
dns|53|
pop3|110|\+OK.*?pop3
NetBIOS|139|
imap|143|^\* OK.*?imap
ldap|389|
smb|445|
smtps|465|
rsync|873|^@RSYNCD|^@ERROR
imaps|993|
pop3|995|\+OK
proxy|1080|
pptp|1723|
mssql|1433|
oracle|1521|
mysql|3306|^.\0\0\0.*?mysql|^.\0\0\0\n|.*?MariaDB server
rdp|3389|
svn|3690|
PostgreSql|5432|
vnc|5800|^RFB
vnc|5900|^RFB
redis|6379|-ERR|^\$\d+\r\nredis_version
Elasticsearch|9200|
Elasticsearch|9300|
memcached|11211|
mongodb|27017|
mongodb|27018|
在主函数中,也是先对上述配置文件进行了读取,mark_list = read_config('server_info'),read_config具体内容如下。
def read_config(config_type):
if config_type == 'server_info':
mark_list=[]
try:
config_file = open('server_info.ini','r')
for mark in config_file:
name,port,reg = mark.strip().split("|",2)
mark_list.append([name,port,reg])
config_file.close()
return mark_list
except:
print('Configuration file read failed')
exit()
读取server_info.txt,逐行读取,并根据|分隔符进行分割,分别赋值给name、port、reg。
接着主程序接收传入参数,如果传入参数少于两个,就显示usage信息,
Usage: python F-NAScan.py -h 192.168.1 [-p 21,80,3306] [-m 50] [-t 10] [-n]
try:
options,args = getopt.getopt(sys.argv[1:],"h:p:m:t:n")
ip = ''
noping = False
port = '21,22,23,25,53,80,110,139,143,389,443,445,465,873,993,995,1080,1723,1433,1521,3306,3389,3690,5432,5800,5900,6379,7001,8000,8001,8080,8081,8888,9200,9300,9080,9999,11211,27017'
m_count = 100
for opt,arg in options:
if opt == '-h':
ip = arg
elif opt == '-p':
port = arg
elif opt == '-m':
m_count = int(arg)
elif opt == '-t':
timeout = int(arg)
elif opt == '-n':
noping = True
if ip:
ip_list = get_ip_list(ip)
port_list = get_port_list(port)
if not noping:ip_list=get_ac_ip(ip_list)
for ip_str in ip_list:
for port_int in port_list:
queue.put(':'.join([ip_str,port_int]))
for i in range(m_count):
t = ThreadNum(queue)
t.setDaemon(True)
t.start()
t_join(m_count)
write_result()
except Exception as e:
print(e)
print(msg)
getopt模块用于帮助脚本解析sys.argv中的命令行参数,具体的可查询getopt标准库文档,然后列出所有的端口,最大线程数量设置为100。根据用传入的参数进行复制,-h后面的参数赋给ip,-p后面的赋给端口,-m设为最大线程数量,-t指定HTTP请求超时时间,默认为10秒,端口扫描超时为值的1/2。-n 不进行存活探测(ICMP)直接进行扫描。
如果Ip不为空,就用get_ip_list进行Ip列表赋值,get_ip_list函数如下
def get_ip_list(ip):
ip_list = []
iptonum = lambda x:sum([256**j*int(i) for j,i in enumerate(x.split('.')[::-1])])
numtoip = lambda x: '.'.join([str(x/(256**i)%256) for i in range(3,-1,-1)])
if '-' in ip:
ip_range = ip.split('-')
ip_start = int(iptonum(ip_range[0]))
ip_end = int(iptonum(ip_range[1]))
ip_count = ip_end - ip_start
if ip_count >= 0 and ip_count <= 65536:
for ip_num in range(ip_start,ip_end+1):
ip_list.append(numtoip(ip_num))
else:
print('-h wrong format')
elif '.ini' in ip:
ip_config = open(ip,'r')
for ip in ip_config:
ip_list.extend(get_ip_list(ip.strip()))
ip_config.close()
else:
ip_split=ip.split('.')
net = len(ip_split)
if net == 2:
for b in range(1,255):
for c in range(1,255):
ip = "%s.%s.%d.%d"%(ip_split[0],ip_split[1],b,c)
ip_list.append(ip)
elif net == 3:
for c in range(1,255):
ip = "%s.%s.%s.%d"%(ip_split[0],ip_split[1],ip_split[2],c)
ip_list.append(ip)
elif net ==4:
ip_list.append(ip)
else:
print("-h wrong format")
return ip_list
如果输入的ip使用-分割则进入第一if条件,如果是.ini文件则进入elif,其他进入else,如果我们正常输入123.123.456.78这种形式的ip,将其根据点号分割放入ip_split列表,如果列表长度为2,就把后两位按照1-255补全生成完整ip,如果列表长度为3,就把最后一位按1-255补全。如果长度为4正常返回该列表。
接着获取端口列表,通过get_port_list函数,函数内容如下
def get_port_list(port):
port_list = []
if '.ini' in port:
port_config = open(port,'r')
for port in port_config:
port_list.append(port.strip())
port_config.close()
else:
port_list = port.split(',')
return port_list
如果用户输入端口参数为Ini文件,就逐行读取并加入列表,如果不是就直接按默认的端口进行逗号分隔插入到port_list列表中,最后返回端口列表。
def get_ac_ip(ip_list):
try:
s = Nscan()
ipPool = set(ip_list)
return s.mPing(ipPool)
except:
print('The current user permissions unable to send icmp packets')
return ip_list
接着设置ip池
在最大线程数内循环,t=ThreadNum(queue)
class ThreadNum(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
try:
if queue.empty():break
queue_task = self.queue.get()
except:
break
try:
task_host,task_port = queue_task.split(":")
data = scan_port(task_host,task_port)
if data:
if data!= 'NULL':
port_data[task_host + ":" + task_port] = urllib.request.quote(data)
server_type = server_discern(task_host,task_port,data)
if not server_type:
h_server,title = get_web_info(task_host,task_port)
if title or h_server:server_type = 'web ' + title
if server_type:log('server',task_host,task_port,server_type.strip())
except Exception as e:
continue
最终获取的数据通过scan_port函数得到,具体函数如下
def scan_port(host,port):
try:
socket.setdefaulttimeout(timeout/2)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((str(host),int(port)))
log('portscan',host,port)
except Exception as e:
return False
try:
data = sock.recv(512)
sock.close()
if len(data) > 2:
return data
else:
return 'NULL'
except Exception as e:
return 'NULL'
通过用socket库进行host:port链接,并将512大小的数据赋给data,如果data长度大于2直接返回,否则返回Null。然后ThreadNum类再对data进行判断,如果不为空,server_type=server_discern(task_host,task_port,data),再对server_type进行判断,server_discern函数如下
def server_discern(host,port,data):
server = ''
for mark_info in mark_list:
try:
name,default_port,reg = mark_info
if int(default_port) == int(port):server = name+"(default)"
if reg and data!= 'NULL':
matchObj = re.search(reg,data,re.I|re.M)
if matchObj:server = name
if server:
return server
except Exception as e:
continue
return server
其中,mark_list=[name,port,reg],对其进行遍历,如果默认端口和传入的端口相等,将name赋给server。函数返回的server值赋给server_type,如果server_type为空,通过get_web_info去获取相关信息,get_web_info函数如下
def get_web_info(host,port):
h_server,h_xpb,title_str,html = '','','',''
try:
info = urllib.request.urlopen("http://%s:%s"%(host,port),timeout=timeout)
html = info.read()
header = info.headers
except urllib.request.HTTPError as e:
header = e.headers
except Exception as e:
return False,False
if not header:return False,False
try:
html_code = get_code(header,html).strip()
if html_code and len(html_code) < 12:
html = html.decode(html_code).encode('utf-8')
except:
pass
try:
port_data[host + ":" + str(port)] = urllib.request.quote(str(header) + "\r\n\r\n" + cgi.escape(html))
title = re.search(r'<title>(.*?)</title>', html, flags=re.I|re.M)
if title:title_str=title.group(1)
except Exception as e:
pass
return str(header),title_str
通过Urllib库进行host:port的连接,读取网页内容,并把headers赋给header。接着调用get_code函数,将页面code赋给html_code
def get_code(header,html):
try:
m = re.search(r'<meta.*?charset\=(.*?)"(>| |\/)',html, flags=re.I)
if m:
return m.group(1).replace('"','')
except:
pass
try:
if header.has_key('Content-Type'):
Content_Type = header['Content-Type']
m = re.search(r'.*?charset\=(.*?)(;|$)',Content_Type,flags=re.I)
if m:return m.group(1)
except:
pass
通过正则表达式对页面进行检索,返回页面的编码方式。如果页面编码方式不为空并且长度小于12,把页面编码方式设为utf-8,然后再次进行网页请求获取页面的title,并将其返回。
3.RAScan源码分析
#main
usage = "usage: mul_scan.py 192.168.1.1 192.168.1.254 -t 20"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-t", "--threads", dest="NUM", help="Maximum threads, default 20")
parser.add_option("-b", "--start-ip", dest="startIp", help="start_ip")
parser.add_option("-e", "--end-ip", dest="endIp", help="end_ip")
(options, args) = parser.parse_args()
if len(args) < 1:
parser.print_help()
sys.exit()
if options.NUM != None and int(options.NUM) != 0:
SETTHREAD = int(options.NUM)
else:
SETTHREAD = 20
首先显示相关参数信息,然后从命令行接收参数。接着对参数进行判断处理
#main
startIp = str(options.startIp)
endIp = str(options.endIp)
startIp = args[0]
endIp = args[1]
lock = threading.Lock()
# 程序运行时间
PORT = {80: "web", 8080: "web", 3311: "kangle主机管理系统", 3312: "kangle主机管理系统", 3389: "远程登录",
4440: "rundeck是用java写的开源工具", 5672: "rabbitMQ", 5900: "vnc", 6082: "varnish", 7001: "weblogic",
8161: "activeMQ", 8649: "ganglia", 9000: "fastcgi", 9090: "ibm", 9200: "elasticsearch",
9300: "elasticsearch", 9999: "amg", 10050: "zabbix", 11211: "memcache", 27017: "mongodb", 28017: "mondodb",
3777: "大华监控设备", 50000: "sap netweaver", 50060: "hadoop", 50070: "hadoop", 21: "ftp", 22: "ssh",
23: "telnet", 25: "smtp", 53: "dns", 123: "ntp", 161: "snmp", 8161: "snmp", 162: "snmp", 389: "ldap",
443: "ssl", 512: "rlogin", 513: "rlogin", 873: "rsync", 1433: "mssql", 1080: "socks", 1521: "oracle",
1900: "bes", 2049: "nfs", 2601: "zebra", 2604: "zebra", 2082: "cpanle", 2083: "cpanle", 3128: "squid",
3312: "squid", 3306: "mysql", 4899: "radmin", 8834: 'nessus', 4848: 'glashfish'}
starttime = time.time()
queue = queue.Queue()
iplist = ip_range(startIp, endIp)
print('端口采用默认扫描请自行进行比对:\nbegin Scan ' + str(len(iplist)) + " ip...")
for i in range(SETTHREAD):
st1 = threading.Thread(target=scan_open_port_server)
st1.setDaemon(True)
st1.start()
for host in iplist:
for port in PORT.keys():
queue.put((host, port))
queue.join()
print('All RUN TIME:' + str(time.time() - starttime))
接着接收开始ip和结束ip,这个也是设计不够友好的部分,必须输入两个ip,如果只对单ip进行扫描,要重复输入两遍。
def scan_open_port_server():
global lock
while True:
host, port = queue.get()
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ss.settimeout(2)
try:
ss.connect((host, port))
lock.acquire()
print("%s 开放端口 %s %s" % (host, port, PORT[port]))
lock.release()
ss.close()
except:
pass
queue.task_done()
接着多线程进行扫描,这部分和F-NAScan的scan_port函数类似
4. F-NAScan和RAScan对比
对于同一个ip地址,扫描结果如下,为保证隐私ip后两位进行打码。
F-NAScan扫描结果.png RAScan扫描结果.png
可以看到两个工具扫描结果基本相同,并且RAScan的展示更友好,扫描更加迅速
两者整体流程的区别:
两者在获取ip和端口的区别:
F-NAScan支持ip(192.168.1.1),ip段(192.168.1),ip范围指定(192.168.1.1-192.168.1.254),ip列表文件(ip.ini)。而RAScan只支持ip段,对于单个Ip要输入两次。F-NAScan支持用户定义端口,RAScan扫描已定义的内置端口。
网友评论