#!/usr/bin/env python3
"""
run_ptp_experiment_e2e.py — BC chain (JBOD) or OC-only L2 bridged multihop

Straight-line Ethernet chain (no loopback). All nodes are already placed in ONE
L3 subnet by your init script (static /32 routes hop-by-hop).
This runner embeds ptp4l configs and lets you change parameters at runtime.

ROLE MODES
  - BC    : Boundary-clock chain (single ptp4l per BC node via boundary_clock_jbod=1)
            * First node: GM on eth2
            * Middles: ptp4l on eth1+eth2 (JBOD), phc2sys eth1->eth2 and eth1->eth0
            * Last: Slave on eth1, phc2sys eth1->eth0
  - OC_L2 : Ordinary-clock only (no time-aware middle nodes)
            * Intermediates are bridged at L2 (br0: eth1+eth2); they are NOT
              boundary clocks
            * First node: GM on eth2
            * All others (bridged intermediates included): Slave on the
              upstream-facing iface (default eth1), phc2sys iface->eth0
            * Transport is forced to L2 (raw Ethernet PTP) and delay to E2E

Transport
  - UDPv4 (default) or L2. For UDPv4 + /32 interfaces, we add multicast routes
    on eth1/eth2.

Behavior
  - Does NOT delete logs at teardown (only pre-run cleanup)
  - Uses bash -lc + nullglob for safe remote globs (no zsh errors)
  - Log collection is tolerant: missing files do not cause failure
"""

import argparse
import datetime
import os
import subprocess
import sys
import time

PTP_REMOTE_CONFIG_PATH = "/opt/ptp_conf"
PTP4L_BIN = "ptp4l"
PHC2SYS_BIN = "phc2sys"

RECEIVER_PATH = "/opt/timestamping/receiver_hwts_logger"
BROADCASTER_PATH = "/home/apu/testbed_files/experiments/reference_broadcast/broadcaster"

# Remote log paths
GM_LOG = "/tmp/ptp4l_gm.log"
BC_LOG = "/tmp/ptp4l_bc.log"
SLAVE_LOG = "/tmp/ptp4l_slave.log"
PHC2SYS_MEAS_LOG = "/tmp/phc2sys_meas.log"      # measurement NIC tie
PHC2SYS_BRIDGE_LOG = "/tmp/phc2sys_bridge.log"  # eth1 -> eth2 bridge on middles (BC mode)


# -------------------------- basic helpers -----------------------------------

def host(node_id: int) -> str:
    """Return the ssh hostname for a numeric node ID (e.g. 3 -> "apu03")."""
    return f"apu{node_id:02d}"


