#!/usr/bin/env python3
"""
run_ptp_experiment_e2e.py — JBOD Boundary Clock (single ptp4l per BC node)

Straight-line Ethernet chain (no loopback). All nodes are already placed in ONE
L3 subnet by your init script (static /32 routes hop-by-hop). This runner embeds
ptp4l configs and lets you change parameters at runtime.

Roles
  - First node (nodes[0]): Grandmaster (GM) on eth2
  - Middle nodes: Single ptp4l with ports eth1+eth2 and boundary_clock_jbod=1
        PHC bridge:      phc2sys -s eth1 -c eth2
        Measurement tie: phc2sys -s eth1 -c eth0
  - Last node (nodes[-1]): Slave (OC) on eth1 (measurement tie: eth1 -> eth0)

Transport
  - UDPv4 (default) or L2. For UDPv4 + /32 interfaces, we add multicast routes
    on eth1/eth2.

Behavior
  - Does NOT delete logs at teardown (only pre-run cleanup)
  - Uses bash -lc + nullglob for safe remote globs (no zsh errors)
  - Log collection is tolerant: missing files do not cause failure
"""

import argparse
import datetime
import os
import shlex
import subprocess
import sys
import time

PTP_REMOTE_CONFIG_PATH = "/opt/ptp_conf"
PTP4L_BIN = "ptp4l"
PHC2SYS_BIN = "phc2sys"
RECEIVER_PATH = "/opt/timestamping/receiver_hwts_logger"
BROADCASTER_PATH = "/home/apu/testbed_files/experiments/reference_broadcast/broadcaster"

# Remote log paths
GM_LOG = "/tmp/ptp4l_gm.log"
BC_LOG = "/tmp/ptp4l_bc.log"
SLAVE_LOG = "/tmp/ptp4l_slave.log"
PHC2SYS_MEAS_LOG = "/tmp/phc2sys_meas.log"      # measurement NIC tie
PHC2SYS_BRIDGE_LOG = "/tmp/phc2sys_bridge.log"  # eth1 -> eth2 bridge on middles


# -------------------------- basic helpers -----------------------------------
def host(node_id: int) -> str:
    """Return the SSH hostname for a node ID (e.g. 3 -> "apu03")."""
    return f"apu{node_id:02d}"


def sh(cmd, check=True, capture=False):
    """Run a local command given as an argv list.

    With ``capture=True`` stdout/stderr are captured as text on the returned
    ``CompletedProcess``; otherwise they are inherited from this process.
    Raises ``subprocess.CalledProcessError`` on non-zero exit when ``check``.
    """
    if capture:
        return subprocess.run(cmd, check=check, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, text=True)
    return subprocess.run(cmd, check=check)


def rsh(hostname, cmd, check=True, capture=False):
    """Run ``cmd`` (a single remote shell string) on ``hostname`` via ssh."""
    return sh(["ssh", hostname, cmd], check=check, capture=capture)


def scp_from(hostname, remote_path, local_dir):
    """Copy one remote file into ``local_dir`` (created if missing).

    Best effort: a failed copy is reported but never raised, so that log
    collection keeps going when a file is missing.
    """
    os.makedirs(local_dir, exist_ok=True)
    try:
        sh(["scp", "-q", f"{hostname}:{remote_path}", local_dir], check=True)
    except subprocess.CalledProcessError as err:
        print(f"[WARN] scp {hostname}:{remote_path} failed "
              f"(exit {err.returncode}); skipping")


def now_stamp():
    """Return the local time as ``YYYY-MM-DD_HH-MM-SS``."""
    return datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


