#!/usr/bin/env python3 """ run_experiment_sweep.py Batch sweep wrapper for run_ptp_experiment_e2e.py with: - Grid-file support (JSON/YAML) with multiple profiles - Explicit list-only handling for --log-sync (no ranges) - Per-run folder tagging and renaming - Live progress + ETA - Per-run analysis (if --analyze or sweep.analyze: true); analysis flags from sweep.analysis - Telegram notifications configured in grid file (sweep.notify.telegram) - Sends ALL PNGs found in run/ (default: _analysis) to Telegram Usage: ./run_experiment_sweep.py --grid-file grid.json """ import argparse, itertools, os, random, shlex, subprocess, sys, time, json from pathlib import Path from datetime import datetime from urllib import request as _ur, parse as _parse import uuid, mimetypes # ---------------- Telegram globals ---------------- TG_ON = False TG_TOKEN = None TG_CHAT = None TG_QUIET = False # ---------------- Utilities ---------------- def now_stamp(): return datetime.now().strftime("%Y-%m-%d_%H-%M-%S") def secs_hms(secs): secs = int(round(max(0, secs))) h = secs // 3600 m = (secs % 3600) // 60 s = secs % 60 return f"{h:02d}:{m:02d}:{s:02d}" def newest_dir(d: Path): cands = [p for p in d.iterdir() if p.is_dir()] if not cands: return None return max(cands, key=lambda p: p.stat().st_mtime) def parse_list_or_range(s, cast=float): if s is None: return [] if isinstance(s, (list, tuple)): return [cast(x) for x in s] s = str(s).strip() if s == "": return [] if ',' in s: return [cast(x.strip()) for x in s.split(',') if x.strip()!=''] if ':' in s: a,b,c = s.split(':') a = cast(a); b = cast(b); c = cast(c) out = [] i = 0 forward = c > 0 x = a while (x <= b + 1e-12) if forward else (x >= b - 1e-12): out.append(cast(x)) i += 1 x = a + i*c return out return [cast(s)] def parse_int_list_only(s): if s is None: return [] if isinstance(s, (list, tuple)): return [int(x) for x in s] s = str(s).strip() if s == "": return [] if ":" in s: raise ValueError("--log-sync must be a comma list of integers (no ranges). Example: --log-sync -3,0,1") return [int(x.strip()) for x in s.split(",") if x.strip() != ""] def validate_log_sync(values, lo=-7, hi=4): for v in values: if v < lo or v > hi: raise ValueError(f"--log-sync value {v} out of expected range [{lo}..{hi}] (Sync interval = 2^value seconds).") def parse_str_list(s): if s is None: return [] if isinstance(s, (list, tuple)): return [str(x) for x in s] return [x.strip() for x in str(s).split(',') if x.strip()!=''] def kv_tag(k, v): return f"{k}{str(v).replace('.', 'p').replace('-', 'm')}" # ---------------- Telegram client (stdlib only) ---------------- def _tg_escape_html(s: str) -> str: return (s.replace("&", "&").replace("<", "<").replace(">", ">")) def tg_send(text: str, disable_notification=False): """Send a Telegram text message if configured. Never raises.""" try: if not TG_ON: return url = f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage" data = { "chat_id": TG_CHAT, "text": _tg_escape_html(text), "parse_mode": "HTML", "disable_notification": bool(disable_notification or TG_QUIET), } payload = _parse.urlencode(data).encode("utf-8") req = _ur.Request(url, data=payload, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") with _ur.urlopen(req, timeout=10) as resp: _ = resp.read() except Exception: pass def _multipart_form(fields: dict, files: list): """ fields: dict of text fields files: list of (fieldname, filename, content_bytes) returns (body_bytes, content_type_header) """ boundary = f"----PTPSWEEP{uuid.uuid4().hex}" crlf = b"\r\n" body = bytearray() for k, v in fields.items(): body.extend(b"--" + boundary.encode() + crlf) body.extend(f'Content-Disposition: form-data; name="{k}"'.encode() + crlf + crlf) body.extend(str(v).encode() + crlf) for field, fname, content in files: body.extend(b"--" + boundary.encode() + crlf) ctype = mimetypes.guess_type(fname)[0] or "application/octet-stream" body.extend(f'Content-Disposition: form-data; name="{field}"; filename="{fname}"'.encode() + crlf) body.extend(f"Content-Type: {ctype}".encode() + crlf + crlf) body.extend(content + crlf) body.extend(b"--" + boundary.encode() + b"--" + crlf) return bytes(body), f"multipart/form-data; boundary={boundary}" def tg_send_file(filepath: Path, caption: str = "", disable_notification=False): """Send a file (PNG, CSV, etc.) to Telegram using sendDocument.""" try: if not TG_ON: return url = f"https://api.telegram.org/bot{TG_TOKEN}/sendDocument" with open(filepath, "rb") as f: content = f.read() fields = { "chat_id": TG_CHAT, "caption": caption, "disable_notification": str(bool(disable_notification or TG_QUIET)).lower(), } files = [("document", filepath.name, content)] body, ctype = _multipart_form(fields, files) req = _ur.Request(url, data=body, method="POST") req.add_header("Content-Type", ctype) with _ur.urlopen(req, timeout=20) as resp: _ = resp.read() except Exception: pass def resolve_secret(spec: str | None): if not spec: return None s = str(spec) if s.startswith("env:"): return os.getenv(s[4:]) or None if s.startswith("file:"): p = s[5:] try: with open(p, "r") as f: return f.read().strip() except Exception: return None return s # ---------------- Grid building ---------------- def build_grid_from_cli(args): delays = parse_str_list(args.delay) or ["E2E"] servos = parse_str_list(args.servo) or ["pi"] transps = parse_str_list(args.transport) or ["UDPv4"] logsync = parse_int_list_only(args.log_sync) or [0] validate_log_sync(logsync) logann = parse_list_or_range(args.log_announce, cast=int) or [1] logdreq = parse_list_or_range(args.log_delayreq, cast=int) or [-3] kp_list = parse_list_or_range(args.kp, cast=float) or [0.7] ki_list = parse_list_or_range(args.ki, cast=float) or [0.3] lwin = parse_list_or_range(args.linreg_win, cast=int) or [64] lhist = parse_list_or_range(args.linreg_hist, cast=int) or [256] combos = [] for (delay, servo, transport, ls, la, ld) in itertools.product(delays, servos, transps, logsync, logann, logdreq): base = { "profile": None, "delay": delay, "servo": servo, "transport": transport, "log_sync": int(ls), "log_announce": int(la), "log_delayreq": int(ld), } if servo.lower() == "pi": for kp, ki in itertools.product(kp_list, ki_list): d = base.copy(); d["kp"] = float(kp); d["ki"] = float(ki) combos.append(d) else: for w, h in itertools.product(lwin, lhist): d = base.copy(); d["linreg_win"] = int(w); d["linreg_hist"] = int(h) combos.append(d) seen = set(); unique = [] for c in combos: key = tuple(sorted(c.items())) if key in seen: continue seen.add(key); unique.append(c) return unique def load_grid_file(path: Path): txt = path.read_text() if path.suffix.lower() in (".yml", ".yaml"): try: import yaml # type: ignore except Exception: print("YAML provided but PyYAML not installed. pip install pyyaml", file=sys.stderr) sys.exit(2) return yaml.safe_load(txt) return json.loads(txt) def expand_profile(defaults: dict, profile: dict): name = profile.get("name") or "profile" fixed = profile.get("fixed", {}) or {} sweep = profile.get("sweep", {}) or {} axes = {} for k, v in (defaults or {}).items(): if k == "log_sync": vals = parse_int_list_only(v); validate_log_sync(vals) axes[k] = vals else: cast = str if k in ("kp","ki"): cast = float elif k in ("log_announce","log_delayreq","linreg_win","linreg_hist"): cast = int axes[k] = parse_list_or_range(v, cast=cast) for k, v in fixed.items(): if k == "log_sync": vals = parse_int_list_only(v); validate_log_sync(vals) axes[k] = vals else: axes[k] = [v] for k, v in sweep.items(): if k == "log_sync": vals = parse_int_list_only(v); validate_log_sync(vals) axes[k] = vals else: cast = str if k in ("kp","ki"): cast = float elif k in ("log_announce","log_delayreq","linreg_win","linreg_hist"): cast = int axes[k] = parse_list_or_range(v, cast=cast) axes.setdefault("delay", ["E2E"]) axes.setdefault("servo", ["pi"]) axes.setdefault("transport", ["UDPv4"]) axes.setdefault("log_sync", [0]) axes.setdefault("log_announce", [1]) if "E2E" in axes["delay"]: axes.setdefault("log_delayreq", [-3]) keys = list(axes.keys()) vals = [axes[k] for k in keys] combos = [] for tup in itertools.product(*vals): d = dict(zip(keys, tup)) d["profile"] = name combos.append(d) return combos def build_grid_from_file(cfg: dict): s = cfg.get("sweep", {}) defaults = (s.get("defaults") or {}) profiles = s.get("profiles") or [] combos = [] for p in profiles: combos += expand_profile(defaults, p) seen = set(); unique = [] for c in combos: key = tuple(sorted(c.items())) if key in seen: continue seen.add(key); unique.append(c) top = { "duration": s.get("duration"), "nodes": s.get("nodes"), "repeats": s.get("repeats"), "overhead_sec": s.get("overhead_sec"), "out_root": s.get("out_root"), "analyze": s.get("analyze"), "analyzer": s.get("analyzer"), "pass_through": s.get("pass_through"), "sweep_name": s.get("sweep_name"), "notify": s.get("notify"), "analysis": s.get("analysis"), # <-- analysis block } return unique, top # ---------------- Runner plumbing ---------------- def param_tag(p): parts = [] if p.get("profile"): parts.append(p["profile"]) parts += [ p["delay"], p["servo"], kv_tag("sync", p["log_sync"]), kv_tag("ann", p["log_announce"]), ] if p["delay"].upper() == "E2E" and "log_delayreq" in p: parts.append(kv_tag("dreq", p["log_delayreq"])) parts.append(p["transport"]) if p["servo"].lower() == "pi": parts += [kv_tag("kp", p["kp"]), kv_tag("ki", p["ki"])] else: parts += [kv_tag("lwin", p["linreg_win"]), kv_tag("lhist", p["linreg_hist"])] return "_".join(parts) def build_runner_cmd(args, p, out_dir: Path): cmd = [args.runner, str(args.duration)] cmd += [str(n) for n in args.nodes] cmd += ["--delay", p["delay"]] cmd += ["--servo", p["servo"]] cmd += ["--log-sync", str(int(p["log_sync"]))] cmd += ["--log-announce", str(int(p["log_announce"]))] if p["delay"].upper() == "E2E" and "log_delayreq" in p: cmd += ["--log-delayreq", str(int(p["log_delayreq"]))] cmd += ["--transport", p["transport"]] if p["servo"].lower() == "pi": cmd += ["--kp", str(p["kp"]), "--ki", str(p["ki"])] else: cmd += ["--linreg-win", str(p["linreg_win"]), "--linreg-hist", str(p["linreg_hist"])] if args.out_root: cmd += ["--out", str(out_dir)] if args.pass_through: cmd += shlex.split(args.pass_through) return cmd def build_analyzer_args(analysis_opts: dict): a = analysis_opts or {} out = [] if "out_name" in a: out += ["--out-name", str(a["out_name"])] if "warmup_sec" in a: out += ["--warmup-sec", str(a["warmup_sec"])] if "sarb_skip" in a: out += ["--sarb-skip", str(a["sarb_skip"])] if "clean_mad" in a: out += ["--clean-mad", str(a["clean_mad"])] if "lock_threshold_ns" in a: out += ["--lock-threshold-ns", str(a["lock_threshold_ns"])] if "lock_consec" in a: out += ["--lock-consec", str(a["lock_consec"])] return out # ---------------- Main ---------------- def main(): global TG_ON, TG_TOKEN, TG_CHAT, TG_QUIET ap = argparse.ArgumentParser(description="PTP multi-parameter sweep (grid-file aware, list-only log-sync).") ap.add_argument("--runner", default="./run_ptp_experiment_e2e.py", help="Path to run_ptp_experiment_e2e.py") ap.add_argument("--duration", type=int, help="Seconds per run") ap.add_argument("--nodes", type=int, nargs="+", help="Node IDs in hop order") ap.add_argument("--out-root", default="./logs", help="Base folder where the runner writes runs") ap.add_argument("--sweep-name", default=None, help="Optional label appended to the sweep root") ap.add_argument("--delay", default="E2E", help="Comma-list (E2E,P2P)") ap.add_argument("--servo", default="pi", help="Comma-list (pi,linreg)") ap.add_argument("--transport", default="UDPv4", help="Comma-list (UDPv4,L2)") ap.add_argument("--kp", default=None, help="PI Kp values/range") ap.add_argument("--ki", default=None, help="PI Ki values/range") ap.add_argument("--linreg-win", default=None, help="LINREG window values/range (ints)") ap.add_argument("--linreg-hist", default=None, help="LINREG history values/range (ints)") ap.add_argument("--log-sync", default="0", help="Sync interval exponent (LIST ONLY; e.g., -3,0,1)") ap.add_argument("--log-announce", default="1", help="Announce interval exponent (list or range)") ap.add_argument("--log-delayreq", default="-3", help="Delay_Req exponent (list or range; E2E only)") ap.add_argument("--repeats", type=int, default=1, help="Repeat every combo N times") ap.add_argument("--overhead-sec", type=int, default=25, help="Estimated overhead per run (s)") ap.add_argument("--stop-on-fail", action="store_true", help="Abort sweep on first failing run") ap.add_argument("--shuffle", action="store_true", help="Shuffle run order") ap.add_argument("--dry-run", action="store_true", help="Print commands only") ap.add_argument("--pass-through", default="", help="Extra args to pass verbatim to the runner") ap.add_argument("--analyze", action="store_true", help="Analyze after EACH run (also enabled via sweep.analyze: true in grid file)") ap.add_argument("--analyzer", default="./analyze_ptp_runs.py", help="Path to analyzer script") ap.add_argument("--grid-file", help="YAML or JSON file describing the sweep") args = ap.parse_args() grid = None top = {} if args.grid_file: cfg = load_grid_file(Path(args.grid_file)) grid, top = build_grid_from_file(cfg) if top.get("duration") is not None: args.duration = int(top["duration"]) if top.get("nodes") is not None: args.nodes = [int(x) for x in top["nodes"]] if top.get("out_root"): args.out_root = str(top["out_root"]) if top.get("repeats") is not None: args.repeats = int(top["repeats"]) if top.get("overhead_sec") is not None: args.overhead_sec = int(top["overhead_sec"]) if top.get("analyze") is not None: args.analyze = bool(top["analyze"]) if top.get("analyzer"): args.analyzer = str(top["analyzer"]) if top.get("pass_through"): args.pass_through = str(top["pass_through"] or "") if top.get("sweep_name"): args.sweep_name = str(top["sweep_name"]) notify_cfg = (top.get("notify") or {}).get("telegram") if top.get("notify") else None if notify_cfg: TG_TOKEN = resolve_secret(notify_cfg.get("token")) TG_CHAT = resolve_secret(notify_cfg.get("chat_id")) TG_QUIET = bool(notify_cfg.get("quiet", False)) TG_ON = bool(notify_cfg.get("enabled", True) and TG_TOKEN and TG_CHAT) analysis_opts = top.get("analysis") if top.get("analysis") else {} else: analysis_opts = {} if grid is None: if args.duration is None or args.nodes is None: print("ERROR: --duration and --nodes are required without --grid-file", file=sys.stderr) sys.exit(2) grid = build_grid_from_cli(args) if args.duration is None or args.nodes is None: print("ERROR: duration/nodes not provided (grid-file or CLI).", file=sys.stderr) sys.exit(2) out_root = Path(args.out_root).expanduser().resolve() out_root.mkdir(parents=True, exist_ok=True) stamp = now_stamp() name = f"sweep_{stamp}" if args.sweep_name: name += f"_{args.sweep_name}" sweep_root = (out_root / name) sweep_root.mkdir(parents=True, exist_ok=True) if args.shuffle: random.shuffle(grid) total = len(grid) * max(1, args.repeats) if total == 0: print("No parameter combinations to run.") sys.exit(1) est_per = args.duration + args.overhead_sec est_total = est_per * total print(f"Runs to execute: {total} (combos={len(grid)} × repeats={args.repeats}); " f"~{secs_hms(est_total)} total (≈{secs_hms(est_per)} per run).") print(f"Sweep root: {sweep_root}") tg_send( f"đŸ§Ē Sweep start\n" f"Root: {sweep_root}\n" f"Combos: {len(grid)} Repeats: {args.repeats}\n" f"Per-run: {args.duration}s + {args.overhead_sec}s\n" f"Total runs: {total} ETA: ~{secs_hms(est_total)}" ) run_idx = 0 start_all = time.time() analyzer_flags = build_analyzer_args(analysis_opts) out_name = analysis_opts.get("out_name") if analysis_opts else None out_name = out_name or "_analysis" for rep in range(args.repeats): for combo in grid: run_idx += 1 tag = param_tag(combo) print(f"\n[{run_idx}/{total}] {tag}") out_dir = sweep_root cmd = build_runner_cmd(args, combo, out_dir) cmd_str = " ".join(shlex.quote(x) for x in cmd) print("CMD:", cmd_str) tg_send(f"â–ļī¸ [{run_idx}/{total}] {tag}\nCMD: {cmd_str}", disable_notification=True) if args.dry_run: continue t0 = time.time() try: before = set(p.name for p in out_dir.iterdir() if p.is_dir()) subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: msg = f"❌ [{run_idx}/{total}] {tag} failed (exit {e.returncode})" print(msg) tg_send(msg) if args.stop_on_fail: total_elapsed = time.time() - start_all tg_send(f"â›”ī¸ Sweep aborted after {secs_hms(total_elapsed)}") sys.exit(e.returncode) continue # Detect newly created run directory and rename with tag time.sleep(1) after = set(p.name for p in out_dir.iterdir() if p.is_dir()) new_dirs = sorted(list(after - before)) if not new_dirs: nd = newest_dir(out_dir) new_dir = nd.name if nd else None else: new_dir = new_dirs[-1] dst = None if new_dir is None: print("WARN: could not detect run output directory; skipping rename/label.") else: src = out_dir / new_dir dst_name = f"{new_dir}__{tag}" dst = out_dir / dst_name i = 1 while dst.exists(): i += 1 dst = out_dir / f"{dst_name}_{i}" try: src.rename(dst) except Exception: import shutil shutil.copytree(src, dst) elapsed = time.time() - t0 remaining = est_per*(total-run_idx) - max(0, elapsed-args.overhead_sec) print(f"Run time: {secs_hms(elapsed)} | ETA remaining: {secs_hms(remaining)}") tg_send( f"✅ [{run_idx}/{total}] {tag} done in {secs_hms(elapsed)}\n" f"→ {(dst if dst else sweep_root)}\n" f"ETA rem: {secs_hms(remaining)}", disable_notification=True ) # -------- Per-run analysis + send ALL PNGs -------- if args.analyze and dst: try: tg_send(f"📈 Analyzing {tag}â€Ļ", disable_notification=True) cmd_an = [args.analyzer, str(dst)] + analyzer_flags subprocess.run(cmd_an, check=True) analysis_dir = dst / out_name if analysis_dir.exists(): pngs = sorted(analysis_dir.rglob("*.png")) if not pngs: tg_send(f"â„šī¸ {tag}: no PNGs produced in {analysis_dir}", disable_notification=True) else: for p in pngs: tg_send_file(p, caption=f"{tag} ¡ {p.name}", disable_notification=True) else: tg_send(f"âš ī¸ {tag}: analysis folder not found: {analysis_dir}") except subprocess.CalledProcessError as e: tg_send(f"âš ī¸ Analyzer failed for {tag} (exit {e.returncode})") except Exception as e: tg_send(f"âš ī¸ Analyzer error for {tag}: {e}") total_elapsed = time.time() - start_all print(f"\nSweep complete in {secs_hms(total_elapsed)}. Output in: {sweep_root}") tg_send(f"🏁 Sweep complete in {secs_hms(total_elapsed)}\nRoot: {sweep_root}") if __name__ == "__main__": main()