Coverage for src / gh_secrets_and_vars_async / rulesets.py: 95%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-16 17:56 +0000

1"""Thin rulesets command: view or safely apply a caller-supplied spec. 

2 

3ai-gh holds no ruleset template knowledge. Callers (e.g. augint-shell skills) 

4generate a ruleset spec in-context and hand it to ``ai-gh rulesets apply``, 

5which finds the repo-scope branch ruleset with a matching name and replaces 

6only that one -- never touching org-inherited, repository-scope, or tag-scope 

7rulesets. 

8""" 

9 

10import json 

11from pathlib import Path 

12 

13import click 

14from loguru import logger 

15from rich import print 

16from rich.panel import Panel 

17from rich.table import Table 

18 

19from .common import ( 

20 configure_logging, 

21 get_github_repo, 

22 load_env_config, 

23) 

24 

# Top-level keys that must be present in a ruleset spec; validate_ruleset_spec
# rejects any spec missing one of them.
REQUIRED_SPEC_FIELDS = ("name", "target", "rules")

26 

27 

def get_rulesets(repo) -> list[dict]:
    """Fetch all rulesets for the repository with full details.

    The list endpoint only returns summaries and is paginated, so every page
    of summaries is collected first and then each ruleset is fetched
    individually to get conditions, rules, and bypass actors.

    Args:
        repo: Object exposing ``url`` and ``_requester.requestJsonAndCheck``
            (presumably a PyGithub ``Repository`` -- confirm against
            ``get_github_repo``).

    Returns:
        One detail dict per ruleset, in the order the list endpoint
        returned the summaries.
    """
    per_page = 100  # largest page size the list endpoint accepts
    summaries: list[dict] = []
    page = 1
    while True:
        _headers, batch = repo._requester.requestJsonAndCheck(
            "GET",
            f"{repo.url}/rulesets",
            parameters={"per_page": per_page, "page": page},
        )
        batch = batch or []
        summaries.extend(batch)
        # A short (or empty) page means there is nothing further to fetch.
        if len(batch) < per_page:
            break
        page += 1

    rulesets = []
    for summary in summaries:
        _h, detail = repo._requester.requestJsonAndCheck(
            "GET", f"{repo.url}/rulesets/{summary['id']}"
        )
        # Copy so callers never share a dict with the requester's response.
        rulesets.append(dict(detail))
    return rulesets

42 

43 

def validate_ruleset_spec(spec: object) -> None:
    """Reject a malformed ruleset spec before anything else happens.

    Checks, in order: the spec is a dict, every key in
    ``REQUIRED_SPEC_FIELDS`` is present, ``name`` is a non-blank string,
    ``target`` is exactly ``"branch"``, and ``rules`` is a list.

    Raises:
        click.ClickException: On the first check that fails, with a message
            naming the offending field.
    """
    if not isinstance(spec, dict):
        raise click.ClickException("Ruleset spec must be a JSON object (dict).")

    # Report only the first missing key, in declaration order.
    absent = next((key for key in REQUIRED_SPEC_FIELDS if key not in spec), None)
    if absent is not None:
        raise click.ClickException(f"Ruleset spec is missing required field: '{absent}'.")

    spec_name = spec.get("name")
    name_ok = isinstance(spec_name, str) and bool(spec_name.strip())
    if not name_ok:
        raise click.ClickException("Ruleset spec 'name' must be a non-empty string.")

    scope = spec.get("target")
    if scope != "branch":
        message = (
            f"Ruleset spec 'target' must be 'branch' (got {scope!r}). "
            "ai-gh only applies branch-scope rulesets."
        )
        raise click.ClickException(message)

    if not isinstance(spec.get("rules"), list):
        raise click.ClickException("Ruleset spec 'rules' must be a list.")

67 

68 

69def _is_safe_to_mutate_ruleset(rs: dict) -> bool: 

70 """Shared safety filter used by every code path that deletes or replaces a ruleset. 

71 

72 Returns True only for repo-scope branch rulesets. This is the T0-1 fix: 

73 

74 - ``target == "branch"`` -- never touch repository or tag scope. 

75 - ``source_type == "Repository"`` -- never touch org-inherited rulesets. 

76 """ 

77 if rs.get("target") != "branch": 

78 return False 

79 source_type = rs.get("source_type") 

80 if source_type and source_type != "Repository": 

81 return False 

82 return True 

83 

84 

def find_replaceable_ruleset(repo, name: str) -> dict | None:
    """Return the first mutable ruleset named ``name``, or ``None``.

    "Mutable" means it passes ``_is_safe_to_mutate_ruleset``; rulesets that
    fail that filter are ignored even when their name matches.
    """
    matches = (
        candidate
        for candidate in get_rulesets(repo)
        if _is_safe_to_mutate_ruleset(candidate) and candidate.get("name") == name
    )
    return next(matches, None)

