美文网首页
端口扫描器源码分析

端口扫描器源码分析

作者: AxisX | 来源:发表于2020-01-15 11:15 被阅读0次

1.F-NAScan和RAScan简介

F-NAScan是一款基于Python2的端口扫描器,工具可从github上直接搜索F-NAScan找到,此工具只支持对ip进行扫描,不支持域名扫描。RAScan也基于python2,可以从github上直接下载。
F-NAScan的使用是通过命令行获取参数,例如,
python2 F-NAScan.py -h 10.111.1
同样,RAScan的使用方式如下,填写起始ip和结束ip的参数。
python2 RAScan.py 192.168.1.1 192.168.254.254 -t 20

2.F-NAScan源码分析

首先,在F-NAScan工程下有一个server_info.ini,内容如下,包含了常用端口及其对应的服务。

#server_info.ini
ftp|21|^220.*?ftp|^220-
ssh|22|^ssh-
telnet|23|^\xff[\xfa-\xfe]|^\x54\x65\x6c
smtp|25|^220.*?smtp
dns|53|
pop3|110|\+OK.*?pop3
NetBIOS|139|
imap|143|^\* OK.*?imap
ldap|389|
smb|445|
smtps|465|
rsync|873|^@RSYNCD|^@ERROR
imaps|993|
pop3|995|\+OK
proxy|1080|
pptp|1723|
mssql|1433|
oracle|1521|
mysql|3306|^.\0\0\0.*?mysql|^.\0\0\0\n|.*?MariaDB server
rdp|3389|
svn|3690|
PostgreSql|5432|
vnc|5800|^RFB
vnc|5900|^RFB
redis|6379|-ERR|^\$\d+\r\nredis_version
Elasticsearch|9200|
Elasticsearch|9300|
memcached|11211|
mongodb|27017|
mongodb|27018|

在主函数中,也是先对上述配置文件进行了读取,mark_list = read_config('server_info'),read_config具体内容如下。

def read_config(config_type):
    if config_type == 'server_info':
        mark_list=[]
        try:
            config_file = open('server_info.ini','r')
            for mark in config_file:
                name,port,reg = mark.strip().split("|",2)
                mark_list.append([name,port,reg])
            config_file.close()
            return mark_list
        except:
            print('Configuration file read failed')
            exit()

读取server_info.txt,逐行读取,并根据|分隔符进行分割,分别赋值给name、port、reg。
接着主程序接收传入参数,如果传入参数少于两个,就显示usage信息,

Usage: python F-NAScan.py -h 192.168.1 [-p 21,80,3306] [-m 50] [-t 10] [-n]
    try:
        options,args = getopt.getopt(sys.argv[1:],"h:p:m:t:n")
        ip = ''
        noping = False
        port = '21,22,23,25,53,80,110,139,143,389,443,445,465,873,993,995,1080,1723,1433,1521,3306,3389,3690,5432,5800,5900,6379,7001,8000,8001,8080,8081,8888,9200,9300,9080,9999,11211,27017'
        m_count = 100
        for opt,arg in options:
            if opt == '-h':
                ip = arg
            elif opt == '-p':
                port = arg
            elif opt == '-m':
                m_count = int(arg)
            elif opt == '-t':
                timeout = int(arg)
            elif opt == '-n':
                noping = True
        if ip:
            ip_list = get_ip_list(ip)
            port_list = get_port_list(port)
            if not noping:ip_list=get_ac_ip(ip_list)
            for ip_str in ip_list:
                for port_int in port_list:
                    queue.put(':'.join([ip_str,port_int]))
            for i in range(m_count):
                t = ThreadNum(queue)
                t.setDaemon(True)
                t.start()
            t_join(m_count)
            write_result()
    except Exception as e:
        print(e)
        print(msg)

getopt模块用于帮助脚本解析sys.argv中的命令行参数,具体的可查询getopt标准库文档,然后列出所有的端口,最大线程数量设置为100。根据用传入的参数进行复制,-h后面的参数赋给ip,-p后面的赋给端口,-m设为最大线程数量,-t指定HTTP请求超时时间,默认为10秒,端口扫描超时为值的1/2。-n 不进行存活探测(ICMP)直接进行扫描。
如果Ip不为空,就用get_ip_list进行Ip列表赋值,get_ip_list函数如下

