commit dc2327a170a5d5d02569197541f271f9d233738a Author: linsan Date: Wed Feb 4 09:54:24 2026 +0800 Initial commit code only diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..183471e --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +hf_cache/ +models/ +temp_uploads/ +debug_raw_heatmap.png +data/ +*.pyc +.DS_Store diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..58d5fbd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,48 @@ +# 基础镜像:CUDA 12.4 (匹配 RTX 50 系) +FROM nvidia/cuda:12.4.1-cudnn-devel-ubuntu22.04 + +# 环境变量 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV DEBIAN_FRONTEND=noninteractive +ENV HF_HOME=/root/.cache/huggingface + +WORKDIR /app + +# 1. 系统依赖 +RUN apt-get update && apt-get install -y \ + python3.10 \ + python3-pip \ + git \ + wget \ + vim \ + libgl1 \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + +# 2. 设置 pip 清华源全局配置 (省去每次敲参数) +RUN pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple + +# 3. 🔥 核心:RTX 5060 专用 PyTorch (cu128) +# 必须显式指定 index-url 覆盖上面的清华源配置,因为 cu128 只有官方 nightly 有 +RUN pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128 + +# 4. 安装 DiffSim/Diffusion 核心库 +# 包含 tqdm 用于显示进度条 +RUN pip install \ + "diffusers>=0.24.0" \ + "transformers>=4.35.0" \ + accelerate \ + scipy \ + safetensors \ + opencv-python \ + ftfy \ + regex \ + timm \ + einops \ + lpips \ + tqdm \ + matplotlib + +# 5. 默认指令:启动 bash,让你进入终端 +CMD ["/bin/bash"] diff --git a/app.py b/app.py new file mode 100644 index 0000000..c5bda56 --- /dev/null +++ b/app.py @@ -0,0 +1,192 @@ +import streamlit as st +import subprocess +import os +from PIL import Image + +# === 页面基础配置 === +st.set_page_config(layout="wide", page_title="多核违建检测平台 v3.0") + +st.title("🚁 多核无人机违建检测平台 v3.0") +st.caption("集成内核: DINOv2 Giant | DiffSim-Pro v2.1 | DreamSim Ensemble") + +# ========================================== +# 1. 
侧边栏:算法核心选择 +# ========================================== +st.sidebar.header("🧠 算法内核 (Core)") + +algo_type = st.sidebar.radio( + "选择检测模式", + ( + "DreamSim(语义/感知) 🔥", + #"DINOv2 Giant (切片/精度)", + #"DiffSim (结构/抗视差)", + #"DINOv2 Giant (全图/感知)" + ), + index=0, # 默认选中 DINOv2 切片版,因为它是目前效果最好的 + help="DINOv2: 几何结构最敏感,抗光照干扰。\nDiffSim: 可调参数多,适合微调。\nDreamSim: 关注整体风格差异。" +) + +st.sidebar.markdown("---") +st.sidebar.header("🛠️ 参数配置") + +# 初始化默认参数变量 +script_name = "" +cmd_extra_args = [] +show_slice_params = True # 默认显示切片参数 + +# ========================================== +# 2. 根据选择配置参数 +# ========================================== + +if "DINOv2" in algo_type: + # === DINOv2 通用配置 === + st.sidebar.info(f"当前内核: {algo_type.split(' ')[0]}") + + # 阈值控制 (DINO 的差异值通常较小,默认给 0.3) + thresh = st.sidebar.slider("敏感度阈值 (Thresh)", 0.0, 1.0, 0.30, 0.01, help="值越小越敏感,值越大越只看显著变化") + + # 区分全图 vs 切片 + if "切片" in algo_type: + script_name = "main_dinov2_sliced.py" + show_slice_params = True + st.sidebar.caption("✅ 适用场景:4K/8K 大图,寻找细微违建。") + else: + script_name = "main_dinov2.py" # 对应你之前的 main_dinov2_giant.py (全图版) + show_slice_params = False # 全图模式不需要调切片 + st.sidebar.caption("⚡ 适用场景:快速扫描,显存充足,寻找大面积变化。") + +elif "DiffSim" in algo_type: + # === DiffSim 配置 === + script_name = "main_finally.py" + show_slice_params = True # DiffSim 需要切片 + + st.sidebar.subheader("1. 感知权重") + w_struct = st.sidebar.slider("结构权重 (Struct)", 0.0, 1.0, 0.3, 0.05) + w_sem = st.sidebar.slider("语义权重 (Sem)", 0.0, 1.0, 0.7, 0.05) + w_tex = st.sidebar.slider("纹理权重 (Texture)", 0.0, 1.0, 0.0, 0.05) + + st.sidebar.subheader("2. 
信号处理") + kernel = st.sidebar.number_input("抗视差窗口 (Kernel)", value=5, step=2) + gamma = st.sidebar.slider("Gamma 压制", 0.5, 4.0, 1.0, 0.1) + thresh = st.sidebar.slider("可视化阈值", 0.0, 1.0, 0.15, 0.01) + + # 封装 DiffSim 独有的参数 + cmd_extra_args = [ + "--model", "Manojb/stable-diffusion-2-1-base", + "--w_struct", str(w_struct), + "--w_sem", str(w_sem), + "--w_tex", str(w_tex), + "--gamma", str(gamma), + "--kernel", str(kernel) + ] + +else: # DreamSim + # === DreamSim 配置 === + script_name = "main_dreamsim.py" + show_slice_params = True + + st.sidebar.subheader("阈值控制") + thresh = st.sidebar.slider("可视化阈值 (Thresh)", 0.0, 1.0, 0.3, 0.01) + +# ========================================== +# 3. 公共参数 (切片策略) +# ========================================== +if show_slice_params: + st.sidebar.subheader("🚀 扫描策略") + # DINOv2 切片版建议 Batch 小一点 + default_batch = 8 if "DINOv2" in algo_type else 16 + + crop_size = st.sidebar.number_input("切片大小 (Crop)", value=224) + step_size = st.sidebar.number_input("步长 (Step, 0=自动)", value=0) + batch_size = st.sidebar.number_input("批次大小 (Batch)", value=default_batch) +else: + # 全图模式,隐藏参数但保留变量防报错 + st.sidebar.success("全图模式:无需切片设置 (One-Shot)") + crop_size, step_size, batch_size = 224, 0, 1 + +# ========================================== +# 4. 主界面:图片上传与执行 +# ========================================== +col1, col2 = st.columns(2) +with col1: + file_t1 = st.file_uploader("上传基准图 (Base / Old)", type=["jpg","png","jpeg"], key="t1") + if file_t1: st.image(file_t1, use_column_width=True) +with col2: + file_t2 = st.file_uploader("上传现状图 (Current / New)", type=["jpg","png","jpeg"], key="t2") + if file_t2: st.image(file_t2, use_column_width=True) + +st.markdown("---") + +# 启动按钮 +if st.button("🚀 启动检测内核", type="primary", use_container_width=True): + if not file_t1 or not file_t2: + st.error("请先上传两张图片!") + else: + # 1. 
保存临时文件 + os.makedirs("temp_uploads", exist_ok=True) + t1_path = os.path.join("temp_uploads", "t1.jpg") + t2_path = os.path.join("temp_uploads", "t2.jpg") + + # 结果文件名根据算法区分,防止缓存混淆 + result_name = f"result_{script_name.replace('.py', '')}.jpg" + out_path = os.path.join("temp_uploads", result_name) + + with open(t1_path, "wb") as f: f.write(file_t1.getbuffer()) + with open(t2_path, "wb") as f: f.write(file_t2.getbuffer()) + + # 2. 构建命令 + # 基础命令: python3 script.py t1 t2 out + cmd = ["python3", script_name, t1_path, t2_path, out_path] + + # 添加通用参数 (所有脚本都兼容 -c -s -b --thresh 格式) + # 注意:即使全图模式脚本忽略 -c -s,传进去也不会报错,保持逻辑简单 + cmd.extend([ + "--crop", str(crop_size), + "--step", str(step_size), + "--batch", str(batch_size), + "--thresh", str(thresh) + ]) + + # 添加特定算法参数 (DiffSim) + if cmd_extra_args: + cmd.extend(cmd_extra_args) + + # 3. 显示状态与运行 + st.info(f"⏳ 正在调用内核: `{script_name}` ...") + st.text(f"执行命令: {' '.join(cmd)}") # 方便调试 + + try: + # 实时显示进度条可能比较难,这里用 spinner + with st.spinner('AI 正在进行特征提取与比对... (DINO Giant 可能需要几秒钟)'): + result = subprocess.run(cmd, capture_output=True, text=True) + + # 4. 
结果处理 + # 展开日志供查看 + with st.expander("📄 查看内核运行日志", expanded=(result.returncode != 0)): + if result.stdout: st.code(result.stdout, language="bash") + if result.stderr: st.error(result.stderr) + + if result.returncode == 0: + st.success(f"✅ 检测完成!耗时逻辑已结束。") + + # 结果展示区 + r_col1, r_col2 = st.columns(2) + with r_col1: + # 尝试读取调试热力图 (如果有的话) + if os.path.exists("debug_raw_heatmap.png"): + st.image("debug_raw_heatmap.png", caption="🔍 原始差异热力图 (Debug)", use_column_width=True) + else: + st.warning("无调试热力图生成") + + with r_col2: + if os.path.exists(out_path): + # 使用 PIL 打开以强制刷新缓存 (Streamlit 有时会缓存同名图片) + res_img = Image.open(out_path) + st.image(res_img, caption=f"🎯 最终检测结果 ({algo_type})", use_column_width=True) + else: + st.error(f"❌ 未找到输出文件: {out_path}") + else: + st.error("❌ 内核运行出错,请检查上方日志。") + + except Exception as e: + st.error(f"系统错误: {e}") diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..2652aa7 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,35 @@ +services: + diffsim-runner: + build: . 
+ container_name: diffsim_runner + # runtime: nvidia + + # 🔥 新增:端口映射 + # 左边是宿主机端口,右边是容器端口 (Streamlit 默认 8501) + ports: + - "8601:8501" + + # 挂载目录:代码、数据、模型缓存 + volumes: + - .:/app # 当前目录代码映射到容器 /app + - ./data:/app/data # 图片数据映射 + - ./hf_cache:/root/.cache/huggingface # 模型缓存映射 + + environment: + - NVIDIA_VISIBLE_DEVICES=all + # 🔥 新增:设置国内 HF 镜像,确保每次启动容器都能由镜像站加速 + - HF_ENDPOINT=https://hf-mirror.com + + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + + # 让容器启动后不退出,像一个虚拟机一样待命 + # 你可以随时 docker exec 进去,然后手动运行 streamlit run app.py + command: tail -f /dev/null + + restart: unless-stopped diff --git a/main.py b/main.py new file mode 100644 index 0000000..dbdd8b9 --- /dev/null +++ b/main.py @@ -0,0 +1,278 @@ +import os +# 🔥 强制设置 HF 镜像 (必须放在最前面) +os.environ["HF_ENDPOINT"] = "https://hf-mirror.com" + +import sys +import torch +import torch.nn as nn +import torch.nn.functional as F +import cv2 +import numpy as np +import argparse +from PIL import Image +from torchvision import transforms +from diffusers import StableDiffusionPipeline +from tqdm import tqdm + +# === 配置 === +# 使用 SD 1.5,无需鉴权,且对小切片纹理更敏感 +MODEL_ID = "runwayml/stable-diffusion-v1-5" +DEVICE = "cuda" if torch.cuda.is_available() else "cpu" +THRESHOLD = 0.35 +IMG_RESIZE = 224 + +# ========================================== +# 🔥 核心:DiffSim Pro 模型定义 (修复版) +# ========================================== +class DiffSimPro(nn.Module): + def __init__(self, device): + super().__init__() + print(f"🚀 [系统] 初始化 DiffSim Pro (基于 {MODEL_ID})...") + + if device == "cuda": + print(f"✅ [硬件确认] 正在使用显卡: {torch.cuda.get_device_name(0)}") + else: + print("❌ [警告] 未检测到显卡,正在使用 CPU 慢速运行!") + + # 1. 
加载 SD 模型 + self.pipe = StableDiffusionPipeline.from_pretrained(MODEL_ID, torch_dtype=torch.float16).to(device) + self.pipe.set_progress_bar_config(disable=True) + + # 冻结参数 + self.pipe.vae.requires_grad_(False) + self.pipe.unet.requires_grad_(False) + self.pipe.text_encoder.requires_grad_(False) + + # 🔥【修复逻辑】:预先计算“空文本”的 Embedding + # UNet 必须要有这个 encoder_hidden_states 参数才能运行 + with torch.no_grad(): + prompt = "" + text_inputs = self.pipe.tokenizer( + prompt, + padding="max_length", + max_length=self.pipe.tokenizer.model_max_length, + truncation=True, + return_tensors="pt", + ) + text_input_ids = text_inputs.input_ids.to(device) + # 获取空文本特征 [1, 77, 768] + self.empty_text_embeds = self.pipe.text_encoder(text_input_ids)[0] + + # 2. 定义特征容器和 Hooks + self.features = {} + + # 注册 Hooks:抓取 纹理(1)、结构(2)、语义(3) + for name, layer in self.pipe.unet.named_modules(): + if "up_blocks.1" in name and name.endswith("resnets.2"): + layer.register_forward_hook(self.get_hook("feat_high")) + elif "up_blocks.2" in name and name.endswith("resnets.2"): + layer.register_forward_hook(self.get_hook("feat_mid")) + elif "up_blocks.3" in name and name.endswith("resnets.2"): + layer.register_forward_hook(self.get_hook("feat_low")) + + def get_hook(self, name): + def hook(model, input, output): + self.features[name] = output + return hook + + def extract_features(self, images): + """ VAE Encode -> UNet Forward -> Hook Features """ + # 1. VAE 编码 + latents = self.pipe.vae.encode(images).latent_dist.sample() * self.pipe.vae.config.scaling_factor + + # 2. 准备参数 + batch_size = latents.shape[0] + t = torch.zeros(batch_size, device=DEVICE, dtype=torch.long) + + # 🔥【修复逻辑】:将空文本 Embedding 扩展到当前 Batch 大小 + # 形状变为 [batch_size, 77, 768] + encoder_hidden_states = self.empty_text_embeds.expand(batch_size, -1, -1) + + # 3. 
UNet 前向传播 (带上 encoder_hidden_states) + self.pipe.unet(latents, t, encoder_hidden_states=encoder_hidden_states) + + return {k: v.clone() for k, v in self.features.items()} + + def robust_similarity(self, f1, f2, kernel_size=3): + """ 抗视差匹配算法 """ + f1 = F.normalize(f1, dim=1) + f2 = F.normalize(f2, dim=1) + + padding = kernel_size // 2 + b, c, h, w = f2.shape + + f2_unfolded = F.unfold(f2, kernel_size=kernel_size, padding=padding) + f2_unfolded = f2_unfolded.view(b, c, kernel_size*kernel_size, h, w) + + sim_map = (f1.unsqueeze(2) * f2_unfolded).sum(dim=1) + max_sim, _ = sim_map.max(dim=1) + + return max_sim + + def compute_batch_distance(self, batch_p1, batch_p2): + feat_a = self.extract_features(batch_p1) + feat_b = self.extract_features(batch_p2) + + total_score = 0 + # 权重:结构层(mid)最重要 + weights = {"feat_high": 0.2, "feat_mid": 0.5, "feat_low": 0.3} + + for name, w in weights.items(): + fa, fb = feat_a[name].float(), feat_b[name].float() + + if name == "feat_high": + sim_map = self.robust_similarity(fa, fb, kernel_size=3) + dist = 1 - sim_map.mean(dim=[1, 2]) + else: + dist = 1 - F.cosine_similarity(fa.flatten(1), fb.flatten(1)) + + total_score += dist * w + + return total_score + +# ========================================== +# 🛠️ 辅助函数 & 扫描逻辑 (保持不变) +# ========================================== + +def get_transforms(): + return transforms.Compose([ + transforms.Resize((IMG_RESIZE, IMG_RESIZE)), + transforms.ToTensor(), + transforms.Normalize([0.5], [0.5]) + ]) + +def scan_and_draw(model, t1_path, t2_path, output_path, patch_size, stride, batch_size): + # 1. OpenCV 读取 + img1_cv = cv2.imread(t1_path) + img2_cv = cv2.imread(t2_path) + + if img1_cv is None or img2_cv is None: + print("❌ 错误: 无法读取图片") + return + + # 强制 Resize 对齐 + h, w = img2_cv.shape[:2] + img1_cv = cv2.resize(img1_cv, (w, h)) + + preprocess = get_transforms() + + # 2. 准备滑动窗口 + print(f"🔪 [切片] 开始扫描... 
尺寸: {w}x{h}") + print(f" - 切片大小: {patch_size}, 步长: {stride}, 批次: {batch_size}") + + patches1 = [] + patches2 = [] + coords = [] + + for y in range(0, h - patch_size + 1, stride): + for x in range(0, w - patch_size + 1, stride): + crop1 = img1_cv[y:y+patch_size, x:x+patch_size] + crop2 = img2_cv[y:y+patch_size, x:x+patch_size] + + p1 = preprocess(Image.fromarray(cv2.cvtColor(crop1, cv2.COLOR_BGR2RGB))) + p2 = preprocess(Image.fromarray(cv2.cvtColor(crop2, cv2.COLOR_BGR2RGB))) + + patches1.append(p1) + patches2.append(p2) + coords.append((x, y)) + + if not patches1: + print("⚠️ 图片太小,无法切片") + return + + total_patches = len(patches1) + print(f"🧠 [推理] 共 {total_patches} 个切片,开始 DiffSim Pro 计算...") + + all_distances = [] + + # 3. 批量推理 + for i in tqdm(range(0, total_patches, batch_size), unit="batch"): + batch_p1 = torch.stack(patches1[i : i + batch_size]).to(DEVICE, dtype=torch.float16) + batch_p2 = torch.stack(patches2[i : i + batch_size]).to(DEVICE, dtype=torch.float16) + + with torch.no_grad(): + dist_batch = model.compute_batch_distance(batch_p1, batch_p2) + all_distances.append(dist_batch.cpu()) + + distances = torch.cat(all_distances) + + # 4. 生成原始热力数据 + heatmap = np.zeros((h, w), dtype=np.float32) + count_map = np.zeros((h, w), dtype=np.float32) + max_score = 0 + + for idx, score in enumerate(distances): + val = score.item() + x, y = coords[idx] + if val > max_score: max_score = val + + heatmap[y:y+patch_size, x:x+patch_size] += val + count_map[y:y+patch_size, x:x+patch_size] += 1 + + count_map[count_map == 0] = 1 + heatmap_avg = heatmap / count_map + + # 5. 
后处理 + norm_factor = max(max_score, 0.1) + heatmap_vis = (heatmap_avg / norm_factor * 255).clip(0, 255).astype(np.uint8) + heatmap_color = cv2.applyColorMap(heatmap_vis, cv2.COLORMAP_JET) + + alpha = 0.4 + beta = 1.0 - alpha + blended_img = cv2.addWeighted(img2_cv, alpha, heatmap_color, beta, 0) + + # 画框 + _, thresh = cv2.threshold(heatmap_vis, int(255 * THRESHOLD), 255, cv2.THRESH_BINARY) + contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + result_img = blended_img.copy() + found_issue = False + + for cnt in contours: + area = cv2.contourArea(cnt) + min_area = (patch_size * patch_size) * 0.05 + + if area > min_area: + found_issue = True + x, y, bw, bh = cv2.boundingRect(cnt) + + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (255, 255, 255), 4) + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (0, 0, 255), 2) + + roi_score = heatmap_avg[y:y+bh, x:x+bw].mean() + label = f"Diff: {roi_score:.2f}" + + cv2.rectangle(result_img, (x, y-25), (x+130, y), (0,0,255), -1) + cv2.putText(result_img, label, (x+5, y-7), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2) + + output_full_path = output_path + if not os.path.isabs(output_path) and not output_path.startswith("."): + output_full_path = os.path.join("/app/data", output_path) + os.makedirs(os.path.dirname(output_full_path) if os.path.dirname(output_full_path) else ".", exist_ok=True) + + cv2.imwrite(output_full_path, result_img) + + print("="*40) + print(f"🎯 扫描完成! 
最大差异分: {max_score:.4f}") + if found_issue: + print(f"⚠️ 警告: 检测到潜在违建区域!") + print(f"🖼️ 热力图结果已保存至: {output_full_path}") + print("="*40) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="DiffSim Pro 违建检测 (抗视差版)") + parser.add_argument("t1", help="基准图路径") + parser.add_argument("t2", help="现状图路径") + parser.add_argument("out", nargs="?", default="heatmap_diffsim.jpg", help="输出文件名") + parser.add_argument("-c", "--crop", type=int, default=224, help="切片大小") + parser.add_argument("-s", "--step", type=int, default=0, help="滑动步长") + parser.add_argument("-b", "--batch", type=int, default=16, help="批次大小") + + args = parser.parse_args() + stride = args.step if args.step > 0 else args.crop // 2 + + # 初始化模型 + diffsim_model = DiffSimPro(DEVICE) + + print(f"📂 启动热力图扫描: {args.t1} vs {args.t2}") + scan_and_draw(diffsim_model, args.t1, args.t2, args.out, args.crop, stride, args.batch) diff --git a/main_dinov2.py b/main_dinov2.py new file mode 100644 index 0000000..b285874 --- /dev/null +++ b/main_dinov2.py @@ -0,0 +1,192 @@ +import sys +import os +import torch +import torch.nn.functional as F +import cv2 +import numpy as np +import argparse +from PIL import Image +from torchvision import transforms +from tqdm import tqdm + +# === 配置 === +# 使用 DINOv2 Giant 带寄存器版本 (修复背景伪影,最强版本) +# 既然你不在乎显存,我们直接上 1.1B 参数的模型 +MODEL_NAME = 'dinov2_vitg14_reg' +DEVICE = "cuda" if torch.cuda.is_available() else "cpu" + +def init_model(): + print(f"🚀 [系统] 初始化 DINOv2 ({MODEL_NAME})...") + if DEVICE == "cuda": + print(f"✅ [硬件确认] 正在使用显卡: {torch.cuda.get_device_name(0)}") + print(f" (显存状态: {torch.cuda.memory_allocated()/1024**2:.2f}MB 已用)") + else: + print("❌ [警告] 未检测到显卡,Giant 模型在 CPU 上会非常慢!") + + # 加载模型 + # force_reload=False 避免每次都下载 + #model = torch.hub.load('facebookresearch/dinov2', MODEL_NAME) + local_path = '/root/.cache/torch/hub/facebookresearch_dinov2_main' + + print(f"📂 [系统] 正在从本地缓存加载代码: {local_path}") + if os.path.exists(local_path): + model = torch.hub.load(local_path, 
MODEL_NAME, source='local') + else: + # 如果万一路径不对,再回退到在线加载(虽然大概率会失败) + print("⚠️ 本地缓存未找到,尝试在线加载...") + model = torch.hub.load('facebookresearch/dinov2', MODEL_NAME) + model.to(DEVICE) + model.eval() + + return model + +def preprocess_for_dino(img_cv): + """ + DINOv2 专用预处理: + 1. 尺寸必须是 14 的倍数 + 2. 标准 ImageNet 归一化 + """ + h, w = img_cv.shape[:2] + + # 向下取整到 14 的倍数 + new_h = (h // 14) * 14 + new_w = (w // 14) * 14 + + img_resized = cv2.resize(img_cv, (new_w, new_h)) + img_pil = Image.fromarray(cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)) + + transform = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), + ]) + + return transform(img_pil).unsqueeze(0).to(DEVICE), new_h, new_w + +def scan_and_draw(model, t1_path, t2_path, output_path, threshold): + # 1. OpenCV 读取 + img1_cv = cv2.imread(t1_path) + img2_cv = cv2.imread(t2_path) + + if img1_cv is None or img2_cv is None: + print("❌ 错误: 无法读取图片") + return + + # 强制 Resize 对齐 (以现状图 T2 为准,逻辑保持不变) + h_orig, w_orig = img2_cv.shape[:2] + img1_cv = cv2.resize(img1_cv, (w_orig, h_orig)) + + print(f"🔪 [处理] DINOv2 扫描... 原始尺寸: {w_orig}x{h_orig}") + + # 2. 预处理 (DINO 需要整图输入,不再需要 sliding window 切片循环) + # 但为了兼容 DINO 的 Patch 机制,我们需要微调尺寸为 14 的倍数 + t1_tensor, h_align, w_align = preprocess_for_dino(img1_cv) + t2_tensor, _, _ = preprocess_for_dino(img2_cv) + + print(f"🧠 [推理] Giant Model 计算中 (Patch网格: {h_align//14}x{w_align//14})...") + + with torch.no_grad(): + # DINOv2 前向传播 (提取 Patch Token) + # feat 形状: [1, N_patches, 1536] (Giant 的维度是 1536) + feat1 = model.forward_features(t1_tensor)["x_norm_patchtokens"] + feat2 = model.forward_features(t2_tensor)["x_norm_patchtokens"] + + # 计算余弦相似度 + similarity = F.cosine_similarity(feat1, feat2, dim=-1) # [1, N_patches] + + # 3. 
生成热力图数据 + # reshape 回二维网格 + grid_h, grid_w = h_align // 14, w_align // 14 + sim_map = similarity.reshape(grid_h, grid_w).cpu().numpy() + + # 转换逻辑:相似度 -> 差异度 (Diff = 1 - Sim) + heatmap_raw = 1.0 - sim_map + + # 将 14x14 的小格子放大回原图尺寸,以便与原图叠加 + heatmap_avg = cv2.resize(heatmap_raw, (w_orig, h_orig), interpolation=cv2.INTER_CUBIC) + + # 统计信息 (逻辑保持不变) + min_v, max_v = heatmap_avg.min(), heatmap_avg.max() + print(f"\n📊 [统计] 差异分布: Min={min_v:.4f} | Max={max_v:.4f} | Mean={heatmap_avg.mean():.4f}") + + # ========================================== + # 🔥 关键:保存原始灰度图 (逻辑保持不变) + # ========================================== + raw_norm = (heatmap_avg - min_v) / (max_v - min_v + 1e-6) + cv2.imwrite("debug_raw_heatmap.png", (raw_norm * 255).astype(np.uint8)) + print(f"💾 [调试] 原始热力图已保存: debug_raw_heatmap.png") + + # ========================================== + # 5. 可视化后处理 (逻辑保持不变) + # ========================================== + + # 归一化 (DINO 的差异通常在 0~1 之间,这里做动态拉伸以增强显示) + # 如果差异非常小,max_v 可能很小,这里设置一个最小分母防止噪点放大 + norm_factor = max(max_v, 0.4) + heatmap_vis = (heatmap_avg / norm_factor * 255).clip(0, 255).astype(np.uint8) + + # 色彩映射 + heatmap_color = cv2.applyColorMap(heatmap_vis, cv2.COLORMAP_JET) + + # 图像叠加 + alpha = 0.4 + blended_img = cv2.addWeighted(img2_cv, alpha, heatmap_color, 1.0 - alpha, 0) + + # 阈值过滤与画框 (逻辑完全保持不变) + _, thresh_img = cv2.threshold(heatmap_vis, int(255 * threshold), 255, cv2.THRESH_BINARY) + contours, _ = cv2.findContours(thresh_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + result_img = blended_img.copy() + box_count = 0 + + # 既然用了 Giant,我们可以更精细地设定最小面积 + # 此处保持和你之前代码一致的逻辑,但 DINO 不需要 PatchSize 参数,我们用原图比例 + min_area = (w_orig * h_orig) * 0.005 # 0.5% 的面积 + + for cnt in contours: + area = cv2.contourArea(cnt) + + if area > min_area: + box_count += 1 + x, y, bw, bh = cv2.boundingRect(cnt) + + # 画框 (白色粗框 + 红色细框) + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (255, 255, 255), 4) + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (0, 0, 255), 2) + + # 显示分数 + # 
计算该区域内的平均差异 + region_score = heatmap_avg[y:y+bh, x:x+bw].mean() + label = f"{region_score:.2f}" + + # 标签背景与文字 + cv2.rectangle(result_img, (x, y-25), (x+80, y), (0,0,255), -1) + cv2.putText(result_img, label, (x+5, y-7), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2) + + # 保存最终结果 + cv2.imwrite(output_path, result_img) + + print("="*40) + print(f"🎯 扫描完成! 发现区域: {box_count} 个") + print(f"🖼️ 结果已保存至: {output_path}") + print("="*40) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="DINOv2 Giant 违建热力图检测 (结构敏感版)") + parser.add_argument("t1", help="基准图") + parser.add_argument("t2", help="现状图") + parser.add_argument("out", nargs="?", default="dino_result.jpg", help="输出图片名") + + # 为了兼容你的习惯,保留了 crop/step 参数接口,虽然 DINO 不需要它们 + parser.add_argument("-c", "--crop", type=int, default=224, help="(已忽略) DINOv2 全图推理") + parser.add_argument("-s", "--step", type=int, default=0, help="(已忽略) DINOv2 全图推理") + parser.add_argument("-b", "--batch", type=int, default=16, help="(已忽略) DINOv2 全图推理") + + # 核心参数 + # DINO 的 Cosine 差异通常比 DreamSim 小,建议阈值给低一点 (如 0.25 - 0.35) + parser.add_argument("--thresh", type=float, default=0.30, help="检测阈值 (0.0-1.0)") + + args = parser.parse_args() + + # 初始化并运行 + model = init_model() + scan_and_draw(model, args.t1, args.t2, args.out, args.thresh) diff --git a/main_dinov2_sliced.py b/main_dinov2_sliced.py new file mode 100644 index 0000000..72ebe1e --- /dev/null +++ b/main_dinov2_sliced.py @@ -0,0 +1,165 @@ +import sys +import os +import torch +import torch.nn.functional as F +import cv2 +import numpy as np +import argparse +from PIL import Image +from torchvision import transforms +from tqdm import tqdm + +# === 配置 === +MODEL_NAME = 'dinov2_vitg14_reg' +DEVICE = "cuda" if torch.cuda.is_available() else "cpu" + +def init_model(): + print(f"🚀 [系统] 初始化 DINOv2 ({MODEL_NAME})...") + if DEVICE == "cuda": + print(f"✅ [硬件] 使用设备: {torch.cuda.get_device_name(0)}") + + # === 关键修正:强制使用本地缓存加载 === + local_path = 
'/root/.cache/torch/hub/facebookresearch_dinov2_main' + if os.path.exists(local_path): + print(f"📂 [加载] 命中本地缓存: {local_path}") + model = torch.hub.load(local_path, MODEL_NAME, source='local') + else: + print("⚠️ 未找到本地缓存,尝试在线加载...") + model = torch.hub.load('facebookresearch/dinov2', MODEL_NAME) + + model.to(DEVICE) + model.eval() + return model + +def get_transform(): + return transforms.Compose([ + transforms.Resize((224, 224)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), + ]) + +# === 修正:增加 threshold 参数 === +def scan_and_draw(model, t1_path, t2_path, output_path, patch_size, stride, batch_size, threshold): + img1_cv = cv2.imread(t1_path) + img2_cv = cv2.imread(t2_path) + + if img1_cv is None or img2_cv is None: + print("❌ 错误: 无法读取图片") + return + + # 强制对齐 + h, w = img2_cv.shape[:2] + img1_cv = cv2.resize(img1_cv, (w, h)) + + print(f"🔪 [切片] DINOv2 扫描... 尺寸: {w}x{h}") + print(f" - 参数: Crop={patch_size}, Step={stride}, Thresh={threshold}") + + # 准备切片 + patches1_pil = [] + patches2_pil = [] + coords = [] + + for y in range(0, h - patch_size + 1, stride): + for x in range(0, w - patch_size + 1, stride): + crop1 = img1_cv[y:y+patch_size, x:x+patch_size] + crop2 = img2_cv[y:y+patch_size, x:x+patch_size] + + p1 = Image.fromarray(cv2.cvtColor(crop1, cv2.COLOR_BGR2RGB)) + p2 = Image.fromarray(cv2.cvtColor(crop2, cv2.COLOR_BGR2RGB)) + + patches1_pil.append(p1) + patches2_pil.append(p2) + coords.append((x, y)) + + if not patches1_pil: + print("⚠️ 图片太小,无法切片") + return + + total_patches = len(patches1_pil) + print(f"🧠 [推理] 共 {total_patches} 个切片...") + + all_distances = [] + transform = get_transform() + + for i in tqdm(range(0, total_patches, batch_size), unit="batch"): + batch_p1_list = [transform(p) for p in patches1_pil[i : i + batch_size]] + batch_p2_list = [transform(p) for p in patches2_pil[i : i + batch_size]] + + if not batch_p1_list: break + + batch_p1 = torch.stack(batch_p1_list).to(DEVICE) + batch_p2 = 
torch.stack(batch_p2_list).to(DEVICE) + + with torch.no_grad(): + feat1 = model.forward_features(batch_p1)["x_norm_clstoken"] + feat2 = model.forward_features(batch_p2)["x_norm_clstoken"] + sim_batch = F.cosine_similarity(feat1, feat2, dim=-1) + dist_batch = 1.0 - sim_batch + all_distances.append(dist_batch.cpu()) + + distances = torch.cat(all_distances) + + # 重建热力图 + heatmap = np.zeros((h, w), dtype=np.float32) + count_map = np.zeros((h, w), dtype=np.float32) + max_score = distances.max().item() + + for idx, score in enumerate(distances): + val = score.item() + x, y = coords[idx] + heatmap[y:y+patch_size, x:x+patch_size] += val + count_map[y:y+patch_size, x:x+patch_size] += 1 + + count_map[count_map == 0] = 1 + heatmap_avg = heatmap / count_map + + # 可视化 + norm_denom = max(max_score, 0.4) + heatmap_vis = (heatmap_avg / norm_denom * 255).clip(0, 255).astype(np.uint8) + heatmap_color = cv2.applyColorMap(heatmap_vis, cv2.COLORMAP_JET) + blended_img = cv2.addWeighted(img2_cv, 0.4, heatmap_color, 0.6, 0) + + # === 使用传入的 threshold 参数 === + _, thresh_img = cv2.threshold(heatmap_vis, int(255 * threshold), 255, cv2.THRESH_BINARY) + contours, _ = cv2.findContours(thresh_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + result_img = blended_img.copy() + box_count = 0 + + for cnt in contours: + area = cv2.contourArea(cnt) + if area > (patch_size * patch_size) * 0.03: + box_count += 1 + x, y, bw, bh = cv2.boundingRect(cnt) + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (255, 255, 255), 4) + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (0, 0, 255), 2) + + score_val = heatmap_avg[y:y+bh, x:x+bw].mean() + cv2.rectangle(result_img, (x, y-25), (x+130, y), (0,0,255), -1) + cv2.putText(result_img, f"Diff: {score_val:.2f}", (x+5, y-7), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2) + + cv2.imwrite(output_path, result_img) + print("="*40) + print(f"🎯 检测完成! 
最大差异: {max_score:.4f} | 发现区域: {box_count}") + print(f"🖼️ 结果: {output_path}") + print("="*40) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="DINOv2 Giant 切片版") + parser.add_argument("t1", help="基准图") + parser.add_argument("t2", help="现状图") + parser.add_argument("out", nargs="?", default="dino_sliced_result.jpg") + parser.add_argument("-c", "--crop", type=int, default=224, help="切片大小") + parser.add_argument("-s", "--step", type=int, default=0, help="步长") + parser.add_argument("-b", "--batch", type=int, default=8, help="批次") + + # === 修正:添加 --thresh 参数接口 === + parser.add_argument("--thresh", type=float, default=0.30, help="检测阈值") + + args = parser.parse_args() + stride = args.step if args.step > 0 else args.crop // 2 + + model = init_model() + # 传入 args.thresh + scan_and_draw(model, args.t1, args.t2, args.out, args.crop, stride, args.batch, args.thresh) diff --git a/main_dreamsim.py b/main_dreamsim.py new file mode 100644 index 0000000..ccc2d3b --- /dev/null +++ b/main_dreamsim.py @@ -0,0 +1,185 @@ +import sys +import os +import torch +import cv2 +import numpy as np +import argparse +from dreamsim import dreamsim +from PIL import Image +from tqdm import tqdm + +# === 配置 === +# DreamSim 官方推荐 ensemble 模式效果最好,虽然慢一点但更准 +MODEL_TYPE = "ensemble" +DEVICE = "cuda" if torch.cuda.is_available() else "cpu" + +def init_model(): + print(f"🚀 [系统] 初始化 DreamSim ({MODEL_TYPE})...") + if DEVICE == "cuda": + print(f"✅ [硬件确认] 正在使用显卡: {torch.cuda.get_device_name(0)}") + print(f" (显存状态: {torch.cuda.memory_allocated()/1024**2:.2f}MB 已用)") + else: + print("❌ [警告] 未检测到显卡,正在使用 CPU 慢速运行!") + + # 加载模型 + model, preprocess = dreamsim(pretrained=True, dreamsim_type=MODEL_TYPE, device=DEVICE) + model.to(DEVICE) + + return model, preprocess + +def scan_and_draw(model, preprocess, t1_path, t2_path, output_path, patch_size, stride, batch_size, threshold): + # 1. 
OpenCV 读取 + img1_cv = cv2.imread(t1_path) + img2_cv = cv2.imread(t2_path) + + if img1_cv is None or img2_cv is None: + print("❌ 错误: 无法读取图片") + return + + # 强制 Resize 对齐 (以现状图 T2 为准) + h, w = img2_cv.shape[:2] + img1_cv = cv2.resize(img1_cv, (w, h)) + + print(f"🔪 [切片] DreamSim 扫描... 尺寸: {w}x{h}") + print(f" - 参数: Crop={patch_size}, Step={stride}, Batch={batch_size}, Thresh={threshold}") + + # 2. 准备滑动窗口 + patches1 = [] + patches2 = [] + coords = [] + + for y in range(0, h - patch_size + 1, stride): + for x in range(0, w - patch_size + 1, stride): + crop1 = img1_cv[y:y+patch_size, x:x+patch_size] + crop2 = img2_cv[y:y+patch_size, x:x+patch_size] + + # DreamSim 预处理 + p1 = preprocess(Image.fromarray(cv2.cvtColor(crop1, cv2.COLOR_BGR2RGB))) + p2 = preprocess(Image.fromarray(cv2.cvtColor(crop2, cv2.COLOR_BGR2RGB))) + + # 修正维度: preprocess 可能返回 [1, 3, 224, 224],我们需要 [3, 224, 224] + if p1.ndim == 4: p1 = p1.squeeze(0) + if p2.ndim == 4: p2 = p2.squeeze(0) + + patches1.append(p1) + patches2.append(p2) + coords.append((x, y)) + + if not patches1: + print("⚠️ 图片太小,无法切片") + return + + total_patches = len(patches1) + print(f"🧠 [推理] 共 {total_patches} 个切片,开始计算...") + + all_distances = [] + + # 3. 批量推理 (使用 tqdm 显示进度) + for i in tqdm(range(0, total_patches, batch_size), unit="batch"): + batch_p1 = torch.stack(patches1[i : i + batch_size]).to(DEVICE) + batch_p2 = torch.stack(patches2[i : i + batch_size]).to(DEVICE) + + with torch.no_grad(): + # DreamSim 前向传播 + dist_batch = model(batch_p1, batch_p2) + all_distances.append(dist_batch.cpu()) + + distances = torch.cat(all_distances) + + # 4. 
生成热力图数据 + heatmap = np.zeros((h, w), dtype=np.float32) + count_map = np.zeros((h, w), dtype=np.float32) + + # 统计信息 + min_v, max_v = distances.min().item(), distances.max().item() + print(f"\n📊 [统计] 分数分布: Min={min_v:.4f} | Max={max_v:.4f} | Mean={distances.mean().item():.4f}") + + for idx, score in enumerate(distances): + val = score.item() + x, y = coords[idx] + + heatmap[y:y+patch_size, x:x+patch_size] += val + count_map[y:y+patch_size, x:x+patch_size] += 1 + + # 平均化重叠区域 + count_map[count_map == 0] = 1 + heatmap_avg = heatmap / count_map + + # ========================================== + # 🔥 关键:保存原始灰度图 (供前端调试) + # ========================================== + raw_norm = (heatmap_avg - min_v) / (max_v - min_v + 1e-6) + cv2.imwrite("debug_raw_heatmap.png", (raw_norm * 255).astype(np.uint8)) + print(f"💾 [调试] 原始热力图已保存: debug_raw_heatmap.png") + + # ========================================== + # 5. 可视化后处理 + # ========================================== + + # 归一化 (使用 max_v 或固定因子) + norm_factor = max(max_v, 0.1) + heatmap_vis = (heatmap_avg / norm_factor * 255).clip(0, 255).astype(np.uint8) + + # 色彩映射 + heatmap_color = cv2.applyColorMap(heatmap_vis, cv2.COLORMAP_JET) + + # 图像叠加 + alpha = 0.4 + blended_img = cv2.addWeighted(img2_cv, alpha, heatmap_color, 1.0 - alpha, 0) + + # 阈值过滤与画框 + # 使用传入的 threshold 参数 + _, thresh_img = cv2.threshold(heatmap_vis, int(255 * threshold), 255, cv2.THRESH_BINARY) + contours, _ = cv2.findContours(thresh_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + result_img = blended_img.copy() + box_count = 0 + + for cnt in contours: + area = cv2.contourArea(cnt) + # 过滤过小的区域 (3% 的切片面积) + min_area = (patch_size * patch_size) * 0.03 + + if area > min_area: + box_count += 1 + x, y, bw, bh = cv2.boundingRect(cnt) + + # 画框 + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (255, 255, 255), 4) + cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (0, 0, 255), 2) + + # 显示分数 + label = f"{heatmap_avg[y:y+bh, x:x+bw].mean():.2f}" + cv2.rectangle(result_img, (x, 
class DiffSim(nn.Module):
    """Training-free perceptual distance built on Stable Diffusion UNet features.

    Re-implementation of the official DiffSim logic
    (https://github.com/showlab/DiffSim): images are VAE-encoded, pushed through
    the frozen UNet at t=0 with an empty-prompt embedding, and intermediate
    up-block activations are compared with a spatially robust cosine similarity.
    """

    def __init__(self, model_id="Manojb/stable-diffusion-2-1-base", device="cuda"):
        super().__init__()
        self.device = device
        print(f"🚀 [Core] Loading Official DiffSim Logic (Backbone: {model_id})...")

        # 1. Load the SD backbone; fall back to the canonical repo id on failure.
        try:
            self.pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16).to(device)
        except Exception as e:
            print(f"❌ 模型加载失败,尝试加载默认 ID... Error: {e}")
            self.pipe = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1-base", torch_dtype=torch.float16).to(device)

        self.pipe.set_progress_bar_config(disable=True)

        # 2. Freeze everything — this model is inference-only.
        self.pipe.vae.requires_grad_(False)
        self.pipe.unet.requires_grad_(False)
        self.pipe.text_encoder.requires_grad_(False)

        # 3. Pre-compute the empty-prompt embedding (unconditional guidance).
        with torch.no_grad():
            prompt = ""
            text_input = self.pipe.tokenizer(prompt, padding="max_length", max_length=77, truncation=True, return_tensors="pt")
            self.empty_embeds = self.pipe.text_encoder(text_input.input_ids.to(device))[0]

        self.features = {}
        self._register_official_hooks()

    def _register_official_hooks(self):
        """Hook up_blocks.1 (semantic) and up_blocks.2 (structure), per the
        official DiffSim layer choice."""
        self.target_layers = {
            "up_blocks.1.resnets.1": "feat_semantic",   # semantic layer
            "up_blocks.2.resnets.1": "feat_structure"   # structure layer
        }

        print(f"🔧 [Hook] Registered Layers: {list(self.target_layers.values())}")

        for name, layer in self.pipe.unet.named_modules():
            if name in self.target_layers:
                alias = self.target_layers[name]
                layer.register_forward_hook(self._get_hook(alias))

    def _get_hook(self, name):
        # Each hook stashes its layer's output in self.features under `name`.
        def hook(model, input, output):
            self.features[name] = output
        return hook

    def extract_features(self, images):
        """Run one UNet pass (t=0, empty prompt) and return the hooked activations."""
        # VAE encoding
        latents = self.pipe.vae.encode(images).latent_dist.sample() * self.pipe.vae.config.scaling_factor

        # UNet inference
        t = torch.zeros(latents.shape[0], device=self.device, dtype=torch.long)
        encoder_hidden_states = self.empty_embeds.expand(latents.shape[0], -1, -1)

        self.features = {}  # reset buffer so stale activations never leak between calls
        self.pipe.unet(latents, t, encoder_hidden_states=encoder_hidden_states)

        return {k: v.clone() for k, v in self.features.items()}

    def calculate_robust_similarity(self, feat_a, feat_b, kernel_size=3):
        """Spatially robust similarity: S(p) = max_{q in Neighbor(p)} cos(F1(p), F2(q)).

        kernel_size <= 1 degenerates to a strict pixel-wise cosine similarity.
        Returns a [B, H, W] similarity map.
        """
        # Normalize channel vectors so the dot product is a cosine similarity.
        feat_a = F.normalize(feat_a, dim=1)
        feat_b = F.normalize(feat_b, dim=1)

        if kernel_size <= 1:
            # Strict alignment (pixel-wise cosine similarity)
            return (feat_a * feat_b).sum(dim=1)

        # Neighborhood search (sliding-window matching)
        b, c, h, w = feat_b.shape
        padding = kernel_size // 2

        # Unfold feature B to expose every neighbor of each position.
        feat_b_unfolded = F.unfold(feat_b, kernel_size=kernel_size, padding=padding)
        feat_b_unfolded = feat_b_unfolded.view(b, c, kernel_size*kernel_size, h, w)

        # Cosine sim between A and all neighbors of B → [B, K*K, H, W]
        sim_map = (feat_a.unsqueeze(2) * feat_b_unfolded).sum(dim=1)

        # Take the best match (max-pooling over the neighborhood)
        best_sim, _ = sim_map.max(dim=1)

        return best_sim

    def forward(self, batch_t1, batch_t2, w_struct, w_sem, kernel_size):
        """Weighted semantic + structural distance per batch item.

        Returns a tensor of shape [B] on the input device.
        """
        f1 = self.extract_features(batch_t1)
        f2 = self.extract_features(batch_t2)

        # BUG FIX: this was `total_dist = 0` (a plain int). When both weights
        # were 0 — reachable via --w_struct 0 --w_sem 0 — the method returned
        # an int and callers crashed on `.cpu()`. Accumulate into a zero
        # tensor of the right batch size/dtype/device instead.
        total_dist = batch_t1.new_zeros(batch_t1.shape[0])

        # Semantic distance
        if w_sem > 0 and "feat_semantic" in f1:
            sim = self.calculate_robust_similarity(f1["feat_semantic"], f2["feat_semantic"], kernel_size)
            dist = 1.0 - sim
            total_dist = total_dist + dist.mean(dim=[1, 2]) * w_sem

        # Structure distance
        if w_struct > 0 and "feat_structure" in f1:
            sim = self.calculate_robust_similarity(f1["feat_structure"], f2["feat_structure"], kernel_size)
            dist = 1.0 - sim
            total_dist = total_dist + dist.mean(dim=[1, 2]) * w_struct

        return total_dist