94 

95 

def find_deletable_ruleset(repo, name: str) -> tuple[dict | None, str | None]:
    """Resolve ``name`` to a ruleset that is safe to delete, or explain why not.

    Three outcomes are possible:

    - ``(ruleset, None)`` -- a ruleset with that name passes
      ``_is_safe_to_mutate_ruleset``; it may be deleted.
    - ``(None, reason)`` -- rulesets with that name exist but none is safe
      to touch. ``reason`` describes the last such ruleset encountered and
      is meant to be used as the caller's error message.
    - ``(None, None)`` -- no ruleset carries that name; callers treat this
      as an idempotent "nothing to delete".

    When several rulesets share the name, a deletable one wins over any
    blocked one regardless of order.
    """
    same_name = [rs for rs in get_rulesets(repo) if rs.get("name") == name]

    for candidate in same_name:
        if _is_safe_to_mutate_ruleset(candidate):
            return candidate, None

    if not same_name:
        return None, None

    # Nothing with this name is safe; report on the last one seen.
    blocked = same_name[-1]
    scope = blocked.get("target")
    if scope != "branch":
        return None, (
            f"ruleset '{name}' exists but is {scope}-scope, not branch-scope; "
            "refusing to delete"
        )
    origin = blocked.get("source_type")
    return None, (
        f"ruleset '{name}' exists but is org-inherited "
        f"(source_type={origin}); refusing to delete"
    )

132 

133 

134def _canonical_ruleset(rs: dict) -> dict: 

135 """Normalize a ruleset for structural comparison. 

136 

137 Strips id/source/source_type metadata and sorts rules and bypass_actors 

138 so API-returned ordering differences don't cause false drift. 

139 """ 

140 

141 def _rule_sort_key(rule: dict) -> str: 

142 return str(rule.get("type", "")) 

143 

144 def _actor_sort_key(actor: dict) -> tuple[str, int]: 

145 return (str(actor.get("actor_type", "")), int(actor.get("actor_id", 0) or 0)) 

146 

147 def _normalize_rule(rule: dict) -> dict: 

148 rtype = rule.get("type") 

149 if rtype == "required_status_checks": 

150 params = dict(rule.get("parameters", {})) 

151 checks = params.get("required_status_checks", []) 

152 params["required_status_checks"] = sorted( 

153 checks, key=lambda c: str(c.get("context", "")) 

154 ) 

155 return {"type": rtype, "parameters": params} 

156 return {"type": rtype, "parameters": rule.get("parameters", {})} 

157 

158 return { 

159 "name": rs.get("name"), 

160 "target": rs.get("target"), 

161 "enforcement": rs.get("enforcement"), 

162 "conditions": rs.get("conditions", {}), 

163 "rules": sorted((_normalize_rule(r) for r in rs.get("rules", [])), key=_rule_sort_key), 

164 "bypass_actors": sorted(rs.get("bypass_actors", []), key=_actor_sort_key), 

165 } 

166 

167 

def rulesets_match(existing: dict, spec: dict) -> bool:
    """Tell whether ``existing`` and ``spec`` are equal once both are canonicalized."""
    current_shape = _canonical_ruleset(existing)
    desired_shape = _canonical_ruleset(spec)
    return current_shape == desired_shape

171 

172 

def create_ruleset(repo, spec: dict, dry_run: bool = False) -> dict | None:
    """Create a ruleset from ``spec`` via POST, or describe the POST in dry-run mode.

    The ``id``, ``source_type`` and ``source`` keys are removed from the
    payload before it is sent.

    Returns:
        The API response dict, or -- when ``dry_run`` is true -- the payload
        that would have been sent.
    """
    stripped_keys = {"id", "source_type", "source"}
    body = {key: value for key, value in spec.items() if key not in stripped_keys}

    if not dry_run:
        _headers, created = repo._requester.requestJsonAndCheck(
            "POST", f"{repo.url}/rulesets", input=body
        )
        logger.info(
            f"Created ruleset: {created.get('name', 'unknown')} (id={created.get('id', '?')})"
        )
        return created

    logger.info(f"[DRY RUN] Would create ruleset: {body.get('name', 'unknown')}")
    return body

185 

186 

