#!/usr/bin/env python3 """ analyze_ptp_runs.py (single-run mode) Analyze one experiment folder (e.g., 2025-09-12_10-37-20_e2e_ptp4l_300s_nodes(0-1-2-3-4-9-14-19-24) ) and write ALL results into a subfolder inside that experiment directory (default: "_analysis"). Input directory structure (required): /apu00/, /apu01/, ... Each per-node directory may contain: - ptp4l logs: ptp4l_gm.log / ptp4l_bc.log / ptp4l_slave.log - phc2sys logs: phc2sys_meas.log (and optionally phc2sys_bridge.log) - tslogs/eth0.csv (SARB receiver dump; or other *.csv with seq + hw_raw_sec/nsec) Outputs (default: /_analysis): - nodes_summary.csv (per-node metrics) - run_summary.csv (per-run aggregates: slope ns/hop, last-hop p95, lock time) - sarb_raw_nodes.csv (per-node SARB raw robust stats, if available) - sarb_clean_nodes.csv (per-node SARB cleaned robust stats, if available) - sarb_jitter_nodes.csv (per-node SARB inter-arrival stats, if available) - Plots (PNG) in the same folder Usage: python analyze_ptp_runs.py /path/to/EXPERIMENT_DIR [options] Options: --out-name _analysis Name of output subfolder created inside the exp dir --warmup-sec 60 Seconds to ignore from each ptp4l/phc2sys series start --sarb-skip 0 Number of initial SARB sequences to skip --clean-mad 6.0 Outlier removal: keep |residual| <= clean_mad * MAD per node --lock-threshold-ns 5000 Lock threshold for phc2sys offset (ns) --lock-consec 5 Samples in a row required below threshold """ import argparse import os import re import sys import json import math import shutil import zipfile import tempfile from pathlib import Path import numpy as np import pandas as pd # Plots import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt # ---------------------------- regex patterns ----------------------------- RE_BRACKET_T = re.compile(r'\[(\d+(?:\.\d+)?)\]') # [time] RE_PTP_COMPACT = re.compile(r'master offset\s+(-?\d+)\s+s\d\s+freq\s+([+-]?\d+)\s+path delay\s+(\d+)', re.I) RE_OFFSET_NS_ONLY = re.compile(r'offset\s+(-?\d+)\s*ns', re.I) RE_OFFSET_SECS_GUARD = re.compile(r'offset\s+(-?\d+(?:\.\d+)?)\s*s(?!\d)', re.I) RE_PATH_SECS = re.compile(r'path delay\s+(-?\d+(?:\.\d+)?)\s*s', re.I) RE_PHC_COMPACT = re.compile(r'offset\s+(-?\d+)\s+s\d\s+freq\s+([+-]?\d+)\s+delay\s+(\d+)', re.I) RE_PHC_NS = re.compile(r'offset\s+(-?\d+)\s*ns', re.I) RE_PHC_SEC = re.compile(r'offset\s+(-?\d+(?:\.\d+)?)\s*s', re.I) RE_NODES_IN_NAME = re.compile(r'nodes\(([\d\-]+)\)') # ---------------------------- I/O helpers -------------------------------- def ensure_dir(p: Path): p.mkdir(parents=True, exist_ok=True) return p # ---------------------------- parsing functions --------------------------- def parse_ptp4l_metrics(path: Path): rows = [] if not path.exists() or path.stat().st_size == 0: return pd.DataFrame(columns=["t","offset_ns","path_delay_ns","freq_ppb"]) with open(path, 'r', errors='ignore') as f: for line in f: mt = RE_BRACKET_T.search(line) if not mt: continue t = float(mt.group(1)) m = RE_PTP_COMPACT.search(line) if m: rows.append((t, float(m.group(1)), float(m.group(3)), float(m.group(2)))) continue m = RE_OFFSET_NS_ONLY.search(line) p = RE_PATH_SECS.search(line) if m or p: off = float(m.group(1)) if m else math.nan pdelay = float(p.group(1))*1e9 if p else math.nan rows.append((t, off, pdelay, math.nan)) continue m = RE_OFFSET_SECS_GUARD.search(line) if m: rows.append((t, float(m.group(1))*1e9, math.nan, math.nan)) if not rows: return pd.DataFrame(columns=["t","offset_ns","path_delay_ns","freq_ppb"]) df = pd.DataFrame(rows, 
columns=["t","offset_ns","path_delay_ns","freq_ppb"]).sort_values("t") df["offset_ns"] = df["offset_ns"].ffill() df["path_delay_ns"] = df["path_delay_ns"].ffill() return df def parse_phc2sys_offsets(path: Path): rows = [] if not path.exists() or path.stat().st_size == 0: return pd.DataFrame(columns=["t","offset_ns","delay_ns","freq_ppb"]) with open(path, 'r', errors='ignore') as f: for line in f: mt = RE_BRACKET_T.search(line) if not mt: continue t = float(mt.group(1)) m = RE_PHC_COMPACT.search(line) if m: rows.append((t, float(m.group(1)), float(m.group(3)), float(m.group(2)))) continue m = RE_PHC_NS.search(line) if m: rows.append((t, float(int(m.group(1))), math.nan, math.nan)) continue m = RE_PHC_SEC.search(line) if m: rows.append((t, float(m.group(1))*1e9, math.nan, math.nan)) if not rows: return pd.DataFrame(columns=["t","offset_ns","delay_ns","freq_ppb"]) return pd.DataFrame(rows, columns=["t","offset_ns","delay_ns","freq_ppb"]).sort_values("t") def robust_stats_abs(x): x = np.asarray(x, dtype=float) x = x[~np.isnan(x)] if x.size == 0: return dict(n=0, median=math.nan, MAD=math.nan, p95=math.nan, p99=math.nan, std=math.nan) med = float(np.median(x)) mad = float(np.median(np.abs(x - med))) p95 = float(np.percentile(np.abs(x), 95)) p99 = float(np.percentile(np.abs(x), 99)) std = float(np.std(x)) return dict(n=int(x.size), median=med, MAD=mad, p95=p95, p99=p99, std=std) def infer_role(node_dir: Path): gm = node_dir / "ptp4l_gm.log" bc = node_dir / "ptp4l_bc.log" sl = node_dir / "ptp4l_slave.log" if gm.exists() and gm.stat().st_size > 0: return "gm" if sl.exists() and (not bc.exists() or bc.stat().st_size == 0): return "slave" if bc.exists() and bc.stat().st_size > 0: return "middle" return "unknown" def pick_sarb_csv(tsdir: Path): if not tsdir.exists(): return None eth0 = tsdir / "eth0.csv" if eth0.exists(): return eth0 csvs = sorted(tsdir.glob("*.csv"), key=lambda p: p.stat().st_mtime, reverse=True) return csvs[0] if csvs else None def parse_sarb_csv(csv_path: Path): df = pd.read_csv(csv_path) if {"hw_raw_sec","hw_raw_nsec"}.issubset(df.columns): rx_ns = df["hw_raw_sec"].astype("int64")*1_000_000_000 + df["hw_raw_nsec"].astype("int64") elif {"hw_sys_sec","hw_sys_nsec"}.issubset(df.columns): rx_ns = df["hw_sys_sec"].astype("int64")*1_000_000_000 + df["hw_sys_nsec"].astype("int64") else: rx_ns = df["sw_sec"].astype("int64")*1_000_000_000 + df["sw_nsec"].astype("int64") return pd.DataFrame({"seq": df["seq"].astype(int), "rx_ns": rx_ns}) def lock_time(df, thresh_ns=5000.0, consecutive=5): if df is None or df.empty: return math.nan t0 = df["t"].min() x = (df["t"] - t0).to_numpy() y = np.abs(df["offset_ns"].to_numpy(dtype=float)) count = 0 for i, val in enumerate(y): if val <= thresh_ns: count += 1 if count >= consecutive: return float(x[i - consecutive + 1]) else: count = 0 return math.nan # ---------------------------- per-run analysis ---------------------------- def analyze_run(run_dir: Path, out_run: Path, warmup_sec: float, sarb_skip: int, clean_mad: float, lock_thresh_ns: float = 5000.0, lock_consec: int = 5): ensure_dir(out_run) m = RE_NODES_IN_NAME.search(run_dir.name) if m: order_ids = [int(x) for x in m.group(1).split('-')] order_nodes = [f"apu{n:02d}" for n in order_ids] else: order_nodes = [p.name for p in sorted(run_dir.glob("apu??"))] rows_nodes = [] ptp_series = {} phc_series = {} for node in order_nodes: ndir = run_dir / node if not ndir.exists(): continue role = infer_role(ndir) ptp_path = ndir / ("ptp4l_gm.log" if role=="gm" else ("ptp4l_slave.log" if role=="slave" else 
"ptp4l_bc.log")) phc_path = ndir / "phc2sys_meas.log" dfp = parse_ptp4l_metrics(ptp_path) dph = parse_phc2sys_offsets(phc_path) def after_warm(df): if df is None or df.empty: return df t0 = df["t"].min() return df[df["t"] >= t0 + warmup_sec] dfp_w = after_warm(dfp) dph_w = after_warm(dph) def absstats(df, col): if df is None or df.empty or col not in df: return dict(n=0, median=math.nan, MAD=math.nan, p95=math.nan, p99=math.nan, std=math.nan) return robust_stats_abs(np.abs(df[col].to_numpy(dtype=float))) ptp_stats = absstats(dfp_w, "offset_ns") phc_stats = absstats(dph_w, "offset_ns") pdelay_mean = float(dfp_w["path_delay_ns"].mean()) if ("path_delay_ns" in dfp_w and not dfp_w["path_delay_ns"].empty) else math.nan ptp_series[node] = dfp phc_series[node] = dph rows_nodes.append({ "run": run_dir.name, "node": node, "role": role, "ptp_abs_median_ns": ptp_stats["median"], "ptp_abs_MAD_ns": ptp_stats["MAD"], "ptp_abs_p95_ns": ptp_stats["p95"], "ptp_abs_p99_ns": ptp_stats["p99"], "ptp_samples": ptp_stats["n"], "ptp_path_delay_mean_ns": pdelay_mean, "phc_abs_median_ns": phc_stats["median"], "phc_abs_MAD_ns": phc_stats["MAD"], "phc_abs_p95_ns": phc_stats["p95"], "phc_abs_p99_ns": phc_stats["p99"], "phc_samples": phc_stats["n"], }) nodes_summary = pd.DataFrame(rows_nodes) # slope of phc p95 vs hop hop_idx = [] p95_vals = [] order_nodes_existing = [n for n in order_nodes if n in nodes_summary["node"].values] for i, n in enumerate(order_nodes_existing): v = float(nodes_summary.loc[nodes_summary["node"]==n, "phc_abs_p95_ns"].values[0]) if math.isfinite(v): hop_idx.append(i); p95_vals.append(v) slope_ns_per_hop = float(np.polyfit(hop_idx, p95_vals, 1)[0]) if len(hop_idx) >= 2 else math.nan last_node = order_nodes_existing[-1] if order_nodes_existing else None last_lock = math.nan if last_node and last_node in phc_series and phc_series[last_node] is not None: last_lock = lock_time(phc_series[last_node], thresh_ns=lock_thresh_ns, consecutive=lock_consec) last_phc_p95 = float(nodes_summary.loc[nodes_summary["node"]==last_node, "phc_abs_p95_ns"].values[0]) if last_node else math.nan run_summary = pd.DataFrame([{ "run": run_dir.name, "slope_phc_p95_ns_per_hop": slope_ns_per_hop, "last_node": last_node, "last_phc_p95_ns": last_phc_p95, "last_lock_s": last_lock }]) # ---------------- SARB ---------------- sarb_node = {} for node in order_nodes_existing: tsdir = (run_dir / node / "tslogs") csv = pick_sarb_csv(tsdir) if csv is None: continue try: sarb_node[node] = parse_sarb_csv(csv) except Exception: continue sarb_raw_nodes = pd.DataFrame() sarb_clean_nodes = pd.DataFrame() sarb_jitter_nodes = pd.DataFrame() if sarb_node: # common sequences common_seq = None for node, df in sarb_node.items(): s = set(df["seq"].tolist()) common_seq = s if common_seq is None else (common_seq & s) common = sorted(common_seq) if common_seq else [] if sarb_skip > 0 and common: common = common[sarb_skip:] if common: baseline = order_nodes_existing[0] idx_maps = {n: dict(zip(df["seq"].tolist(), df["rx_ns"].tolist())) for n, df in sarb_node.items()} # per-node residuals and jitter raw_rows = [] clean_rows = [] jit_rows = [] for n in order_nodes_existing: if n not in idx_maps: continue # residuals vs baseline vals = [] rx_series = [] for q in common: b = idx_maps.get(baseline, {}).get(q) r = idx_maps.get(n, {}).get(q) if b is None or r is None: continue vals.append(r - b) rx_series.append(r) if vals: vals = np.asarray(vals, dtype=float) med = float(np.median(vals)) centered = vals - med st = robust_stats_abs(centered) raw_rows.append({ 
"run": run_dir.name, "node": n, "samples": st["n"], "SARB_offset_centered_median_ns": st["median"], "SARB_offset_absMAD_ns": st["MAD"], "SARB_offset_absP95_ns": st["p95"], "SARB_offset_absP99_ns": st["p99"], "SARB_offset_centered_std_ns": st["std"], "baseline": baseline }) # cleaned mad = max(st["MAD"], 1.0) thr = clean_mad * mad kept = centered[np.abs(centered) <= thr] stc = robust_stats_abs(kept) clean_rows.append({ "run": run_dir.name, "node": n, "samples_kept": stc["n"], "fraction_kept": float(stc["n"]) / float(st["n"]) if st["n"]>0 else math.nan, "clean_absMAD_ns": stc["MAD"], "clean_absP95_ns": stc["p95"], "clean_absP99_ns": stc["p99"], "baseline": baseline }) # inter-arrival jitter if len(rx_series) >= 3: v = np.asarray(rx_series, dtype=float) d = np.diff(v) d_med = float(np.median(d)) jitter = np.abs(d - d_med) jit_rows.append({ "run": run_dir.name, "node": n, "inter_arrival_median_ns": d_med, "inter_arrival_absP95_ns": float(np.percentile(jitter, 95)), "inter_arrival_centered_std_ns": float(np.std(d - d_med)) }) sarb_raw_nodes = pd.DataFrame(raw_rows) sarb_clean_nodes = pd.DataFrame(clean_rows) sarb_jitter_nodes = pd.DataFrame(jit_rows) # write per-run CSVs nodes_summary.to_csv(out_run/"nodes_summary.csv", index=False) run_summary.to_csv(out_run/"run_summary.csv", index=False) if not sarb_raw_nodes.empty: sarb_raw_nodes.to_csv(out_run/"sarb_raw_nodes.csv", index=False) if not sarb_clean_nodes.empty: sarb_clean_nodes.to_csv(out_run/"sarb_clean_nodes.csv", index=False) if not sarb_jitter_nodes.empty: sarb_jitter_nodes.to_csv(out_run/"sarb_jitter_nodes.csv", index=False) # ---------------- Per-run plots ---------------- # Helper to save a simple bar def save_bar(xlabs, yvals, title, xlabel, ylabel, path): plt.figure() plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.bar(xlabs, yvals) plt.tight_layout() plt.savefig(path) plt.close() # Helper to save a simple line def save_line(xlabs, yvals, title, xlabel, ylabel, path): plt.figure() plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.plot(xlabs, yvals, marker='o') plt.tight_layout() plt.savefig(path) plt.close() # 1) phc2sys p95 vs hop try: labs = [] vals = [] for i, n in enumerate(order_nodes_existing): r = nodes_summary[nodes_summary["node"]==n] if r.empty: continue labs.append(n.replace("apu","")) vals.append(float(r["phc_abs_p95_ns"].values[0])) if labs: save_bar(labs, vals, "phc2sys |offset| p95 vs hop (post-warmup)", "Node (hop order)", "p95 (ns)", out_run/"phc_p95_vs_hop.png") except Exception: pass # 2) ptp4l median |offset| vs hop try: labs = [] vals = [] for i, n in enumerate(order_nodes_existing): r = nodes_summary[nodes_summary["node"]==n] if r.empty: continue labs.append(n.replace("apu","")) vals.append(float(r["ptp_abs_median_ns"].values[0])) if labs: save_line(labs, vals, "ptp4l median |offset| vs hop (post-warmup)", "Node (hop order)", "median (ns)", out_run/"ptp_median_vs_hop.png") except Exception: pass # 3) last node phc2sys offset vs time try: if last_node and last_node in phc_series and phc_series[last_node] is not None and not phc_series[last_node].empty: dph = phc_series[last_node] t0 = dph["t"].min() x = dph["t"] - t0 y = dph["offset_ns"] plt.figure() plt.title(f"{last_node} phc2sys offset vs time") plt.xlabel("t - t0 (s)") plt.ylabel("offset (ns)") plt.plot(x, y) plt.tight_layout() plt.savefig(out_run/"last_node_phc_offset_timeseries.png") plt.close() except Exception: pass # 4) SARB raw p95 vs hop try: if not sarb_raw_nodes.empty: labs = [] vals = [] for n in order_nodes_existing: row = 
    # write per-run CSVs
    nodes_summary.to_csv(out_run / "nodes_summary.csv", index=False)
    run_summary.to_csv(out_run / "run_summary.csv", index=False)
    if not sarb_raw_nodes.empty:
        sarb_raw_nodes.to_csv(out_run / "sarb_raw_nodes.csv", index=False)
    if not sarb_clean_nodes.empty:
        sarb_clean_nodes.to_csv(out_run / "sarb_clean_nodes.csv", index=False)
    if not sarb_jitter_nodes.empty:
        sarb_jitter_nodes.to_csv(out_run / "sarb_jitter_nodes.csv", index=False)

    # ---------------- Per-run plots ----------------
    # Helper to save a simple bar chart
    def save_bar(xlabs, yvals, title, xlabel, ylabel, path):
        plt.figure()
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.bar(xlabs, yvals)
        plt.tight_layout()
        plt.savefig(path)
        plt.close()

    # Helper to save a simple line chart
    def save_line(xlabs, yvals, title, xlabel, ylabel, path):
        plt.figure()
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.plot(xlabs, yvals, marker='o')
        plt.tight_layout()
        plt.savefig(path)
        plt.close()

    # 1) phc2sys p95 vs hop
    try:
        labs, vals = [], []
        for n in order_nodes_existing:
            r = nodes_summary[nodes_summary["node"] == n]
            if r.empty:
                continue
            labs.append(n.replace("apu", ""))
            vals.append(float(r["phc_abs_p95_ns"].values[0]))
        if labs:
            save_bar(labs, vals, "phc2sys |offset| p95 vs hop (post-warmup)",
                     "Node (hop order)", "p95 (ns)", out_run / "phc_p95_vs_hop.png")
    except Exception:
        pass

    # 2) ptp4l median |offset| vs hop
    try:
        labs, vals = [], []
        for n in order_nodes_existing:
            r = nodes_summary[nodes_summary["node"] == n]
            if r.empty:
                continue
            labs.append(n.replace("apu", ""))
            vals.append(float(r["ptp_abs_median_ns"].values[0]))
        if labs:
            save_line(labs, vals, "ptp4l median |offset| vs hop (post-warmup)",
                      "Node (hop order)", "median (ns)", out_run / "ptp_median_vs_hop.png")
    except Exception:
        pass

    # 3) last node phc2sys offset vs time
    try:
        if last_node and last_node in phc_series and phc_series[last_node] is not None and not phc_series[last_node].empty:
            dph = phc_series[last_node]
            t0 = dph["t"].min()
            x = dph["t"] - t0
            y = dph["offset_ns"]
            plt.figure()
            plt.title(f"{last_node} phc2sys offset vs time")
            plt.xlabel("t - t0 (s)")
            plt.ylabel("offset (ns)")
            plt.plot(x, y)
            plt.tight_layout()
            plt.savefig(out_run / "last_node_phc_offset_timeseries.png")
            plt.close()
    except Exception:
        pass

    # 4) SARB raw p95 vs hop
    try:
        if not sarb_raw_nodes.empty:
            labs, vals = [], []
            for n in order_nodes_existing:
                row = sarb_raw_nodes[sarb_raw_nodes["node"] == n]
                if row.empty:
                    continue
                labs.append(n.replace("apu", ""))
                vals.append(float(row["SARB_offset_absP95_ns"].values[0]))
            if labs:
                save_bar(labs, vals, "SARB |offset| p95 vs hop (raw, centered)",
                         "Node (hop order)", "p95 (ns)", out_run / "sarb_raw_p95_vs_hop.png")
    except Exception:
        pass

    # 5) SARB cleaned p95 vs hop
    try:
        if not sarb_clean_nodes.empty:
            labs, vals = [], []
            for n in order_nodes_existing:
                row = sarb_clean_nodes[sarb_clean_nodes["node"] == n]
                if row.empty:
                    continue
                labs.append(n.replace("apu", ""))
                vals.append(float(row["clean_absP95_ns"].values[0]))
            if labs:
                save_line(labs, vals, "SARB |offset| p95 vs hop (cleaned, centered)",
                          "Node (hop order)", "p95 (ns)", out_run / "sarb_clean_p95_vs_hop.png")
    except Exception:
        pass

    # 6) SARB residual histograms for the top-3 nodes by cleaned p95
    try:
        if not sarb_clean_nodes.empty:
            # Build residuals again for those nodes
            baseline = sarb_clean_nodes["baseline"].iloc[0]
            # Rebuild idx_maps quickly
            idx_maps = {n: dict(zip(df["seq"].tolist(), df["rx_ns"].tolist()))
                        for n, df in sarb_node.items()}
            # determine common after skip
            common_seq = None
            for node, df in sarb_node.items():
                s = set(df["seq"].tolist())
                common_seq = s if common_seq is None else (common_seq & s)
            common = sorted(common_seq) if common_seq else []
            if sarb_skip > 0 and common:
                common = common[sarb_skip:]
            top3 = sarb_clean_nodes.sort_values("clean_absP95_ns", ascending=False)["node"].head(3).tolist()
            for n in top3:
                if n not in idx_maps:
                    continue
                vals = []
                for q in common:
                    b = idx_maps.get(baseline, {}).get(q)
                    r = idx_maps.get(n, {}).get(q)
                    if b is None or r is None:
                        continue
                    vals.append(r - b)
                if not vals:
                    continue
                vals = np.asarray(vals, dtype=float)
                med = float(np.median(vals))
                centered = vals - med
                plt.figure()
                plt.title(f"SARB residuals (centered) for {n}")
                plt.xlabel("offset vs baseline (ns)")
                plt.ylabel("count")
                plt.hist(centered, bins=60)
                plt.tight_layout()
                plt.savefig(out_run / f"sarb_residual_hist_{n}.png")
                plt.close()
    except Exception:
        pass

    # 7) SARB inter-arrival jitter p95 vs hop
    try:
        if not sarb_jitter_nodes.empty:
            labs, vals = [], []
            for n in order_nodes_existing:
                row = sarb_jitter_nodes[sarb_jitter_nodes["node"] == n]
                if row.empty:
                    continue
                labs.append(n.replace("apu", ""))
                vals.append(float(row["inter_arrival_absP95_ns"].values[0]))
            if labs:
                save_line(labs, vals, "SARB inter-arrival |jitter| p95 vs hop",
                          "Node (hop order)", "p95 (ns)", out_run / "sarb_interarrival_p95_vs_hop.png")
    except Exception:
        pass

    return nodes_summary, run_summary, sarb_raw_nodes, sarb_clean_nodes
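
# Expected on-disk layout, per the docstring (node names and roles are
# illustrative; roles are inferred from which ptp4l log is non-empty):
#
#   EXPERIMENT_DIR/
#     apu00/  ptp4l_gm.log     phc2sys_meas.log  tslogs/eth0.csv
#     apu01/  ptp4l_bc.log     phc2sys_meas.log  tslogs/eth0.csv
#     apu04/  ptp4l_slave.log  phc2sys_meas.log  tslogs/eth0.csv
#     _analysis/               (created by this script)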

# ---------------------------- main ---------------------------------------

def main():
    ap = argparse.ArgumentParser(description="Analyze a single PTP+SARB experiment folder and write outputs inside it.")
    ap.add_argument("exp_dir", help="Experiment directory containing apu*/ subfolders")
    ap.add_argument("--out-name", default="_analysis",
                    help="Name of output subfolder to create inside exp_dir (default: _analysis)")
    ap.add_argument("--warmup-sec", type=float, default=60.0, help="Warm-up seconds to drop from ptp4l/phc2sys.")
    ap.add_argument("--sarb-skip", type=int, default=0, help="Initial SARB sequences to skip.")
    ap.add_argument("--clean-mad", type=float, default=6.0, help="Outlier removal threshold (MAD multiplier).")
    ap.add_argument("--lock-threshold-ns", type=float, default=5000.0, help="Lock threshold for phc2sys offset (ns).")
    ap.add_argument("--lock-consec", type=int, default=5, help="Samples in a row required below threshold.")
    args = ap.parse_args()

    run_dir = Path(args.exp_dir).expanduser().resolve()
    if not run_dir.exists() or not run_dir.is_dir():
        print(f"ERROR: {run_dir} is not a directory")
        sys.exit(2)

    # quick sanity: must contain at least two apu?? directories
    apus = [x for x in run_dir.iterdir() if x.is_dir() and re.match(r'^apu\d{2}$', x.name)]
    if len(apus) < 2:
        print(f"ERROR: {run_dir} does not look like an experiment dir (found {len(apus)} apu?? subdirs)")
        sys.exit(3)

    out_run = run_dir / args.out_name
    out_run.mkdir(parents=True, exist_ok=True)

    print(f"Analyzing: {run_dir.name}")
    nodes_summary, run_summary, sarb_raw_nodes, sarb_clean_nodes = analyze_run(
        run_dir, out_run, args.warmup_sec, args.sarb_skip, args.clean_mad,
        lock_thresh_ns=args.lock_threshold_ns, lock_consec=args.lock_consec
    )

    print("Done.")
    print("Wrote:")
    print(f" {out_run / 'nodes_summary.csv'}")
    print(f" {out_run / 'run_summary.csv'}")
    if not sarb_raw_nodes.empty:
        print(f" {out_run / 'sarb_raw_nodes.csv'}")
    if not sarb_clean_nodes.empty:
        print(f" {out_run / 'sarb_clean_nodes.csv'}")
    # list some plots if present
    pngs = sorted(out_run.glob('*.png'))
    if pngs:
        for p in pngs:
            print(f" {p}")


if __name__ == "__main__":
    main()