#!/usr/bin/env python3
"""Orchestrate a looped PTP (ptp4l) chain experiment across testbed nodes.

The first node of the chain runs a ptp4l grandmaster (eth2) plus a slave
instance (eth1) that closes the loop; the last node runs a slave (eth1) plus
a master (eth2) that closes the loop; every intermediate node forms a
two-process boundary clock (ptp4l slave on eth1, ptp4l master on eth2) with
phc2sys bridging the PTP hardware clocks.  A reference-broadcast measurement
(local broadcaster + per-node hardware-timestamp receivers on eth0) runs in
parallel to evaluate synchronization quality.  After the run, logs are
fetched from every node into a timestamped experiment directory.

Usage: run_ptp_experiment.py <duration_s> <node_id> [<node_id> ...]
"""

import os
import re
import sys
import time
import datetime
import subprocess
import json

# PTP_CONF = "/opt/ptp_conf/ptp4l_eth_line_top.conf"
PTP_REMOTE_CONFIG_PATH = "/opt/ptp_conf/"
GM_FILE = "ptp4l_grandmaster.conf"
SV_FILE = "ptp4l_slave.conf"
MA_FILE = "ptp4l_master.conf"
BC_FILE = "ptp4l_boundary.conf"

PTP4L_INSTANCE = "ptp4l"
# PTP4L_INSTANCE = "/home/apu/wifi-ptp/ptp/ptp4l"

GRANDMASTER_LOG = "/tmp/ptp4l_grandmaster.log"
SLAVE_LOG = "/tmp/ptp4l_slave.log"
MASTER_LOG = "/tmp/ptp4l_master.log"
BOUNDARY_LOG = "/tmp/ptp4l_boundary.log"
PHC2SYS_LOG_1 = "/tmp/phc2sys_1.log"
PHC2SYS_LOG_2 = "/tmp/phc2sys_2.log"
PHC2SYS_LOG_3 = "/tmp/phc2sys_3.log"

RECEIVER_PATH = "/opt/timestamping/receiver_hwts_logger"
BROADCASTER_PATH = "/home/apu/testbed_files/experiments/reference_broadcast/broadcaster"
PTP_CONFIG_DIR = "eth_e2e/ptp_config/eth"
EXPERIMENT_NAME = "ptp_loop_refbc_e2e"

# Role name -> remote config file name used by write_ptp_config_to_node().
_ROLE_FILES = {
    "grandmaster": GM_FILE,
    "slave": SV_FILE,
    "master": MA_FILE,
    "boundary": BC_FILE,
}


def load_ptp_config(filepath):
    """Render a JSON config of shape {section: {key: value}} as ptp4l INI text.

    Returns the config as a single newline-joined string, e.g.
    ``[global]\\npriority1 127``.
    """
    with open(filepath, "r") as f:
        data = json.load(f)
    lines = []
    for section, options in data.items():
        lines.append(f"[{section}]")
        for key, value in options.items():
            lines.append(f"{key} {value}")
    return "\n".join(lines)


def usage():
    """Print usage information and exit with status 1."""
    print("Usage: run_ptp_experiment.py <duration_s> <node_id> [<node_id> ...]")
    print("Example: run_ptp_experiment.py 60 0 1 2 3 4")
    sys.exit(1)


def get_hostname(node_id):
    """Map a numeric node id to its testbed hostname (e.g. 3 -> 'apu03')."""
    return f"apu{str(node_id).zfill(2)}"


def write_ptp_config_to_node(hostname, role, conf_str):
    """Push a ptp4l config string to the node's config file for *role*.

    Raises ValueError for an unknown role.  SSH failures are reported but
    not raised (best-effort, matching the rest of this script).
    """
    try:
        file_name = _ROLE_FILES[role]
    except KeyError:
        # Fixed message: all four supported roles are listed now.
        raise ValueError(
            "Invalid role. Must be 'grandmaster', 'slave', 'master' or 'boundary'."
        ) from None
    # NOTE(security): conf_str is interpolated into a single-quoted shell
    # string; a single quote inside the config would break the quoting.
    # Configs come from trusted local JSON files here, but keep them
    # quote-free (or switch to scp/sftp) if that ever changes.
    try:
        subprocess.run(
            ["ssh", hostname,
             f"echo '{conf_str}' | sudo tee '{PTP_REMOTE_CONFIG_PATH}/{file_name}' > /dev/null"],
            shell=False,
            check=True
        )
        print(f"[{hostname}] Wrote ptp4l config.")
    except subprocess.CalledProcessError as e:
        print(f"ERROR: Failed to write ptp4l.conf to {hostname}: {e}")


def generate_output_filename(nodes, duration):
    """Build the timestamped experiment folder/log name for *nodes*/*duration*."""
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    node_list = "-".join(str(n) for n in nodes)
    return f"{timestamp}_loop_ptp4l_{duration}s_nodes({node_list}).log"


