#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
run_experiment_sweep.py — grid-driven PTP sweep with role_mode + Telegram PNG uploads

Features
- role_mode axis (BC, OC_L2) → passes --role-mode to the runner and enforces E2E in OC modes
- per-run analysis: calls the analyzer on the single run folder; expects outputs in
  <run_dir>/_analysis (folder name configurable via analysis.out_name)
- Telegram notifications: text updates + sendDocument for all PNGs found in _analysis
- Folder tagging: appends a readable parameter tag to each run folder
- ETA countdown and best-effort failure handling

Profiles: "fixed.delay" and "fixed.servo" are required. Any other key in "fixed"
(role_mode, transport, kp, ...) pins that axis; a list value becomes a sweep axis.
Keys in a profile's "sweep" dict override both "fixed" and "defaults".

Grid file (JSON) keys (example):
{
  "sweep": {
    "duration": 300,
    "nodes": [0,1,2,3,4,9,14,19,24],
    "repeats": 1,
    "overhead_sec": 25,
    "out_root": "./logs",
    "analyze": true,
    "analyzer": "./analyze_ptp_runs.py",
    "sweep_name": "ptp_param_matrix",
    "pass_through": "",
    "defaults": {
      "role_mode": ["BC","OC_L2"],
      "transport": ["UDPv4"],
      "log_sync": [0,1,-3],
      "log_announce": [1],
      "log_delayreq": [-3]
    },
    "profiles": [
      { "name": "E2E_PI",
        "fixed": { "delay": "E2E", "servo": "pi" },
        "sweep": { "kp": [0.7,0.9,0.5], "ki": [0.3,0.2] } }
    ],
    "analysis": {
      "warmup_sec": 60, "sarb_skip": 0, "clean_mad": 6.0,
      "lock_threshold_ns": 5000, "lock_consec": 5, "out_name": "_analysis"
    },
    "notify": {
      "telegram": {
        "enabled": true,
        "token": "file:./telegram/tg_token",
        "chat_id": "file:./telegram/tg_chat_id",
        "quiet": true,
        "force_ipv4": false
      }
    }
  }
}
"""
import argparse
import html as _html
import itertools
import json
import os
import shlex
import shutil
import socket as _sock
import ssl as _ssl
import subprocess
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from urllib import parse as _parse
from urllib import request as _req

RUNNER = "./run_ptp_experiment_e2e.py"  # adjust if needed


# --------------------- utilities ---------------------

def ts() -> str:
    """Return the current local time as a filesystem-friendly timestamp."""
    return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


def secs_hms(s: float) -> str:
    """Format seconds as 'Hh MMm SSs', 'Mm SSs' or 'Ss' (negatives clamp to 0)."""
    s = int(max(0, s))
    h, r = divmod(s, 3600)
    m, s = divmod(r, 60)
    if h:
        return f"{h:d}h {m:02d}m {s:02d}s"
    if m:
        return f"{m:d}m {s:02d}s"
    return f"{s:d}s"


def secs_dhm(s: float) -> str:
    """Format seconds as 'DD:HH:MM' (days:hours:minutes; seconds are dropped)."""
    s = int(max(0, s))
    d, r = divmod(s, 86400)
    h, r = divmod(r, 3600)
    m, _ = divmod(r, 60)
    return f"{d:02d}:{h:02d}:{m:02d}"


def secs_dhm_verbose(s: float) -> str:
    """Format seconds as 'DD Days HH Hours MM Minutes' (negatives clamp to 0)."""
    s = int(max(0, s))
    d, r = divmod(s, 86400)
    h, r = divmod(r, 3600)
    m, _ = divmod(r, 60)
    return f"{d:02d} Days {h:02d} Hours {m:02d} Minutes"


def ensure_list(x):
    """Wrap a scalar in a list; None becomes [] and lists pass through unchanged."""
    if x is None:
        return []
    return x if isinstance(x, list) else [x]


def read_secret(v: object) -> str:
    """Resolve a config secret.

    ``"file:<path>"`` returns the stripped contents of <path>; ``None`` (JSON null)
    returns ``""`` so it counts as "not configured"; anything else goes through str().

    Raises:
        OSError / UnicodeDecodeError: when a ``file:`` path cannot be read.
    """
    if v is None:
        return ""
    if isinstance(v, str) and v.startswith("file:"):
        return Path(v[len("file:"):]).read_text(encoding="utf-8").strip()
    return str(v)


def load_grid_file(p: Path) -> dict:
    """Parse the JSON grid file at *p*."""
    return json.loads(p.read_text(encoding="utf-8"))


def cartesian(dict_of_lists):
    """Yield one dict per element of the cartesian product of the dict's list values."""
    keys = list(dict_of_lists.keys())
    vals = [dict_of_lists[k] for k in keys]
    for combo in itertools.product(*vals):
        yield dict(zip(keys, combo))


