#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Deep-dive SARB analysis from RAW timestamp logs for a single experiment folder. Example: ./deep_dive_sarb_run.py \ --exp-dir "/path/to/" \ --iface eth0 \ --baseline-node apu00 \ --time-domain hw_raw \ --burn-in-count 60 Time domains: --time-domain hw_raw -> (hw_raw_sec, hw_raw_nsec) --time-domain sw -> (sw_sec, sw_nsec) --time-domain sys -> (sys_sec, sys_nsec) --time-domain send -> (send_sec, send_nsec) Alternatively, use explicit: --sec-col --nsec-col or a single column: --ns-col Outputs in: /_deep_dive/ """ import argparse import json from pathlib import Path import sys from typing import Optional import pandas as pd import numpy as np import matplotlib matplotlib.use("Agg") # headless import matplotlib.pyplot as plt # ---------- Defaults ---------- NODE_TO_HOP_DEFAULT = { "apu00": 0, "apu01": 1, "apu02": 2, "apu03": 3, "apu04": 4, "apu09": 5, "apu14": 6, "apu19": 7, "apu24": 8, } TSLOG_REL = "tslogs" # subfolder under each node RES_COL = "residual_ns" DOMAIN_MAP = { "hw_raw": ("hw_raw_sec", "hw_raw_nsec"), "sw": ("sw_sec", "sw_nsec"), "sys": ("sys_sec", "sys_nsec"), "send": ("send_sec", "send_nsec"), } # ---------- Helpers ---------- def read_hop_map(json_path: Optional[Path]) -> dict: if json_path is None: return NODE_TO_HOP_DEFAULT try: return json.loads(json_path.read_text()) except Exception as e: print(f"[WARN] Failed to read hop map JSON ({json_path}): {e}. Using default.", file=sys.stderr) return NODE_TO_HOP_DEFAULT def robust_mad(series: pd.Series) -> float: med = series.median() return (series - med).abs().median() def _read_csv_any(path: Path, debug: bool=False) -> pd.DataFrame: # Try common delimiters; fallback to pandas auto for sep in [",", ";", "\t", None]: try: df = pd.read_csv(path, sep=sep, engine="python") df.columns = [c.strip() for c in df.columns] if debug: print(f"[DEBUG] {path}: columns -> {list(df.columns)}", file=sys.stderr) return df except Exception: continue df = pd.read_csv(path) df.columns = [c.strip() for c in df.columns] if debug: print(f"[DEBUG] {path}: columns -> {list(df.columns)}", file=sys.stderr) return df def _detect_rx_ns(df: pd.DataFrame, path: Path) -> pd.Series: """ Fallback auto-detect if user didn't specify columns. Looks for pairs (*_sec, *_nsec), single ns, or single seconds columns. """ cols = {c.lower(): c for c in df.columns} # lower->original # Candidate pairs (sec, nsec) pair_prefixes = [ ("rx_sec", "rx_nsec"), ("hw_sec", "hw_nsec"), ("ts_sec", "ts_nsec"), ("sec", "nsec"), ("rx_sec", "nsec"), ("sec", "rx_nsec"), ] for sec_key, nsec_key in pair_prefixes: if sec_key in cols and nsec_key in cols: a = pd.to_numeric(df[cols[sec_key]], errors="coerce").fillna(0).astype(np.int64) b = pd.to_numeric(df[cols[nsec_key]], errors="coerce").fillna(0).astype(np.int64) rx_ns = a * 1_000_000_000 + b print(f"[INFO] {path.name}: auto-detected ({cols[sec_key]},{cols[nsec_key]}) -> rx_ns", file=sys.stderr) return rx_ns # Single nanoseconds columns single_ns = ["rx_ns", "hw_ns", "ts_ns", "timestamp_ns", "ns", "rx_timestamp_ns"] for key in single_ns: if key in cols: rx_ns = pd.to_numeric(df[cols[key]], errors="coerce").astype("Int64").dropna().astype(np.int64) print(f"[INFO] {path.name}: auto-detected single ns col '{cols[key]}'", file=sys.stderr) return rx_ns # Single seconds columns -> ns single_s = ["rx_s", "ts_s", "time_s", "sec"] for key in single_s: if key in cols: vals = pd.to_numeric(df[cols[key]], errors="coerce").fillna(0) rx_ns = (vals * 1_000_000_000).astype(np.int64) print(f"[INFO] {path.name}: auto-detected seconds col '{cols[key]}' -> ns", file=sys.stderr) return rx_ns raise ValueError(f"No usable time columns found in {path}") def load_ts_csv(path: Path, ns_col: Optional[str], sec_col: Optional[str], nsec_col: Optional[str], debug: bool=False) -> pd.DataFrame: df = _read_csv_any(path, debug=debug) # find 'seq' column (case-insensitive) seq_col = None for c in df.columns: if c.lower() == "seq": seq_col = c break if seq_col is None: raise ValueError(f"Missing 'seq' column in {path}") # explicit overrides if ns_col is not None: if ns_col not in df.columns: raise ValueError(f"--ns-col '{ns_col}' not found in {path}") rx_ns = pd.to_numeric(df[ns_col], errors="coerce").astype("Int64").dropna().astype(np.int64) print(f"[INFO] {path.name}: using --ns-col '{ns_col}'", file=sys.stderr) elif sec_col is not None and nsec_col is not None: if sec_col not in df.columns or nsec_col not in df.columns: raise ValueError(f"--sec-col '{sec_col}' and/or --nsec-col '{nsec_col}' not found in {path}") a = pd.to_numeric(df[sec_col], errors="coerce").fillna(0).astype(np.int64) b = pd.to_numeric(df[nsec_col], errors="coerce").fillna(0).astype(np.int64) rx_ns = a * 1_000_000_000 + b print(f"[INFO] {path.name}: using --sec-col '{sec_col}' + --nsec-col '{nsec_col}'", file=sys.stderr) else: # auto-detect fallback rx_ns = _detect_rx_ns(df, path) out = pd.DataFrame({ "seq": pd.to_numeric(df[seq_col], errors="coerce").astype("Int64"), "rx_ns": pd.to_numeric(rx_ns, errors="coerce").astype("Int64"), }).dropna().astype({"seq": np.int64, "rx_ns": np.int64}) out = out.sort_values("seq").drop_duplicates(subset=["seq"], keep="first") return out[["seq", "rx_ns"]] def gather_node_ts(exp_dir: Path, iface: str, ns_col: Optional[str], sec_col: Optional[str], nsec_col: Optional[str], debug: bool=False) -> dict[str, pd.DataFrame]: node_dfs: dict[str, pd.DataFrame] = {} for node_dir in sorted([p for p in exp_dir.iterdir() if p.is_dir()]): ts_path = node_dir / TSLOG_REL / f"{iface}.csv" if not ts_path.exists(): continue try: node_dfs[node_dir.name] = load_ts_csv(ts_path, ns_col, sec_col, nsec_col, debug) except Exception as e: print(f"[WARN] Skipping {ts_path}: {e}", file=sys.stderr) if not node_dfs: raise FileNotFoundError(f"No '{TSLOG_REL}/{iface}.csv' files parsed under {exp_dir}") return node_dfs def summarize_by_hop(res_df: pd.DataFrame) -> pd.DataFrame: out_rows = [] for hop, g in res_df.groupby("hop"): total = len(g) if total == 0: continue kept = int(g["kept"].sum()) frac_kept = kept / total if total else 0.0 kept_vals = g.loc[g["kept"], RES_COL] if kept_vals.empty: row = {"hop": int(hop), "samples_total": int(total), "samples_kept": 0, "fraction_kept": 0.0, "p50_ns": np.nan, "p95_ns": np.nan, "p99_ns": np.nan, "mad_ns": np.nan} else: row = { "hop": int(hop), "samples_total": int(total), "samples_kept": kept, "fraction_kept": float(frac_kept), "p50_ns": float(np.percentile(kept_vals, 50)), "p95_ns": float(np.percentile(kept_vals, 95)), "p99_ns": float(np.percentile(kept_vals, 99)), "mad_ns": float(robust_mad(kept_vals)), } out_rows.append(row) return pd.DataFrame(out_rows).sort_values("hop") def plot_line(x, y, ylabel, title, out_path: Path): plt.figure(figsize=(7,5)) plt.plot(x, y, marker="o") plt.xlabel("Hop count") plt.ylabel(ylabel) plt.title(title) plt.grid(True) plt.tight_layout() plt.savefig(out_path, dpi=200, bbox_inches="tight") plt.close() def plot_box_by_hop(df: pd.DataFrame, value_col: str, title: str, out_path: Path): dplot = df[df["kept"] == True][["hop", value_col]].dropna().copy() if dplot.empty: return hops = sorted(dplot["hop"].unique()) data = [dplot[dplot["hop"] == h][value_col].values for h in hops] plt.figure(figsize=(9,5)) plt.boxplot(data, positions=range(1, len(hops)+1), showfliers=False) plt.xticks(range(1, len(hops)+1), hops) plt.xlabel("Hop count") plt.ylabel("ns") plt.title(title) plt.grid(True, axis="y", alpha=0.4) plt.tight_layout() plt.savefig(out_path, dpi=200, bbox_inches="tight") plt.close() # ---------- Main ---------- def main(): ap = argparse.ArgumentParser(description="Deep-dive SARB analysis from RAW timestamp logs (single experiment).") ap.add_argument("--exp-dir", type=Path, required=True, help="Path to the experiment folder to analyze.") ap.add_argument("--iface", type=str, default="eth0", help="Interface name for tslogs/.csv (default: eth0).") ap.add_argument("--baseline-node", type=str, default="apu00", help="Baseline node (default: apu00).") ap.add_argument("--hop-map-json", type=Path, default=None, help="Optional JSON mapping node->hop.") ap.add_argument("--mad-k", type=float, default=6.0, help="MAD filter factor (median ± K*MAD). Set 0 to disable.") # Burn-in ap.add_argument("--burn-in-count", type=int, default=0, help="Drop first N aligned packets per node.") ap.add_argument("--burn-in-seconds", type=float, default=0.0, help="Drop samples where baseline time since start < S seconds.") ap.add_argument("--burn-in-seq-min", type=int, default=None, help="Keep only packets with seq ≥ this value.") # Boxplots ap.add_argument("--abs-boxplot", action="store_true", help="Also create boxplot of ABS residuals by hop.") # Time-domain selection ap.add_argument("--time-domain", choices=list(DOMAIN_MAP.keys()), help="Shortcut for picking time columns") ap.add_argument("--ns-col", type=str, default=None, help="Name of a single nanosecond timestamp column to use.") ap.add_argument("--sec-col", type=str, default=None, help="Name of the seconds column (use with --nsec-col).") ap.add_argument("--nsec-col", type=str, default=None, help="Name of the nanoseconds column (use with --sec-col).") # Debug ap.add_argument("--debug-schema", action="store_true", help="Print headers detected for each tslogs file.") args = ap.parse_args() exp_dir = args.exp_dir.resolve() if not exp_dir.exists(): print(f"ERROR: exp-dir does not exist: {exp_dir}", file=sys.stderr) sys.exit(1) # Apply time-domain shortcut if sec/nsec not explicitly provided if args.time_domain and not (args.sec_col or args.nsec_col or args.ns_col): args.sec_col, args.nsec_col = DOMAIN_MAP[args.time_domain] out_dir = exp_dir / "_deep_dive" out_dir.mkdir(parents=True, exist_ok=True) hop_map = read_hop_map(args.hop_map_json) # Load per-node time series node_dfs = gather_node_ts( exp_dir=exp_dir, iface=args.iface, ns_col=args.ns_col, sec_col=args.sec_col, nsec_col=args.nsec_col, debug=args.debug_schema ) if args.baseline_node not in node_dfs: raise FileNotFoundError(f"Baseline node '{args.baseline_node}' has no {args.iface}.csv under {exp_dir}") base_df = node_dfs[args.baseline_node].rename(columns={"rx_ns": "rx_ns_base"}).copy() base_df = base_df.sort_values("seq").drop_duplicates(subset=["seq"], keep="first") base_start_ns = int(base_df["rx_ns_base"].iloc[0]) # Build aligned residuals per node rows = [] for node, df_node in node_dfs.items(): if node == args.baseline_node: continue hop = hop_map.get(node, None) if hop is None: continue df_node = df_node.sort_values("seq").drop_duplicates(subset=["seq"], keep="first") aligned = base_df.merge(df_node.rename(columns={"rx_ns": "rx_ns_node"}), on="seq", how="inner") # Burn-in trims if args.burn_in_seq_min is not None: aligned = aligned[aligned["seq"] >= args.burn_in_seq_min] if args.burn_in_seconds and args.burn_in_seconds > 0: dt_ns = int(args.burn_in_seconds * 1e9) aligned = aligned[(aligned["rx_ns_base"] - base_start_ns) >= dt_ns] if args.burn_in_count and args.burn_in_count > 0: aligned = aligned.sort_values("seq").iloc[args.burn_in_count:] if aligned.empty: continue aligned[RES_COL] = aligned["rx_ns_node"].astype(np.int64) - aligned["rx_ns_base"].astype(np.int64) sub = aligned[["seq", RES_COL]].copy() sub["node"] = node sub["hop"] = int(hop) rows.append(sub) if not rows: raise RuntimeError("No aligned residuals after trimming; check data and flags.") res_all = pd.concat(rows, ignore_index=True) res_all["kept"] = True # MAD-based outlier cleaning (per hop) if args.mad_k and args.mad_k > 0: kept_flags = [] for hop, g in res_all.groupby("hop"): med = g[RES_COL].median() mad = robust_mad(g[RES_COL]) if mad == 0: mask = (g[RES_COL] == med) else: lo = med - args.mad_k * mad hi = med + args.mad_k * mad mask = (g[RES_COL] >= lo) & (g[RES_COL] <= hi) kept_flags.append(pd.Series(mask.values, index=g.index)) kept_cat = pd.concat(kept_flags).sort_index() res_all.loc[kept_cat.index, "kept"] = kept_cat.values # Save long-form residuals (out_dir / "residuals_by_hop.csv").write_text(res_all.to_csv(index=False)) # Summary per hop summary_df = summarize_by_hop(res_all) summary_df.to_csv(out_dir / "per_hop_summary.csv", index=False) # Plots if not summary_df.empty: hops = summary_df["hop"].values if "p95_ns" in summary_df.columns: plot_line(hops, summary_df["p95_ns"].values, "Residual p95 (ns)", "p95 vs hop", out_dir / "p95_vs_hop.png") if "p50_ns" in summary_df.columns: plot_line(hops, summary_df["p50_ns"].values, "Residual median (ns)", "median vs hop", out_dir / "median_vs_hop.png") if "p99_ns" in summary_df.columns: plot_line(hops, summary_df["p99_ns"].values, "Residual p99 (ns)", "p99 vs hop", out_dir / "p99_vs_hop.png") # Boxplots plot_box_by_hop(res_all, RES_COL, "Signed residuals by hop (ns)", out_dir / "boxplot_residuals_by_hop.png") if args.abs_boxplot: ra = res_all.copy() ra["abs_residual_ns"] = ra[RES_COL].abs() plot_box_by_hop(ra, "abs_residual_ns", "ABS residuals by hop (ns)", out_dir / "boxplot_abs_residuals_by_hop.png") print("✓ Deep-dive from RAW complete.") print(f"Outputs in: {out_dir}") print("- residuals_by_hop.csv") print("- per_hop_summary.csv") print("- p95_vs_hop.png, median_vs_hop.png, p99_vs_hop.png (if computed)") print("- boxplot_residuals_by_hop.png", "(and boxplot_abs_residuals_by_hop.png if requested)") if __name__ == "__main__": main()