1. Introduction to dirsearch
dirsearch is a directory brute-forcing tool based on Python 3. It can be found by searching for dirsearch directly on GitHub.
dirsearch takes its parameters from the command line, for example:
python3 dirsearch.py -u <URL> -e <EXTENSION>
The dirsearch file structure looks like this:
db (wordlists), lib (core function modules), logs (error logs), reports (generated reports), thirdparty (bundled third-party libraries), plus changelog.md, default.conf, README.md, and dirsearch.py.
2. dirsearch source code analysis
To analyze the source code, start at the program entry point, dirsearch.py.
(1) dirsearch.py
#dirsearch.py
import os  # needed for os.path below; missing from the original excerpt
import sys

if sys.version_info < (3, 0):
    sys.stdout.write("Sorry, dirsearch requires Python 3.x\n")
    sys.exit(1)

from lib.core import ArgumentParser
from lib.controller import *
from lib.output import *


class Program(object):
    def __init__(self):
        self.script_path = os.path.dirname(os.path.realpath(__file__))
        self.arguments = ArgumentParser(self.script_path)
        self.output = CLIOutput()
        self.controller = Controller(self.script_path, self.arguments, self.output)


if __name__ == '__main__':
    main = Program()
This entry program first checks the Python version: dirsearch is built on Python 3, so it exits if the interpreter is not 3.x. It then imports ArgumentParser, the Controller, and the output classes from the core modules. In the Program class, script_path holds the filesystem path of the dirsearch folder (on my machine: 'F:\CTF工具\Web工具\目录扫描\dirsearch-master'). That path is passed into ArgumentParser, which at this point reads the basic configuration, such as the dicc.txt wordlist in db and the thread count, and stores the -u URL parameter in urlList.
(2) The output module
When we run the program, the screen below appears. The banner on the first line comes from the banner file in the Controller module; Controller also defines many basic parameters and calls functions such as newLine in CLIOutput.py, which defines what gets printed.
[Figure: dirsearch startup screen]
The main code behind the startup screen is as follows:
#CLIOutput.py
def newLine(self, string):
    if self.lastInLine == True:
        self.erase()

    if platform.system() == 'Windows':
        sys.stdout.write(string)
        sys.stdout.flush()
        sys.stdout.write('\n')
        sys.stdout.flush()
    else:
        sys.stdout.write(string + '\n')
        sys.stdout.flush()

    self.lastInLine = False
    sys.stdout.flush()
The final flush forces out the pending newline; string here is, for example, the error-log path. The code earlier in Controller prints the corresponding information to the screen, as shown in the figure above.
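To see why newLine first calls erase(): dirsearch keeps a transient progress line on screen (written without a newline), which must be cleared before a permanent line is printed. Below is a minimal sketch of that pattern, assuming a POSIX terminal that honors ANSI escapes; erase() itself is not part of the excerpt above, so this is an illustration, not dirsearch's code.

import sys
import time

def in_line(s):                    # transient status, stays on one row
    sys.stdout.write('\r' + s)
    sys.stdout.flush()

def new_line(s):                   # permanent output line
    sys.stdout.write('\x1b[1K\r')  # ANSI: clear current line, return carriage
    sys.stdout.write(s + '\n')
    sys.stdout.flush()

in_line('42.0% - scanning /admin')
time.sleep(0.5)
new_line('[200] /admin/')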
#Controller.py
try:
    for url in self.arguments.urlList:
        try:
            gc.collect()
            self.reportManager = ReportManager()
            self.currentUrl = url
            self.output.target(self.currentUrl)

            try:
                self.requester = Requester(url, cookie=self.arguments.cookie,
                                           useragent=self.arguments.useragent,
                                           maxPool=self.arguments.threadsCount,
                                           maxRetries=self.arguments.maxRetries,
                                           delay=self.arguments.delay,
                                           timeout=self.arguments.timeout,
                                           ip=self.arguments.ip,
                                           proxy=self.arguments.proxy,
                                           redirect=self.arguments.redirect,
                                           requestByHostname=self.arguments.requestByHostname,
                                           httpmethod=self.httpmethod)
                self.requester.request("/")
            except RequestException as e:
                self.output.error(e.args[0]['message'])
                raise SkipTargetInterrupt

            if self.arguments.useRandomAgents:
                self.requester.setRandomAgents(self.randomAgents)

            for key, value in self.arguments.headers.items():  # was: arguments.headers
                self.requester.setHeader(key, value)

            # Initialize directories Queue with start Path
            self.basePath = self.requester.basePath

            if self.arguments.scanSubdirs is not None:
                for subdir in self.arguments.scanSubdirs:
                    self.directories.put(subdir)
            else:
                self.directories.put('')

            self.setupReports(self.requester)

            matchCallbacks = [self.matchCallback]
            notFoundCallbacks = [self.notFoundCallback]
            errorCallbacks = [self.errorCallback, self.appendErrorLog]

            self.fuzzer = Fuzzer(self.requester, self.dictionary,
                                 testFailPath=self.arguments.testFailPath,
                                 threads=self.arguments.threadsCount,
                                 matchCallbacks=matchCallbacks,
                                 notFoundCallbacks=notFoundCallbacks,
                                 errorCallbacks=errorCallbacks)
            try:
                self.wait()
            except RequestException as e:
                self.output.error("Fatal error during site scanning: " + e.args[0]['message'])
                raise SkipTargetInterrupt

        except SkipTargetInterrupt:
            continue

        finally:
            self.reportManager.save()

except KeyboardInterrupt:
    self.output.error('\nCanceled by the user')
    exit(0)

finally:
    if not self.errorLog.closed:
        self.errorLog.close()
    self.reportManager.close()

self.output.warning('\nTask Completed')
The controller iterates over urlList and sends an initial request to each target. That request carries the cookie, User-Agent, thread count, proxy, redirect behavior, HTTP method, and other parameters, all of which are defined in Requester.py. Requester.py belongs to the connect package, analyzed in (3).
[Figure: request header parameters]
(3) The connect module
The connect folder contains Requester.py, Response.py, and RequestException.py (key code below):
#Requester.py
def request(self, path):
    i = 0
    proxy = None
    result = None

    while i <= self.maxRetries:
        try:
            if self.proxy is not None:
                proxy = {"https": self.proxy, "http": self.proxy}

            if self.requestByHostname:
                url = "{0}://{1}:{2}".format(self.protocol, self.host, self.port)
            else:
                url = "{0}://{1}:{2}".format(self.protocol, self.ip, self.port)

            url = urllib.parse.urljoin(url, self.basePath)

            # Joining with concatenation because of a urljoin bug with "::"
            if not url.endswith('/'):
                url += "/"

            if path.startswith('/'):
                path = path[1:]

            url += path
#Response.py
@property  # redirect is accessed as an attribute in Scanner.scan()
def redirect(self):
    headers = dict((key.lower(), value) for key, value in self.headers.items())
    return headers.get("location")

@property
def pretty(self):
    try:
        from BeautifulSoup import BeautifulSoup
    except ImportError:
        raise Exception('BeautifulSoup must be installed to get pretty HTML =(')

    html = BeautifulSoup(self.body)
    return html.prettify()
#RequestException.py
class RequestException(Exception):
    pass
Requester first builds the headers, defining parameters such as cookie, useragent, ip, proxy, redirect, and httpmethod. It then parses the URL with the urllib library, which returns something like ParseResult(scheme='http', netloc='www.xxx.com', path='/', params='', query='', fragment=''). The protocol is checked: if the address does not begin with http or https, the default https:// prefix is added, and if no protocol can be determined it falls back to http. The URL is assigned to host, which the socket library's gethostbyname function resolves to an IP. If no port is specified, it is derived from the protocol: 443 for https, 80 for anything else.
If the user supplied cookie, useragent, or similar parameters, they replace the default None values.
The main function is request(self, path). It joins protocol, host, and port (or protocol, ip, and port) into a URL and appends path to it. The url, proxies, verify, allow_redirects, headers, and timeout values are passed into the HTTP call, and the reply is wrapped as result = Response(response.status_code, response.reason, response.headers, response.content); the body can then be parsed with BeautifulSoup.
The whole connect process is bounded by maxRetries: if the retry count is exceeded, a connection-timeout error is reported.
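A minimal sketch of the protocol, host, and port resolution just described; the function name and exact fallback order here are illustrative assumptions, not dirsearch's verbatim code.

import socket
import urllib.parse

def resolve_target(url):
    # Prepend a default scheme when none is given
    if '://' not in url:
        url = 'http://' + url

    parsed = urllib.parse.urlparse(url)
    protocol = parsed.scheme if parsed.scheme in ('http', 'https') else 'http'
    host = parsed.hostname

    # DNS resolution, as Requester does with socket.gethostbyname
    ip = socket.gethostbyname(host)

    # Default port follows the protocol: 443 for https, 80 otherwise
    port = parsed.port if parsed.port else (443 if protocol == 'https' else 80)
    return protocol, host, ip, port

print(resolve_target('www.example.com'))
# e.g. ('http', 'www.example.com', '93.184.216.34', 80)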
(4) The core module
The core package contains ArgumentParser, Dictionary, Fuzzer, Path, ReportManager, and Scanner.
#ArgumentParser.py
def parseArguments(self):
    usage = 'Usage: %prog [-u|--url] target [-e|--extensions] extensions [options]'
    parser = OptionParser(usage)

    # Mandatory arguments
    mandatory = OptionGroup(parser, 'Mandatory')
    mandatory.add_option('-u', '--url', help='URL target', action='store',
                         type='string', dest='url', default=None)
ArgumentParser is mainly responsible for checking and validating the command-line options, as sketched below.
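A simplified sketch of that validation, reusing the -u option defined above. The error message is the one quoted in section 3; the surrounding scaffolding is condensed from the real validation logic, not copied from it.

from optparse import OptionGroup, OptionParser

usage = 'Usage: %prog [-u|--url] target [-e|--extensions] extensions [options]'
parser = OptionParser(usage)

mandatory = OptionGroup(parser, 'Mandatory')
mandatory.add_option('-u', '--url', help='URL target', action='store',
                     type='string', dest='url', default=None)
parser.add_option_group(mandatory)

options, arguments = parser.parse_args()

# The missing-URL check that aborts a parameterless run (see section 3)
if options.url is None:
    print('URL target is missing, try using -u <url>')
    exit(0)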
#Scanner.py
class Scanner(object):
    def __init__(self, requester, testPath=None, suffix=None):
        if testPath is None or testPath == "":  # was: testPath is ""
            self.testPath = RandomUtils.randString()
        else:
            self.testPath = testPath

    def setup(self):
        firstPath = self.testPath + self.suffix
        firstResponse = self.requester.request(firstPath)
        self.invalidStatus = firstResponse.status

        if self.invalidStatus == 404:
            # Using the response status code is enough :-}
            return

        # Look for redirects
        secondPath = RandomUtils.randString(omit=self.testPath) + self.suffix
        secondResponse = self.requester.request(secondPath)

        if firstResponse.status in self.redirectStatusCodes and firstResponse.redirect and secondResponse.redirect:
            self.redirectRegExp = self.generateRedirectRegExp(firstResponse.redirect,
                                                              secondResponse.redirect)

        # Analyze response bodies
        self.dynamicParser = DynamicContentParser(self.requester, firstPath,
                                                  firstResponse.body, secondResponse.body)
        baseRatio = float("{0:.2f}".format(self.dynamicParser.comparisonRatio))  # Rounding to 2 decimals

        # If response length is small, adjust ratio
        if len(firstResponse) < 2000:
            baseRatio -= 0.1

        if baseRatio < self.ratio:
            self.ratio = baseRatio

    def scan(self, path, response):
        if self.invalidStatus == 404 and response.status == 404:
            return False

        if self.invalidStatus != response.status:
            return True

        redirectToInvalid = False

        if self.redirectRegExp is not None and response.redirect is not None:
            redirectToInvalid = re.match(self.redirectRegExp, response.redirect) is not None

            # If redirection doesn't match the rule, mark as found
            if not redirectToInvalid:
                return True

        ratio = self.dynamicParser.compareTo(response.body)

        if ratio >= self.ratio:
            return False
        elif redirectToInvalid and ratio >= (self.ratio - 0.15):
            return False

        return True
Scanner analyzes and stores the pattern of the site's HTTP responses to various kinds of invalid directories. This happens when setup() is called. The probe path is a string of 12 random characters, produced by self.testPath = RandomUtils.randString() (a sketch follows). For a request to a nonexistent page, some servers return 200 while others return a 301/302/307 redirect.
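RandomUtils itself is not excerpted in this article, so here is a hedged sketch of what randString presumably does: 12 characters by default, with an omit parameter so the second probe shares no characters with the first (cf. the omit= argument in setup() above).

import random
import string

def rand_string(n=12, omit=None):
    # Draw from letters and digits, excluding characters already used
    # by a previous probe when `omit` is given
    alphabet = string.ascii_letters + string.digits
    if omit:
        alphabet = ''.join(c for c in alphabet if c not in omit)
    return ''.join(random.choice(alphabet) for _ in range(n))

print(rand_string())  # e.g. 'hJd2kQz9LmXw'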
When Scanner requests one of these random paths, a 404 status means no further analysis is needed and setup() returns immediately. Otherwise Scanner sends a second request, again to a random path, compares the two response bodies, and stores their similarity as a float. If both responses redirected (301/302/307), a regular expression is generated from the two Location header values. The code is as follows.
def generateRedirectRegExp(self, firstLocation, secondLocation):
    if firstLocation is None or secondLocation is None:
        return None

    sm = SequenceMatcher(None, firstLocation, secondLocation)
    marks = []

    for blocks in sm.get_matching_blocks():
        i = blocks[0]
        n = blocks[2]

        # Empty block
        if n == 0:
            continue

        mark = firstLocation[i:i + n]
        marks.append(mark)

    regexp = "^.*{0}.*$".format(".*".join(map(re.escape, marks)))
    return regexp
A Location value from a later redirect must match this regex; when it does and the page-similarity ratio is not below the current threshold, the directory is considered invalid.
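To make the behavior concrete, here is a standalone reproduction of the logic above applied to two hypothetical Location values (the URLs are made up for illustration):

import re
from difflib import SequenceMatcher

# Standalone copy of the generateRedirectRegExp logic shown above
def generate_redirect_regexp(first_location, second_location):
    sm = SequenceMatcher(None, first_location, second_location)
    marks = [first_location[i:i + n]
             for i, _, n in sm.get_matching_blocks() if n != 0]
    return "^.*{0}.*$".format(".*".join(map(re.escape, marks)))

# Two hypothetical Location headers from the two random probes
first = "http://example.com/notfound/hJd2kQz9LmXw/"
second = "http://example.com/notfound/pRv4tYs8NbGc/"
pattern = generate_redirect_regexp(first, second)
print(pattern)

# A later redirect matching the pattern is treated as "redirect to invalid"
print(bool(re.match(pattern, "http://example.com/notfound/something/")))  # True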
Scanner instances are created and invoked by Fuzzer, which builds one Scanner for directories without a trailing slash (/dir), one for directories with a trailing slash (/dir/), and one per user-specified extension (/xx.php, /xx.jsp, and so on). Fuzzer is the core of the entire testing process; its main code follows.
#Fuzzer.py
class Fuzzer(object):
    def wait(self, timeout=None):
        for thread in self.threads:
            thread.join(timeout)

            if timeout is not None and thread.is_alive():
                return False

        return True

    def setupScanners(self):
        if len(self.scanners) != 0:
            self.scanners = {}

        self.defaultScanner = Scanner(self.requester, self.testFailPath, "")
        self.scanners['/'] = Scanner(self.requester, self.testFailPath, "/")

        for extension in self.dictionary.extensions:
            self.scanners[extension] = Scanner(self.requester, self.testFailPath, "." + extension)

    def setupThreads(self):
        if len(self.threads) != 0:
            self.threads = []

        for thread in range(self.threadsCount):
            newThread = threading.Thread(target=self.thread_proc)
            newThread.daemon = True
            self.threads.append(newThread)

    def getScannerFor(self, path):
        if path.endswith('/'):
            return self.scanners['/']

        for extension in list(self.scanners.keys()):
            if path.endswith(extension):
                return self.scanners[extension]

        # By default, returns empty tester
        return self.defaultScanner

    def start(self):
        for thread in self.threads:
            thread.start()
        self.play()
Fuzzer walks through the entire wordlist; the resulting statuses are passed through output for display.
[Figure: scanning in progress]
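thread_proc itself is not excerpted above, so the following is a simplified reconstruction of what each worker thread does with the pieces shown: pull a path from the wordlist, request it, let the matching Scanner judge the response, then fire the registered callbacks. It assumes the wordlist object is an iterator and that the callback lists come from Controller; it is not the verbatim implementation.

def thread_proc(self):
    try:
        path = next(self.dictionary)
        while path is not None:
            response = self.requester.request(path)
            if self.getScannerFor(path).scan(path, response):
                for callback in self.matchCallbacks:      # found entries
                    callback(path, response)
            else:
                for callback in self.notFoundCallbacks:   # filtered out
                    callback(path, response)
            path = next(self.dictionary)                  # next wordlist entry
    except StopIteration:
        pass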
3. Debugging dirsearch
To debug dirsearch in an IDE such as PyCharm, you cannot simply start the debugger as usual, because the program expects command-line arguments; without them, the code in ArgumentParser reports errors such as 'URL target is missing, try using -u <url>' and exits. Instead, write a small driver script, set breakpoints at the relevant places in dirsearch, and debug through it. The script is as follows:
import sys

import dirsearch

sys.argv = ['dirsearch.py', '-u', 'http://xxx.xxx.xxx', '-e', 'jsp']
dirsearch.Program()
Adjust sys.argv with whatever parameters the dirsearch usage guide describes.