Coverage for src / gh_secrets_and_vars_async / rulesets.py: 95%
193 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-16 17:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-16 17:56 +0000
1"""Thin rulesets command: view or safely apply a caller-supplied spec.
3ai-gh holds no ruleset template knowledge. Callers (e.g. augint-shell skills)
4generate a ruleset spec in-context and hand it to ``ai-gh rulesets apply``,
5which finds the repo-scope branch ruleset with a matching name and replaces
6only that one -- never touching org-inherited, repository-scope, or tag-scope
7rulesets.
8"""
10import json
11from pathlib import Path
13import click
14from loguru import logger
15from rich import print
16from rich.panel import Panel
17from rich.table import Table
19from .common import (
20 configure_logging,
21 get_github_repo,
22 load_env_config,
23)
25REQUIRED_SPEC_FIELDS = ("name", "target", "rules")
28def get_rulesets(repo) -> list[dict]:
29 """Fetch all rulesets for the repository with full details.
31 The list endpoint only returns summaries. We fetch each ruleset
32 individually to get conditions, rules, and bypass actors.
33 """
34 _headers, summaries = repo._requester.requestJsonAndCheck("GET", f"{repo.url}/rulesets")
35 rulesets = []
36 for summary in summaries:
37 _h, detail = repo._requester.requestJsonAndCheck(
38 "GET", f"{repo.url}/rulesets/{summary['id']}"
39 )
40 rulesets.append(dict(detail))
41 return rulesets
44def validate_ruleset_spec(spec: object) -> None:
45 """Fail fast on a malformed spec. Raises click.ClickException with a clear message."""
46 if not isinstance(spec, dict):
47 raise click.ClickException("Ruleset spec must be a JSON object (dict).")
49 for field in REQUIRED_SPEC_FIELDS:
50 if field not in spec:
51 raise click.ClickException(f"Ruleset spec is missing required field: '{field}'.")
53 name = spec.get("name")
54 if not isinstance(name, str) or not name.strip():
55 raise click.ClickException("Ruleset spec 'name' must be a non-empty string.")
57 target = spec.get("target")
58 if target != "branch":
59 raise click.ClickException(
60 f"Ruleset spec 'target' must be 'branch' (got {target!r}). "
61 "ai-gh only applies branch-scope rulesets."
62 )
64 rules = spec.get("rules")
65 if not isinstance(rules, list):
66 raise click.ClickException("Ruleset spec 'rules' must be a list.")
69def _is_safe_to_mutate_ruleset(rs: dict) -> bool:
70 """Shared safety filter used by every code path that deletes or replaces a ruleset.
72 Returns True only for repo-scope branch rulesets. This is the T0-1 fix:
74 - ``target == "branch"`` -- never touch repository or tag scope.
75 - ``source_type == "Repository"`` -- never touch org-inherited rulesets.
76 """
77 if rs.get("target") != "branch":
78 return False
79 source_type = rs.get("source_type")
80 if source_type and source_type != "Repository":
81 return False
82 return True
85def find_replaceable_ruleset(repo, name: str) -> dict | None:
86 """Find the one repo-scope branch ruleset whose name matches."""
87 for rs in get_rulesets(repo):
88 if not _is_safe_to_mutate_ruleset(rs):
89 continue
90 if rs.get("name") != name:
91 continue
92 return rs
93 return None
96def find_deletable_ruleset(repo, name: str) -> tuple[dict | None, str | None]:
97 """Look up a ruleset by name for safe deletion.
99 Walks all rulesets on the repo and distinguishes three outcomes:
101 - ``(ruleset, None)`` -- a repo-scope branch ruleset with that name was
102 found and is safe to delete.
103 - ``(None, reason)`` -- a ruleset with that name exists but is not
104 deletable (org-inherited, repository-target, or tag-target). The
105 caller should exit with an error using ``reason`` as the message.
106 - ``(None, None)`` -- no ruleset with that name exists at all. The
107 caller should treat this as idempotent "nothing to delete".
109 If multiple rulesets share the name (unusual but possible), a
110 deletable one is preferred over a blocked one.
111 """
112 blocked_reason: str | None = None
113 for rs in get_rulesets(repo):
114 if rs.get("name") != name:
115 continue
116 if _is_safe_to_mutate_ruleset(rs):
117 return rs, None
118 # Same name, not safe to touch -- record why and keep looking
119 target = rs.get("target")
120 source_type = rs.get("source_type")
121 if target != "branch":
122 blocked_reason = (
123 f"ruleset '{name}' exists but is {target}-scope, not branch-scope; "
124 "refusing to delete"
125 )
126 else:
127 blocked_reason = (
128 f"ruleset '{name}' exists but is org-inherited "
129 f"(source_type={source_type}); refusing to delete"
130 )
131 return None, blocked_reason
134def _canonical_ruleset(rs: dict) -> dict:
135 """Normalize a ruleset for structural comparison.
137 Strips id/source/source_type metadata and sorts rules and bypass_actors
138 so API-returned ordering differences don't cause false drift.
139 """
141 def _rule_sort_key(rule: dict) -> str:
142 return str(rule.get("type", ""))
144 def _actor_sort_key(actor: dict) -> tuple[str, int]:
145 return (str(actor.get("actor_type", "")), int(actor.get("actor_id", 0) or 0))
147 def _normalize_rule(rule: dict) -> dict:
148 rtype = rule.get("type")
149 if rtype == "required_status_checks":
150 params = dict(rule.get("parameters", {}))
151 checks = params.get("required_status_checks", [])
152 params["required_status_checks"] = sorted(
153 checks, key=lambda c: str(c.get("context", ""))
154 )
155 return {"type": rtype, "parameters": params}
156 return {"type": rtype, "parameters": rule.get("parameters", {})}
158 return {
159 "name": rs.get("name"),
160 "target": rs.get("target"),
161 "enforcement": rs.get("enforcement"),
162 "conditions": rs.get("conditions", {}),
163 "rules": sorted((_normalize_rule(r) for r in rs.get("rules", [])), key=_rule_sort_key),
164 "bypass_actors": sorted(rs.get("bypass_actors", []), key=_actor_sort_key),
165 }
168def rulesets_match(existing: dict, spec: dict) -> bool:
169 """Return True if existing ruleset is already structurally equal to spec."""
170 return _canonical_ruleset(existing) == _canonical_ruleset(spec)
173def create_ruleset(repo, spec: dict, dry_run: bool = False) -> dict | None:
174 """POST a new ruleset from a spec dict. Strips repo-specific metadata first."""
175 payload = {k: v for k, v in spec.items() if k not in ("id", "source_type", "source")}
176 if dry_run:
177 logger.info(f"[DRY RUN] Would create ruleset: {payload.get('name', 'unknown')}")
178 return payload
179 _headers, data = repo._requester.requestJsonAndCheck(
180 "POST", f"{repo.url}/rulesets", input=payload
181 )
182 result: dict = data
183 logger.info(f"Created ruleset: {result.get('name', 'unknown')} (id={result.get('id', '?')})")
184 return result
187def apply_ruleset_spec(repo, spec: dict, dry_run: bool = False) -> dict | None:
188 """Apply one ruleset spec safely and idempotently.
190 - Validates the spec.
191 - Finds a replaceable repo-scope branch ruleset with the same name.
192 - If none found: create.
193 - If found and already structurally equal: no-op.
194 - If found but drifted: DELETE that one, then create the new version.
196 Never touches org-inherited, repository-scope, or tag-scope rulesets.
197 """
198 validate_ruleset_spec(spec)
199 name = spec["name"]
201 existing = find_replaceable_ruleset(repo, name)
203 if existing is not None and rulesets_match(existing, spec):
204 logger.info(f"Ruleset '{name}' already up-to-date (id={existing.get('id', '?')}).")
205 print(f"[green]Ruleset '{name}' is up-to-date. No changes.[/green]")
206 return existing
208 if existing is not None:
209 rs_id = existing.get("id")
210 if dry_run:
211 logger.info(f"[DRY RUN] Would delete drifted ruleset '{name}' (id={rs_id})")
212 else:
213 repo._requester.requestJsonAndCheck("DELETE", f"{repo.url}/rulesets/{rs_id}")
214 logger.info(f"Deleted drifted ruleset '{name}' (id={rs_id})")
216 return create_ruleset(repo, spec, dry_run=dry_run)
219def display_rulesets(rulesets: list[dict]) -> None:
220 """Pretty-print rulesets using Rich.
222 Shows name, enforcement, target, source_type (so org-inherited vs
223 repo-owned is visible), branch patterns, rules, and bypass actors.
224 """
225 if not rulesets:
226 print("[yellow]No rulesets configured for this repository.[/yellow]")
227 return
229 for rs in rulesets:
230 table = Table(show_header=False, box=None, padding=(0, 2))
231 table.add_column("Field", style="bold cyan")
232 table.add_column("Value")
234 table.add_row("Enforcement", str(rs.get("enforcement", "unknown")))
235 table.add_row("Target", str(rs.get("target", "unknown")))
236 table.add_row("Source", str(rs.get("source_type", "Repository")))
238 conditions = rs.get("conditions", {}).get("ref_name", {})
239 branches = ", ".join(conditions.get("include", []))
240 table.add_row("Branches", branches or "none")
242 rules = rs.get("rules", [])
243 rule_types = []
244 for rule in rules:
245 if rule["type"] == "required_status_checks":
246 checks = rule.get("parameters", {}).get("required_status_checks", [])
247 check_names = [c["context"] for c in checks]
248 rule_types.append(f"status_checks: {', '.join(check_names)}")
249 else:
250 rule_types.append(rule["type"])
251 table.add_row("Rules", "\n".join(rule_types) if rule_types else "none")
253 bypass = rs.get("bypass_actors", [])
254 bypass_desc = [f"{b.get('actor_type', '?')} ({b.get('bypass_mode', '?')})" for b in bypass]
255 table.add_row("Bypass", ", ".join(bypass_desc) if bypass_desc else "none")
257 print(Panel(table, title=f"[bold]{rs.get('name', 'Unnamed')}[/bold]", expand=False))
260@click.group("rulesets")
261def rulesets_command() -> None:
262 """View, apply, or delete branch rulesets on a GitHub repository."""
265@rulesets_command.command("view")
266@click.option("--verbose", "-v", is_flag=True, help="Print detailed output.")
267def view_cmd(verbose: bool) -> None:
268 """Show the current rulesets on the repository."""
269 configure_logging(verbose)
270 gh_repo, gh_account, _ = load_env_config()
271 if not gh_repo or not gh_account:
272 raise click.ClickException("GH_REPO and GH_ACCOUNT must be set in .env or environment.")
273 repo = get_github_repo(gh_account, gh_repo)
274 rulesets = get_rulesets(repo)
275 display_rulesets(rulesets)
278@rulesets_command.command("apply")
279@click.argument(
280 "spec_path",
281 type=click.Path(exists=True, dir_okay=False, path_type=Path),
282)
283@click.option(
284 "--dry-run", "-d", is_flag=True, help="Show what would be done without making changes."
285)
286@click.option("--verbose", "-v", is_flag=True, help="Print detailed output.")
287def apply_cmd(spec_path: Path, dry_run: bool, verbose: bool) -> None:
288 """Apply a caller-supplied ruleset JSON spec to the repository.
290 SPEC_PATH must point to a JSON file containing a single ruleset object
291 conforming to GitHub's ruleset schema. The spec's 'name' is the match key:
292 an existing repo-scope branch ruleset with the same name is replaced
293 surgically. Org-inherited and non-branch rulesets are never touched.
294 """
295 configure_logging(verbose)
297 try:
298 raw = spec_path.read_text()
299 except OSError as e:
300 raise click.ClickException(f"Cannot read spec file {spec_path}: {e}") from e
302 try:
303 spec = json.loads(raw)
304 except json.JSONDecodeError as e:
305 raise click.ClickException(f"Spec file {spec_path} is not valid JSON: {e}") from e
307 validate_ruleset_spec(spec)
309 gh_repo, gh_account, _ = load_env_config()
310 if not gh_repo or not gh_account:
311 raise click.ClickException("GH_REPO and GH_ACCOUNT must be set in .env or environment.")
312 repo = get_github_repo(gh_account, gh_repo)
314 try:
315 result = apply_ruleset_spec(repo, spec, dry_run=dry_run)
316 except click.ClickException:
317 raise
318 except Exception as e:
319 raise click.ClickException(f"Failed to apply ruleset spec: {e}") from e
321 if result is not None and verbose:
322 display_rulesets([result])
325@rulesets_command.command("delete")
326@click.argument("name")
327@click.option(
328 "--dry-run", "-d", is_flag=True, help="Show what would be done without making changes."
329)
330@click.option("--verbose", "-v", is_flag=True, help="Print detailed output.")
331def delete_cmd(name: str, dry_run: bool, verbose: bool) -> None:
332 """Delete a repo-scope branch ruleset by NAME.
334 Idempotent: if no ruleset with NAME exists, exits 0 with a
335 'nothing to delete' message -- safe to call unconditionally from
336 migration scripts. Refuses (non-zero exit) if a ruleset with NAME
337 exists but is org-inherited or non-branch-scope.
338 """
339 configure_logging(verbose)
341 gh_repo, gh_account, _ = load_env_config()
342 if not gh_repo or not gh_account:
343 raise click.ClickException("GH_REPO and GH_ACCOUNT must be set in .env or environment.")
344 repo = get_github_repo(gh_account, gh_repo)
346 match, blocked_reason = find_deletable_ruleset(repo, name)
348 if match is None and blocked_reason is None:
349 logger.info(f"Ruleset '{name}' not found; nothing to delete.")
350 print(f"[dim]Ruleset '{name}' not found. Nothing to delete.[/dim]")
351 return
353 if match is None:
354 raise click.ClickException(f"Refusing to delete: {blocked_reason}")
356 rs_id = match.get("id")
358 if dry_run:
359 logger.info(f"[DRY RUN] Would delete ruleset '{name}' (id={rs_id})")
360 print(f"[yellow][DRY RUN] Would delete ruleset '{name}' (id={rs_id}).[/yellow]")
361 return
363 try:
364 repo._requester.requestJsonAndCheck("DELETE", f"{repo.url}/rulesets/{rs_id}")
365 except Exception as e:
366 raise click.ClickException(f"Failed to delete ruleset '{name}': {e}") from e
368 logger.info(f"Deleted ruleset '{name}' (id={rs_id})")
369 print(f"[green]Deleted ruleset '{name}' (id={rs_id}).[/green]")