def sh(cmd, check=True, capture=False):
    """Run a local command given as an argv list.

    With ``capture=True`` stdout/stderr are captured as text; otherwise they
    are inherited. Raises ``subprocess.CalledProcessError`` when ``check`` is
    true and the command exits non-zero. Returns the ``CompletedProcess``.
    """
    if capture:
        return subprocess.run(
            cmd,
            check=check,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    return subprocess.run(cmd, check=check)


def rsh(hostname, cmd, check=True, capture=False):
    """Run the shell command string *cmd* on *hostname* via ssh (see ``sh``)."""
    return sh(["ssh", hostname, cmd], check=check, capture=capture)


def scp_from(hostname, remote_path, local_dir):
    """Copy one remote file into *local_dir*, creating the directory if needed.

    Copy failures are deliberately ignored: log collection is best-effort and
    a missing remote file must not abort the run.
    """
    os.makedirs(local_dir, exist_ok=True)
    try:
        sh(["scp", "-q", f"{hostname}:{remote_path}", local_dir], check=True)
    except subprocess.CalledProcessError:
        pass


def now_stamp():
    """Return the current local time as ``YYYY-MM-DD_HH-MM-SS``."""
    return datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


# ---------------------- inline ptp4l config builders ------------------------

def build_global_section(args, role):
    """Build the text of a ptp4l ``[global]`` section from CLI args.

    ``role`` must be one of ``"gm"``, ``"bc"`` or ``"slave"``; any other value
    raises ``ValueError``. ``args`` is the parsed CLI namespace and must also
    carry the ``jbod`` attribute that ``main`` derives from ``--no-jbod``.
    The returned string always ends with a newline.
    """
    lines = ["[global]"]

    # Transport & basic behavior
    lines.append("twoStepFlag 1")
    lines.append(f"network_transport {args.transport}")
    lines.append(f"domainNumber {args.domain}")
    lines.append(f"timeSource {args.time_source}")
    lines.append(f"delay_mechanism {args.delay}")

    # UDPv4 knobs are only emitted for the UDPv4 transport
    if args.transport == "UDPv4" and args.udp_ttl is not None:
        lines.append(f"udp_ttl {args.udp_ttl}")

    # Clock identity/priorities per role
    if role == "gm":
        lines += [
            "serverOnly 1",
            f"clockClass {args.gm_clock_class}",
            f"clockAccuracy {args.gm_clock_accuracy}",
            f"offsetScaledLogVariance {args.gm_oslv}",
            f"priority1 {args.gm_prio1}",
            f"priority2 {args.gm_prio2}",
        ]
    elif role == "bc":
        lines += [
            f"clockClass {args.bc_clock_class}",
            f"clockAccuracy {args.bc_clock_accuracy}",
            f"offsetScaledLogVariance {args.bc_oslv}",
            f"priority1 {args.bc_prio1}",
            f"priority2 {args.bc_prio2}",
        ]
        if args.jbod:
            lines.append("boundary_clock_jbod 1")
    elif role == "slave":
        lines += [
            "clientOnly 1",
            f"clockClass {args.sl_clock_class}",
            f"clockAccuracy {args.sl_clock_accuracy}",
            f"offsetScaledLogVariance {args.sl_oslv}",
            f"priority1 {args.sl_prio1}",
            f"priority2 {args.sl_prio2}",
            f"announceReceiptTimeout {args.announce_timeout}",
        ]
    else:
        raise ValueError("unknown role")

    # Message intervals
    lines += [
        f"logSyncInterval {args.log_sync}",
        f"logAnnounceInterval {args.log_announce}",
        f"logMinDelayReqInterval {args.log_delayreq}",
        f"tx_timestamp_timeout {args.tx_ts_timeout}",
    ]

    # Servo selection
    if args.servo.lower() == "pi":
        lines.append("clock_servo pi")
        lines.append(f"pi_proportional_const {args.kp}")
        lines.append(f"pi_integral_const {args.ki}")
    else:
        lines.append("clock_servo linreg")
        # NOTE: --linreg-window is accepted on the CLI but intentionally not
        # written to the config (the option was disabled in the original).
        if args.linreg_sanity_sigma is not None:
            lines.append(f"linreg_sanity_sigma {args.linreg_sanity_sigma}")

    # Steps / clamps
    if args.step_threshold is not None:
        lines.append(f"step_threshold {args.step_threshold}")
    if args.max_freq is not None:
        lines.append(f"max_frequency {args.max_freq}")

    return "\n".join(lines) + "\n"


def write_remote_conf(h, name, text):
    """Write *text* to ``PTP_REMOTE_CONFIG_PATH/name`` on host *h*.

    The config is streamed over ssh stdin into ``sudo tee``. This avoids
    nested shell quoting of the payload and works even when the directory
    (created with sudo just above) is root-owned. Raises
    ``subprocess.CalledProcessError`` if the remote write fails.
    """
    rsh(h, f"sudo mkdir -p {PTP_REMOTE_CONFIG_PATH}", check=False)
    subprocess.run(
        ["ssh", h, f"sudo tee {PTP_REMOTE_CONFIG_PATH}/{name} >/dev/null"],
        input=text,
        text=True,
        check=True,
    )
    print(f"[{h}] wrote {name}")


# ---------------------------- UDPv4 multicast prep --------------------------

def prep_udpv4_multicast(nodes):
    """Bring PTP ports up and add IPv4 multicast routes (safe on /32 topologies).

    Interfaces that do not exist on a node are skipped. All remote commands
    are best-effort (failures are ignored).
    """
    for nid in nodes:
        h = host(nid)
        # Each interface gets its own metric: `ip route replace` keys on
        # prefix+metric, so without distinct metrics the eth2 route would
        # overwrite the eth1 route and only one interface would keep it.
        for metric, ifc in enumerate(("eth1", "eth2"), start=1):
            exists = rsh(
                h,
                f'[ -d "/sys/class/net/{ifc}" ] && echo yes || echo no',
                capture=True,
                check=False,
            ).stdout.strip()
            if exists != "yes":
                continue
            rsh(h, f"sudo ip link set {ifc} up", check=False)
            rsh(h, f"sudo sysctl -q -w net.ipv4.conf.{ifc}.rp_filter=0", check=False)
            rsh(
                h,
                f"sudo ip route replace 224.0.0.0/4 dev {ifc} metric {metric}",
                check=False,
            )


# ---------------------------- OC_L2 bridging helpers ------------------------

def oc_l2_setup_bridge(nid, br="br0", if1="eth1", if2="eth2"):
    """(Re)create bridge *br* on node *nid* with *if1* and *if2* as members.

    Any existing bridge of the same name is deleted first. Addresses on both
    member interfaces are flushed. Multicast snooping and VLAN filtering are
    switched off on the bridge. Every step is best-effort.
    """
    h = host(nid)
    cmds = [
        f"sudo ip link set {if1} down || true",
        f"sudo ip link set {if2} down || true",
        f"sudo ip link del {br} 2>/dev/null || true",
        f"sudo ip link add {br} type bridge stp_state 0",
        f"sudo ip addr flush dev {if1}",
        f"sudo ip addr flush dev {if2}",
        f"sudo ip link set {if1} master {br}",
        f"sudo ip link set {if2} master {br}",
        f"sudo ip link set {if1} up",
        f"sudo ip link set {if2} up",
        f"sudo ip link set {br} up",
        # PTP-friendly toggles:
        f"echo 0 | sudo tee /sys/class/net/{br}/bridge/multicast_snooping >/dev/null || true",
        f"echo 0 | sudo tee /sys/class/net/{br}/bridge/vlan_filtering >/dev/null || true",
    ]
    for c in cmds:
        rsh(h, c, check=False)
    print(f"[{h}] L2 bridge {br} up ({if1}+{if2}), snooping/vlan_filtering=0")


def oc_l2_teardown_bridge(nid, br="br0", if1="eth1", if2="eth2"):
    """Detach *if1*/*if2* from bridge *br* on node *nid* and delete the bridge."""
    h = host(nid)
    cmds = [
        f"sudo ip link set {if1} nomaster 2>/dev/null || true",
        f"sudo ip link set {if2} nomaster 2>/dev/null || true",
        f"sudo ip link del {br} 2>/dev/null || true",
    ]
    for c in cmds:
        rsh(h, c, check=False)
    print(f"[{h}] L2 bridge {br} removed")


# ---------------------------- process control -------------------------------

def stop_procs(nodes):
    """Stop daemons but KEEP logs (used at teardown).

    Also turns off NTP-based time sync on each node (timedatectl /
    systemd-timesyncd). All commands are best-effort.
    """
    print("Stopping daemons (keeping logs)...")
    # The "[x]..." bracket patterns match the same processes as the plain
    # names, but do not match the remote shell running this very command
    # (whose command line contains the pattern text), so `pkill -f` no longer
    # kills its own parent shell.
    cmds = [
        "pkill -f '[p]tp4l' || true",
        "pkill -f '[p]hc2sys' || true",
        "pkill -f '[r]eceiver_hwts_logger' || true",
        "sudo timedatectl set-ntp false || true",
        "sudo systemctl disable --now systemd-timesyncd 2>/dev/null || true",
        "sync",
    ]
    for nid in nodes:
        h = host(nid)
        for cmd in cmds:
            rsh(h, cmd, check=False)


def cleanup_old(nodes):
    """Pre-run: delete old logs on the nodes to avoid mixing with new run outputs."""
    print("Removing old logs (pre-run only)...")
    cmds = [
        r"bash -lc 'shopt -s nullglob; rm -rf /tmp/tslog_* 2>/dev/null || true'",
        r"bash -lc 'rm -f /tmp/ptp4l_*.log /tmp/phc2sys*.log 2>/dev/null || true'",
        "sync",
    ]
    for nid in nodes:
        h = host(nid)
        for cmd in cmds:
            rsh(h, cmd, check=False)


def start_receivers(nodes):
    """Launch the hardware-timestamp receiver on eth0 of every node (background)."""
    for nid in nodes:
        h = host(nid)
        cmd = f"sudo taskset -c 1 chrt -f 90 {RECEIVER_PATH} eth0 >/dev/null 2>&1 &"
        print(f"[{h}] receiver on eth0")
        rsh(h, cmd, check=False)


def wait_receivers(nodes, expected=1, timeout=20):
    """Wait until each node reports at least *expected* receiver processes.

    Polls once per second, up to *timeout* polls per node. Raises
    ``RuntimeError`` naming the first node on which the receiver never
    appeared.
    """
    # Bracket pattern: see stop_procs. Without it the remote shell running
    # this pipeline matches `pgrep -f` itself, so the count would always be
    # >= 1 and the check would pass even with no receiver running.
    count_cmd = "pgrep -fa '[r]eceiver_hwts_logger' | wc -l"
    for nid in nodes:
        h = host(nid)
        ok = False
        cnt = 0  # stays defined even when timeout <= 0
        for _ in range(timeout):
            res = rsh(h, count_cmd, capture=True, check=False)
            try:
                cnt = int(res.stdout.strip())
            except ValueError:
                cnt = 0
            if cnt >= expected:
                ok = True
                break
            time.sleep(1)
        if not ok:
            detail = rsh(
                h,
                "pgrep -fa '[r]eceiver_hwts_logger' || true",
                capture=True,
                check=False,
            ).stdout
            raise RuntimeError(f"receiver not up on {h} (found {cnt})\n{detail}")


def start_broadcaster(rate_hz=1000, iface="eth0"):
    """Start the local reference broadcaster in the background.

    Returns the PID printed by the shell's ``$!`` (the backgrounded ``sudo``
    job), or -1 if it could not be parsed.
    """
    cmd = (
        f"sudo taskset -c 1 chrt -f 90 {BROADCASTER_PATH} {iface} {rate_hz} "
        f">/dev/null 2>&1 & echo $!"
    )
    out = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True)
    try:
        pid = int(out.stdout.strip())
    except ValueError:
        pid = -1
    print(f"[controller] broadcaster PID {pid} on {iface}@{rate_hz}Hz")
    return pid


