目录爆破工具dirsearch源码分析

发布时间 2023-09-18 16:07:53作者: ShuoYS

 dirsearch介绍

dirsearch介绍. dirsearch是一个基于python的命令行工具,用于暴力扫描页面结构,包括网页中的目录和文件。

目录结构

 

程序源码

1.主程序dirsearch.py

主程序入口查看,首先获取脚本所在的绝对路径,然后在ArgumentParser对象中对需要的参数赋值,根据-q参数确定数据输出的模式,在Controller()类中执行整个扫描过程。

 1 class Program(object):
 2     def __init__(self):
 3         #获取脚本路径所在目录的路径,例如从'e:/dirsearch/dirsearch.py'中获取到'e:/dersearch'
 4         self.script_path = os.path.dirname(os.path.realpath(__file__)) 
 5         #ArgumentParser类中首先赋值默认参数,然后从命令行中获取用户输入的参数进行赋值
 6         self.arguments = ArgumentParser(self.script_path) 
 7         #根据-q参数是否存在选择数据输出的模式,默认选择CLIOutput
 8         if self.arguments.quiet: 
 9             self.output = PrintOutput(self.arguments.color) #安静模式输出
10         else:
11             self.output = CLIOutput(self.arguments.color) #冗长模式输出
12         #定义Controller对象,执行整个扫描过程
13         self.controller = Controller(self.script_path, self.arguments, self.output) #传入目录路径,参数,输出模式
14 #程序入口
15 if __name__ == "__main__":
16     main = Program()

2.ArgumentParser()类

2.1 __init__()

进入ArgumentParser类中,位于lib.core.argument_parser.py。(由于其中多为相同的赋值与判断操作,大部分进行省略),执行代码中,首先调用函数获取默认的配置文件default.conf的信息,然后获取命令行用户输入的参数值并赋值

1 from lib.core.argument_parser import ArgumentParser
2 #class ArgumentParser(object):
3 def __init__(self, script_path):
4     self.script_path = script_path #目录路径赋值
5     self.parse_config() #获取默认配置文件的值
6     options = self.parse_arguments() #获取命令行用户输入的参数值

2.2 parse_config()

进入parse_config()函数,(其中存在大量重复操作,只进行简单描述),获取默认配置文件的参数并赋值

 1 #lib\core\argument_parser.py
 2 #def parse_config(self):
 3 #用于读取和写入配置文件。它提供了一种简单且易于使用的接口来解析INI样式的配置文件。
 4 config = ConfigParser()
 5 #连接目录路径和配置文件名得到默认配置文件路径
 6 config_path = FileUtils.build_path(self.script_path, "default.conf") 
 7 #读取default.conf配置文件信息
 8 config.read(config_path) 
 9 #获取默认参数并赋值
10 self.default_extensions = config.safe_get("mandatory", "default-extensions", str())
11 self.threads_count = config.safe_getint("general", "threads", 30, list(range(1, 300)))
12 self.output_location = config.safe_get("reports", "report-output-folder", None)
13 self.prefixes = config.safe_get("dictionary", "prefixes", None)
14 self.useragent = config.safe_get("request", "user-agent", "")
15 self.delay = config.safe_getfloat("connection", "delay", 0)
16 /*...*/

2.3 parse_arguments()

进入parse_arguments()函数,(其中存在大量重复操作,只进行简单描述),对用户可以操作的参数进行设置

 1 ##lib\core\argument_parser.py
 2 from optparse import OptionParser, OptionGroup #使用optpars进行参数设置
 3 #def parse_arguments(self): 
 4 usage = "Usage: %prog [-u|--url] target [-e|--extensions] extensions [options]"
 5 parser = OptionParser(usage, version="dirsearch v0.4.2",epilog="""...""")
 6 
 7 mandatory = OptionGroup(parser, "Mandatory")
 8 mandatory.add_option("-u", "--url", help="Target URL", action="store", type="string", dest="url", default=None)
 9 /*...*/
10 dictionary = OptionGroup(parser, "Dictionary Settings")
11 dictionary.add_option(...)
12 /*...*/
13 general = OptionGroup(parser, "General Settings")
14 general.add_option(...)
15 /*...*/
16 request = OptionGroup(parser, "Request Settings")
17 request.add_option(...)
18 /*...*/
19 connection = OptionGroup(parser, "Connection Settings")
20 connection.add_option(...)
21 /*...*/
22 reports = OptionGroup(parser, "Reports")
23 reports.add_option(...)
24 /*...*/
25 parser.add_option_group(mandatory)
26 parser.add_option_group(dictionary)
27 parser.add_option_group(general)
28 parser.add_option_group(request)
29 parser.add_option_group(connection)
30 parser.add_option_group(reports)
31 options, arguments = parser.parse_args() #从命令行中获取用户输入的参数值
32 return options #返回用户参数值

2.4. __init__()

回到__init__()继续查看,对参数进行赋值

 1 self.quiet = options.quiet
 2 self.full_url = options.full_url
 3 self.url_list = []
 4 self.raw_file = None
 5 
 6 #判断参数是否存在,存在时执行相应代码
 7 if options.url:
 8     self.url_list = [options.url]
 9 elif options.url_list:
10     file = self.access_file(options.url_list, "file contains URLs") 
11     self.url_list = list(file.get_lines())
12 elif options.cidr:
13     self.url_list = iprange(options.cidr) 
14 elif options.stdin_urls:
15     self.url_list = sys.stdin.read().splitlines()
16 elif options.raw_file:
17     self.access_file(options.raw_file, "file with raw request") 
18     self.raw_file = options.raw_file
19 
20 #判断url目标列表是否为空,为空时要求输入目标url
21 if not len(self.url_list): 
22     print("URL target is missing, try using -u <url>")
23     exit(1)
24             
25 self.url_list = uniq(self.url_list) #去除url_list中重复的url目标
26 
27 /*...*/ #判断某些参数值判断是否为空,不为空时执行相应代码进行检查和处理
28 
29 
30 #对无需处理的参数直接赋值
31 self.lowercase = options.lowercase
32 self.uppercase = options.uppercase
33 /*...*/

3. CLIOutput()类

回到主程序,选择进入默认输出CLIOutput()中查看(PrintOutput()输出模式在之后进行简单分析),进行了基本的参数赋值

 1 #lib\output\verbose_output.py
 2 #class CLIOutput(object):
 3 def __init__(self, color):
 4         self.last_length = 0
 5         self.last_output = ""
 6         self.last_in_line = False
 7         self.mutex = Lock()
 8         self.blacklists = {} #存放blacklists字典
 9         self.base_path = None
10         self.errors = 0
11         self.colorizer = ColorOutput(color) #定义ColorOutput()对象

4. ColorOutput()类

4.1 __init__()

进行颜色定义

 1 #lib\output\colors.py
 2 #class ColorOutput(object):
 3 def __init__(self, colors=True):
 4     self.colors = colors
 5     self.fore_table = {
 6         "red": Fore.RED,
 7         "green": Fore.GREEN,
 8         "yellow": Fore.YELLOW,
 9         "blue": Fore.BLUE,
10         "magenta": Fore.MAGENTA,
11         "cyan": Fore.CYAN,
12         "white": Fore.WHITE
13     } #前景色可选项
14     self.back_table = {...} #背景色可选项
15     self.escape_seq = None
16     self.prepare_sequence_escaper() #调用自身函数,自定义一个匹配规则
17     init()

4.2 prepare_sequence_escaper()

进入函数prepare_sequence_escaper(),作用为定义一个匹配规则

1 #lib\output\colors.py
2 #def prepare_sequence_escaper(self):
3 ESC = Literal("\x1b") #匹配ESC字符
4 integer = Word("0123456789") #匹配0-9的数字
5 alphas = list("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz") #匹配任意字母
6 self.escape_seq = Combine(
7     ESC + "[" + Optional(delimitedList(integer, ";")) + oneOf(alphas)
8 ) #合并以匹配相应的字符串

解释:

ESC = Literal('\x1b')
integer = Word(nums)
escapeSeq = Combine(ESC + '[' + Optional(delimitedList(integer,';')) +
        oneOf(list(alphas)))

nonAnsiString = lambda s : Suppress(escapeSeq).transformString(s)

unColorString = nonAnsiString('\x1b[1m0.0\x1b[0m')
print unColorString, len(unColorString)

输出:0.0 3