def sanitize_logexp(exp: int) -> str:
    """Render a log2 interval exponent for folder names: -3 -> 'm3', 2 -> '2'."""
    return f"m{abs(exp)}" if exp < 0 else f"{exp}"


# ----------------- tagging & validation -----------------

def param_tag(profile_name: str, p: dict) -> str:
    """Build the readable '_'-joined tag appended to a run folder name."""
    parts = [
        p.get("role_mode", "BC"),  # role first
        profile_name,
        p["delay"],
        p["servo"],
        f"sync{sanitize_logexp(int(p['log_sync']))}",
        f"ann{sanitize_logexp(int(p['log_announce']))}",
    ]
    if p["delay"] == "E2E":
        parts.append(f"dreq{sanitize_logexp(int(p['log_delayreq']))}")
    parts.append(p["transport"])
    if p["servo"] == "pi":
        parts.append(f"kp{str(p['kp']).replace('.', 'p')}")
        parts.append(f"ki{str(p['ki']).replace('.', 'p')}")
    else:
        if "linreg_win" in p:
            parts.append(f"lwin{p['linreg_win']}")
        if "linreg_hist" in p:
            parts.append(f"lhist{p['linreg_hist']}")
    return "_".join(parts)


def validate_and_fix(p: dict, msgs: list):
    """Validate one parameter combo in place; append human-readable notes to *msgs*.

    OC modes are forced to delay=E2E. log_* exponents must lie in [-7, 4].

    Raises:
        ValueError: unknown role_mode or an out-of-range log_* exponent.
    """
    if p.get("role_mode") not in ("BC", "OC_L2"):
        raise ValueError(f"invalid role_mode={p.get('role_mode')} (use BC or OC_L2)")
    if p["role_mode"] != "BC" and p["delay"] != "E2E":
        msgs.append("forcing delay=E2E for OC mode")
        p["delay"] = "E2E"
    if p["role_mode"] == "OC_L2" and p["transport"] != "L2":
        msgs.append("OC_L2 works best with transport=L2")
    for key in ("log_sync", "log_announce", "log_delayreq"):
        if key in p:
            e = int(p[key])
            if e < -7 or e > 4:
                raise ValueError(f"{key} out of range: {e}")


# ----------------- build commands -----------------

def build_runner_cmd(sweep_cfg: dict, p: dict, nodes: list, duration: int, out_root: str, pass_through: str):
    """Return the runner argv for one combo.

    Extra flags come from sweep_cfg["pass_through"] followed by the CLI *pass_through*;
    both are split with shlex.
    """
    cmd = [RUNNER, str(duration)] + [str(n) for n in nodes]
    cmd += ["--role-mode", p.get("role_mode", "BC")]
    cmd += ["--delay", p["delay"], "--servo", p["servo"]]
    cmd += ["--transport", p["transport"]]
    cmd += ["--log-sync", str(p["log_sync"]), "--log-announce", str(p["log_announce"])]
    if p["delay"] == "E2E":
        cmd += ["--log-delayreq", str(p["log_delayreq"])]
    if p["servo"] == "pi":
        cmd += ["--kp", str(p["kp"]), "--ki", str(p["ki"])]
    elif "linreg_win" in p:
        cmd += ["--linreg-window", str(p["linreg_win"])]
        # if you later expose more linreg flags in the runner, add them here
    cmd += ["--out", out_root]
    pt = (sweep_cfg.get("pass_through") or "").strip()
    if pass_through:
        pt = (pt + " " + pass_through).strip()
    if pt:
        cmd += shlex.split(pt)
    return cmd


# ----------------- Telegram (IPv4-forced fallback, short timeouts) -----------------

_TG_HOST = "api.telegram.org"
_TG_TEXT_LIMIT = 4096     # sendMessage text limit (characters)
_TG_CAPTION_LIMIT = 1024  # media caption limit (characters)
_PHOTO_TYPES = {".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg"}


