Merge branch 'master' of https://github.com/LC044/WeChatMsg
This commit is contained in:
@@ -194,7 +194,7 @@ class Msg(DataBaseBase):
|
||||
@param username_:
|
||||
@return:
|
||||
"""
|
||||
sql = f'''SELECT DISTINCT strftime('%Y-%m-%d',create_time,'unixepoch','localtime') AS date
|
||||
sql = f'''SELECT DISTINCT strftime('%Y-%m-%d',CreateTime,'unixepoch','localtime') AS date
|
||||
from MSG
|
||||
where StrTalker=?
|
||||
ORDER BY date desc;
|
||||
|
||||
@@ -195,7 +195,30 @@ class OpenIMMsgDB(DataBaseBase):
|
||||
|
||||
def get_messages_by_type(self, username: str, type_: MessageType,
|
||||
time_range: Tuple[int | float | str | date, int | float | str | date] = None, ):
|
||||
return self.get_messages_by_type(self.DB.cursor, username, type_, time_range)
|
||||
return self._get_messages_by_type(self.DB.cursor, username, type_, time_range)
|
||||
|
||||
def _get_messages_calendar(self, cursor, username):
|
||||
"""
|
||||
获取某个人的聊天日历列表
|
||||
@param username_:
|
||||
@return:
|
||||
"""
|
||||
sql = f'''SELECT DISTINCT strftime('%Y-%m-%d',CreateTime,'unixepoch','localtime') AS date
|
||||
from PublicMsg
|
||||
where StrTalker=?
|
||||
ORDER BY date desc;
|
||||
'''
|
||||
cursor.execute(sql, [username])
|
||||
result = cursor.fetchall()
|
||||
return (data[0] for data in result)
|
||||
|
||||
def get_messages_calendar(self, username):
|
||||
res = []
|
||||
r1 = self._get_messages_calendar(self.DB.cursor(), username)
|
||||
if r1:
|
||||
res.extend(r1)
|
||||
res.sort()
|
||||
return res
|
||||
|
||||
def merge(self, db_path):
|
||||
if not (os.path.exists(db_path) or os.path.isfile(db_path)):
|
||||
|
||||
@@ -37,7 +37,7 @@ class PublicMsg(DataBaseBase):
|
||||
|
||||
def get_messages_by_type(self, username: str, type_: MessageType,
|
||||
time_range: Tuple[int | float | str | date, int | float | str | date] = None, ):
|
||||
return self.get_messages_by_type(self.DB.cursor, username, type_, time_range)
|
||||
return self._get_messages_by_type(self.DB.cursor, username, type_, time_range)
|
||||
|
||||
def get_sport_score_by_name(self, username,
|
||||
time_range: Tuple[int | float | str | date, int | float | str | date] = None, ):
|
||||
@@ -110,7 +110,7 @@ class PublicMsg(DataBaseBase):
|
||||
@param username_:
|
||||
@return:
|
||||
"""
|
||||
sql = f'''SELECT DISTINCT strftime('%Y-%m-%d',create_time,'unixepoch','localtime') AS date
|
||||
sql = f'''SELECT DISTINCT strftime('%Y-%m-%d',CreateTime,'unixepoch','localtime') AS date
|
||||
from PublicMsg
|
||||
where StrTalker=?
|
||||
ORDER BY date desc;
|
||||
|
||||
@@ -51,5 +51,6 @@ class EmotionDB(DataBaseBase):
|
||||
print(f"数据库操作错误: {traceback.format_exc()}")
|
||||
self.DB.rollback()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||
|
||||
@@ -182,6 +182,10 @@ class HardLinkDB(DataBaseBase):
|
||||
if os.path.exists(os.path.join(Me().wx_dir, path1)):
|
||||
return path1
|
||||
else:
|
||||
data_image = f'{message.file_name}_h.dat' if message.file_name else f'{local_id}_{create_time}_h.dat'
|
||||
path1 = os.path.join(image_root_path, dir1, dir2, dir0, data_image)
|
||||
if os.path.exists(os.path.join(Me().wx_dir, path1)):
|
||||
return path1
|
||||
data_image = f'{message.file_name}.dat' if message.file_name else f'{local_id}_{create_time}.dat'
|
||||
path1 = os.path.join(image_root_path, dir1, dir2, dir0, data_image)
|
||||
return path1
|
||||
|
||||
@@ -121,30 +121,49 @@ def decode_dat(xor_key: int, file_path, out_path, dst_name='') -> str | bytes:
|
||||
|
||||
|
||||
def get_decode_code_v4(wx_dir):
|
||||
"""
|
||||
从微信文件夹里找到异或密钥,原理详见:https://blog.lc044.love/post/16
|
||||
:param wx_dir:
|
||||
:return:
|
||||
"""
|
||||
cache_dir = os.path.join(wx_dir, 'cache')
|
||||
if not os.path.isdir(wx_dir) or not os.path.exists(cache_dir):
|
||||
raise ValueError(f'微信路径输入错误,请检查:{wx_dir}')
|
||||
ok_flag = False
|
||||
for root, dirs, files in os.walk(cache_dir):
|
||||
if ok_flag:
|
||||
break
|
||||
for file in files:
|
||||
if file.endswith(".dat"):
|
||||
# 构造源文件和目标文件的完整路径
|
||||
src_file_path = os.path.join(root, file)
|
||||
with open(src_file_path, 'rb') as f:
|
||||
data = f.read()
|
||||
if not is_v4_image(data):
|
||||
continue
|
||||
file_tail = data[-2:]
|
||||
|
||||
jpg_known_tail = b'\xff\xd9'
|
||||
# 推导出密钥
|
||||
xor_key = [c ^ p for c, p in zip(file_tail, jpg_known_tail)]
|
||||
if len(set(xor_key)) == 1:
|
||||
print(f'[*] 找到异或密钥: 0x{xor_key[0]:x}')
|
||||
return xor_key[0]
|
||||
return -1
|
||||
def find_xor_key(dir0):
|
||||
ok_flag = False
|
||||
for root, dirs, files in os.walk(dir0):
|
||||
if ok_flag:
|
||||
break
|
||||
for file in files:
|
||||
if file.endswith("_t.dat"):
|
||||
# 构造源文件和目标文件的完整路径
|
||||
src_file_path = os.path.join(root, file)
|
||||
with open(src_file_path, 'rb') as f:
|
||||
data = f.read()
|
||||
if not is_v4_image(data):
|
||||
continue
|
||||
file_tail = data[-2:]
|
||||
|
||||
jpg_known_tail = b'\xff\xd9'
|
||||
# 推导出密钥
|
||||
xor_key = [c ^ p for c, p in zip(file_tail, jpg_known_tail)]
|
||||
if len(set(xor_key)) == 1:
|
||||
print(f'[*] 找到异或密钥: 0x{xor_key[0]:x}')
|
||||
return xor_key[0]
|
||||
return -1
|
||||
|
||||
xor_key_ = find_xor_key(cache_dir)
|
||||
if xor_key_ != -1:
|
||||
return xor_key_
|
||||
else:
|
||||
dirs = ['temp', 'msg']
|
||||
for dir_name in dirs:
|
||||
cache_dir = os.path.join(wx_dir, dir_name)
|
||||
xor_key_ = find_xor_key(cache_dir)
|
||||
if xor_key_ != -1:
|
||||
return xor_key_
|
||||
return 0
|
||||
|
||||
|
||||
def get_image_type(data: bytes) -> str:
|
||||
|
||||
@@ -157,7 +157,7 @@ class Me:
|
||||
self.small_head_img_url = ''
|
||||
self.nickname = self.name
|
||||
self.remark = self.nickname
|
||||
self.xor_key = -1
|
||||
self.xor_key = 0
|
||||
|
||||
def to_json(self) -> dict:
|
||||
return {
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
@Description :
|
||||
"""
|
||||
import base64
|
||||
import re
|
||||
import traceback
|
||||
|
||||
import xmltodict
|
||||
@@ -26,9 +27,23 @@ def parser_emoji(xml_content):
|
||||
'height': 0,
|
||||
'desc': ''
|
||||
}
|
||||
|
||||
def extract_msg(text):
|
||||
# 使用正则表达式匹配第一个 <msg> 标签及其内容
|
||||
pattern = r'(<msg>.*?</msg>)'
|
||||
match = re.search(pattern, text)
|
||||
return f'<msg>{match.group(0)}</msg>' if match else ''
|
||||
|
||||
xml_content = xml_content.strip().replace('&', '&')
|
||||
try:
|
||||
xml_dict = xmltodict.parse(xml_content)
|
||||
except:
|
||||
try:
|
||||
xml_content = extract_msg(xml_content)
|
||||
xml_dict = xmltodict.parse(xml_content)
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
emoji_dic = xml_dict.get('msg', {}).get('emoji', {})
|
||||
if '@androidmd5' in emoji_dic:
|
||||
md5 = emoji_dic.get('@androidmd5', '')
|
||||
|
||||
@@ -17,7 +17,7 @@ from wxManager.log import logger
|
||||
def get_image_type(header):
|
||||
# 根据文件头判断图片类型
|
||||
if header.startswith(b'\xFF\xD8'):
|
||||
return 'jepg'
|
||||
return 'jpeg'
|
||||
elif header.startswith(b'\x89PNG'):
|
||||
return 'png'
|
||||
elif header[:6] in (b'GIF87a', b'GIF89a'):
|
||||
|
||||
Reference in New Issue
Block a user