# Sality Extractor # Copyright (C) 2017 quangnh89, develbranch.com # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # blog: https://develbranch.com # email: contact[at]develbranch.com import pefile import struct import re import argparse from unicorn import * from unicorn.x86_const import * from capstone import * from keystone import * from datetime import datetime class SalityExtractor(): def __init__(self, sample_file=None, output_file=None): self.md = Cs(CS_ARCH_X86, CS_MODE_32) self.md.detail = True self.sample = sample_file self.output = output_file self.detected = False self.control_server = [] # utility methods @staticmethod # display log message def log(msg): print str(datetime.now()), msg # dump all mapped memory to file def dump_to_file(self, mu, pe, filename, new_ep_rva=None, runable=True): memory_mapped_image = bytearray(mu.mem_read(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage)) for section in pe.sections: va_adj = pe.adjust_SectionAlignment(section.VirtualAddress, pe.OPTIONAL_HEADER.SectionAlignment, pe.OPTIONAL_HEADER.FileAlignment) if section.Misc_VirtualSize == 0 or section.SizeOfRawData == 0: continue if section.SizeOfRawData > len(memory_mapped_image): continue if pe.adjust_FileAlignment(section.PointerToRawData, pe.OPTIONAL_HEADER.FileAlignment) > len( memory_mapped_image): continue pe.set_bytes_at_rva(va_adj, bytes(memory_mapped_image[va_adj: va_adj + section.SizeOfRawData])) ----- pe te( e a e) # set new entrypoint if new_ep_rva is not None: self.log("New entry point %08x" % new_ep_rva) f = open(filename, 'r+b') f.seek(pe.DOS_HEADER.e_lfanew + 4 + pe.FILE_HEADER.sizeof() + 0x10) f.write(struct.pack(' if not runable: _f.seek(0)_ _f.write('mz')_ _f.close()_ _print ('[+] Save to file {}'.format(filename))_ _@staticmethod_ _def assembler(address, assembly):_ _ks = Ks(KS_ARCH_X86, KS_MODE_32)_ _encoding, _ = ks.asm(assembly, address)_ _return ''.join(chr(e) for e in encoding)_ _# callback for tracing invalid memory access (READ or WRITE)_ _# noinspection PyUnusedLocal_ _@staticmethod_ _def hook_mem_invalid(uc, access, address, size, value, user_data):_ _# return False to indicate we want to stop emulation_ _return False_ _# callback for tracing fake-IAT interrupt_ _# noinspection PyUnusedLocal_ _def hook_intr(self, uc, intno, user_data):_ _# only handle fake-IAT interrupt_ _if intno != 0xff:_ _print ("got interrupt %x ???" % intno)_ _uc.emu_stop()_ _return_ _eax = uc.reg_read(UC_X86_REG_EAX)_ _dll_name, address, name, _ = self.import_addrs[eax]_ _if 'kernel32' in dll_name.lower():_ _if name == 'LoadLibraryA':_ _uc.reg_write(UC_X86_REG_EAX, 0xabababab)_ _elif name == 'GetProcAddress':_ _uc.reg_write(UC_X86_REG_EAX, 0xbcbcbcbc)_ _elif name == 'VirtualProtect':_ _uc.reg_write(UC_X86_REG_EAX, 0x1)_ _# noinspection PyBroadException_ _# noinspection PyUnresolvedReferences_ _def emulate_sality_dll(self, memory):_ _try:_ _pe = pefile.PE(data=memory, fast_load=True)_ _except:_ _return None_ _self.log("[+] Parse Sality DLL")_ _pe.parse_data_directories()_ _self.import_addrs = []_ _for entry in pe.DIRECTORY_ENTRY_IMPORT:_ _for imp in entry.imports:_ _nparam = 1_ _if entry.dll.lower() in 'kernel32.dll':_ _if imp.name == 'LoadLibraryA':_ _nparam = 1_ _elif imp.name == 'GetProcAddress':_ _nparam = 2_ _elif imp.name == 'VirtualProtect':_ _nparam = 4_ ----- _se_ _po t_add s appe d((e t y d,_ _p add ess,_ _p_ _a_ _e,_ _pa a_ _))_ _self.log('[+] Analyze UPX stub code')_ _entry_point_code = str(pe.get_memory_mapped_image())[pe.OPTIONAL_HEADER.AddressOfEntryPoint:]_ _begin_addr = pe.OPTIONAL_HEADER.ImageBase + pe.OPTIONAL_HEADER.AddressOfEntryPoint_ _end_addr = begin_addr_ _for i in self.md.disasm(str(entry_point_code), begin_addr):_ _if i.mnemonic.lower() in ['popad', 'popal', 'popa']:_ _end_addr = i.address + 1_ _break_ _self.log("[+] Initialize emulator in X86-32bit mode")_ _mu = Uc(UC_ARCH_X86, UC_MODE_32)_ _# map memory for this emulation_ _mu.mem_map(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage)_ _# stack_ _stack_addr = 0x1000_ _stack_size = 0x4000_ _mu.mem_map(stack_addr, stack_size)_ _# write machine code to be emulated to memory_ _mu.mem_write(pe.OPTIONAL_HEADER.ImageBase, pe.get_memory_mapped_image())_ _# initialize machine registers_ _mu.reg_write(UC_X86_REG_ESP, stack_addr + stack_size / 2)_ _# intercept invalid memory events_ _mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, self.hook_mem_invalid)_ _# build IAT table_ _iat_addr = 0x10000_ _e = self.assembler(iat_addr, 'mov eax, 1;int 0xff;ret 0xffff')_ _iat_size_adj = pe.adjust_SectionAlignment(len(self.import_addrs) * len(e) + pe.OPTIONAL_HEADER.SectionAlignment,_ _pe.OPTIONAL_HEADER.SectionAlignment, pe.OPTIONAL_HEADER.FileAlignment)_ _mu.mem_map(iat_addr, iat_size_adj)_ _for i in range(len(self.import_addrs)):_ __, iat_entry, _, nparam = self.import_addrs[i]_ _func_addr = iat_addr + i * len(e)_ _if nparam > 1:_ _c = self.assembler(func_addr, 'mov eax, %x;int 0xff;ret %x' % (i, nparam))_ _else:_ _c = self.assembler(func_addr, 'mov eax, %x;int 0xff;ret' % i)_ _mu.mem_write(func_addr, c)_ _mu.mem_write(iat_entry, struct.pack(' # handle interrupt ourselves_ _mu.hook_add(UC_HOOK_INTR, self.hook_intr)_ _self.log('[+] Emulate machine code')_ _mu.emu_start(begin_addr, end_addr)_ _decoded_memory = mu.mem_read(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage)_ _return decoded_memory_ _@staticmethod_ _def check_sality(code):_ _signature = [(0,_ _'\xE8\x00\x00\x00\x00\x5D\x8B\xC5\x81\xED\x05\x10\x40\x00\x8A\x9D\x73\x27\x40\x00\x84\xDB\x74\x13\x81\xC4'),_ _(0x23,_ _'\x89\x85\x54\x12\x40\x00\xEB\x19\xC7\x85\x4D\x14\x40\x00\x22\x22\x22\x22\xC7\x85\x3A\x14\x40\x00\x33\x33\x33\x33\xE9\x82\x00\x00\x00\x3_ _for offset, s in signature:_ _if s != code[offset:offset + len(s)]:_ _return False_ _return True_ _# callback for tracing instructions_ _# noinspection PyUnusedLocal_ _def hook_code(self, uc, address, size, user_data):_ _# I expect 'retn'_ _if size != 1:_ _return_ _if uc.mem_read(address, size) != '\xc3':_ _return_ ----- _esp_ _uc eg_ ead(UC_ 86__ _G_ S )_ _sality_entrypoint = struct.unpack(' code = uc.mem_read(sality_entrypoint, 0x100)_ _if not self.check_sality(code):_ _return_ _self.detected = True_ _uc.emu_stop()_ _# noinspection PyBroadException_ _def extract(self):_ _if self.sample is None:_ _return_ _self.log("[+] Parse PE File")_ _try:_ _self.sample.seek(0)_ _except:_ _pass_ _pe = pefile.PE(data=self.sample.read(), fast_load=True)_ _self.log("[+] Initialize emulator in X86-32bit mode")_ _mu = Uc(UC_ARCH_X86, UC_MODE_32)_ _# map memory for this emulation_ _mu.mem_map(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage)_ _# stack_ _stack_addr = 0x1000_ _stack_size = 0x4000_ _mu.mem_map(stack_addr, stack_size)_ _# write machine code to be emulated to memory_ _mu.mem_write(pe.OPTIONAL_HEADER.ImageBase, pe.get_memory_mapped_image())_ _# initialize machine registers_ _mu.reg_write(UC_X86_REG_ESP, stack_addr + stack_size / 2)_ _# tracing all instructions with customized callback_ _mu.hook_add(UC_HOOK_CODE, self.hook_code)_ _# intercept invalid memory events_ _mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, self.hook_mem_invalid)_ _self.log('[+] Emulate machine code')_ _begin_addr = pe.OPTIONAL_HEADER.ImageBase + pe.OPTIONAL_HEADER.AddressOfEntryPoint_ _end_addr = pe.OPTIONAL_HEADER.ImageBase + pe.OPTIONAL_HEADER.SizeOfImage_ _try:_ _mu.emu_start(begin_addr, end_addr)_ _except Exception as e:_ _self.log('[-] Emulator error: %s' % e)_ _return_ _if not self.detected:_ _self.log('[-] Sality not found')_ _return_ _self.log("[+] Find Sality section")_ _sality_section_addr = None_ _eip_rva = mu.reg_read(UC_X86_REG_EIP) - pe.OPTIONAL_HEADER.ImageBase_ _for section in pe.sections:_ _va_adj = pe.adjust_SectionAlignment(section.VirtualAddress, pe.OPTIONAL_HEADER.SectionAlignment,_ _pe.OPTIONAL_HEADER.FileAlignment)_ _if va_adj <= eip_rva < va_adj + section.Misc_VirtualSize:_ _sality_section_addr = va_adj_ _break_ _if sality_section_addr is None:_ _self.log("[-] Sality section not found")_ _return_ _mapped_memory = str(mu.mem_read(pe.OPTIONAL_HEADER.ImageBase + sality_section_addr,_ _pe.OPTIONAL_HEADER.SizeOfImage - sality_section_addr))_ _self.detect_control_server(mapped_memory)_ _for m in re.finditer('MZ', mapped_memory):_ _sality_dll = mapped_memory[m.start():]_ _decoded_sality_dll = self.emulate_sality_dll(sality_dll)_ _if decoded_sality_dll is None:_ _continue_ ----- _se detect_co t o _se_ _e (decoded_sa ty_d )_ _if self.output is not None:_ _self.output.write(decoded_sality_dll)_ _self.log("[+] Write Sality DLL to file successfully")_ _self.log("[+] Analyze Sality DLL successfully")_ _def detect_control_server(self, memory):_ _# detect URL_ _urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', memory)_ _for _ in urls:_ _self.control_server.append(str(_))_ _def get_args():_ _"""This function parses and return arguments passed in"""_ _# Assign description to the help doc_ _parser = argparse.ArgumentParser(description='Script extracts URLs of Win32-Sality variants from a given file.')_ _# Add arguments_ _parser.add_argument('-z', '--zip', action='store_true')_ _parser.add_argument('-p', '--password', type=str, help='Password to open zip file', required=False,_ _default=None)_ _parser.add_argument('-n', '--name', type=str, help='File name in zip file', required=False, default=None)_ _parser.add_argument('-d', '--dump', type=str, help='Dump sality DLL to file', required=False, default=None)_ _parser.add_argument('file', nargs='?')_ _# Array for all arguments passed to script_ _args = parser.parse_args()_ _file_name = None_ _if args.file is not None and len(args.file) > 0:_ _file_name = args.file_ _# Return all variable values_ _return file_name, args.zip, args.password, args.name, args.dump_ _def main():_ _# Match return values from get_args()_ _# and assign to their respective variables_ _z = None_ _file_name, is_zip, password, name, dump = get_args()_ _if file_name is None:_ _print "Enter file name"_ _return_ _if is_zip:_ _from zipfile import ZipFile_ _z = ZipFile(file_name)_ _f = z.open(name, 'r', password)_ _else:_ _f = open(file_name, 'rb')_ _if dump is not None:_ _d = open(dump, 'wb')_ _else:_ _d = None_ _sd = SalityExtractor(f, d)_ _sd.extract()_ _if len(sd.control_server) > 0:_ _print sd.control_server_ _else:_ _print 'Found nothing'_ _if f is not None:_ _f.close()_ _if z is not None:_ _z.close()_ _if d is not None:_ _d close()_ ----- _if __name__ == '__main__':_ _main()_ -----