def tg_cfg_or_none(grid: dict):
    """Return the Telegram settings dict, or None when disabled or not configured.

    Returns {"token", "chat_id", "quiet", "force4"}; force4 mirrors "force_ipv4".
    """
    tg = (((grid.get("sweep") or {}).get("notify") or {}).get("telegram") or {})
    if not tg or not tg.get("enabled"):
        return None
    try:
        token = read_secret(tg["token"])
        chat_id = read_secret(tg["chat_id"])
    except (KeyError, OSError, ValueError) as e:
        print(f"[telegram] disabled: cannot load credentials ({e!r})")
        return None
    if not token or not chat_id:
        return None
    return {
        "token": token,
        "chat_id": chat_id,
        "quiet": bool(tg.get("quiet", True)),
        "force4": bool(tg.get("force_ipv4", False)),
    }


def _resolve_a(host: str) -> list:
    """Return the distinct IPv4 addresses (A records) of *host*, in resolver order."""
    try:
        infos = _sock.getaddrinfo(host, 443, _sock.AF_INET, _sock.SOCK_STREAM)
    except OSError:
        return []
    # dict.fromkeys de-duplicates while keeping first-seen order
    return list(dict.fromkeys(sa[0] for *_, sa in infos))


def _dechunk(data: bytes) -> bytes:
    """Decode an HTTP/1.1 chunked-transfer body; stops at the 0-size or a malformed chunk."""
    out = bytearray()
    pos = 0
    while True:
        eol = data.find(b"\r\n", pos)
        if eol < 0:
            break
        size_field = data[pos:eol].split(b";", 1)[0].strip()  # drop chunk extensions
        try:
            size = int(size_field, 16)
        except ValueError:
            break
        if size == 0:
            break
        start = eol + 2
        out += data[start:start + size]
        pos = start + size + 2  # skip the CRLF that ends each chunk
    return bytes(out)


def _https_post_via_ip(host: str, ip: str, path: str, body: bytes, content_type: str, timeout=6):
    """POST over HTTPS to a specific IPv4 address, using *host* for SNI and the Host header.

    Returns the parsed JSON reply, or {'ok': False, 'description': ...} on any
    network/TLS error or non-JSON reply.
    """
    head = (
        f"POST {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Content-Type: {content_type}\r\n"
        f"Content-Length: {len(body)}\r\n"
        f"Connection: close\r\n\r\n"
    ).encode()
    ctx = _ssl.create_default_context()
    buf = bytearray()
    try:
        # context managers close both sockets even when connect/handshake fails
        with _sock.create_connection((ip, 443), timeout=timeout) as raw:
            with ctx.wrap_socket(raw, server_hostname=host) as tls:
                tls.sendall(head + body)
                while True:
                    chunk = tls.recv(4096)
                    if not chunk:
                        break
                    buf += chunk
    except OSError as e:  # includes socket timeouts and ssl.SSLError
        return {"ok": False, "description": str(e)}

    header_blob, sep, payload = bytes(buf).partition(b"\r\n\r\n")
    if not sep:
        header_blob, payload = b"", header_blob
    chunked = any(
        line.split(b":", 1)[0].strip().lower() == b"transfer-encoding" and b"chunked" in line.lower()
        for line in header_blob.split(b"\r\n")
    )
    if chunked:
        payload = _dechunk(payload)
    try:
        return json.loads(payload.decode("utf-8", "ignore"))
    except ValueError:
        return {"ok": False, "description": payload[:200].decode("utf-8", "ignore") + "..."}


def _tg_post_normal(url: str, body: bytes, content_type: str, timeout: float, method: str) -> bool:
    """POST via urllib; return True iff Telegram answered ok.

    Network/HTTP errors return False silently so the caller can fall back to the IPv4 path.
    """
    try:
        req = _req.Request(url, data=body, headers={"Content-Type": content_type}, method="POST")
        with _req.urlopen(req, timeout=timeout) as r:
            js = json.loads(r.read().decode("utf-8", "ignore"))
    except Exception:  # best-effort boundary: any failure means "try the fallback"
        return False
    if not js.get("ok", False):
        print(f"[telegram] {method} error: {js.get('description', '?')}")
    return bool(js.get("ok", False))


def _tg_post_ipv4(tg: dict, method: str, body: bytes, content_type: str, timeout: float) -> bool:
    """POST a Bot API *method* directly to the first IPv4 address of api.telegram.org."""
    addrs = _resolve_a(_TG_HOST)
    if not addrs:
        return False
    js = _https_post_via_ip(_TG_HOST, addrs[0], f"/bot{tg['token']}/{method}", body, content_type,
                            timeout=timeout)
    if not js.get("ok", False):
        print(f"[telegram] {method} IPv4 error: {js.get('description', '?')}")
    return bool(js.get("ok", False))