def get_ip_list(ip):
    ip_list = []
    iptonum = lambda x:sum([256**j*int(i) for j,i in enumerate(x.split('.')[::-1])])
    numtoip = lambda x: '.'.join([str(x/(256**i)%256) for i in range(3,-1,-1)])
    if '-' in ip:
        ip_range = ip.split('-')
        ip_start = int(iptonum(ip_range[0]))
        ip_end = int(iptonum(ip_range[1]))
        ip_count = ip_end - ip_start
        if ip_count >= 0 and ip_count <= 65536:
            for ip_num in range(ip_start,ip_end+1):
                ip_list.append(numtoip(ip_num))
        else:
            print('-h wrong format')
    elif '.ini' in ip:
        ip_config = open(ip,'r')
        for ip in ip_config:
            ip_list.extend(get_ip_list(ip.strip()))
        ip_config.close()
    else:
        ip_split=ip.split('.')
        net = len(ip_split)
        if net == 2:
            for b in range(1,255):
                for c in range(1,255):
                    ip = "%s.%s.%d.%d"%(ip_split[0],ip_split[1],b,c)
                    ip_list.append(ip)
        elif net == 3:
            for c in range(1,255):
                ip = "%s.%s.%s.%d"%(ip_split[0],ip_split[1],ip_split[2],c)
                ip_list.append(ip)
        elif net ==4:
            ip_list.append(ip)
        else:
            print("-h wrong format")
    return ip_list

如果输入的ip使用-分割则进入第一if条件,如果是.ini文件则进入elif,其他进入else,如果我们正常输入123.123.456.78这种形式的ip,将其根据点号分割放入ip_split列表,如果列表长度为2,就把后两位按照1-255补全生成完整ip,如果列表长度为3,就把最后一位按1-255补全。如果长度为4正常返回该列表。
接着获取端口列表,通过get_port_list函数,函数内容如下

def get_port_list(port):
    port_list = []
    if '.ini' in port:
        port_config = open(port,'r')
        for port in port_config:
            port_list.append(port.strip())
        port_config.close()
    else:
        port_list = port.split(',')
    return port_list

如果用户输入端口参数为Ini文件,就逐行读取并加入列表,如果不是就直接按默认的端口进行逗号分隔插入到port_list列表中,最后返回端口列表。

def get_ac_ip(ip_list):
    try:
        s = Nscan()
        ipPool = set(ip_list)
        return s.mPing(ipPool)
    except:
        print('The current user permissions unable to send icmp packets')
        return ip_list

接着设置ip池

在最大线程数内循环,t=ThreadNum(queue)