def is_ptp_running(h):
    """Return True if a process named exactly ``ptp4l`` is running on host *h*."""
    return rsh(h, "pgrep -x ptp4l >/dev/null", check=False).returncode == 0


def wait_ptp(ptp_hosts, retries=15, delay=2):
    """Poll until ptp4l runs on every host in *ptp_hosts*.

    Returns True on success, False if some host is still missing ptp4l after
    *retries* polls spaced *delay* seconds apart.
    """
    print("Waiting for ptp4l on required hosts...")
    for _ in range(retries):
        missing = [h for h in ptp_hosts if not is_ptp_running(h)]
        if not missing:
            print("ptp4l running on all required hosts.")
            return True
        print("Not yet on:", ", ".join(missing))
        time.sleep(delay)
    return False


# ---------------------------- role start ------------------------------------

def _launch_ptp4l(h, ifaces, conf_name, log_path, label):
    """Start ptp4l on *h* for *ifaces* and verify it is still alive after 1 s.

    Raises ``RuntimeError`` (including the remote log contents) if ptp4l has
    already exited.
    """
    iface_args = " ".join(f"-i {ifc}" for ifc in ifaces)
    cmd = (
        f"{PTP4L_BIN} {iface_args} -m -H "
        f"-f {PTP_REMOTE_CONFIG_PATH}/{conf_name} > {log_path} 2>&1 &"
    )
    print(f"[{h}] {cmd}")
    rsh(h, cmd, check=True)
    time.sleep(1)
    if not is_ptp_running(h):
        log = rsh(
            h, f"tail -n +1 {log_path} 2>/dev/null || true", capture=True, check=False
        ).stdout
        raise RuntimeError(
            f"ptp4l ({label}) did not stay up on {h}.\n---- {log_path} ----\n{log}"
        )


