重写架构,支持微信4.0

This commit is contained in:
SiYuan
2025-03-28 21:29:18 +08:00
parent fc1e2fa7a5
commit 6535ed011c
388 changed files with 20483 additions and 39576 deletions

View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2025/1/10 2:34
@Author : SiYuan
@Email : 863909694@qq.com
@File : wxManager-__init__.py.py
@Description :
"""
from typing import List
import psutil
from wxManager.decrypt.wx_info_v3 import dump_wechat_info_v3
from wxManager.decrypt.wx_info_v4 import dump_wechat_info_v4
from wxManager.decrypt.common import WeChatInfo
def get_info_v4() -> List[WeChatInfo]:
result_v4 = []
for process in psutil.process_iter(['name', 'exe', 'pid']):
if process.name() == 'Weixin.exe':
wechat_base_address = 0
for module in process.memory_maps(grouped=False):
if module.path and 'Weixin.dll' in module.path:
wechat_base_address = int(module.addr, 16)
break
if wechat_base_address == 0:
continue
pid = process.pid
wxinfo = dump_wechat_info_v4(pid)
result_v4.append(
wxinfo
)
return result_v4
def get_info_v3(version_list) -> List[WeChatInfo]:
result = []
for process in psutil.process_iter(['name', 'exe', 'pid']):
if process.name() == 'WeChat.exe':
pid = process.pid
wxinfo = dump_wechat_info_v3(version_list, pid)
result.append(
wxinfo
)
return result
if __name__ == "__main__":
import json
file_path = r'E:\Project\Python\MemoTrace\resources\data\version_list.json'
with open(file_path, "r", encoding="utf-8") as f:
version_list = json.loads(f.read())
r_4 = get_info_v4()
r_3 = get_info_v3(version_list)
for wx_info in r_4+r_3:
print(wx_info)

View File

@@ -0,0 +1,56 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2025/3/7 16:39
@Author : SiYuan
@Email : 863909694@qq.com
@File : MemoTrace-common.py
@Description :
"""
import psutil
import win32api
if __name__ == '__main__':
pass
def get_version(pid):
p = psutil.Process(pid)
version_info = win32api.GetFileVersionInfo(p.exe(), '\\')
version = f"{win32api.HIWORD(version_info['FileVersionMS'])}.{win32api.LOWORD(version_info['FileVersionMS'])}.{win32api.HIWORD(version_info['FileVersionLS'])}.{win32api.LOWORD(version_info['FileVersionLS'])}"
return version
class WeChatInfo:
def __init__(self):
self.pid = 0
self.version = '0.0.0.0'
self.account_name = ''
self.nick_name = ''
self.phone = ''
self.wx_dir = ''
self.key = ''
self.wxid = ''
self.errcode: int = 404 # 405: 版本不匹配, 404: 重新登录微信, other: 未知错误
self.errmsg: str = '错误!请登录微信。'
def __str__(self):
return f'''
pid: {self.pid}
version: {self.version}
account_name: {self.account_name}
nickname: {self.nick_name}
phone: {self.phone}
wxid: {self.wxid}
wx_dir: {self.wx_dir}
key: {self.key}
'''
def to_json(self):
return {
'version': self.version,
'nickname': self.nick_name,
'wx_dir': self.wx_dir,
'wxid': self.wxid
}

