额…… 将视线拉回到本渣作。w9scan的初期代码是模仿bugscan而写的,因为w9scan在编写的初期就是为了兼容bugscan的插件,因此做了大量兼容工作来兼容bugscan的插件代码。而做兼容工作必不可少要了解下bugscan的工作原理,所以笔者用自己的渣渣理解力简述下bugscan的功能流程:
于是笔者在代码结构层次模仿w9scan,功能结构层次模仿bugscan,凭着脑中若干的想像!@#¥@# 制造了w9scan第一版本..
1. 获取plugins目录下所有的py文件,(__init__.py除外),将内容存入一个字典中
2. 插件代码加载函数,通过调用imp模块将字典存储的代码加载进来,返回模块对象
def _load_module(self, chunk, name='<w9scan>'):
    try:
        pluginObj = imp.new_module(str(name))
        exec chunk in pluginObj.__dict__
    except Exception as err_info:
        raise LoadModuleException
    return pluginObj

3. 为返回的模块类加上内置的API(就是一些bugscan常用的内置API,如curl、hackhttp之类)
4. 一切就绪,然后根据bugscan API说明,bugscan插件需定义两个函数:assign为验证函数,audit为执行函数。所以接下来就来调用这两个函数了。
pluginObj_tuple = pluginObj.assign(service, url)
if not isinstance(pluginObj_tuple, tuple):  # 判断是否是元组
    continue
bool_value, agrs = pluginObj_tuple[0], pluginObj_tuple[1]
if bool_value:
    pluginObj.audit(agrs)

CMS识别
目前CMS识别大多数都是依靠指纹,看了很多依靠机器识别来验证webshell的例子,笔者也在学习如何用机器学习来识别CMS。
W9scan的CMS识别的指纹库以及代码都是bugscan的,里面有一些有趣的技巧,分享一下。CMS指纹文件在 lib/utils/cmsdata.py
来到 plugins/www/whatcms.py
import re,urlparse
from lib.utils.cmsdata import cms_dict
import hashlib

def getMD5(password):
    m = hashlib.md5()
    m.update(password)
    return m.hexdigest()

def makeurl(url):
    prox = "http://"
    if(url.startswith("https://")):
        prox = "https://"
    url_info = urlparse.urlparse(url)
    url = prox + url_info.netloc + "/"
    return url