def _launch_phc2sys(h, src, dst, log_path):
    """Start phc2sys on *h* syncing clock *dst* from *src*, logging to *log_path*."""
    cmd = (
        f"{PHC2SYS_BIN} -s {src} -c {dst} -O 0 -m -w --step_threshold=1 "
        f"> {log_path} 2>&1 &"
    )
    print(f"[{h}] {cmd}")
    rsh(h, cmd, check=True)


def start_first_gm(args, nid):
    """Start the grandmaster on node *nid*: ptp4l on eth2, phc2sys eth2->eth0."""
    h = host(nid)
    write_remote_conf(h, "ptp4l_gm.conf", build_global_section(args, "gm"))
    _launch_ptp4l(h, ["eth2"], "ptp4l_gm.conf", GM_LOG, "GM")
    # Tie measurement NIC to GM port clock
    _launch_phc2sys(h, "eth2", "eth0", PHC2SYS_MEAS_LOG)


def start_last_slave(args, nid, iface="eth1"):
    """Start a slave on node *nid*: ptp4l on *iface*, phc2sys iface->eth0."""
    h = host(nid)
    write_remote_conf(h, "ptp4l_slave.conf", build_global_section(args, "slave"))
    _launch_ptp4l(h, [iface], "ptp4l_slave.conf", SLAVE_LOG, "Slave")
    # Tie measurement NIC to slave port clock
    _launch_phc2sys(h, iface, "eth0", PHC2SYS_MEAS_LOG)