# ---------------------- inline ptp4l config builders ------------------------
def build_global_section(args, role):
    """Build the text of a ptp4l ``[global]`` section.

    ``args`` is the parsed CLI namespace (with the extra ``jbod`` attribute set
    by the caller); ``role`` is one of "gm", "bc" or "slave".
    Returns the section as a newline-terminated string.
    Raises ``ValueError`` for any other role.
    """
    lines = [
        "[global]",
        # Transport & basic behavior
        "twoStepFlag 1",
        f"network_transport {args.transport}",
        f"domainNumber {args.domain}",
        f"timeSource {args.time_source}",
        f"delay_mechanism {args.delay}",
    ]

    # Clock identity/priorities per role
    if role == "gm":
        lines += [
            "masterOnly 1",
            f"clockClass {args.gm_clock_class}",
            f"clockAccuracy {args.gm_clock_accuracy}",
            f"offsetScaledLogVariance {args.gm_oslv}",
            f"priority1 {args.gm_prio1}",
            f"priority2 {args.gm_prio2}",
        ]
    elif role == "bc":
        # Single process, multi-port, JBOD
        lines += [
            f"clockClass {args.bc_clock_class}",
            f"clockAccuracy {args.bc_clock_accuracy}",
            f"offsetScaledLogVariance {args.bc_oslv}",
            f"priority1 {args.bc_prio1}",
            f"priority2 {args.bc_prio2}",
        ]
        if args.jbod:
            lines.append("boundary_clock_jbod 1")
    elif role == "slave":
        lines += [
            "clientOnly 1",
            f"clockClass {args.sl_clock_class}",
            f"clockAccuracy {args.sl_clock_accuracy}",
            f"offsetScaledLogVariance {args.sl_oslv}",
            f"priority1 {args.sl_prio1}",
            f"priority2 {args.sl_prio2}",
            f"announceReceiptTimeout {args.announce_timeout}",
        ]
    else:
        raise ValueError("unknown role")

    # Message intervals
    lines += [
        f"logSyncInterval {args.log_sync}",
        f"logAnnounceInterval {args.log_announce}",
        f"logMinDelayReqInterval {args.log_delayreq}",
        f"tx_timestamp_timeout {args.tx_ts_timeout}",
    ]

    # Servo selection
    if args.servo.lower() == "pi":
        lines += [
            "clock_servo pi",
            f"pi_proportional_const {args.kp}",
            f"pi_integral_const {args.ki}",
        ]
    else:
        lines.append("clock_servo linreg")
        if args.linreg_window is not None:
            lines.append(f"linreg_filter_length {args.linreg_window}")
        if args.linreg_sanity_sigma is not None:
            lines.append(f"linreg_sanity_sigma {args.linreg_sanity_sigma}")

    # Steps / clamps
    if args.step_threshold is not None:
        lines.append(f"step_threshold {args.step_threshold}")
    if args.max_freq is not None:
        lines.append(f"max_frequency {args.max_freq}")

    return "\n".join(lines) + "\n"


def write_remote_conf(h, name, text):
    """Write ``text`` to ``PTP_REMOTE_CONFIG_PATH/name`` on host ``h``.

    The directory is created best-effort with sudo; the file itself is written
    through a quoted heredoc so the remote shell expands nothing in it.
    Raises ``subprocess.CalledProcessError`` if the write fails.
    """
    rsh(h, f"sudo mkdir -p {PTP_REMOTE_CONFIG_PATH}", check=False)
    # The heredoc terminator must sit on a line of its own.
    if not text.endswith("\n"):
        text += "\n"
    script = f'cat > {PTP_REMOTE_CONFIG_PATH}/{name} <<"CONF"\n{text}CONF\n'
    rsh(h, f"bash -lc {shlex.quote(script)}", check=True)
    print(f"[{h}] wrote {name}")


# ---------------------------- UDPv4 multicast prep --------------------------
def prep_udpv4_multicast(nodes):
    """Bring PTP ports up and add IPv4 multicast routes (safe on /32 topologies)."""
    for nid in nodes:
        h = host(nid)
        for ifc in ("eth1", "eth2"):
            # Skip if the interface doesn't exist
            probe = rsh(h, f'[ -d "/sys/class/net/{ifc}" ] && echo yes || echo no',
                        capture=True, check=False)
            if probe.stdout.strip() != "yes":
                continue
            # Bring link UP (IFF_UP), relax rp_filter
            rsh(h, f"sudo ip link set {ifc} up", check=False)
            rsh(h, f"sudo sysctl -q -w net.ipv4.conf.{ifc}.rp_filter=0", check=False)
            # Install/refresh PTP multicast route; ignore errors.
            # NOTE(review): both interfaces use the same prefix, so the eth2
            # `replace` presumably overwrites the eth1 route installed just
            # before — confirm whether per-interface metrics are wanted.
            rsh(h, f"sudo ip route replace 224.0.0.0/4 dev {ifc}", check=False)


