refactor(p1-01f): JSON 持久化抽到 services/json_storage.py
All checks were successful
CD Pipeline / deploy (push) Successful in 1m9s
All checks were successful
CD Pipeline / deploy (push) Successful in 1m9s
- load_categories / save_categories / load_scheduler_stats 三個函數搬出 - CATEGORIES_JSON_PATH / SCHEDULER_STATS_PATH 常數同步搬移 - app.py 改 import 維持原呼叫路徑 行數變化: app.py 7,070 → 7,053 (-17)
This commit is contained in:
29
app.py
29
app.py
@@ -407,29 +407,12 @@ sys_log.info("[Template] ✅ 全域模板變數已注入 (metabase_url, grist_ur
|
||||
# ================= 🛠️ V9.72: 分類設定管理核心 =================
|
||||
CATEGORIES_JSON_PATH = os.path.join(BASE_DIR, 'data', 'categories.json')
|
||||
|
||||
def load_categories():
|
||||
"""從 JSON 檔案載入分類列表"""
|
||||
try:
|
||||
with open(CATEGORIES_JSON_PATH, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return []
|
||||
|
||||
def save_categories(categories):
|
||||
"""將分類列表儲存到 JSON 檔案"""
|
||||
with open(CATEGORIES_JSON_PATH, 'w', encoding='utf-8') as f:
|
||||
json.dump(categories, f, ensure_ascii=False, indent=4)
|
||||
|
||||
def load_scheduler_stats():
|
||||
"""讀取排程統計資料"""
|
||||
stats_path = os.path.join(BASE_DIR, 'data', 'scheduler_stats.json')
|
||||
if os.path.exists(stats_path):
|
||||
try:
|
||||
with open(stats_path, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except (IOError, json.JSONDecodeError):
|
||||
return {}
|
||||
return {}
|
||||
# JSON 持久化:實作搬至 services/json_storage.py
|
||||
from services.json_storage import ( # noqa: E402, F401
|
||||
load_categories,
|
||||
save_categories,
|
||||
load_scheduler_stats,
|
||||
)
|
||||
|
||||
# ================= 🛠️ 數據處理核心 (封裝) =================
|
||||
|
||||
|
||||
37
services/json_storage.py
Normal file
37
services/json_storage.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""JSON 檔案儲存:分類設定、排程統計等小型持久化資料。
|
||||
|
||||
從 app.py 抽出。每個函數對應一個 JSON 檔案。
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
|
||||
from config import BASE_DIR
|
||||
|
||||
CATEGORIES_JSON_PATH = os.path.join(BASE_DIR, 'data', 'categories.json')
|
||||
SCHEDULER_STATS_PATH = os.path.join(BASE_DIR, 'data', 'scheduler_stats.json')
|
||||
|
||||
|
||||
def load_categories():
|
||||
"""從 JSON 檔案載入分類列表(檔案不存在或損毀回傳 [])。"""
|
||||
try:
|
||||
with open(CATEGORIES_JSON_PATH, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return []
|
||||
|
||||
|
||||
def save_categories(categories):
|
||||
"""將分類列表儲存到 JSON 檔案。"""
|
||||
with open(CATEGORIES_JSON_PATH, 'w', encoding='utf-8') as f:
|
||||
json.dump(categories, f, ensure_ascii=False, indent=4)
|
||||
|
||||
|
||||
def load_scheduler_stats():
|
||||
"""讀取排程統計資料(檔案不存在或損毀回傳 {})。"""
|
||||
if os.path.exists(SCHEDULER_STATS_PATH):
|
||||
try:
|
||||
with open(SCHEDULER_STATS_PATH, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except (IOError, json.JSONDecodeError):
|
||||
return {}
|
||||
return {}
|
||||
Reference in New Issue
Block a user