ai-robot-core/tmp_kb_transform.py


"""Atomize the course knowledge-base exports into single-category KB files."""
from pathlib import Path
import re
import csv

# Input directory of raw "知识库_课程知识库" .txt exports and output directory
# for the atomized, single-category knowledge files.
SRC = Path(r"D:/wxChatData/xwechat_files/wxid_j9wciaq7pbxo22_667d/msg/file/2026-03/知识库_课程知识库")
OUT_ROOT = Path(r"Q:/agentProject/ai-robot-core/docs/kb/result")
KB_DIR = OUT_ROOT / "课程知识库_原子化_单类目"
KB_DIR.mkdir(parents=True, exist_ok=True)

# Default info type per course-plan keyword parsed from the file name.
INFO_TYPE_MAP = {
    "课表": "schedule",
    "课程收获": "benefit",
    "主讲": "feature",
    "赠礼": "feature",
    "课程": "overview",
    "知识点": "objective",
    "案例": "feature",
    "方法": "objective",
    "海报": "overview",
}

# Chinese display label per info type, shared by the atomic lines and the
# per-file self-check text (hoisted here to avoid duplicating the mapping).
TYPE_LABELS = {
    "schedule": "课表",
    "objective": "课程目标",
    "benefit": "课程收获",
    "feature": "课程特色",
    "overview": "课程概述",
}

# Any segment containing one of these platform/marketing phrases is dropped.
DROP_KEYWORDS = [
    "高途是一个在线教育平台",
    "我的动态功能",
    "我的订单功能",
    "学币商城",
    "购物车功能",
    "意见反馈功能",
    "中奖记录",
    "帮助中心",
    "社区公约",
    "专题中心",
    "赚现金",
    "周周分享",
    "得钻石",
    "100+高校vlog",
    "邀请有礼",
    "推荐有礼",
    "课程评价功能",
    "我的预约功能",
    "我的关注功能",
    "平台内",
]

# Fully generic sentences matching these anchored patterns are discarded.
LOW_VALUE_PATTERNS = [
    r"^以下是.+详细安排$",
    r"^课程安排包含.+$",
    r"^课程状态显示为已过期$",
    r"^用户可以查看全部课程列表$",
    r"^平台内.+板块按科目分类展示课程$",
    r"^该训练营课程共包含\d+节内容.*$",
    r"^课程针对中考必考的最复杂最值问题$",
    r"^课程从生活出发$",
]

# Subject-only stubs that carry no information on their own.
GENERIC_SUBJECTS = {"课程", "该课程", "该语文课", "该英语课", "该数学课", "该物理课", "该化学课"}

def parse_file_meta(name: str):
    # File names follow "年级_科目_课程计划.txt"; anything else maps to 通用.
    m = re.match(r"([^_]+)_([^_]+)_([^\.]+)\.txt", name)
    if not m:
        return "通用", "通用", "通用"
    return m.group(1), m.group(2), m.group(3)
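
# Illustrative behavior (hypothetical file names, not taken from the dataset):
#   parse_file_meta("初三_数学_课表.txt") -> ("初三", "数学", "课表")
#   parse_file_meta("notes.txt")          -> ("通用", "通用", "通用")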

def infer_info_type(text: str, default_type: str) -> str:
    if re.search(r"Day\d+|周[一二三四五六日天]|\d{1,2}:\d{2}-\d{1,2}:\d{2}|开课", text):
        return "schedule"
    if any(k in text for k in ["目标", "旨在", "掌握", "培养", "帮助学生", "学会"]):
        return "objective"
    if any(k in text for k in ["收获", "提分", "提高", "打下基础", "建立"]):
        return "benefit"
    if any(k in text for k in ["老师", "主讲", "教龄", "学位", "博士", "硕士", "冠军", "称号"]):
        return "feature"
    if any(k in text for k in ["课程名称", "训练营", "课程计划", "教育项目", "课程是", "计划是"]):
        return "overview"
    return default_type
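
# Rule order matters: schedule cues are checked before objective, benefit,
# feature, and overview cues. Hypothetical inputs for illustration:
#   infer_info_type("Day1 19:00-20:00 文言文精讲", "overview") -> "schedule"
#   infer_info_type("王老师主讲,教龄十年", "overview") -> "feature"
#   infer_info_type("其他描述文字", "benefit") -> "benefit"  (default fallback)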

def clean_text(t: str) -> str:
    # Strip a leading 【...】 tag, then normalize pronoun-style references.
    # (The original had a redundant no-op strip("") after strip(), dropped here.)
    t = re.sub(r"^【[^】]+】", "", t).strip()
    t = t.replace("该课程", "课程").replace("本课程", "课程")
    t = t.replace("该项目", "飞跃领航计划").replace("该计划", "飞跃领航计划")
    return t
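
# Hypothetical example: a leading 【...】 tag is stripped and pronoun-style
# references are normalized:
#   clean_text("【海报】本课程帮助学生掌握勾股定理") -> "课程帮助学生掌握勾股定理"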

def split_text(t: str):
    # Split long mixed sentences at transition tokens; keep each tail segment
    # self-describing by prefixing it with "课程目标" when needed.
    segs = [t]
    for token in [",旨在", ",目标是", ",帮助", ",通过"]:
        tmp = []
        for s in segs:
            if token in s and len(s) > 24:
                a, b = s.split(token, 1)
                tmp.append(a.strip(",。 "))
                b = b.strip(",。 ")
                if b:
                    if not b.startswith("课程"):
                        b = "课程目标" + b
                    tmp.append(b)
            else:
                tmp.append(s)
        segs = tmp
    return [x.strip(",。 ") for x in segs if len(x.strip(",。 ")) >= 6]
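
# Hypothetical example: a long mixed sentence is split at ",旨在" and the
# second half is prefixed so it stays self-describing:
#   split_text("课程系统讲解文言文阅读技巧,旨在帮助学生掌握答题方法")
#   -> ["课程系统讲解文言文阅读技巧", "课程目标帮助学生掌握答题方法"]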

def is_low_value(seg: str) -> bool:
    if any(re.match(pat, seg) for pat in LOW_VALUE_PATTERNS):
        return True
    # Drop short sentences that lack key entities and carry only generic info.
    has_time = bool(re.search(r"Day\d+|周[一二三四五六日天]|\d{1,2}:\d{2}-\d{1,2}:\d{2}", seg))
    has_teacher = "老师" in seg or "主讲" in seg
    has_specific_course = any(k in seg for k in ["课程名称", "课节名称", "主题是", "科目为", "中考", "真题", "文言文", "勾股定理", "酸碱", "凸透镜"])
    if not (has_time or has_teacher or has_specific_course):
        if len(seg) <= 18:
            return True
    if seg in GENERIC_SUBJECTS:
        return True
    # OCR noise or structural sentence fragments.
    if any(k in seg for k in ["课程目标动态图形展示", "课程目标实验让学生", "课程目标文本与真题解析"]):
        return True
    return False
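
# Hypothetical examples:
#   is_low_value("课程从生活出发")               -> True   (matches a LOW_VALUE pattern)
#   is_low_value("Day1 19:00-20:00 文言文精讲")  -> False  (carries a concrete time slot)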

def resolve_course_reference(seg: str, last_course_name: str, last_subject: str) -> str:
    explicit_name = last_course_name
    if not explicit_name and last_subject:
        explicit_name = last_subject
    if not explicit_name:
        return seg
    # Make pronoun-style course references explicit.
    seg = re.sub(r"^该语法课程", f"{explicit_name}课程", seg)
    seg = re.sub(r"^该英语课", f"{explicit_name}课程", seg)
    seg = re.sub(r"^该语文课", f"{explicit_name}课程", seg)
    seg = re.sub(r"^该数学课", f"{explicit_name}课程", seg)
    seg = re.sub(r"^该物理课", f"{explicit_name}课程", seg)
    seg = re.sub(r"^该化学课", f"{explicit_name}课程", seg)
    seg = re.sub(r"^该课程", f"{explicit_name}课程", seg)
    # Qualify a bare leading "课程" when context provides an explicit name.
    seg = re.sub(r"^课程(旨在|目标|强调|还|将|内容|从|通过|运用|帮助|解决|涵盖)", f"{explicit_name}课程\\1", seg)
    seg = re.sub(r"^课程使学生", f"{explicit_name}课程使学生", seg)
    seg = re.sub(r"^课程学习", f"{explicit_name}课程学习", seg)
    seg = re.sub(r"^课程时间表", f"{explicit_name}课程时间表", seg)
    seg = re.sub(r"^课程安排", f"{explicit_name}课程安排", seg)
    return seg
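
# Hypothetical example: with an explicit course name in context, a bare
# leading "课程" gets qualified:
#   resolve_course_reference("课程旨在夯实基础", "勾股定理专题", "数学")
#   -> "勾股定理专题课程旨在夯实基础"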

def process_one(fp: Path):
    grade, subject, course_plan = parse_file_meta(fp.name)
    default_type = INFO_TYPE_MAP.get(course_plan, "overview")
    raw_lines = fp.read_text(encoding="utf-8", errors="ignore").splitlines()
    part_a = []
    rows = []
    ambiguities = []
    last_course_name = ""
    last_subject = ""
    for line in raw_lines:
        line = line.strip()
        if not line:
            continue
        text = clean_text(line)
        if len(text) < 6:
            continue
        if any(k in text for k in DROP_KEYWORDS):
            continue
        for seg in split_text(text):
            # Update the course-name context from explicit mentions.
            m_name1 = re.search(r"课节名称为[“\"]([^”\"]+)[”\"]", seg)
            m_name2 = re.search(r"课程名称是[“\"]([^”\"]+)[”\"]", seg)
            m_name3 = re.search(r"主题是[“\"]([^”\"]+)[”\"]", seg)
            if m_name1:
                last_course_name = m_name1.group(1).strip()
            elif m_name2:
                last_course_name = m_name2.group(1).strip()
            elif m_name3:
                last_course_name = m_name3.group(1).strip()
            # Also accept the "课程的名称为【xx】" pattern.
            m_name4 = re.search(r"课程的名称为[【\"]([^】\"]+)[】\"]", seg)
            if m_name4:
                last_course_name = m_name4.group(1).strip()
            # Extract the subject context.
            m_subject = re.search(r"科目为([^,。]+)", seg)
            if m_subject:
                last_subject = m_subject.group(1).strip()
            seg = resolve_course_reference(seg, last_course_name, last_subject)
            # A sentence that still starts with "课程" and has no explicit
            # course name in context is treated as low value.
            if seg.startswith("课程") and not last_course_name:
                continue
            if any(k in seg for k in DROP_KEYWORDS):
                continue
            if is_low_value(seg):
                continue
            info_type = infer_info_type(seg, default_type)
            type_label = TYPE_LABELS[info_type]
            a_line = f"{grade}-课程咨询-{type_label}{seg}"
            part_a.append(a_line)
            day_index = ""
            m_day = re.search(r"(Day\d+)", seg)
            if m_day:
                day_index = m_day.group(1)
            time_range = ""
            m_time = re.search(r"(\d{1,2}:\d{2}-\d{1,2}:\d{2})", seg)
            if m_time:
                time_range = m_time.group(1)
            teacher = ""
            m_teacher = re.search(r"([\u4e00-\u9fa5]{2,4})老师", seg)
            if m_teacher:
                teacher = m_teacher.group(1)
            rows.append([
                a_line,
                grade,
                subject if subject else "通用",
                "课程咨询",
                course_plan,
                day_index,
                time_range,
                teacher,
                info_type,
            ])
            # Flag known ambiguous course names for manual review.
            if any(x in seg for x in ["思想道德(数)", "一文会云题", "1+1方法", "高考阅读提分"]):
                ambiguities.append(seg)
    # Deduplicate while preserving order.
    uniq = []
    seen = set()
    for x in part_a:
        if x not in seen:
            uniq.append(x)
            seen.add(x)
    part_a = uniq
    uniq_rows = []
    seen2 = set()
    for r in rows:
        if r[0] in seen2:
            continue
        seen2.add(r[0])
        uniq_rows.append(r)
    rows = uniq_rows
    # Group rows by info type (column 8) so each output file holds exactly
    # one category; labels come from the module-level TYPE_LABELS map.
    grouped_rows: dict[str, list[list[str]]] = {
        "schedule": [],
        "objective": [],
        "benefit": [],
        "feature": [],
        "overview": [],
    }
    for r in rows:
        grouped_rows[r[8]].append(r)
    outputs = []
    for info_type, g_rows in grouped_rows.items():
        if not g_rows:
            continue
        part_a_group = [r[0] for r in g_rows]
        type_cn = TYPE_LABELS[info_type]
        out_file = KB_DIR / f"{fp.stem}_{info_type}_原子化.md"
        lines = []
        lines.append("Part A原子知识行用于知识库正文")
        lines.append("")
        lines.extend(part_a_group)
        lines.append("")
        lines.append("Part B文件级元数据建议用于按文件打标")
        lines.append("")
        lines.append(f"- grade: {grade}")
        lines.append(f"- subject: {subject if subject else '通用'}")
        lines.append("- kb_scene: 课程咨询")
        lines.append(f"- course_plan: {course_plan}")
        lines.append(f"- info_type: {info_type}")
        lines.append("")
        lines.append("自检结果")
        lines.append(f"- 总行数:{len(part_a_group)}")
        lines.append(f"- 可打标行数:{len(part_a_group)}")
        lines.append("- 拆分前后知识点完整性说明:已拆分混合句并保留时间、老师、课程名、目标等关键实体。")
        lines.append(f"- 文档类目一致性:本文件仅包含 info_type={info_type}({type_cn})。")
        if ambiguities:
            # "、" is assumed as the list separator here; the original joined
            # on the empty string, which would run the items together.
            lines.append("- 发现的歧义项:" + "、".join(ambiguities) + "(需人工确认)")
        else:
            lines.append("- 发现的歧义项:无明显歧义项。")
        out_file.write_text("\n".join(lines), encoding="utf-8")
        outputs.append((info_type, len(part_a_group), str(out_file)))
    total_lines = sum(x[1] for x in outputs)
    return fp.name, total_lines, outputs
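
# process_one() returns (source_file_name, total_atomic_lines, outputs), where
# each outputs entry is (info_type, atomic_line_count, output_path); main()
# flattens these into rows of the CSV index.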

def main():
    # metadata_config.json sits alongside the sources; the name check is a
    # defensive guard on top of the *.txt glob.
    files = sorted(p for p in SRC.glob("*.txt") if p.name != "metadata_config.json")
    summary = []
    for fp in files:
        source_file, total_lines, outputs = process_one(fp)
        for info_type, atomic_lines, output_file in outputs:
            summary.append([source_file, info_type, atomic_lines, output_file])
    # Write the index with a BOM (utf-8-sig) so it opens cleanly in Excel.
    index_file = KB_DIR / "_处理结果索引.csv"
    with index_file.open("w", encoding="utf-8-sig", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["source_file", "info_type", "atomic_lines", "output_file"])
        writer.writerows(summary)
    print(f"done: {len(files)} source files, {len(summary)} output files")
    print(index_file)


if __name__ == "__main__":
    main()