# ---------------------------- process control -------------------------------
def stop_procs(nodes):
    """Stop daemons but KEEP logs (used at teardown)."""
    print("Stopping daemons (keeping logs)...")
    # The '[x]yz' patterns still match the target processes but not the remote
    # shell that carries this very command line (pkill -f matches full argv).
    commands = (
        "pkill -f '[p]tp4l' || true",
        "pkill -f '[p]hc2sys' || true",
        "pkill -f '[r]eceiver_hwts_logger' || true",
        "sudo timedatectl set-ntp false || true",
        "sudo systemctl disable --now systemd-timesyncd 2>/dev/null || true",
        "sync",
    )
    for nid in nodes:
        h = host(nid)
        for cmd in commands:
            rsh(h, cmd, check=False)


def cleanup_old(nodes):
    """Pre-run: delete old logs on the nodes to avoid mixing with new run outputs."""
    print("Removing old logs (pre-run only)...")
    commands = (
        r"bash -lc 'shopt -s nullglob; rm -rf /tmp/tslog_* 2>/dev/null || true'",
        r"bash -lc 'rm -f /tmp/ptp4l_*.log /tmp/phc2sys*.log 2>/dev/null || true'",
        "sync",
    )
    for nid in nodes:
        h = host(nid)
        for cmd in commands:
            rsh(h, cmd, check=False)


def start_receivers(nodes):
    """Launch the hardware-timestamp receiver on eth0 of every node (background)."""
    for nid in nodes:
        h = host(nid)
        cmd = f"sudo taskset -c 1 chrt -f 90 {RECEIVER_PATH} eth0 >/dev/null 2>&1 &"
        print(f"[{h}] receiver on eth0")
        rsh(h, cmd, check=False)


def wait_receivers(nodes, expected=1, timeout=20):
    """Wait until every node shows at least ``expected`` receiver processes.

    Each node is polled once per second, up to ``timeout`` polls.
    Raises ``RuntimeError`` (including the last process listing) for the first
    node on which the receiver does not come up.
    """
    # Bracket pattern: matches the receiver but not the remote shell whose own
    # command line contains this pattern (which made the old count always >= 1).
    probe = "pgrep -fa '[r]eceiver_hwts_logger'"
    for nid in nodes:
        h = host(nid)
        cnt = 0
        for _ in range(timeout):
            res = rsh(h, f"{probe} | wc -l", capture=True, check=False)
            try:
                cnt = int(res.stdout.strip())
            except ValueError:
                cnt = 0
            if cnt >= expected:
                break
            time.sleep(1)
        else:
            # Loop ran out (or timeout <= 0) without reaching the expected count.
            detail = rsh(h, f"{probe} || true", capture=True, check=False).stdout
            raise RuntimeError(f"receiver not up on {h} (found {cnt})\n{detail}")


def start_broadcaster(rate_hz=1000, iface="eth0"):
    """Start the local reference broadcaster in the background.

    Returns the PID printed by the launching shell (``$!``), or -1 if it
    could not be parsed.
    """
    cmd = (f"sudo taskset -c 1 chrt -f 90 {BROADCASTER_PATH} "
           f"{shlex.quote(str(iface))} {rate_hz} >/dev/null 2>&1 & echo $!")
    out = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True)
    try:
        pid = int(out.stdout.strip())
    except ValueError:
        pid = -1
    print(f"[controller] broadcaster PID {pid} on {iface}@{rate_hz}Hz")
    return pid


def is_ptp_running(h):
    """Return True if a process named exactly ``ptp4l`` exists on host ``h``."""
    return rsh(h, "pgrep -x ptp4l >/dev/null", check=False).returncode == 0


def wait_ptp(nodes, retries=15, delay=2):
    """Poll until ptp4l runs on all nodes; return False after ``retries`` polls."""
    print("Waiting for ptp4l on all nodes...")
    for _ in range(retries):
        missing = [host(n) for n in nodes if not is_ptp_running(host(n))]
        if not missing:
            print("ptp4l running everywhere.")
            return True
        print("Not yet on:", ", ".join(missing))
        time.sleep(delay)
    return False