def apply_ruleset_spec(repo, spec: dict, dry_run: bool = False) -> dict | None:
    """Apply one ruleset spec safely and idempotently.

    - Validates the spec.
    - Finds a replaceable repo-scope branch ruleset with the same name.
    - If none found: create.
    - If found and already structurally equal: no-op.
    - If found but drifted: DELETE that one, then create the new version.

    Never touches org-inherited, repository-scope, or tag-scope rulesets.

    Args:
        repo: Repository object passed through to the lookup/create helpers.
        spec: Ruleset spec; must satisfy ``validate_ruleset_spec``.
        dry_run: When true, log what would happen and make no DELETE/POST.

    Returns:
        The existing ruleset when it is already up-to-date, otherwise
        whatever ``create_ruleset`` returns.

    Raises:
        click.ClickException: If the spec is malformed.
    """
    # Local import: the name is caller-supplied text, so it must be escaped
    # before being embedded in Rich markup -- otherwise "[...]" inside the
    # name is parsed as a style tag (dropped, or a MarkupError).
    from rich.markup import escape

    validate_ruleset_spec(spec)
    name = spec["name"]

    existing = find_replaceable_ruleset(repo, name)

    if existing is not None and rulesets_match(existing, spec):
        logger.info(f"Ruleset '{name}' already up-to-date (id={existing.get('id', '?')}).")
        print(f"[green]Ruleset '{escape(name)}' is up-to-date. No changes.[/green]")
        return existing

    if existing is not None:
        rs_id = existing.get("id")
        if dry_run:
            logger.info(f"[DRY RUN] Would delete drifted ruleset '{name}' (id={rs_id})")
        else:
            # NOTE: replacement is delete-then-create, not atomic. If the
            # create below fails, no ruleset with this name remains.
            repo._requester.requestJsonAndCheck("DELETE", f"{repo.url}/rulesets/{rs_id}")
            logger.info(f"Deleted drifted ruleset '{name}' (id={rs_id})")

    return create_ruleset(repo, spec, dry_run=dry_run)

217 

218 

def display_rulesets(rulesets: list[dict]) -> None:
    """Pretty-print rulesets using Rich.

    Shows name, enforcement, target, source_type (so org-inherited vs
    repo-owned is visible), branch patterns, rules, and bypass actors.

    The dicts may be API responses or a caller-supplied spec echoed back
    from a dry run, so every field is read defensively: missing or null
    keys fall back to a placeholder instead of raising, and all values are
    escaped so that "[...]" in a name or branch pattern is shown literally
    rather than being parsed as Rich markup.

    Args:
        rulesets: Ruleset dicts to render; an empty list prints a notice.
    """
    # Local import keeps this block self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    if not rulesets:
        print("[yellow]No rulesets configured for this repository.[/yellow]")
        return

    def _cell(value: object) -> str:
        return escape(str(value))

    for rs in rulesets:
        table = Table(show_header=False, box=None, padding=(0, 2))
        table.add_column("Field", style="bold cyan")
        table.add_column("Value")

        table.add_row("Enforcement", _cell(rs.get("enforcement", "unknown")))
        table.add_row("Target", _cell(rs.get("target", "unknown")))
        table.add_row("Source", _cell(rs.get("source_type", "Repository")))

        ref_name = (rs.get("conditions") or {}).get("ref_name") or {}
        branches = ", ".join(str(pattern) for pattern in ref_name.get("include") or [])
        table.add_row("Branches", _cell(branches) if branches else "none")

        rule_types = []
        for rule in rs.get("rules") or []:
            rtype = rule.get("type", "unknown")
            if rtype == "required_status_checks":
                checks = (rule.get("parameters") or {}).get("required_status_checks") or []
                check_names = [str(c.get("context", "?")) for c in checks]
                rule_types.append(f"status_checks: {', '.join(check_names)}")
            else:
                rule_types.append(str(rtype))
        table.add_row("Rules", _cell("\n".join(rule_types)) if rule_types else "none")

        bypass = rs.get("bypass_actors") or []
        bypass_desc = [f"{b.get('actor_type', '?')} ({b.get('bypass_mode', '?')})" for b in bypass]
        table.add_row("Bypass", _cell(", ".join(bypass_desc)) if bypass_desc else "none")

        title = f"[bold]{_cell(rs.get('name', 'Unnamed'))}[/bold]"
        print(Panel(table, title=title, expand=False))

258 

259 

# Parent group; the view/apply/delete subcommands register themselves on it.
@click.group(name="rulesets")
def rulesets_command() -> None:
    """View, apply, or delete branch rulesets on a GitHub repository."""

263 

264 

@rulesets_command.command("view")
@click.option("--verbose", "-v", is_flag=True, help="Print detailed output.")
def view_cmd(verbose: bool) -> None:
    """Show the current rulesets on the repository."""
    configure_logging(verbose)

    # Only the first two values returned by load_env_config are needed here.
    repo_name, account, _unused = load_env_config()
    if not (repo_name and account):
        raise click.ClickException("GH_REPO and GH_ACCOUNT must be set in .env or environment.")

    # Resolve the repository, fetch every ruleset in full, and render them.
    display_rulesets(get_rulesets(get_github_repo(account, repo_name)))