View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/12/9 23:44
@Author : SiYuan
@Email : 863909694@qq.com
@File : wxManager-decrypt_dat.py
@Description :
"""
import os
import struct
from typing import List, Tuple
from concurrent.futures import ProcessPoolExecutor
from aiofiles import open as aio_open
from aiofiles.os import makedirs
from Crypto.Cipher import AES
# 图片字节头信息,
# [0][1]为jpg头信息
# [2][3]为png头信息
# [4][5]为gif头信息
pic_head = (0xff, 0xd8, 0x89, 0x50, 0x47, 0x49)
# 解密码
decode_code = 0
decode_code_v4 = -1
def get_code(dat_read):
"""
自动判断文件类型并获取dat文件解密码
:param file_path: dat文件路径
:return: 如果文件为jpg/png/gif格式则返回解密码否则返回-1
"""
try:
if not dat_read:
return -1, -1
head_index = 0
while head_index < len(pic_head):
# 使用第一个头信息字节来计算加密码
# 第二个字节来验证解密码是否正确
code = dat_read[0] ^ pic_head[head_index]
idf_code = dat_read[1] ^ code
head_index = head_index + 1
if idf_code == pic_head[head_index]:
return head_index, code
head_index = head_index + 1
print("not jpg, png, gif")
return -1, -1
except:
return -1, -1
def decode_dat(xor_key: int, file_path, out_path, dst_name='') -> str | bytes:
"""
解密文件,并生成图片
@param file_path: 输入文件路径
@param out_path: 输出文件文件夹
@param dst_name: 输出文件名
:param xor_key: 异或加密密钥
"""
if not os.path.exists(file_path) or os.path.isdir(file_path):
return ''
if not os.path.exists(out_path):
os.makedirs(out_path, exist_ok=True)
if not os.path.isdir(out_path):
return ''
# print(file_path,out_path,dst_name)
with open(file_path, 'rb') as file_in:
data = file_in.read(0xf)
if data.startswith(b'\x07\x08V1\x08\x07'):
# 微信4.0
return decode_dat_v4(xor_key, file_path, out_path, dst_name)
with open(file_path, 'rb') as file_in:
data = file_in.read(2)
file_type, decode_code = get_code(data)
if decode_code == -1:
return ''
filename = os.path.basename(file_path)[:-4] if not dst_name else dst_name
if file_type == 1:
pic_name = filename + ".jpg"
elif file_type == 3:
pic_name = filename + ".png"
elif file_type == 5:
pic_name = filename + ".gif"
else:
pic_name = filename + ".jpg"
file_outpath = os.path.join(out_path, pic_name)
if os.path.exists(file_outpath):
return file_outpath
# 分块读取和写入
buffer_size = 1024 # 定义缓冲区大小
with open(file_outpath, 'wb') as file_out:
file_out.write(bytes([byte ^ decode_code for byte in data]))
while True:
data = file_in.read(buffer_size)
if not data:
break
file_out.write(bytes([byte ^ decode_code for byte in data]))
# print(os.path.basename(file_outpath))
return file_outpath
def get_decode_code_v4(wx_dir):
cache_dir = os.path.join(wx_dir, 'cache')
if not os.path.isdir(wx_dir) or not os.path.exists(cache_dir):
raise ValueError(f'微信路径输入错误,请检查:{wx_dir}')
ok_flag = False
for root, dirs, files in os.walk(cache_dir):
if ok_flag:
break
for file in files:
if file.endswith(".dat"):
# 构造源文件和目标文件的完整路径
src_file_path = os.path.join(root, file)
with open(src_file_path, 'rb') as f:
data = f.read()
if not data.startswith(b'\x07\x08V1\x08\x07'):
continue
file_tail = data[-2:]
jpg_known_tail = b'\xff\xd9'
# 推导出密钥
xor_key = [c ^ p for c, p in zip(file_tail, jpg_known_tail)]
if len(set(xor_key)) == 1:
print(f'[*] 找到异或密钥: 0x{xor_key[0]:x}')
return xor_key[0]
return -1
def get_image_type(data: bytes) -> str:
"""
根据文件头字节判断图片类型
:param data: 文件头数据(通常至少需要前 10 个字节)
:return: 图片类型(扩展名),默认为 'bin'
"""
if data.startswith(b'\xff\xd8\xff'):
return 'jpg' # JPEG 文件
elif data.startswith(b'\x89PNG\r\n\x1a\n'):
return 'png' # PNG 文件
elif data.startswith(b'GIF87a') or data.startswith(b'GIF89a'):
return 'gif' # GIF 文件
elif data.startswith(b'BM'):
return 'bmp' # BMP 文件
elif data.startswith(b'II*\x00') or data.startswith(b'MM\x00*'):
return 'tiff' # TIFF 文件
elif data.startswith(b'RIFF') and data[8:12] == b'WEBP':
return 'webp' # WEBP 文件
elif data.startswith(b'\x00\x00\x01\x00'):
return 'ico' # ICO 文件
else:
return 'bin' # 未知类型,返回二进制
def decode_dat_v4(xor_key: int, file_path, out_path, dst_name='') -> str | bytes:
"""
适用于微信4.0图片.dat解密文件并生成图片
:param xor_key: int 异或密钥
:param file_path: dat文件路径
:param out_path: 输出文件夹
:param dst_name: 输出文件名,默认为输入文件名
:return:
"""
if not os.path.exists(file_path) or os.path.isdir(file_path):
return ''
# 读取加密文件的内容
with open(file_path, 'rb') as f:
header = f.read(0xf)
encrypt_length = struct.unpack_from('<H', header, 6)[0]
encrypt_length0 = encrypt_length // 16 * 16 + 16
encrypted_data = f.read(encrypt_length0)
res_data = f.read()
# 如果数据不是16的倍数填充0
if len(encrypted_data) % 16 != 0:
padding_length = 16 - (len(encrypted_data) % 16)
encrypted_data += b'\x00' * padding_length
aes_key = b'cfcd208495d565ef'
# 初始化AES解密器ECB模式
cipher = AES.new(aes_key, AES.MODE_ECB)
# 解密数据
decrypted_data = cipher.decrypt(encrypted_data)
# 获取图片后缀名
image_type = get_image_type(decrypted_data[:10])
output_file_name = os.path.basename(file_path)[:-4] if not dst_name else dst_name
output_file = os.path.join(out_path, output_file_name + '.' + image_type)
if os.path.exists(output_file):
return output_file
# 移除填充假设使用的是PKCS7或PKCS5填充
pad_length = decrypted_data[-1] # 获取填充长度
decrypted_data = decrypted_data[:-pad_length]
# 将解密后的数据写入输出文件
with open(output_file, 'wb') as f:
f.write(decrypted_data)
f.write(res_data[0:-0x100000])
f.write(bytes([byte ^ xor_key for byte in res_data[-0x100000:]]))
# print(f"解密完成,已保存到: {output_file}")
return output_file
async def decode_dat_v4_async(xor_key: int, file_path, out_path, dst_name='') -> str:
"""
异步版本的微信4.0图片 .dat 文件解密器
:param xor_key: int 异或密钥
:param file_path: .dat 文件路径
:param out_path: 输出文件夹
:param dst_name: 输出文件名,默认为输入文件名
:return: 解密后的文件路径
"""
if not os.path.exists(file_path):
return ''
# 确保输出目录存在
await makedirs(out_path, exist_ok=True)
# 读取加密文件的内容
async with aio_open(file_path, 'rb') as f:
header = await f.read(0xf)
encrypt_length = struct.unpack_from('<H', header, 6)[0]
encrypt_length0 = encrypt_length // 16 * 16 + 16
encrypted_data = await f.read(encrypt_length0)
res_data = await f.read()
aes_key = b'cfcd208495d565ef'
# 初始化AES解密器ECB模式
cipher = AES.new(aes_key, AES.MODE_ECB)
# 解密数据
decrypted_data = cipher.decrypt(encrypted_data)
# 获取图片后缀名
image_type = get_image_type(decrypted_data[:10])
output_file_name = os.path.basename(file_path)[:-4] if not dst_name else dst_name
output_file = os.path.join(out_path, output_file_name + '.' + image_type)
if os.path.exists(output_file):
return output_file
# 移除填充假设使用的是PKCS7或PKCS5填充
pad_length = decrypted_data[-1] # 获取填充长度
decrypted_data = decrypted_data[:-pad_length]
# 将解密后的数据写入输出文件
async with aio_open(output_file, 'wb') as f:
await f.write(decrypted_data)
await f.write(res_data[:-0x100000])
await f.write(bytes([byte ^ xor_key for byte in res_data[-0x100000:]]))
print(f"解密完成,已保存到: {output_file}")
return output_file
def decode_wrapper(tasks):
"""用于包装解码函数的顶层定义"""
# results = []
# for args in tasks:
# results.append(decode_dat(*args))
# return results
return decode_dat(*tasks)
def batch_decode_image_multiprocessing(xor_key, file_infos: List[Tuple[str, str, str]]):
"""
:param xor_key: 异或加密密钥
:param file_infos: 文件信息列表
item: [input_path: 输入图片路径
output_dir: 输出图片文件夹
dst_name: 输出文件名]
:return:
"""
if len(file_infos) < 1:
return
def split_list(lst, n):
k, m = divmod(len(lst), n)
return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]
with ProcessPoolExecutor(max_workers=10) as executor:
tasks = [(xor_key, file_path, out_path, file_name) for file_path, out_path, file_name in file_infos]
# print(len(split_list(tasks, 10)), '总任务数', len(file_infos))
results = list(executor.map(decode_wrapper, tasks, chunksize=200)) # 使用顶层定义的函数
return results
if __name__ == '__main__':
wx_dir = ''
xor_key = get_decode_code_v4(wx_dir)
dat_file = "2_1730948126.dat"
decode_dat_v4(xor_key, dat_file, '.', dst_name='解密后的图片')

View File

@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: getwxinfo.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# 微信数据库采用的加密算法是256位的AES-CBC。数据库的默认的页大小是4096字节即4KB其中每一个页都是被单独加解密的。
# 加密文件的每一个页都有一个随机的初始化向量,它被保存在每一页的末尾。
# 加密文件的每一页都存有着消息认证码算法使用的是HMAC-SHA1安卓数据库使用的是SHA512。它也被保存在每一页的末尾。
# 每一个数据库文件的开头16字节都保存了一段唯一且随机的盐值作为HMAC的验证和数据的解密。
# 用来计算HMAC的key与解密的key是不同的解密用的密钥是主密钥和之前提到的16字节的盐值通过PKCS5_PBKF2_HMAC1密钥扩展算法迭代64000次计算得到的。而计算HMAC的密钥是刚提到的解密密钥和16字节盐值异或0x3a的值通过PKCS5_PBKF2_HMAC1密钥扩展算法迭代2次计算得到的。
# 为了保证数据部分长度是16字节即AES块大小的整倍数每一页的末尾将填充一段空字节使得保留字段的长度为48字节。
# 综上加密文件结构为第一页4KB数据前16字节为盐值紧接着4032字节数据再加上16字节IV和20字节HMAC以及12字节空字节而后的页均是4048字节长度的加密数据段和48字节的保留段。
# -------------------------------------------------------------------------------
import argparse
import hmac
import hashlib
import os
import traceback
from typing import Union, List
from Crypto.Cipher import AES
from wxManager.log import logger
SQLITE_FILE_HEADER = "SQLite format 3\x00" # SQLite文件头
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
# 通过密钥解密数据库
def decrypt_db_file_v3(key: str, db_path, out_path):
"""
通过密钥解密数据库
:param key: 密钥 64位16进制字符串
:param db_path: 待解密的数据库路径(必须是文件)
:param out_path: 解密后的数据库输出路径(必须是文件)
:return:
"""
if not os.path.exists(db_path) or not os.path.isfile(db_path):
return False, f"[-] db_path:'{db_path}' File not found!"
if not os.path.exists(os.path.dirname(out_path)):
return False, f"[-] out_path:'{out_path}' File not found!"
if len(key) != 64:
return False, f"[-] key:'{key}' Len Error!"
password = bytes.fromhex(key.strip())
try:
with open(db_path, "rb") as file:
blist = file.read()
except:
logger.error(traceback.format_exc())
logger.info(db_path + '->' + out_path)
return False, 'error'
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
if len(salt) != 16:
return False, f"[-] db_path:'{db_path}' File Error!"
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
if hash_mac.digest() != first[-32:-12]:
return False, f"[-] Key Error! (db_path:'{db_path}' )"
newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]
with open(out_path, "wb") as deFile:
deFile.write(SQLITE_FILE_HEADER.encode())
t = AES.new(byteKey, AES.MODE_CBC, first[-48:-32])
decrypted = t.decrypt(first[:-48])
deFile.write(decrypted)
deFile.write(first[-48:])
for i in newblist:
t = AES.new(byteKey, AES.MODE_CBC, i[-48:-32])
decrypted = t.decrypt(i[:-48])
deFile.write(decrypted)
deFile.write(i[-48:])
return True, [db_path, out_path, key]
def decrypt_db_files(key, src_dir: str, dest_dir: str):
if not os.path.exists(src_dir):
print(f"源文件夹 {src_dir} 不存在")
return
if not os.path.exists(dest_dir):
os.makedirs(dest_dir) # 如果目标文件夹不存在,创建它
for root, dirs, files in os.walk(src_dir):
for file in files:
if file.endswith(".db"):
# 构造源文件和目标文件的完整路径
src_file_path = os.path.join(root, file)
# 计算目标路径,保持子文件夹结构
relative_path = os.path.relpath(root, src_dir)
dest_sub_dir = os.path.join(dest_dir, relative_path)
dest_file_path = os.path.join(dest_sub_dir, file)
# 确保目标子文件夹存在
if not os.path.exists(dest_sub_dir):
os.makedirs(dest_sub_dir)
print(dest_file_path)
decrypt_db_file_v3(key, src_file_path, dest_file_path)

View File

@@ -0,0 +1,127 @@
import hmac
import os
import struct
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA512
# Constants
IV_SIZE = 16
HMAC_SHA256_SIZE = 64
KEY_SIZE = 32
AES_BLOCK_SIZE = 16
ROUND_COUNT = 256000
PAGE_SIZE = 4096
SALT_SIZE = 16
SQLITE_HEADER = b"SQLite format 3"
def decrypt_db_file_v4(pkey, in_db_path, out_db_path):
if not os.path.exists(in_db_path):
print(f"【!!!】{in_db_path} does not exist.")
return False
with open(in_db_path, 'rb') as f_in, open(out_db_path, 'wb') as f_out:
# Read salt from the first SALT_SIZE bytes
salt = f_in.read(SALT_SIZE)
if not salt:
print("File is empty or corrupted.")
return False
mac_salt = bytes(x ^ 0x3a for x in salt)
# Convert pkey from hex to bytes
passphrase = bytes.fromhex(pkey)
# Use PBKDF2 to derive key and mac_key
key = PBKDF2(passphrase, salt, dkLen=KEY_SIZE, count=ROUND_COUNT, hmac_hash_module=SHA512)
mac_key = PBKDF2(key, mac_salt, dkLen=KEY_SIZE, count=2, hmac_hash_module=SHA512)
# Write SQLITE_HEADER to the output file
f_out.write(SQLITE_HEADER)
f_out.write(b'\x00')
# Reserve space for IV_SIZE + HMAC_SHA256_SIZE, rounded to a multiple of AES_BLOCK_SIZE
reserve = IV_SIZE + HMAC_SHA256_SIZE
reserve = ((reserve + AES_BLOCK_SIZE - 1) // AES_BLOCK_SIZE) * AES_BLOCK_SIZE
# Process each page
cur_page = 0
while True:
# For the first page, include SALT_SIZE adjustment
if cur_page == 0:
# Read one full PAGE_SIZE starting from after the salt
page = f_in.read(PAGE_SIZE - SALT_SIZE)
if not page:
break # No more data
page = salt + page # Include the salt in the first page data
else:
page = f_in.read(PAGE_SIZE)
if not page:
break # End of file
# print(f'第{cur_page + 1}页')
offset = SALT_SIZE if cur_page == 0 else 0
end = len(page)
# If the page is all zero bytes, append it directly and exit
if all(x == 0 for x in page):
f_out.write(page)
print("Exiting early due to zeroed page.")
break
# Perform HMAC check
mac = hmac.new(mac_key, page[offset:end - reserve + IV_SIZE], SHA512)
mac.update(struct.pack('<I', cur_page + 1)) # Add page number
hash_mac = mac.digest()
# Check if HMAC matches
hash_mac_start_offset = end - reserve + IV_SIZE
if hash_mac != page[hash_mac_start_offset:hash_mac_start_offset + len(hash_mac)]:
print(f'Key error: {key}')
return None
raise ValueError("Hash verification failed")
# AES-256-CBC decryption
iv = page[end - reserve:end - reserve + IV_SIZE]
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_data = cipher.decrypt(page[offset:end - reserve])
# Remove padding
pad_len = decrypted_data[-1]
# decrypted_data = decrypted_data[:-pad_len]
# Write decrypted data and HMAC/IV to output
f_out.write(decrypted_data)
f_out.write(page[end - reserve:end])
cur_page += 1
print("Decryption completed.")
return True
def decrypt_db_files(key, src_dir: str, dest_dir: str):
if not os.path.exists(src_dir):
print(f"源文件夹 {src_dir} 不存在")
return
if not os.path.exists(dest_dir):
os.makedirs(dest_dir) # 如果目标文件夹不存在,创建它
for root, dirs, files in os.walk(src_dir):
for file in files:
if file.endswith(".db"):
# 构造源文件和目标文件的完整路径
src_file_path = os.path.join(root, file)
# 计算目标路径,保持子文件夹结构
relative_path = os.path.relpath(root, src_dir)
dest_sub_dir = os.path.join(dest_dir, relative_path)
dest_file_path = os.path.join(dest_sub_dir, file)
# 确保目标子文件夹存在
if not os.path.exists(dest_sub_dir):
os.makedirs(dest_sub_dir)
print(dest_file_path)
decrypt_db_file_v4(key, src_file_path, dest_file_path)

View File

@@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: get_base_addr.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/22
# License: https://github.com/xaoyaoo/PyWxDump/blob/3b794bcb47b0457d1245ce5b4cfec61b74524073/LICENSE MIT
# -------------------------------------------------------------------------------
import argparse
import ctypes
import hashlib
import json
import multiprocessing
import os
import re
import sys
import psutil
from win32com.client import Dispatch
from pymem import Pymem
import pymem
import hmac
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
def validate_key(key, salt, first, mac_salt):
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
if hash_mac.digest() == first[-32:-12]:
return True
else:
return False
def get_exe_bit(file_path):
"""
获取 PE 文件的位数: 32 位或 64 位
:param file_path: PE 文件路径(可执行文件)
:return: 如果遇到错误则返回 64
"""
try:
with open(file_path, 'rb') as f:
dos_header = f.read(2)
if dos_header != b'MZ':
print('get exe bit error: Invalid PE file')
return 64
# Seek to the offset of the PE signature
f.seek(60)
pe_offset_bytes = f.read(4)
pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little')
# Seek to the Machine field in the PE header
f.seek(pe_offset + 4)
machine_bytes = f.read(2)
machine = int.from_bytes(machine_bytes, byteorder='little')
if machine == 0x14c:
return 32
elif machine == 0x8664:
return 64
else:
print('get exe bit error: Unknown architecture: %s' % hex(machine))
return 64
except IOError:
print('get exe bit error: File not found or cannot be opened')
return 64
def get_exe_version(file_path):
"""
获取 PE 文件的版本号
:param file_path: PE 文件路径(可执行文件)
:return: 如果遇到错误则返回
"""
file_version = Dispatch("Scripting.FileSystemObject").GetFileVersion(file_path)
return file_version
def find_all(c: bytes, string: bytes, base_addr=0):
"""
查找字符串中所有子串的位置
:param c: 子串 b'123'
:param string: 字符串 b'123456789123'
:return:
"""
return [base_addr + m.start() for m in re.finditer(re.escape(c), string)]
class BiasAddr:
def __init__(self, account, mobile, name, key, db_path):
self.account = account.encode("utf-8")
self.mobile = mobile.encode("utf-8")
self.name = name.encode("utf-8")
self.key = bytes.fromhex(key) if key else b""
self.db_path = db_path if db_path and os.path.exists(db_path) else ""
self.process_name = "WeChat.exe"
self.module_name = "WeChatWin.dll"
self.pm = None # Pymem 对象
self.is_WoW64 = None # True: 32位进程运行在64位系统上 False: 64位进程运行在64位系统上
self.process_handle = None # 进程句柄
self.pid = None # 进程ID
self.version = None # 微信版本号
self.process = None # 进程对象
self.exe_path = None # 微信路径
self.address_len = None # 4 if self.bits == 32 else 8 # 4字节或8字节
self.bits = 64 if sys.maxsize > 2 ** 32 else 32 # 系统32位或64位
def get_process_handle(self):
try:
self.pm = Pymem(self.process_name)
self.pm.check_wow64()
self.is_WoW64 = self.pm.is_WoW64
self.process_handle = self.pm.process_handle
self.pid = self.pm.process_id
self.process = psutil.Process(self.pid)
self.exe_path = self.process.exe()
self.version = get_exe_version(self.exe_path)
version_nums = list(map(int, self.version.split("."))) # 将版本号拆分为数字列表
if version_nums[0] <= 3 and version_nums[1] <= 9 and version_nums[2] <= 2:
self.address_len = 4
else:
self.address_len = 8
return True, ""
except pymem.exception.ProcessNotFound:
return False, "[-] WeChat No Run"
def search_memory_value(self, value: bytes, module_name="WeChatWin.dll"):
# 创建 Pymem 对象
module = pymem.process.module_from_name(self.pm.process_handle, module_name)
ret = self.pm.pattern_scan_module(value, module, return_multiple=True)
ret = ret[-1] - module.lpBaseOfDll if len(ret) > 0 else 0
return ret
def get_key_bias1(self):
try:
byteLen = self.address_len # 4 if self.bits == 32 else 8 # 4字节或8字节
keyLenOffset = 0x8c if self.bits == 32 else 0xd0
keyWindllOffset = 0x90 if self.bits == 32 else 0xd8
module = pymem.process.module_from_name(self.process_handle, self.module_name)
keyBytes = b'-----BEGIN PUBLIC KEY-----\n...'
publicKeyList = pymem.pattern.pattern_scan_all(self.process_handle, keyBytes, return_multiple=True)
keyaddrs = []
for addr in publicKeyList:
keyBytes = addr.to_bytes(byteLen, byteorder="little", signed=True) # 低位在前
may_addrs = pymem.pattern.pattern_scan_module(self.process_handle, module, keyBytes,
return_multiple=True)
if may_addrs != 0 and len(may_addrs) > 0:
for addr in may_addrs:
keyLen = self.pm.read_uchar(addr - keyLenOffset)
if keyLen != 32:
continue
keyaddrs.append(addr - keyWindllOffset)
return keyaddrs[-1] - module.lpBaseOfDll if len(keyaddrs) > 0 else 0
except:
return 0
def search_key(self, key: bytes):
key = re.escape(key) # 转义特殊字符
key_addr = self.pm.pattern_scan_all(key, return_multiple=False)
key = key_addr.to_bytes(self.address_len, byteorder='little', signed=True)
result = self.search_memory_value(key, self.module_name)
return result
def get_key_bias2(self, wx_db_path):
addr_len = get_exe_bit(self.exe_path) // 8
db_path = wx_db_path
def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址key地址
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_bytes = bytes(key)
return key_bytes
def verify_key(key, wx_db_path):
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
with open(wx_db_path, "rb") as file:
blist = file.read(5000)
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
if hash_mac.digest() != first[-32:-12]:
return False
return True
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
phone_type3 = "ipad\x00"
pm = pymem.Pymem("WeChat.exe")
module_name = "WeChatWin.dll"
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
module = pymem.process.module_from_name(pm.process_handle, module_name)
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module, return_multiple=True)
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module, return_multiple=True)
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module, return_multiple=True)
type_addrs = type1_addrs if len(type1_addrs) >= 2 else type2_addrs if len(
type2_addrs) >= 2 else type3_addrs if len(type3_addrs) >= 2 else "None"
if type_addrs == "None":
return 0
for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
if key_bytes == "None":
continue
# if verify_key(key_bytes, MicroMsg_path):
return j - module.lpBaseOfDll
return 0
def run(self, logging_path=False, version_list_path=None):
if not self.get_process_handle()[0]:
return {}
mobile_bias = self.search_memory_value(self.mobile, self.module_name)
name_bias = self.search_memory_value(self.name, self.module_name)
account_bias = self.search_memory_value(self.account, self.module_name)
key_bias = 0
key_bias = self.get_key_bias1()
key_bias = self.search_key(self.key) if key_bias <= 0 and self.key else key_bias
key_bias = self.get_key_bias2(self.db_path) if key_bias <= 0 and self.db_path else key_bias
rdata = {self.version: [name_bias, account_bias, mobile_bias, 0, key_bias]}
return rdata
def get_info_without_key(h_process, address, n_size=64):
array = ctypes.create_string_buffer(n_size)
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)
text = array.decode('utf-8', errors='ignore')
return text.strip() if text.strip() != "" else "None"

View File

@@ -0,0 +1,329 @@
import os
import sys
import hmac
import hashlib
import ctypes
import winreg
import pymem
import pythoncom
from win32com.client import Dispatch
import psutil
import pymem.process
from wxManager.decrypt.wx_info_v4 import dump_wechat_info_v4
from wxManager.decrypt import WeChatInfo
from wxManager.decrypt.common import get_version
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
# 获取exe文件的位数
def get_exe_bit(file_path):
"""
获取 PE 文件的位数: 32 位或 64 位
:param file_path: PE 文件路径(可执行文件)
:return: 如果遇到错误则返回 64
"""
try:
with open(file_path, 'rb') as f:
dos_header = f.read(2)
if dos_header != b'MZ':
print('get exe bit error: Invalid PE file')
return 64
# Seek to the offset of the PE signature
f.seek(60)
pe_offset_bytes = f.read(4)
pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little')
# Seek to the Machine field in the PE header
f.seek(pe_offset + 4)
machine_bytes = f.read(2)
machine = int.from_bytes(machine_bytes, byteorder='little')
if machine == 0x14c:
return 32
elif machine == 0x8664:
return 64
else:
print('get exe bit error: Unknown architecture: %s' % hex(machine))
return 64
except IOError:
print('get exe bit error: File not found or cannot be opened')
return 64
# 读取内存中的字符串(非key部分)
def get_info_without_key(h_process, address, n_size=64):
array = ctypes.create_string_buffer(n_size)
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)
text = array.decode('utf-8', errors='ignore')
return text.strip() if text.strip() != "" else "None"
def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
next_region = 0
found = []
user_space_limit = 0x7FFFFFFF0000 if sys.maxsize > 2 ** 32 else 0x7fff0000
while next_region < user_space_limit:
try:
next_region, page_found = pymem.pattern.scan_pattern_page(
handle,
next_region,
pattern,
return_multiple=return_multiple
)
except Exception as e:
print(e)
break
if not return_multiple and page_found:
return page_found
if page_found:
found += page_found
if len(found) > find_num:
break
return found
def get_info_wxid(h_process):
find_num = 100
addrs = pattern_scan_all(h_process, br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
wxids = []
for addr in addrs:
array = ctypes.create_string_buffer(80)
if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return "None"
array = bytes(array) # .split(b"\\")[0]
array = array.split(b"\\Msg")[0]
array = array.split(b"\\")[-1]
wxids.append(array.decode('utf-8', errors='ignore'))
wxid = max(wxids, key=wxids.count) if wxids else "None"
return wxid
def get_wx_dir(wxid):
if not wxid:
return ''
try:
is_w_dir = False
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key)
w_dir = value
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if not is_w_dir:
try:
user_profile = os.environ.get("USERPROFILE")
path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users",
"config",
"3ebffe94.ini")
with open(path_3ebffe94, "r", encoding="utf-8") as f:
w_dir = f.read()
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if w_dir == "MyDocument:":
try:
# 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
# print(1, w_dir)
else:
w_dir = documents_path
except Exception as e:
profile = os.environ.get("USERPROFILE")
w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files", wxid)
return msg_dir
except FileNotFoundError:
return ''
def get_key(db_path, addr_len):
def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址key地址
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_bytes = bytes(key)
return key_bytes
def verify_key(key, wx_db_path):
if not wx_db_path or wx_db_path.lower() == "none":
return True
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
with open(wx_db_path, "rb") as file:
blist = file.read(5000)
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
if hash_mac.digest() != first[-32:-12]:
return False
return True
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
phone_type3 = "ipad\x00"
pm = pymem.Pymem("WeChat.exe")
module_name = "WeChatWin.dll"
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True)
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True)
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True)
type_addrs = type1_addrs if len(type1_addrs) >= 2 else type2_addrs if len(type2_addrs) >= 2 else type3_addrs if len(
type3_addrs) >= 2 else "None"
# print(type_addrs)
if type_addrs == "None":
return "None"
for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
if key_bytes == "None":
continue
if db_path != "None" and verify_key(key_bytes, MicroMsg_path):
return key_bytes.hex()
return "None"
# 读取微信信息(account,mobile,name,mail,wxid,key)
def read_info(version_list):
result = []
default_res = {
'wxid': '',
'name': '',
'account': '',
'key': '',
'mobile': '',
'version': '',
'wx_dir': '',
'errcode': 404,
'errmsg': '错误!请登录微信。'
}
error = ""
for process in psutil.process_iter(['name', 'exe', 'pid']):
if process.name() == 'WeChat.exe':
tmp_rd = {}
pythoncom.CoInitialize()
tmp_rd['pid'] = process.pid
try:
tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())
except:
try:
tmp_rd['version'] = get_version(process.pid)
except:
tmp_rd['version'] = '3'
wechat_base_address = 0
for module in process.memory_maps(grouped=False):
if module.path and 'WeChatWin.dll' in module.path:
wechat_base_address = int(module.addr, 16)
break
if wechat_base_address == 0:
error = f"[-] WeChat WeChatWin.dll Not Found"
default_res['errmsg'] = '错误!请登录微信。'
return [default_res]
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
bias_list = version_list.get(tmp_rd['version'])
if not isinstance(bias_list, list) or len(bias_list) <= 4:
default_res['version'] = tmp_rd['version']
default_res['errcode'] = 405
default_res['errmsg'] = '错误!微信版本不匹配,请手动填写信息。'
return [default_res]
else:
name_base_address = wechat_base_address + bias_list[0]
account__base_address = wechat_base_address + bias_list[1]
mobile_base_address = wechat_base_address + bias_list[2]
mail_base_address = wechat_base_address + bias_list[3]
# key_base_address = wechat_base_address + bias_list[4]
tmp_rd['account'] = get_info_without_key(Handle, account__base_address, 32) if bias_list[1] != 0 else "None"
tmp_rd['mobile'] = get_info_without_key(Handle, mobile_base_address, 64) if bias_list[2] != 0 else "None"
tmp_rd['name'] = get_info_without_key(Handle, name_base_address, 64) if bias_list[0] != 0 else "None"
tmp_rd['mail'] = get_info_without_key(Handle, mail_base_address, 64) if bias_list[3] != 0 else "None"
addrLen = get_exe_bit(process.exe()) // 8
tmp_rd['wxid'] = get_info_wxid(Handle)
tmp_rd['wx_dir'] = get_wx_dir(tmp_rd['wxid']) if tmp_rd['wxid'] != "None" else "None"
tmp_rd['key'] = "None"
tmp_rd['key'] = get_key(tmp_rd['wx_dir'], addrLen)
if tmp_rd['key'] == 'None':
tmp_rd['errcode'] = 404
tmp_rd['errmsg'] = '请重启微信后重试。'
else:
tmp_rd['errcode'] = 200
result.append(tmp_rd)
return result
def get_info_v4():
result_v4 = []
for process in psutil.process_iter(['name', 'exe', 'pid']):
if process.name() == 'Weixin.exe':
wechat_base_address = 0
for module in process.memory_maps(grouped=False):
if module.path and 'Weixin.dll' in module.path:
wechat_base_address = int(module.addr, 16)
break
if wechat_base_address == 0:
continue
pid = process.pid
wxinfo = dump_wechat_info_v4(pid)
result_v4.append(
{
'wxid': wxinfo.wxid,
'name': wxinfo.nick_name,
'account': wxinfo.account_name,
'key': wxinfo.key,
'mobile': wxinfo.phone,
'version': wxinfo.version,
'wx_dir': wxinfo.wx_dir,
'errcode': 200
}
)
return result_v4
def get_info_v3(version_list):
return read_info(version_list) # 读取微信信息
def get_info(version_list):
result_v3 = read_info(version_list) # 读取微信信息
result_v4 = get_info_v4()
print(result_v3 + result_v4)
return result_v3 + result_v4
if __name__ == "__main__":
import json
file_path = r'E:\Project\Python\MemoTrace\resources\data\version_list.json'
with open(file_path, "r", encoding="utf-8") as f:
version_list = json.loads(f.read())
wx_info = get_info_v3(version_list)
print(wx_info)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2025/3/7 16:30
@Author : SiYuan
@Email : 863909694@qq.com
@File : MemoTrace-wx_info_v3.py
@Description :
"""
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: getwxinfo.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# -------------------------------------------------------------------------------
import os
import sys
import hmac
import hashlib
import ctypes
import winreg
import pymem
import pythoncom
import psutil
import pymem.process
from wxManager.decrypt.common import WeChatInfo
from wxManager.decrypt.common import get_version
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
def get_exe_bit(file_path):
try:
with open(file_path, 'rb') as f:
dos_header = f.read(2)
if dos_header != b'MZ':
print('get exe bit error: Invalid PE file')
return 64
# Seek to the offset of the PE signature
f.seek(60)
pe_offset_bytes = f.read(4)
pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little')
# Seek to the Machine field in the PE header
f.seek(pe_offset + 4)
machine_bytes = f.read(2)
machine = int.from_bytes(machine_bytes, byteorder='little')
if machine == 0x14c:
return 32
elif machine == 0x8664:
return 64
else:
return 64
except:
return 64
def get_info_without_key(h_process, address, n_size=64):
array = ctypes.create_string_buffer(n_size)
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)
text = array.decode('utf-8', errors='ignore')
return text.strip() if text.strip() != "" else "None"
def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
next_region = 0
found = []
user_space_limit = 0x7FFFFFFF0000 if sys.maxsize > 2 ** 32 else 0x7fff0000
while next_region < user_space_limit:
try:
next_region, page_found = pymem.pattern.scan_pattern_page(
handle,
next_region,
pattern,
return_multiple=return_multiple
)
except Exception as e:
print(e)
break
if not return_multiple and page_found:
return page_found
if page_found:
found += page_found
if len(found) > find_num:
break
return found
def get_info_wxid(h_process):
find_num = 100
addrs = pattern_scan_all(h_process, br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
wxids = []
for addr in addrs:
array = ctypes.create_string_buffer(80)
if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return "None"
array = bytes(array) # .split(b"\\")[0]
array = array.split(b"\\Msg")[0]
array = array.split(b"\\")[-1]
wxids.append(array.decode('utf-8', errors='ignore'))
wxid = max(wxids, key=wxids.count) if wxids else "None"
return wxid
def get_wx_dir(wxid):
if not wxid:
return ''
try:
is_w_dir = False
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key)
w_dir = value
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if not is_w_dir:
try:
user_profile = os.environ.get("USERPROFILE")
path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users",
"config",
"3ebffe94.ini")
with open(path_3ebffe94, "r", encoding="utf-8") as f:
w_dir = f.read()
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if w_dir == "MyDocument:":
try:
# 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
# print(1, w_dir)
else:
w_dir = documents_path
except Exception as e:
profile = os.environ.get("USERPROFILE")
w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files", wxid)
return msg_dir
except FileNotFoundError:
return ''
def get_key(db_path, addr_len):
def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return ""
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址key地址
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return ""
key_bytes = bytes(key)
return key_bytes
def verify_key(key, wx_db_path):
if not wx_db_path:
return True
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
with open(wx_db_path, "rb") as file:
blist = file.read(5000)
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
if hash_mac.digest() != first[-32:-12]:
return False
return True
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
phone_type3 = "ipad\x00"
pm = pymem.Pymem("WeChat.exe")
module_name = "WeChatWin.dll"
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True)
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True)
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True)
type_addrs = type1_addrs if len(type1_addrs) >= 2 else type2_addrs if len(type2_addrs) >= 2 else type3_addrs if len(
type3_addrs) >= 2 else ""
# print(type_addrs)
if type_addrs == "":
return ""
for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
if key_bytes == "":
continue
if db_path != "" and verify_key(key_bytes, MicroMsg_path):
return key_bytes.hex()
return ""
def dump_wechat_info_v3(version_list, pid) -> WeChatInfo:
wechat_info = WeChatInfo()
wechat_info.pid = pid
wechat_info.version = get_version(pid)
process = psutil.Process(pid)
pythoncom.CoInitialize()
wechat_base_address = 0
for module in process.memory_maps(grouped=False):
if module.path and 'WeChatWin.dll' in module.path:
wechat_base_address = int(module.addr, 16)
break
if wechat_base_address == 0:
wechat_info.errmsg = '错误!请登录微信。'
return wechat_info
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
bias_list = version_list.get(wechat_info.version)
if not isinstance(bias_list, list) or len(bias_list) <= 4:
wechat_info.errcode = 405
wechat_info.errmsg = '错误!微信版本不匹配,请手动填写信息。'
return wechat_info
else:
name_base_address = wechat_base_address + bias_list[0]
account__base_address = wechat_base_address + bias_list[1]
mobile_base_address = wechat_base_address + bias_list[2]
wechat_info.account_name = get_info_without_key(Handle, account__base_address, 32) if bias_list[1] != 0 else "None"
wechat_info.phone = get_info_without_key(Handle, mobile_base_address, 64) if bias_list[2] != 0 else "None"
wechat_info.nick_name = get_info_without_key(Handle, name_base_address, 64) if bias_list[0] != 0 else "None"
addrLen = get_exe_bit(process.exe()) // 8
wechat_info.wxid = get_info_wxid(Handle)
wechat_info.wx_dir = get_wx_dir(wechat_info.wxid)
wechat_info.key = get_key(wechat_info.wx_dir, addrLen)
if not wechat_info.key:
wechat_info.errcode = 404
wechat_info.errmsg = '请重启微信后重试。'
else:
wechat_info.errcode = 200
return wechat_info

