# NOTE(review): removed a stray run of pasted line numbers here — it was
# copy/paste residue, not code, and made the file unparseable.
| import os import re import time import json import hashlib import threading import openai from datetime import datetime import concurrent.futures from openai import OpenAI
# --- Configuration ---
# NOTE(review): the literal values below look redacted/placeholder
# ("sk-", "https://", "") — fill in real BASE_URL and MODEL_NAME before
# running; the API key is read from the environment first.
API_KEY = os.environ.get("DASHSCOPE_API_KEY", "sk-")
BASE_URL = "https://"
MODEL_NAME = ""
POSTS_DIR = r"source/_posts"       # directory of markdown posts to process
CACHE_FILE = "process_cache.json"  # per-file md5 + generated-YAML cache

MAX_WORKERS = 7   # thread-pool size for concurrent file processing
MAX_RETRIES = 3   # attempts per AI metadata request

client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

# Matches a leading front-matter block ("---\n<yaml>\n---\n"); group(1)
# captures the YAML body between the fences.
YAML_PATTERN = re.compile(r'^---.*?\r?\n(.*?)\r?\n---.*?\r?\n', re.DOTALL)

print_lock = threading.Lock()  # serializes console output across workers
cache_lock = threading.Lock()  # guards writes to the shared cache dict
def safe_print(message):
    """Print *message* to stdout while holding the global print lock.

    Keeps log lines from interleaving when worker threads print
    concurrently.
    """
    with print_lock:
        print(message)
def get_content_md5(content):
    """Return the hex MD5 digest of *content* (UTF-8 encoded)."""
    digest = hashlib.md5()
    digest.update(content.encode('utf-8'))
    return digest.hexdigest()
def load_cache():
    """Load the JSON processing cache; return {} if missing or unreadable."""
    if not os.path.exists(CACHE_FILE):
        return {}
    try:
        with open(CACHE_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception:
        # A corrupt or unreadable cache is treated as empty (best-effort).
        return {}
def save_cache(cache_data):
    """Persist *cache_data* to CACHE_FILE as pretty-printed UTF-8 JSON."""
    serialized = json.dumps(cache_data, indent=4, ensure_ascii=False)
    with open(CACHE_FILE, 'w', encoding='utf-8') as f:
        f.write(serialized)
def get_ai_metadata_with_retry(filename, content_snippet):
    """Ask the AI model for front-matter metadata for one note.

    Builds a prompt from the note's filename and a snippet of its body,
    requests a JSON object with "category" (str) and "tags" (list of str),
    and renders the result as Hexo-style YAML lines (no trailing newline).

    Retries up to MAX_RETRIES times with exponential backoff on rate
    limits and transient network/server errors; gives up immediately on
    authentication or bad-request errors.  Returns a generic
    "uncategorized" YAML block when all attempts fail.

    Fix: the log messages printed a literal "(unknown)" tag; restored the
    intended per-file "[{filename}]" interpolation so logs identify which
    note hit the error.
    """
    clean_name = os.path.splitext(filename)[0]
    prompt = f"""
请根据以下 Markdown 笔记的【文件名】和【正文前几行】,推断最合适的 1 个大分类(categories)和 1-3 个标签(tags)。
文件名: {clean_name} 正文片段: {content_snippet} 请以纯 JSON 格式输出,包含 "category" (字符串) 和 "tags" (字符串数组) 两个字段。 """
    for attempt in range(MAX_RETRIES):
        try:
            response = client.chat.completions.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "你是一个严谨的博客分类助手。只能输出合法的 JSON 格式。"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.1,
                # JSON mode: the model must emit a single valid JSON object.
                response_format={"type": "json_object"},
                timeout=15,
            )
            data = json.loads(response.choices[0].message.content.strip())
            category = data.get('category', '未分类')
            tags = data.get('tags', ['待整理'])

            yaml_lines = ["categories:", f" - {category}", "tags:"]
            for tag in tags:
                yaml_lines.append(f" - {tag}")
            return "\n".join(yaml_lines)
        except openai.RateLimitError:
            # 429: exponential backoff plus a small constant cushion.
            wait_time = (2 ** attempt) + 2
            safe_print(f" [{filename}] 触发限流 (429),等待 {wait_time} 秒后重试...")
            time.sleep(wait_time)
        except openai.AuthenticationError:
            # A bad API key cannot succeed on retry — bail out.
            safe_print(f" [{filename}] 致命错误(API Key失效),放弃重试。")
            break
        except openai.BadRequestError as e:
            # Malformed request will fail identically on retry — bail out.
            safe_print(f" [{filename}] 请求格式错误,放弃重试: {e}")
            break
        except (openai.APIConnectionError, openai.InternalServerError) as e:
            wait_time = 2 ** attempt
            safe_print(f" [{filename}] 网络/服务端异常,等待 {wait_time} 秒: {e}")
            time.sleep(wait_time)
        except Exception as e:
            # Catch-all keeps one bad file from killing the worker thread.
            wait_time = 2 ** attempt
            safe_print(f" [{filename}] 未知异常,等待 {wait_time} 秒: {e}")
            time.sleep(wait_time)

    # All retries exhausted (or fatal error): generic fallback metadata.
    return "categories:\n - 未分类\ntags:\n - 待整理"