def engineering_post_process(heatmap_full, img_bg, args, patch_size):
    """Turn a raw per-pixel difference map into an annotated overlay image.

    Normalizes the heatmap, denoises it (Gaussian blur + hard threshold +
    morphological close), then draws red boxes with mean-score labels around
    every connected region larger than 3% of one patch.

    Returns (annotated BGR image, number of boxes drawn).
    """
    h, w = heatmap_full.shape  # also asserts the map is 2-D

    # Dynamic-range normalization: never stretch below a 0.25 confidence
    # floor, otherwise a clean scene would amplify pure noise to full scale.
    ceiling = max(heatmap_full.max(), 0.25)
    heat_u8 = (heatmap_full / ceiling * 255).clip(0, 255).astype(np.uint8)

    # Keep the raw normalized map on disk for debugging.
    cv2.imwrite("debug_raw_heatmap.png", heat_u8)

    # Denoise: blur out speckles, cut below the user threshold, then close
    # gaps so fragmented neighbouring detections merge into one region.
    blurred = cv2.GaussianBlur(heat_u8, (5, 5), 0)
    _, mask = cv2.threshold(blurred, int(255 * args.thresh), 255, cv2.THRESH_BINARY)
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (7, 7))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, closing_kernel)

    # Blend the colorized (pre-blur) heatmap over the background image.
    overlay = cv2.addWeighted(img_bg, 0.4, cv2.applyColorMap(heat_u8, cv2.COLORMAP_JET), 0.6, 0)

    regions, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    area_floor = (patch_size ** 2) * 0.03  # ignore blobs under 3% of a patch
    drawn = 0
    for region in regions:
        if cv2.contourArea(region) <= area_floor:
            continue
        drawn += 1
        x, y, bw, bh = cv2.boundingRect(region)

        # White outline beneath a red box, visible on any background.
        cv2.rectangle(overlay, (x, y), (x+bw, y+bh), (255, 255, 255), 4)
        cv2.rectangle(overlay, (x, y), (x+bw, y+bh), (0, 0, 255), 2)

        # Mean raw score inside the box, shown on a filled red label strip.
        label = f"{heatmap_full[y:y+bh, x:x+bw].mean():.2f}"
        cv2.rectangle(overlay, (x, y-22), (x+55, y), (0,0,255), -1)
        cv2.putText(overlay, label, (x+5, y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)

    return overlay, drawn
def main():
    """CLI entry point: slice T1/T2 into overlapping patches, score each pair
    with DiffSim, and save an annotated change heatmap."""
    parser = argparse.ArgumentParser(description="DiffSim Local Implementation")
    parser.add_argument("t1", help="Reference Image")
    parser.add_argument("t2", help="Query Image")
    # BUG FIX: this positional had a default but no nargs="?", so argparse
    # still required it and the default was dead. Now optional, matching the
    # sibling scripts' `out` argument.
    parser.add_argument("out", nargs="?", default="result.jpg")

    # Official DiffSim recommended parameters
    parser.add_argument("--w_struct", type=float, default=0.4)
    parser.add_argument("--w_sem", type=float, default=0.6)
    parser.add_argument("--kernel", type=int, default=3, help="Robust Kernel Size (1, 3, 5)")

    # Engineering parameters
    parser.add_argument("--gamma", type=float, default=1.0)
    parser.add_argument("--thresh", type=float, default=0.3)
    parser.add_argument("-c", "--crop", type=int, default=224)
    parser.add_argument("-b", "--batch", type=int, default=16)
    parser.add_argument("--model", default="Manojb/stable-diffusion-2-1-base")

    # Redundant parameters kept only for CLI compatibility with other scripts
    parser.add_argument("--step", type=int, default=0)
    parser.add_argument("--w_tex", type=float, default=0.0)

    args = parser.parse_args()

    # 1. Image IO
    t1 = cv2.imread(args.t1)
    t2 = cv2.imread(args.t2)
    if t1 is None or t2 is None:
        print("❌ Error reading images.")
        return

    # Resize T1 onto T2's geometry so patches align
    h, w = t2.shape[:2]
    t1 = cv2.resize(t1, (w, h))

    # 2. Preprocessing: resize to 224 and normalize to [-1, 1] for the SD VAE
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])
    ])

    patches1, patches2, coords = [], [], []
    stride = args.crop // 2  # 50% overlap

    print(f"🔪 Slicing images ({w}x{h}) with stride {stride}...")
    for y in range(0, h - args.crop + 1, stride):
        for x in range(0, w - args.crop + 1, stride):
            c1 = t1[y:y+args.crop, x:x+args.crop]
            c2 = t2[y:y+args.crop, x:x+args.crop]

            p1 = transform(Image.fromarray(cv2.cvtColor(c1, cv2.COLOR_BGR2RGB)))
            p2 = transform(Image.fromarray(cv2.cvtColor(c2, cv2.COLOR_BGR2RGB)))

            patches1.append(p1); patches2.append(p2); coords.append((x, y))

    if not patches1:
        return

    # 3. Model inference (fp16, batched)
    model = DiffSim(args.model)
    scores = []

    print(f"🧠 Running DiffSim Inference on {len(patches1)} patches...")
    with torch.no_grad():
        for i in tqdm(range(0, len(patches1), args.batch)):
            b1 = torch.stack(patches1[i:i+args.batch]).to("cuda", dtype=torch.float16)
            b2 = torch.stack(patches2[i:i+args.batch]).to("cuda", dtype=torch.float16)

            batch_dist = model(b1, b2, args.w_struct, args.w_sem, args.kernel)
            scores.append(batch_dist.cpu())

    all_scores = torch.cat(scores).float().numpy()

    # 4. Reconstruct heatmap; the 1e-6 baseline avoids division by zero on
    # pixels no patch covered.
    heatmap_full = np.zeros((h, w), dtype=np.float32)
    count_map = np.zeros((h, w), dtype=np.float32) + 1e-6

    # Apply gamma *before* merging overlapping patches
    if args.gamma != 1.0:
        all_scores = np.power(all_scores, args.gamma)

    for idx, score in enumerate(all_scores):
        x, y = coords[idx]
        heatmap_full[y:y+args.crop, x:x+args.crop] += score
        count_map[y:y+args.crop, x:x+args.crop] += 1

    heatmap_avg = heatmap_full / count_map

    # 5. Post-processing & draw
    print("🎨 Post-processing results...")
    final_img, count = engineering_post_process(heatmap_avg, t2, args, args.crop)

    cv2.imwrite(args.out, final_img)
    print(f"✅ Done! Found {count} regions. Saved to {args.out}")