def start_middle_jbod(args, nid):
    """BC mode middle node: one ptp4l on eth1+eth2 plus two phc2sys instances."""
    h = host(nid)
    write_remote_conf(h, "ptp4l_bc.conf", build_global_section(args, "bc"))
    _launch_ptp4l(h, ["eth1", "eth2"], "ptp4l_bc.conf", BC_LOG, "BC JBOD")
    # PHC bridge and measurement tie
    _launch_phc2sys(h, "eth1", "eth2", PHC2SYS_BRIDGE_LOG)
    _launch_phc2sys(h, "eth1", "eth0", PHC2SYS_MEAS_LOG)


# ---------------------------- log collection --------------------------------

def out_dir_name(nodes, duration, role_mode):
    """Return the timestamped experiment directory name for this run."""
    node_tag = "-".join(str(n) for n in nodes)
    return f"{now_stamp()}_e2e_ptp4l_{duration}s_nodes({node_tag})__{role_mode}"


def _role_logs(role_mode, nid, first_id, last_id):
    """Return ``(label, remote_path)`` pairs of the logs node *nid* should have."""
    gm_logs = [("ptp4l_gm", GM_LOG), ("phc2sys_meas", PHC2SYS_MEAS_LOG)]
    slave_logs = [("ptp4l_slave", SLAVE_LOG), ("phc2sys_meas", PHC2SYS_MEAS_LOG)]
    if role_mode == "BC":
        if nid == first_id:
            return gm_logs
        if nid == last_id:
            return slave_logs
        return [
            ("ptp4l_bc", BC_LOG),
            ("phc2sys_meas", PHC2SYS_MEAS_LOG),
            ("phc2sys_bridge", PHC2SYS_BRIDGE_LOG),
        ]
    if role_mode == "OC_L2":
        return gm_logs if nid == first_id else slave_logs
    return []


def collect_logs(nodes, base_dir, duration, role_mode, oc_iface="eth1"):
    """Copy timestamp CSVs and role logs from every node into a new run dir.

    Layout: ``base_dir/<run>/<host>/tslogs/*.csv`` plus
    ``base_dir/<run>/<host>/<label>.log``. Missing remote files are tolerated
    (an absent role log yields an empty local file). ``oc_iface`` is accepted
    for call compatibility but is not used.
    """
    exp_dir = os.path.join(base_dir, out_dir_name(nodes, duration, role_mode))
    os.makedirs(exp_dir, exist_ok=True)
    first_id, last_id = nodes[0], nodes[-1]

    for nid in nodes:
        h = host(nid)
        node_dir = os.path.join(exp_dir, h)
        os.makedirs(node_dir, exist_ok=True)
        ts_dir = os.path.join(node_dir, "tslogs")
        os.makedirs(ts_dir, exist_ok=True)

        # Newest two tslog folders (ok if none)
        res = rsh(
            h,
            r"bash -lc 'shopt -s nullglob; ls -td /tmp/tslog_* 2>/dev/null | head -n2 || true'",
            capture=True,
            check=False,
        )
        folders = [ln.strip() for ln in res.stdout.splitlines() if ln.strip()]

        # Folders are listed newest first, so on a basename clash the newest
        # copy wins and older duplicates are skipped.
        taken = set()
        for fld in folders:
            lsres = rsh(
                h,
                f'bash -lc \'shopt -s nullglob; for f in "{fld}"/*.csv; do echo "$f"; done\'',
                capture=True,
                check=False,
            )
            files = [ln.strip() for ln in lsres.stdout.splitlines() if ln.strip()]
            for remote in files:
                base = os.path.basename(remote)
                if base in taken:
                    continue
                scp_from(h, remote, ts_dir)
                taken.add(base)

        # Role-based logs
        for label, path in _role_logs(role_mode, nid, first_id, last_id):
            res = rsh(
                h,
                f'bash -lc \'sync; sleep 1; if [ -f "{path}" ]; then cat "{path}"; fi; exit 0\'',
                capture=True,
                check=False,
            )
            with open(os.path.join(node_dir, f"{label}.log"), "w") as f:
                # NUL bytes are stripped before the log is written locally.
                f.write(res.stdout.replace("\x00", ""))