def delete_logs_on_node(hostname):
    """Remove stale ptp4l/phc2sys log files on the node (best-effort)."""
    log_files = [GRANDMASTER_LOG, SLAVE_LOG, MASTER_LOG,
                 PHC2SYS_LOG_1, PHC2SYS_LOG_2, BOUNDARY_LOG]
    delete_cmd = "sudo rm -f " + " ".join(log_files)
    print(f"[{hostname}] Running: {delete_cmd}")
    try:
        subprocess.run(
            ["ssh", hostname, delete_cmd],
            check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
        print(f"[{hostname}] Log files deleted.")
    except subprocess.CalledProcessError as e:
        print(f"[{hostname}] Failed to delete log files.")
        print(f"STDERR:\n{e.stderr}")


def _run_remote_background(hostname, cmd, description):
    """Launch a backgrounded shell command on the node via ssh and log it."""
    print(f"[{hostname}] Starting {description}: {cmd}")
    subprocess.run(["ssh", hostname, cmd], check=True)


def start_ptp_on_node(node_id, nodes):
    """Start the PTP processes for one node of the loop.

    A boundary clock is realized by two ptp4l processes (slave on eth1,
    master on eth2) plus a phc2sys instance that bridges the PHCs and the
    measurement port eth0.
    """
    hostname = get_hostname(node_id)
    min_node = min(nodes)
    max_node = max(nodes)

    # Disable GRO/LRO so hardware timestamps on eth0 stay accurate.
    ETHTOOL_CMD = "ethtool -K eth0 gro off lro off"
    print(f"[{hostname}] ethtools tuning: {ETHTOOL_CMD}")
    subprocess.run(["ssh", hostname, ETHTOOL_CMD], check=True)

    # TUNING = "-R 4 -N 5 -P 0.3 -I 0.05"
    TUNING = " "

    delete_logs_on_node(hostname)

    # Commands shared by several roles (interfaces/configs are role-fixed).
    cmd_slave = f"{PTP4L_INSTANCE} -i eth1 -m -H -f {PTP_REMOTE_CONFIG_PATH}{SV_FILE} > {SLAVE_LOG} 2>&1 &"
    cmd_master = f"{PTP4L_INSTANCE} -i eth2 -m -H -f {PTP_REMOTE_CONFIG_PATH}{MA_FILE} > {MASTER_LOG} 2>&1 &"

    if node_id == min_node:
        # FIRST NODE: grandmaster on eth2 + slave on eth1 to close the loop.
        config_gm = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_grandmaster.json"))
        config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json"))
        write_ptp_config_to_node(hostname, "grandmaster", config_gm)
        write_ptp_config_to_node(hostname, "slave", config_slave)

        cmd_grandmaster = f"{PTP4L_INSTANCE} -i eth2 -m -H -f {PTP_REMOTE_CONFIG_PATH}{GM_FILE} > {GRANDMASTER_LOG} 2>&1 &"
        _run_remote_background(hostname, cmd_grandmaster, "ptp4l (Grandmaster)")

        # Sync the measurement port (eth0) PHC directly from the GM PHC.
        # NOTE: an alternative mode bridged through CLOCK_REALTIME
        # (eth2 -> CLOCK_REALTIME -> eth0) was used previously here.
        cmd_phc2sys_1 = f"phc2sys -s eth2 -c eth0 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &"
        _run_remote_background(hostname, cmd_phc2sys_1, "phc2sys (Grandmaster)")

        _run_remote_background(hostname, cmd_slave, "ptp4l slave (Grandmaster)")

    elif node_id == max_node:
        # LAST NODE: slave on eth1 + master on eth2 that closes the loop.
        config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json"))
        config_master = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_master.json"))
        write_ptp_config_to_node(hostname, "slave", config_slave)
        write_ptp_config_to_node(hostname, "master", config_master)

        _run_remote_background(hostname, cmd_slave, "ptp4l slave (Last node in chain)")

        # Propagate the incoming PHC (eth1) to eth0 and the outgoing eth2.
        # NOTE: a CLOCK_REALTIME-bridged variant was used previously here.
        cmd_phc2sys_1 = f"phc2sys -s eth1 -c eth0 -c eth2 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &"
        _run_remote_background(hostname, cmd_phc2sys_1, "phc2sys (Last node in chain)")

        _run_remote_background(hostname, cmd_master, "ptp4l master (Last node in chain)")

    else:
        # INTERMEDIATE NODES: ptp4l slave + phc2sys bridge + ptp4l master.
        config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json"))
        config_master = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_master.json"))
        write_ptp_config_to_node(hostname, "slave", config_slave)
        write_ptp_config_to_node(hostname, "master", config_master)

        _run_remote_background(hostname, cmd_slave, "ptp4l slave (Intermediate node)")

        # NOTE: a CLOCK_REALTIME-bridged variant was used previously here.
        cmd_phc2sys_1 = f"phc2sys -s eth1 -c eth0 -c eth2 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &"
        _run_remote_background(hostname, cmd_phc2sys_1, "phc2sys (Intermediate node)")

        _run_remote_background(hostname, cmd_master, "ptp4l master (Intermediate node)")


def is_ptp_running(hostname):
    """Return True if at least one ptp4l process is running on the node."""
    try:
        subprocess.run(
            ["ssh", hostname, "pgrep -x ptp4l > /dev/null"],
            check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        return True
    except subprocess.CalledProcessError:
        return False


def wait_for_all_ptp_processes(nodes, retries=10, delay=2):
    """Poll until ptp4l is up on every node, or give up after *retries*.

    Returns True if all processes were seen running, False on timeout.
    """
    node_hostnames = [get_hostname(n) for n in nodes]
    print("Checking ptp4l processes on all nodes...")
    for attempt in range(1, retries + 1):
        not_running = [h for h in node_hostnames if not is_ptp_running(h)]
        if not not_running:
            print("All ptp4l processes are running.")
            return True
        print(f"Attempt {attempt}/{retries}: Waiting for ptp4l on: {', '.join(not_running)}")
        time.sleep(delay)
    print("Error: Some ptp4l processes did not start in time.")
    return False


def collect_logs(nodes, base_output_dir, duration):
    """Fetch per-node tslogs and ptp4l/phc2sys logs into an experiment dir."""
    experiment_folder_name = generate_output_filename(nodes, duration)
    experiment_dir = os.path.join(base_output_dir, experiment_folder_name)
    os.makedirs(experiment_dir, exist_ok=True)

    # Hoisted out of the loop: role is determined by chain position.
    first_node = min(nodes)
    last_node = max(nodes)

    for node_id in nodes:
        hostname = get_hostname(node_id)
        print(f"[{hostname}] Fetching logs...")

        # Role-based source selection
        if node_id == first_node:
            log_sources = [
                ("ptp4l_grandmaster", GRANDMASTER_LOG),
                ("phc2sys_1", PHC2SYS_LOG_1),
                ("phc2sys_2", PHC2SYS_LOG_2),
                ("ptp4l_slave", SLAVE_LOG),
            ]
        elif node_id == last_node:
            log_sources = [
                ("ptp4l_master", MASTER_LOG),
                ("phc2sys_1", PHC2SYS_LOG_1),
                ("phc2sys_2", PHC2SYS_LOG_2),
                ("ptp4l_slave", SLAVE_LOG),
            ]
        else:
            log_sources = [
                ("ptp4l_slave", SLAVE_LOG),
                ("phc2sys_1", PHC2SYS_LOG_1),
                ("phc2sys_2", PHC2SYS_LOG_2),
                ("ptp4l_master", MASTER_LOG),
            ]

        # Create node subfolder
        node_dir = os.path.join(experiment_dir, hostname)
        os.makedirs(node_dir, exist_ok=True)

        # Fetch the newest timestamp-log folder from the node (best-effort).
        tslog_dir = os.path.join(node_dir, "tslogs")
        os.makedirs(tslog_dir, exist_ok=True)
        remote_cmd = "ls -td /tmp/tslog_* | head -n1"
        result = subprocess.run(["ssh", hostname, remote_cmd],
                                capture_output=True, text=True)
        folder = result.stdout.strip()
        if folder:
            subprocess.run(["scp", f"{hostname}:{folder}/*.csv", tslog_dir])
        else:
            print(f"[WARN] No tslog folder found on {hostname}")

        # Fetch ptp4l/phc2sys logs; sync+sleep lets the node flush them first.
        for log_label, log_path in log_sources:
            try:
                result = subprocess.run(
                    ["ssh", hostname, f'bash -c "sync; sleep 1; cat \\"{log_path}\\""'],
                    check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
                )
                # Strip NUL bytes that occasionally end up in the remote logs.
                cleaned = result.stdout.replace('\x00', '')
                log_file_path = os.path.join(node_dir, f"{log_label}.csv")
                with open(log_file_path, "w") as f:
                    f.write(cleaned)
            except subprocess.CalledProcessError as e:
                print(f"ERROR: Failed to fetch {log_label} log from {hostname}: {e.stderr}")
                error_path = os.path.join(node_dir, f"{log_label}_ERROR.txt")
                with open(error_path, "w") as f:
                    f.write(e.stderr + "\n")


def _stop_remote(hostname, cmd, what, check=True):
    """Best-effort stop: run *cmd* on the node, then sync to flush logs."""
    try:
        subprocess.run(["ssh", hostname, cmd], check=check)
        subprocess.run(["ssh", hostname, "sync"], check=True)
        print(f"[{hostname}] {what} stopped and logs flushed.")
    except subprocess.CalledProcessError:
        print(f"[{hostname}] {what} was not running or could not be stopped.")


def stop_ptp_rbm_on_nodes(nodes):
    """Stop PTP and reference-broadcast processes on all nodes.

    Also disables NTP/timesyncd so nothing fights ptp4l over the clocks.
    """
    print("Stopping ptp4l on all nodes...")
    for node_id in nodes:
        hostname = get_hostname(node_id)
        _stop_remote(hostname, "pkill -f ptp4l", "ptp4l")
        _stop_remote(hostname, "pkill -f phc2sys", "phc2sys")
        _stop_remote(hostname, "sudo timedatectl set-ntp false", "timedatectl")
        _stop_remote(hostname,
                     "sudo systemctl disable --now systemd-timesyncd 2>/dev/null || true",
                     "systemd-timesyncd")
        # pkill is deliberately unchecked here: the receiver may not be running.
        _stop_remote(hostname, "pkill -f receiver_hwts_logger",
                     "receiver_hwts_logger", check=False)


def remove_tslogs(nodes):
    """Delete old /tmp/tslog* folders on every node (best-effort)."""
    print("Removing old tslogs on all nodes...")
    for node_id in nodes:
        hostname = get_hostname(node_id)
        try:
            subprocess.run(["ssh", hostname, "rm -r /tmp/tslog*"])
            subprocess.run(["ssh", hostname, "sync"], check=True)
            print(f"[{hostname}] removed tslogs.")
        except subprocess.CalledProcessError:
            print(f"[{hostname}] could not remove tslogs.")


def start_receivers(nodes):
    """Start the HW-timestamp receiver on every node (pinned, RT priority)."""
    for node_id in nodes:
        hostname = get_hostname(node_id)
        cmd = f"sudo taskset -c 1 chrt -f 90 {RECEIVER_PATH} eth0 > /dev/null 2>&1 &"
        subprocess.run(["ssh", hostname, cmd], check=True)


def wait_for_receivers(nodes, timeout=10):
    """Wait up to *timeout* seconds per node for its receiver; exit on failure."""
    for node_id in nodes:
        hostname = get_hostname(node_id)
        for _ in range(timeout):
            result = subprocess.run([
                "ssh", hostname, "pgrep -f receiver_hwts_logger > /dev/null"
            ])
            if result.returncode == 0:
                break
            time.sleep(1)
        else:
            print(f"ERROR: receiver not running on {hostname}")
            sys.exit(1)


def start_broadcaster():
    """Start the local reference broadcaster in the background; return its PID."""
    cmd = f"sudo taskset -c 1 chrt -f 90 {BROADCASTER_PATH} eth0 1000 > /dev/null 2>&1 & echo $!"
    proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True)
    return int(proc.stdout.strip())


def main():
    if len(sys.argv) < 3:
        usage()
    try:
        duration = int(sys.argv[1])
        nodes = [int(n) for n in sys.argv[2:]]
    except ValueError:
        usage()

    # STOPPING EVERYTHING
    print(f"Starting PTP loop experiment with nodes: {nodes}")
    print(f"Test duration: {duration} seconds")
    print(f"Killing all PTP and Reference Broadcast Measurement processes on nodes...")
    stop_ptp_rbm_on_nodes(nodes)

    # Remove old timestamps
    remove_tslogs(nodes)

    # Start reference broadcast measuring
    start_receivers(nodes)
    wait_for_receivers(nodes)
    broadcaster_pid = start_broadcaster()

    # Start ptp infrastructure
    for node_id in nodes:
        start_ptp_on_node(node_id, nodes)

    if not wait_for_all_ptp_processes(nodes):
        print("Aborting experiment due to startup failure.")
        stop_ptp_rbm_on_nodes(nodes)
        sys.exit(1)

    print(f"Running loop experiment for {duration} seconds...")
    time.sleep(duration)

    # NOTE: output_path is only printed; logs actually go to eth_e2e/logs below.
    log_filename = generate_output_filename(nodes, duration)
    output_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), log_filename)

    print("Stopping PTP processes on all nodes...")
    stop_ptp_rbm_on_nodes(nodes)

    print("Stopping local reference broadcaster")
    subprocess.run(["sudo", "pkill", "broadcaster"])

    print(f"Collecting logs into: {output_path}")
    collect_logs(
        nodes=nodes,
        base_output_dir="eth_e2e/logs",
        duration=duration  # experiment duration in seconds
    )

    print("Experiment complete. Logs saved.")


if __name__ == "__main__":
    main()