# === Configuration ===
MODEL_ID = "Manojb/stable-diffusion-2-1-base"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
THRESHOLD = 0.40  # slightly raised threshold to further suppress false positives
IMG_RESIZE = 224

class DiffSimSemantic(nn.Module):
    """Semantics-focused DiffSim variant.

    Compares two images through frozen Stable Diffusion UNet features, hooking
    only the deeper up-blocks (structure + semantics) and deliberately skipping
    the shallow texture layer so lighting/texture changes are ignored.
    """

    def __init__(self, device):
        super().__init__()
        print(f"🚀 [系统] 初始化 DiffSim (语义增强版)...")

        self.pipe = StableDiffusionPipeline.from_pretrained(MODEL_ID, torch_dtype=torch.float16).to(device)
        self.pipe.set_progress_bar_config(disable=True)

        # Freeze all weights — this model is inference-only.
        self.pipe.vae.requires_grad_(False)
        self.pipe.unet.requires_grad_(False)
        self.pipe.text_encoder.requires_grad_(False)

        # Pre-compute the empty-prompt text embedding once.
        with torch.no_grad():
            prompt = ""
            text_inputs = self.pipe.tokenizer(
                prompt,
                padding="max_length",
                max_length=self.pipe.tokenizer.model_max_length,
                truncation=True,
                return_tensors="pt",
            )
            text_input_ids = text_inputs.input_ids.to(device)
            self.empty_text_embeds = self.pipe.text_encoder(text_input_ids)[0]

        self.features = {}

        # 🔥 Hook strategy: grab only deep features, ignore shallow texture maps.
        # up_blocks.1 (texture)   -> ❌ removed: too sensitive, causes false alarms
        # up_blocks.2 (structure) -> ✅ kept: judges shapes
        # up_blocks.3 (semantics) -> ✅ core: judges object categories
        for name, layer in self.pipe.unet.named_modules():
            # up_blocks.1 is intentionally NOT hooked — too sensitive to lighting/texture.
            if "up_blocks.2" in name and name.endswith("resnets.2"):
                layer.register_forward_hook(self.get_hook("feat_structure"))
            elif "up_blocks.3" in name and name.endswith("resnets.2"):
                layer.register_forward_hook(self.get_hook("feat_semantic"))

    def get_hook(self, name):
        # Forward hook that stores the layer's output under `name`.
        def hook(model, input, output):
            self.features[name] = output
        return hook

    def extract_features(self, images):
        # Encode to latents, then run one UNet pass at t=0 with the empty
        # prompt; the registered hooks fill self.features as a side effect.
        latents = self.pipe.vae.encode(images).latent_dist.sample() * self.pipe.vae.config.scaling_factor
        batch_size = latents.shape[0]
        t = torch.zeros(batch_size, device=DEVICE, dtype=torch.long)
        encoder_hidden_states = self.empty_text_embeds.expand(batch_size, -1, -1)
        self.pipe.unet(latents, t, encoder_hidden_states=encoder_hidden_states)
        return {k: v.clone() for k, v in self.features.items()}

    def robust_similarity(self, f1, f2, kernel_size=5):
        """Parallax-tolerant feature matching.

        🔥 kernel_size default raised to 5 so each position in f1 is matched
        against its best neighbor in f2, tolerating the larger geometric
        misalignment seen at ~30 m flight altitude.
        """
        f1 = F.normalize(f1, dim=1)
        f2 = F.normalize(f2, dim=1)

        padding = kernel_size // 2
        b, c, h, w = f2.shape

        # Expose every kernel_size x kernel_size neighborhood of f2.
        f2_unfolded = F.unfold(f2, kernel_size=kernel_size, padding=padding)
        f2_unfolded = f2_unfolded.view(b, c, kernel_size*kernel_size, h, w)

        # Cosine similarity of each f1 position against all its f2 neighbors;
        # keep the best match per position.
        sim_map = (f1.unsqueeze(2) * f2_unfolded).sum(dim=1)
        max_sim, _ = sim_map.max(dim=1)

        return max_sim

    def compute_batch_distance(self, batch_p1, batch_p2):
        # Weighted distance per patch pair.
        feat_a = self.extract_features(batch_p1)
        feat_b = self.extract_features(batch_p2)

        total_score = 0

        # 🔥 Weighting strategy: structure and semantics only.
        # 0.0 -> texture (completely ignore shading/color differences)
        # 0.4 -> structure (feat_structure): shape changes
        # 0.6 -> semantics (feat_semantic): object presence (most DreamSim-like part)
        weights = {"feat_structure": 0.4, "feat_semantic": 0.6}

        for name, w in weights.items():
            fa, fb = feat_a[name].float(), feat_b[name].float()

            # Parallax-robust matching on every layer for extra robustness;
            # kernel_size=5 tolerates larger pixel displacement.
            sim_map = self.robust_similarity(fa, fb, kernel_size=5)
            dist = 1 - sim_map.mean(dim=[1, 2])

            total_score += dist * w

        return total_score

