#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
analyze_ptp_runs.py

Analyze either:
  • a single experiment folder (contains apu?? subfolders), or
  • a sweep folder (contains many experiment folders).

Writes ALL results into a subfolder inside each experiment directory
(default: "_analysis").

Input directory structure (per experiment):
  /apu00/, /apu01/, ...

Each per-node directory may contain:
  - ptp4l logs:   ptp4l_gm.log / ptp4l_bc.log / ptp4l_slave.log
  - phc2sys logs: phc2sys_meas.log (and optionally phc2sys_bridge.log)
  - tslogs/eth0.csv (SARB receiver dump; or other *.csv with seq + hw_raw_sec/nsec)

Outputs (in /_analysis):
  - nodes_summary.csv    (per-node metrics)
  - run_summary.csv      (per-run aggregates: slope ns/hop, last-hop p95, lock time)
  - sarb_raw_nodes.csv   (per-node SARB raw robust stats, if available)
  - sarb_clean_nodes.csv (per-node SARB CLEANED robust stats, if available)
      * NEW: includes clean_absMED_ns (median |residual| after cleaning)
  - sarb_jitter_nodes.csv (per-node SARB inter-arrival stats, if available)
  - Plots (PNG) in the same folder

Usage (single run):
  python analyze_ptp_runs.py /path/to/EXPERIMENT_DIR [options]

Usage (sweep root — auto-detects runs inside and processes each):
  python analyze_ptp_runs.py /path/to/SWEEP_ROOT --exclude-dir unused [options]

Options:
  --out-name _analysis       Name of output subfolder created inside each exp dir
  --warmup-sec 60            Seconds to ignore from each ptp4l/phc2sys series start
  --sarb-skip 0              Number of initial SARB sequences to skip
  --clean-mad 6.0            Outlier removal: keep |residual| <= clean_mad * MAD per node
  --lock-threshold-ns 5000   Lock threshold for phc2sys offset (ns)
  --lock-consec 5            Samples in a row required below threshold
  --exclude-dir NAME         (repeatable) subfolder names to skip when scanning a sweep
"""

import argparse
import re
import sys
import math
from pathlib import Path

import numpy as np
import pandas as pd

# Plots
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

# ---------------------------- regex patterns -----------------------------

RE_BRACKET_T = re.compile(r'\[(\d+(?:\.\d+)?)\]')  # [time]
RE_PTP_COMPACT = re.compile(r'master offset\s+(-?\d+)\s+s\d\s+freq\s+([+-]?\d+)\s+path delay\s+(\d+)', re.I)
RE_OFFSET_NS_ONLY = re.compile(r'offset\s+(-?\d+)\s*ns', re.I)
RE_OFFSET_SECS_GUARD = re.compile(r'offset\s+(-?\d+(?:\.\d+)?)\s*s(?!\d)', re.I)
RE_PATH_SECS = re.compile(r'path delay\s+(-?\d+(?:\.\d+)?)\s*s', re.I)
RE_PHC_COMPACT = re.compile(r'offset\s+(-?\d+)\s+s\d\s+freq\s+([+-]?\d+)\s+delay\s+(\d+)', re.I)
RE_PHC_NS = re.compile(r'offset\s+(-?\d+)\s*ns', re.I)
RE_PHC_SEC = re.compile(r'offset\s+(-?\d+(?:\.\d+)?)\s*s', re.I)
RE_NODES_IN_NAME = re.compile(r'nodes\(([\d\-]+)\)')

# ---------------------------- I/O helpers --------------------------------

def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)
    return p

# ---------------------------- parsing functions --------------------------

def parse_ptp4l_metrics(path: Path):
    rows = []
    if not path.exists() or path.stat().st_size == 0:
        return pd.DataFrame(columns=["t","offset_ns","path_delay_ns","freq_ppb"])
    with open(path, 'r', errors='ignore') as f:
        for line in f:
            mt = RE_BRACKET_T.search(line)
            if not mt:
                continue
            t = float(mt.group(1))
            m = RE_PTP_COMPACT.search(line)
            if m:
                rows.append((t, float(m.group(1)), float(m.group(3)), float(m.group(2))))
                continue
            m = RE_OFFSET_NS_ONLY.search(line)
            p = RE_PATH_SECS.search(line)
            if m or p:
                off = float(m.group(1)) if m else math.nan
                pdelay = float(p.group(1)) * 1e9 if p else math.nan
                rows.append((t, off, pdelay, math.nan))
                continue
            m = RE_OFFSET_SECS_GUARD.search(line)
            if m:
                rows.append((t, float(m.group(1)) * 1e9, math.nan, math.nan))
    if not rows:
        return pd.DataFrame(columns=["t","offset_ns","path_delay_ns","freq_ppb"])
    df = pd.DataFrame(rows, columns=["t","offset_ns","path_delay_ns","freq_ppb"]).sort_values("t")
    df["offset_ns"] = df["offset_ns"].ffill()
    df["path_delay_ns"] = df["path_delay_ns"].ffill()
    return df


def parse_phc2sys_offsets(path: Path):
    rows = []
    if not path.exists() or path.stat().st_size == 0:
        return pd.DataFrame(columns=["t","offset_ns","delay_ns","freq_ppb"])
    with open(path, 'r', errors='ignore') as f:
        for line in f:
            mt = RE_BRACKET_T.search(line)
            if not mt:
                continue
            t = float(mt.group(1))
            m = RE_PHC_COMPACT.search(line)
            if m:
                rows.append((t, float(m.group(1)), float(m.group(3)), float(m.group(2))))
                continue
            m = RE_PHC_NS.search(line)
            if m:
                rows.append((t, float(int(m.group(1))), math.nan, math.nan))
                continue
            m = RE_PHC_SEC.search(line)
            if m:
                rows.append((t, float(m.group(1)) * 1e9, math.nan, math.nan))
    if not rows:
        return pd.DataFrame(columns=["t","offset_ns","delay_ns","freq_ppb"])
    return pd.DataFrame(rows, columns=["t","offset_ns","delay_ns","freq_ppb"]).sort_values("t")


def robust_stats_abs(x):
    """
    Given x (any real series), return:
      - median of x
      - median ABSOLUTE deviation (MAD) around median(x)
      - p95 and p99 of |x|
      - std(x)
      - NEW: median_abs = median(|x|)
    """
    x = np.asarray(x, dtype=float)
    x = x[~np.isnan(x)]
    if x.size == 0:
        return dict(n=0, median=math.nan, MAD=math.nan, p95=math.nan,
                    p99=math.nan, std=math.nan, median_abs=math.nan)
    med = float(np.median(x))
    mad = float(np.median(np.abs(x - med)))
    p95 = float(np.percentile(np.abs(x), 95))
    p99 = float(np.percentile(np.abs(x), 99))
    std = float(np.std(x))
    med_abs = float(np.median(np.abs(x)))
    return dict(n=int(x.size), median=med, MAD=mad, p95=p95, p99=p99, std=std, median_abs=med_abs)


def infer_role(node_dir: Path):
    gm = node_dir / "ptp4l_gm.log"
    bc = node_dir / "ptp4l_bc.log"
    sl = node_dir / "ptp4l_slave.log"
    if gm.exists() and gm.stat().st_size > 0:
        return "gm"
    if sl.exists() and (not bc.exists() or bc.stat().st_size == 0):
        return "slave"
    if bc.exists() and bc.stat().st_size > 0:
        return "middle"
    return "unknown"


def pick_sarb_csv(tsdir: Path):
    if not tsdir.exists():
        return None
    eth0 = tsdir / "eth0.csv"
    if eth0.exists():
        return eth0
    csvs = sorted(tsdir.glob("*.csv"), key=lambda p: p.stat().st_mtime, reverse=True)
    return csvs[0] if csvs else None


def parse_sarb_csv(csv_path: Path):
    df = pd.read_csv(csv_path)
    if {"hw_raw_sec","hw_raw_nsec"}.issubset(df.columns):
        rx_ns = df["hw_raw_sec"].astype("int64") * 1_000_000_000 + df["hw_raw_nsec"].astype("int64")
    elif {"hw_sys_sec","hw_sys_nsec"}.issubset(df.columns):
        rx_ns = df["hw_sys_sec"].astype("int64") * 1_000_000_000 + df["hw_sys_nsec"].astype("int64")
    elif {"sw_sec","sw_nsec"}.issubset(df.columns):
        rx_ns = df["sw_sec"].astype("int64") * 1_000_000_000 + df["sw_nsec"].astype("int64")
    else:
        raise ValueError(f"Unsupported SARB CSV schema in {csv_path.name}")
    return pd.DataFrame({"seq": df["seq"].astype(int), "rx_ns": rx_ns})


def lock_time(df, thresh_ns=5000.0, consecutive=5):
    if df is None or df.empty:
        return math.nan
    t0 = df["t"].min()
    x = (df["t"] - t0).to_numpy()
    y = np.abs(df["offset_ns"].to_numpy(dtype=float))
    count = 0
    for i, val in enumerate(y):
        if val <= thresh_ns:
            count += 1
            if count >= consecutive:
                return float(x[i - consecutive + 1])
        else:
            count = 0
    return math.nan

# ---------------------------- per-run analysis ---------------------------

def analyze_run(run_dir: Path,
                out_run: Path,
                warmup_sec: float,
                sarb_skip: int,
                clean_mad: float,
                lock_thresh_ns: float = 5000.0,
                lock_consec: int = 5):
    ensure_dir(out_run)

    m = RE_NODES_IN_NAME.search(run_dir.name)
    if m:
        order_ids = [int(x) for x in m.group(1).split('-')]
        order_nodes = [f"apu{n:02d}" for n in order_ids]
    else:
        order_nodes = [p.name for p in sorted(run_dir.glob("apu??"))]

    rows_nodes = []
    ptp_series = {}
    phc_series = {}

    for node in order_nodes:
        ndir = run_dir / node
        if not ndir.exists():
            continue
        role = infer_role(ndir)
        ptp_path = ndir / ("ptp4l_gm.log" if role == "gm"
                           else ("ptp4l_slave.log" if role == "slave" else "ptp4l_bc.log"))
        phc_path = ndir / "phc2sys_meas.log"

        dfp = parse_ptp4l_metrics(ptp_path)
        dph = parse_phc2sys_offsets(phc_path)

        def after_warm(df):
            if df is None or df.empty:
                return df
            t0 = df["t"].min()
            return df[df["t"] >= t0 + warmup_sec]

        dfp_w = after_warm(dfp)
        dph_w = after_warm(dph)

        def absstats(df, col):
            if df is None or df.empty or col not in df:
                return dict(n=0, median=math.nan, MAD=math.nan, p95=math.nan,
                            p99=math.nan, std=math.nan, median_abs=math.nan)
            return robust_stats_abs(np.abs(df[col].to_numpy(dtype=float)))

        ptp_stats = absstats(dfp_w, "offset_ns")
        phc_stats = absstats(dph_w, "offset_ns")
        pdelay_mean = (float(dfp_w["path_delay_ns"].mean())
                       if ("path_delay_ns" in dfp_w and not dfp_w["path_delay_ns"].empty)
                       else math.nan)

        ptp_series[node] = dfp
        phc_series[node] = dph

        rows_nodes.append({
            "run": run_dir.name,
            "node": node,
            "role": role,
            "ptp_abs_median_ns": ptp_stats["median"],
            "ptp_abs_MAD_ns": ptp_stats["MAD"],
            "ptp_abs_p95_ns": ptp_stats["p95"],
            "ptp_abs_p99_ns": ptp_stats["p99"],
            "ptp_samples": ptp_stats["n"],
            "ptp_path_delay_mean_ns": pdelay_mean,
            "phc_abs_median_ns": phc_stats["median"],
            "phc_abs_MAD_ns": phc_stats["MAD"],
            "phc_abs_p95_ns": phc_stats["p95"],
            "phc_abs_p99_ns": phc_stats["p99"],
            "phc_samples": phc_stats["n"],
        })

    nodes_summary = pd.DataFrame(rows_nodes)

    # slope of phc p95 vs hop (approximate hop order by index in order_nodes)
    hop_idx, p95_vals = [], []
    order_nodes_existing = [n for n in order_nodes if n in nodes_summary["node"].values]
    for i, n in enumerate(order_nodes_existing):
        v = float(nodes_summary.loc[nodes_summary["node"] == n, "phc_abs_p95_ns"].values[0])
        if math.isfinite(v):
            hop_idx.append(i)
            p95_vals.append(v)
    slope_ns_per_hop = float(np.polyfit(hop_idx, p95_vals, 1)[0]) if len(hop_idx) >= 2 else math.nan

    last_node = order_nodes_existing[-1] if order_nodes_existing else None
    last_lock = math.nan
    if last_node and last_node in phc_series and phc_series[last_node] is not None:
        last_lock = lock_time(phc_series[last_node], thresh_ns=lock_thresh_ns, consecutive=lock_consec)
    last_phc_p95 = (float(nodes_summary.loc[nodes_summary["node"] == last_node, "phc_abs_p95_ns"].values[0])
                    if last_node else math.nan)

    run_summary = pd.DataFrame([{
        "run": run_dir.name,
        "slope_phc_p95_ns_per_hop": slope_ns_per_hop,
        "last_node": last_node,
        "last_phc_p95_ns": last_phc_p95,
        "last_lock_s": last_lock
    }])

    # ---------------- SARB ----------------
    sarb_node = {}
    for node in order_nodes_existing:
        tsdir = (run_dir / node / "tslogs")
        csv = pick_sarb_csv(tsdir)
        if csv is None:
            continue
        try:
            sarb_node[node] = parse_sarb_csv(csv)
        except Exception:
            continue

    sarb_raw_nodes = pd.DataFrame()
    sarb_clean_nodes = pd.DataFrame()
    sarb_jitter_nodes = pd.DataFrame()

    if sarb_node:
        # common sequences
        common_seq = None
        for node, df in sarb_node.items():
            s = set(df["seq"].tolist())
            common_seq = s if common_seq is None else (common_seq & s)
        common = sorted(common_seq) if common_seq else []
        if sarb_skip > 0 and common:
            common = common[sarb_skip:]

        if common:
            baseline = order_nodes_existing[0]
            idx_maps = {n: dict(zip(df["seq"].tolist(), df["rx_ns"].tolist()))
                        for n, df in sarb_node.items()}

            # per-node residuals and jitter
            raw_rows = []
            clean_rows = []
            jit_rows = []

            for n in order_nodes_existing:
                if n not in idx_maps:
                    continue

                # residuals vs baseline (signed), then center -> "centered"
                vals = []
                rx_series = []
                for q in common:
                    b = idx_maps.get(baseline, {}).get(q)
                    r = idx_maps.get(n, {}).get(q)
                    if b is None or r is None:
                        continue
                    vals.append(r - b)
                    rx_series.append(r)

                if vals:
                    vals = np.asarray(vals, dtype=float)
                    med = float(np.median(vals))
                    centered = vals - med

                    # RAW robust stats on centered residuals
                    st = robust_stats_abs(centered)
                    raw_rows.append({
                        "run": run_dir.name,
                        "node": n,
                        "samples": st["n"],
                        "SARB_offset_centered_median_ns": st["median"],
                        "SARB_offset_absMED_ns": st["median_abs"],  # NEW: median(|centered|)
                        "SARB_offset_absMAD_ns": st["MAD"],
                        "SARB_offset_absP95_ns": st["p95"],
                        "SARB_offset_absP99_ns": st["p99"],
                        "SARB_offset_centered_std_ns": st["std"],
                        "baseline": baseline
                    })

                    # CLEANING by MAD (on centered residuals)
                    mad = max(st["MAD"], 1.0)
                    thr = clean_mad * mad
                    kept = centered[np.abs(centered) <= thr]
                    stc = robust_stats_abs(kept)
                    clean_rows.append({
                        "run": run_dir.name,
                        "node": n,
                        "samples_kept": stc["n"],
                        "fraction_kept": float(stc["n"]) / float(st["n"]) if st["n"] > 0 else math.nan,
                        "clean_absMED_ns": stc["median_abs"],  # NEW: median(|kept|)
                        "clean_absMAD_ns": stc["MAD"],
                        "clean_absP95_ns": stc["p95"],
                        "clean_absP99_ns": stc["p99"],
                        "baseline": baseline
                    })

                # inter-arrival jitter
                if len(rx_series) >= 3:
                    v = np.asarray(rx_series, dtype=float)
                    d = np.diff(v)
                    d_med = float(np.median(d))
                    jitter = np.abs(d - d_med)
                    jit_rows.append({
                        "run": run_dir.name,
                        "node": n,
                        "inter_arrival_median_ns": d_med,
                        "inter_arrival_absP95_ns": float(np.percentile(jitter, 95)),
                        "inter_arrival_centered_std_ns": float(np.std(d - d_med))
                    })

            sarb_raw_nodes = pd.DataFrame(raw_rows)
            sarb_clean_nodes = pd.DataFrame(clean_rows)
            sarb_jitter_nodes = pd.DataFrame(jit_rows)

    # write per-run CSVs
    nodes_summary.to_csv(out_run / "nodes_summary.csv", index=False)
    run_summary.to_csv(out_run / "run_summary.csv", index=False)
    if not sarb_raw_nodes.empty:
        sarb_raw_nodes.to_csv(out_run / "sarb_raw_nodes.csv", index=False)
    if not sarb_clean_nodes.empty:
        sarb_clean_nodes.to_csv(out_run / "sarb_clean_nodes.csv", index=False)
    if not sarb_jitter_nodes.empty:
        sarb_jitter_nodes.to_csv(out_run / "sarb_jitter_nodes.csv", index=False)

    # ---------------- Per-run plots ----------------
    def save_bar(xlabs, yvals, title, xlabel, ylabel, path):
        plt.figure()
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.bar(xlabs, yvals)
        plt.tight_layout()
        plt.savefig(path)
        plt.close()

    def save_line(xlabs, yvals, title, xlabel, ylabel, path):
        plt.figure()
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.plot(xlabs, yvals, marker='o')
        plt.tight_layout()
        plt.savefig(path)
        plt.close()

    # 1) phc2sys p95 vs hop
    try:
        labs, vals = [], []
        for i, n in enumerate(order_nodes_existing):
            r = nodes_summary[nodes_summary["node"] == n]
            if r.empty:
                continue
            labs.append(n.replace("apu", ""))
            vals.append(float(r["phc_abs_p95_ns"].values[0]))
        if labs:
            save_bar(labs, vals, "phc2sys |offset| p95 vs hop (post-warmup)",
                     "Node (hop order)", "p95 (ns)", out_run / "phc_p95_vs_hop.png")
    except Exception:
        pass

    # 2) ptp4l median |offset| vs hop
    try:
        labs, vals = [], []
        for i, n in enumerate(order_nodes_existing):
            r = nodes_summary[nodes_summary["node"] == n]
            if r.empty:
                continue
            labs.append(n.replace("apu", ""))
            vals.append(float(r["ptp_abs_median_ns"].values[0]))
        if labs:
            save_line(labs, vals, "ptp4l median |offset| vs hop (post-warmup)",
                      "Node (hop order)", "median (ns)", out_run / "ptp_median_vs_hop.png")
    except Exception:
        pass

    # 3) last node phc2sys offset vs time
    try:
        if (last_node and last_node in phc_series and phc_series[last_node] is not None
                and not phc_series[last_node].empty):
            dph = phc_series[last_node]
            t0 = dph["t"].min()
            x = dph["t"] - t0
            y = dph["offset_ns"]
            plt.figure()
            plt.title(f"{last_node} phc2sys offset vs time")
            plt.xlabel("t - t0 (s)")
            plt.ylabel("offset (ns)")
            plt.plot(x, y)
            plt.tight_layout()
            plt.savefig(out_run / "last_node_phc_offset_timeseries.png")
            plt.close()
    except Exception:
        pass

    # 4) SARB raw p95 vs hop
    try:
        if not sarb_raw_nodes.empty:
            labs, vals = [], []
            for n in order_nodes_existing:
                row = sarb_raw_nodes[sarb_raw_nodes["node"] == n]
                if row.empty:
                    continue
                labs.append(n.replace("apu", ""))
                vals.append(float(row["SARB_offset_absP95_ns"].values[0]))
            if labs:
                save_bar(labs, vals, "SARB |offset| p95 vs hop (raw, centered)",
                         "Node (hop order)", "p95 (ns)", out_run / "sarb_raw_p95_vs_hop.png")
    except Exception:
        pass

    # 5) SARB cleaned p95 vs hop
    try:
        if not sarb_clean_nodes.empty:
            labs, vals = [], []
            for n in order_nodes_existing:
                row = sarb_clean_nodes[sarb_clean_nodes["node"] == n]
                if row.empty:
                    continue
                labs.append(n.replace("apu", ""))
                vals.append(float(row["clean_absP95_ns"].values[0]))
            if labs:
                save_line(labs, vals, "SARB |offset| p95 vs hop (cleaned, centered)",
                          "Node (hop order)", "p95 (ns)", out_run / "sarb_clean_p95_vs_hop.png")
    except Exception:
        pass

    # 6) SARB residual histograms (centered)
    try:
        if not sarb_clean_nodes.empty:
            baseline = sarb_clean_nodes["baseline"].iloc[0]
            idx_maps = {n: dict(zip(df["seq"].tolist(), df["rx_ns"].tolist()))
                        for n, df in sarb_node.items()}
            # determine common after skip
            common_seq = None
            for node, df in sarb_node.items():
                s = set(df["seq"].tolist())
                common_seq = s if common_seq is None else (common_seq & s)
            common = sorted(common_seq) if common_seq else []
            if sarb_skip > 0 and common:
                common = common[sarb_skip:]
            top3 = sarb_clean_nodes.sort_values("clean_absP95_ns", ascending=False)["node"].head(3).tolist()
            for n in top3:
                if n not in idx_maps:
                    continue
                vals = []
                for q in common:
                    b = idx_maps.get(baseline, {}).get(q)
                    r = idx_maps.get(n, {}).get(q)
                    if b is None or r is None:
                        continue
                    vals.append(r - b)
                if not vals:
                    continue
                vals = np.asarray(vals, dtype=float)
                med = float(np.median(vals))
                centered = vals - med
                plt.figure()
                plt.title(f"SARB residuals (centered) for {n}")
                plt.xlabel("offset vs baseline (ns)")
                plt.ylabel("count")
                plt.hist(centered, bins=60)
                plt.tight_layout()
                plt.savefig(out_run / f"sarb_residual_hist_{n}.png")
                plt.close()
    except Exception:
        pass

    # 7) SARB inter-arrival jitter p95 vs hop
    try:
        if not sarb_jitter_nodes.empty:
            labs, vals = [], []
            for n in order_nodes_existing:
                row = sarb_jitter_nodes[sarb_jitter_nodes["node"] == n]
                if row.empty:
                    continue
                labs.append(n.replace("apu", ""))
                vals.append(float(row["inter_arrival_absP95_ns"].values[0]))
            if labs:
                save_line(labs, vals, "SARB inter-arrival |jitter| p95 vs hop",
                          "Node (hop order)", "p95 (ns)", out_run / "sarb_interarrival_p95_vs_hop.png")
    except Exception:
        pass

    return nodes_summary, run_summary, sarb_raw_nodes, sarb_clean_nodes

# ---------------------------- sweep / main -------------------------------

def looks_like_experiment_dir(path: Path) -> bool:
    if not path.is_dir():
        return False
    apus = [x for x in path.iterdir() if x.is_dir() and
            re.match(r'^apu\d{2}$', x.name)]
    return len(apus) >= 2


def iter_experiments_under(root: Path, exclude_dirs: list[str]):
    excl = {e.lower() for e in (exclude_dirs or [])}
    for child in sorted(root.iterdir()):
        if child.is_dir():
            if child.name.lower() in excl:
                continue
            if looks_like_experiment_dir(child):
                yield child


def main():
    ap = argparse.ArgumentParser(
        description="Analyze a PTP+SARB experiment folder (or a sweep root) and write outputs inside each experiment.")
    ap.add_argument("exp_or_sweep_dir",
                    help="Experiment directory OR sweep root (containing many experiments)")
    ap.add_argument("--out-name", default="_analysis",
                    help="Name of output subfolder to create inside each exp dir (default: _analysis)")
    ap.add_argument("--warmup-sec", type=float, default=60.0,
                    help="Warm-up seconds to drop from ptp4l/phc2sys.")
    ap.add_argument("--sarb-skip", type=int, default=0,
                    help="Initial SARB sequences to skip.")
    ap.add_argument("--clean-mad", type=float, default=6.0,
                    help="Outlier removal threshold (MAD multiplier).")
    ap.add_argument("--lock-threshold-ns", type=float, default=5000.0,
                    help="Lock threshold for phc2sys offset (ns).")
    ap.add_argument("--lock-consec", type=int, default=5,
                    help="Samples in a row required below threshold.")
    ap.add_argument("--exclude-dir", action="append", default=[],
                    help="Folder names to exclude when scanning a sweep (repeatable).")
    args = ap.parse_args()

    root = Path(args.exp_or_sweep_dir).expanduser().resolve()
    if not root.exists() or not root.is_dir():
        print(f"ERROR: {root} is not a directory")
        sys.exit(2)

    # Single experiment vs sweep
    exp_dirs = []
    if looks_like_experiment_dir(root):
        exp_dirs = [root]
    else:
        exp_dirs = list(iter_experiments_under(root, args.exclude_dir))
    if not exp_dirs:
        print(f"ERROR: {root} does not look like an experiment dir or contain experiment subdirs")
        sys.exit(3)

    for exp in exp_dirs:
        out_run = (exp / args.out_name)
        out_run.mkdir(parents=True, exist_ok=True)
        print(f"\nAnalyzing: {exp.name}")
        analyze_run(
            exp, out_run,
            args.warmup_sec, args.sarb_skip, args.clean_mad,
            lock_thresh_ns=args.lock_threshold_ns,
            lock_consec=args.lock_consec
        )

    print("\nDone.")


if __name__ == "__main__":
    main()
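
# ---------------------------------------------------------------------------
# Illustrative only: besides the CLI above, analyze_run() can be driven from
# another script. This is a minimal sketch; the paths below are hypothetical
# placeholders, not files shipped with this tool.
#
#   from pathlib import Path
#   from analyze_ptp_runs import analyze_run
#
#   exp = Path("/data/ptp_sweeps/chain_nodes(0-1-2-3)")  # hypothetical run folder
#   out = exp / "_analysis"
#   out.mkdir(parents=True, exist_ok=True)
#   nodes_summary, run_summary, sarb_raw, sarb_clean = analyze_run(
#       exp, out,
#       warmup_sec=60.0, sarb_skip=0, clean_mad=6.0,
#       lock_thresh_ns=5000.0, lock_consec=5,
#   )
# ---------------------------------------------------------------------------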