abductor.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. This file is part of the UFONet project, https://ufonet.03c8.net
  5. Copyright (c) 2013/2020 | psy <epsylon@riseup.net>
  6. You should have received a copy of the GNU General Public License along
  7. with UFONet; if not, write to the Free Software Foundation, Inc., 51
  8. Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  9. """
  10. import ssl, random, socket, time, re
  11. import urllib.request, urllib.error, urllib.parse
  12. from urllib.parse import urlparse as urlparse
  13. # UFONet reconnaissance (abduction) class
  14. class Abductor(object):
  15. def __init__(self,ufonet):
  16. self.ufonet=ufonet
  17. self.start = None
  18. self.stop = None
  19. self.port = None
  20. self.ctx = ssl.create_default_context() # creating context to bypass SSL cert validation (black magic)
  21. self.ctx.check_hostname = False
  22. self.ctx.verify_mode = ssl.CERT_NONE
  23. def proxy_transport(self, proxy):
  24. proxy_url = self.ufonet.extract_proxy(proxy)
  25. proxy = urllib.request.ProxyHandler({'https': proxy_url})
  26. opener = urllib.request.build_opener(proxy)
  27. urllib.request.install_opener(opener)
  28. def establish_connection(self, target):
  29. if target.endswith(""):
  30. target.replace("", "/")
  31. self.ufonet.user_agent = random.choice(self.ufonet.agents).strip() # shuffle user-agent
  32. headers = {'User-Agent' : self.ufonet.user_agent, 'Referer' : self.ufonet.referer} # set fake user-agent and referer
  33. try:
  34. req = urllib.request.Request(target, None, headers)
  35. if self.ufonet.options.proxy: # set proxy
  36. self.proxy_transport(self.ufonet.options.proxy)
  37. self.start = time.time()
  38. target_reply = urllib.request.urlopen(req, context=self.ctx)
  39. header = target_reply.getheaders()
  40. target_reply = target_reply.read().decode('utf-8')
  41. self.stop = time.time()
  42. else:
  43. self.start = time.time()
  44. target_reply = urllib.request.urlopen(req, context=self.ctx)
  45. header = target_reply.getheaders()
  46. target_reply = target_reply.read().decode('utf-8')
  47. self.stop = time.time()
  48. except:
  49. print('[Error] [AI] Unable to connect -> [Exiting!]\n')
  50. return
  51. return target_reply, header
  52. def convert_size(self, size):
  53. import math
  54. if (size == 0):
  55. return '0B'
  56. size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
  57. i = int(math.floor(math.log(size,1024)))
  58. p = math.pow(1024,i)
  59. s = round(size/p,2)
  60. return '%s %s' % (s,size_name[i])
  61. def convert_time(self, time):
  62. return '%.2f' % time
  63. def extract_banner(self, header): # extract webserver banner
  64. banner = None
  65. via = None
  66. try:
  67. for h in header:
  68. if h[0] == "Server":
  69. banner = h[1]
  70. except:
  71. banner = "NOT found!"
  72. try:
  73. for h in header:
  74. if h[0] == "Via":
  75. via = h[1]
  76. except: # return when fails performing query
  77. via = "NOT found!"
  78. if not banner:
  79. banner = "NOT found!"
  80. if not via:
  81. via = "NOT found!"
  82. return banner, via
  83. def extract_whois(self, domain): # extract whois data from target domain
  84. try:
  85. import whois
  86. d = whois.query(domain, ignore_returncode=True) # ignore return code
  87. if d.creation_date is None: # return when no creation date
  88. return
  89. else:
  90. print(" -Registrant : " + str(d.registrar))
  91. print(" -Creation date: " + str(d.creation_date))
  92. print(" -Expiration : " + str(d.expiration_date))
  93. print(" -Last update : " + str(d.last_updated))
  94. except: # return when fails performing query
  95. return
  96. def extract_cve(self, banner): # extract Denial of Service vulnerabilities related with webserver banner from CVE database
  97. url = 'https://cve.mitre.org/cgi-bin/cvekey.cgi?keyword'
  98. q = str(banner)
  99. query_string = { '':q}
  100. data = urllib.parse.urlencode(query_string)
  101. target = url + data
  102. try:
  103. self.ufonet.user_agent = random.choice(self.ufonet.agents).strip() # shuffle user-agent
  104. headers = {'User-Agent' : self.ufonet.user_agent, 'Referer' : self.ufonet.referer} # set fake user-agent and referer
  105. req = urllib.request.Request(target, None, headers)
  106. if self.ufonet.options.proxy: # set proxy
  107. self.proxy_transport(self.ufonet.options.proxy)
  108. target_reply = urllib.request.urlopen(req, context=self.ctx).read().decode('utf-8')
  109. else:
  110. target_reply = urllib.request.urlopen(req, context=self.ctx).read().decode('utf-8')
  111. except:
  112. return #sys.exit(2)
  113. if target_reply == "": # no records found
  114. return
  115. if "<b>0</b> CVE entries" in target_reply: # regex for: no CVE records found
  116. cve = "NOT found!"
  117. else:
  118. regex_s = '<td valign="top" nowrap="nowrap"><a href="(.+?)">' # regex magics
  119. pattern_s = re.compile(regex_s)
  120. cve = re.findall(pattern_s, target_reply)
  121. return cve
  122. def waf_detection(self, banner, target_reply):
  123. self.wafs_file = "core/txt/wafs.txt" # set source path to retrieve 'wafs'
  124. try:
  125. f = open(self.wafs_file)
  126. wafs = f.readlines()
  127. f.close()
  128. except:
  129. wafs = "broken!"
  130. sep = "##"
  131. for w in wafs:
  132. if sep in w:
  133. w = w.split(sep)
  134. signature = w[0] # signature
  135. t = w[1] # vendor
  136. if signature in target_reply or signature in banner:
  137. waf = "VENDOR -> " + str(t)
  138. else:
  139. pass
  140. else:
  141. waf = "FIREWALL NOT PRESENT (or not discovered yet)! ;-)\n"
  142. return waf
  143. def abducting(self, target):
  144. try:
  145. target_reply, header = self.establish_connection(target)
  146. except:
  147. print("[Error] [AI] Something wrong connecting to your target -> [Aborting!]\n")
  148. return #sys.exit(2)
  149. if not target_reply:
  150. print("[Error] [AI] Something wrong connecting to your target -> [Aborting!]\n")
  151. return #sys.exit(2)
  152. print(' -Target URL:', target, "\n")
  153. try:
  154. if target.startswith("http://"):
  155. self.port = "80"
  156. if target.startswith("https://"):
  157. self.port = "443"
  158. except:
  159. self.port = "Error!"
  160. try:
  161. domain = urlparse(target)
  162. domain = domain.netloc
  163. if domain.startswith("www."):
  164. domain = domain.replace("www.", "")
  165. except:
  166. domain = "OFF"
  167. try:
  168. ipv4 = socket.gethostbyname(domain)
  169. except:
  170. try:
  171. try: # extra resolver plan extracted from Orb (https://orb.03c8.net/) [24/12/2018 -> OK!]
  172. import dns.resolver
  173. r = dns.resolver.Resolver()
  174. r.nameservers = ['8.8.8.8', '8.8.4.4'] # google DNS resolvers
  175. url = urlparse(domain)
  176. a = r.query(url.netloc, "A") # A record
  177. for rd in a:
  178. ipv4 = str(rd)
  179. except:
  180. ipv4 = "OFF"
  181. except:
  182. ipv4 = "OFF"
  183. try:
  184. ipv6 = socket.getaddrinfo(domain, int(self.port), socket.AF_INET6, socket.IPPROTO_TCP)
  185. ftpca = ipv6[0]
  186. ipv6 = ftpca[4][0]
  187. except:
  188. ipv6 = "OFF"
  189. print(' -IP :', ipv4)
  190. print(' -IPv6 :', ipv6)
  191. print(' -Port :', self.port)
  192. print(' \n -Domain:', domain)
  193. try:
  194. whois = self.extract_whois(domain)
  195. except:
  196. pass
  197. try:
  198. size = self.convert_size(len(target_reply))
  199. except:
  200. size = "Error!"
  201. try:
  202. time_required = self.stop - self.start
  203. load = self.convert_time(time_required)
  204. except:
  205. load = "Error!"
  206. try:
  207. banner, via = self.extract_banner(header)
  208. except:
  209. pass
  210. print('\n---------')
  211. print("\nTrying single visit broadband test (using GET)...\n")
  212. print(' -Bytes in :', size)
  213. print(' -Load time:', load, "seconds\n")
  214. print('---------')
  215. print("\nDetermining webserver fingerprint (note that this value can be a fake)...\n")
  216. print(' -Banner:', banner)
  217. print(' -Vía :', via , "\n")
  218. print('---------')
  219. print("\nSearching for extra Anti-DDoS protections...\n")
  220. waf = self.waf_detection(banner, target_reply)
  221. print(' -WAF/IDS: ' + waf)
  222. if 'VENDOR' in waf:
  223. print(' -NOTICE : This FIREWALL probably is using Anti-(D)DoS measures!', "\n")
  224. print('---------')
  225. if banner == "NOT found!":
  226. pass
  227. else:
  228. print("\nSearching at CVE (https://cve.mitre.org) for vulnerabilities...\n")
  229. try:
  230. cve = self.extract_cve(banner)
  231. if cve == None:
  232. print(' -Last Reports: NOT found!', "\n")
  233. elif cve == "NOT found!":
  234. print(' -Last Reports:', cve)
  235. else:
  236. print(' -Last Reports:')
  237. i = 0
  238. for c in cve:
  239. i = i + 1
  240. if i < 11:
  241. cve_info = c.replace("/cgi-bin/cvename.cgi?name=","")
  242. print("\n +", cve_info, "->", "https://cve.mitre.org" + c) # 8 tab for zen
  243. print('\n---------')
  244. except:
  245. pass
  246. print("\n[Info] [AI] Abduction finished! -> [OK!]\n")