Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 199 additions & 23 deletions scripts/audit.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
#!/usr/bin/env python3
"""Security audit for skills in the Skill Evolution registry.
"""Security + stability audit for skills.

Scans all skills (or specific ones) against security rules.
Skills that pass get marked with audited_at timestamp.
Skills that fail get audited_at cleared.

Designed to run periodically (e.g. via cron or scheduler).
Requires SUPABASE_SERVICE_KEY (admin-only operation).
Modes:
1) Security audit (default): scan registry skills via Supabase.
2) Stability audit (--stability-audit): scan local git history for skills/* A->D churn.
"""

import argparse
import json
import re
import subprocess
import sys
import time
import urllib.parse
from pathlib import Path

Expand All @@ -22,10 +21,22 @@


def parse_args():
p = argparse.ArgumentParser(description="Audit skills for security issues")
p = argparse.ArgumentParser(description="Audit skills for security/stability issues")
p.add_argument("--name", default=None, help="Audit a specific skill by name (default: all)")
p.add_argument("--dry-run", action="store_true", help="Show results without updating database")
p.add_argument("--verbose", "-v", action="store_true", help="Show detailed findings per skill")

# Stability audit mode
p.add_argument("--stability-audit", action="store_true", help="Run git-based stability audit for skills/*")
p.add_argument("--repo", default=str(Path(__file__).resolve().parents[2]), help="Repo root for git scan")
p.add_argument("--days", type=int, default=7, help="Lookback window in days for stability audit")
p.add_argument("--top", type=int, default=1, help="Top N churn candidates to output")
p.add_argument("--report-file", default=None, help="Optional path to write markdown audit section")
p.add_argument(
"--include-deleted",
action="store_true",
help="Include skills that are no longer present under skills/ (default: active skills only)",
)
return p.parse_args()


Expand Down Expand Up @@ -109,7 +120,7 @@ def audit_skill(skill):
matches = re.findall(pattern, content, re.IGNORECASE)
for m in matches:
# Skip false positives
if any(fp in m for fp in ["SUPABASE", "TASKPOOL", "$HOME", "${HOME}", "os.environ", "os.getenv"]):
if any(fp in m for fp in ["SUPABASE", "TASKPOOL", "${HOME}", "os.environ", "os.getenv"]):
continue
findings.append(f"FAIL [{rel_path}]: {desc} — {m[:50]}...")

Expand Down Expand Up @@ -141,9 +152,149 @@ def audit_skill(skill):
return not has_fail, findings


def main():
args = parse_args()
def _run_git(repo, args):
cmd = ["git", "-C", repo, *args]
return subprocess.check_output(cmd, text=True, stderr=subprocess.DEVNULL)


def _list_active_skills(repo):
skills_dir = Path(repo) / "skills"
if not skills_dir.exists():
return set()
return {
p.name
for p in skills_dir.iterdir()
if p.is_dir() and not p.name.startswith(".")
}


def run_stability_audit(repo, days, top, report_file=None, include_deleted=False):
"""Scan git history and find skills with A->D churn within lookback window."""
try:
output = _run_git(
repo,
[
"log",
f"--since={days} days ago",
"--name-status",
"--pretty=format:__COMMIT__%H|%ct|%s",
"--",
"skills/",
],
)
except Exception as e:
raise RuntimeError(f"ERROR: git log scan failed: {e}")

skills = {}
current = None

for raw in output.splitlines():
line = raw.strip()
if not line:
continue
if line.startswith("__COMMIT__"):
meta = line.replace("__COMMIT__", "", 1)
parts = meta.split("|", 2)
if len(parts) < 3:
current = None
continue
current = {"hash": parts[0], "ts": int(parts[1]), "subject": parts[2]}
continue

if current is None:
continue

fields = line.split("\t")
if len(fields) < 2:
continue
status = fields[0][0] # A/M/D/R...
path = fields[-1]

if not path.startswith("skills/"):
continue

parts = path.split("/")
if len(parts) < 2:
continue
skill = parts[1]

rec = skills.setdefault(
skill,
{
"skill": skill,
"adds": 0,
"deletes": 0,
"last_add": None,
"last_delete": None,
"latest_ts": 0,
},
)

if status == "A":
rec["adds"] += 1
if rec["last_add"] is None or current["ts"] > rec["last_add"]["ts"]:
rec["last_add"] = current
elif status == "D":
rec["deletes"] += 1
if rec["last_delete"] is None or current["ts"] > rec["last_delete"]["ts"]:
rec["last_delete"] = current

if current["ts"] > rec["latest_ts"]:
rec["latest_ts"] = current["ts"]

churn_candidates = [v for v in skills.values() if v["adds"] > 0 and v["deletes"] > 0]

active_skills = _list_active_skills(repo)
if include_deleted:
candidates = churn_candidates
else:
candidates = [c for c in churn_candidates if c["skill"] in active_skills]

candidates.sort(key=lambda x: (x["adds"] + x["deletes"], x["latest_ts"]), reverse=True)
top_candidates = candidates[: max(1, top)]

result = {
"status": "ok",
"mode": "stability",
"lookback_days": days,
"generated_at": int(time.time()),
"skills_scanned": len(skills),
"active_skills": len(active_skills),
"churn_candidates_total": len(churn_candidates),
"churn_candidates": len(candidates),
"filtered_out_removed": max(0, len(churn_candidates) - len(candidates)),
"include_deleted": include_deleted,
"top": top_candidates,
}

if report_file:
p = Path(report_file)
p.parent.mkdir(parents=True, exist_ok=True)
with p.open("w", encoding="utf-8") as f:
f.write("## Stability Audit Candidates\n\n")
f.write(f"- Lookback: {days} days\n")
scope = "skills/* (active only)" if not include_deleted else "skills/* (including deleted)"
f.write(f"- Scope: {scope}\n")
f.write(f"- A→D churn candidates: {len(candidates)}")
if not include_deleted:
f.write(f" (filtered removed: {max(0, len(churn_candidates) - len(candidates))})")
f.write("\n\n")
if top_candidates:
t = top_candidates[0]
f.write(f"### Top1: `{t['skill']}`\n")
f.write(f"- Change counts: A={t['adds']} / D={t['deletes']}\n")
if t.get("last_add"):
f.write(f"- Recent add: {t['last_add']['hash'][:8]} · {t['last_add']['subject']}\n")
if t.get("last_delete"):
f.write(f"- Recent delete: {t['last_delete']['hash'][:8]} · {t['last_delete']['subject']}\n")
f.write("- Suggestion: prioritize this skill in today's improvement recommendation.\n")
else:
f.write("- No A→D churn candidates detected today.\n")

return result


def run_security_audit(args):
# Fetch skills (service key to read file_tree which may not be exposed via anon)
select = "id,name,variant,description,author,skill_md,file_tree,audited_at"
if args.name:
Expand Down Expand Up @@ -182,24 +333,49 @@ def main():

# Update database
if not args.dry_run:
supabase_rpc("audit_skill", {
"p_skill_id": skill["id"],
"p_passed": passed,
}, service_key=True, exit_on_error=False)
supabase_rpc(
"audit_skill",
{
"p_skill_id": skill["id"],
"p_passed": passed,
},
service_key=True,
exit_on_error=False,
)
status = "PASS" if passed else "FAIL"
print(f" {status}: {label} ({len(findings)} findings)", file=sys.stderr)

if args.dry_run:
print(json.dumps(results, indent=2, ensure_ascii=False))
else:
summary = f"Audit complete: {results['passed']}/{results['total']} passed, {results['failed']} failed"
print(summary, file=sys.stderr)
print(json.dumps({
"status": "ok",
"total": results["total"],
"passed": results["passed"],
"failed": results["failed"],
}, ensure_ascii=False))
print(
json.dumps(
{
"status": "ok",
"total": results["total"],
"passed": results["passed"],
"failed": results["failed"],
},
ensure_ascii=False,
)
)


def main():
args = parse_args()

if args.stability_audit:
result = run_stability_audit(
repo=args.repo,
days=max(1, args.days),
top=max(1, args.top),
report_file=args.report_file,
include_deleted=args.include_deleted,
)
print(json.dumps(result, ensure_ascii=False))
return

run_security_audit(args)


if __name__ == "__main__":
Expand Down