eddie-soehnel-portable-iden.../scripts/insights-hub-posts-last-6-months.py
2026-06-16 13:20:04 -06:00

309 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""
Publish Insights Hub records from the Personal Organization JSON database.
What it does:
1. Scans source JSON records.
2. Selects records where HubTags contains "External Platform Posts".
3. Copies matching JSON files into the Insights Hub hrecords folder.
4. Copies referenced File/Image assets into the Insights Hub files folder.
5. Builds a Markdown digest for records dated within the last 6 months,
sorted most recent first.
Run manually weekly:
python insights-hub-posts-last-6-months.py
"""
from __future__ import annotations
import json
import shutil
from datetime import datetime, date
from pathlib import Path
from typing import Any, Iterable
# -----------------------------
# CONFIG
# -----------------------------
# Input: folder holding the hrecord JSON files that get scanned.
SOURCE_HRECORDS_DIR: Path = Path(
    r"C:\Users\edsoe\My Drive\Personal Organization\JSON_Database\hrecords"
)

# Input: folder holding the File/Image assets that records refer to.
SOURCE_FILES_DIR: Path = Path(
    r"C:\Users\edsoe\My Drive\Personal Organization\JSON_Database\files"
)

# Output: root of the published Insights Hub data.
TARGET_BASE_DIR: Path = Path(
    r"C:\projects\ES\eddie-soehnel-portable-identity-document-OPEN\data\insights-hub"
)

# Output sub-folders, both directly under TARGET_BASE_DIR.
TARGET_HRECORDS_DIR: Path = TARGET_BASE_DIR / "hrecords"
TARGET_FILES_DIR: Path = TARGET_BASE_DIR / "files"

# Published Markdown digest; edit the filename here to publish under another name.
OUTPUT_MD_FILE: Path = TARGET_BASE_DIR / "insights-hub-posts-last-6-months.md"

# A record is published only when its HubTags include this exact tag.
REQUIRED_TAG: str = "External Platform Posts"

# "Last 6 months" is approximated as 183 days so that no extra package is
# needed for calendar-month arithmetic.
LAST_N_DAYS: int = 183
# -----------------------------
# HELPERS
# -----------------------------
def parse_record_date(value: Any) -> date | None:
"""
Parse common date formats found in hrecord JSON files.
Example attached file uses MM/DD/YYYY, e.g. 06/12/2026.
"""
if not value:
return None
text = str(value).strip()
formats = [
"%m/%d/%Y",
"%Y-%m-%d",
"%Y/%m/%d",
"%m-%d-%Y",
"%B %d, %Y",
"%b %d, %Y",
]
for fmt in formats:
try:
return datetime.strptime(text, fmt).date()
except ValueError:
pass
return None
def normalize_to_list(value: Any) -> list[str]:
    """
    Coerce a string, list, or empty value into a clean list of strings.

    A list is processed item by item; any other non-None value is treated as
    a single item. Each item is converted with ``str()`` and stripped, and
    items that end up empty are dropped. ``None`` items inside a list are
    skipped, so they never appear as the literal text "None".

    Args:
        value: None, a string, a list, or any other scalar.

    Returns:
        The stripped, non-empty string items in their original order.
    """
    if value is None:
        return []

    items = value if isinstance(value, list) else [value]
    cleaned: list[str] = []
    for item in items:
        if item is None:
            continue
        text = str(item).strip()
        if text:
            cleaned.append(text)
    return cleaned
def has_required_tag(record: dict[str, Any]) -> bool:
    """
    Report whether the record's "HubTags" includes REQUIRED_TAG.

    The tags are normalized with normalize_to_list and compared by exact
    string equality.
    """
    hub_tags = normalize_to_list(record.get("HubTags"))
    return any(tag == REQUIRED_TAG for tag in hub_tags)
def safe_asset_names(record: dict[str, Any]) -> list[str]:
    """
    Collect the asset references listed under "File" and "Image".

    Each key may hold a single name or a list of names, for example:
    - "File": "filename.pdf"
    - "Image": ["one.jpg", "two.jpg"]

    Entries starting with "http://" or "https://" are left out. The result
    keeps first-seen order ("File" entries before "Image" entries) and
    contains each name only once.
    """
    # Dict keys keep insertion order, giving an ordered de-duplication.
    ordered: dict[str, None] = {}
    for key in ("File", "Image"):
        for item in normalize_to_list(record.get(key)):
            if not item.startswith(("http://", "https://")):
                ordered.setdefault(item, None)
    return list(ordered)
def copy_file_if_exists(source: Path, target: Path) -> bool:
    """
    Copy ``source`` to ``target`` when ``source`` is an existing regular file.

    The target's parent folders are created only when a copy actually
    happens. A missing source, or a source that is a directory, is reported
    as "not copied" instead of raising from ``shutil.copy2``.

    Args:
        source: File to copy.
        target: Destination file path.

    Returns:
        True if the file was copied, False otherwise.
    """
    # is_file() rather than exists(): copy2 cannot copy a directory.
    if not source.is_file():
        return False
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, target)
    return True
def load_json_file(path: Path) -> dict[str, Any] | None:
try:
with path.open("r", encoding="utf-8-sig") as f:
data = json.load(f)
if isinstance(data, dict):
return data
print(f"SKIP non-object JSON: {path}")
return None
except Exception as exc:
print(f"SKIP unreadable JSON: {path} | {exc}")
return None
def markdown_escape(text: Any) -> str:
    """
    Turn a value into trimmed text for the digest.

    Deliberately minimal: no Markdown characters are escaped, so links and
    ordinary Markdown inside summaries keep working. None becomes "".
    """
    return "" if text is None else str(text).strip()
def build_image_markdown(record: dict[str, Any]) -> str:
    """
    Render the record's "Image" entries as Markdown image tags.

    Entries starting with "http://" or "https://" are used as-is. Any other
    entry is linked as ``files/<name>``, where ``<name>`` is the final path
    component, percent-encoded so that spaces or parentheses in a filename
    do not break the Markdown link destination.

    The alt text is the record's "Title" when it is non-empty, otherwise the
    image name without its extension.

    Returns:
        One image tag per entry, separated by blank lines; "" when the
        record lists no images.
    """
    # Imported locally so this function does not rely on a file-level import.
    from urllib.parse import quote

    title_alt = markdown_escape(record.get("Title"))
    lines = []
    for image_name in normalize_to_list(record.get("Image")):
        if image_name.startswith(("http://", "https://")):
            image_src = image_name
        else:
            # The Markdown file lives in TARGET_BASE_DIR, and copied images live
            # in TARGET_BASE_DIR/files, so this relative path should render correctly.
            image_src = f"files/{quote(Path(image_name).name)}"
        alt = title_alt or Path(image_name).stem
        lines.append(f"![{alt}]({image_src})")
    return "\n\n".join(lines)
def iter_json_files(folder: Path) -> Iterable[Path]:
    """Yield every path directly inside ``folder`` that matches ``*.json``."""
    for json_path in folder.glob("*.json"):
        yield json_path
# -----------------------------
# MAIN
# -----------------------------
def main() -> None:
    """
    Publish tagged records and rebuild the Markdown digest.

    Steps:
    1. Create the target folders, then check the source folders.
    2. For every source JSON record carrying REQUIRED_TAG, copy the JSON file
       and the assets it references into the target folders.
    3. Write a digest of the matched records dated within LAST_N_DAYS of
       today, most recent first, to OUTPUT_MD_FILE.
    4. Print a summary of what was matched and copied.

    Raises:
        FileNotFoundError: if SOURCE_HRECORDS_DIR does not exist.
    """
    for target_dir in (TARGET_HRECORDS_DIR, TARGET_FILES_DIR):
        target_dir.mkdir(parents=True, exist_ok=True)

    if not SOURCE_HRECORDS_DIR.exists():
        raise FileNotFoundError(f"Source hrecords folder not found: {SOURCE_HRECORDS_DIR}")
    if not SOURCE_FILES_DIR.exists():
        # Not fatal: the records themselves can still be published.
        print(f"WARNING: Source files folder not found: {SOURCE_FILES_DIR}")
        print("JSON files will still be copied, but image/file assets cannot be copied.")

    today = date.today()
    oldest_ordinal = today.toordinal() - LAST_N_DAYS

    matched_records: list[dict[str, Any]] = []
    json_copied = 0
    assets_copied = 0
    assets_missing = 0

    for json_path in iter_json_files(SOURCE_HRECORDS_DIR):
        record = load_json_file(json_path)
        if not record or not has_required_tag(record):
            continue

        # Bookkeeping keys used later when building the digest.
        parsed = parse_record_date(record.get("Date"))
        record["_source_json_path"] = str(json_path)
        record["_json_filename"] = json_path.name
        record["_parsed_date"] = parsed
        matched_records.append(record)

        # Publish the record itself.
        if copy_file_if_exists(json_path, TARGET_HRECORDS_DIR / json_path.name):
            json_copied += 1

        # Publish the files/images it refers to; the target folder is flat.
        for asset_name in safe_asset_names(record):
            source_asset = SOURCE_FILES_DIR / asset_name
            target_asset = TARGET_FILES_DIR / Path(asset_name).name
            if copy_file_if_exists(source_asset, target_asset):
                assets_copied += 1
                continue
            assets_missing += 1
            print(f"MISSING asset referenced by {json_path.name}: {source_asset}")

    def newest_first_key(rec: dict[str, Any]) -> tuple:
        return (rec.get("_parsed_date") or date.min, str(rec.get("HubID", "")))

    # Digest holds only dated records inside the window, most recent first.
    recent_records = sorted(
        (
            rec
            for rec in matched_records
            if rec.get("_parsed_date") is not None
            and rec["_parsed_date"].toordinal() >= oldest_ordinal
        ),
        key=newest_first_key,
        reverse=True,
    )

    md_lines = [
        "# Insights Hub Posts - Last 6 Months",
        "",
        f"Generated: {today.isoformat()}",
        "",
        f"Source tag: `{REQUIRED_TAG}`",
        "",
        "---",
        "",
    ]
    for record in recent_records:
        title = markdown_escape(record.get("Title")) or "Untitled"
        summary = markdown_escape(record.get("Summary"))
        record_date = record.get("_parsed_date")
        if record_date:
            date_text = record_date.isoformat()
        else:
            date_text = markdown_escape(record.get("Date"))

        md_lines += [f"## {title}", ""]
        if date_text:
            md_lines += [f"**Date:** {date_text}", ""]
        if summary:
            md_lines += [summary, ""]
        image_md = build_image_markdown(record)
        if image_md:
            md_lines += [image_md, ""]
        md_lines += ["---", ""]

    OUTPUT_MD_FILE.parent.mkdir(parents=True, exist_ok=True)
    OUTPUT_MD_FILE.write_text("\n".join(md_lines), encoding="utf-8")

    print("DONE")
    print(f"Matched records: {len(matched_records)}")
    print(f"Records in 6-month Markdown: {len(recent_records)}")
    print(f"Copied JSON files: {json_copied}")
    print(f"Copied assets: {assets_copied}")
    print(f"Missing assets: {assets_missing}")
    print(f"Markdown output: {OUTPUT_MD_FILE}")


if __name__ == "__main__":
    main()