def isMatching(f_path, cms_name, sign, res, code, host, head):
    isMatch = False
    if f_path.endswith(".gif"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("GIF89a")
    elif f_path.endswith(".png"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("\x89PNG\x0d\x0a\x1a\x0a")
    elif f_path.endswith(".jpg"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("\xff\xd8\xff\xe0\x00\x10JFIF")
    elif f_path.endswith(".ico"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("\x00\x00\x00")
    elif code == 200:
        if sign and res.find(sign) != -1 or head.find(sign) != -1:
            isMatch = True
    elif sign and head.find(sign) != -1:
        isMatch = True
    if isMatch:
        task_push(cms_name, host, target=util.get_url_host(host))
        security_note(cms_name,'whatcms')
        #print "%s %s" % (cms_name, host)
        return True
    return False

def assign(service, arg):
    if service == "www":
        return True,makeurl(arg)

def audit(arg):
    cms_cache = {}
    cache = {}

    def _cache(url):
        if url in cache:
            return cache[url]
        else:
            status_code, header, html_body, error, error = curl.curl2(url)
            if status_code != 200 or not html_body:
                html_body = ""
            cache[url] = (status_code, header, html_body)
            return status_code, header, html_body

    for cmsname in cms_dict:
        cms_hash_list = cms_dict[cmsname]
        for cms_hash in cms_hash_list:
            if isinstance(cms_hash, tuple):
                f_path, sign = cms_hash
            else:
                f_path, sign = cms_hash, None
            if not isinstance(f_path, list):
                f_path = [f_path]
            for file_path in f_path:
                if file_path not in cms_cache:
                    cms_cache[file_path] = []
                cms_cache[file_path].append((cmsname, sign))
    cms_key = cms_cache.keys()
    cms_key.sort(key=len)
    isMatch = False
    for f_path in cms_key:
        if isMatch:
            break
        for cms_name, sign in cms_cache[f_path]:
            code, head, res = _cache(arg + f_path)
            isMatch = isMatching(f_path, cms_name, sign, res, code, arg, head)
            if isMatch:
                break

从 audit函数看起,第一个for循环是给指纹排序:
for cmsname in cms_dict:
    cms_hash_list = cms_dict[cmsname]
    for cms_hash in cms_hash_list:
        if isinstance(cms_hash, tuple):
            f_path, sign = cms_hash
        else:
            f_path, sign = cms_hash, None
        if not isinstance(f_path, list):
            f_path = [f_path]
        for file_path in f_path:
            if file_path not in cms_cache:
                cms_cache[file_path] = []
            cms_cache[file_path].append((cmsname, sign))

为了减少访问网页的次数,cms识别时最好在访问一次路径后,同时匹配该路径下所有cms的特征。例如很多cms的指纹路径都是 robots.txt,这样只需访问一次 robots.txt 即可。
最后在 isMatching() 函数下找到我们问题的答案~
def isMatching(f_path, cms_name, sign, res, code, host, head):
    isMatch = False
    if f_path.endswith(".gif"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("GIF89a")
    elif f_path.endswith(".png"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("\x89PNG\x0d\x0a\x1a\x0a")
    elif f_path.endswith(".jpg"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("\xff\xd8\xff\xe0\x00\x10JFIF")
    elif f_path.endswith(".ico"):
        if sign:
            isMatch = getMD5(res) == sign
        else:
            isMatch = res.startswith("\x00\x00\x00")
    elif code == 200:
        if sign and res.find(sign) != -1 or head.find(sign) != -1:
            isMatch = True
    elif sign and head.find(sign) != -1:
        isMatch = True
    if isMatch:
        task_push(cms_name, host, target=util.get_url_host(host))
        security_note(cms_name,'whatcms')
        #print "%s %s" % (cms_name, host)
        return True
    return False

可以看到,是通过对访问路径的后缀做判断,并且对读取的图片的前几个字节做校验。这种判断模式比一些暴力用指纹枚举的判断模式在效率方面应该是更好的。
if isMatch:
    task_push(cms_name, host, target=util.get_url_host(host))
    security_note(cms_name,'whatcms')
    #print "%s %s" % (cms_name, host)
    return True

在我们exploit_run 类的内部实现task_push 的功能就好啦~
其实整理也不辛苦啦,像笔者这么懒的人怎么会一个个手动干呢,一个正则、一个移动文件、一个自动创建文件夹就把这些搞定了
try:
    l = socket.gethostbyname_ex(hostnames)
    security_info(str(l),'subdomain')
except socket.error:
    pass

O(∩_∩)O哈哈~,其实通过socket.gethostbyname_ex就可以完成子域名爆破的工作,为什么要自己重写dns协议呢。
大家可以自行 pip install builtwith 安装这个库试试,非常强大!
# coding:utf-8
# 模拟一个 线程池,可以向里面添加任务,
import threading
import time
import traceback
import Queue
import random

class w8_threadpool:

    def __init__(self,threadnum,func_scan):
        self.thread_count = self.thread_nums = threadnum
        self.scan_count_lock = threading.Lock()
        self.thread_count_lock = threading.Lock()
        self.load_lock = threading.Lock()
        self.scan_count = 0
        self.isContinue = True
        self.func_scan = func_scan
        self.queue = Queue.Queue()

    def push(self,payload):
        self.queue.put(payload)

    def changeScanCount(self,num):
        self.scan_count_lock.acquire()
        self.scan_count += num
        self.scan_count_lock.release()

    def changeThreadCount(self,num):
        self.thread_count_lock.acquire()
        self.thread_count += num
        self.thread_count_lock.release()

    def run(self):
        for i in range(self.thread_nums):
            t = threading.Thread(target=self.scan, name=str(i))
            t.setDaemon(True)
            t.start()
        #It can quit with Ctrl-C
        while 1:
            if self.thread_count > 0 and self.isContinue:
                time.sleep(0.01)
            else:
                break

    def stop(self):
        self.load_lock.acquire()
        self.isContinue = False
        self.load_lock.release()

    def scan(self):
        while 1:
            self.load_lock.acquire()
            if self.queue.qsize() > 0 and self.isContinue:
                payload = self.queue.get()
                self.load_lock.release()
            else:
                self.load_lock.release()
                break
            try:
                # POC在执行时报错如果不被处理,线程框架会停止并退出
                self.func_scan(payload)
            except KeyboardInterrupt:
                self.isContinue = False
                raise KeyboardInterrupt
            except Exception:
                errmsg = traceback.format_exc()
                self.isContinue = False
            self.changeScanCount(-1)
        self.changeThreadCount(-1)

if __name__ == '__main__':
    def calucator(num):
        i = random.randint(1, 100)
        u = num
        a = i * u
        if (a % 6 == 0):
            for x in range(5):
                print "new thread"
                p.push(x)

    p = w8_threadpool(3, calucator)
    for i in range(100000):
        p.push(i)
    p.run()

这么一个简陋的线程池对于笔者的w9scan来说足够了(其实笔者的要求也不高是不是)
后记
我觉得,一些成熟的扫描器在一些常规扫描方式上已达到登峰造极的地步,以至于再想用这些常规方法扫描漏洞,效果可能不太好。想要有效挖掘一个网站的漏洞,应该从那些不起眼的地方着力,比如w9scan扫描器中有git、svn泄露路径寻找,会基于域名的字典来查找备份文件,以及根据爬虫寻找出的文件逐个进行一句话密码破解等等……