class ThreadNum(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            try:
                if queue.empty():break
                queue_task = self.queue.get()
            except:
                break
            try:
                task_host,task_port = queue_task.split(":")
                data = scan_port(task_host,task_port)
                if data:
                    if data!= 'NULL':
                        port_data[task_host + ":" + task_port] = urllib.request.quote(data)
                    server_type = server_discern(task_host,task_port,data)
                    if not server_type:
                        h_server,title = get_web_info(task_host,task_port)
                        if title or h_server:server_type = 'web ' + title
                    if server_type:log('server',task_host,task_port,server_type.strip())
            except Exception as e:
                continue

最终获取的数据通过scan_port函数得到,具体函数如下

def scan_port(host,port):
    try:
        socket.setdefaulttimeout(timeout/2)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((str(host),int(port)))
        log('portscan',host,port)
    except Exception as e:
        return False
    try:
        data = sock.recv(512)
        sock.close()
        if len(data) > 2:
            return data
        else:
            return 'NULL'
    except Exception as e:
        return 'NULL'

通过用socket库进行host:port链接,并将512大小的数据赋给data,如果data长度大于2直接返回,否则返回Null。然后ThreadNum类再对data进行判断,如果不为空,server_type=server_discern(task_host,task_port,data),再对server_type进行判断,server_discern函数如下

def server_discern(host,port,data):
    server = ''
    for mark_info in mark_list:
        try:
            name,default_port,reg = mark_info
            if int(default_port) == int(port):server = name+"(default)"
            if reg and data!= 'NULL':
                matchObj = re.search(reg,data,re.I|re.M)
                if matchObj:server = name
                if server:
                    return server
        except Exception as e:
            continue
    return server

其中,mark_list=[name,port,reg],对其进行遍历,如果默认端口和传入的端口相等,将name赋给server。函数返回的server值赋给server_type,如果server_type为空,通过get_web_info去获取相关信息,get_web_info函数如下

def get_web_info(host,port):
    h_server,h_xpb,title_str,html = '','','',''
    try:
        info = urllib.request.urlopen("http://%s:%s"%(host,port),timeout=timeout)
        html = info.read()
        header = info.headers
    except urllib.request.HTTPError as e:
        header = e.headers
    except Exception as e:
        return False,False
    if not header:return False,False
    try:
        html_code = get_code(header,html).strip()
        if html_code and len(html_code) < 12:
            html = html.decode(html_code).encode('utf-8')
    except:
        pass
    try:
        port_data[host + ":" + str(port)] = urllib.request.quote(str(header) + "\r\n\r\n" + cgi.escape(html))
        title = re.search(r'<title>(.*?)</title>', html, flags=re.I|re.M)
        if title:title_str=title.group(1)
    except Exception as e:
        pass
    return str(header),title_str

通过Urllib库进行host:port的连接,读取网页内容,并把headers赋给header。接着调用get_code函数,将页面code赋给html_code

def get_code(header,html):
    try:
        m = re.search(r'<meta.*?charset\=(.*?)"(>| |\/)',html, flags=re.I)
        if m:
            return m.group(1).replace('"','')
    except:
        pass
    try:
        if header.has_key('Content-Type'):
            Content_Type = header['Content-Type']
            m = re.search(r'.*?charset\=(.*?)(;|$)',Content_Type,flags=re.I)
            if m:return m.group(1)
    except:
        pass

通过正则表达式对页面进行检索,返回页面的编码方式。如果页面编码方式不为空并且长度小于12,把页面编码方式设为utf-8,然后再次进行网页请求获取页面的title,并将其返回。

3.RAScan源码分析

#main
    usage = "usage: mul_scan.py  192.168.1.1 192.168.1.254 -t 20"
    parser = optparse.OptionParser(usage=usage)
    parser.add_option("-t", "--threads", dest="NUM", help="Maximum threads, default 20")
    parser.add_option("-b", "--start-ip", dest="startIp", help="start_ip")
    parser.add_option("-e", "--end-ip", dest="endIp", help="end_ip")
    (options, args) = parser.parse_args()
    if len(args) < 1:
        parser.print_help()
        sys.exit()
    if options.NUM != None and int(options.NUM) != 0:
        SETTHREAD = int(options.NUM)
    else:
        SETTHREAD = 20

首先显示相关参数信息,然后从命令行接收参数。接着对参数进行判断处理

#main
    startIp = str(options.startIp)
    endIp = str(options.endIp)
    startIp = args[0]
    endIp = args[1]
    lock = threading.Lock()
    # 程序运行时间
    PORT = {80: "web", 8080: "web", 3311: "kangle主机管理系统", 3312: "kangle主机管理系统", 3389: "远程登录",
            4440: "rundeck是用java写的开源工具", 5672: "rabbitMQ", 5900: "vnc", 6082: "varnish", 7001: "weblogic",
            8161: "activeMQ", 8649: "ganglia", 9000: "fastcgi", 9090: "ibm", 9200: "elasticsearch",
            9300: "elasticsearch", 9999: "amg", 10050: "zabbix", 11211: "memcache", 27017: "mongodb", 28017: "mondodb",
            3777: "大华监控设备", 50000: "sap netweaver", 50060: "hadoop", 50070: "hadoop", 21: "ftp", 22: "ssh",
            23: "telnet", 25: "smtp", 53: "dns", 123: "ntp", 161: "snmp", 8161: "snmp", 162: "snmp", 389: "ldap",
            443: "ssl", 512: "rlogin", 513: "rlogin", 873: "rsync", 1433: "mssql", 1080: "socks", 1521: "oracle",
            1900: "bes", 2049: "nfs", 2601: "zebra", 2604: "zebra", 2082: "cpanle", 2083: "cpanle", 3128: "squid",
            3312: "squid", 3306: "mysql", 4899: "radmin", 8834: 'nessus', 4848: 'glashfish'}
    starttime = time.time()
    queue = queue.Queue()
    iplist = ip_range(startIp, endIp)
    print('端口采用默认扫描请自行进行比对:\nbegin Scan ' + str(len(iplist)) + " ip...")
    for i in range(SETTHREAD):
        st1 = threading.Thread(target=scan_open_port_server)
        st1.setDaemon(True)
        st1.start()
    for host in iplist:
        for port in PORT.keys():
            queue.put((host, port))
    queue.join()
    print('All RUN TIME:' + str(time.time() - starttime))

接着接收开始ip和结束ip,这个也是设计不够友好的部分,必须输入两个ip,如果只对单ip进行扫描,要重复输入两遍。

def scan_open_port_server():
    global lock
    while True:
        host, port = queue.get()
        ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ss.settimeout(2)
        try:
            ss.connect((host, port))
            lock.acquire()
            print("%s 开放端口 %s   %s" % (host, port, PORT[port]))
            lock.release()
            ss.close()
        except:
            pass
        queue.task_done()

接着多线程进行扫描,这部分和F-NAScan的scan_port函数类似

4. F-NAScan和RAScan对比

对于同一个ip地址,扫描结果如下,为保证隐私ip后两位进行打码。


F-NAScan扫描结果.png RAScan扫描结果.png

可以看到两个工具扫描结果基本相同,并且RAScan的展示更友好,扫描更加迅速

两者整体流程的区别:

F-NAScan函数调用关系图 RAScan函数调用关系.png

两者在获取ip和端口的区别:
F-NAScan支持ip(192.168.1.1),ip段(192.168.1),ip范围指定(192.168.1.1-192.168.1.254),ip列表文件(ip.ini)。而RAScan只支持ip段,对于单个Ip要输入两次。F-NAScan支持用户定义端口,RAScan扫描已定义的内置端口。

相关文章

  • 用Python编写一个高效的端口扫描器

    PyPortScanner python多线程端口扫描器。 输出示例: Github 此端口扫描器的源码,文档及详...

  • 端口扫描器源码分析

    1.F-NAScan和RAScan简介 F-NAScan是一款基于Python2的端口扫描器,工具可从github...

  • 基于C++的端口扫描器+源码

    VC++端口扫描器源码,可实现单独扫描指定的一个端口,比如80端口,扫描其是否被占用; 扫描指定范围内的端口,比如...

  • 信息收集系列(一)

    最快的端口扫描器 nmap基本上都知道,masscan知道的人可能会少点,但是它是公认的最快的互联网端口扫描器。可...

  • web扫描器

    web扫描器应该包含哪些部分: 1》端口扫描 (masscan, zmap, nmap) 2》端口指纹扫描 (nm...

  • NettyServer启动流程分析

    详细代码分析见NettyServer源码分析 NettyServer启动过程主要是创建server建立端口绑定监听...

  • 骇客

    扫描器 Nmap扫描器 Nmap是一款针对大型网络的端口扫描工具,同时也使用于单机扫描,它支持Vanilla TC...

  • python脚本解析masscan扫描结果

    masscan简介 开源、免费的端口扫描器,获取主机开放的端口和端口信息。 速度非常快,6分钟可以扫描整个互联网,...

  • 端口扫描器

    使用argparse包,threading包。实现线程扫描,还有信号量来避免乱码和失序。使用screenLock....

  • 【Python与网络2】

    端口扫描 使用Python编写端口扫描器 在Linux系统下ping命令被用于判断与一台主机的网络是否连通,而判断...

网友评论

      本文标题:端口扫描器源码分析

      本文链接:https://www.haomeiwen.com/subject/hekaactx.html