可以从字符串escapeSeq+string+escapeSeq样式的字符创中提取出string字符串,例如上述代码中从‘\x1b[1m’+’0.0‘+’\x1b[0m‘的规则字符串中提取出了0.0

prepare_sequence_escaper()执行完后回到ColorOutput(),ColorOutput()执行完后回到了CLIOutput(),CLIOutput()执行完后又回到了主程序dirsearch.py。执行self.controller = Controller(self.script_path, self.arguments, self.output)进入到Controller()类中

5. Controller()类

5.1 __init__()

Controller()类中__init__方法中为整个脚本最主要的控制代码,包括了参数处理,扫描开始和结果输出整个过程

 1 #lib\controller\controller.py
 2 #class Controller(object):
 3 def __init__(self, script_path, arguments, output):
 4     global VERSION
 5     #加载banner头和版本信息,目录中有一个自定义的banner.txt文件作为启动dirsearch.py后的标志头输出
 6     program_banner = (
 7         open(FileUtils.build_path(script_path, "banner.txt"))
 8         .read()
 9         .format(**VERSION)
10     )
11 
12     self.directories = Queue() #使用队列存放信息
13     self.script_path = script_path
14     self.arguments = arguments
15     self.output = output
16     self.pass_dirs = ["/"] #定义目录分割符号

然后对用户输入的参数值进行处理和赋值

 1 if arguments.raw_file: #raw_file参数存在时加载指定request请求模板
 2     raw = Raw(arguments.raw_file, arguments.scheme) #调用类方法对request请求包进行处理
 3     self.url_list = [raw.url] #获取request请求包中的url赋值
 4     self.httpmethod = raw.method #获取request请求包中的请求方法赋值
 5     self.data = raw.body #获取request请求包中的请求体赋值
 6     self.headers = raw.headers #获取request请求包中的请求头赋值
 7 else: #没有使用request请求模板使用默认请求头
 8     default_headers = {
 9         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
10         "Accept-Language": "*",
11         "Accept-Encoding": "*",
12         "Keep-Alive": "timeout=15, max=1000",
13         "Cache-Control": "max-age=0",
14     }
15     self.url_list = arguments.url_list #获取目标列表
16     self.httpmethod = arguments.httpmethod.lower() #将request请求方法小写
17     self.data = arguments.data
18     self.headers = {**default_headers, **arguments.headers}
19     #设置身份验证cookie和UA
20     if arguments.cookie:
21         self.headers["Cookie"] = arguments.cookie
22     if arguments.useragent:
23         self.headers["User-Agent"] = arguments.useragent
24 
25 self.recursion_depth = arguments.recursion_depth
26 
27 #加载日志文件路径,若用户指定了日志文件路径则获取文件路径值,否则使用默认日志文件的路径
28 if arguments.logs_location and self.validate_path(arguments.logs_location):
29     self.logs_path = FileUtils.build_path(arguments.logs_location)
30 elif self.validate_path(self.script_path):
31     self.logs_path = FileUtils.build_path(self.script_path, "logs")
32     if not FileUtils.exists(self.logs_path):
33         FileUtils.create_directory(self.logs_path)
34 
35 #加载输出报告路径,若用户指定了报告输出文件路径则获取文件路径值,否则使用默认报告输出的文件路径
36 if arguments.output_location and self.validate_path(arguments.output_location):
37     self.report_path = FileUtils.build_path(arguments.output_location)
38 elif self.validate_path(self.script_path):
39     self.report_path = FileUtils.build_path(self.script_path, "reports")
40     if not FileUtils.exists(self.report_path):
41         FileUtils.create_directory(self.report_path)
42 
43 #参数赋值
44 #调用generate_blacklists()函数加载黑名单字典,其中存储的是各个响应码的黑名单数组数据
45 self.blacklists = Dictionary.generate_blacklists(arguments.extensions, self.script_path) 
46 #其余参数直接赋值
47 self.extensions = arguments.extensions
48 self.prefixes = arguments.prefixes
49 /*...*/ 
50 
51 #将字典定义为对象
52 self.dictionary = Dictionary(...)
53 
54 #计算工作量
55 self.jobs_count = len(self.url_list) * (
56     len(self.scan_subdirs) if self.scan_subdirs else 1
57 )
58 
59 /*...*/ #参数直接赋值
60 
61 #将以下参数定义为对象
62 self.report_manager = EmptyReportManager()
63 self.report = EmptyReport()
64 self.timer = EmptyTimer()
65 
66 self.output.header(program_banner) #输出banner头和版本信息
67 self.print_config() #输出使用的配置信息
68 
69 #使用use_random_agents参数则加载user-agents.txt字典
70 if arguments.use_random_agents:
71     self.random_agents = FileUtils.get_lines(
72         FileUtils.build_path(script_path, "db", "user-agents.txt") #随机获取UA字段
73     )
74 
75 if arguments.autosave_report or arguments.output_file:
76     self.setup_reports() #批量创建文件夹
77 
78 self.setup_error_logs() #创建错误日志的文件夹
79 self.output.error_log_file(self.error_log_path) #输出错误日志的文件路径
80 
81 #若使用了maxtime参数则开启时间计数
82 if self.maxtime:
83     threading.Thread(target=self.time_monitor, daemon=True).start()

以上为基本参数的赋值和值处理情况,其中调用了其他模块的函数,现在分别进入分析

5.1.1 Raw()

5.1.1.1 __init__()

raw = Raw(arguments.raw_file, arguments.scheme) #调用类方法对request请求包进行处理,进入Raw查看,加载request请求包文件后调用自身parse()函数,对request请求包进行解析,从中获取需要的数据,包括请求行、请求头、请求体等

1 #lib\core\raw.py
2 #class Raw(object):
3 def __init__(self, raw_file, scheme):
4     #读取用户指定的request文件路径
5     with File(raw_file) as raw_content:
6         self.raw_content = raw_content.read()
7     self.scheme = scheme #通过scheme参数确定用户使用的方案
8     self.parse()
5.1.1.2 parse()

进入parse()函数查看,对request请求包进行解析,从中获取需要的数据,包括请求行、请求头、请求体等

 1 #lib\core\raw.py
 2 #class Raw(object):
 3 # Parse for 2 situations: \n as a newline or \r\n as a newline #要求使用\n或\r\n分隔请求包中的每一行
 4 self.parsed = self.raw_content.split("\n\n") #分割request请求行、头、数据
 5 if len(self.parsed) == 1: #数组长度为1则分割失败
 6     self.parsed = self.raw_content.split("\r\n\r\n") #分割失败则重新分割
 7 self.startline = self.parsed[0].splitlines()[0]
 8 #最终分隔成请求行、请求头、请求体3部分
 9 try:
10     self.headers_parser = HeadersParser(self.parsed[0].splitlines()[1:]) #获取请求头
11 except Exception:
12     print("Invalid headers in the raw request")
13     exit(1)
14 
15 try:
16     self.body = self.parsed[1] if self.parsed[1] else None #获取请求体
17 except IndexError:
18     self.body = None
19 
20 try:
21     self.host = self.headers_parser.lower_headers["host"].strip() #从请求头中获取ip值
22 except KeyError:
23     print("Can't find the Host header in the raw request")
24     exit(1)
25     
26 self.path = self.startline.split(" ")[1] #用空格分割请求行,从中获取url的路径

6.1.2 validate_path()

if arguments.logs_location and self.validate_path(arguments.logs_location)加载文件时大量使用validate_path(),进入查看,为基本的检查文件路径是否有效,路径有效时才加该路径下文件

 1 #lib\controller\controller.py
 2 if not FileUtils.exists(path):
 3     self.output.error("{0} does not exist".format(path))
 4     exit(1)
 5 if FileUtils.exists(path) and not FileUtils.is_dir(path):
 6     self.output.error("{0} is a file, should be a directory".format(path))
 7     exit(1)
 8 if not FileUtils.can_write(path):
 9     self.output.error("Directory {0} is not writable".format(path))
10     exit(1)
11 return True

5.1.3 generate_blacklists()

self.blacklists = Dictionary.generate_blacklists(arguments.extensions, self.script_path),改函数对{status}_blacklist.txt字典中的数据进行处理和获取,并返回供其他函数使用

 1 #lib\core\dictionary.py
 2 #class Dictionary(object):
 3 @staticmethod #该装饰符能使类中的方法直接调用而不用先定义对象,使用Dictionary.generate_blacklists()直接调用方法
 4 def generate_blacklists(extensions, script_path):
 5     reext = re.compile(r"\%ext\%", re.IGNORECASE).sub
 6     blacklists = {}
 7     for status in [400, 403, 500]:
 8         blacklist_file_name = FileUtils.build_path(script_path, "db")
 9         blacklist_file_name = FileUtils.build_path(
10             blacklist_file_name, "{}_blacklist.txt".format(status)
11         )
12         if not FileUtils.can_read(blacklist_file_name):
13             #文件无法读取时跳过并继续循环
14             continue
15         blacklists[status] = [] #使用数组储存各状态码文件{status}_blacklist.txt中的数据
16         for line in FileUtils.get_lines(blacklist_file_name): #获取字典中的数据
17                 #跳过以'#'号开头的行
18                 if line.lstrip().startswith("#"):
19                     continue
20                 #若存在'/'则只取/后的部分
21                 if line.startswith("/"):
22                     line = line[1:]
23                 #若存在%ext%则替换成extensions参数的值(php,asp等),将数据存入数组
24                 if "%ext%" in line.lower():
25                     for extension in extensions:
26                         entry = reext.sub(extension, line)
27                         blacklists[status].append(entry)
28                 else:
29                     blacklists[status].append(line)
30    return blacklists #返回字典

5.1.4 print_config()

self.print_config()其中调用了lib\output\verbose_output.py中CLIOutput()类的config方法,最终确认该方法为在窗口中输出配置信息

1 #lib\controller\controller.py
2 #调用了lib\output\verbose_output.py中CLIOutput()类的config方法
3 self.output.config(', '.join(self.extensions),', '.join(self.prefixes),', '.join(self.suffixes),str(self.threads_count),str(len(self.dictionary)),str(self.httpmethod),)

进入config方法查看,其对参数进行了赋值,并调用了print_header()方法

1 #lib\output\verbose_output.py
2 def config(self,extensions,prefixes,suffixes,threads,wordlist_size,method,):
3         config = {}
4         config["Extensions"] = extensions
5         /*...*/#省略其他赋值语句
6         self.print_header(config)

查看print_header(),确认为构造需要输出的语句msg,并将相应字符附上特定颜色,然后调用new_line()函数输出语句msg

 1 #lib\output\verbose_output.py
 2 #class CLIOutput(object):
 3 def print_header(self, entries):
 4         l, _ = get_terminal_size() #获取系统窗口大小
 5         msg = "" #定义一个需要输出空msg
 6         #构造msg输出语句
 7         for i, entry in enumerate(entries.items()):
 8             key, value = entry
 9             msg += self.colorizer.color(key + ": ", fore="yellow", bright=True)
10             msg += self.colorizer.color(value, fore="cyan", bright=True)
11             if i == len(entries) - 1:
12                 break
13             last_line_len = len(self.colorizer.clean_color(msg.splitlines()[-1]))
14             if last_line_len + 3 >= l:
15                 msg += "\n"
16             else:
17                 msg += self.colorizer.color(" | ", fore="magenta", bright=True)
18         self.new_line(msg) #调用new_line()函数输出

查看new_line(),进行语句输出和清楚缓存

 1 #lib\output\verbose_output.py
 2 #class CLIOutput(object):
 3 def new_line(self, string=''):
 4         if self.last_in_line:
 5             self.erase()
 6         if sys.platform in ["win32", "msys"]:
 7             sys.stdout.write(string)
 8             sys.stdout.flush()
 9             sys.stdout.write("\n")
10             sys.stdout.flush()
11         else:
12             sys.stdout.write(string + "\n")
13         sys.stdout.flush()
14         self.last_in_line = False
15         sys.stdout.flush()

5.1.5 setup_reports()

setup_reports()中存在大量其他函数的调用,暂时不进入分析,直接标注其作用。确认setup_reports()函数为输出报告的相关设置和赋值

 1 #lib\controller\controller.py
 2 #class Controller(object):
 3 def setup_reports(self):
 4         if self.output_file:
 5             output_file = FileUtils.get_abs_path(self.output_file)
 6             self.output.output_file(output_file) #调用,输出报告文件路径
 7         else:
 8             if len(self.url_list) > 1: #当需要爆破的url数量大于1时
 9                 self.setup_batch_reports() #根据时间批量创建文件夹
10                 filename = "BATCH"
11                 filename += self.get_output_extension() #检测后缀是否合法,默认为txt格式,构造输出报告的文件名
12                 directory_path = self.batch_directory_path 
13             else: #否则直接获取当前时间创建文件
14                 parsed = urlparse(self.url_list[0])
15                 filename = (
16                     "{}_".format(parsed.path)
17                 )
18                 filename += time.strftime("%y-%m-%d_%H-%M-%S")
19                 filename += self.get_output_extension()
20                 directory_path = FileUtils.build_path(
21                     self.report_path, clean_filename(parsed.netloc)
22                 )
23 
24             filename = clean_filename(filename) #检查文件名,存在非法字符时将非法字符替换成'-'
25             output_file = FileUtils.build_path(directory_path, filename)
26             #多个输出文件默认使用1/2/3...的排序
27             if FileUtils.exists(output_file):
28                 i = 2
29                 while FileUtils.exists(output_file + "_" + str(i)):
30                     i += 1
31                 output_file += "_" + str(i)
32             if not FileUtils.exists(directory_path):
33                 FileUtils.create_directory(directory_path)
34                 if not FileUtils.exists(directory_path):
35                     self.output.error(
36                         "Couldn't create the reports folder at {}".format(directory_path)
37                     )
38                     sys.exit(1)
39             self.output.output_file(output_file) #输出输出文件的路径
40             
41         if self.output_file and self.output_format: #判断用户是否输入了参数,进行对应的对象创建
42             self.report_manager = ReportManager(self.output_format, self.output_file)
43         elif self.output_format:
44             self.report_manager = ReportManager(self.output_format, output_file)
45         else:
46             self.report_manager = ReportManager("plain", output_file)

其中调用了lib\output\verbose_output.py中的CLIOutput()类中的两个方法,作用调用new_line函数,刚分析过new_line是在窗口中输出,outfile和errlog的文件路径

1 #lib\output\verbose_output.py
2 #class CLIOutput(object):
3 def output_file(self, target):
4         self.new_line("\nOutput File: {0}".format(target))
5 def error_log_file(self, target):
6         self.new_line("\nError Log: {0}".format(target))

5.2. __init__()

回到Controller()类的初始化方法中,以上为扫描前的基础准备(输入,输出,存储设置),接着后续代码分析。循环遍历需要爆破的url列表,for循环中,首先主要代码为调用Request类设置发送request请求的参数,然后开启线程,在线程调用Fuzzer类中的start()开始扫描爆破

 1 try:
 2     for url in self.url_list: #遍历目标url列表
 3         try:
 4             gc.collect() #每次循环回收不再使用的内存空间,减少消耗
 5             url = url if url.endswith("/") else url + "/" #在url结尾添加反斜杠,127.0.0.1=>127.0.0.1/
 6             self.output.set_target(url, arguments.scheme) #检查构造正确的url目标http://ip:端口/
 7             try:
 8                 #定义请求类,设置发送请求的参数
 9                 self.requester = Requester(url, #目标url
10                             max_pool=arguments.threads_count, #线程数
11                             max_retries=arguments.max_retries, #失败后最大重试次数
12                             timeout=arguments.timeout, #超时时间
13                             ip=arguments.ip, #ip地址
14                             proxy=arguments.proxy, #代理
15                             proxylist=arguments.proxylist, #代理列表
16                             redirect=arguments.redirect,
17                             request_by_hostname=arguments.request_by_hostname, #通过域名发送请求
18                             httpmethod=self.httpmethod, #请求方式
19                             data=self.data, #请求数据包模板
20                             scheme=arguments.scheme,
21                             )
22 
23                 for key, value in self.headers.items():
24                     self.requester.set_header(key, value) #设置请求头
25 
26                 if arguments.auth: #参数存在时使用用户输入的身份令牌
27                     self.requester.set_auth(arguments.auth_type, arguments.auth)
28                     
29                 self.requester.request("") #若服务开启,则发起测试请求
30 
31                 if arguments.autosave_report or arguments.output_file:
32                     self.report = Report(self.requester.host,self.requester.port, self.requester.protocol, self.requester.base_path)
33 
34             except RequestException as e:
35                 self.output.error(e.args[0]["message"])
36                 raise SkipTargetInterrupt
37 
38             if arguments.use_random_agents: #参数存在时每次请求都使用随机UA
39                 self.requester.set_random_agents(self.random_agents)
40 
41             # Initialize directories Queue with start Path
42             self.base_path = self.requester.base_path
43             self.status_skip = None
44 
45             if not self.scan_subdirs:
46                 self.directories.put("")
47 
48             for subdir in self.scan_subdirs:
49                 self.directories.put(subdir)
50                 self.pass_dirs.append(subdir)
51 
52             match_callbacks = [self.match_callback]
53             not_found_callbacks = [self.not_found_callback]
54             error_callbacks = [self.error_callback, self.append_error_log]
55 
56             #爆破参数设置
57             self.fuzzer = Fuzzer(
58                         self.requester,
59                         self.dictionary,
60                         suffixes=arguments.suffixes,
61                         prefixes=arguments.prefixes,
62                         exclude_response=arguments.exclude_response,
63                         threads=arguments.threads_count,
64                         delay=arguments.delay,
65                         maxrate=arguments.maxrate,
66                         match_callbacks=match_callbacks,
67                         not_found_callbacks=not_found_callbacks,
68                         error_callbacks=error_callbacks,
69                         )
70             try:
71                 self.prepare()
72             except RequestException as e:
73                 self.output.error(e.args[0]["message"])
74                 raise SkipTargetInterrupt
75 
76         except SkipTargetInterrupt:
77             self.report.completed = True
78             continue
79 
80 except KeyboardInterrupt:
81     self.output.error("\nCanceled by the user")
82     exit(0)
83 
84 finally:
85     self.error_log.close()
86 
87  self.output.warning("\nTask Completed")

6. Requester()类

进入request类中查看,设置请求基本的参数并构造合理的url

 1 #lib\connection\requester.py
 2 #class Requester(object):
 3 def __init__(...):
 4         self.httpmethod = httpmethod
 5         self.data = data
 6         self.headers = {}
 7         parsed = urlparse(url)
 8         # url若没有指定协议,默认使用http解析
 9         if "://" not in url:
10             parsed = urlparse("{0}://{1}".format(scheme, url)) #构造url并解析然后返回对象
11         elif parsed.scheme not in ["https", "http"]:
12             raise RequestException({"message": "Unsupported URL scheme: {0}".format(parsed.scheme)})
13 
14         self.base_path = parsed.path
15         #去除url开头的'/'
16         if parsed.path.startswith("/"):
17             self.base_path = parsed.path[1:]
18 
19         self.base_path = safequote(self.base_path) #对特殊字符进行url编码,防止特殊字符导致错误
20         self.protocol = parsed.scheme #获取协议
21         self.host = parsed.netloc.split(":")[0] #获取主机域名值
22 
23         if ip:
24             self.ip = ip
25         elif not proxy and not proxylist:
26             try:
27                 self.ip = socket.gethostbyname(self.host) #通过域名请求dns解析出ip值
28             except socket.gaierror:
29                 #检查主机名是否仅解析为IPv6地址
30                 try:
31                     self.ip = socket.gethostbyname(self.host, None, socket.AF_INET6)
32                 except socket.gaierror:
33                     raise RequestException({"message": "Couldn't resolve DNS"})
34 
35         # 没有端口则设置默认端口 (80, 443)
36         try:
37             self.port = int(parsed.netloc.split(":")[1])
38         except IndexError:
39             self.port = 443 if self.protocol == "https" else 80
40         except ValueError:
41             raise RequestException({"message": "Invalid port number: {0}".format(parsed.netloc.split(":")[1])})
42 
43         self.headers["Host"] = self.host #设置请求头中的host
44 
45         #检查端口是否标准,若不标准设置为标准格式
46         if (self.protocol == "https" and self.port != 443) or (
47             self.protocol == "http" and self.port != 80
48         ):
49             self.headers["Host"] += ":{0}".format(self.port)
50 
51         /*...*/ #参数赋值self.max_retries = max_retries
52         
53         self.url = "{0}://{1}:{2}/".format(self.protocol,self.host if self.request_by_hostname else self.ip,self.port,)
54         self.base_url = "{0}://{1}:{2}/".format(self.protocol,self.host,self.port,)
55         self.set_adapter() #调用set_adapter()函数,设置请求重试的最大次数

7. Controller()类

然后调用了Fuzzer(),设置和赋值爆破目录需要的参数(因为最终的扫描函数在Fuzzer()类中被调用执行),然后调用自身函数self.prepare()

 1 #lib\controller\controller.py
 2 #class Controller(object):
 3 def prepare(self):
 4         #开始循环,直到字典为空停止循环
 5         while not self.directories.empty(): 
 6             gc.collect()
 7             self.current_job += 1 #活跃的工作数+1
 8             self.index = 0
 9             self.current_directory = self.directories.get()
10             self.output.warning("[{1}] Starting: {0}".format(self.current_directory, time.strftime("%H:%M:%S"))) #输出扫描开始信息
11             self.fuzzer.requester.base_path = self.output.base_path = self.base_path + self.current_directory #获取爆破路径=url+目录路径
12             self.fuzzer.start() #调用fuzzer类中的函数开始扫描
13             self.process_paths() #线程相关
14         self.report.completed = True #扫描完成

8. Fuzzer()

8.1 start()

然后调用了Fuzzer()中的start()

 1 #lib\core\fuzzer.py
 2 #class Fuzzer(object):
 3 def start(self):
 4     self.setup_scanners() #调用自身函数setup_scanners()
 5     self.setup_threads() #调用自身的setup_threads(),其中利用线程开始爆破扫描
 6     self.index = 0
 7     self.rate = 0
 8     self.stand_rate = 0
 9     self.dictionary.reset()
10     self.running_threads_count = len(self.threads)
11     self.running = True
12     self.paused = False
13     self.play_event = threading.Event()
14     self.paused_semaphore = threading.Semaphore(0)
15     self.play_event.clear()
16     for thread in self.threads:
17         thread.start()
18     threading.Thread(target=self.rate_adjuster, daemon=True).start()
19     self.play()

8.2 setup_scanners()

根据用户输入的前缀和后缀以及扩展名参数依次执行循环,循环中定义Scanner()类并执行其初始化函数__init__()

 1 #lib\core\fuzzer.py
 2 #class Fuzzer(object):
 3 def setup_scanners(self):
 4         if len(self.scanners):
 5             self.scanners = {"prefixes": {},"suffixes": {},}
 6         #一定会执行的默认扫描,url和子目录路径的直接拼接
 7         self.default_scanner = Scanner(self.requester) #若存在泛解析会获得一个相似值
 8         self.prefixes.append(".")
 9         self.suffixes.append("/")
10         #存在前缀添加前缀,如admin=>testadmin
11         for prefix in self.prefixes:
12             self.scanners["prefixes"][prefix] = Scanner(
13                 self.requester, prefix=prefix, tested=self.scanners
14             )
15         #存在后缀添加后缀,如admin=>admintest
16         for suffix in self.suffixes:
17             self.scanners["suffixes"][suffix] = Scanner(
18                 self.requester, suffix=suffix, tested=self.scanners
19             )
20         #存在扩展名添加扩展名,如admin=>admin.php
21         for extension in self.dictionary.extensions:
22             if "." + extension not in self.scanners["suffixes"]:
23                 self.scanners["suffixes"]["." + extension] = Scanner(
24                     self.requester, suffix="." + extension, tested=self.scanners
25                 )
26         #通过该参数路径下的响应内容排除目标,若用户给出的响应内容与请求得到的响应内容相同则排除该请求目标
27         if self.exclude_response:
28             if self.exclude_response.startswith("/"):
29                 self.exclude_response = self.exclude_response[1:]
30             self.calibration = Scanner(
31                 self.requester, calibration=self.exclude_response, tested=self.scanners
32             )

其中根据prefixes、suffixes、extensions和exclude_response执行了5次Scanner(),若url请求不存在泛解析,则prefixes、suffixes、extensions的3次请求将返回404;若存在泛解析,两次请求的响应内容相似度会很高,同样,若存在"not found"却重定向到固定页面,两次请求的响应内容相似度也会很高。通过相似度能确定是否存在url通配符解析问题

9. Scanner()

9.1 __init__()

回到Fuzzer(),之后调用了scanner()类中初始化进行了参数赋值,然后调用自身的setup()函数

1 #lib\core\scanner.py
2 #class Scanner(object):
3 def __init__(self, requester, calibration=None, suffix=None, prefix=None, tested=None):
4         /*...*/ #参数赋值
5         self.setup() #调用该函数进行扫描

9.2 setup()

查看setup()函数,其中duplicate值及相关代码在exclude_response参数存在时才执行,if first_response.redirect and second_response.redirect:下的代码在页面存在重定向时才执行

 1 #lib\core\scanner.py
 2 #class Scanner(object):
 3 def setup(self):
 4         #构造需求的url地址
 5         first_path = self.prefix + (
 6             self.calibration if self.calibration else rand_string()
 7         ) + self.suffix
 8         #发送请求获取响应
 9         first_response = self.requester.request(first_path)
10         self.response = first_response
11         #响应状态码为404时直接返回,确认该目录路径不存在
12         if self.response.status == 404:
13             # Using the response status code is enough :-}
14             return
15         #当tested值不为空且first_response响应内容与self.calibration内容相同时返回tested
16         duplicate = self.duplicate(first_response)
17         if duplicate:
18             #已经显示相同的响应内容,将tested中的值赋值给自身对应参数
19             self.ratio = duplicate.ratio
20             self.dynamic_parser = duplicate.dynamic_parser
21             self.redirect_parser = duplicate.redirect_parser
22             self.sign = duplicate.sign
23             return
24         #构造第二条路径
25         second_path = self.prefix + (
26             self.calibration if self.calibration else rand_string(omit=first_path)
27         ) + self.suffix
28         #发起第二次请求
29         second_response = self.requester.request(second_path)
30         #存响应在重定向则调用generate_redirect_reg_exp函数,使用唯一的符号定位重定向中反映路径的位置
31         if first_response.redirect and second_response.redirect:
32             self.generate_redirect_reg_exp(
33                 first_response.redirect, first_path,
34                 second_response.redirect, second_path,
35             ) #使用唯一符号定位重定向中反映路径的位置
36 
37         # 对比两个响应文本的相似度
38         if first_response.body is not None and second_response.body is not None:
39             self.dynamic_parser = DynamicContentParser(
40                 self.requester, first_path, first_response.body, second_response.body
41             )
42         else:
43             self.dynamic_parser = None
44 
45         #获取相似度,取到小数点后两位
46         self.ratio = float("{0:.2f}".format(self.dynamic_parser.comparisonRatio) ) 
47 
48         #根据响应文本量的大小调整相似度的值
49         if self.ratio == 1:
50             pass
51         elif len(first_response) < 100:
52             self.ratio -= 0.1
53         elif len(first_response) < 500:
54             self.ratio -= 0.05
55         elif len(first_response) < 2000:
56             self.ratio -= 0.02
57         else:
58             self.ratio -= 0.01
59         
60         #如果路径在响应文本中得到显示,则减小比率。因为路径长度之间的差异会降低相似性比率
61         encoding_type = get_encoding_type(first_response.body) #获取内容的编码方式
62         if first_path in first_response.body.decode(encoding_type): #根据编码方式解码
63             if len(first_response) < 200:
64                 self.ratio -= 0.15 + 15 / len(first_response)
65             elif len(first_response) < 800:
66                 self.ratio -= 0.06 + 30 / len(first_response)
67             elif len(first_response) < 5000:
68                 self.ratio -= 0.03 + 80 / len(first_response)
69             elif len(first_response) < 20000:
70                 self.ratio -= 0.02 + 200 / len(first_response)
71             else:
72                 self.ratio -= 0.01

10. Fuzzer()

10.1 setup_threads()

进行线程设置并执行相关操作

 1 #lib\core\fuzzer.py
 2 #class Fuzzer(object):
 3 def setup_threads(self):
 4         if len(self.threads):
 5             self.threads = []
 6 
 7         for thread in range(self.threads_count):
 8             #在线程中调用自身thread_proc函数,操作线程相关参数
 9             new_thread = threading.Thread(target=self.thread_proc) 
10             new_thread.daemon = True
11             self.threads.append(new_thread)

10.2 thread_proc()

进入thread_proc(),通过next(self.dictionary) 依次发起默认url、有前缀、有后缀、有扩展名的请求;通过scan()函数发起request请求并接收响应内容

 1 #lib\core\fuzzer.py
 2 #class Fuzzer(object):
 3 def thread_proc(self):
 4         self.play_event.wait()
 5         try:
 6             #获取可迭代对象的下一个对象,与后面调用的get_scanner_for()函数中yield self.calibration配合使用
 7             path = next(self.dictionary) 
 8             while path: #遍历字典
 9                 try:
10                     #如果请求速率超过最大值,则暂停
11                     while self.maxrate and self.rate >= self.maxrate:
12                         pass
13                     self.increase_rate() #调用函数加大请求速率
14                     status, response = self.scan(path) #调用scan()函数发起请求并返回响应
15                     result = Path(path=path, status=status, response=response) #定义Path类,用来存储对应路径相关的结果-路径:状态码,响应数据
16                     
17                     if status: #将结果存入对应的数组中
18                         self.matches.append(result) 
19                         for callback in self.match_callbacks:
20                             callback(result)
21                     else:
22                         for callback in self.not_found_callbacks:
23                             callback(result)
24 
25                 except RequestException as e:
26                     for callback in self.error_callbacks:
27                         callback(path, e.args[0]["message"])
28                     continue
29 
30                 finally:
31                     if not self.play_event.is_set():
32                         self.decrease_threads()
33                         self.paused_semaphore.release()
34                         self.play_event.wait()
35                         self.increase_threads()
36 
37                     path = next(self.dictionary)  # Raises StopIteration when finishes
38 
39                     if not self.running:
40                         break
41 
42                     time.sleep(self.delay)
43 
44         except StopIteration:
45             pass

10.3 scan()

最终请求发出位于scan()函数中,

 1 #lib\core\fuzzer.py
 2 #class Fuzzer(object):
 3 def scan(self, path):
 4         response = self.requester.request(path) #得到响应
 5         result = response.status #状态码
 6 
 7         for tester in list(set(self.get_scanner_for(path))): #调用自身get_scanner_for函数
 8             #检测是否存在通配符泛解析问题,若存在则结果不可信任,重置result为None
 9             if not tester.scan(path, response): 
10                 result = None
11                 break

10.4 get_scanner_for()

在get_scanner_for()中通过yield迭代执行,利用yield和next()实现循环遍历参数值,当参数值存在时,代码执行直到遇到"yield 关键字"时停止运行

 1 #lib\core\fuzzer.py
 2 #class Fuzzer(object):
 3 def get_scanner_for(self, path):
 4         #处理路径,以便检查扩展名/后缀
 5         path = path.split("?")[0].split("#")[0]
 6 
 7         #第一次执行next(self.dictionary) ,存在exclude_response参数则该次循环结束
 8         if self.exclude_response:
 9             yield self.calibration  
10         #第二次执行next(self.dictionary) 存在prefixes参数则该次循环结束
11         for prefix in self.prefixes:
12             if path.startswith(prefix):
13                 yield self.scanners["prefixes"][prefix]
14         #第三次执行next(self.dictionary) 存在suffixes参数则该次循环结束
15         for suffix in self.suffixes:
16             if path.endswith(suffix):
17                 yield self.scanners["suffixes"][suffix]
18         ##第四次执行next(self.dictionary) 存在extensions参数则该次循环结束
19         for extension in self.dictionary.extensions:
20             if path.endswith("." + extension):
21                 yield self.scanners["suffixes"]["." + extension]
22 
23         yield self.default_scanner

yield 解释:yield 是一个特殊的关键词,它用于在函数或方法中创建一个生成器。

生成器是一种可以在运行时动态生成值的迭代器。生成器的特点是,它不会一次生成所有的值,而是按需生成。在每次迭代时,生成器会返回一个新的值。这是通过定义一个带有 "yield" 关键词的函数来实现的。

下面是一个简单的 Python 生成器示例:

def simple_generator():
n = 1
print('This is printed first')
yield n
n += 1
print('This is printed second')
yield n
n += 1
print('This is printed last')
yield n
# 创建一个生成器对象my_gen = simple_generator()
# 使用 next() 函数来迭代生成器的值
print(next(my_gen))
print(next(my_gen))
print(next(my_gen))

在这个例子中,"yield" 关键词被用于在函数中创建三个不同的值。每次调用 next(my_gen) 时,生成器都会从它最后一次 yield 的值开始,然后按顺序生成下一个值。所以,这个例子的输出将会是:

This is printed first
1
This is printed second
2
This is printed last
3

 

11. Scanner()

11.1 scan()

回到Fuzzer()中的scan(),调用了Scanner()中的scan()。通过比较默认请求的状态码和新路径发起请求的状态码确认响应有效,然后确定是否存在重定向,若存在则比较重定向的响应页面,若相似度很高,则判定url存在重定向到特定页面的设置,若没有重定向而相似度高则判定存在泛解析为同一页面。

 1 #lib\core\scanner.py
 2 def scan(self, path, response):
 3         if self.response.status == response.status == 404:
 4             return False
 5 
 6         if self.response.status != response.status:
 7             return True
 8 
 9         if self.redirect_parser and response.redirect:
10             #在比较之前删除DOM(#)和查询(?)以减少误报
11             path = path.split("?")[0].split("#")[0] #获取url
12             redirect = response.redirect.split("?")[0].split("#")[0] #获取重定向到的url
13 
14             path = re.escape(unquote(path)) #转义特殊字符
15             #检查重定向的路径是否有效
16             regex = self.redirect_parser.regex.replace(self.sign, path)
17             redirect_to_invalid = self.redirect_parser.compare(regex, redirect)
18 
19             #如果重定向不符合规则,则标记为找到
20             if not redirect_to_invalid:
21                 return True
22 
23         # Compare 2 responses (wildcard one and given one)
24         ratio = self.dynamic_parser.compareTo(response.body)
25 
26         #如果相似度高则证明它是通配符,若重定向无效则调整比较的ratio值
27         if ratio >= self.ratio:
28             return False
29         elif "redirect_to_invalid" in locals() and ratio >= (self.ratio - 0.18):
30             return False
31 
32         return True #默认返回true,结果可信

总结

dirsearch.py为入口主程序,使用ArgumentParser()类获取用户输入的参数值,并对参数值进行检查和处理后,进行赋值,供其他函数使用;然后根据-q参数选择print()输出信息的模式,存在-q时调用PrintOutput()类输出,否则调用CLIOutput()输出(默认选择该类输出);之后使用Controller()类进行主要代码的执行控制,包括参数赋值,前提准备,多线程爆破执行,报告输出,从中调用其他模块中的类执行。

函数调用关系图

(以上为分析结果,由于本人也在学习阶段,可能存在部分错误,建议作为参考)