# ------------------------------- CLI ----------------------------------------

def parse_args(argv=None):
    """Parse the command line.

    ``argv`` is the argument list to parse; ``None`` (the default) means
    ``sys.argv[1:]``, as with ``argparse`` itself.
    """
    p = argparse.ArgumentParser(
        description="Straight-line PTP runner with inline configs: BC chain (JBOD) or OC-only L2 bridged."
    )
    p.add_argument("duration", type=int, help="run time in seconds")
    p.add_argument("nodes", type=int, nargs="+", help="ordered node IDs, e.g., 0 1 2 3 4")

    # Role mode
    p.add_argument("--role-mode", choices=["BC", "OC_L2"], default="BC",
                   help="BC (JBOD boundary clock chain) or OC_L2 (ordinary-clock only, L2-bridged)")

    # PTP knobs
    p.add_argument("--delay", choices=["E2E", "P2P"], default="E2E",
                   help="delay mechanism (P2P valid only in BC mode)")
    p.add_argument("--servo", choices=["pi", "linreg"], default="pi", help="ptp4l servo")
    p.add_argument("--kp", type=float, default=0.7, help="PI Kp")
    p.add_argument("--ki", type=float, default=0.3, help="PI Ki")
    p.add_argument("--linreg-window", type=int, default=None,
                   help="linreg filter length (if supported)")
    p.add_argument("--linreg-sanity-sigma", type=float, default=None,
                   help="linreg sigma clamp (if supported)")

    # Message intervals
    p.add_argument("--log-sync", type=str, default="0", help="logSyncInterval (e.g., 0, -1, -3)")
    p.add_argument("--log-announce", type=str, default="1", help="logAnnounceInterval")
    p.add_argument("--log-delayreq", type=str, default="-3", help="logMinDelayReqInterval (E2E)")
    p.add_argument("--announce-timeout", type=int, default=6,
                   help="announceReceiptTimeout (slaves)")
    p.add_argument("--tx-ts-timeout", type=str, default="400", help="tx_timestamp_timeout")

    # Transport / domain / source
    p.add_argument("--transport", choices=["UDPv4", "L2"], default="UDPv4")
    p.add_argument("--udp-ttl", type=int, default=1, help="UDPv4 TTL (ignored in L2)")
    p.add_argument("--domain", type=int, default=0)
    p.add_argument("--time-source", default="0x10")

    # Servo/general limits
    p.add_argument("--step-threshold", type=float, default=0.001,
                   help="ptp4l step_threshold (s)")
    p.add_argument("--max-freq", type=int, default=None, help="max_frequency (ppm)")

    # Clock quality / priorities per role
    p.add_argument("--gm-clock-class", type=int, default=6)
    p.add_argument("--gm-clock-accuracy", default="0x20")
    p.add_argument("--gm-oslv", default="0x200")
    p.add_argument("--gm-prio1", type=int, default=128)
    p.add_argument("--gm-prio2", type=int, default=128)

    p.add_argument("--bc-clock-class", type=int, default=248)
    p.add_argument("--bc-clock-accuracy", default="0xFE")
    p.add_argument("--bc-oslv", default="0x200")
    p.add_argument("--bc-prio1", type=int, default=248)
    p.add_argument("--bc-prio2", type=int, default=128)

    p.add_argument("--sl-clock-class", type=int, default=248)
    p.add_argument("--sl-clock-accuracy", default="0xFE")
    p.add_argument("--sl-oslv", default="0x200")
    p.add_argument("--sl-prio1", type=int, default=248)
    p.add_argument("--sl-prio2", type=int, default=128)

    # JBOD toggle (BC mode)
    p.add_argument("--no-jbod", action="store_true",
                   help="disable boundary_clock_jbod on BC nodes")

    # OC-only options
    p.add_argument("--oc-upstream-iface", default="eth1",
                   help="slave port facing upstream GM (OC modes)")
    p.add_argument("--oc-keep-bridge", action="store_true",
                   help="do NOT remove br0 after run (OC_L2 mode)")

    # SARB broadcaster knobs
    p.add_argument("--rb-rate", type=int, default=1000, help="reference broadcast rate (Hz)")
    p.add_argument("--rb-iface", default="eth0", help="broadcaster interface on controller")

    # Output
    p.add_argument("--out", default="logs", help="base output dir")

    return p.parse_args(argv)