# ---------------------------- role start ------------------------------------
def _launch_ptp4l(h, ifaces, conf_name, log_path, label):
    """Start ptp4l on ``ifaces`` in the background and verify it stays up.

    Raises ``RuntimeError`` carrying the remote log if ptp4l is gone after 1 s.
    """
    iface_opts = " ".join(f"-i {ifc}" for ifc in ifaces)
    cmd = (f"{PTP4L_BIN} {iface_opts} -m -H "
           f"-f {PTP_REMOTE_CONFIG_PATH}/{conf_name} > {log_path} 2>&1 &")
    print(f"[{h}] {cmd}")
    rsh(h, cmd, check=True)
    time.sleep(1)
    if not is_ptp_running(h):
        log = rsh(h, f"tail -n +1 {log_path} 2>/dev/null || true",
                  capture=True, check=False).stdout
        raise RuntimeError(
            f"ptp4l ({label}) did not stay up on {h}.\n---- {log_path} ----\n{log}")


def _launch_phc2sys(h, src, dst, log_path):
    """Start a background phc2sys that makes the ``dst`` clock follow ``src``."""
    cmd = (f"{PHC2SYS_BIN} -s {src} -c {dst} -O 0 -m -w --step_threshold=1 "
           f"> {log_path} 2>&1 &")
    print(f"[{h}] {cmd}")
    rsh(h, cmd, check=True)


def start_first_gm(args, nid):
    """First node: ptp4l as grandmaster on eth2, measurement tie eth2 -> eth0."""
    h = host(nid)
    write_remote_conf(h, "ptp4l_gm.conf", build_global_section(args, "gm"))
    # GM on eth2
    _launch_ptp4l(h, ("eth2",), "ptp4l_gm.conf", GM_LOG, "GM")
    # Tie measurement NIC to GM port clock
    _launch_phc2sys(h, "eth2", "eth0", PHC2SYS_MEAS_LOG)


def start_last_slave(args, nid):
    """Last node: ptp4l as slave on eth1, measurement tie eth1 -> eth0."""
    h = host(nid)
    write_remote_conf(h, "ptp4l_slave.conf", build_global_section(args, "slave"))
    # Slave on eth1
    _launch_ptp4l(h, ("eth1",), "ptp4l_slave.conf", SLAVE_LOG, "Slave last")
    # Tie measurement NIC to slave port clock
    _launch_phc2sys(h, "eth1", "eth0", PHC2SYS_MEAS_LOG)


def start_middle_jbod(args, nid):
    """Middle node: ptp4l (eth1+eth2) with boundary_clock_jbod, bridge eth1->eth2, tie eth1->eth0."""
    h = host(nid)
    write_remote_conf(h, "ptp4l_bc.conf", build_global_section(args, "bc"))
    # Single ptp4l with both ports
    _launch_ptp4l(h, ("eth1", "eth2"), "ptp4l_bc.conf", BC_LOG, "BC JBOD")
    # Bridge PHCs: make eth2 follow eth1
    _launch_phc2sys(h, "eth1", "eth2", PHC2SYS_BRIDGE_LOG)
    # Measurement tie: make eth0 follow eth1
    _launch_phc2sys(h, "eth1", "eth0", PHC2SYS_MEAS_LOG)


# ---------------------------- log collection --------------------------------
def out_dir_name(nodes, duration):
    """Return the timestamped experiment directory name for this run."""
    return f"{now_stamp()}_e2e_ptp4l_{duration}s_nodes({'-'.join(str(n) for n in nodes)})"


def _remote_tslog_csvs(h):
    """List CSV paths inside the two newest /tmp/tslog_* folders on host ``h``."""
    res = rsh(h, r"bash -lc 'shopt -s nullglob; ls -td /tmp/tslog_* 2>/dev/null | head -n2 || true'",
              capture=True, check=False)
    # With nullglob and no match the command degrades to a bare `ls -td`, which
    # prints "."; accept only genuine tslog folders.
    folders = [ln.strip() for ln in res.stdout.splitlines()
               if ln.strip().startswith("/tmp/tslog_")]
    csvs = []
    for fld in folders:
        script = (f"shopt -s nullglob; "
                  f'for f in {shlex.quote(fld)}/*.csv; do echo "$f"; done')
        lsres = rsh(h, f"bash -lc {shlex.quote(script)}", capture=True, check=False)
        csvs += [ln.strip() for ln in lsres.stdout.splitlines()
                 if ln.strip().endswith(".csv")]
    return csvs


