
dirsearch Source Code Analysis

Author: AxisX | Published 2020-01-09 18:01

1. Introduction to dirsearch

dirsearch is a directory brute-forcing tool built on Python 3; it can be found by searching for dirsearch directly on GitHub.
dirsearch takes its parameters from the command line, for example:
python3 dirsearch.py -u <URL> -e <EXTENSION>
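For instance, a scan of a single target for php and jsp paths (hypothetical target URL) would be:

python3 dirsearch.py -u https://example.com -e php,jsp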

The dirsearch file structure looks like this:
db (wordlists), lib (core functionality modules), logs (error logs), reports (generated reports), thirdparty (bundled third-party libraries), CHANGELOG.md, default.conf, README.md, dirsearch.py

2. dirsearch Source Code Analysis

To analyze the source code, first locate the program entry point: dirsearch.py.

(1) dirsearch.py

#dirsearch.py
import os
import sys

if sys.version_info < (3, 0):
    sys.stdout.write("Sorry, dirsearch requires Python 3.x\n")
    sys.exit(1)

from lib.core import ArgumentParser
from lib.controller import *
from lib.output import *

class Program(object):
    def __init__(self):
        self.script_path = (os.path.dirname(os.path.realpath(__file__)))
        self.arguments = ArgumentParser(self.script_path)
        self.output = CLIOutput()
        self.controller = Controller(self.script_path, self.arguments, self.output)

if __name__ == '__main__':
    main = Program()

This entry program first checks the Python version: since dirsearch is based on Python 3, it exits if the interpreter is not a 3.x version. It then imports ArgumentParser, the controller, and the output module from the core packages. In the Program class, script_path holds the filesystem path of the dirsearch folder; on my machine this is 'F:\CTF工具\Web工具\目录扫描\dirsearch-master'. The path is passed into ArgumentParser, which at this step reads the basic configuration, such as the dicc.txt wordlist in db and the thread count, and assigns the -u URL parameter to urlList.
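As an illustration of this configuration step, here is a minimal sketch of how such defaults could be read from default.conf with the standard configparser module; the section and option names here are assumptions for illustration, not dirsearch's actual configuration keys.

# sketch: loading defaults the way ArgumentParser does conceptually (names are illustrative)
import configparser
import os

script_path = os.path.dirname(os.path.realpath(__file__))
config = configparser.ConfigParser()
config.read(os.path.join(script_path, "default.conf"))

# fall back to sensible defaults when an option is absent
threads_count = config.getint("connection", "threads-count", fallback=10)
wordlist = config.get("mandatory", "wordlist",
                      fallback=os.path.join(script_path, "db", "dicc.txt"))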

(2) The output module

When we run the program, the following screen appears. The banner on the first line comes from the banner file under the Controller module, which also defines many basic parameters and calls functions such as newLine in CLIOutput.py; CLIOutput defines what the output looks like.


[Figure: dirsearch startup screen — dirsearch运行开始图.png]

The main code behind the startup screen is as follows:

#CLIOutput.py
def newLine(self, string):
    if self.lastInLine == True:
        self.erase()  # overwrite the transient in-line status output

    if platform.system() == 'Windows':
        # Windows consoles need the string and the newline written separately
        sys.stdout.write(string)
        sys.stdout.flush()
        sys.stdout.write('\n')
        sys.stdout.flush()

    else:
        sys.stdout.write(string + '\n')

    sys.stdout.flush()
    self.lastInLine = False
    sys.stdout.flush()

newLine flushes stdout and writes string followed by a newline; string here is, for example, the path of the error log. The code in Controller shown below drives the printing of that startup information.

#Controller.py
try:
    for url in self.arguments.urlList:

        try:
            gc.collect()
            self.reportManager = ReportManager()
            self.currentUrl = url
            self.output.target(self.currentUrl)

            try:
                # build the HTTP requester for this target
                self.requester = Requester(url, cookie=self.arguments.cookie,
                                           useragent=self.arguments.useragent,
                                           maxPool=self.arguments.threadsCount,
                                           maxRetries=self.arguments.maxRetries, delay=self.arguments.delay,
                                           timeout=self.arguments.timeout,
                                           ip=self.arguments.ip, proxy=self.arguments.proxy,
                                           redirect=self.arguments.redirect,
                                           requestByHostname=self.arguments.requestByHostname,
                                           httpmethod=self.httpmethod)
                self.requester.request("/")

            except RequestException as e:
                self.output.error(e.args[0]['message'])
                raise SkipTargetInterrupt

            if self.arguments.useRandomAgents:
                self.requester.setRandomAgents(self.randomAgents)

            for key, value in self.arguments.headers.items():
                self.requester.setHeader(key, value)

            # Initialize directories Queue with start Path
            self.basePath = self.requester.basePath

            if self.arguments.scanSubdirs is not None:
                for subdir in self.arguments.scanSubdirs:
                    self.directories.put(subdir)

            else:
                self.directories.put('')

            self.setupReports(self.requester)

            matchCallbacks = [self.matchCallback]
            notFoundCallbacks = [self.notFoundCallback]
            errorCallbacks = [self.errorCallback, self.appendErrorLog]

            self.fuzzer = Fuzzer(self.requester, self.dictionary, testFailPath=self.arguments.testFailPath,
                                 threads=self.arguments.threadsCount, matchCallbacks=matchCallbacks,
                                 notFoundCallbacks=notFoundCallbacks, errorCallbacks=errorCallbacks)
            try:
                self.wait()
            except RequestException as e:
                self.output.error("Fatal error during site scanning: " + e.args[0]['message'])
                raise SkipTargetInterrupt

        except SkipTargetInterrupt:
            continue

        finally:
            self.reportManager.save()

except KeyboardInterrupt:
    self.output.error('\nCanceled by the user')
    exit(0)

finally:
    if not self.errorLog.closed:
        self.errorLog.close()

    self.reportManager.close()

self.output.warning('\nTask Completed')

The code loops over urlList and sends an initial request to each target; the request carries the cookie, User-Agent, thread count, proxy, redirect, HTTP method, and other parameters, all of which are defined in Requester.py. Requester.py belongs to the connection part, analyzed in (3).


[Figure: request header parameters — 发包头部参数.png]

(3) The connection module

The connection folder contains Requester.py, Response.py, and RequestException.py (their main code follows).

#Requester.py
def request(self, path):
    i = 0
    proxy = None
    result = None

    while i <= self.maxRetries:

        try:
            if self.proxy is not None:
                proxy = {"https": self.proxy, "http": self.proxy}

            if self.requestByHostname:
                url = "{0}://{1}:{2}".format(self.protocol, self.host, self.port)

            else:
                url = "{0}://{1}:{2}".format(self.protocol, self.ip, self.port)

            url = urllib.parse.urljoin(url, self.basePath)

            # Joining with concatenation because of a urljoin bug with "::"
            if not url.endswith('/'):
                url += "/"

            if path.startswith('/'):
                path = path[1:]

            url += path
            # ... (excerpt truncated; the send-and-retry logic is discussed below)
#Response.py
    @property
    def redirect(self):
        headers = dict((key.lower(), value) for key, value in self.headers.items())
        return headers.get("location")

    @property
    def pretty(self):
        try:
            from BeautifulSoup import BeautifulSoup
        except ImportError:
            raise Exception('BeautifulSoup must be installed to get pretty HTML =(')
        html = BeautifulSoup(self.body)
        return html.prettify()

#RequestException.py
class RequestException(Exception):
    pass

First, the headers are created and the cookie, useragent, ip, proxy, redirect, httpmethod, and related parameters are defined. The URL is then parsed with the urllib library, which returns something like ParseResult(scheme='http', netloc='www.xxx.com', path='/', params='', query='', fragment=''). The scheme is inspected: an address that does not begin with http or https has a default scheme prepended, and if the protocol still cannot be determined it is treated as http. The netloc is taken as the host, which the socket library's gethostbyname function resolves to an IP. If no port number is given, it is derived from the protocol: 443 for https, 80 otherwise.
If the user passed in cookie, useragent, or similar parameters, they replace the default None values.
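This scheme/host/port logic can be sketched with the standard library alone. The following is a simplified illustration of what Requester's constructor does, not its exact code; normalize is a hypothetical helper name.

# sketch: simplified URL normalization, modeled on Requester's constructor
import socket
import urllib.parse

def normalize(url):
    parsed = urllib.parse.urlparse(url)

    # default to http when no usable scheme is present
    protocol = parsed.scheme if parsed.scheme in ('http', 'https') else 'http'

    host = parsed.netloc.split(':')[0]
    ip = socket.gethostbyname(host)  # DNS resolution, as in Requester

    # derive the port from the protocol when none is given explicitly
    port = parsed.port if parsed.port else (443 if protocol == 'https' else 80)
    return protocol, host, ip, port

print(normalize('http://www.example.com/'))  # ('http', 'www.example.com', <resolved ip>, 80)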

The main function is request(self, path). It joins protocol, host, and port (or protocol, ip, and port) into a URL and appends path to it. The url, together with proxies, verify, allow_redirects, headers, and timeout, is passed to the underlying HTTP call, and the answer is wrapped as result = Response(response.status_code, response.reason, response.headers, response.content); for pretty-printing, the body is parsed with BeautifulSoup (the pretty property above).
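Picking up where the truncated Requester.request excerpt stops, the send-and-wrap step boils down to roughly the following (a simplified sketch; dirsearch uses the requests library here):

# sketch: the send-and-wrap step of request(), simplified
import requests  # the HTTP library Requester builds on

response = requests.request(self.httpmethod, url,
                            proxies=proxy,               # {"https": ..., "http": ...} or None
                            verify=False,                # certificate checks are skipped
                            allow_redirects=self.redirect,
                            headers=dict(self.headers),
                            timeout=self.timeout)
result = Response(response.status_code, response.reason,
                  response.headers, response.content)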

The whole connection attempt is bounded by maxRetries; if the retry count is exceeded without success, a connection timeout error is reported.
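Put together, the retry handling looks roughly like this (simplified, with the error message paraphrased; send_once is a hypothetical helper standing in for the send-and-wrap step sketched above):

# sketch: retry handling in request(), simplified
i = 0
result = None

while i <= self.maxRetries:
    try:
        result = send_once()   # hypothetical helper: the send-and-wrap step above
        break                  # success: stop retrying
    except requests.exceptions.RequestException:
        pass                   # swallow the error and retry
    finally:
        i += 1

if result is None:
    # every attempt failed: surface a connection timeout to the caller
    raise RequestException({'message': 'CONNECTION TIMEOUT: There was a problem in the request to: ' + url})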

(4) The core module

The core package contains ArgumentParser, Dictionary, Fuzzer, Path, ReportManager, and Scanner.

#ArgumentParser.py
    def parseArguments(self):
        usage = 'Usage: %prog [-u|--url] target [-e|--extensions] extensions [options]'
        parser = OptionParser(usage)
        # Mandatory arguments
        mandatory = OptionGroup(parser, 'Mandatory')
        mandatory.add_option('-u', '--url', help='URL target', action='store', type='string', dest='url', default=None)

ArgumentParser's main job is to parse the command-line options and validate them.
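The check behind the error message quoted in section 3 can be sketched like this (simplified; the real method also accepts a URL list file):

# sketch: simplified option validation in parseArguments (illustrative)
options, arguments = parser.parse_args()

if options.url is None:
    print('URL target is missing, try using -u <url>')
    exit(0)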

#Scanner.py
class Scanner(object):
    def __init__(self, requester, testPath=None, suffix=None):
        if testPath is None or testPath == "":
            self.testPath = RandomUtils.randString()
        else:
            self.testPath = testPath

    def setup(self):
        firstPath = self.testPath + self.suffix
        firstResponse = self.requester.request(firstPath)
        self.invalidStatus = firstResponse.status

        if self.invalidStatus == 404:
            # Using the response status code is enough :-}
            return

        # look for redirects
        secondPath = RandomUtils.randString(omit=self.testPath) + self.suffix
        secondResponse = self.requester.request(secondPath)

        if firstResponse.status in self.redirectStatusCodes and firstResponse.redirect and secondResponse.redirect:
            self.redirectRegExp = self.generateRedirectRegExp(firstResponse.redirect, secondResponse.redirect)

        # Analyze response bodies
        self.dynamicParser = DynamicContentParser(self.requester, firstPath, firstResponse.body, secondResponse.body)

        baseRatio = float("{0:.2f}".format(self.dynamicParser.comparisonRatio))  # Rounding to 2 decimals

        # If response length is small, adjust ratio
        if len(firstResponse) < 2000:
            baseRatio -= 0.1

        if baseRatio < self.ratio:
            self.ratio = baseRatio

    def scan(self, path, response):
        if self.invalidStatus == 404 and response.status == 404:
            return False

        if self.invalidStatus != response.status:
            return True

        redirectToInvalid = False

        if self.redirectRegExp is not None and response.redirect is not None:
            redirectToInvalid = re.match(self.redirectRegExp, response.redirect) is not None
            # If redirection doesn't match the rule, mark as found

            if not redirectToInvalid:
                return True

        ratio = self.dynamicParser.compareTo(response.body)

        if ratio >= self.ratio:
            return False

        elif redirectToInvalid and ratio >= (self.ratio - 0.15):
            return False

        return True

Scanner's main job is to analyze and store the pattern of the HTTP responses the target site returns for various kinds of invalid paths. It does this work when setup() is called. The probe path it uses is a string of 12 random characters, produced by self.testPath = RandomUtils.randString(). Servers answer requests for invalid pages differently: some return 200, others respond with 301/302/307 redirects.
When Scanner requests such a random-string path and the server returns status code 404, no further analysis is needed and setup() returns right away. For any other status, Scanner sends a second request, again with a random-string path, compares the two response bodies, and stores their similarity as a float; and if both responses redirected (301/302/307), it also generates a regular expression from the two Location header values (URLs). The code is as follows.

def generateRedirectRegExp(self, firstLocation, secondLocation):
    if firstLocation is None or secondLocation is None:
        return None

    sm = SequenceMatcher(None, firstLocation, secondLocation)
    marks = []

    for blocks in sm.get_matching_blocks():
        i = blocks[0]
        n = blocks[2]

        # skip empty blocks
        if n == 0:
            continue

        mark = firstLocation[i:i + n]
        marks.append(mark)

    regexp = "^.*{0}.*$".format(".*".join(map(re.escape, marks)))
    return regexp

During scanning, a later response's Location value is matched against this regex: a redirect that does not match it marks the path as found, while a matching redirect whose body similarity is not below the stored ratio (with a 0.15 allowance) marks the path as invalid.
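The body-similarity measure that DynamicContentParser provides can be approximated with difflib's SequenceMatcher from the standard library (an illustration of the idea, not the parser's actual implementation):

# sketch: approximating the body-similarity ratio with difflib
from difflib import SequenceMatcher

def comparison_ratio(first_body, second_body):
    # 1.0 means identical; two invalid pages on the same site usually score high
    return SequenceMatcher(None, first_body, second_body).ratio()

base_ratio = round(comparison_ratio("<html>Not found: aZx3kQ</html>",
                                    "<html>Not found: pR7mWt</html>"), 2)
# a scanned path whose response body scores >= base_ratio is treated as invalid
print(base_ratio)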

Scanner instances are created and driven by Fuzzer, which sets up one Scanner for directories without a trailing slash (/dir), one for directories with a trailing slash (/dir/), and one per user-specified extension (/xx.php, /xx.jsp, and so on). Fuzzer is the core of the whole testing process:

class Fuzzer(object):
    def wait(self, timeout=None):
        for thread in self.threads:
            thread.join(timeout)

            if timeout is not None and thread.is_alive():
                return False

        return True

    def setupScanners(self):
        if len(self.scanners) != 0:
            self.scanners = {}

        self.defaultScanner = Scanner(self.requester, self.testFailPath, "")
        self.scanners['/'] = Scanner(self.requester, self.testFailPath, "/")

        for extension in self.dictionary.extensions:
            self.scanners[extension] = Scanner(self.requester, self.testFailPath, "." + extension)

    def setupThreads(self):
        if len(self.threads) != 0:
            self.threads = []

        for thread in range(self.threadsCount):
            newThread = threading.Thread(target=self.thread_proc)
            newThread.daemon = True
            self.threads.append(newThread)

    def getScannerFor(self, path):
        if path.endswith('/'):
            return self.scanners['/']

        for extension in list(self.scanners.keys()):
            if path.endswith(extension):
                return self.scanners[extension]

        # By default, returns empty tester
        return self.defaultScanner

    def start(self):
        for thread in self.threads:
            thread.start()

        self.play()

Fuzzer walks through the entire wordlist; the statuses it produces are then formatted for display by the output module.
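Each worker thread started by start() runs a loop that, stripped of the pause/resume and error handling, looks roughly like this (a simplified sketch of thread_proc, not its exact code):

# sketch: simplified worker loop, modeled on Fuzzer's thread_proc
def thread_proc(self):
    while self.running:
        path = next(self.dictionary)        # next wordlist entry, None when exhausted
        if path is None:
            break

        status, response = self.scan(path)  # request the path; the Scanner judges it
        result = Path(path=path, status=status, response=response)

        if status is not None:              # kept: fire the match callbacks
            for callback in self.matchCallbacks:
                callback(result)
        else:                               # filtered out as an invalid page
            for callback in self.notFoundCallbacks:
                callback(result)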


[Figure: the scanning process — 测试过程图.png]

3. Debugging dirsearch

To debug dirsearch in an IDE such as PyCharm, you cannot simply start a debug session the usual way, because the program expects its parameters on the command line; without them, the code in ArgumentParser reports errors such as URL target is missing, try using -u <url> and exits. Instead, you can write a small driver script, set breakpoints at the relevant places in dirsearch, and debug through that. The script looks like this:

import dirsearch
import sys

# fake the command line before constructing Program
sys.argv = ['dirsearch.py', '-u', 'http://xxx.xxx.xxx', '-e', 'jsp']
dirsearch.Program()

The sys.argv list can carry whatever additional parameters the dirsearch usage instructions describe.