# -------------------------------- main --------------------------------------

def main(argv=None):
    """Run one experiment: prepare nodes, start roles, wait, tear down, collect.

    Exits with status 1 on invalid node lists. Any failure during startup or
    the measurement window still triggers teardown and log collection before
    the exception propagates.
    """
    args = parse_args(argv)
    args.jbod = not args.no_jbod

    nodes = list(args.nodes)
    if len(nodes) < 2:
        print("Need at least two nodes for a line.")
        sys.exit(1)
    if len(set(nodes)) != len(nodes):
        # A repeated ID would assign two conflicting roles to the same host.
        print("Node IDs must be unique.")
        sys.exit(1)

    if args.role_mode == "OC_L2":
        if args.transport != "L2":
            print("OC_L2 requires link-layer transport; forcing --transport L2")
            args.transport = "L2"
        # OC-only cannot do P2P
        if args.delay != "E2E":
            print("OC_L2 requires --delay E2E; forcing E2E")
            args.delay = "E2E"

    print(f"Nodes: {nodes} | duration: {args.duration}s | role={args.role_mode} | "
          f"delay={args.delay} | servo={args.servo}")

    # PRE-RUN: stop leftovers, then delete old logs
    stop_procs(nodes)
    cleanup_old(nodes)

    # If UDPv4, prepare multicast routing on eth1/eth2
    if args.transport == "UDPv4":
        prep_udpv4_multicast(nodes)

    # Kill any prior local broadcaster
    print("Stopping local reference broadcaster")
    subprocess.run(["sudo", "pkill", "broadcaster"], check=False)

    # OC_L2: prepare L2 bridges on intermediates
    if args.role_mode == "OC_L2":
        for nid in nodes[1:-1]:
            oc_l2_setup_bridge(nid)

    ptp_hosts = []  # hosts that MUST be running ptp4l for wait_ptp()

    try:
        # Start SARB
        start_receivers(nodes)
        wait_receivers(nodes, expected=1, timeout=20)
        start_broadcaster(rate_hz=args.rb_rate, iface=args.rb_iface)

        first, last = nodes[0], nodes[-1]

        # Start roles
        start_first_gm(args, first)
        ptp_hosts.append(host(first))

        if args.role_mode == "BC":
            for nid in nodes[1:-1]:
                start_middle_jbod(args, nid)
                ptp_hosts.append(host(nid))
            start_last_slave(args, last, iface="eth1")
            ptp_hosts.append(host(last))
        elif args.role_mode == "OC_L2":
            # Intermediates are plain L2 bridges rather than boundary clocks;
            # every node after the GM (bridged intermediates included) runs
            # an ordinary-clock slave.
            for nid in nodes[1:]:
                start_last_slave(args, nid, iface=args.oc_upstream_iface)
                ptp_hosts.append(host(nid))

        # Wait for required ptp4l processes
        if not wait_ptp(ptp_hosts):
            raise RuntimeError("ptp4l failed to start everywhere")

        # Measurement window
        print(f"Running for {args.duration} s...")
        for t in range(args.duration, 0, -1):
            sys.stdout.write(f"\rTime left: {t:4d}s")
            sys.stdout.flush()
            time.sleep(1)
        print("\nStopping...")

    finally:
        # TEARDOWN: stop processes (keep logs), then collect; optionally remove bridges
        stop_procs(nodes)
        # Bracket pattern so `pkill -f` does not match its own sudo wrapper.
        subprocess.run(["sudo", "pkill", "-f", "[b]roadcaster"], check=False)

        if args.role_mode == "OC_L2" and not args.oc_keep_bridge:
            for nid in nodes[1:-1]:
                oc_l2_teardown_bridge(nid)

        # Top-level boundary: collection problems are reported, never fatal.
        try:
            collect_logs(nodes, args.out, args.duration, args.role_mode,
                         oc_iface=args.oc_upstream_iface)
        except Exception as e:
            print(f"[WARN] log collection encountered an issue: {e}")

    print("Done.")


if __name__ == "__main__":
    main()