Files
CradCopyer/main.py
2025-12-23 16:18:16 +08:00

2989 lines
119 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import sys
import os
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
# 全局图标管理器
_global_icon_image = None # 存储PIL Image对象
_icon_preloaded = False
def preload_icon():
"""预加载图标,确保在窗口创建前就可用了"""
global _icon_preloaded, _global_icon_image
if not _icon_preloaded:
try:
# 确保PIL模块可用
if 'PIL_Image' not in globals() or 'PIL_ImageTk' not in globals():
from PIL import Image as PIL_Image
from PIL import ImageTk as PIL_ImageTk
# 将其注入全局命名空间以便其他函数使用
globals()['PIL_Image'] = PIL_Image
globals()['PIL_ImageTk'] = PIL_ImageTk
icon_path = get_icon_path()
if icon_path and 'PIL_Image' in globals():
# 只加载Image对象不创建PhotoImage因为PhotoImage依赖于特定的Tk实例
_global_icon_image = PIL_Image.open(icon_path)
_icon_preloaded = True
print(f"图标图像预加载成功: {icon_path}")
return True
except Exception as e:
print(f"图标预加载失败: {e}")
_icon_preloaded = False
return _icon_preloaded is True
def get_global_icon_image():
"""获取全局图标Image对象"""
global _global_icon_image
if _global_icon_image is None:
preload_icon()
return _global_icon_image
def get_global_icon_photo():
"""已废弃为了兼容性保留但返回None以强制重新创建"""
return None
def get_icon_path():
"""获取图标文件的完整路径"""
# 检查当前目录(源码运行)
if os.path.exists('appicon.png'):
return 'appicon.png'
# 检查可执行文件所在目录(打包后运行)
if hasattr(sys, '_MEIPASS'):
# PyInstaller 打包后的路径
icon_path = os.path.join(sys._MEIPASS, 'appicon.png')
if os.path.exists(icon_path):
return icon_path
# 检查应用包内的 Resources 目录
app_path = os.path.dirname(os.path.abspath(__file__))
resources_path = os.path.join(app_path, 'appicon.png')
if os.path.exists(resources_path):
return resources_path
return None
def get_log_directory():
"""获取跨平台的日志目录路径"""
system = sys.platform
if system == "darwin": # macOS
# macOS: ~/Documents/CardCopyer/logs
log_dir = os.path.expanduser("~/Documents/CardCopyer/logs")
elif system == "win32": # Windows
# Windows: ~/Documents/CardCopyer/logs
log_dir = os.path.expanduser("~/Documents/CardCopyer/logs")
else: # Linux 和其他系统
# Linux: ~/.local/share/CardCopyer/logs
log_dir = os.path.expanduser("~/.local/share/CardCopyer/logs")
# 确保目录存在
try:
os.makedirs(log_dir, exist_ok=True)
except Exception as e:
# 如果无法创建目录,回退到应用目录
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
os.makedirs(log_dir, exist_ok=True)
return log_dir
# 延迟导入 - 提高启动速度
def import_heavy_modules():
"""延迟导入重量级的模块"""
global tk, ttk, tb, filedialog, messagebox, shutil, hashlib, psutil, json, subprocess, PIL_Image, PIL_ImageTk
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from tkinter.constants import BOTH, YES, NO, X, Y, LEFT, RIGHT, TOP, BOTTOM, W, E, N, S, CENTER, HORIZONTAL, VERTICAL
import shutil
import hashlib
import json
import subprocess
# 这些模块较重,延迟导入
try:
import ttkbootstrap as tb
from ttkbootstrap.constants import PRIMARY, SUCCESS, INFO, WARNING, DANGER
except ImportError:
tb = None
try:
import psutil
except ImportError:
psutil = None
try:
from PIL import Image as PIL_Image, ImageTk as PIL_ImageTk
except ImportError:
PIL_Image = None
PIL_ImageTk = None
# 快速依赖检查(只检查关键模块)
def quick_check_dependencies():
"""快速依赖检查,只检查最基本的模块"""
required_modules = ["tkinter"]
missing_modules = []
for module_name in required_modules:
try:
__import__(module_name)
except ImportError:
missing_modules.append(module_name)
if missing_modules:
if getattr(sys, 'frozen', False):
error_msg = f"缺少必要的依赖模块: {', '.join(missing_modules)}\n"
error_msg += "请重新打包程序或联系开发者。"
print(error_msg)
return False
else:
print(f"缺少依赖模块: {', '.join(missing_modules)}")
return False
return True
# 完整依赖检查(在后台线程中执行)
def full_check_dependencies():
"""完整的依赖检查"""
required_modules = ["ttkbootstrap", "psutil", "PIL"]
missing_modules = []
for module_name in required_modules:
try:
__import__(module_name)
except ImportError:
missing_modules.append(module_name)
if missing_modules:
if getattr(sys, 'frozen', False):
return False, f"缺少必要的依赖模块: {', '.join(missing_modules)}"
else:
return False, f"缺少依赖模块: {', '.join(missing_modules)}\n请在命令行中运行: pip install ttkbootstrap psutil Pillow"
return True, None
class Tooltip:
def __init__(self, widget, text):
self.widget = widget
self.text = text
self.tip = None
widget.bind("<Enter>", self.show)
widget.bind("<Leave>", self.hide)
widget.bind("<Motion>", self.move)
def show(self, event=None):
if self.tip or not self.text:
return
x = self.widget.winfo_rootx() + 20
y = self.widget.winfo_rooty() + self.widget.winfo_height() + 10
self.tip = tk.Toplevel(self.widget)
self.tip.wm_overrideredirect(True)
self.tip.wm_geometry(f"+{x}+{y}")
label = tk.Label(self.tip, text=self.text, justify="left", relief="solid", borderwidth=1, background="#ffffe0", foreground="#333", font=("Arial", 10))
label.pack(padx=8, pady=6)
def move(self, event):
if self.tip:
x = event.x_root + 12
y = event.y_root + 12
self.tip.wm_geometry(f"+{x}+{y}")
def hide(self, event=None):
if self.tip:
self.tip.destroy()
self.tip = None
class CopyManager:
"""拷贝管理器 - 优化性能和资源管理"""
def __init__(self):
self.copying = False
self.verifying = False
self.backup_copying = False
self.total_files = 0
self.copied_files = 0
self.verified_files = 0
self.errors = []
# 日志文件相关
self.log_file = None
self.log_buffer = [] # 日志缓冲区
# 拷贝进度相关
self.total_size = 0 # 总大小(字节)
self.copied_size = 0 # 已拷贝大小(字节)
self.copy_start_time = 0 # 拷贝开始时间
self.copy_speed = 0 # 拷贝速度(字节/秒)
self.copy_eta = 0 # 预计剩余时间(秒)
# 验证进度相关
self.verified_size = 0 # 已验证大小(字节)
self.verify_start_time = 0 # 验证开始时间
self.verify_speed = 0 # 验证速度(字节/秒)
self.verify_eta = 0 # 预计剩余时间(秒)
# 日期文件夹名称(用于保持拷贝和验证使用相同的时间戳)
self.date_folder = None
# MD5验证统计
self.total_md5_files = 0 # 需要验证MD5的文件总数
self.md5_verified_files = 0 # 已完成MD5验证的文件数
self.md5_calc_size = 0 # 已计算MD5的数据量字节
self.md5_calc_speed = 0 # MD5计算速度字节/秒)
self.md5_start_time = 0 # MD5验证开始时间
# 性能优化:缓存文件大小计算结果
self._size_cache = {}
self._size_cache_timeout = 5 # 缓存超时时间(秒)
# 备用拷贝进度相关
self.backup_total_files = 0
self.backup_copied_files = 0
self.backup_total_size = 0
self.backup_copied_size = 0
self.total_backup_destinations = 0
self.current_backup_index = 0
self.backup_start_time = 0
self.backup_results = []
def get_folder_size(self, folder_path: str) -> int:
"""计算文件夹总大小(字节)- 使用缓存优化"""
# 检查缓存
current_time = time.time()
cache_key = folder_path
if cache_key in self._size_cache:
cached_size, cache_time = self._size_cache[cache_key]
if current_time - cache_time < self._size_cache_timeout:
return cached_size
total_size = 0
try:
# 使用更快的文件遍历方法
for root, dirs, files in os.walk(folder_path, followlinks=False):
# 限制遍历深度,避免深层嵌套目录
if root.count(os.sep) - folder_path.count(os.sep) > 10:
dirs[:] = [] # 不继续深入
continue
for file in files:
file_path = os.path.join(root, file)
try:
stat = os.lstat(file_path) # 使用lstat避免符号链接
total_size += stat.st_size
except (OSError, PermissionError, FileNotFoundError):
# 跳过无法访问的文件
continue
except (OSError, PermissionError):
# 跳过无法访问的文件夹
pass
# 缓存结果
self._size_cache[cache_key] = (total_size, current_time)
return total_size
def format_size(self, size_bytes: int) -> str:
"""格式化文件大小显示"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
size = float(size_bytes)
while size >= 1024 and i < len(size_names) - 1:
size /= 1024
i += 1
return f"{size:.1f} {size_names[i]}"
def format_time(self, seconds) -> str:
"""格式化时间显示"""
if seconds < 0:
seconds = 0
seconds = int(seconds) # 转换为整数
if seconds < 60:
return f"{seconds}"
elif seconds < 3600:
minutes = seconds // 60
secs = seconds % 60
return f"{minutes}{secs}"
else:
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
return f"{hours}小时{minutes}{secs}"
def init_log_file(self, log_dir: str, session_name: str):
"""初始化日志文件"""
try:
# 确保日志目录存在
os.makedirs(log_dir, exist_ok=True)
# 生成日志文件名(使用时间戳和会话名称)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = f"copy_log_{timestamp}_{session_name}.log"
log_path = os.path.join(log_dir, log_filename)
self.log_file = open(log_path, 'w', encoding='utf-8')
# 写入日志头
self.log_file.write("="*80 + "\n")
self.log_file.write(f"CardCopyer-拷贝乐 - 拷贝日志\n")
self.log_file.write(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
self.log_file.write(f"会话名称: {session_name}\n")
self.log_file.write("="*80 + "\n\n")
self.log_file.flush()
return log_path
except Exception as e:
print(f"初始化日志文件失败: {e}")
return None
def write_log(self, message: str):
"""写入日志到文件"""
if self.log_file:
try:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log_file.write(f"[{timestamp}] {message}\n")
self.log_file.flush()
except Exception as e:
print(f"写入日志失败: {e}")
def close_log_file(self):
"""关闭日志文件 - 优化清理过程"""
if self.log_file:
try:
# 确保所有缓冲的日志都被写入
if self.log_buffer:
for message in self.log_buffer:
self.log_file.write(message + "\n")
self.log_buffer.clear()
self.log_file.write("\n" + "="*80 + "\n")
self.log_file.write(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
self.log_file.write("="*80 + "\n")
self.log_file.flush() # 确保数据写入磁盘
self.log_file.close()
self.log_file = None
except Exception as e:
print(f"关闭日志文件失败: {e}")
# 清理缓存
self._size_cache.clear()
class StartupWindow:
"""启动窗口 - 显示加载进度"""
def __init__(self):
# 确保tkinter已导入
if 'tk' not in globals():
import tkinter as tk
from tkinter import ttk
# 确保PIL已导入
if 'PIL_Image' not in globals() or 'PIL_ImageTk' not in globals():
try:
from PIL import Image as PIL_Image, ImageTk as PIL_ImageTk
except ImportError:
pass
self.root = tk.Tk()
self.root.title("CardCopyer-拷贝乐 - 启动中")
self.root.geometry("300x150")
self.root.resizable(False, False)
# 设置窗口图标
try:
icon_path = get_icon_path()
if icon_path and 'PIL_Image' in globals() and 'PIL_ImageTk' in globals():
icon_image = PIL_Image.open(icon_path)
icon_photo = PIL_ImageTk.PhotoImage(icon_image)
self.root.iconphoto(True, icon_photo)
except Exception:
pass # 如果图标设置失败,继续使用默认图标
# 居中显示
self.root.update_idletasks()
x = (self.root.winfo_screenwidth() - 300) // 2
y = (self.root.winfo_screenheight() - 150) // 2
self.root.geometry(f"+{x}+{y}")
# 设置样式
self.root.configure(bg='#2b3e50')
# 标题
title_label = tk.Label(
self.root,
text="CardCopyer-拷贝乐",
font=("Arial", 16, "bold"),
bg='#2b3e50',
fg='white'
)
title_label.pack(pady=20)
# 进度标签
self.progress_label = tk.Label(
self.root,
text="正在初始化...",
font=("Arial", 10),
bg='#2b3e50',
fg='white'
)
self.progress_label.pack()
# 进度条
self.progress = ttk.Progressbar(
self.root,
mode='indeterminate',
length=250
)
self.progress.pack(pady=10)
self.progress.start()
def update_progress(self, message):
"""更新进度信息"""
self.progress_label.config(text=message)
self.root.update()
def close(self):
"""关闭启动窗口"""
self.progress.stop()
self.root.destroy()
class DITCopyTool:
"""CardCopyer主窗口"""
def __init__(self):
# 导入重量级模块
import_heavy_modules()
# 检查ttkbootstrap是否可用
if tb is None:
self.show_error_and_exit("ttkbootstrap模块不可用", "请安装ttkbootstrap: pip install ttkbootstrap")
return
# 预加载图标,确保在窗口创建前准备好
preload_icon()
self.icon_photo = None
# 创建主窗口但先隐藏,避免显示默认图标
self.window = tb.Window(
title="CardCopyer-拷贝乐",
themename="darkly",
size=(1400, 900),
resizable=(True, True)
)
self.original_theme = "darkly"
self.christmas_mode = False
self.christmas_bg_canvas = None
# 立即隐藏窗口,防止显示默认图标
self.window.withdraw()
# 尝试立即设置图标(窗口隐藏状态下)
try:
icon_image = get_global_icon_image()
if icon_image and 'PIL_ImageTk' in globals():
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
print("主窗口图标在隐藏状态下设置成功")
except Exception as e:
print(f"隐藏状态下设置主窗口图标失败: {e}")
# 设置窗口关闭事件处理
self.window.protocol("WM_DELETE_WINDOW", self.on_closing)
self.copy_manager = CopyManager()
self.source_items = [] # 源项目列表
self.destination_path = ""
self.copy_thread = None
self.media_extensions = {
".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp", ".heic", ".heif",
".cr2", ".cr3", ".nef", ".arw", ".dng", ".rw2", ".orf", ".raf", ".srw", ".pef", ".rwl",
".r3d", ".braw", ".ari", ".cine",
".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".m4v", ".webm", ".mxf", ".mts", ".m2ts", ".ts", ".3gp", ".3g2",
".mp3", ".wav", ".aac", ".flac", ".ogg", ".m4a", ".aiff", ".aif", ".wma"
}
# 延迟UI初始化窗口仍在隐藏状态
self.window.after(100, self._show_main_window_with_icon)
# 启动图标监控定时器
if self.icon_photo:
self._start_icon_monitor()
def _show_main_window_with_icon(self):
"""显示主窗口并确保图标正确设置"""
try:
# 先设置UI
self.setup_ui()
# 尝试设置图标
self._try_set_icon()
# 显示窗口
self.window.deiconify()
print("主窗口已显示")
except Exception as e:
print(f"显示主窗口时发生异常: {e}")
# 确保窗口显示但不重复设置UI
self.window.deiconify()
def _try_set_icon(self):
"""尝试设置窗口图标,处理跨实例问题"""
try:
# 获取PIL Image对象而不是预先创建的PhotoImage
icon_image = get_global_icon_image()
if icon_image:
# 为当前窗口创建专用的PhotoImage
if 'PIL_ImageTk' in globals():
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
print("图标设置成功")
except Exception as e:
print(f"设置图标失败: {e}")
def _set_window_icon(self):
"""延迟设置主窗口图标,使用全局图标确保一致性"""
try:
# 使用全局图标管理器获取Image对象
icon_image = get_global_icon_image()
if icon_image and self.window:
# 为当前窗口创建专用的PhotoImage
if 'PIL_ImageTk' in globals():
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
# 使用多种方法设置图标,确保兼容性
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 启动图标监控定时器
self._start_icon_monitor()
elif not icon_image:
# 如果全局图标不可用,尝试本地创建
icon_path = get_icon_path()
if icon_path and 'PIL_Image' in globals() and 'PIL_ImageTk' in globals():
icon_image = PIL_Image.open(icon_path)
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
if self.window and self.icon_photo:
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 启动图标监控定时器
self._start_icon_monitor()
except Exception as e:
print(f"设置主窗口图标失败: {e}")
pass
def _start_icon_monitor(self):
"""启动图标监控定时器,防止图标被系统重置"""
def check_and_restore_icon():
try:
# 检查图标是否仍然有效
if self.window and self.icon_photo:
# 重新应用图标
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 每5秒检查一次
self.window.after(5000, check_and_restore_icon)
except Exception:
pass
# 启动第一次检查
self.window.after(5000, check_and_restore_icon)
def show_error_and_exit(self, title, message):
"""显示错误并退出"""
messagebox.showerror(title, message)
sys.exit(1)
def on_closing(self):
"""窗口关闭事件处理 - 优化退出性能"""
try:
# 如果有正在进行的拷贝线程,先停止它
if self.copy_thread and self.copy_thread.is_alive():
if messagebox.askyesno("确认", "有拷贝任务正在进行中,确定要退出吗?"):
# 设置停止标志
self.copy_manager.copying = False
self.copy_manager.verifying = False
# 等待线程结束最多3秒
self.copy_thread.join(timeout=3)
else:
return
# 清理资源
self.cleanup_resources()
# 销毁窗口
self.window.quit()
self.window.destroy()
except Exception as e:
print(f"退出时出错: {e}")
# 强制退出
try:
self.window.quit()
self.window.destroy()
except:
pass
def cleanup_resources(self):
"""清理资源 - 优化退出性能"""
try:
# 关闭日志文件
if hasattr(self, 'copy_manager'):
self.copy_manager.close_log_file()
# 清理UI组件引用帮助垃圾回收
if hasattr(self, 'source_tree'):
self.source_tree.delete(*self.source_tree.get_children())
# 清理线程引用
if hasattr(self, 'copy_thread'):
self.copy_thread = None
# 清理大对象引用
if hasattr(self, 'copy_manager'):
self.copy_manager = None
except Exception as e:
print(f"清理资源时出错: {e}")
def setup_ui(self):
"""设置UI界面"""
# 主框架
self.main_frame = tb.Frame(self.window, padding=20)
self.main_frame.pack(fill="both", expand=True)
main_frame = self.main_frame
header_frame = tb.Frame(main_frame)
header_frame.pack(fill="x")
title_label = tb.Label(header_frame, text="CardCopyer-拷贝乐", font=("Arial", 24, "bold"), bootstyle="primary")
title_label.pack(side="left", pady=(0, 20))
self.christmas_btn = tb.Button(header_frame, text="🎄", bootstyle="success", width=3, command=self.toggle_christmas_theme)
if self.is_christmas_period():
self.christmas_btn.pack(side="right", pady=(0, 20))
# 主要内容区域
content_frame = tb.Frame(main_frame)
content_frame.pack(fill="both", expand=True)
# 左侧 - 源选择
self.setup_source_frame(content_frame)
# 中间列容器
middle_column = tb.Frame(content_frame)
middle_column.pack(side="left", fill="both", expand=True, padx=(0, 10))
# 中间 - 目的地选择
self.setup_destination_frame(middle_column)
# 中间 - 设置
self.setup_settings_frame(middle_column)
# 右侧 - 进度显示
self.setup_progress_frame(content_frame)
# 底部按钮区域
self.setup_bottom_frame(main_frame)
def setup_source_frame(self, parent):
"""设置源选择框架"""
source_frame = ttk.LabelFrame(
parent,
text="源文件夹选择",
bootstyle="primary",
padding=15
)
source_frame.pack(side="left", fill="both", expand=True, padx=(0, 10))
# 文件夹选择区域
folder_select_frame = tb.Frame(source_frame)
folder_select_frame.pack(fill="x", pady=(0, 10))
# 添加文件夹按钮
add_folder_btn = tb.Button(
folder_select_frame,
text="添加文件夹",
bootstyle="success-outline",
command=self.add_source_folder
)
add_folder_btn.pack(side="left", padx=(0, 10))
# 文件夹大小统计
self.folder_size_label = tb.Label(
folder_select_frame,
text="总大小: 0 GB",
font=("Arial", 10),
bootstyle="info"
)
self.folder_size_label.pack(side="left")
# 源项目列表
tb.Label(source_frame, text="已选择的源文件夹:", font=("Arial", 12, "bold")).pack(pady=(10, 5))
source_items_frame = tb.Frame(source_frame)
source_items_frame.pack(fill="both", expand=True)
source_scroll = tb.Scrollbar(source_items_frame)
source_scroll.pack(side="right", fill="y")
self.source_items_listbox = tk.Listbox(
source_items_frame,
yscrollcommand=source_scroll.set,
font=("Arial", 10),
bg="#1e1e1e",
fg="white",
selectbackground="#005a9e"
)
self.source_items_listbox.pack(side="left", fill="both", expand=True)
source_scroll.config(command=self.source_items_listbox.yview)
# 操作按钮区域
button_frame = tb.Frame(source_frame)
button_frame.pack(fill="x", pady=(10, 0))
# 修改名称按钮
rename_folder_btn = tb.Button(
button_frame,
text="修改名称",
bootstyle="warning-outline",
command=self.rename_selected_folder
)
rename_folder_btn.pack(side="left", padx=(0, 10))
# 移除选中文件夹按钮
remove_folder_btn = tb.Button(
button_frame,
text="移除选中",
bootstyle="danger-outline",
command=self.remove_selected_folder
)
remove_folder_btn.pack(side="left", padx=(0, 10))
# 清空所有按钮
clear_all_btn = tb.Button(
button_frame,
text="清空所有",
bootstyle="secondary-outline",
command=self.clear_all_folders
)
clear_all_btn.pack(side="left")
def setup_destination_frame(self, parent):
"""设置目的地选择框架"""
dest_frame = ttk.LabelFrame(
parent,
text="目的地选择",
bootstyle="info",
padding=15
)
dest_frame.pack(fill="x", padx=(0, 10))
# 目的地路径显示
self.dest_path_label = tb.Label(
dest_frame,
text="未选择目的地",
font=("Arial", 11),
bootstyle="secondary",
wraplength=300
)
self.dest_path_label.pack(pady=(0, 15))
# 选择目的地按钮
select_dest_btn = tb.Button(
dest_frame,
text="选择目的地文件夹",
bootstyle="info",
command=self.select_destination
)
select_dest_btn.pack(pady=(0, 10))
# 自动创建文件夹(默认启用,不再显示选框)
self.auto_folder_var = tk.BooleanVar(value=True)
# 自动日期前缀开关
self.auto_date_prefix_var = tk.BooleanVar(value=True)
date_prefix_cb = tb.Checkbutton(
dest_frame,
text="自动添加日期前缀",
variable=self.auto_date_prefix_var,
bootstyle="info-round-toggle",
command=self.update_folder_preview
)
date_prefix_cb.pack(pady=(0, 10))
# 项目名称输入区域
project_frame = tb.Frame(dest_frame)
project_frame.pack(fill="x", pady=(0, 15))
tb.Label(
project_frame,
text="项目名称:",
bootstyle="info"
).pack(side="left", padx=(0, 10))
self.project_name_var = tk.StringVar()
self.project_name_entry = tb.Entry(
project_frame,
textvariable=self.project_name_var,
width=20,
bootstyle="info"
)
self.project_name_entry.pack(side="left", fill="x", expand=True)
# 项目名称提示
tb.Label(
dest_frame,
text="关闭日期前缀时必须输入项目名称",
font=("Arial", 9),
bootstyle="secondary"
).pack(pady=(0, 5))
# 文件夹名称预览
self.folder_preview_label = tb.Label(
dest_frame,
text="",
font=("Arial", 9, "italic"),
bootstyle="info"
)
self.folder_preview_label.pack(pady=(0, 15))
# 目的地信息
self.dest_info_label = tb.Label(
dest_frame,
text="",
font=("Arial", 10),
bootstyle="secondary"
)
self.dest_info_label.pack()
# 绑定项目名称变化事件,实时更新预览
self.project_name_var.trace_add("write", lambda *args: self.update_folder_preview())
self.update_folder_preview()
self.backup_dest_paths = []
self.multi_dest_frame = ttk.LabelFrame(
dest_frame,
text="备用目的地",
bootstyle="secondary",
padding=10
)
backup_list_row = tb.Frame(self.multi_dest_frame)
backup_list_row.pack(fill="x")
self.backup_dest_listbox = tk.Listbox(
backup_list_row,
height=5,
font=("Arial", 10),
bg="#1e1e1e",
fg="white",
selectbackground="#005a9e"
)
self.backup_dest_listbox.pack(side="left", fill="both", expand=True)
backup_scroll = tb.Scrollbar(backup_list_row)
backup_scroll.pack(side="right", fill="y")
self.backup_dest_listbox.config(yscrollcommand=backup_scroll.set)
backup_scroll.config(command=self.backup_dest_listbox.yview)
backup_btn_row = tb.Frame(self.multi_dest_frame)
backup_btn_row.pack(fill="x", pady=(10, 0))
add_backup_btn = tb.Button(
backup_btn_row,
text="添加备用目的地",
bootstyle="info",
command=self.add_backup_destination
)
add_backup_btn.pack(side="left")
remove_backup_btn = tb.Button(
backup_btn_row,
text="移除选中",
bootstyle="danger-outline",
command=self.remove_selected_backup_destination
)
remove_backup_btn.pack(side="left", padx=(10, 0))
set_primary_btn = tb.Button(
backup_btn_row,
text="设为主目的地",
bootstyle="warning-outline",
command=self.set_primary_destination
)
set_primary_btn.pack(side="right")
def setup_settings_frame(self, parent):
settings_frame = ttk.LabelFrame(
parent,
text="设置",
bootstyle="secondary",
padding=10
)
settings_frame.pack(fill="x", pady=(15, 0))
settings_row = tb.Frame(settings_frame)
settings_row.pack(fill="x")
self.only_media_var = tk.BooleanVar(value=False)
only_media_cb = tb.Checkbutton(
settings_row,
text="是否只拷贝媒体文件",
variable=self.only_media_var,
bootstyle="info-round-toggle",
command=self.on_only_media_toggle
)
only_media_cb.pack(side="left")
help_label = tb.Label(
settings_row,
text="?",
font=("Arial", 10, "bold"),
bootstyle="info",
cursor="hand2"
)
help_label.pack(side="left", padx=(8, 0))
Tooltip(help_label, "启用后仅拷贝常见媒体文件图片、视频、音频、RAW。不拷贝文档、工程文件等。")
btn_row = tb.Frame(settings_frame)
btn_row.pack(fill="x", pady=(10, 0))
self.edit_media_btn = tb.Button(
btn_row,
text="编辑媒体文件类型",
bootstyle="info",
command=self.open_media_types_editor,
state="disabled",
width=20
)
self.edit_media_btn.pack(side="left")
self.on_only_media_toggle()
self.multi_dest_var = tk.BooleanVar(value=False)
multi_dest_cb = tb.Checkbutton(
settings_frame,
text="多目的地备份",
variable=self.multi_dest_var,
bootstyle="info-round-toggle",
command=self.on_multi_dest_toggle
)
multi_dest_cb.pack(pady=(10, 0), anchor="w")
def on_only_media_toggle(self):
if self.only_media_var.get():
self.edit_media_btn.config(state="normal")
messagebox.showinfo("提示", "已开启仅拷贝媒体文件,请核对文件类型是否正确。")
else:
self.edit_media_btn.config(state="disabled")
def on_multi_dest_toggle(self):
if hasattr(self, "multi_dest_frame"):
if self.multi_dest_var.get():
self.multi_dest_frame.pack(fill="x", pady=(10, 0))
if hasattr(self, "backup_progress_title"):
self.backup_progress_title.pack(anchor="w", pady=(20, 5))
if hasattr(self, "backup_progress"):
self.backup_progress.pack(fill="x", pady=(5, 15))
if hasattr(self, "backup_status_label"):
self.backup_status_label.pack(anchor="w")
else:
self.multi_dest_frame.forget()
if hasattr(self, "backup_progress_title"):
self.backup_progress_title.pack_forget()
if hasattr(self, "backup_progress"):
self.backup_progress.pack_forget()
self.backup_progress.config(value=0)
if hasattr(self, "backup_status_label"):
self.backup_status_label.pack_forget()
self.backup_status_label.config(text="等待备用拷贝...")
def open_media_types_editor(self):
editor = tk.Toplevel(self.window)
editor.title("编辑媒体文件类型")
editor.geometry("400x500")
editor.transient(self.window)
editor.grab_set()
text = tk.Text(editor, font=("Courier", 11))
text.pack(fill="both", expand=True)
initial = "\n".join(sorted(self.get_media_extensions()))
text.insert("1.0", initial)
button_frame = tb.Frame(editor, padding=10)
button_frame.pack(fill="x")
def save():
content = text.get("1.0", "end").strip().splitlines()
cleaned = set()
for line in content:
s = line.strip().lower()
if not s:
continue
if not s.startswith("."):
s = "." + s
cleaned.add(s)
if cleaned:
self.media_extensions = cleaned
editor.destroy()
save_btn = tb.Button(button_frame, text="保存", bootstyle="success", command=save, width=12)
save_btn.pack(side="right", padx=(10, 0))
cancel_btn = tb.Button(button_frame, text="取消", bootstyle="secondary", command=editor.destroy, width=12)
cancel_btn.pack(side="right")
def setup_progress_frame(self, parent):
"""设置进度显示框架"""
progress_frame = ttk.LabelFrame(
parent,
text="拷贝进度",
bootstyle="success",
padding=15
)
progress_frame.pack(side="left", fill="both", expand=True)
# 拷贝进度
tb.Label(progress_frame, text="拷贝进度:", font=("Arial", 12, "bold")).pack(anchor="w")
self.copy_progress = tb.Progressbar(
progress_frame,
bootstyle="success-striped",
length=300,
mode='determinate'
)
self.copy_progress.pack(fill="x", pady=(5, 15))
self.copy_status_label = tb.Label(
progress_frame,
text="等待开始拷贝...",
font=("Arial", 10)
)
self.copy_status_label.pack(anchor="w")
# 拷贝速度和进度信息
self.copy_speed_label = tb.Label(
progress_frame,
text="速度: 0 MB/s | 已用: 00:00 | 剩余: 00:00",
font=("Arial", 9)
)
self.copy_speed_label.pack(anchor="w", pady=(2, 0))
# 验证进度
tb.Label(progress_frame, text="验证进度:", font=("Arial", 12, "bold")).pack(anchor="w", pady=(20, 5))
self.verify_progress = tb.Progressbar(
progress_frame,
bootstyle="warning-striped",
length=300,
mode='determinate'
)
self.verify_progress.pack(fill="x", pady=(5, 15))
self.verify_status_label = tb.Label(
progress_frame,
text="等待拷贝完成...",
font=("Arial", 10)
)
self.verify_status_label.pack(anchor="w")
# 验证速度信息
self.verify_speed_label = tb.Label(
progress_frame,
text="速度: 0 MB/s | 已用: 00:00 | 剩余: 00:00",
font=("Arial", 9)
)
self.verify_speed_label.pack(anchor="w", pady=(2, 0))
self.backup_progress_title = tb.Label(progress_frame, text="备用拷贝进度:", font=("Arial", 12, "bold"))
self.backup_progress = tb.Progressbar(
progress_frame,
bootstyle="info-striped",
length=300,
mode='determinate'
)
self.backup_status_label = tb.Label(
progress_frame,
text="等待备用拷贝...",
font=("Arial", 10)
)
# 统计信息
stats_frame = tb.Frame(progress_frame)
stats_frame.pack(fill="x", pady=(20, 0))
self.total_files_label = tb.Label(stats_frame, text="总文件数: 0", font=("Arial", 10))
self.total_files_label.pack(side="left")
self.copied_files_label = tb.Label(stats_frame, text="已拷贝: 0", font=("Arial", 10))
self.copied_files_label.pack(side="left", padx=(20, 0))
self.verified_files_label = tb.Label(stats_frame, text="已验证: 0", font=("Arial", 10))
self.verified_files_label.pack(side="left", padx=(20, 0))
# 日志区域
tb.Label(progress_frame, text="操作日志:", font=("Arial", 12, "bold")).pack(anchor="w", pady=(20, 5))
log_frame = tb.Frame(progress_frame)
log_frame.pack(fill="both", expand=True)
log_scroll = tb.Scrollbar(log_frame)
log_scroll.pack(side="right", fill="y")
self.log_text = tk.Text(
log_frame,
height=10,
yscrollcommand=log_scroll.set,
font=("Courier", 9),
bg="#1e1e1e",
fg="white"
)
self.log_text.pack(side="left", fill="both", expand=True)
log_scroll.config(command=self.log_text.yview)
def setup_bottom_frame(self, parent):
"""设置底部按钮框架"""
bottom_frame = tb.Frame(parent)
bottom_frame.pack(fill="x", pady=(20, 0))
# 开始按钮
self.start_btn = tb.Button(
bottom_frame,
text="开始拷贝",
bootstyle="success",
command=self.start_copy,
width=20
)
self.start_btn.pack(side="left", padx=(0, 10))
# 停止按钮
self.stop_btn = tb.Button(
bottom_frame,
text="停止拷贝",
bootstyle="danger",
command=self.stop_copy,
width=20,
state="disabled"
)
self.stop_btn.pack(side="left", padx=(0, 10))
# 清空按钮
clear_btn = tb.Button(
bottom_frame,
text="清空列表",
bootstyle="warning",
command=self.clear_all,
width=15
)
clear_btn.pack(side="left", padx=(0, 10))
# 查看日志按钮
view_logs_btn = tb.Button(
bottom_frame,
text="查看日志",
bootstyle="info",
command=self.open_log_viewer,
width=15
)
view_logs_btn.pack(side="left", padx=(0, 10))
# 版权信息标签(可点击)
copyright_label = tb.Label(
bottom_frame,
text="Copyright ©️ 2025-Now SuperJia 保留所有权利CardCopyer-拷贝乐 v1.1.5(beta) 点击前往官网",
font=("Arial", 9),
bootstyle="secondary",
cursor="hand2" # 鼠标悬停时显示手型光标
)
copyright_label.pack(side="left", padx=(10, 0))
# 绑定点击事件
copyright_label.bind("<Button-1>", self.open_official_website)
# 退出按钮
exit_btn = tb.Button(
bottom_frame,
text="退出",
bootstyle="secondary",
command=self.window.destroy,
width=15
)
exit_btn.pack(side="right")
def is_christmas_period(self):
from datetime import datetime
now = datetime.now()
return now.month == 12 and 20 <= now.day <= 30
def toggle_christmas_theme(self):
if not self.is_christmas_period():
return
if not self.christmas_mode:
try:
self.window.style.theme_use("minty")
except Exception:
try:
self.window.style.theme_use("flatly")
except Exception:
self.window.style.theme_use(self.original_theme)
self.christmas_mode = True
else:
try:
self.window.style.theme_use(self.original_theme)
except Exception:
pass
self.christmas_mode = False
def format_time(self, seconds):
"""格式化时间显示"""
if seconds <= 0:
return "00:00"
minutes = int(seconds // 60)
secs = int(seconds % 60)
if minutes >= 60:
hours = minutes // 60
minutes = minutes % 60
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
else:
return f"{minutes:02d}:{secs:02d}"
def update_total_size(self):
"""更新总大小显示"""
total_size = sum(item['size'] for item in self.source_items)
size_str = self.copy_manager.format_size(total_size)
self.folder_size_label.config(text=f"总大小: {size_str}")
# 如果总大小超过1GB显示更详细的信息
if total_size > 1024**3:
gb_size = total_size / (1024**3)
self.folder_size_label.config(text=f"总大小: {size_str} ({gb_size:.2f} GB)")
def add_source_folder(self):
"""添加源文件夹"""
folder = filedialog.askdirectory(title="选择源文件夹")
if folder:
# 获取文件夹名称和大小
folder_name = os.path.basename(folder)
try:
# 使用复制管理器的文件夹大小计算功能
total_size = self.copy_manager.get_folder_size(folder)
size_str = self.copy_manager.format_size(total_size)
# 弹出对话框让用户自定义命名
custom_name = self.ask_custom_folder_name(folder_name)
if custom_name is None: # 用户取消
return
source_item = {
'path': folder,
'name': folder_name,
'custom_name': custom_name,
'size': total_size,
'display': f"{folder_name} - {size_str}"
}
# 如果自定义名称与原始名称不同,在显示中添加提示
if custom_name != folder_name:
source_item['display'] = f"{folder_name} - {size_str} (→ {custom_name})"
self.source_items.append(source_item)
self.source_items_listbox.insert(tk.END, source_item['display'])
self.log_message(f"添加源文件夹: {folder} ({size_str})")
if custom_name != folder_name:
self.log_message(f"自定义命名为: {custom_name}")
# 更新总大小显示
self.update_total_size()
except Exception as e:
messagebox.showerror("错误", f"无法读取文件夹信息: {str(e)}")
def ask_custom_folder_name(self, original_name):
"""询问用户自定义文件夹名称"""
dialog = tk.Toplevel(self.window)
dialog.title("自定义文件夹名称")
dialog.geometry("400x200")
dialog.transient(self.window)
dialog.grab_set()
# 居中显示
dialog.update_idletasks()
x = (dialog.winfo_screenwidth() - dialog.winfo_width()) // 2
y = (dialog.winfo_screenheight() - dialog.winfo_height()) // 2
dialog.geometry(f"+{x}+{y}")
# 提示信息
info_frame = tb.Frame(dialog, padding=20)
info_frame.pack(fill="both", expand=True)
tb.Label(info_frame, text="为源文件夹设置目标名称", font=("Arial", 12, "bold")).pack(pady=(0, 10))
tb.Label(info_frame, text=f"原始名称: {original_name}", font=("Arial", 10)).pack(pady=(0, 15))
# 输入框
name_var = tk.StringVar(value=original_name)
entry = tb.Entry(info_frame, textvariable=name_var, width=40, font=("Arial", 11))
entry.pack(pady=(0, 20))
entry.focus()
entry.select_range(0, tk.END)
# 按钮区域
button_frame = tb.Frame(info_frame)
button_frame.pack(fill="x")
def on_ok():
dialog.result = name_var.get().strip()
dialog.destroy()
def on_cancel():
dialog.result = None
dialog.destroy()
tb.Button(button_frame, text="确定", bootstyle="success", command=on_ok).pack(side="right", padx=(10, 0))
tb.Button(button_frame, text="取消", bootstyle="secondary", command=on_cancel).pack(side="right")
# 绑定回车键
entry.bind("<Return>", lambda e: on_ok())
entry.bind("<Escape>", lambda e: on_cancel())
# 等待对话框关闭
dialog.wait_window(dialog)
return getattr(dialog, 'result', None)
def rename_selected_folder(self):
"""修改选中文件夹的自定义名称"""
selection = self.source_items_listbox.curselection()
if not selection:
messagebox.showinfo("提示", "请先选择要修改名称的文件夹")
return
if len(selection) > 1:
messagebox.showinfo("提示", "一次只能修改一个文件夹的名称")
return
index = selection[0]
source_item = self.source_items[index]
# 询问新的自定义名称
new_custom_name = self.ask_custom_folder_name(source_item['name'])
if new_custom_name is None: # 用户取消
return
# 更新自定义名称
old_custom_name = source_item.get('custom_name', source_item['name'])
source_item['custom_name'] = new_custom_name
# 更新显示
size_str = self.copy_manager.format_size(source_item['size'])
if new_custom_name != source_item['name']:
source_item['display'] = f"{source_item['name']} - {size_str} (→ {new_custom_name})"
else:
source_item['display'] = f"{source_item['name']} - {size_str}"
# 更新列表框
self.source_items_listbox.delete(index)
self.source_items_listbox.insert(index, source_item['display'])
self.source_items_listbox.selection_set(index)
# 记录日志
if old_custom_name != new_custom_name:
self.log_message(f"修改文件夹名称: {source_item['path']}")
self.log_message(f"'{old_custom_name}' 改为 '{new_custom_name}'")
def remove_selected_folder(self):
"""移除选中的文件夹"""
selection = self.source_items_listbox.curselection()
if selection:
# 从后往前删除,避免索引问题
for index in reversed(selection):
removed_item = self.source_items.pop(index)
self.source_items_listbox.delete(index)
self.log_message(f"移除源文件夹: {removed_item['path']}")
# 更新总大小显示
self.update_total_size()
else:
messagebox.showinfo("提示", "请先选择要移除的文件夹")
def clear_all_folders(self):
"""清空所有文件夹"""
if not self.source_items:
messagebox.showinfo("提示", "当前没有选择任何文件夹")
return
# 确认对话框
result = messagebox.askyesno(
"确认清空",
f"确定要清空所有 {len(self.source_items)} 个文件夹吗?\n\n此操作不会删除实际文件,只是取消选择。"
)
if result:
self.source_items.clear()
self.source_items_listbox.delete(0, tk.END)
self.update_total_size()
self.log_message("清空所有源文件夹")
def select_destination(self):
"""选择目的地"""
folder = filedialog.askdirectory(title="选择目的地文件夹")
if folder:
self.destination_path = folder
self.dest_path_label.config(text=folder)
self.update_folder_preview()
# 更新目的地信息
try:
usage = psutil.disk_usage(folder)
free_gb = usage.free / (1024**3)
self.dest_info_label.config(text=f"可用空间: {free_gb:.2f}GB")
except:
self.dest_info_label.config(text="无法获取空间信息")
self.log_message(f"选择目的地: {folder}")
# 同步UI
if self.multi_dest_var.get():
pass
def add_backup_destination(self):
if len(self.backup_dest_paths) >= 10:
messagebox.showwarning("提示", "最多只能添加10个备用目的地")
return
folder = filedialog.askdirectory(title="选择备用目的地文件夹")
if folder:
if folder == self.destination_path or folder in self.backup_dest_paths:
messagebox.showinfo("提示", "该目的地已存在或与主目的地重复")
return
self.backup_dest_paths.append(folder)
self.backup_dest_listbox.insert(tk.END, folder)
self.log_message(f"添加备用目的地: {folder}")
def remove_selected_backup_destination(self):
selection = self.backup_dest_listbox.curselection()
if not selection:
messagebox.showinfo("提示", "请先选择要移除的备用目的地")
return
for idx in reversed(selection):
removed = self.backup_dest_paths.pop(idx)
self.backup_dest_listbox.delete(idx)
self.log_message(f"移除备用目的地: {removed}")
def set_primary_destination(self):
selection = self.backup_dest_listbox.curselection()
if not selection:
messagebox.showinfo("提示", "请从备用目的地列表中选择一个作为主目的地")
return
if len(selection) > 1:
messagebox.showinfo("提示", "一次只能设置一个主目的地")
return
idx = selection[0]
new_primary = self.backup_dest_paths.pop(idx)
old_primary = self.destination_path
self.destination_path = new_primary
self.dest_path_label.config(text=new_primary)
self.update_folder_preview()
if old_primary:
self.backup_dest_paths.append(old_primary)
self.backup_dest_listbox.insert(tk.END, old_primary)
self.backup_dest_listbox.delete(idx)
self.log_message(f"设置主目的地为: {new_primary}")
def update_folder_preview(self):
"""更新文件夹名称预览"""
# 生成预览文件夹名称
from datetime import datetime
project_name = self.project_name_var.get().strip()
# 清理项目名称中的特殊字符
safe_project_name = ""
if project_name:
safe_project_name = "".join(c for c in project_name if c.isalnum() or c in "-_ ")
safe_project_name = safe_project_name.strip().replace(" ", "_")
if self.auto_date_prefix_var.get():
date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
if safe_project_name:
folder_name = f"{date_str}_{safe_project_name}"
else:
folder_name = date_str
else:
# 不使用日期前缀
if safe_project_name:
folder_name = safe_project_name
else:
folder_name = "(需输入项目名称)"
self.folder_preview_label.config(text=f"📁 将创建文件夹: {folder_name}")
def get_media_extensions(self):
return getattr(self, "media_extensions", set())
def is_media_file(self, file_path):
ext = os.path.splitext(file_path)[1].lower()
return ext in self.get_media_extensions()
def start_copy(self):
"""开始拷贝"""
if not self.source_items:
messagebox.showwarning("警告", "请先选择源文件夹")
return
if not self.destination_path:
messagebox.showwarning("警告", "请先选择目的地")
return
# 验证项目名称
if not self.auto_date_prefix_var.get():
project_name = self.project_name_var.get().strip()
if not project_name:
messagebox.showwarning("警告", "关闭自动日期前缀后,必须输入项目名称!")
return
# 检查有效字符
safe_name = "".join(c for c in project_name if c.isalnum() or c in "-_ ")
if not safe_name.strip():
messagebox.showwarning("警告", "项目名称必须包含字母、数字、下划线或连字符!")
return
# 禁用开始按钮,启用停止按钮
self.start_btn.config(state="disabled")
self.stop_btn.config(state="normal")
# 开始拷贝线程
self.copy_thread = threading.Thread(target=self.copy_process)
self.copy_thread.daemon = True
self.copy_thread.start()
def stop_copy(self):
"""停止拷贝"""
self.copy_manager.copying = False
self.log_message("用户停止拷贝操作")
def copy_process(self):
"""拷贝过程"""
import time
try:
self.copy_manager.copying = True
self.copy_manager.total_files = 0
self.copy_manager.copied_files = 0
self.copy_manager.verified_files = 0
self.copy_manager.total_size = 0
self.copy_manager.copied_size = 0
# 重置日期文件夹,确保每次拷贝都使用新的时间戳
self.copy_manager.date_folder = None
# 初始化日志文件
log_dir = get_log_directory()
session_name = self.project_name_var.get().strip() or "untitled"
log_path = self.copy_manager.init_log_file(log_dir, session_name)
if log_path:
self.log_message(f"📋 日志文件已创建: {log_path}")
else:
self.log_message("⚠️ 日志文件创建失败,继续拷贝...")
# 创建目标文件夹
if self.auto_folder_var.get():
# 生成文件夹名称
if self.copy_manager.date_folder is None:
project_name = self.project_name_var.get().strip()
# 清理项目名称
safe_project_name = ""
if project_name:
safe_project_name = "".join(c for c in project_name if c.isalnum() or c in "-_ ")
safe_project_name = safe_project_name.strip().replace(" ", "_")
if self.auto_date_prefix_var.get():
date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
if safe_project_name:
self.copy_manager.date_folder = f"{date_str}_{safe_project_name}"
else:
self.copy_manager.date_folder = date_str
else:
# 不使用日期前缀,使用项目名称
self.copy_manager.date_folder = safe_project_name
final_dest = os.path.join(self.destination_path, self.copy_manager.date_folder)
else:
final_dest = self.destination_path
os.makedirs(final_dest, exist_ok=True)
self.log_message(f"创建目标文件夹: {final_dest}")
if self.auto_folder_var.get() and self.copy_manager.date_folder:
self.log_message(f"使用日期文件夹: {self.copy_manager.date_folder}")
# 统计总文件数和总大小
self.log_message("正在统计文件...")
for source_item in self.source_items:
for root, dirs, files in os.walk(source_item['path']):
for file in files:
file_path = os.path.join(root, file)
if self.only_media_var.get() and not self.is_media_file(file_path):
continue
self.copy_manager.total_files += 1
try:
self.copy_manager.total_size += os.path.getsize(file_path)
except:
pass
self.update_stats()
self.log_message(f"总计 {self.copy_manager.total_files} 个文件 ({self.copy_manager.format_size(self.copy_manager.total_size)})")
# 记录开始时间
self.copy_manager.copy_start_time = time.time()
# 开始拷贝
for source_item in self.source_items:
if not self.copy_manager.copying:
break
# 使用自定义名称进行拷贝
folder_name = source_item.get('custom_name', source_item['name'])
self.copy_folder(source_item['path'], final_dest, folder_name)
# 开始验证
if self.copy_manager.copying and self.copy_manager.copied_files > 0:
self.verify_files()
if self.copy_manager.copying and self.multi_dest_var.get() and self.backup_dest_paths:
self.log_message("开始拷贝到备用目的地")
self.copy_manager.backup_copying = True
if self.auto_folder_var.get():
backup_date_folder = self.copy_manager.date_folder
self.copy_manager.total_backup_destinations = len(self.backup_dest_paths)
for backup_dest in self.backup_dest_paths:
if not self.copy_manager.copying:
break
self.copy_manager.current_backup_index += 1
self.copy_manager.backup_start_time = time.time()
self.copy_manager.backup_total_files = self.copy_manager.total_files
self.copy_manager.backup_total_size = self.copy_manager.total_size
self.copy_manager.backup_copied_files = 0
self.copy_manager.backup_copied_size = 0
self.backup_progress.config(value=0)
if self.auto_folder_var.get():
backup_final_dest = os.path.join(backup_dest, backup_date_folder)
else:
backup_final_dest = backup_dest
os.makedirs(backup_final_dest, exist_ok=True)
self.log_message(f"创建备用目标文件夹: {backup_final_dest}")
self.backup_status_label.config(text=f"正在拷贝到备用目的地 {self.copy_manager.current_backup_index}/{self.copy_manager.total_backup_destinations}: {backup_dest}")
for source_item in self.source_items:
if not self.copy_manager.copying:
break
folder_name = source_item.get('custom_name', source_item['name'])
self.copy_folder(source_item['path'], backup_final_dest, folder_name)
self.backup_progress.config(value=100)
self.copy_manager.backup_copying = False
self.log_message("备用目的地拷贝完成")
self.verify_backup_destinations()
# 完成
if self.copy_manager.copying:
self.copy_complete()
else:
self.copy_stopped()
except Exception as e:
self.log_message(f"拷贝过程出错: {str(e)}")
messagebox.showerror("错误", f"拷贝过程出错: {str(e)}")
finally:
# 恢复按钮状态
self.start_btn.config(state="normal")
self.stop_btn.config(state="disabled")
def copy_file_with_progress(self, source_file, dest_file, file_size):
"""优化的分块拷贝文件,支持实时进度更新"""
import time
# 根据文件大小调整块大小 - 优化大文件处理
if file_size > 500 * 1024 * 1024: # 大于500MB
chunk_size = 16 * 1024 * 1024 # 16MB块
elif file_size > 100 * 1024 * 1024: # 大于100MB
chunk_size = 8 * 1024 * 1024 # 8MB块
elif file_size > 10 * 1024 * 1024: # 大于10MB
chunk_size = 4 * 1024 * 1024 # 4MB块
else:
chunk_size = 1024 * 1024 # 1MB块
copied_size = 0
last_update_time = time.time()
last_progress_log = 0 # 上次记录进度的时间
update_interval = 0.2 # 减少更新频率到200ms
# 对于大文件,显示开始拷贝信息
if file_size > 50 * 1024 * 1024:
self.log_message(f"📁 开始拷贝大文件: {os.path.basename(source_file)} ({self.copy_manager.format_size(file_size)})")
try:
# 使用缓冲IO提高性能
with open(source_file, 'rb', buffering=chunk_size) as src:
with open(dest_file, 'wb', buffering=chunk_size) as dst:
while True:
if not self.copy_manager.copying:
break
# 读取数据块
chunk = src.read(chunk_size)
if not chunk:
break
# 写入数据块
dst.write(chunk)
copied_size += len(chunk)
# 按间隔更新进度,避免过于频繁
current_time = time.time()
if current_time - last_update_time >= update_interval:
# 对于大文件,记录进度百分比(减少日志频率)
if file_size > 100 * 1024 * 1024 and current_time - last_progress_log >= 10: # 每10秒记录一次
progress_percent = (copied_size / file_size) * 100
self.log_message(f"⏳ 拷贝进度: {progress_percent:.1f}% ({self.copy_manager.format_size(copied_size)}/{self.copy_manager.format_size(file_size)})")
last_progress_log = current_time
# 批量更新进度减少UI更新频率
if not self.copy_manager.backup_copying and self.copy_manager.total_size > 0:
file_progress_ratio = copied_size / file_size if file_size > 0 else 0
temp_copied_size = self.copy_manager.copied_size + (file_size * file_progress_ratio)
temp_copied_size = min(temp_copied_size, self.copy_manager.total_size)
if abs(temp_copied_size - self.copy_manager.copied_size) > (self.copy_manager.total_size * 0.01):
original_copied_size = self.copy_manager.copied_size
self.copy_manager.copied_size = temp_copied_size
self.update_progress()
self.copy_manager.copied_size = original_copied_size
self.window.update()
last_update_time = current_time
except Exception as e:
# 如果分块拷贝失败,回退到标准拷贝
self.log_message(f"分块拷贝失败,回退到标准拷贝: {str(e)}")
shutil.copy2(source_file, dest_file)
def copy_folder(self, source_path, dest_path, folder_name):
"""拷贝文件夹"""
import time
target_path = os.path.join(dest_path, folder_name)
# 调试信息:路径构建
self.log_message(f"📁 拷贝文件夹信息:")
self.log_message(f" 源路径: {source_path}")
self.log_message(f" 目标基础路径: {dest_path}")
self.log_message(f" 文件夹名称: {folder_name}")
self.log_message(f" 完整目标路径: {target_path}")
os.makedirs(target_path, exist_ok=True)
for root, dirs, files in os.walk(source_path):
if not self.copy_manager.copying:
break
# 创建子目录
rel_path = os.path.relpath(root, source_path)
if rel_path == '.':
current_dest = target_path
else:
current_dest = os.path.join(target_path, rel_path)
# 调试信息:相对路径处理
self.log_message(f"📂 相对路径处理:")
self.log_message(f" 源根目录: {source_path}")
self.log_message(f" 当前源目录: {root}")
self.log_message(f" 相对路径: {rel_path}")
self.log_message(f" 目标根目录: {target_path}")
self.log_message(f" 当前目标目录: {current_dest}")
# 调试信息:目录创建
self.log_message(f"📂 创建目录: {current_dest}")
try:
os.makedirs(current_dest, exist_ok=True)
self.log_message(f" ✅ 目录创建成功: {current_dest}")
self.log_message(f" 📍 目录存在: {os.path.exists(current_dest)}")
except Exception as e:
self.log_message(f" ❌ 目录创建失败: {current_dest} - {str(e)}")
# 拷贝文件
for file in files:
if not self.copy_manager.copying:
break
source_file = os.path.join(root, file)
dest_file = os.path.join(current_dest, file)
if self.only_media_var.get() and not self.is_media_file(source_file):
continue
try:
# 获取文件大小
file_size = os.path.getsize(source_file)
copy_start = time.time()
# 调试信息
self.log_message(f"📁 拷贝文件: {file}")
self.log_message(f" 从: {source_file}")
self.log_message(f" 到: {dest_file}")
# 分块拷贝文件,支持实时进度更新
self.copy_file_with_progress(source_file, dest_file, file_size)
# 验证拷贝结果
if os.path.exists(dest_file):
self.log_message(f" ✅ 拷贝成功: {file}")
# 验证文件大小
source_size = os.path.getsize(source_file)
dest_size = os.path.getsize(dest_file)
if source_size == dest_size:
self.log_message(f" ✅ 文件大小匹配: {source_size} bytes")
else:
self.log_message(f" ⚠️ 文件大小不匹配: 源={source_size}, 目标={dest_size}")
else:
self.log_message(f" ❌ 拷贝后文件不存在: {file}")
# 检查父目录
parent_dir = os.path.dirname(dest_file)
self.log_message(f" 📍 父目录: {parent_dir}")
self.log_message(f" 📂 父目录存在: {os.path.exists(parent_dir)}")
if os.path.exists(parent_dir):
files = os.listdir(parent_dir)
self.log_message(f" 📄 父目录内容: {files}")
# 文件拷贝完成,更新进度
copy_time = time.time() - copy_start
if self.copy_manager.backup_copying:
self.copy_manager.backup_copied_files += 1
self.copy_manager.backup_copied_size += file_size
self.update_backup_progress()
else:
self.copy_manager.copied_files += 1
self.copy_manager.copied_size += file_size
if copy_time > 0:
file_speed = file_size / copy_time
self.copy_manager.copy_speed = file_speed
self.update_progress()
self.log_message(f"已拷贝: {file} ({self.copy_manager.format_size(file_size)})")
except Exception as e:
self.log_message(f"拷贝失败 {file}: {str(e)}")
def verify_files(self):
"""验证文件"""
import time
self.copy_manager.verifying = True
self.copy_manager.verify_start_time = time.time()
self.copy_manager.md5_start_time = time.time() # MD5验证开始时间
self.copy_manager.md5_verified_files = 0
self.copy_manager.md5_calc_size = 0
# 统计需要验证的文件总数
total_files = 0
for source_item in self.source_items:
for root, dirs, files in os.walk(source_item['path']):
total_files += len(files)
self.copy_manager.total_md5_files = total_files
self.log_message(f"开始MD5验证{total_files} 个文件...")
# 构建与拷贝时相同的目标路径
if self.auto_folder_var.get():
# 使用拷贝时保存的日期文件夹,确保时间戳一致
if self.copy_manager.date_folder is not None:
final_dest = os.path.join(self.destination_path, self.copy_manager.date_folder)
else:
# 如果date_folder不存在生成新的这种情况不应该发生
final_dest = self.destination_path
else:
final_dest = self.destination_path
# 这里应该实现MD5验证逻辑
# 简化版本:只检查文件是否存在
for source_item in self.source_items:
if not self.copy_manager.copying:
break
source_path = source_item['path']
# 使用与拷贝时相同的路径构建逻辑,包括自定义名称
folder_name = source_item.get('custom_name', source_item['name'])
dest_path = os.path.join(final_dest, folder_name)
# 调试信息:验证路径
self.log_message(f"🔍 验证路径构建:")
self.log_message(f" 源项目路径: {source_item['path']}")
self.log_message(f" 源项目名称: {source_item['name']}")
self.log_message(f" 最终目标路径: {final_dest}")
self.log_message(f" 验证目标路径: {dest_path}")
if self.auto_folder_var.get() and self.copy_manager.date_folder:
self.log_message(f" 使用日期文件夹: {self.copy_manager.date_folder}")
self.verify_folder(source_path, dest_path)
# 验证完成,显示总结
import time
elapsed_time = time.time() - self.copy_manager.md5_start_time
if elapsed_time > 0:
self.copy_manager.md5_calc_speed = self.copy_manager.md5_calc_size / elapsed_time
self.log_message("\n" + "="*60)
self.log_message("🎉 MD5验证完成")
self.log_message(f"📊 验证统计:")
self.log_message(f" 总文件数: {self.copy_manager.total_md5_files}")
self.log_message(f" 验证成功: {self.copy_manager.verified_files}")
self.log_message(f" 验证失败: {self.copy_manager.total_md5_files - self.copy_manager.verified_files}")
def verify_backup_destinations(self):
if not (self.multi_dest_var.get() and self.backup_dest_paths):
return
results = []
if self.auto_folder_var.get():
backup_date_folder = self.copy_manager.date_folder
for backup_dest in self.backup_dest_paths:
if not self.copy_manager.copying:
break
if self.auto_folder_var.get():
backup_final_dest = os.path.join(backup_dest, backup_date_folder)
else:
backup_final_dest = backup_dest
total_files = 0
verified_files = 0
for source_item in self.source_items:
source_path = source_item['path']
folder_name = source_item.get('custom_name', source_item['name'])
base_dest = os.path.join(backup_final_dest, folder_name)
for root, dirs, files in os.walk(source_path):
rel_path = os.path.relpath(root, source_path)
if rel_path == '.':
current_dest = base_dest
else:
current_dest = os.path.join(base_dest, rel_path)
for file in files:
source_file = os.path.join(root, file)
if self.only_media_var.get() and not self.is_media_file(source_file):
continue
total_files += 1
dest_file = os.path.join(current_dest, file)
if os.path.exists(dest_file):
try:
if os.path.getsize(source_file) == os.path.getsize(dest_file):
verified_files += 1
except:
verified_files += 1
results.append({
"destination": backup_dest,
"total": total_files,
"verified": verified_files
})
self.copy_manager.backup_results = results
self.log_message("备用目的地验证完成")
self.copy_manager.verifying = False
def verify_folder(self, source_path, dest_path):
"""验证文件夹"""
from md5_verifier import MD5Verifier
verifier = MD5Verifier()
for root, dirs, files in os.walk(source_path):
if not self.copy_manager.copying:
break
rel_path = os.path.relpath(root, source_path)
if rel_path == '.':
current_dest = dest_path
else:
current_dest = os.path.join(dest_path, rel_path)
for file in files:
if not self.copy_manager.copying:
break
source_file = os.path.join(root, file)
dest_file = os.path.join(current_dest, file)
# 更新MD5验证进度
self.copy_manager.md5_verified_files += 1
# 计算文件大小用于速度统计
try:
file_size = os.path.getsize(source_file)
self.copy_manager.md5_calc_size += file_size
except:
file_size = 0
# 调试信息
self.log_message(f"🔍 检查文件: {file}")
self.log_message(f" 源路径: {source_file}")
self.log_message(f" 目标路径: {dest_file}")
self.log_message(f" 目标存在: {os.path.exists(dest_file)}")
if os.path.exists(dest_file):
try:
# 计算MD5验证进度和速度
import time
elapsed_time = time.time() - self.copy_manager.md5_start_time
# 确保时间不为负数(防止系统时间被修改)
if elapsed_time < 0:
self.log_message(f"⚠️ 检测到负MD5时间: {elapsed_time:.2f}s重置为0")
elapsed_time = 0
if elapsed_time > 0:
self.copy_manager.md5_calc_speed = self.copy_manager.md5_calc_size / elapsed_time
md5_progress = (self.copy_manager.md5_verified_files / self.copy_manager.total_md5_files) * 100
# 使用MD5验证文件 - 显示详细进度
self.log_message(f"🔍 [{md5_progress:.1f}%] 开始MD5验证: {file}")
self.log_message(f" 进度: {self.copy_manager.md5_verified_files}/{self.copy_manager.total_md5_files} 文件")
self.log_message(f" 速度: {self.copy_manager.format_size(int(self.copy_manager.md5_calc_speed))}/s")
# 计算源文件MD5
self.log_message(f" 计算源文件MD5...")
source_md5 = verifier.calculate_md5(source_file)
self.log_message(f" 源MD5: {source_md5}")
# 计算目标文件MD5
self.log_message(f" 计算目标文件MD5...")
dest_md5 = verifier.calculate_md5(dest_file)
self.log_message(f" 目标MD5: {dest_md5}")
# 对比MD5值
if source_md5 == dest_md5:
self.copy_manager.verified_files += 1
self.update_verify_progress()
self.log_message(f" ✅ MD5匹配: {file}")
self.log_message(f" 哈希值: {source_md5}")
else:
self.log_message(f" ❌ MD5不匹配: {file}")
self.log_message(f" 源哈希: {source_md5}")
self.log_message(f" 目标哈希: {dest_md5}")
except Exception as e:
self.log_message(f" ❌ MD5验证错误: {file} - {str(e)}")
self.log_message(f" 错误详情: {str(e)}")
else:
self.log_message(f"⚠️ 文件不存在: {file}")
# 检查父目录是否存在
parent_dir = os.path.dirname(dest_file)
self.log_message(f" 父目录存在: {os.path.exists(parent_dir)}")
if os.path.exists(parent_dir):
# 列出父目录中的文件
try:
files_in_dir = os.listdir(parent_dir)
self.log_message(f" 父目录中的文件: {files_in_dir}")
except:
self.log_message(f" 无法读取父目录")
def update_progress(self):
"""更新拷贝进度"""
import time
if self.copy_manager.total_files > 0:
# 文件进度
file_progress = (self.copy_manager.copied_files / self.copy_manager.total_files) * 100
self.copy_progress.config(value=file_progress)
# 大小进度
size_progress = 0
if self.copy_manager.total_size > 0:
size_progress = (self.copy_manager.copied_size / self.copy_manager.total_size) * 100
# 时间计算
elapsed_time = 0
if self.copy_manager.copy_start_time > 0:
elapsed_time = time.time() - self.copy_manager.copy_start_time
# 确保时间不为负数(防止系统时间被修改)
if elapsed_time < 0:
self.log_message(f"⚠️ 检测到负时间: {elapsed_time:.2f}s重置为0")
elapsed_time = 0
# 速度计算
speed_mb_s = 0
if elapsed_time > 0 and self.copy_manager.copied_size >= 0:
speed_mb_s = (self.copy_manager.copied_size / (1024 * 1024)) / elapsed_time
# 确保速度不为负数
speed_mb_s = max(0, speed_mb_s)
# 调试信息:如果速度异常,记录详细信息
if speed_mb_s < 0 or speed_mb_s > 10000: # 异常速度负数或超过10GB/s
self.log_message(f"⚠️ 速度异常: {speed_mb_s:.2f} MB/s")
self.log_message(f" 已拷贝大小: {self.copy_manager.copied_size} bytes ({self.copy_manager.format_size(self.copy_manager.copied_size)})")
self.log_message(f" 总大小: {self.copy_manager.total_size} bytes ({self.copy_manager.format_size(self.copy_manager.total_size)})")
self.log_message(f" 已用时间: {elapsed_time:.2f} seconds")
self.log_message(f" 开始时间: {self.copy_manager.copy_start_time}")
self.log_message(f" 当前时间: {time.time()}")
# 预估剩余时间
eta_seconds = 0
if speed_mb_s > 0 and self.copy_manager.total_size > self.copy_manager.copied_size:
remaining_mb = (self.copy_manager.total_size - self.copy_manager.copied_size) / (1024 * 1024)
eta_seconds = remaining_mb / speed_mb_s
# 格式化时间显示
elapsed_str = self.format_time(elapsed_time)
eta_str = self.format_time(eta_seconds)
# 更新状态标签
self.copy_status_label.config(
text=f"已拷贝 {self.copy_manager.copied_files}/{self.copy_manager.total_files} 个文件 ({file_progress:.1f}%)"
)
self.copy_speed_label.config(
text=f"速度: {speed_mb_s:.1f} MB/s | 已用: {elapsed_str} | 剩余: {eta_str}"
)
self.update_stats()
def update_verify_progress(self):
"""更新验证进度"""
import time
if self.copy_manager.total_files > 0:
# 文件进度
file_progress = (self.copy_manager.verified_files / self.copy_manager.total_files) * 100
self.verify_progress.config(value=file_progress)
# 时间计算
elapsed_time = 0
if self.copy_manager.verify_start_time > 0:
elapsed_time = time.time() - self.copy_manager.verify_start_time
# 确保时间不为负数(防止系统时间被修改)
if elapsed_time < 0:
self.log_message(f"⚠️ 检测到负验证时间: {elapsed_time:.2f}s重置为0")
elapsed_time = 0
# 速度计算(基于文件数量估算)
verify_speed = 0
if elapsed_time > 0:
verify_speed = self.copy_manager.verified_files / elapsed_time # 文件/秒
# 预估剩余时间
eta_seconds = 0
if verify_speed > 0 and self.copy_manager.total_files > self.copy_manager.verified_files:
remaining_files = self.copy_manager.total_files - self.copy_manager.verified_files
eta_seconds = remaining_files / verify_speed
# 格式化时间显示
elapsed_str = self.format_time(elapsed_time)
eta_str = self.format_time(eta_seconds)
# 更新状态标签
self.verify_status_label.config(
text=f"已验证 {self.copy_manager.verified_files}/{self.copy_manager.total_files} 个文件 ({file_progress:.1f}%)"
)
self.verify_speed_label.config(
text=f"速度: {verify_speed:.1f} 文件/秒 | 已用: {elapsed_str} | 剩余: {eta_str}"
)
self.update_stats()
def update_backup_progress(self):
import time
if self.copy_manager.backup_total_files > 0:
file_progress = (self.copy_manager.backup_copied_files / self.copy_manager.backup_total_files) * 100
self.backup_progress.config(value=file_progress)
elapsed_time = 0
if self.copy_manager.backup_start_time > 0:
elapsed_time = time.time() - self.copy_manager.backup_start_time
if elapsed_time < 0:
elapsed_time = 0
speed_files_s = 0
if elapsed_time > 0:
speed_files_s = self.copy_manager.backup_copied_files / elapsed_time
remaining_files = max(0, self.copy_manager.backup_total_files - self.copy_manager.backup_copied_files)
eta_seconds = 0
if speed_files_s > 0:
eta_seconds = remaining_files / speed_files_s
elapsed_str = self.format_time(elapsed_time)
eta_str = self.format_time(eta_seconds)
self.backup_status_label.config(text=f"备用拷贝 {self.copy_manager.current_backup_index}/{self.copy_manager.total_backup_destinations} | 速度: {speed_files_s:.1f} 文件/秒 | 已用: {elapsed_str} | 剩余: {eta_str}")
self.window.update()
def update_stats(self):
"""更新统计信息"""
self.total_files_label.config(text=f"总文件数: {self.copy_manager.total_files}")
self.copied_files_label.config(text=f"已拷贝: {self.copy_manager.copied_files}")
self.verified_files_label.config(text=f"已验证: {self.copy_manager.verified_files}")
def copy_complete(self):
"""拷贝完成"""
self.log_message("拷贝和验证完成!")
self.copy_progress.config(value=100)
self.verify_progress.config(value=100)
self.copy_status_label.config(text="拷贝完成!")
self.verify_status_label.config(text="验证完成!")
if self.multi_dest_var.get() and self.backup_dest_paths:
self.backup_progress.config(value=100)
self.backup_status_label.config(text="备用拷贝完成!")
# 仅媒体拷贝提示
try:
if hasattr(self, "only_media_var") and self.only_media_var.get():
messagebox.showwarning(
"提示",
"已开启“仅拷贝媒体文件”。请注意:文档、工程、缓存等非媒体文件可能未被拷贝。\n\n"
"建议立即核对源与目标文件夹,确认是否需要补拷。"
)
except Exception:
pass
# 关闭日志文件
self.copy_manager.close_log_file()
# 计算统计信息
total_time = 0
avg_speed = 0
if self.copy_manager.copy_start_time > 0:
total_time = time.time() - self.copy_manager.copy_start_time
if total_time > 0 and self.copy_manager.copied_size > 0:
avg_speed = (self.copy_manager.copied_size / (1024 * 1024)) / total_time
# 验证状态
verify_status = "已完成" if self.copy_manager.verified_files > 0 else "未验证"
if self.copy_manager.verified_files > 0 and self.copy_manager.total_files > 0:
verify_status = f"已完成 ({self.copy_manager.verified_files}/{self.copy_manager.total_files})"
# 显示增强版庆祝动画
self.celebrate_completion_with_stats(
total_files=self.copy_manager.total_files,
total_size=self.copy_manager.copied_size,
avg_speed=avg_speed,
total_time=total_time,
verify_status=verify_status,
backup_results=self.copy_manager.backup_results if (self.multi_dest_var.get() and self.backup_dest_paths) else []
)
def copy_stopped(self):
"""拷贝被停止"""
self.log_message("拷贝已停止")
self.copy_status_label.config(text="拷贝已停止")
self.verify_status_label.config(text="验证已停止")
# 关闭日志文件
self.copy_manager.close_log_file()
def open_log_viewer(self):
"""打开日志查看器"""
try:
# 在打开日志查看器之前,确保主窗口图标已设置
self.restore_main_window_icon()
log_viewer = LogViewerWindow()
log_viewer.mainloop()
# 日志查看器关闭后,重新设置主窗口图标以确保一致性
self.restore_main_window_icon()
except Exception as e:
messagebox.showerror("错误", f"无法打开日志查看器: {str(e)}")
def restore_main_window_icon(self):
"""恢复主窗口图标 - 增强版本"""
try:
icon_path = get_icon_path()
if icon_path and 'PIL_Image' in globals() and 'PIL_ImageTk' in globals():
icon_image = PIL_Image.open(icon_path)
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image) # 重新创建图标引用
# 使用多种方法设置图标,确保兼容性
if self.window:
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 额外:强制刷新窗口属性
self.window.update_idletasks()
except Exception as e:
print(f"恢复主窗口图标失败: {e}")
pass # 如果恢复失败,保持当前状态
def open_official_website(self, event=None):
"""打开官方网站"""
try:
import webbrowser
webbrowser.open("https://dit.superjia.com.cn")
except Exception as e:
messagebox.showerror("错误", f"无法打开官网: {str(e)}")
def celebrate_completion_with_stats(self, total_files, total_size, avg_speed, total_time, verify_status, backup_results=None):
"""增强版庆祝完成动画,包含统计信息"""
# 创建庆祝窗口
celebrate_window = tk.Toplevel(self.window)
celebrate_window.title("🎉 拷贝完成!")
celebrate_window.geometry("500x400")
celebrate_window.transient(self.window)
celebrate_window.configure(bg="#1e1e1e")
# 居中显示
celebrate_window.update_idletasks()
x = (celebrate_window.winfo_screenwidth() // 2) - (celebrate_window.winfo_width() // 2)
y = (celebrate_window.winfo_screenheight() // 2) - (celebrate_window.winfo_height() // 2)
celebrate_window.geometry(f"+{x}+{y}")
# 阻止窗口关闭按钮(用户必须通过按钮操作)
celebrate_window.protocol("WM_DELETE_WINDOW", lambda: None)
# 主框架
main_frame = ttk.Frame(celebrate_window, padding="20")
main_frame.pack(fill="both", expand=True)
# 庆祝标题
title_label = ttk.Label(
main_frame,
text="🎉 拷贝完成! 🎉",
font=("Arial", 28, "bold"),
bootstyle="success",
anchor="center"
)
title_label.pack(pady=(0, 20))
# 统计信息框架
stats_frame = ttk.Frame(main_frame)
stats_frame.pack(fill="both", expand=True, pady=10)
# 文件数量
files_label = ttk.Label(
stats_frame,
text=f"📁 文件数量: {total_files}",
font=("Arial", 12),
bootstyle="info"
)
files_label.pack(anchor="w", pady=5)
# 总大小
size_label = ttk.Label(
stats_frame,
text=f"💾 总大小: {self.copy_manager.format_size(total_size)}",
font=("Arial", 12),
bootstyle="info"
)
size_label.pack(anchor="w", pady=5)
# 平均速度
speed_label = ttk.Label(
stats_frame,
text=f"⚡ 平均速度: {avg_speed:.1f} MB/s",
font=("Arial", 12),
bootstyle="info"
)
speed_label.pack(anchor="w", pady=5)
# 总用时
time_label = ttk.Label(
stats_frame,
text=f"⏱️ 总用时: {self.format_time(total_time)}",
font=("Arial", 12),
bootstyle="info"
)
time_label.pack(anchor="w", pady=5)
# 验证状态
verify_label = ttk.Label(
stats_frame,
text=f"✅ 验证状态: {verify_status}",
font=("Arial", 12),
bootstyle="success"
)
verify_label.pack(anchor="w", pady=5)
if backup_results:
br_title = ttk.Label(
stats_frame,
text="📦 备用目的地结果:",
font=("Arial", 12, "bold"),
bootstyle="info"
)
br_title.pack(anchor="w", pady=(15, 5))
for item in backup_results:
dest = item.get("destination", "")
total = item.get("total", 0)
verified = item.get("verified", 0)
line = ttk.Label(
stats_frame,
text=f"{dest}:拷贝 {total},验证 {verified}/{total}",
font=("Arial", 11),
bootstyle="secondary"
)
line.pack(anchor="w", pady=2)
# 按钮框架
button_frame = ttk.Frame(main_frame)
button_frame.pack(fill="x", pady=(20, 0))
# 打开目标文件夹按钮
open_folder_btn = ttk.Button(
button_frame,
text="📂 打开目标文件夹",
bootstyle="info-outline",
command=lambda: self.open_destination_folder(celebrate_window)
)
open_folder_btn.pack(side="left", padx=(0, 10))
# 确定按钮
ok_btn = ttk.Button(
button_frame,
text="确定",
bootstyle="success",
command=celebrate_window.destroy
)
ok_btn.pack(side="right")
# 设置按钮焦点
ok_btn.focus()
# 绑定回车键关闭窗口
celebrate_window.bind("<Return>", lambda e: celebrate_window.destroy())
# 确保窗口在最前面
celebrate_window.lift()
celebrate_window.attributes('-topmost', True)
celebrate_window.after(100, lambda: celebrate_window.attributes('-topmost', False))
def open_destination_folder(self, parent_window):
"""打开目标文件夹"""
try:
if self.destination_path and os.path.exists(self.destination_path):
if os.name == 'nt': # Windows
os.startfile(self.destination_path)
elif os.name == 'posix': # macOS and Linux
subprocess.run(['open', self.destination_path])
parent_window.destroy() # 打开文件夹后关闭庆祝窗口
else:
messagebox.showwarning("警告", "目标文件夹不存在!")
except Exception as e:
messagebox.showerror("错误", f"无法打开目标文件夹: {str(e)}")
def clear_all(self):
"""清空所有选择"""
self.source_items.clear()
self.source_items_listbox.delete(0, tk.END)
self.destination_path = ""
self.dest_path_label.config(text="未选择目的地")
self.dest_info_label.config(text="")
self.log_text.delete(1.0, tk.END)
# 重置进度
self.copy_progress.config(value=0)
self.verify_progress.config(value=0)
self.copy_status_label.config(text="等待开始拷贝...")
self.verify_status_label.config(text="等待拷贝完成...")
self.copy_manager.total_files = 0
self.copy_manager.copied_files = 0
self.copy_manager.verified_files = 0
self.update_stats()
def log_message(self, message):
"""记录日志消息"""
timestamp = datetime.now().strftime("%H:%M:%S")
log_entry = f"[{timestamp}] {message}\n"
self.log_text.insert(tk.END, log_entry)
self.log_text.see(tk.END)
self.window.update()
# 同时写入日志文件
if self.copy_manager.log_file:
self.copy_manager.write_log(message)
class LogViewerWindow:
"""日志查看器窗口"""
def __init__(self):
"""初始化日志查看器 - 完全避免默认图标闪烁"""
# 确保重量级模块已导入
if 'tb' not in globals() or tb is None:
import_heavy_modules()
# 预加载图标
preload_icon()
self.icon_photo = None
# 创建窗口但先隐藏,避免显示默认图标
self.window = tb.Window(
title="日志查看器 - CardCopyer-拷贝乐",
themename="darkly",
size=(1200, 800),
resizable=(True, True)
)
# 立即隐藏窗口,防止显示默认图标
self.window.withdraw()
# 尝试立即设置图标(窗口隐藏状态下)
try:
icon_image = get_global_icon_image()
if icon_image and 'PIL_ImageTk' in globals():
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
print("日志查看器图标在隐藏状态下设置成功")
except Exception as e:
print(f"隐藏状态下设置图标失败: {e}")
self.log_dir = get_log_directory()
self.current_log_file = None
self.current_log_content = ""
# 设置UI窗口仍在隐藏状态
self.setup_ui()
self.load_log_files()
# 延迟显示窗口,确保图标已完全设置
self.window.after(100, self._show_window_with_icon)
def _show_window_with_icon(self):
"""显示窗口并确保图标正确设置"""
try:
# 尝试设置图标
if not self.icon_photo:
icon_image = get_global_icon_image()
if icon_image and 'PIL_ImageTk' in globals():
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 显示窗口
self.window.deiconify()
print("日志查看器窗口已显示")
# 启动图标监控定时器
if self.icon_photo:
self._start_icon_monitor()
except Exception as e:
print(f"显示窗口时设置图标失败: {e}")
# 即使图标设置失败,也要显示窗口
self.window.deiconify()
def _set_window_icon(self):
"""延迟设置日志查看器窗口图标,使用全局图标确保一致性"""
try:
# 使用全局图标管理器获取Image对象
icon_image = get_global_icon_image()
if icon_image and self.window:
# 为当前窗口创建专用的PhotoImage
if 'PIL_ImageTk' in globals():
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
# 使用多种方法设置图标,确保兼容性
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 启动图标监控定时器
self._start_icon_monitor()
elif not icon_image:
# 如果全局图标不可用,尝试本地创建
icon_path = get_icon_path()
if icon_path and 'PIL_Image' in globals() and 'PIL_ImageTk' in globals():
icon_image = PIL_Image.open(icon_path)
self.icon_photo = PIL_ImageTk.PhotoImage(icon_image, master=self.window)
if self.window and self.icon_photo:
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 启动图标监控定时器
self._start_icon_monitor()
except Exception as e:
print(f"设置日志查看器图标失败: {e}")
pass
def setup_ui(self):
"""设置UI界面"""
# 主框架
main_frame = tb.Frame(self.window, padding=20)
main_frame.pack(fill="both", expand=True)
# 标题
title_label = tb.Label(
main_frame,
text="日志查看器",
font=("Arial", 20, "bold"),
bootstyle="primary"
)
title_label.pack(pady=(0, 20))
# 主要内容区域 - 左右分栏
content_frame = tb.Frame(main_frame)
content_frame.pack(fill="both", expand=True)
# 左侧 - 日志文件列表
left_frame = tb.Frame(content_frame)
left_frame.pack(side="left", fill="both", expand=True, padx=(0, 10))
tb.Label(left_frame, text="历史日志文件:", font=("Arial", 12, "bold")).pack(pady=(0, 10))
# 日志文件列表框架
list_frame = tb.Frame(left_frame)
list_frame.pack(fill="both", expand=True)
# 滚动条
list_scroll = tb.Scrollbar(list_frame)
list_scroll.pack(side="right", fill="y")
self.log_listbox = tk.Listbox(
list_frame,
yscrollcommand=list_scroll.set,
font=("Arial", 10),
bg="#1e1e1e",
fg="white",
selectbackground="#005a9e",
height=20
)
self.log_listbox.pack(side="left", fill="both", expand=True)
list_scroll.config(command=self.log_listbox.yview)
# 绑定选择事件
self.log_listbox.bind('<<ListboxSelect>>', self.on_log_selected)
# 右侧 - 日志内容显示
right_frame = tb.Frame(content_frame)
right_frame.pack(side="left", fill="both", expand=True)
tb.Label(right_frame, text="日志内容:", font=("Arial", 12, "bold")).pack(pady=(0, 10))
# 日志信息显示框架
info_frame = tb.Frame(right_frame)
info_frame.pack(fill="x", pady=(0, 10))
self.log_info_label = tb.Label(
info_frame,
text="请选择一个日志文件",
font=("Arial", 10),
bootstyle="secondary"
)
self.log_info_label.pack(side="left")
# 日志内容显示
content_frame = tb.Frame(right_frame)
content_frame.pack(fill="both", expand=True)
content_scroll = tb.Scrollbar(content_frame)
content_scroll.pack(side="right", fill="y")
self.log_content_text = tk.Text(
content_frame,
yscrollcommand=content_scroll.set,
font=("Courier", 9),
bg="#1e1e1e",
fg="white",
wrap=tk.WORD,
height=25,
width=60
)
self.log_content_text.pack(side="left", fill="both", expand=True)
content_scroll.config(command=self.log_content_text.yview)
# 底部按钮区域
bottom_frame = tb.Frame(main_frame)
bottom_frame.pack(fill="x", pady=(20, 0))
# 刷新按钮
refresh_btn = tb.Button(
bottom_frame,
text="刷新日志列表",
bootstyle="info-outline",
command=self.load_log_files
)
refresh_btn.pack(side="left", padx=(0, 10))
# 导出按钮
export_btn = tb.Button(
bottom_frame,
text="导出日志",
bootstyle="success-outline",
command=self.export_log
)
export_btn.pack(side="left", padx=(0, 10))
# 删除按钮
delete_btn = tb.Button(
bottom_frame,
text="删除日志",
bootstyle="danger-outline",
command=self.delete_log
)
delete_btn.pack(side="left")
# 关闭按钮
close_btn = tb.Button(
bottom_frame,
text="关闭",
bootstyle="secondary",
command=self.window.destroy
)
close_btn.pack(side="right")
def load_log_files(self):
"""加载日志文件列表"""
self.log_listbox.delete(0, tk.END)
try:
if os.path.exists(self.log_dir):
log_files = [f for f in os.listdir(self.log_dir) if f.endswith('.log')]
log_files.sort(reverse=True) # 最新的在前
for log_file in log_files:
# 获取文件信息
file_path = os.path.join(self.log_dir, log_file)
file_size = os.path.getsize(file_path)
file_time = os.path.getmtime(file_path)
file_date = datetime.fromtimestamp(file_time).strftime('%Y-%m-%d %H:%M')
# 显示格式:文件名 (大小, 日期)
size_str = self.format_size(file_size)
display_text = f"{log_file} ({size_str}, {file_date})"
self.log_listbox.insert(tk.END, display_text)
if log_files:
self.log_info_label.config(text=f"找到 {len(log_files)} 个日志文件")
else:
self.log_info_label.config(text="暂无日志文件")
else:
self.log_info_label.config(text="日志目录不存在")
except Exception as e:
self.log_info_label.config(text=f"加载日志文件失败: {str(e)}")
def format_size(self, size_bytes: int) -> str:
"""格式化文件大小显示"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
size = float(size_bytes)
while size >= 1024 and i < len(size_names) - 1:
size /= 1024
i += 1
return f"{size:.1f} {size_names[i]}"
def on_log_selected(self, event):
"""选择日志文件时的处理"""
selection = self.log_listbox.curselection()
if not selection:
return
try:
# 获取选中的日志文件名
selected_text = self.log_listbox.get(selection[0])
log_filename = selected_text.split(' (')[0] # 提取文件名
# 读取日志内容
log_path = os.path.join(self.log_dir, log_filename)
with open(log_path, 'r', encoding='utf-8') as f:
content = f.read()
self.current_log_file = log_filename
self.current_log_content = content
# 显示日志内容
self.log_content_text.delete(1.0, tk.END)
self.log_content_text.insert(1.0, content)
# 更新信息标签
file_size = os.path.getsize(log_path)
file_time = os.path.getmtime(log_path)
file_date = datetime.fromtimestamp(file_time).strftime('%Y-%m-%d %H:%M:%S')
info_text = f"文件名: {log_filename} | 大小: {self.format_size(file_size)} | 修改时间: {file_date}"
self.log_info_label.config(text=info_text)
except Exception as e:
self.log_info_label.config(text=f"读取日志文件失败: {str(e)}")
self.log_content_text.delete(1.0, tk.END)
self.log_content_text.insert(1.0, f"错误: 无法读取日志文件\n\n{str(e)}")
def export_log(self):
"""导出当前选中的日志"""
if not self.current_log_file or not self.current_log_content:
messagebox.showwarning("提示", "请先选择一个日志文件")
return
# 选择导出位置
export_path = filedialog.asksaveasfilename(
title="导出日志",
initialfile=self.current_log_file,
defaultextension=".log",
filetypes=[("日志文件", "*.log"), ("文本文件", "*.txt"), ("所有文件", "*.*")]
)
if export_path:
try:
with open(export_path, 'w', encoding='utf-8') as f:
f.write(self.current_log_content)
messagebox.showinfo("成功", f"日志已导出到:\n{export_path}")
except Exception as e:
messagebox.showerror("错误", f"导出日志失败:\n{str(e)}")
def delete_log(self):
"""删除当前选中的日志"""
if not self.current_log_file:
messagebox.showwarning("提示", "请先选择一个日志文件")
return
result = messagebox.askyesno(
"确认删除",
f"确定要删除日志文件:\n{self.current_log_file}?\n\n此操作不可恢复。"
)
if result:
try:
log_path = os.path.join(self.log_dir, self.current_log_file)
os.remove(log_path)
messagebox.showinfo("成功", "日志文件已删除")
# 清空显示
self.log_content_text.delete(1.0, tk.END)
self.current_log_file = None
self.current_log_content = ""
# 重新加载列表
self.load_log_files()
except Exception as e:
messagebox.showerror("错误", f"删除日志文件失败:\n{str(e)}")
def mainloop(self):
"""运行日志查看器主循环 - 增强图标一致性"""
try:
# 确保窗口图标已正确设置 - 使用全局图标确保一致性
global_icon = get_global_icon_photo()
if global_icon and self.window:
self.icon_photo = global_icon
# 延迟设置以避免初始化问题
self.window.after(100, lambda: self._apply_window_icon())
# 运行主循环
self.window.mainloop()
except Exception as e:
print(f"日志查看器主循环错误: {e}")
raise
def _apply_window_icon(self):
"""应用窗口图标 - 分离方法以确保可靠性"""
try:
if self.window and self.icon_photo:
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
except Exception as e:
print(f"应用窗口图标失败: {e}")
def _start_icon_monitor(self):
"""启动图标监控定时器,防止图标被系统重置"""
def check_and_restore_icon():
try:
# 检查图标是否仍然有效
if self.window and self.icon_photo:
# 重新应用图标
self.window.wm_iconphoto(True, self.icon_photo)
if hasattr(self.window, 'iconphoto'):
self.window.iconphoto(True, self.icon_photo)
# 每5秒检查一次
self.window.after(5000, check_and_restore_icon)
except Exception:
pass
# 启动第一次检查
self.window.after(5000, check_and_restore_icon)
def show_startup_error(message, detail=""):
"""显示启动错误对话框"""
import tkinter as tk
from tkinter import messagebox
root = tk.Tk()
root.withdraw() # 隐藏主窗口
messagebox.showerror("启动错误", f"{message}\n\n{detail}")
root.destroy()
def main():
"""主函数 - 优化启动流程"""
startup_window = None
try:
# 预加载图标,确保在窗口创建前就可用了
startup_window = StartupWindow()
startup_window.update_progress("正在预加载图标...")
# 在后台线程中预加载图标
def preload_icon_in_background():
try:
preload_success = preload_icon()
if preload_success:
startup_window.update_progress("图标预加载成功...")
else:
startup_window.update_progress("图标预加载失败,将使用默认设置...")
except Exception as e:
print(f"图标预加载异常: {e}")
startup_window.update_progress("图标预加载异常,继续启动...")
# 立即开始图标预加载
threading.Thread(target=preload_icon_in_background, daemon=True).start()
# 快速依赖检查
startup_window.update_progress("正在检查依赖...")
if not quick_check_dependencies():
startup_window.close()
return
# 在后台线程中进行完整依赖检查
def check_deps_in_background():
success, error_msg = full_check_dependencies()
if not success:
startup_window.close()
show_startup_error("依赖检查失败", error_msg)
return
# 依赖检查通过后,启动主应用
startup_window.update_progress("正在加载界面...")
# 关闭启动窗口
if startup_window:
startup_window.close()
# 创建主应用
app = DITCopyTool()
app.window.mainloop()
# 延迟启动依赖检查,让图标预加载先完成
startup_window.root.after(800, check_deps_in_background)
# 运行启动窗口的主循环
startup_window.root.mainloop()
except KeyboardInterrupt:
if startup_window:
startup_window.close()
except Exception as e:
if startup_window:
startup_window.close()
show_startup_error("程序启动失败", str(e))
if __name__ == "__main__":
main()