v1.0.0: writing style skill template with auto-learning

This commit is contained in:
jzocb
2026-03-24 17:52:47 -04:00
commit d94826e1cd
4 changed files with 1012 additions and 0 deletions

README.md Normal file

@@ -0,0 +1,71 @@
# Writing Style Skill
A reusable writing-style Skill template. **Auto-learning built in**: rules are extracted automatically from your edits, so SKILL.md gets more accurate the more you use it.
Compatible with **Claude Code** + **OpenClaw (ClawHub)**.
## How it works
```
AI drafts with SKILL.md → you edit until satisfied → diff the two versions → extract rules → update SKILL.md → next draft is closer
```
Only two data points are needed: **original** (the AI's first draft) and **final** (your confirmed version). How many rounds of edits happen in between does not matter.
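The diff between those two data points is what the rules are extracted from. A minimal sketch of the idea using Python's stdlib `difflib` (illustrative only; the actual extraction in `improve.py` is done by an LLM):

```python
import difflib

original = "It is worth noting that the model performs well.\nIn summary, try it."
final = "The model performs well.\nTry it."

# Unified diff of the AI draft vs. the human-confirmed version.
# Lines deleted repeatedly across many pairs become candidate "forbidden word" rules.
diff = list(difflib.unified_diff(
    original.splitlines(), final.splitlines(),
    fromfile="original", tofile="final", lineterm=""))

removed = [line[1:] for line in diff if line.startswith("-") and not line.startswith("---")]
print(removed)
```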
## Install
```bash
# Claude Code
git clone https://github.com/jzOcb/writing-style-skill.git
cp -r writing-style-skill ~/.claude/skills/my-writing-style
# OpenClaw / ClawHub
npx clawhub@latest install jz-writing-style-skill
```
## Quick start
1. **Edit SKILL.md**: replace the template's style rules with your own (or leave them blank and let auto-learning fill them in)
2. **Have the AI write content using this skill**
3. **Edit until you are satisfied**
4. Record both versions:
```bash
python3 scripts/observe.py record-original draft.md
# ... you make your edits ...
python3 scripts/observe.py record-final final.md
```
5. Extract rules:
```bash
python3 scripts/improve.py auto --skill .
```
## File layout
```
writing-style-skill/
├── SKILL.md           # your writing style (a template; make it yours)
├── README.md          # this file
└── scripts/
    ├── observe.py     # records original / final (zero dependencies)
    └── improve.py     # extract / apply / rollback (requires an LLM CLI)
```
## How auto-learning works
- `observe.py` records the AI's draft and your final version
- `improve.py` uses an LLM to analyze the diff and extract writing rules
- Rules are graded P0/P1/P2 by confidence; P0 rules are applied automatically
- SKILL.md is backed up before every update, with one-command rollback
## LLM support
`improve.py` auto-detects:
- `claude` (Claude Code): preferred
- `llm` (pip install llm): generic
- `IMPROVE_LLM_CMD` environment variable: custom command
`observe.py` is dependency-free, pure Python.
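Pairing an original with its final relies on a short content hash; `observe.py` computes it as the first 8 hex characters of an MD5 digest of the text:

```python
import hashlib

def compute_hash(content: str) -> str:
    # Same scheme as observe.py: first 8 hex chars of the MD5 of the text
    return hashlib.md5(content.encode()).hexdigest()[:8]

h = compute_hash("draft text")
print(h, len(h))
```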
## License
MIT

SKILL.md Normal file

@@ -0,0 +1,206 @@
---
name: writing-style-skill
version: 1.0.0
description: |
  A reusable writing-style Skill template. Auto-learning built in:
  rules are extracted automatically from your edits, and SKILL.md
  gets more accurate the more you use it. Fork it and make it your own.
dependencies: []
allowed-tools:
- Read
- Write
- Edit
- exec
---
# Writing Style Skill (Template)
**Fork this skill and turn it into your own writing style. Auto-learning is built in: the more you use it, the more accurate it gets.**
---
## 🎯 How to use
1. Fork / clone this skill
2. Replace the style rules below with your own
3. Have the AI write content using this skill
4. Edit until satisfied → the scripts automatically learn what you changed
5. The AI's next draft reads more like you
---
## 【0】Voice Dimensions (quantify your style)
**Define your style on 1-10 scales. The AI understands numbers better than vague requests like "make it sound natural".**
| Dimension | Score | Your notes |
|-----------|-------|------------|
| **formal_casual** | **?/10** | Formal or casual? |
| **technical_accessible** | **?/10** | How technical? |
| **serious_playful** | **?/10** | Serious or playful? |
| **concise_elaborate** | **?/10** | Concise or detailed? |
| **reserved_expressive** | **?/10** | Reserved or direct? |
> 💡 **Not sure what to enter?** Run a few write → edit cycles first; auto-learning will fill these in for you.
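A filled-in row might look like this (the score and notes are hypothetical examples, not recommendations):

```
| **formal_casual** | **7/10** | Mostly casual; technical terms are fine, no stiff openings |
```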
---
## 【1】Role and audience
**Who I am:**
- (your identity, e.g.: indie developer, new AI enthusiast)
**Who the readers are:**
- (your audience, e.g.: technical people interested in AI)
**Relationship with readers:**
- (e.g.: peer-to-peer exchange, not teaching)
---
## 【2】Writing rules
### Basic rules
- (your rules, e.g.: never "dive deep into", keep paragraphs short, use concrete numbers)
-
-
### Forbidden words
- (words AI loves but you don't, e.g.: "it is worth noting that", "in summary", "this article will introduce")
-
-
### Sentence preferences
- (patterns you like, e.g.: conclusion first, no academic lead-ins)
-
-
> 💡 **You don't need to finish this up front.** These rules accumulate automatically from your edits.
> After about 10 write → edit cycles, dozens of precise rules will have grown here.
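After a few cycles, auto-extracted entries here tend to look like this (hypothetical examples, following the priority format the extractor uses):

```
### Forbidden words
- "it is worth noting that" → delete, or state the point directly (P0, seen 4×)
- "in summary" → drop the closing formula (P1, seen 2×)
```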
---
## 【3】Formatting conventions
### Platform adaptations
| Platform | Format requirements |
|----------|---------------------|
| X/Twitter | No markdown rendering; use plain text |
| Xiaohongshu | Lots of emoji, short paragraphs |
| Blog | Standard markdown |
| (your platform) | (your requirements) |
---
## 🔄 Auto-learning (built in)
**This skill learns from your edits automatically. You don't need to write rules by hand.**
### How it works
```
AI drafts using this skill
You edit until satisfied
Scripts diff the two versions → extract what you changed
New rules are written into this SKILL.md automatically
The AI's next draft reads more like you
```
### Only two data points needed
- **original**: the AI's first draft
- **final**: the version you finally confirm
The rounds in between don't matter. Went back and forth 10 times in a Google Doc? Fine; only the first and last versions are compared.
### Agent playbook
**After writing content:**
```bash
python3 scripts/observe.py record-original <file> --account <account> --content-type <type>
```
**After the human confirms the final version:**
```bash
python3 scripts/observe.py record-final <file> --match <hash>
```
**Extract rules (manually, or on a cron schedule):**
```bash
python3 scripts/improve.py auto --skill .
```
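Each paired record lands in the daily JSONL log that the extractor reads. Constructing one the way `observe.py record-final` does (field names match the script; the hash and context values are illustrative):

```python
import json
from datetime import datetime

original = "AI draft ..."
final = "Human-edited ..."

# Shape of a "final" entry as written by observe.py record-final
# (the hash "a1b2c3d4" and the context values are hypothetical)
entry = {
    "timestamp": datetime.now().isoformat(),
    "type": "final",
    "content_hash": "a1b2c3d4",
    "original_content": original,
    "final_content": final,
    "original_char_count": len(original),
    "final_char_count": len(final),
    "context": {"account": "blog", "content_type": "article"},
    "no_change": original.strip() == final.strip(),
}
line = json.dumps(entry, ensure_ascii=False)
print(line)
```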
### Rule grading
| Level | Meaning | Handling |
|-------|---------|----------|
| P0 | High confidence (seen multiple times) | Applied automatically |
| P1 | Medium confidence | Human review |
| P2 | Low confidence (seen once) | Archived for observation |
### Safety
- SKILL.md is backed up automatically before every update
- `improve.py rollback` restores the previous version in one command
- Auto mode applies P0 rules only
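The backup step can be sketched as follows; this mirrors `backup_skill` in `improve.py` (a timestamped copy via `shutil.copy2`), demonstrated here in a throwaway directory:

```python
import shutil
import tempfile
from datetime import datetime
from pathlib import Path

def backup_skill(target: Path, backup_dir: Path) -> Path:
    # Mirrors improve.py: copy SKILL.md to SKILL-<timestamp>.md before any update
    name = f"SKILL-{datetime.now().strftime('%Y%m%d-%H%M%S')}.md"
    backup_path = backup_dir / name
    shutil.copy2(target, backup_path)
    return backup_path

# Demo in a temporary directory
tmp = Path(tempfile.mkdtemp())
skill = tmp / "SKILL.md"
skill.write_text("# my style\n")
b = backup_skill(skill, tmp)
print(b.name)
```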
---
## 📊 CLI reference
### observe.py (zero dependencies, pure Python)
| Command | Purpose |
|---------|---------|
| `record-original <file>` | Record the AI draft |
| `record-final <file> --match <hash>` | Record the final version |
| `pending` | List unpaired drafts |
| `stats` | Statistics |
### improve.py (requires an LLM CLI)
| Command | Purpose |
|---------|---------|
| `extract [--days 7]` | Extract improvement suggestions |
| `auto` | Extract + auto-apply P0 |
| `show` | List proposals |
| `apply <id>` | Apply a proposal |
| `rollback` | Roll back |
Supported LLM CLIs: `claude` (Claude Code) / `llm` (pip install llm) / the `IMPROVE_LLM_CMD` environment variable
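The detection is a try-in-order loop over candidate commands, the same pattern `call_llm` in `improve.py` uses. A self-contained sketch (the `cat` fallback here only stands in for a real LLM CLI):

```python
import subprocess

def first_working_cli(candidates, prompt, timeout=30):
    # Try each CLI in order; return the first non-empty stdout (pattern from improve.py)
    for cmd in candidates:
        try:
            result = subprocess.run(cmd, input=prompt, capture_output=True,
                                    text=True, timeout=timeout)
            if result.returncode == 0 and result.stdout.strip():
                return result.stdout.strip()
        except (FileNotFoundError, subprocess.TimeoutExpired):
            continue
    return None

# A missing command is skipped; "cat" echoes stdin, standing in for an LLM CLI
out = first_working_cli([["no-such-llm-cli"], ["cat"]], "hello")
print(out)
```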
---
## 📂 Data storage
```
~/clawd/memory/                      # OpenClaw
~/.claude/memory/                    # Claude Code
├── skill-runs/<skill-name>/
│   └── YYYY-MM-DD.jsonl             # daily observation log
├── skill-proposals/<skill-name>/
│   └── YYYYMMDD-HHMMSS.md           # improvement proposals
└── skill-backups/<skill-name>/
    └── SKILL-YYYYMMDD-HHMMSS.md     # automatic backups
```
The environment is detected automatically; no manual path configuration is needed.
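Both scripts honor `SKILL_BASE_DIR` first; a sketch of the priority chain they use:

```python
import os
from pathlib import Path

def detect_base() -> Path:
    # Priority matches observe.py / improve.py:
    # SKILL_BASE_DIR env var > ~/clawd > ~/.openclaw > ~/.claude > standalone fallback
    if os.environ.get("SKILL_BASE_DIR"):
        return Path(os.environ["SKILL_BASE_DIR"])
    for c in (Path.home() / "clawd" / "memory",
              Path.home() / ".openclaw" / "memory",
              Path.home() / ".claude" / "memory"):
        if c.parent.exists():
            return c
    return Path.home() / ".self-improving" / "memory"

os.environ["SKILL_BASE_DIR"] = "/tmp/skill-demo"
print(detect_base())
```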
---
## 🚀 What to expect over 30 days
| Timeframe | Expected result |
|-----------|-----------------|
| Week 1 | 3-5 edits accumulated; first batch of rules generated |
| Week 2 | 10+ rules; AI output noticeably closer to your voice |
| Month 1 | 30+ rules; voice dimensions auto-calibrated |
| Ongoing | Rule base grows steadily; new patterns captured automatically |

scripts/improve.py Normal file

@@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""
Self-Improving Skill — Improver
从人类修改中提取规则,更新目标 SKILL.md。
用法:
python3 improve.py extract [--days 7] [--date 2026-03-17]
python3 improve.py auto # 提取 + 自动应用 P0cron 用)
python3 improve.py show # 查看所有提案
python3 improve.py apply <proposal_id> # 应用指定提案
python3 improve.py rollback # 回滚上次应用
环境变量:
SKILL_LOG_DIR — 日志目录
SKILL_TARGET_PATH — 目标 SKILL.md 路径
SKILL_PROPOSAL_DIR — 提案目录
SKILL_BACKUP_DIR — 备份目录
"""
import sys
import json
import os
import argparse
import subprocess
import shutil
from pathlib import Path
from datetime import datetime, timedelta
# Default paths — auto-detect OpenClaw (~/.openclaw) or Claude Code (~/.claude)
def _detect_base():
    """Detect the base directory for data storage."""
    # Priority: env var > ~/clawd > ~/.openclaw > ~/.claude > ~/.self-improving
    if os.environ.get("SKILL_BASE_DIR"):
        return Path(os.environ["SKILL_BASE_DIR"])
    candidates = [
        Path.home() / "clawd" / "memory",
        Path.home() / ".openclaw" / "memory",
        Path.home() / ".claude" / "memory",
    ]
    for c in candidates:
        if c.parent.exists():
            return c
    return Path.home() / ".self-improving" / "memory"

_BASE = _detect_base()
DEFAULT_LOG_DIR = _BASE / "skill-runs" / "default"
DEFAULT_PROPOSAL_DIR = _BASE / "skill-proposals" / "default"
DEFAULT_BACKUP_DIR = _BASE / "skill-backups" / "default"
def get_paths(args=None):
    """Resolve all path configuration."""
    skill_name = "default"
    if args and hasattr(args, 'skill') and args.skill:
        skill_name = Path(args.skill).name
    base = _BASE  # auto-detected base; was hardcoded to ~/clawd/memory
    log_dir = Path(os.environ.get("SKILL_LOG_DIR",
                                  getattr(args, 'log_dir', None) or
                                  str(base / "skill-runs" / skill_name)))
    proposal_dir = Path(os.environ.get("SKILL_PROPOSAL_DIR",
                                       getattr(args, 'proposal_dir', None) or
                                       str(base / "skill-proposals" / skill_name)))
    backup_dir = Path(os.environ.get("SKILL_BACKUP_DIR",
                                     getattr(args, 'backup_dir', None) or
                                     str(base / "skill-backups" / skill_name)))
    # Target SKILL.md
    if os.environ.get("SKILL_TARGET_PATH"):
        target = Path(os.environ["SKILL_TARGET_PATH"])
    elif args and hasattr(args, 'target') and args.target:
        target = Path(args.target)
    elif args and hasattr(args, 'skill') and args.skill:
        target = Path(args.skill) / "SKILL.md"
    else:
        target = None
    for d in (log_dir, proposal_dir, backup_dir):
        d.mkdir(parents=True, exist_ok=True)
    return log_dir, proposal_dir, backup_dir, target
def read_log_entries(log_file):
    if not log_file.exists():
        return []
    entries = []
    with log_file.open("r") as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
    return entries
def collect_edits(log_dir, days=1, date_str=None):
    """Collect final/edited records that contain actual changes."""
    edits = []
    if date_str:
        log_file = log_dir / f"{date_str}.jsonl"
        entries = read_log_entries(log_file)
        edits.extend([e for e in entries
                      if e["type"] in ("final", "edited") and not e.get("no_change")])
    else:
        for i in range(days):
            date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
            log_file = log_dir / f"{date}.jsonl"
            entries = read_log_entries(log_file)
            edits.extend([e for e in entries
                          if e["type"] in ("final", "edited") and not e.get("no_change")])
    return edits
def call_llm(prompt, timeout=180):
    """Call an LLM — auto-detect an available CLI (claude / llm / custom)."""
    # Priority: claude CLI → generic llm CLI → IMPROVE_LLM_CMD fallback
    candidates = [
        ["claude", "--print", "--model", "sonnet"],  # Claude Code
        ["claude", "--print"],                       # Claude Code (default model)
        ["llm", "-m", "claude-sonnet"],              # Simon Willison's llm CLI
        ["llm"],                                     # llm CLI default model
    ]
    for cmd in candidates:
        try:
            result = subprocess.run(cmd, input=prompt, capture_output=True,
                                    text=True, timeout=timeout)
            if result.returncode == 0 and result.stdout.strip():
                return result.stdout.strip()
        except (FileNotFoundError, subprocess.TimeoutExpired):
            continue
    # Fallback: use IMPROVE_LLM_CMD if the environment variable is set
    custom_cmd = os.environ.get("IMPROVE_LLM_CMD")
    if custom_cmd:
        try:
            result = subprocess.run(custom_cmd.split(), input=prompt,
                                    capture_output=True, text=True, timeout=timeout)
            if result.returncode == 0 and result.stdout.strip():
                return result.stdout.strip()
        except (FileNotFoundError, subprocess.TimeoutExpired):
            pass
    print("❌ LLM call failed. Supported options:")
    print("   - install the Claude Code CLI (claude --print)")
    print("   - install the llm CLI (pip install llm)")
    print("   - set the IMPROVE_LLM_CMD environment variable")
    return None
def extract_improvements(args):
    log_dir, proposal_dir, _, target = get_paths(args)
    days = getattr(args, 'days', 1) or 1
    date_str = getattr(args, 'date', None)
    edits = collect_edits(log_dir, days=days, date_str=date_str)
    if not edits:
        print("⚠️ No edit records found")
        return None
    print(f"📊 Found {len(edits)} edits, analyzing...")
    # Read the current SKILL.md
    current_skill = ""
    if target and target.exists():
        current_skill = target.read_text()
    # Build the comparison data
    edit_summaries = []
    for i, edit in enumerate(edits):
        orig = edit.get("original_content", "")[:3000]
        final = edit.get("final_content", edit.get("edited_content", ""))[:3000]
        ctx = edit.get("context", {})
        edit_summaries.append({
            "index": i + 1,
            "account": ctx.get("account", "unknown"),
            "content_type": ctx.get("content_type", "unknown"),
            "original": orig,
            "final": final,
        })
    proposal_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    prompt = f"""You are the improvement assistant for a writing style skill.
Analyze the human's edits to AI-generated articles and extract new rules to add to SKILL.md.
## Current SKILL.md (last 3000 chars, to avoid duplicates)
{current_skill[-3000:]}
## Edit records (original vs final)
{json.dumps(edit_summaries, ensure_ascii=False, indent=2)}
## Requirements
1. Compare original and final; identify systematic changes
2. Only extract changes that show a pattern (at least 2 occurrences, or a single large, unambiguous change)
3. Do not extract rules already present in SKILL.md
4. Every rule must be actionable
## Output format
---
id: {proposal_id}
date: {datetime.now().isoformat()}
source: {len(edits)} edits
status: pending
---
# Improvement Proposal
## Extracted suggestions
### 1. New forbidden words
- **`word`** → replacement: "YYY" | reason: ... | priority: **P0/P1/P2**
### 2. New style rules
- Rule description | reason: ... | priority: **P0/P1/P2**
### 3. Anti-patterns
- Description | reason: ... | priority: **P0/P1/P2**
P0 = high confidence (multiple occurrences), P1 = medium confidence, P2 = low confidence (once only)
"""
    suggestions = call_llm(prompt)
    if not suggestions:
        return None
    proposal_file = proposal_dir / f"{proposal_id}.md"
    proposal_file.write_text(suggestions)
    print(f"✅ Improvement proposal saved: {proposal_file}")
    print(f"\n{suggestions[:2000]}")
    if len(suggestions) > 2000:
        print("\n... (see the file for the full content)")
    return proposal_id
def show_proposals(args):
    _, proposal_dir, _, _ = get_paths(args)
    proposals = list(proposal_dir.glob("*.md"))
    if not proposals:
        print("⚠️ No proposals")
        return
    print(f"\n📋 {len(proposals)} proposal(s):\n")
    for p in sorted(proposals, reverse=True):
        content = p.read_text()
        status = "unknown"
        for line in content.split("\n")[:10]:
            if line.startswith("status:"):
                status = line.split(":", 1)[1].strip()
        # Status icons (originals were lost in extraction; these are plausible stand-ins)
        icon = {"pending": "⏳", "applied": "✅", "rejected": "❌"}.get(
            status.split("(")[0].strip(), "❓")
        print(f"  {icon} {p.stem} — {status}")
def backup_skill(target, backup_dir):
    if not target or not target.exists():
        return None
    name = f"SKILL-{datetime.now().strftime('%Y%m%d-%H%M%S')}.md"
    backup_path = backup_dir / name
    shutil.copy2(target, backup_path)
    print(f"📦 Backup: {backup_path}")
    return backup_path
def apply_proposal(args):
    _, proposal_dir, backup_dir, target = get_paths(args)
    proposal_id = args.proposal_id
    proposal_file = proposal_dir / f"{proposal_id}.md"
    if not proposal_file.exists():
        print(f"❌ Proposal not found: {proposal_id}")
        return
    if not target or not target.exists():
        print("❌ Target SKILL.md not found (specify it with --skill or --target)")
        return
    proposal_content = proposal_file.read_text()
    current_skill = target.read_text()
    auto_mode = getattr(args, 'auto', False)
    filter_level = "P0" if auto_mode else "P0 and P1"
    backup_skill(target, backup_dir)
    prompt = f"""Merge the **{filter_level}** rules from the improvement proposal into SKILL.md.
Rules:
1. New forbidden words → add to the forbidden-words section
2. New style rules → add to the matching section
3. Do not delete existing rules; do not change the file structure
4. Bump the version by +0.1
## Proposal
{proposal_content}
## Current SKILL.md
{current_skill}
Output the complete updated SKILL.md. Do not wrap it in a code block."""
    updated = call_llm(prompt, timeout=300)
    if not updated:
        print("❌ Merge failed")
        return
    target.write_text(updated)
    new_content = proposal_content.replace(
        "status: pending",
        f"status: applied ({datetime.now().strftime('%Y-%m-%d')})")
    proposal_file.write_text(new_content)
    print(f"✅ Applied proposal {proposal_id}")
    print("💡 Roll back with: python3 improve.py rollback")
def auto_improve(args):
    log_dir, _, _, _ = get_paths(args)
    edits = collect_edits(log_dir, days=7)
    if not edits:
        print("⚠️ No edits in the last 7 days, skipping")
        return
    print(f"🤖 Auto mode: {len(edits)} edits")
    args.days = 7
    args.date = None
    proposal_id = extract_improvements(args)
    if not proposal_id:
        return
    _, proposal_dir, _, _ = get_paths(args)
    content = (proposal_dir / f"{proposal_id}.md").read_text()
    if "P0" not in content:
        print("ℹ️ No P0 rules, skipping auto-apply")
        return
    print("\n🔄 Auto-applying P0 rules...")
    apply_args = argparse.Namespace(**vars(args))
    apply_args.proposal_id = proposal_id
    apply_args.auto = True
    apply_proposal(apply_args)
def rollback(args):
    _, _, backup_dir, target = get_paths(args)
    if not target:
        print("❌ No target SKILL.md specified")
        return
    # Match timestamped backups only, so pre-rollback copies are never restored by mistake
    backups = sorted(backup_dir.glob("SKILL-[0-9]*.md"), reverse=True)
    if not backups:
        print("❌ No backups found")
        return
    latest = backups[0]
    # Preserve the current version before overwriting it
    if target.exists():
        emergency = backup_dir / f"SKILL-pre-rollback-{datetime.now().strftime('%Y%m%d-%H%M%S')}.md"
        shutil.copy2(target, emergency)
    shutil.copy2(latest, target)
    print(f"✅ Rolled back to: {latest.name}")
def add_common_args(parser):
    parser.add_argument("--skill", help="target skill directory")
    parser.add_argument("--target", help="path to the target SKILL.md")
    parser.add_argument("--log-dir", help="log directory")
    parser.add_argument("--proposal-dir", help="proposal directory")
    parser.add_argument("--backup-dir", help="backup directory")

def main():
    parser = argparse.ArgumentParser(description="Self-Improving Skill — Improver")
    subparsers = parser.add_subparsers(dest="action")
    p_ext = subparsers.add_parser("extract", help="extract improvement suggestions")
    p_ext.add_argument("--date", help="specific date (YYYY-MM-DD)")
    p_ext.add_argument("--days", type=int, default=1)
    add_common_args(p_ext)
    p_show = subparsers.add_parser("show", help="list proposals")
    add_common_args(p_show)
    p_apply = subparsers.add_parser("apply", help="apply a proposal")
    p_apply.add_argument("proposal_id")
    add_common_args(p_apply)
    p_auto = subparsers.add_parser("auto", help="auto extract + apply P0")
    add_common_args(p_auto)
    p_rb = subparsers.add_parser("rollback", help="roll back the last apply")
    add_common_args(p_rb)
    args = parser.parse_args()
    if not args.action:
        parser.print_help()
        sys.exit(1)
    actions = {
        "extract": extract_improvements,
        "show": show_proposals,
        "apply": apply_proposal,
        "auto": auto_improve,
        "rollback": rollback,
    }
    actions[args.action](args)

if __name__ == "__main__":
    main()

scripts/observe.py Normal file

@@ -0,0 +1,309 @@
#!/usr/bin/env python3
"""
Self-Improving Skill — Observation Layer
通用的 original → final 记录器。适用于任何 "AI 生成 → 人类修改" 的循环。
用法:
python3 observe.py record-original article.md [--text "..."] [--account X] [--content-type article]
python3 observe.py record-final article.md [--match <hash>] [--text "..."]
python3 observe.py pending
python3 observe.py stats
环境变量:
SKILL_LOG_DIR — 日志目录(默认 ~/clawd/memory/skill-runs/default/
"""
import sys
import json
import os
import argparse
from pathlib import Path
from datetime import datetime, timedelta
import hashlib
# Log directory — auto-detect OpenClaw / Claude Code / standalone
def _detect_base():
    if os.environ.get("SKILL_BASE_DIR"):
        return Path(os.environ["SKILL_BASE_DIR"])
    candidates = [
        Path.home() / "clawd" / "memory",
        Path.home() / ".openclaw" / "memory",
        Path.home() / ".claude" / "memory",
    ]
    for c in candidates:
        if c.parent.exists():
            return c
    return Path.home() / ".self-improving" / "memory"

DEFAULT_LOG_DIR = _detect_base() / "skill-runs" / "default"
def get_log_dir(args=None):
    """Resolve the log directory by priority: --log-dir > SKILL_LOG_DIR > --skill > default"""
    if args and hasattr(args, 'log_dir') and args.log_dir:
        d = Path(args.log_dir)
    elif os.environ.get("SKILL_LOG_DIR"):
        d = Path(os.environ["SKILL_LOG_DIR"])
    elif args and hasattr(args, 'skill') and args.skill:
        skill_name = Path(args.skill).name
        d = _detect_base() / "skill-runs" / skill_name  # auto-detected; was hardcoded to ~/clawd/memory
    else:
        d = DEFAULT_LOG_DIR
    d.mkdir(parents=True, exist_ok=True)
    return d
def get_log_file(log_dir, date_str=None):
    if date_str is None:
        date_str = datetime.now().strftime("%Y-%m-%d")
    return log_dir / f"{date_str}.jsonl"

def compute_hash(content):
    return hashlib.md5(content.encode()).hexdigest()[:8]

def get_content(args):
    if hasattr(args, 'text') and args.text:
        return args.text
    if hasattr(args, 'stdin') and args.stdin:
        return sys.stdin.read()
    if hasattr(args, 'file') and args.file:
        p = Path(args.file)
        if not p.exists():
            print(f"❌ File not found: {args.file}")
            sys.exit(1)
        return p.read_text()
    return None
def read_log_entries(log_file):
    if not log_file.exists():
        return []
    entries = []
    with log_file.open("r") as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
    return entries
def find_unmatched(log_dir, days=14):
    """Find records that have an original but no final (also honors legacy edited/no-change types)."""
    all_originals = {}
    all_matched = set()
    for i in range(days):
        date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
        entries = read_log_entries(get_log_file(log_dir, date))
        for e in entries:
            if e["type"] == "original":
                e["_date"] = date
                all_originals[e["content_hash"]] = e
            elif e["type"] in ("final", "edited", "no-change"):
                all_matched.add(e["content_hash"])
    return {h: o for h, o in all_originals.items() if h not in all_matched}
def record_original(args):
    content = get_content(args)
    if not content:
        print("❌ Content required (file path, --text, or --stdin)")
        sys.exit(1)
    log_dir = get_log_dir(args)
    content_hash = compute_hash(content)
    context = {}
    if hasattr(args, 'account') and args.account:
        context["account"] = args.account
    if hasattr(args, 'content_type') and args.content_type:
        context["content_type"] = args.content_type
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "type": "original",
        "content_hash": content_hash,
        "file": str(args.file) if hasattr(args, 'file') and args.file else None,
        "content": content,
        "context": context,
        "char_count": len(content),
    }
    log_file = get_log_file(log_dir)
    with log_file.open("a") as f:
        f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
    print(f"✅ Recorded original: {content_hash}")
    print(f"📝 Characters: {len(content)}")
    print(f"📁 Log: {log_file}")
    return content_hash
def record_final(args):
    content = get_content(args)
    if not content:
        print("❌ Final-version content required")
        sys.exit(1)
    log_dir = get_log_dir(args)
    target_hash = getattr(args, 'match', None)
    if target_hash:
        # A hash was given: search across the last 14 days
        original = None
        for i in range(14):
            date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
            entries = read_log_entries(get_log_file(log_dir, date))
            for e in entries:
                if e["type"] == "original" and e["content_hash"] == target_hash:
                    original = e
                    break
            if original:
                break
        if not original:
            print(f"❌ No original found for hash {target_hash}")
            sys.exit(1)
    else:
        # Auto-match the most recent unpaired original
        unmatched = find_unmatched(log_dir, 14)
        if not unmatched:
            print("❌ No unpaired originals")
            sys.exit(1)
        # Pick by timestamp; dict insertion order walks days newest-first, so [-1] would be the oldest
        target_hash = max(unmatched, key=lambda h: unmatched[h]["timestamp"])
        original = unmatched[target_hash]
    is_same = content.strip() == original["content"].strip()
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "type": "final",
        "content_hash": target_hash,
        "original_content": original["content"],
        "final_content": content,
        "original_char_count": len(original["content"]),
        "final_char_count": len(content),
        "context": original.get("context", {}),
        "no_change": is_same,
    }
    log_file = get_log_file(log_dir)
    with log_file.open("a") as f:
        f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
    if is_same:
        print(f"✅ Recorded final: {target_hash} (no changes — positive signal)")
    else:
        diff_pct = ((len(content) - len(original["content"])) / max(len(original["content"]), 1)) * 100
        print(f"✅ Recorded final: {target_hash}")
        print(f"📝 Original: {len(original['content'])} chars → final: {len(content)} chars ({diff_pct:+.1f}%)")
    print(f"📁 Log: {log_file}")
def show_pending(args):
    log_dir = get_log_dir(args)
    unmatched = find_unmatched(log_dir, 14)
    if not unmatched:
        print("✅ Everything is paired")
        return
    print(f"\n{len(unmatched)} unpaired original(s):\n")
    for h, entry in unmatched.items():
        preview = entry["content"][:80].replace("\n", " ")
        ctx = entry.get("context", {})
        account = ctx.get("account", "?")
        date = entry.get("_date", "?")
        print(f"  📄 {h} | {date} | {account}")
        print(f"     {preview}...")
        print()
def show_stats(args):
    log_dir = get_log_dir(args)
    all_files = sorted(log_dir.glob("*.jsonl"))
    total_originals = 0
    total_finals = 0
    total_no_change = 0
    total_changed = 0
    for log_file in all_files:
        entries = read_log_entries(log_file)
        total_originals += sum(1 for e in entries if e["type"] == "original")
        finals = [e for e in entries if e["type"] in ("final", "edited")]
        total_finals += len(finals)
        total_no_change += sum(1 for e in finals if e.get("no_change"))
        total_changed += sum(1 for e in finals if not e.get("no_change"))
    pending = find_unmatched(log_dir, 14)
    print("\n📊 Self-Improving Skill observation stats")
    print(f"{'='*40}")
    print(f"📁 Days logged: {len(all_files)}")
    print(f"📝 Originals: {total_originals}")
    print(f"✅ Paired: {total_finals}")
    print(f"   ├ with changes: {total_changed}")
    print(f"   └ no changes: {total_no_change}")
    print(f"⏳ Unpaired: {len(pending)}")
    if total_finals > 0:
        print(f"📈 Edit rate: {total_changed}/{total_finals} = {total_changed/total_finals*100:.0f}%")
def add_common_args(parser):
    """Add shared arguments."""
    parser.add_argument("--skill", help="target skill directory name (used to locate the log directory)")
    parser.add_argument("--log-dir", help="custom log directory")

def main():
    parser = argparse.ArgumentParser(description="Self-Improving Skill — Observation Layer")
    subparsers = parser.add_subparsers(dest="action")
    # record-original
    p_orig = subparsers.add_parser("record-original", help="record the AI draft")
    p_orig.add_argument("file", nargs="?", help="file path")
    p_orig.add_argument("--text", help="pass text directly")
    p_orig.add_argument("--stdin", action="store_true", help="read from stdin")
    p_orig.add_argument("--account", help="publishing account")
    p_orig.add_argument("--content-type", help="content type")
    add_common_args(p_orig)
    # record-final
    p_final = subparsers.add_parser("record-final", help="record the final version")
    p_final.add_argument("file", nargs="?", help="file path")
    p_final.add_argument("--text", help="pass text directly")
    p_final.add_argument("--stdin", action="store_true", help="read from stdin")
    p_final.add_argument("--match", help="hash of the original to pair with")
    add_common_args(p_final)
    # pending
    p_pending = subparsers.add_parser("pending", help="list unpaired originals")
    add_common_args(p_pending)
    # stats
    p_stats = subparsers.add_parser("stats", help="overall statistics")
    add_common_args(p_stats)
    args = parser.parse_args()
    if not args.action:
        parser.print_help()
        sys.exit(1)
    actions = {
        "record-original": record_original,
        "record-final": record_final,
        "pending": show_pending,
        "stats": show_stats,
    }
    actions[args.action](args)

if __name__ == "__main__":
    main()