def tg_send(tg, text: str, disable_notification=None):
    """Send a text message (HTML-escaped, capped at Telegram's 4096-char limit).

    Uses normal HTTPS first; falls back to a direct IPv4 connection on failure or
    when force_ipv4 is set. disable_notification=None means "use the quiet setting".
    No-op when *tg* is None.
    """
    if not tg:
        return
    silent = tg["quiet"] if disable_notification is None else disable_notification
    payload = {
        "chat_id": tg["chat_id"],
        # truncate BEFORE escaping so an HTML entity is never cut in half
        "text": _html.escape((text or "")[:_TG_TEXT_LIMIT], quote=False),
        "parse_mode": "HTML",
        "disable_web_page_preview": "true",
        "disable_notification": "true" if silent else "false",
    }
    body = _parse.urlencode(payload).encode()
    ctype = "application/x-www-form-urlencoded"
    url = f"https://{_TG_HOST}/bot{tg['token']}/sendMessage"
    if not tg.get("force4") and _tg_post_normal(url, body, ctype, 5, "sendMessage"):
        return
    _tg_post_ipv4(tg, "sendMessage", body, ctype, 6)


def _multipart_form(fields: dict, files: dict):
    """Build a multipart/form-data body.

    *files* maps field name -> (filename, bytes, content_type).
    Returns (body_bytes, content_type_header_value).
    """
    boundary = "----WebKitFormBoundary" + uuid.uuid4().hex
    dash_boundary = b"--" + boundary.encode()
    crlf = b"\r\n"
    parts = []
    for name, value in fields.items():
        raw = value if isinstance(value, bytes) else str(value).encode()
        parts += [dash_boundary, crlf,
                  f'Content-Disposition: form-data; name="{name}"'.encode(), crlf, crlf,
                  raw, crlf]
    for name, (filename, content, ctype) in files.items():
        parts += [dash_boundary, crlf,
                  f'Content-Disposition: form-data; name="{name}"; filename="{filename}"'.encode(), crlf,
                  f"Content-Type: {ctype}".encode(), crlf, crlf,
                  content, crlf]
    parts += [dash_boundary, b"--", crlf]
    return b"".join(parts), f"multipart/form-data; boundary={boundary}"


def tg_send_file(tg, path: Path, caption: str = "", disable_notification=True):
    """Upload a file as a document; for images, fall back to sendPhoto over IPv4.

    Attempt order: normal sendDocument (skipped when force_ipv4), IPv4 sendDocument,
    then IPv4 sendPhoto (png/jpg/jpeg only). Prints a message if every attempt fails.
    No-op when *tg* is None.
    """
    if not tg:
        return
    path = Path(path)
    try:
        data = path.read_bytes()
    except OSError as e:
        print(f"[telegram] cannot read {path}: {e}")
        return
    silent = tg["quiet"] if disable_notification is None else disable_notification
    fields = {
        "chat_id": tg["chat_id"],
        # truncate BEFORE escaping so an HTML entity is never cut in half
        "caption": _html.escape((caption or "")[:_TG_CAPTION_LIMIT], quote=False),
        "parse_mode": "HTML",
        "disable_notification": "true" if silent else "false",
    }
    body_doc, ctype_doc = _multipart_form(fields, {"document": (path.name, data, "application/octet-stream")})
    photo_type = _PHOTO_TYPES.get(path.suffix.lower())
    body_pho = ctype_pho = None
    if photo_type:
        body_pho, ctype_pho = _multipart_form(fields, {"photo": (path.name, data, photo_type)})

    sent = False
    if not tg.get("force4"):
        url_doc = f"https://{_TG_HOST}/bot{tg['token']}/sendDocument"
        sent = _tg_post_normal(url_doc, body_doc, ctype_doc, 10, "sendDocument")
    if not sent:
        sent = _tg_post_ipv4(tg, "sendDocument", body_doc, ctype_doc, 12)
    if not sent and body_pho is not None:
        sent = _tg_post_ipv4(tg, "sendPhoto", body_pho, ctype_pho, 12)
    if not sent:
        print(f"[telegram] failed to upload {path}")


# ----------------- Analyzer -----------------