def _role_logs(nid, first_id, last_id):
    """Return the (label, remote path) log pairs expected for a node's role."""
    if nid == first_id:
        return [("ptp4l_gm", GM_LOG), ("phc2sys_meas", PHC2SYS_MEAS_LOG)]
    if nid == last_id:
        return [("ptp4l_slave", SLAVE_LOG), ("phc2sys_meas", PHC2SYS_MEAS_LOG)]
    return [
        ("ptp4l_bc", BC_LOG),
        ("phc2sys_meas", PHC2SYS_MEAS_LOG),
        ("phc2sys_bridge", PHC2SYS_BRIDGE_LOG),
    ]


def collect_logs(nodes, base_dir, duration):
    """Fetch tslog CSVs and role logs from every node into a new run directory.

    Layout: ``base_dir/<out_dir_name>/<host>/tslogs/*.csv`` plus one
    ``<label>.log`` per role log. Missing remote files yield empty or absent
    local files rather than errors.
    """
    exp_dir = os.path.join(base_dir, out_dir_name(nodes, duration))
    os.makedirs(exp_dir, exist_ok=True)
    first_id, last_id = nodes[0], nodes[-1]

    for nid in nodes:
        h = host(nid)
        node_dir = os.path.join(exp_dir, h)
        ts_dir = os.path.join(node_dir, "tslogs")
        os.makedirs(ts_dir, exist_ok=True)

        # Newest two tslog folders (ok if none); first file wins per basename.
        taken = set()
        for remote in _remote_tslog_csvs(h):
            base = os.path.basename(remote)
            if base in taken:
                continue
            scp_from(h, remote, ts_dir)
            taken.add(base)

        # Role-based logs
        for label, path in _role_logs(nid, first_id, last_id):
            res = rsh(h, f'bash -lc \'sync; sleep 1; if [ -f "{path}" ]; then cat "{path}"; fi; exit 0\'',
                      capture=True, check=False)
            with open(os.path.join(node_dir, f"{label}.log"), "w", encoding="utf-8") as f:
                # Drop NUL bytes that show up in the captured logs.
                f.write(res.stdout.replace("\x00", ""))


# ------------------------------- CLI ----------------------------------------
def parse_args(argv=None):
    """Parse the command line (``argv`` defaults to ``sys.argv[1:]``)."""
    p = argparse.ArgumentParser(description="Straight-line E2E/P2P PTP runner with inline configs (JBOD BC).")
    p.add_argument("duration", type=int, help="run time in seconds")
    p.add_argument("nodes", type=int, nargs="+", help="ordered node IDs, e.g., 0 1 2 3 4")

    # PTP knobs
    p.add_argument("--delay", choices=["E2E", "P2P"], default="E2E", help="delay mechanism")
    p.add_argument("--servo", choices=["pi", "linreg"], default="pi", help="ptp4l servo")
    p.add_argument("--kp", type=float, default=0.7, help="PI Kp")
    p.add_argument("--ki", type=float, default=0.3, help="PI Ki")
    p.add_argument("--linreg-window", type=int, default=None, help="linreg filter length (if supported)")
    p.add_argument("--linreg-sanity-sigma", type=float, default=None, help="linreg sigma clamp (if supported)")

    # Message intervals
    p.add_argument("--log-sync", type=str, default="0", help="logSyncInterval (e.g., 0, -1, -3)")
    p.add_argument("--log-announce", type=str, default="1", help="logAnnounceInterval")
    p.add_argument("--log-delayreq", type=str, default="-3", help="logMinDelayReqInterval")
    p.add_argument("--announce-timeout", type=int, default=6, help="announceReceiptTimeout (slaves)")
    p.add_argument("--tx-ts-timeout", type=str, default="400", help="tx_timestamp_timeout")

    # Transport / domain / source
    p.add_argument("--transport", choices=["UDPv4", "L2"], default="UDPv4")
    p.add_argument("--domain", type=int, default=0)
    p.add_argument("--time-source", default="0x10")

    # Servo/general limits
    p.add_argument("--step-threshold", type=float, default=0.001, help="ptp4l step_threshold (s)")
    p.add_argument("--max-freq", type=int, default=None, help="max_frequency (ppm)")

    # Clock quality / priorities per role
    p.add_argument("--gm-clock-class", type=int, default=6)
    p.add_argument("--gm-clock-accuracy", default="0x20")
    p.add_argument("--gm-oslv", default="0x200")
    p.add_argument("--gm-prio1", type=int, default=128)
    p.add_argument("--gm-prio2", type=int, default=128)

    p.add_argument("--bc-clock-class", type=int, default=248)
    p.add_argument("--bc-clock-accuracy", default="0xFE")
    p.add_argument("--bc-oslv", default="0x200")
    p.add_argument("--bc-prio1", type=int, default=248)
    p.add_argument("--bc-prio2", type=int, default=128)

    p.add_argument("--sl-clock-class", type=int, default=248)
    p.add_argument("--sl-clock-accuracy", default="0xFE")
    p.add_argument("--sl-oslv", default="0x200")
    p.add_argument("--sl-prio1", type=int, default=248)
    p.add_argument("--sl-prio2", type=int, default=128)

    # JBOD toggle (on by default for BC nodes)
    p.add_argument("--no-jbod", action="store_true", help="disable boundary_clock_jbod on BC nodes")

    # SARB broadcaster knobs
    p.add_argument("--rb-rate", type=int, default=1000, help="reference broadcast rate (Hz)")
    p.add_argument("--rb-iface", default="eth0", help="broadcaster interface on controller")

    # Output
    p.add_argument("--out", default="logs", help="base output dir")
    return p.parse_args(argv)