# ==========================================
# Helper functions (unchanged)
# ==========================================
def get_transforms():
    # Standard SD preprocessing: resize to IMG_RESIZE and normalize to [-1, 1].
    return transforms.Compose([
        transforms.Resize((IMG_RESIZE, IMG_RESIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])
    ])
def scan_and_draw(model, t1_path, t2_path, output_path, patch_size, stride, batch_size):
    """Sliding-window change detection.

    Scores every patch pair with `model.compute_batch_distance`, builds an
    overlap-averaged difference heatmap, and saves an annotated overlay with
    red boxes around regions above the module-level THRESHOLD.
    """
    img1_cv = cv2.imread(t1_path)
    img2_cv = cv2.imread(t2_path)
    if img1_cv is None or img2_cv is None:
        # FIX: was a silent `return`; the sibling scripts report read failures,
        # so do the same here instead of exiting without a word.
        print(f"❌ Error: failed to read input images ({t1_path} / {t2_path})")
        return

    # Align T1 onto T2's geometry so patches correspond pixel-for-pixel.
    h, w = img2_cv.shape[:2]
    img1_cv = cv2.resize(img1_cv, (w, h))
    preprocess = get_transforms()

    print(f"🔪 [切片] 开始扫描... 尺寸: {w}x{h}, 忽略纹理细节,专注语义差异")
    patches1, patches2, coords = [], [], []

    # Slide a patch_size window with the given stride over both images.
    for y in range(0, h - patch_size + 1, stride):
        for x in range(0, w - patch_size + 1, stride):
            crop1 = img1_cv[y:y+patch_size, x:x+patch_size]
            crop2 = img2_cv[y:y+patch_size, x:x+patch_size]
            p1 = preprocess(Image.fromarray(cv2.cvtColor(crop1, cv2.COLOR_BGR2RGB)))
            p2 = preprocess(Image.fromarray(cv2.cvtColor(crop2, cv2.COLOR_BGR2RGB)))
            patches1.append(p1); patches2.append(p2); coords.append((x, y))

    if not patches1:
        # FIX: also previously a silent `return`.
        print("⚠️ Error: image smaller than one patch, nothing to scan")
        return

    # Batched fp16 inference.
    all_distances = []
    for i in tqdm(range(0, len(patches1), batch_size), unit="batch"):
        b1 = torch.stack(patches1[i:i+batch_size]).to(DEVICE, dtype=torch.float16)
        b2 = torch.stack(patches2[i:i+batch_size]).to(DEVICE, dtype=torch.float16)
        with torch.no_grad():
            all_distances.append(model.compute_batch_distance(b1, b2).cpu())

    distances = torch.cat(all_distances)

    # Accumulate patch scores into a per-pixel heatmap, averaging overlaps.
    heatmap = np.zeros((h, w), dtype=np.float32)
    count_map = np.zeros((h, w), dtype=np.float32)
    max_score = 0
    for idx, score in enumerate(distances):
        val = score.item()
        x, y = coords[idx]
        if val > max_score: max_score = val
        heatmap[y:y+patch_size, x:x+patch_size] += val
        count_map[y:y+patch_size, x:x+patch_size] += 1

    count_map[count_map == 0] = 1
    heatmap_avg = heatmap / count_map

    # Visualization: normalize (0.1 floor avoids amplifying pure noise on a
    # clean scene), colorize, and blend over the current image.
    norm_factor = max(max_score, 0.1)
    heatmap_vis = (heatmap_avg / norm_factor * 255).clip(0, 255).astype(np.uint8)
    heatmap_color = cv2.applyColorMap(heatmap_vis, cv2.COLORMAP_JET)
    blended_img = cv2.addWeighted(img2_cv, 0.4, heatmap_color, 0.6, 0)

    # Threshold + contour boxes; ignore blobs under 5% of one patch area.
    _, thresh = cv2.threshold(heatmap_vis, int(255 * THRESHOLD), 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    result_img = blended_img.copy()
    for cnt in contours:
        if cv2.contourArea(cnt) > (patch_size**2)*0.05:
            x, y, bw, bh = cv2.boundingRect(cnt)
            cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (255, 255, 255), 4)
            cv2.rectangle(result_img, (x, y), (x+bw, y+bh), (0, 0, 255), 2)
            label = f"Diff: {heatmap_avg[y:y+bh, x:x+bw].mean():.2f}"
            cv2.rectangle(result_img, (x, y-25), (x+130, y), (0,0,255), -1)
            cv2.putText(result_img, label, (x+5, y-7), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

    # Relative outputs land in the container's shared /app/data mount.
    output_full_path = output_path if os.path.isabs(output_path) else os.path.join("/app/data", output_path)
    os.makedirs(os.path.dirname(output_full_path), exist_ok=True)
    cv2.imwrite(output_full_path, result_img)
    print(f"🎯 完成! 最大差异分: {max_score:.4f}, 结果已保存: {output_full_path}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("t1"); parser.add_argument("t2"); parser.add_argument("out", nargs="?", default="result.jpg")
    parser.add_argument("-c", "--crop", type=int, default=224)
    parser.add_argument("-s", "--step", type=int, default=0)
    parser.add_argument("-b", "--batch", type=int, default=16)
    args = parser.parse_args()
    # Default stride = half the crop size (50% overlap).
    scan_and_draw(DiffSimSemantic(DEVICE), args.t1, args.t2, args.out, args.crop, args.step if args.step>0 else args.crop//2, args.batch)