def process_single_file(filename, global_cache):
    """Ensure one markdown post under POSTS_DIR has up-to-date front matter.

    Returns one of:
      "skipped"    — body unchanged and front matter already present;
      "restored"   — body changed slightly; metadata reused from cache;
      "success_ai" — new metadata generated via the AI;
      "error"      — any exception (logged, never propagated).

    Thread-safety: cache writes are guarded by cache_lock; the file is
    rewritten atomically via a temp file + os.replace.

    Fix: the three log messages printed a literal "(unknown)" tag;
    restored the intended "[{filename}]" interpolation.
    """
    filepath = os.path.join(POSTS_DIR, filename)
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            full_content = f.read()

        # Split off any existing front matter; keep its date if present.
        match = YAML_PATTERN.match(full_content)
        if match:
            old_yaml = match.group(1)
            raw_body = full_content[match.end():].strip()
            date_match = re.search(r'^date:\s*(.+)$', old_yaml, re.MULTILINE)
            date_str = date_match.group(1).strip() if date_match else None
        else:
            raw_body = full_content.strip()
            date_str = None

        current_md5 = get_content_md5(raw_body)

        # Older cache format stored just the md5 string; normalize it.
        file_cache = global_cache.get(filename, {})
        if isinstance(file_cache, str):
            file_cache = {"md5": file_cache, "yaml": None}
        cached_md5 = file_cache.get("md5")
        cached_yaml = file_cache.get("yaml")

        # Body unchanged AND front matter already present: nothing to do.
        if cached_md5 == current_md5 and match:
            return "skipped"

        # No date in the old front matter: fall back to file creation time.
        if not date_str:
            ctime = os.path.getctime(filepath)
            date_str = datetime.fromtimestamp(ctime).strftime('%Y-%m-%d %H:%M:%S')

        if cached_yaml:
            # Body changed but metadata is cached: reuse it (no API tokens).
            ai_metadata = cached_yaml
            status_msg = f" [{filename}] 检测到微小修改,已从缓存恢复文件头 (免Token)"
            result_code = "restored"
        else:
            content_snippet = raw_body[:500].replace('\n', ' ')
            ai_metadata = get_ai_metadata_with_retry(filename, content_snippet)
            status_msg = f" [{filename}] AI 处理成功并生成全新头部"
            result_code = "success_ai"

        title = os.path.splitext(filename)[0]
        new_front_matter = f"---\ntitle: {title}\ndate: {date_str}\n{ai_metadata}\n---\n\n"

        # Atomic rewrite: write the temp file, then replace the original.
        temp_filepath = filepath + ".tmp"
        with open(temp_filepath, 'w', encoding='utf-8') as f:
            f.write(new_front_matter + raw_body)
        os.replace(temp_filepath, filepath)

        with cache_lock:
            global_cache[filename] = {"md5": current_md5, "yaml": ai_metadata}

        safe_print(status_msg)
        return result_code

    except Exception as e:
        safe_print(f" [{filename}] 错误: {e}")
        return "error"
def main():
    """Scan POSTS_DIR and refresh front matter for every markdown note."""
    started = time.time()
    safe_print("=== 2. 开始检查文件头与内容更新 ===")
    cache = load_cache()

    # Guard clause: bail out early when the posts directory is missing.
    if not os.path.exists(POSTS_DIR):
        safe_print(f"错误:找不到目录 {POSTS_DIR},请检查路径。")
        return

    markdown_files = [name for name in os.listdir(POSTS_DIR) if name.endswith(".md")]
    safe_print(f" 扫描到 {len(markdown_files)} 篇笔记,缓存库中已记录 {len(cache)} 条数据。")

    stats = {"success_ai": 0, "restored": 0, "error": 0, "skipped": 0}

    # Fan the files out across a thread pool and tally results as each
    # worker finishes (order of completion does not matter for the tally).
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        pending = {
            pool.submit(process_single_file, name, cache): name
            for name in markdown_files
        }
        for done in concurrent.futures.as_completed(pending):
            stats[done.result()] += 1

    save_cache(cache)

    safe_print("\n" + "="*40)
    safe_print(f" 全部处理完成!耗时: {round(time.time() - started, 2)} 秒")
    safe_print(f" 统计: AI新生成 {stats['success_ai']} 篇, 缓存恢复 {stats['restored']} 篇, 跳过 {stats['skipped']} 篇, 失败 {stats['error']} 篇")
# Entry-point guard: run the pipeline only when executed as a script,
# not when imported as a module.
if __name__ == "__main__":
    main()
|