#!/usr/bin/env python3
"""Liveness check for every entry in every botnet/*.txt (HTTP, DNS, NTP, SNMP, UDP-amp).
Iterates the whole botnet/ folder, routes by filename to the right probe.
Informational test: PASS unless every populated category has zero alive entries
(which would indicate a systemic network failure or code regression).
"""
import socket, ssl, struct, time, os, sys
import urllib.request, urllib.error, urllib.parse
BOTNET_DIR = "botnet"
UA = "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0"
TIMEOUT = 6
HTTP_CATEGORIES = {"zombies", "aliens", "droids", "ucavs", "rpcs"}
UDP_AMP_PROBES = {
"dns": (53, b'\x00' * 12 + b'\x06google\x03com\x00\x00\x01\x00\x01',
lambda d: d[2] & 0x80),
"ntp": (123, b'\x1b' + 47 * b'\0', lambda d: len(d) == 48),
"snmp": (161,
bytes([0x30, 0x26, 0x02, 0x01, 0x01, 0x04, 0x06]) + b'public' + bytes([
0xa0, 0x19, 0x02, 0x04, 0x71, 0x44, 0x12, 0x34,
0x02, 0x01, 0x00, 0x02, 0x01, 0x00, 0x30, 0x0b,
0x30, 0x09, 0x06, 0x05, 0x2b, 0x06, 0x01, 0x02,
0x01, 0x05, 0x00]),
lambda d: len(d) > 10),
"memcached": (11211, b'\x00\x01\x00\x00\x00\x01\x00\x00stats\r\n', lambda d: len(d) > 0),
"chargen": (19, b'\x00', lambda d: len(d) > 0),
"cldap": (389,
(b'\x30\x84\x00\x00\x00\x2d\x02\x01\x01\x63\x84\x00\x00\x00\x24'
b'\x04\x00\x0a\x01\x00\x0a\x01\x00\x02\x01\x00\x02\x01\x00\x01'
b'\x01\x00\x87\x0b\x6f\x62\x6a\x65\x63\x74\x43\x6c\x61\x73\x73'
b'\x30\x00'),
lambda d: len(d) > 0),
"ssdp": (1900,
(b'M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\n'
b'MAN: "ssdp:discover"\r\nMX: 1\r\nST: ssdp:all\r\n\r\n'),
lambda d: len(d) > 0),
"qotd": (17, b'\x00', lambda d: len(d) > 0),
"tftp": (69, b'\x00\x01startup-config\x00netascii\x00', lambda d: len(d) > 0),
"wsdisco": (3702, b'', lambda d: len(d) > 0),
"coap": (5683, b'\x40\x01\x12\x34\xbb.well-known\x04core', lambda d: len(d) > 0),
"mssql": (1434, b'\x02', lambda d: len(d) > 0),
"arms": (3283, b'\x00\x14\x00\x01\x00\x03', lambda d: len(d) > 0),
"plex": (32414, b'M-SEARCH * HTTP/1.1\r\n\r\n', lambda d: len(d) > 0),
"netbios": (137,
(b'\xab\xcd\x00\x10\x00\x01\x00\x00\x00\x00\x00\x00'
b'\x20\x43\x4b\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41'
b'\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x00'
b'\x00\x21\x00\x01'),
lambda d: len(d) > 0),
"ripv1": (520,
(b'\x01\x01\x00\x00' + b'\x00' * 16 + b'\x00\x00\x00\x10'),
lambda d: len(d) > 0),
}
TCP_PROBES = {
"middlebox": 80,
}
SKIP = {"dorks", "humans"}
def http_probe(url):
parsed = urllib.parse.urlparse(url.rstrip(';').split(';')[0])
if not parsed.scheme:
parsed = urllib.parse.urlparse("http://" + url)
base = parsed.scheme + "://" + parsed.netloc + parsed.path
if parsed.query:
base += "?" + parsed.query
ctx = ssl.create_default_context(); ctx.check_hostname = False; ctx.verify_mode = ssl.CERT_NONE
headers = {"User-Agent": UA, "Accept": "*/*"}
is_xmlrpc = 'xmlrpc' in base.lower()
if is_xmlrpc:
body = b'system.listMethods'
req = urllib.request.Request(base, data=body, headers={**headers, "Content-Type": "text/xml"}, method='POST')
else:
req = urllib.request.Request(base, headers=headers, method='HEAD')
try:
r = urllib.request.urlopen(req, context=ctx, timeout=TIMEOUT)
return ("UP", r.status, "")
except urllib.error.HTTPError as e:
if e.code in (403, 405):
return ("HTTP-ERR", e.code, "")
return ("HTTP-ERR", e.code, str(e)[:80])
except urllib.error.URLError as e:
return ("URL-ERR", 0, str(e.reason)[:80])
except Exception as e:
return ("ERR", 0, f"{type(e).__name__}: {e}"[:80])
def udp_probe(ip, port, payload, success_fn):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM); s.settimeout(TIMEOUT)
try:
s.sendto(payload, (ip, port))
data, _ = s.recvfrom(4096)
if success_fn(data):
return ("UP", port, f"{len(data)}B")
return ("MALFORMED", port, repr(data[:20]))
except socket.timeout:
return ("TIMEOUT", port, "")
except Exception as e:
return ("ERR", port, f"{type(e).__name__}: {e}"[:80])
finally:
s.close()
def tcp_probe(ip, port):
try:
with socket.create_connection((ip, port), timeout=TIMEOUT):
return ("UP", port, "")
except socket.timeout:
return ("TIMEOUT", port, "")
except Exception as e:
return ("ERR", port, f"{type(e).__name__}: {e}"[:80])
def load(path):
if not os.path.exists(path):
return []
with open(path, encoding='utf-8', errors='replace') as f:
return [l.strip() for l in f if l.strip()]
total = alive_total = 0
dead_per_cat = {}
all_results = []
print(f"{'category':12s} | {'entry':50s} | result")
print("-" * 110)
files = sorted(os.listdir(BOTNET_DIR))
for fname in files:
if not fname.endswith(".txt"):
continue
cat = fname[:-4]
if cat in SKIP:
continue
entries = load(os.path.join(BOTNET_DIR, fname))
if not entries:
print(f"{cat:12s} | (empty)")
continue
if all(e.startswith('') for e in entries):
print(f"{cat:12s} | (placeholders only)")
continue
entries = [e for e in entries if not (e.startswith(''))]
if not entries:
print(f"{cat:12s} | (placeholders only)")
continue
if len(entries) > 5:
entries = entries[:5]
print(f"{cat:12s} | sampling first 5 of {len(load(os.path.join(BOTNET_DIR, fname)))}")
dead_per_cat[cat] = 0
for entry in entries:
total += 1
if cat in HTTP_CATEGORIES:
status, code, detail = http_probe(entry)
ok = (status == "UP") or (status == "HTTP-ERR" and code in (403, 405) and 'xmlrpc' in entry.lower())
elif cat in UDP_AMP_PROBES:
port, payload, success_fn = UDP_AMP_PROBES[cat]
status, code, detail = udp_probe(entry, port, payload, success_fn)
ok = (status == "UP")
elif cat in TCP_PROBES:
port = TCP_PROBES[cat]
status, code, detail = tcp_probe(entry, port)
ok = (status == "UP")
else:
status, code, detail = ("SKIPPED", 0, "unknown category")
ok = True
continue
mark = "OK " if ok else "DEAD"
if not ok:
dead_per_cat[cat] += 1
all_results.append((cat, entry, status, code, detail))
else:
alive_total += 1
print(f"{cat:12s} | {entry[:50]:50s} | {mark} {status:9s} {code:<5} {detail[:50]}")
print("-" * 110)
print(f"Tested entries: {total}, Alive: {alive_total}, Dead: {total - alive_total}")
print()
print("Per-category dead:")
for cat in sorted(dead_per_cat):
if dead_per_cat[cat] > 0:
print(f" {cat}: {dead_per_cat[cat]} dead")
else:
print(f" {cat}: 0 dead")
print()
print("DEAD entries:")
for cat, entry, status, code, detail in all_results:
print(f" {cat}: {entry} ({status} {code} {detail})")
if total == 0:
print("No entries tested (all categories empty or only placeholders).")
sys.exit(0)
if alive_total == 0:
print("All entries dead - probable network issue or code regression.")
sys.exit(1)
sys.exit(0)