def run_analyzer_if_any(grid: dict, exp_dir: Path):
    """Run the configured analyzer on *exp_dir* (best effort).

    Returns the analysis sub-folder name (analysis.out_name, default "_analysis")
    once the analyzer has run, even if it exited non-zero (it may have produced
    partial output). Returns None when analysis is disabled, no analyzer is
    configured, or the analyzer could not be started.
    """
    s = grid.get("sweep") or {}
    if not s.get("analyze", False):
        return None
    an = (s.get("analyzer") or "").strip()
    if not an:
        return None
    analysis = s.get("analysis") or {}
    cmd = [an, str(exp_dir)]
    for key, flag in (
        ("warmup_sec", "--warmup-sec"),
        ("sarb_skip", "--sarb-skip"),
        ("clean_mad", "--clean-mad"),
        ("lock_threshold_ns", "--lock-threshold-ns"),
        ("lock_consec", "--lock-consec"),
        ("out_name", "--out-name"),
    ):
        if key in analysis:
            cmd += [flag, str(analysis[key])]
    print(f"[analyzer] {' '.join(shlex.quote(x) for x in cmd)}")
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"[WARN] analyzer exit {e.returncode} for {exp_dir}")
    except OSError as e:  # analyzer missing / not executable: don't abort the sweep
        print(f"[WARN] cannot start analyzer {an}: {e}")
        return None
    return analysis.get("out_name", "_analysis")


# ----------------- expansion -----------------

def expand_profiles(grid: dict):
    """Expand every profile into concrete runs.

    Returns a list of (profile_name, params, tag, validation_messages).
    pi-servo combos without kp/ki are skipped, with a warning.

    Raises:
        KeyError: a profile lacks name / fixed.delay / fixed.servo.
        ValueError: propagated from validate_and_fix.
    """
    s = grid["sweep"]
    defaults = s.get("defaults", {})
    default_axes = {
        "role_mode": ensure_list(defaults.get("role_mode", ["BC"])),
        "transport": ensure_list(defaults.get("transport", ["UDPv4"])),
        "log_sync": ensure_list(defaults.get("log_sync", [0])),
        "log_announce": ensure_list(defaults.get("log_announce", [1])),
        "log_delayreq": ensure_list(defaults.get("log_delayreq", [-3])),
    }
    all_runs = []
    for prof in s.get("profiles", []):
        name = prof["name"]
        fixed = prof.get("fixed", {})
        base = {"delay": fixed["delay"], "servo": fixed["servo"]}
        axes = dict(default_axes)
        # Any other fixed key pins that axis (a list value sweeps it, as role_mode always did).
        for k, v in fixed.items():
            if k not in base:
                axes[k] = ensure_list(v)
        for k, v in (prof.get("sweep") or {}).items():
            axes[k] = ensure_list(v)
        skipped = 0
        for combo in cartesian(axes):
            p = {**base, **combo}
            if p["servo"] == "pi" and ("kp" not in p or "ki" not in p):
                skipped += 1
                continue
            msgs = []
            validate_and_fix(p, msgs)
            all_runs.append((name, p, param_tag(name, p), msgs))
        if skipped:
            print(f"[WARN] profile {name}: skipped {skipped} pi combo(s) without kp/ki")
    return all_runs


# ----------------- main -----------------

def estimate_total_seconds(grid: dict, num_runs: int) -> int:
    """Estimated wall time: num_runs * repeats * (duration + overhead_sec)."""
    s = grid["sweep"]
    dur = int(s.get("duration", 300))
    reps = int(s.get("repeats", 1))
    overhead = int(s.get("overhead_sec", 20))
    return num_runs * reps * (dur + overhead)


def _tag_run_dir(sweep_dir: Path, before: set, tag: str):
    """Find the folder the runner just created in *sweep_dir* and append '__<tag>' to it.

    Returns the tagged folder. If renaming and copying both fail, returns the
    untagged original. Returns None when no new folder appeared.
    """
    new_dirs = sorted({p.name for p in sweep_dir.iterdir() if p.is_dir()} - before)
    if not new_dirs:
        return None
    newest = new_dirs[-1]
    exp_dir = sweep_dir / newest
    dst = sweep_dir / f"{newest}__{tag}"
    i = 1
    while dst.exists():
        i += 1
        dst = sweep_dir / f"{newest}__{tag}_{i}"
    try:
        exp_dir.rename(dst)
        return dst
    except OSError:
        pass
    try:
        # Fallback: copy, then keep original
        shutil.copytree(exp_dir, dst, dirs_exist_ok=False)
        return dst
    except OSError as e:  # shutil.Error subclasses OSError
        print(f"[WARN] could not tag {exp_dir} as {dst.name}: {e}")
        return exp_dir


