#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FuzzSSH (Simple SSH Fuzzer) - 2022 - by psy (epsylon@riseup.net)

You should have received a copy of the GNU General Public License along
with FuzzSSH; if not, write to the Free Software Foundation, Inc., 51
Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
----------
See following RFCs for more info:

rfc4251 - The SSH Protocol Architecture
rfc4252 - The SSH Authentication Protocol
rfc4253 - The SSH Transport Layer Protocol
rfc4254 - The SSH Connection Protocol
----------
Current [01/22][Paramiko] tested parameters:

username, password, pkey, key_filename, timeout, allow_agent, look_for_keys,
compress, sock, gss_auth, gss_kex, gss_deleg_creds, gss_host, banner_timeout,
auth_timeout, gss_trust_dns, passphrase, disabled_algorithms
"""
import os
import sys
import time

try:
    import paramiko
except ImportError:
    print("\nError importing: paramiko lib. \n\n To install it on Debian based systems:\n\n $ 'sudo apt-get install python3-paramiko'\n")
    sys.exit()

VERSION = "v:0.1beta"
RELEASE = "12012022"
SOURCE1 = "https://code.03c8.net/epsylon/fuzzssh"
SOURCE2 = "https://github.com/epsylon/fuzzssh"
CONTACT = "epsylon@riseup.net - (https://03c8.net)"

try:
    import payloads.payloads  # import payloads
except (ImportError, SyntaxError):
    print("\n[Info] Try to run the tool with Python3.x.y... (ex: python3 fuzzssh.py) -> [EXITING!]\n")
    sys.exit()

# File where every distinct exception raised while fuzzing is recorded.
EXCEPTIONS_LOG = "exceptions.log"

# Neutral value for every SSHClient.connect() keyword argument; exactly one
# of them is replaced by the payload on each fuzzing attempt.
CONNECT_DEFAULTS = {
    "username": None,
    "password": None,
    "pkey": None,
    "key_filename": None,
    "timeout": None,
    "allow_agent": True,
    "look_for_keys": True,
    "compress": False,
    "sock": None,
    "gss_auth": False,
    "gss_kex": False,
    "gss_deleg_creds": True,
    "gss_host": None,
    "banner_timeout": None,
    "auth_timeout": None,
    "gss_trust_dns": True,
    "passphrase": None,
    "disabled_algorithms": None,
}

# FUZZED PARAMETERS (each name is the upper-cased connect() keyword)
FUZZED_PARAMETERS = (
    "USERNAME", "PASSWORD", "PKEY", "KEY_FILENAME", "TIMEOUT", "ALLOW_AGENT",
    "LOOK_FOR_KEYS", "COMPRESS", "SOCK", "GSS_AUTH", "GSS_KEX",
    "GSS_DELEG_CREDS", "GSS_HOST", "BANNER_TIMEOUT", "AUTH_TIMEOUT",
    "GSS_TRUST_DNS", "PASSPHRASE", "DISABLED_ALGORITHMS",
)


def progressbar(it, prefix="", size=60, file=sys.stdout):
    """Yield the items of *it* while drawing a text progress bar on *file*.

    *it* must support ``len()``. An empty *it* draws a full bar instead of
    dividing by zero.
    """
    count = len(it)

    def show(j):
        filled = int(size * j / count) if count else size
        file.write("%s[%s%s] %i/%i\r" % (prefix, "#" * filled, "." * (size - filled), j, count))
        file.flush()

    show(0)
    for i, item in enumerate(it):
        yield item
        show(i + 1)
    file.write("\n")
    file.flush()


def payloading():
    """Load the four payload families and return each one with its length.

    Returns an 8-tuple: (numbers, len, overflows, len, strings, len, bugs, len).
    """
    print("\n" + "=" * 50 + "\n")
    payloads_numbers = payloads.payloads.numbers  # load 'numbers' payloads
    payloads_overflows = payloads.payloads.overflows  # load 'overflows' payloads
    payloads_strings = payloads.payloads.strings  # load 'strings' payloads
    payloads_bugs = payloads.payloads.bugs  # load 'bugs' payloads
    return (
        payloads_numbers, len(payloads_numbers),
        payloads_overflows, len(payloads_overflows),
        payloads_strings, len(payloads_strings),
        payloads_bugs, len(payloads_bugs),
    )


def _log_exception(message):
    """Append *message* to the exceptions log unless it is already recorded."""
    # "a+" creates the file when missing (no os.mknod needed) and always
    # writes at the end, whatever the read position is.
    with open(EXCEPTIONS_LOG, "a+", encoding="utf-8", errors="replace") as log:
        log.seek(0)
        if message not in log.read():
            log.write("Exception error: %s\n\n" % message)


def send_payload(client, payload, parameter, verbosity, num_payloads, method):
    """Attempt one SSH connection with *payload* injected into *parameter*.

    *parameter* is one of FUZZED_PARAMETERS; any other value performs no
    connection. The host and port are read from the module-level ``target``
    and ``port`` set by the start-up code at the bottom of the file.
    *verbosity*, *num_payloads* and *method* are accepted for interface
    compatibility and are not used here.

    Every exception raised by the attempt is written (once) to the
    exceptions log, so a malformed payload cannot abort the whole run.
    """
    try:
        if parameter in FUZZED_PARAMETERS:
            kwargs = dict(CONNECT_DEFAULTS)
            kwargs[parameter.lower()] = payload
            client.connect(hostname=str(target), port=int(port), **kwargs)
    except paramiko.SSHException as e:  # https://docs.paramiko.org/en/stable/api/ssh_exception.html
        _log_exception(str(e))
    except Exception as e:
        # Payloads of the wrong type make paramiko raise TypeError,
        # AttributeError, socket errors, ...: record them and keep fuzzing.
        _log_exception("%s: %s" % (type(e).__name__, e))
    finally:
        client.close()  # close SSH client


def exploit(target, port, user, pw, verbosity, payloads_numbers, num_payloads_numbers, payloads_overflows, num_payloads_overflows, payloads_strings, num_payloads_strings, payloads_bugs, num_payloads_bugs):
    """Try a regular login, then fuzz every connect() parameter.

    The regular connection is informative only: when it fails the fuzzing
    stage still runs. With *verbosity* set, the remote banner and the
    cipher / key-exchange algorithms of the transport are printed.
    """
    client = paramiko.SSHClient()
    try:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.load_system_host_keys()
        paramiko.util.log_to_file(os.devnull, level="INFO")  # logs + bypass -> paramiko.SSHException issue (https://github.com/paramiko/paramiko/issues/1752)
        print("[Info] Trying SSH [NORMAL] connection...\n")
        client.connect(hostname=str(target), port=int(port), username=str(user), password=str(pw), timeout=10, banner_timeout=200, look_for_keys=False, allow_agent=False)
        print("\n[Info] [NORMAL] Connection established -> OK!")
        if verbosity is True:
            transport = client.get_transport()
            print("\n -> [*] Banner:")
            print(" -> " + str(transport.remote_version))
            so = transport.get_security_options()
            print("\n -> [*] Ciphering algorithms:")
            for c in so.ciphers:
                print(" -> " + str(c))
            print("\n -> [*] Key exchange algorithms:")
            for k in so.kex:
                print(" -> " + str(k))
        print("\n[Info] [NORMAL] Connection closed -> OK!")
        print("\n" + "=" * 50)
    except Exception:  # not a bare except: Ctrl-C must still stop the tool
        print("=" * 50)
        print("\n[Error] [NORMAL] Connection failed! -> [PASSING!]")
    client.close()  # close SSH client

    print("\n -> [*] Starting to test SSH (protocol)...")
    categories = (
        (" [*] Numbers ", payloads_numbers, num_payloads_numbers),
        (" [*] Overflows ", payloads_overflows, num_payloads_overflows),
        (" [*] Format Strings", payloads_strings, num_payloads_strings),
        (" [*] Known bugs ", payloads_bugs, num_payloads_bugs),
    )
    for parameter in FUZZED_PARAMETERS:
        print("\n -> [SSH] -> [" + str(parameter) + "]...\n")
        for method, category_payloads, num_payloads in categories:
            # The bar advances once per payload actually sent.
            for payload in progressbar(category_payloads, method + " ", 40):
                time.sleep(0.7)
                send_payload(client, payload, parameter, verbosity, num_payloads, method)
                time.sleep(0.2)
        print("\n" + "-" * 15)


def set_target():
    """Ask for the connection settings on stdin.

    Returns (target, port, user, pw, verbosity). Exits when no target is
    given; the port falls back to 22 and the user to "root".
    """
    target = input("\n + Enter TARGET (ex: '100.0.0.1'): ").strip()
    if not target:  # exit when no 'target' set
        print("\n" + "=" * 50)
        print("\n[Error] Not ANY target detected... Exiting!\n")
        sys.exit()
    port = input("\n + Enter PORT (ex: '22'): ")
    try:  # check port as integer num
        port = int(port)
    except ValueError:  # empty or non-numeric -> default port
        port = 22
    user = input("\n + Enter USER (ex: 'root'): ")
    if user == "":  # default when no 'user' set
        user = "root"
    pw = input("\n + Enter PASSWORD (ex: '12345'): ")
    verbosity = input("\n + Enter VERBOSITY (ex: 'true'): ")
    verbosity = verbosity.strip().lower() == "true"  # anything else disables it
    return target, port, user, pw, verbosity


def print_banner():
    """Print the tool banner with version, sources and contact details."""
    print("\n" + "=" * 50)
    print(" _____ __________ _ _ ")
    print("| ___| _|__ /__ /___ ___| | | |")
    print("| |_ | | | | / / / // __/ __| |_| |")
    # Raw strings: the backslashes belong to the drawing, they are not escapes.
    print(r"| _|| |_| |/ /_ / /_\__ \__ \ _ |")
    print(r"|_| \__,_/____/____|___/___/_| |_| by psy")
    print('\n"Simple SSH Protocol Fuzzing Tool"')
    print("\n" + "-" * 15 + "\n")
    print(" * VERSION: ")
    print(" + " + VERSION + " - (rev:" + RELEASE + ")")
    print("\n * SOURCES:")
    print(" + " + SOURCE1)
    print(" + " + SOURCE2)
    print("\n * CONTACT: ")
    print(" + " + CONTACT + "\n")
    print("-" * 15 + "\n")
    print("=" * 50)


# sub_init #
# 'target' and 'port' stay module-level names: send_payload() reads them.
print_banner()  # show banner
print("\n" + "=" * 50)
target, port, user, pw, verbosity = set_target()
payloads_numbers, num_payloads_numbers, payloads_overflows, num_payloads_overflows, payloads_strings, num_payloads_strings, payloads_bugs, num_payloads_bugs = payloading()
exploit(target, port, user, pw, verbosity, payloads_numbers, num_payloads_numbers, payloads_overflows, num_payloads_overflows, payloads_strings, num_payloads_strings, payloads_bugs, num_payloads_bugs)