# -------------------------------- main --------------------------------------
def main():
    """Run one experiment: prepare nodes, start SARB + PTP, wait, tear down.

    Exits with status 1 on invalid node lists or a negative duration. Errors
    raised while starting the experiment propagate after teardown and log
    collection have been attempted.
    """
    args = parse_args()
    args.jbod = not args.no_jbod  # convenience flag internally
    nodes = [int(n) for n in args.nodes]

    if len(nodes) < 2:
        print("Need at least two nodes for a line.")
        sys.exit(1)
    if len(set(nodes)) != len(nodes):
        # Roles are assigned by comparing node IDs, so repeats would clash.
        print("Node IDs must be unique.")
        sys.exit(1)
    if args.duration < 0:
        print("Duration must not be negative.")
        sys.exit(1)

    print(f"Nodes: {nodes} | duration: {args.duration}s | delay={args.delay} servo={args.servo} | JBOD={'on' if args.jbod else 'off'}")

    # PRE-RUN: stop leftovers, then delete old logs
    stop_procs(nodes)
    cleanup_old(nodes)

    # If UDPv4, prepare multicast routing on eth1/eth2
    if args.transport == "UDPv4":
        prep_udpv4_multicast(nodes)

    print("Stopping local reference broadcaster")
    subprocess.run(["sudo", "pkill", "broadcaster"])

    try:
        # Start SARB
        start_receivers(nodes)
        wait_receivers(nodes, expected=1, timeout=20)
        bpid = start_broadcaster(rate_hz=args.rb_rate, iface=args.rb_iface)
        if bpid < 0:
            print("[WARN] could not determine broadcaster PID; it may not be running")

        # Start PTP roles
        first, last = nodes[0], nodes[-1]
        start_first_gm(args, first)
        for nid in nodes[1:-1]:
            start_middle_jbod(args, nid)
        start_last_slave(args, last)

        if not wait_ptp(nodes):
            raise RuntimeError("ptp4l failed to start everywhere")

        # Measurement window: sleep towards absolute tick deadlines so the
        # per-iteration overhead does not stretch the run.
        print(f"Running for {args.duration} s...")
        started = time.monotonic()
        for tick, left in enumerate(range(args.duration, 0, -1), start=1):
            sys.stdout.write(f"\rTime left: {left:4d}s")
            sys.stdout.flush()
            time.sleep(max(0.0, started + tick - time.monotonic()))
        print("\nStopping...")

    finally:
        # TEARDOWN: stop processes (keep logs), then collect. Each step is
        # isolated so one failure does not skip the remaining ones.
        try:
            stop_procs(nodes)
        except Exception as e:
            print(f"[WARN] stopping remote daemons encountered an issue: {e}")
        subprocess.run(["sudo", "pkill", "-f", "broadcaster"], check=False)
        try:
            collect_logs(nodes, args.out, args.duration)
        except Exception as e:
            print(f"[WARN] log collection encountered an issue: {e}")

    print("Done.")


if __name__ == "__main__":
    main()