#!/usr/bin/env python3 from __future__ import annotations import argparse import datetime as dt import json import re import subprocess import sys import time from dataclasses import dataclass from pathlib import Path ROOT_DIR = Path(__file__).resolve().parent REMOTE_PTP_CONFIG_PATH = "/opt/ptp_conf" PTP4L_BINARY = "/home/apu/wifi-ptp/ptp/ptp4l" RECEIVER_PATH = "/opt/timestamping/receiver_hwts_logger" BROADCASTER_PATH = "/home/apu/testbed_files/experiments/reference_broadcast/broadcaster" GRANDMASTER_CONF = "ptp4l_grandmaster.conf" SLAVE_CONF = "ptp4l_slave.conf" GRANDMASTER_LOG = "/tmp/ptp4l_grandmaster.log" SLAVE_LOG = "/tmp/ptp4l_slave.log" PHC2SYS_LOG = "/tmp/phc2sys_eth0_sync.log" RECEIVER_LOG = "/tmp/receiver_hwts_logger_eth0.log" SARB_IFACE = "eth0" DEFAULT_BROADCAST_RATE = 1000 @dataclass(frozen=True) class ModeDefinition: name: str hosts: tuple[str, ...] master: str ptp_iface: str mode_dir: Path MODE_DEFINITIONS = { "infra": ModeDefinition( name="infra", hosts=("apu00", "apu01", "apu02"), master="apu01", ptp_iface="wlan0", mode_dir=ROOT_DIR / "infra", ), "adhoc": ModeDefinition( name="adhoc", hosts=("apu10", "apu11", "apu12"), master="apu11", ptp_iface="adhoc0", mode_dir=ROOT_DIR / "adhoc", ), "mesh": ModeDefinition( name="mesh", hosts=("apu20", "apu21", "apu22"), master="apu21", ptp_iface="mesh0", mode_dir=ROOT_DIR / "mesh", ), } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Run parallel Wi-Fi PTP4L experiments across infra, adhoc and mesh." ) parser.add_argument("duration", type=int, help="Experiment duration in seconds.") parser.add_argument( "--experiments", default="infra,adhoc,mesh", help="Comma-separated subset of experiments to run: infra,adhoc,mesh or all.", ) parser.add_argument( "--broadcast-rate", type=int, default=DEFAULT_BROADCAST_RATE, help="Reference broadcaster packet rate in packets per second.", ) parser.add_argument( "--startup-delay", type=float, default=1.0, help="Delay in seconds between startup stages.", ) parser.add_argument( "--ptp-binary", default=PTP4L_BINARY, help="Remote ptp4l binary path on the nodes.", ) return parser.parse_args() def parse_selected_modes(raw_value: str) -> list[ModeDefinition]: raw_value = raw_value.strip().lower() if raw_value == "all": return [MODE_DEFINITIONS[name] for name in ("infra", "adhoc", "mesh")] mode_names = [] for item in raw_value.split(","): name = item.strip().lower() if not name: continue if name not in MODE_DEFINITIONS: raise ValueError(f"Unknown experiment '{name}'.") if name not in mode_names: mode_names.append(name) if not mode_names: raise ValueError("No experiments selected.") return [MODE_DEFINITIONS[name] for name in mode_names] def run_ssh(host: str, command: str, check: bool = True) -> subprocess.CompletedProcess[str]: return subprocess.run( ["ssh", host, command], check=check, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) def run_local_shell(command: str) -> subprocess.CompletedProcess[str]: return subprocess.run( command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) def load_json_as_ptp_conf(path: Path) -> str: data = json.loads(path.read_text()) lines: list[str] = [] for section, options in data.items(): lines.append(f"[{section}]") for key, value in options.items(): lines.append(f"{key} {value}") return "\n".join(lines) def write_remote_config(host: str, remote_name: str, local_path: Path) -> None: run_ssh(host, f"sudo mkdir -p {REMOTE_PTP_CONFIG_PATH}") content = load_json_as_ptp_conf(local_path) subprocess.run( ["ssh", host, f"sudo tee {REMOTE_PTP_CONFIG_PATH}/{remote_name} >/dev/null"], input=content, text=True, check=True, ) def detect_phc_device(host: str, iface: str) -> str: result = run_ssh(host, f"ethtool -T {iface}") match = re.search(r"PTP Hardware Clock:\s*(\d+)", result.stdout) if not match: raise RuntimeError(f"[{host}] Could not determine PHC for interface {iface}.") return f"/dev/ptp{match.group(1)}" def stop_remote_processes(host: str) -> None: for command in ( "pkill -f ptp4l || true", "pkill -f phc2sys || true", "sudo pkill -f receiver_hwts_logger || true", "sync || true", ): run_ssh(host, command, check=False) def clear_remote_logs(host: str) -> None: run_ssh( host, f"sudo rm -f {GRANDMASTER_LOG} {SLAVE_LOG} {PHC2SYS_LOG} {RECEIVER_LOG}", check=False, ) def latest_tslog_dir(host: str) -> str | None: result = run_ssh(host, 'sh -lc "ls -td /tmp/tslog_* 2>/dev/null | head -n1"', check=False) value = result.stdout.strip() return value or None def start_phc2sys(host: str, phc_device: str) -> None: command = ( f"phc2sys -s {phc_device} -c {SARB_IFACE} -O 0 -m --step_threshold=1 " f"> {PHC2SYS_LOG} 2>&1 &" ) run_ssh(host, command) def start_receiver(host: str, startup_delay: float) -> str | None: previous_tslog = latest_tslog_dir(host) command = ( f"sudo taskset -c 1 chrt -f 90 {RECEIVER_PATH} {SARB_IFACE} " f"> {RECEIVER_LOG} 2>&1 &" ) run_ssh(host, command) for _ in range(10): time.sleep(startup_delay) status = run_ssh(host, "pgrep -fa receiver_hwts_logger >/dev/null", check=False) current_tslog = latest_tslog_dir(host) if status.returncode == 0 and current_tslog and current_tslog != previous_tslog: return current_tslog return latest_tslog_dir(host) def start_ptp4l(host: str, iface: str, phc_device: str, ptp_binary: str, master: bool) -> None: conf_name = GRANDMASTER_CONF if master else SLAVE_CONF log_name = GRANDMASTER_LOG if master else SLAVE_LOG command = ( f"{ptp_binary} -i {iface} -p {phc_device} -m -H " f"-f {REMOTE_PTP_CONFIG_PATH}/{conf_name} > {log_name} 2>&1 &" ) run_ssh(host, command) def wait_for_ptp(hosts: list[str], retries: int = 10, delay: float = 1.0) -> bool: for _ in range(retries): if all(run_ssh(host, "pgrep -fa ptp4l >/dev/null", check=False).returncode == 0 for host in hosts): return True time.sleep(delay) return False def stop_local_broadcaster() -> None: subprocess.run(["sudo", "pkill", "-f", "broadcaster"], check=False) def start_local_broadcaster(rate_pps: int) -> str: stop_local_broadcaster() command = ( f"sudo taskset -c 1 chrt -f 90 {BROADCASTER_PATH} {SARB_IFACE} {rate_pps} " "> /dev/null 2>&1 & echo $!" ) result = run_local_shell(command) return result.stdout.strip() def collect_remote_text_log(host: str, remote_path: str, local_path: Path) -> None: result = run_ssh(host, f'sh -lc "[ -f \\"{remote_path}\\" ] && cat \\"{remote_path}\\""', check=False) if result.stdout: local_path.write_text(result.stdout.replace("\x00", "")) def collect_tslog_dir(host: str, remote_dir: str | None, local_dir: Path) -> None: if not remote_dir: return subprocess.run(["scp", "-rq", f"{host}:{remote_dir}", str(local_dir)], check=False) def build_mode_log_dir(mode: ModeDefinition, run_stamp: str, duration: int) -> Path: return mode.mode_dir / "logs" / f"{run_stamp}_{mode.name}_ptp4l_{duration}s" def write_metadata(local_dir: Path, payload: dict) -> None: (local_dir / "metadata.json").write_text(json.dumps(payload, indent=2, sort_keys=True)) def countdown(duration: int) -> None: for remaining in range(duration, 0, -1): sys.stdout.write(f"\rTime left: {remaining:4d} s") sys.stdout.flush() time.sleep(1) print("\rTime left: 0 s") def main() -> int: args = parse_args() try: selected_modes = parse_selected_modes(args.experiments) except ValueError as exc: print(f"ERROR: {exc}") return 1 run_stamp = dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") unique_hosts = sorted({host for mode in selected_modes for host in mode.hosts}) phc_devices: dict[str, str] = {} tslog_dirs: dict[str, str | None] = {} try: for mode in selected_modes: master_cfg = mode.mode_dir / "ptp_config" / "ptp4l_grandmaster.json" slave_cfg = mode.mode_dir / "ptp_config" / "ptp4l_slave.json" for host in mode.hosts: stop_remote_processes(host) clear_remote_logs(host) write_remote_config(host, GRANDMASTER_CONF, master_cfg) write_remote_config(host, SLAVE_CONF, slave_cfg) phc_device = detect_phc_device(host, mode.ptp_iface) phc_devices[host] = phc_device start_phc2sys(host, phc_device) time.sleep(args.startup_delay) tslog_dirs[host] = start_receiver(host, args.startup_delay) broadcaster_pid = start_local_broadcaster(args.broadcast_rate) time.sleep(args.startup_delay) for mode in selected_modes: start_ptp4l( mode.master, mode.ptp_iface, phc_devices[mode.master], args.ptp_binary, master=True, ) time.sleep(args.startup_delay) for host in mode.hosts: if host == mode.master: continue start_ptp4l(host, mode.ptp_iface, phc_devices[host], args.ptp_binary, master=False) time.sleep(args.startup_delay) if not wait_for_ptp(unique_hosts, delay=args.startup_delay): print("ERROR: Not all ptp4l processes came up.") return 2 print(f"Broadcaster PID: {broadcaster_pid}") countdown(args.duration) return 0 finally: for host in unique_hosts: stop_remote_processes(host) stop_local_broadcaster() for mode in selected_modes if "selected_modes" in locals() else []: local_dir = build_mode_log_dir(mode, run_stamp, args.duration) local_dir.mkdir(parents=True, exist_ok=True) for host in mode.hosts: host_dir = local_dir / host host_dir.mkdir(parents=True, exist_ok=True) collect_remote_text_log(host, GRANDMASTER_LOG, host_dir / "ptp4l_grandmaster.log") collect_remote_text_log(host, SLAVE_LOG, host_dir / "ptp4l_slave.log") collect_remote_text_log(host, PHC2SYS_LOG, host_dir / "phc2sys_eth0_sync.log") collect_remote_text_log(host, RECEIVER_LOG, host_dir / "receiver_hwts_logger_eth0.log") collect_tslog_dir(host, tslog_dirs.get(host), host_dir) write_metadata( local_dir, { "broadcast_iface": SARB_IFACE, "broadcast_rate_pps": args.broadcast_rate, "duration_seconds": args.duration, "master": mode.master, "mode": mode.name, "phc_devices": {host: phc_devices.get(host, "") for host in mode.hosts}, "ptp_iface": mode.ptp_iface, "run_stamp": run_stamp, "selected_experiments": [item.name for item in selected_modes], "tslog_dirs": {host: tslog_dirs.get(host) for host in mode.hosts}, }, ) if __name__ == "__main__": raise SystemExit(main())