def _report_figures(tg, exp_dir: Path, out_name: str, run_idx: int, tag: str):
    """Send every PNG under <exp_dir>/<out_name> to Telegram, or report why there are none."""
    adir = exp_dir / out_name
    if not adir.exists():
        tg_send(tg, f"⚠️ [{run_idx}] {tag}: analysis folder not found: {adir}")
        return
    pngs = sorted(adir.rglob("*.png"))
    if not pngs:
        tg_send(tg, f"ℹ️ [{run_idx}] {tag}: no PNGs in {adir}", disable_notification=True)
        return
    tg_send(tg, f"📈 [{run_idx}] {tag} {len(pngs)} figure(s) from {adir}", disable_notification=True)
    for png in pngs:
        tg_send_file(tg, png, caption=f"{tag} · {png.name}", disable_notification=True)


def main():
    """CLI entry point: expand the grid, run each combo, tag folders, analyze and notify."""
    ap = argparse.ArgumentParser(description="Sweep PTP experiments over a grid (role_mode + Telegram PNG uploads).")
    ap.add_argument("--grid-file", default="grid.json")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--pass-through", default="", help="extra flags appended to runner cmd")
    args = ap.parse_args()

    grid = load_grid_file(Path(args.grid_file))
    s = grid["sweep"]
    nodes = s["nodes"]
    duration = int(s.get("duration", 300))
    repeats = int(s.get("repeats", 1))
    out_root = s.get("out_root", "./logs")
    sweep_name = s.get("sweep_name", "ptp_param_matrix")
    overhead_sec = int(s.get("overhead_sec", 20))
    tg = tg_cfg_or_none(grid)

    combos = expand_profiles(grid)
    total_runs = len(combos) * repeats
    total_sec = estimate_total_seconds(grid, len(combos))

    sweep_dir = Path(out_root) / f"sweep_{ts()}_{sweep_name}"
    sweep_dir.mkdir(parents=True, exist_ok=True)
    tg_send(tg, f"🧪 Sweep start\nRoot: {sweep_dir}\n"
                f"Combos: {len(combos)} Repeats: {repeats}\n"
                f"Per-run: {duration}s + {overhead_sec}s\n"
                f"Total runs: {total_runs} ETA: ~{secs_dhm_verbose(total_sec)}")

    run_idx = 0
    t_start = time.time()
    for _name, params, tag, msgs in combos:
        warn = f" | WARN: {', '.join(msgs)}" if msgs else ""
        for _ in range(repeats):
            run_idx += 1
            remaining = total_sec - (time.time() - t_start)
            tg_send(tg, f"▶️ [{run_idx}/{total_runs}] {tag}{warn}\nETA ~{secs_dhm_verbose(remaining)}",
                    disable_notification=True)

            # snapshot existing folders so the runner's new one can be detected afterwards
            before = {p.name for p in sweep_dir.iterdir() if p.is_dir()}
            cmd = build_runner_cmd(s, params, nodes, duration, str(sweep_dir), args.pass_through)
            print(f"\n[{run_idx}/{total_runs}] RUN: {' '.join(shlex.quote(x) for x in cmd)}")
            if args.dry_run:
                time.sleep(0.05)
                continue
            try:
                subprocess.run(cmd, check=True)
            except subprocess.CalledProcessError as e:
                tg_send(tg, f"❌ [{run_idx}] {tag} failed (exit {e.returncode})")
                continue
            except OSError as e:
                # Runner missing / not executable: every later run would fail the same way.
                tg_send(tg, f"❌ [{run_idx}] {tag}: cannot start runner {cmd[0]} ({e})")
                raise

            # identify the created experiment folder and rename it with the tag
            time.sleep(1)
            exp_dir = _tag_run_dir(sweep_dir, before, tag)
            if exp_dir is None:
                print(f"[WARN] no new run folder detected in {sweep_dir} for {tag}")
            else:
                out_name = run_analyzer_if_any(grid, exp_dir)
                if out_name:
                    _report_figures(tg, exp_dir, out_name, run_idx, tag)

            rem = max(0, total_sec - (time.time() - t_start))
            tg_send(tg, f"✅ [{run_idx}/{total_runs}] {tag} done. Remaining ~{secs_dhm_verbose(rem)}",
                    disable_notification=True)
            time.sleep(max(0, overhead_sec))

    total_elapsed = time.time() - t_start
    tg_send(tg, f"🏁 Sweep complete in {secs_hms(total_elapsed)}\nRoot: {sweep_dir}")
    print("All runs complete.")


if __name__ == "__main__":
    main()