View File

@@ -0,0 +1,514 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2025/1/10 2:36
@Author : SiYuan
@Email : 863909694@qq.com
@File : wxManager-wx_info_v4.py
@Description :
"""
import ctypes
import multiprocessing
import os.path
import hmac
import os
import struct
import time
from ctypes import wintypes
from multiprocessing import freeze_support
import pymem
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA512
import yara
from wxManager.decrypt.common import WeChatInfo
from wxManager.decrypt.common import get_version
# 定义必要的常量
PROCESS_ALL_ACCESS = 0x1F0FFF
PAGE_READWRITE = 0x04
MEM_COMMIT = 0x1000
MEM_PRIVATE = 0x20000
# Constants
IV_SIZE = 16
HMAC_SHA256_SIZE = 64
HMAC_SHA512_SIZE = 64
KEY_SIZE = 32
AES_BLOCK_SIZE = 16
ROUND_COUNT = 256000
PAGE_SIZE = 4096
SALT_SIZE = 16
finish_flag = False
# 定义 MEMORY_BASIC_INFORMATION 结构
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", ctypes.c_ulong),
("RegionSize", ctypes.c_size_t),
("State", ctypes.c_ulong),
("Protect", ctypes.c_ulong),
("Type", ctypes.c_ulong),
]
# Windows API Constants
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
# Load Windows DLLs
kernel32 = ctypes.windll.kernel32
# 打开目标进程
def open_process(pid):
return ctypes.windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
# 读取目标进程内存
def read_process_memory(process_handle, address, size):
buffer = ctypes.create_string_buffer(size)
bytes_read = ctypes.c_size_t(0)
success = ctypes.windll.kernel32.ReadProcessMemory(
process_handle,
ctypes.c_void_p(address),
buffer,
size,
ctypes.byref(bytes_read)
)
if not success:
return None
return buffer.raw
# 获取所有内存区域
def get_memory_regions(process_handle):
regions = []
mbi = MEMORY_BASIC_INFORMATION()
address = 0
while ctypes.windll.kernel32.VirtualQueryEx(
process_handle,
ctypes.c_void_p(address),
ctypes.byref(mbi),
ctypes.sizeof(mbi)
):
if mbi.State == MEM_COMMIT and mbi.Type == MEM_PRIVATE:
regions.append((mbi.BaseAddress, mbi.RegionSize))
address += mbi.RegionSize
return regions
rules_v4 = r'''
rule GetDataDir {
strings:
$a = /[a-zA-Z]:\\(.{1,100}?\\){0,1}?xwechat_files\\[0-9a-zA-Z_-]{6,24}?\\db_storage\\/
condition:
$a
}
rule GetPhoneNumberOffset {
strings:
$a = /[\x01-\x20]\x00{7}(\x0f|\x1f)\x00{7}[0-9]{11}\x00{5}\x0b\x00{7}\x0f\x00{7}/
condition:
$a
}
rule GetKeyAddrStub
{
strings:
$a = /.{6}\x00{2}\x00{8}\x20\x00{7}\x2f\x00{7}/
condition:
all of them
}
'''
def read_string(data: bytes, offset, size):
try:
return data[offset:offset + size].decode('utf-8')
except:
# print(data[offset:offset + size])
# print(traceback.format_exc())
return ''
def read_num(data: bytes, offset, size):
# 构建格式字符串,根据 size 来选择相应的格式
if size == 1:
fmt = '<B' # 1 字节unsigned char
elif size == 2:
fmt = '<H' # 2 字节unsigned short
elif size == 4:
fmt = '<I' # 4 字节unsigned int
elif size == 8:
fmt = '<Q' # 8 字节unsigned long long
else:
raise ValueError("Unsupported size")
# 使用 struct.unpack 从指定 offset 开始读取 size 字节的数据并转换为数字
result = struct.unpack_from(fmt, data, offset)[0] # 通过 unpack_from 来读取指定偏移的数据
return result
def read_bytes(data: bytes, offset, size):
return data[offset:offset + size]
# def read_bytes_from_pid(pid, offset, size):
# with open(f'/proc/{pid}/mem', 'rb') as mem_file:
# mem_file.seek(offset)
# return mem_file.read(size)
# 导入 Windows API 函数
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
OpenProcess = kernel32.OpenProcess
OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
OpenProcess.restype = wintypes.HANDLE
ReadProcessMemory = kernel32.ReadProcessMemory
ReadProcessMemory.argtypes = [wintypes.HANDLE, wintypes.LPCVOID, wintypes.LPVOID, ctypes.c_size_t,
ctypes.POINTER(ctypes.c_size_t)]
ReadProcessMemory.restype = wintypes.BOOL
CloseHandle = kernel32.CloseHandle
CloseHandle.argtypes = [wintypes.HANDLE]
CloseHandle.restype = wintypes.BOOL
def read_bytes_from_pid(pid: int, addr: int, size: int):
# 打开进程
hprocess = OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid)
if not hprocess:
raise Exception(f"Failed to open process with PID {pid}")
buffer = b''
try:
# 创建缓冲区
buffer = ctypes.create_string_buffer(size)
# 读取内存
bytes_read = ctypes.c_size_t(0)
success = ReadProcessMemory(hprocess, addr, buffer, size, ctypes.byref(bytes_read))
if not success:
CloseHandle(hprocess)
return b''
raise Exception(f"Failed to read memory at address {hex(addr)}")
# 关闭句柄
CloseHandle(hprocess)
except:
pass
# 返回读取的字节数组
return bytes(buffer)
def read_string_from_pid(pid: int, addr: int, size: int):
bytes0 = read_bytes_from_pid(pid, addr, size)
try:
return bytes0.decode('utf-8')
except:
return ''
def is_ok(passphrase, buf):
global finish_flag
if finish_flag:
return False
# 获取文件开头的 salt
salt = buf[:SALT_SIZE]
# salt 异或 0x3a 得到 mac_salt用于计算 HMAC
mac_salt = bytes(x ^ 0x3a for x in salt)
# 使用 PBKDF2 生成新的密钥
new_key = PBKDF2(passphrase, salt, dkLen=KEY_SIZE, count=ROUND_COUNT, hmac_hash_module=SHA512)
# 使用新的密钥和 mac_salt 计算 mac_key
mac_key = PBKDF2(new_key, mac_salt, dkLen=KEY_SIZE, count=2, hmac_hash_module=SHA512)
# 计算 hash 校验码的保留空间
reserve = IV_SIZE + HMAC_SHA512_SIZE
reserve = ((reserve + AES_BLOCK_SIZE - 1) // AES_BLOCK_SIZE) * AES_BLOCK_SIZE
# 校验 HMAC
start = SALT_SIZE
end = PAGE_SIZE
mac = hmac.new(mac_key, buf[start:end - reserve + IV_SIZE], SHA512)
mac.update(struct.pack('<I', 1)) # page number as 1
hash_mac = mac.digest()
# 校验 HMAC 是否一致
hash_mac_start_offset = end - reserve + IV_SIZE
hash_mac_end_offset = hash_mac_start_offset + len(hash_mac)
if hash_mac == buf[hash_mac_start_offset:hash_mac_end_offset]:
print(f"[v] found key at 0x{start:x}")
finish_flag = True
return True
return False
def check_chunk(chunk, buf):
global finish_flag
if finish_flag:
return False
if is_ok(chunk, buf):
return chunk
return False
def verify_key(key: bytes, buffer: bytes, flag, result):
if len(key) != 32:
return False
if flag.value: # 如果其他进程已找到结果,提前退出
return False
if is_ok(key, buffer): # 替换为实际的目标检测条件
print("Key found!", key)
with flag.get_lock(): # 保证线程安全
flag.value = True
return key
else:
return False
def get_key_(keys, buf):
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() // 2)
results = pool.starmap(check_chunk, ((key, buf) for key in keys))
pool.close()
pool.join()
for r in results:
if r:
print("Key found!", r)
return bytes.hex(r)
return None
def get_key_inner(pid, process_infos):
"""
扫描可能为key的内存
:param pid:
:param process_infos:
:return:
"""
process_handle = open_process(pid)
rules_v4_key = r'''
rule GetKeyAddrStub
{
strings:
$a = /.{6}\x00{2}\x00{8}\x20\x00{7}\x2f\x00{7}/
condition:
all of them
}
'''
rules = yara.compile(source=rules_v4_key)
pre_addresses = []
for base_address, region_size in process_infos:
memory = read_process_memory(process_handle, base_address, region_size)
# 定义目标数据(如内存或文件内容)
target_data = memory # 二进制数据
if not memory:
continue
# 加上这些判断条件时灵时不灵
# if b'-----BEGIN PUBLIC KEY-----' not in target_data or b'USER_KEYINFO' not in target_data:
# continue
# if b'db_storage' not in memory:
# continue
# with open(f'key-{base_address}.bin', 'wb') as f:
# f.write(target_data)
matches = rules.match(data=target_data)
if matches:
for match in matches:
rule_name = match.rule
if rule_name == 'GetKeyAddrStub':
for string in match.strings:
instance = string.instances[0]
offset, content = instance.offset, instance.matched_data
addr = read_num(target_data, offset, 8)
pre_addresses.append(addr)
keys = []
key_set = set()
for pre_address in pre_addresses:
if any([base_address <= pre_address <= base_address + region_size - KEY_SIZE for base_address, region_size in
process_infos]):
key = read_bytes_from_pid(pid, pre_address, 32)
if key not in key_set:
keys.append(key)
key_set.add(key)
return keys
def get_key(pid, process_handle, buf):
process_infos = get_memory_regions(process_handle)
def split_list(lst, n):
k, m = divmod(len(lst), n)
return (lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
keys = []
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() // 2)
results = pool.starmap(get_key_inner, ((pid, process_info_) for process_info_ in
split_list(process_infos, min(len(process_infos), 40))))
pool.close()
pool.join()
for r in results:
if r:
keys += r
key = get_key_(keys, buf)
return key
def get_wx_dir(process_handle):
rules_v4_dir = r'''
rule GetDataDir {
strings:
$a = /[a-zA-Z]:\\(.{1,100}?\\){0,1}?xwechat_files\\[0-9a-zA-Z_-]{6,24}?\\db_storage\\/
condition:
$a
}
'''
rules = yara.compile(source=rules_v4_dir)
process_infos = get_memory_regions(process_handle)
wx_dir_cnt = {}
for base_address, region_size in process_infos:
memory = read_process_memory(process_handle, base_address, region_size)
# 定义目标数据(如内存或文件内容)
target_data = memory # 二进制数据
if not memory:
continue
if b'db_storage' not in memory:
continue
matches = rules.match(data=target_data)
if matches:
# 输出匹配结果
for match in matches:
rule_name = match.rule
if rule_name == 'GetDataDir':
for string in match.strings:
content = string.instances[0].matched_data
wx_dir_cnt[content] = wx_dir_cnt.get(content, 0) + 1
return max(wx_dir_cnt, key=wx_dir_cnt.get).decode('utf-8') if wx_dir_cnt else ''
def get_nickname(pid):
process_handle = open_process(pid)
if not process_handle:
print(f"无法打开进程 {pid}")
return {}
process_infos = get_memory_regions(process_handle)
# 加载规则
r'''$a = /(.{16}[\x00-\x20]\x00{7}(\x0f|\x1f)\x00{7}){2}.{16}[\x01-\x20]\x00{7}(\x0f|\x1f)\x00{7}[0-9]{11}\x00{5}\x0b\x00{7}\x0f\x00{7}.{25}\x00{7}(\x3f|\x2f|\x1f|\x0f)\x00{7}/s'''
rules_v4_phone = r'''
rule GetPhoneNumberOffset {
strings:
$a = /[\x01-\x20]\x00{7}(\x0f|\x1f)\x00{7}[0-9]{11}\x00{5}\x0b\x00{7}\x0f\x00{7}/
condition:
$a
}
'''
nick_name = ''
phone = ''
account_name = ''
rules = yara.compile(source=rules_v4_phone)
for base_address, region_size in process_infos:
memory = read_process_memory(process_handle, base_address, region_size)
# 定义目标数据(如内存或文件内容)
target_data = memory # 二进制数据
if not memory:
continue
# if not (b'db_storage' in target_data or b'USER_KEYINFO' in target_data):
# continue
# if not (b'-----BEGIN PUBLIC KEY-----' in target_data):
# continue
matches = rules.match(data=target_data)
if matches:
# 输出匹配结果
for match in matches:
rule_name = match.rule
if rule_name == 'GetPhoneNumberOffset':
for string in match.strings:
instance = string.instances[0]
offset, content = instance.offset, instance.matched_data
# print(
# f"匹配字符串: {identifier} 内容: 偏移: {offset} 在地址: {hex(base_address + offset + 0x10)}")
# print(string)
with open('a.bin','wb') as f:
f.write(target_data)
phone_addr = offset + 0x10
phone = read_string(target_data, phone_addr, 11)
# 提取前 8 个字节
data_slice = target_data[offset:offset + 8]
# 使用 struct.unpack() 将字节转换为 u64'<Q' 表示小端字节序的 8 字节无符号整数
nick_name_length = struct.unpack('<Q', data_slice)[0]
# print('nick_name_length', nick_name_length)
nick_name = read_string(target_data, phone_addr - 0x20, nick_name_length)
a = target_data[phone_addr - 0x60:phone_addr + 0x50]
account_name_length = read_num(target_data, phone_addr - 0x30, 8)
# print('account_name_length', account_name_length)
account_name = read_string(target_data, phone_addr - 0x40, account_name_length)
# with open('a.bin', 'wb') as f:
# f.write(target_data)
if not account_name:
addr = read_num(target_data, phone_addr - 0x40, 8)
# print(hex(addr))
account_name = read_string_from_pid(pid, addr, account_name_length)
return {
'nick_name': nick_name,
'phone': phone,
'account_name': account_name
}
def worker(pid, queue):
nickname_dic = get_nickname(pid)
queue.put(nickname_dic)
def dump_wechat_info_v4(pid) -> WeChatInfo | None:
wechat_info = WeChatInfo()
wechat_info.pid = pid
wechat_info.version = get_version(pid)
process_handle = open_process(pid)
if not process_handle:
print(f"无法打开进程 {pid}")
return wechat_info
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=worker, args=(pid, queue))
process.start()
wechat_info.wx_dir = get_wx_dir(process_handle)
# print(wx_dir_cnt)
if not wechat_info.wx_dir:
return wechat_info
db_file_path = os.path.join(wechat_info.wx_dir, 'biz', 'biz.db')
with open(db_file_path, 'rb') as f:
buf = f.read()
wechat_info.key = get_key(pid, process_handle, buf)
ctypes.windll.kernel32.CloseHandle(process_handle)
wechat_info.wxid = '_'.join(wechat_info.wx_dir.split('\\')[-3].split('_')[0:-1])
wechat_info.wx_dir = '\\'.join(wechat_info.wx_dir.split('\\')[:-2])
process.join() # 等待子进程完成
if not queue.empty():
nickname_info = queue.get()
wechat_info.nick_name = nickname_info.get('nick_name', '')
wechat_info.phone = nickname_info.get('phone', '')
wechat_info.account_name = nickname_info.get('account_name', '')
if not wechat_info.key:
wechat_info.errcode = 404
else:
wechat_info.errcode = 200
return wechat_info
if __name__ == '__main__':
freeze_support()
st = time.time()
pm = pymem.Pymem("Weixin.exe")
pid = pm.process_id
w = dump_wechat_info_v4(pid)
print(w)
et = time.time()
print(et - st)

544
wxManager/decrypt/wxinfo.py Normal file
View File

@@ -0,0 +1,544 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2025/1/10 2:36
@Author : SiYuan
@Email : 863909694@qq.com
@File : wxManager-wxinfo.py
@Description :
"""
import ctypes
import multiprocessing
import os.path
import hmac
import os
import struct
import sys
import time
import traceback
from ctypes import wintypes
from multiprocessing import freeze_support
from typing import Set, Tuple
import pymem
import win32api
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA512
import psutil
import yara
# 定义必要的常量
PROCESS_ALL_ACCESS = 0x1F0FFF
PAGE_READWRITE = 0x04
MEM_COMMIT = 0x1000
MEM_PRIVATE = 0x20000
# Constants
IV_SIZE = 16
HMAC_SHA256_SIZE = 64
HMAC_SHA512_SIZE = 64
KEY_SIZE = 32
AES_BLOCK_SIZE = 16
ROUND_COUNT = 256000
PAGE_SIZE = 4096
SALT_SIZE = 16
finish_flag = False
class WechatInfo:
def __init__(self):
self.pid = 0
self.version = '0.0.0.0'
self.account_name = ''
self.nick_name = ''
self.phone = ''
self.wx_dir = ''
self.key = ''
self.wxid = ''
def __str__(self):
return f'''
pid: {self.pid}
version: {self.version}
account_name: {self.account_name}
nickname: {self.nick_name}
phone: {self.phone}
wxid: {self.wxid}
wx_dir: {self.wx_dir}
key: {self.key}
'''
# 定义 MEMORY_BASIC_INFORMATION 结构
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", ctypes.c_ulong),
("RegionSize", ctypes.c_size_t),
("State", ctypes.c_ulong),
("Protect", ctypes.c_ulong),
("Type", ctypes.c_ulong),
]
# Windows API Constants
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
# Load Windows DLLs
kernel32 = ctypes.windll.kernel32
# 打开目标进程
def open_process(pid):
return ctypes.windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
# 读取目标进程内存
def read_process_memory(process_handle, address, size):
buffer = ctypes.create_string_buffer(size)
bytes_read = ctypes.c_size_t(0)
success = ctypes.windll.kernel32.ReadProcessMemory(
process_handle,
ctypes.c_void_p(address),
buffer,
size,
ctypes.byref(bytes_read)
)
if not success:
return None
return buffer.raw
# 获取所有内存区域
def get_memory_regions(process_handle):
regions = []
mbi = MEMORY_BASIC_INFORMATION()
address = 0
while ctypes.windll.kernel32.VirtualQueryEx(
process_handle,
ctypes.c_void_p(address),
ctypes.byref(mbi),
ctypes.sizeof(mbi)
):
if mbi.State == MEM_COMMIT and mbi.Type == MEM_PRIVATE:
regions.append((mbi.BaseAddress, mbi.RegionSize))
address += mbi.RegionSize
return regions
rules_v4 = r'''
rule GetDataDir {
strings:
$a = /[a-zA-Z]:\\(.{1,100}?\\){0,1}?xwechat_files\\[0-9a-zA-Z_-]{6,24}?\\db_storage\\/
condition:
$a
}
rule GetPhoneNumberOffset {
strings:
$a = /[\x01-\x20]\x00{7}(\x0f|\x1f)\x00{7}[0-9]{11}\x00{5}\x0b\x00{7}\x0f\x00{7}/
condition:
$a
}
rule GetKeyAddrStub
{
strings:
$a = /.{6}\x00{2}\x00{8}\x20\x00{7}\x2f\x00{7}/
condition:
all of them
}
'''
def read_string(data: bytes, offset, size):
try:
return data[offset:offset + size].decode('utf-8')
except:
# print(data[offset:offset + size])
# print(traceback.format_exc())
return ''
def read_num(data: bytes, offset, size):
# 构建格式字符串,根据 size 来选择相应的格式
if size == 1:
fmt = '<B' # 1 字节unsigned char
elif size == 2:
fmt = '<H' # 2 字节unsigned short
elif size == 4:
fmt = '<I' # 4 字节unsigned int
elif size == 8:
fmt = '<Q' # 8 字节unsigned long long
else:
raise ValueError("Unsupported size")
# 使用 struct.unpack 从指定 offset 开始读取 size 字节的数据并转换为数字
result = struct.unpack_from(fmt, data, offset)[0] # 通过 unpack_from 来读取指定偏移的数据
return result
def read_bytes(data: bytes, offset, size):
return data[offset:offset + size]
# def read_bytes_from_pid(pid, offset, size):
# with open(f'/proc/{pid}/mem', 'rb') as mem_file:
# mem_file.seek(offset)
# return mem_file.read(size)
# 导入 Windows API 函数
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
OpenProcess = kernel32.OpenProcess
OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
OpenProcess.restype = wintypes.HANDLE
ReadProcessMemory = kernel32.ReadProcessMemory
ReadProcessMemory.argtypes = [wintypes.HANDLE, wintypes.LPCVOID, wintypes.LPVOID, ctypes.c_size_t,
ctypes.POINTER(ctypes.c_size_t)]
ReadProcessMemory.restype = wintypes.BOOL
CloseHandle = kernel32.CloseHandle
CloseHandle.argtypes = [wintypes.HANDLE]
CloseHandle.restype = wintypes.BOOL
def read_bytes_from_pid(pid: int, addr: int, size: int):
# 打开进程
hprocess = OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid)
if not hprocess:
raise Exception(f"Failed to open process with PID {pid}")
buffer = b''
try:
# 创建缓冲区
buffer = ctypes.create_string_buffer(size)
# 读取内存
bytes_read = ctypes.c_size_t(0)
success = ReadProcessMemory(hprocess, addr, buffer, size, ctypes.byref(bytes_read))
if not success:
CloseHandle(hprocess)
return b''
raise Exception(f"Failed to read memory at address {hex(addr)}")
# 关闭句柄
CloseHandle(hprocess)
except:
pass
# 返回读取的字节数组
return bytes(buffer)
def read_string_from_pid(pid: int, addr: int, size: int):
bytes0 = read_bytes_from_pid(pid, addr, size)
try:
return bytes0.decode('utf-8')
except:
return ''
def is_ok(passphrase, buf):
global finish_flag
if finish_flag:
return False
# 获取文件开头的 salt
salt = buf[:SALT_SIZE]
# salt 异或 0x3a 得到 mac_salt用于计算 HMAC
mac_salt = bytes(x ^ 0x3a for x in salt)
# 使用 PBKDF2 生成新的密钥
new_key = PBKDF2(passphrase, salt, dkLen=KEY_SIZE, count=ROUND_COUNT, hmac_hash_module=SHA512)
# 使用新的密钥和 mac_salt 计算 mac_key
mac_key = PBKDF2(new_key, mac_salt, dkLen=KEY_SIZE, count=2, hmac_hash_module=SHA512)
# 计算 hash 校验码的保留空间
reserve = IV_SIZE + HMAC_SHA512_SIZE
reserve = ((reserve + AES_BLOCK_SIZE - 1) // AES_BLOCK_SIZE) * AES_BLOCK_SIZE
# 校验 HMAC
start = SALT_SIZE
end = PAGE_SIZE
mac = hmac.new(mac_key, buf[start:end - reserve + IV_SIZE], SHA512)
mac.update(struct.pack('<I', 1)) # page number as 1
hash_mac = mac.digest()
# 校验 HMAC 是否一致
hash_mac_start_offset = end - reserve + IV_SIZE
hash_mac_end_offset = hash_mac_start_offset + len(hash_mac)
if hash_mac == buf[hash_mac_start_offset:hash_mac_end_offset]:
print(f"[v] found key at 0x{start:x}")
finish_flag = True
return True
return False
def get_version(pid):
p = psutil.Process(pid)
version_info = win32api.GetFileVersionInfo(p.exe(), '\\')
version = f"{win32api.HIWORD(version_info['FileVersionMS'])}.{win32api.LOWORD(version_info['FileVersionMS'])}.{win32api.HIWORD(version_info['FileVersionLS'])}.{win32api.LOWORD(version_info['FileVersionLS'])}"
return version
def check_chunk(chunk, buf):
global finish_flag
if finish_flag:
return False
if is_ok(chunk, buf):
return chunk
return False
def verify_key(key: bytes, buffer: bytes, flag, result):
if len(key) != 32:
return False
if flag.value: # 如果其他进程已找到结果,提前退出
return False
if is_ok(key, buffer): # 替换为实际的目标检测条件
print("Key found!", key)
with flag.get_lock(): # 保证线程安全
flag.value = True
return key
else:
return False
def get_key_(keys, buf):
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() // 2)
results = pool.starmap(check_chunk, ((key, buf) for key in keys))
pool.close()
pool.join()
for r in results:
if r:
print("Key found!", r)
return bytes.hex(r)
return None
def get_key_inner(pid, process_infos):
"""
扫描可能为key的内存
:param pid:
:param process_infos:
:return:
"""
process_handle = open_process(pid)
rules_v4_key = r'''
rule GetKeyAddrStub
{
strings:
$a = /.{6}\x00{2}\x00{8}\x20\x00{7}\x2f\x00{7}/
condition:
all of them
}
'''
rules = yara.compile(source=rules_v4_key)
pre_addresses = []
for base_address, region_size in process_infos:
memory = read_process_memory(process_handle, base_address, region_size)
# 定义目标数据(如内存或文件内容)
target_data = memory # 二进制数据
if not memory:
continue
# 加上这些判断条件时灵时不灵
# if b'-----BEGIN PUBLIC KEY-----' not in target_data or b'USER_KEYINFO' not in target_data:
# continue
# if b'db_storage' not in memory:
# continue
# with open(f'key-{base_address}.bin', 'wb') as f:
# f.write(target_data)
matches = rules.match(data=target_data)
if matches:
for match in matches:
rule_name = match.rule
if rule_name == 'GetKeyAddrStub':
for string in match.strings:
instance = string.instances[0]
offset, content = instance.offset, instance.matched_data
addr = read_num(target_data, offset, 8)
pre_addresses.append(addr)
keys = []
key_set = set()
for pre_address in pre_addresses:
if any([base_address <= pre_address <= base_address + region_size - KEY_SIZE for base_address, region_size in
process_infos]):
key = read_bytes_from_pid(pid, pre_address, 32)
if key not in key_set:
keys.append(key)
key_set.add(key)
return keys
def get_key(pid, process_handle, buf):
process_infos = get_memory_regions(process_handle)
def split_list(lst, n):
k, m = divmod(len(lst), n)
return (lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
keys = []
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() // 2)
results = pool.starmap(get_key_inner, ((pid, process_info_) for process_info_ in
split_list(process_infos, min(len(process_infos), 40))))
pool.close()
pool.join()
for r in results:
if r:
keys += r
key = get_key_(keys, buf)
return key
def get_wx_dir(process_handle):
rules_v4_dir = r'''
rule GetDataDir {
strings:
$a = /[a-zA-Z]:\\(.{1,100}?\\){0,1}?xwechat_files\\[0-9a-zA-Z_-]{6,24}?\\db_storage\\/
condition:
$a
}
'''
rules = yara.compile(source=rules_v4_dir)
process_infos = get_memory_regions(process_handle)
wx_dir_cnt = {}
for base_address, region_size in process_infos:
memory = read_process_memory(process_handle, base_address, region_size)
# 定义目标数据(如内存或文件内容)
target_data = memory # 二进制数据
if not memory:
continue
if b'db_storage' not in memory:
continue
matches = rules.match(data=target_data)
if matches:
# 输出匹配结果
for match in matches:
rule_name = match.rule
if rule_name == 'GetDataDir':
for string in match.strings:
content = string.instances[0].matched_data
wx_dir_cnt[content] = wx_dir_cnt.get(content, 0) + 1
return max(wx_dir_cnt, key=wx_dir_cnt.get).decode('utf-8')
def get_nickname(pid):
process_handle = open_process(pid)
if not process_handle:
print(f"无法打开进程 {pid}")
return {}
process_infos = get_memory_regions(process_handle)
# 加载规则
r'''$a = /(.{16}[\x00-\x20]\x00{7}(\x0f|\x1f)\x00{7}){2}.{16}[\x01-\x20]\x00{7}(\x0f|\x1f)\x00{7}[0-9]{11}\x00{5}\x0b\x00{7}\x0f\x00{7}.{25}\x00{7}(\x3f|\x2f|\x1f|\x0f)\x00{7}/s'''
rules_v4_phone = r'''
rule GetPhoneNumberOffset {
strings:
$a = /[\x01-\x20]\x00{7}(\x0f|\x1f)\x00{7}[0-9]{11}\x00{5}\x0b\x00{7}\x0f\x00{7}/
condition:
$a
}
'''
nick_name = ''
phone = ''
account_name = ''
rules = yara.compile(source=rules_v4_phone)
for base_address, region_size in process_infos:
memory = read_process_memory(process_handle, base_address, region_size)
# 定义目标数据(如内存或文件内容)
target_data = memory # 二进制数据
if not memory:
continue
# if not (b'db_storage' in target_data or b'USER_KEYINFO' in target_data):
# continue
# if not (b'-----BEGIN PUBLIC KEY-----' in target_data):
# continue
matches = rules.match(data=target_data)
if matches:
# 输出匹配结果
for match in matches:
rule_name = match.rule
if rule_name == 'GetPhoneNumberOffset':
for string in match.strings:
instance = string.instances[0]
offset, content = instance.offset, instance.matched_data
# print(
# f"匹配字符串: {identifier} 内容: 偏移: {offset} 在地址: {hex(base_address + offset + 0x10)}")
# print(string)
with open('a.bin','wb') as f:
f.write(target_data)
phone_addr = offset + 0x10
phone = read_string(target_data, phone_addr, 11)
# 提取前 8 个字节
data_slice = target_data[offset:offset + 8]
# 使用 struct.unpack() 将字节转换为 u64'<Q' 表示小端字节序的 8 字节无符号整数
nick_name_length = struct.unpack('<Q', data_slice)[0]
# print('nick_name_length', nick_name_length)
nick_name = read_string(target_data, phone_addr - 0x20, nick_name_length)
a = target_data[phone_addr - 0x60:phone_addr + 0x50]
account_name_length = read_num(target_data, phone_addr - 0x30, 8)
# print('account_name_length', account_name_length)
account_name = read_string(target_data, phone_addr - 0x40, account_name_length)
# with open('a.bin', 'wb') as f:
# f.write(target_data)
if not account_name:
addr = read_num(target_data, phone_addr - 0x40, 8)
# print(hex(addr))
account_name = read_string_from_pid(pid, addr, account_name_length)
return {
'nick_name': nick_name,
'phone': phone,
'account_name': account_name
}
def worker(pid, queue):
nickname_dic = get_nickname(pid)
queue.put(nickname_dic)
def dump_wechat_info_v4_(pid) -> WechatInfo | None:
wechat_info = WechatInfo()
wechat_info.pid = pid
wechat_info.version = get_version(pid)
process_handle = open_process(pid)
if not process_handle:
print(f"无法打开进程 {pid}")
return None
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=worker, args=(pid, queue))
process.start()
wechat_info.wx_dir = get_wx_dir(process_handle)
# print(wx_dir_cnt)
if not wechat_info.wx_dir:
return None
db_file_path = os.path.join(wechat_info.wx_dir, 'biz', 'biz.db')
with open(db_file_path, 'rb') as f:
buf = f.read()
wechat_info.key = get_key(pid, process_handle, buf)
ctypes.windll.kernel32.CloseHandle(process_handle)
wechat_info.wxid = '_'.join(wechat_info.wx_dir.split('\\')[-3].split('_')[0:-1])
wechat_info.wx_dir = '\\'.join(wechat_info.wx_dir.split('\\')[:-2])
process.join() # 等待子进程完成
if not queue.empty():
nickname_info = queue.get()
wechat_info.nick_name = nickname_info.get('nick_name', '')
wechat_info.phone = nickname_info.get('phone', '')
wechat_info.account_name = nickname_info.get('account_name', '')
return wechat_info
if __name__ == '__main__':
freeze_support()
st = time.time()
pm = pymem.Pymem("Weixin.exe")
pid = pm.process_id
w = dump_wechat_info_v4_(pid)
print(w)
et = time.time()
print(et - st)