276 

277 

@rulesets_command.command("apply")
@click.argument(
    "spec_path",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
)
@click.option(
    "--dry-run", "-d", is_flag=True, help="Show what would be done without making changes."
)
@click.option("--verbose", "-v", is_flag=True, help="Print detailed output.")
def apply_cmd(spec_path: Path, dry_run: bool, verbose: bool) -> None:
    """Apply a caller-supplied ruleset JSON spec to the repository.

    SPEC_PATH must point to a JSON file containing a single ruleset object
    conforming to GitHub's ruleset schema. The spec's 'name' is the match key:
    an existing repo-scope branch ruleset with the same name is replaced
    surgically. Org-inherited and non-branch rulesets are never touched.
    """
    configure_logging(verbose)

    # JSON is UTF-8 by specification, so decode explicitly instead of using
    # the platform default encoding. UnicodeDecodeError is a ValueError, not
    # an OSError, so it is caught by name to keep the failure a clean CLI
    # error rather than a traceback.
    try:
        raw = spec_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        raise click.ClickException(f"Cannot read spec file {spec_path}: {e}") from e

    try:
        spec = json.loads(raw)
    except json.JSONDecodeError as e:
        raise click.ClickException(f"Spec file {spec_path} is not valid JSON: {e}") from e

    # Validate before loading any configuration so a bad spec fails first.
    validate_ruleset_spec(spec)

    gh_repo, gh_account, _ = load_env_config()
    if not gh_repo or not gh_account:
        raise click.ClickException("GH_REPO and GH_ACCOUNT must be set in .env or environment.")
    repo = get_github_repo(gh_account, gh_repo)

    # ClickExceptions already carry a user-facing message; anything else is
    # wrapped so the CLI exits with an error line instead of a traceback.
    try:
        result = apply_ruleset_spec(repo, spec, dry_run=dry_run)
    except click.ClickException:
        raise
    except Exception as e:
        raise click.ClickException(f"Failed to apply ruleset spec: {e}") from e

    if result is not None and verbose:
        display_rulesets([result])

323 

324 

@rulesets_command.command("delete")
@click.argument("name")
@click.option(
    "--dry-run", "-d", is_flag=True, help="Show what would be done without making changes."
)
@click.option("--verbose", "-v", is_flag=True, help="Print detailed output.")
def delete_cmd(name: str, dry_run: bool, verbose: bool) -> None:
    """Delete a repo-scope branch ruleset by NAME.

    Idempotent: if no ruleset with NAME exists, exits 0 with a
    'nothing to delete' message -- safe to call unconditionally from
    migration scripts. Refuses (non-zero exit) if a ruleset with NAME
    exists but is org-inherited or non-branch-scope.
    """
    # Local import: NAME comes straight from the command line, so escape it
    # before embedding it in Rich markup. Unescaped, a "[...]" in the name is
    # parsed as a style tag, and a malformed one could raise after the
    # ruleset has already been deleted.
    from rich.markup import escape

    configure_logging(verbose)
    shown_name = escape(name)

    gh_repo, gh_account, _ = load_env_config()
    if not gh_repo or not gh_account:
        raise click.ClickException("GH_REPO and GH_ACCOUNT must be set in .env or environment.")
    repo = get_github_repo(gh_account, gh_repo)

    match, blocked_reason = find_deletable_ruleset(repo, name)

    # (None, None): nothing carries this name -- succeed without doing anything.
    if match is None and blocked_reason is None:
        logger.info(f"Ruleset '{name}' not found; nothing to delete.")
        print(f"[dim]Ruleset '{shown_name}' not found. Nothing to delete.[/dim]")
        return

    # (None, reason): the name exists but only on rulesets that must not be touched.
    if match is None:
        raise click.ClickException(f"Refusing to delete: {blocked_reason}")

    rs_id = match.get("id")

    if dry_run:
        logger.info(f"[DRY RUN] Would delete ruleset '{name}' (id={rs_id})")
        print(f"[yellow]\\[DRY RUN] Would delete ruleset '{shown_name}' (id={rs_id}).[/yellow]")
        return

    try:
        repo._requester.requestJsonAndCheck("DELETE", f"{repo.url}/rulesets/{rs_id}")
    except Exception as e:
        raise click.ClickException(f"Failed to delete ruleset '{name}': {e}") from e

    logger.info(f"Deleted ruleset '{name}' (id={rs_id})")
    print(f"[green]Deleted ruleset '{shown_name}' (id={rs_id}).[/green]")