#!/usr/bin/env python3 import os import re import sys import time import datetime import subprocess import json # PTP_CONF = "/opt/ptp_conf/ptp4l_eth_line_top.conf" PTP_REMOTE_CONFIG_PATH = "/opt/ptp_conf/" GM_FILE = "ptp4l_grandmaster.conf" SV_FILE = "ptp4l_slave.conf" MA_FILE = "ptp4l_master.conf" BC_FILE = "ptp4l_boundary.conf" PTP4L_INSTANCE = "ptp4l" # PTP4L_INSTANCE = "/home/apu/wifi-ptp/ptp/ptp4l" GRANDMASTER_LOG = "/tmp/ptp4l_grandmaster.log" SLAVE_LOG = "/tmp/ptp4l_slave.log" MASTER_LOG = "/tmp/ptp4l_master.log" BOUNDARY_LOG = "/tmp/ptp4l_boundary.log" PHC2SYS_LOG_1 = "/tmp/phc2sys_1.log" PHC2SYS_LOG_2 = "/tmp/phc2sys_2.log" PHC2SYS_LOG_3 = "/tmp/phc2sys_3.log" RECEIVER_PATH = "/opt/timestamping/receiver_hwts_logger" BROADCASTER_PATH = "/home/apu/testbed_files/experiments/reference_broadcast/broadcaster" PTP_CONFIG_DIR = "ptp_config/eth" EXPERIMENT_NAME = "ptp_loop_refbc" def load_ptp_config(filepath): with open(filepath, "r") as f: data = json.load(f) lines = [] for section, options in data.items(): lines.append(f"[{section}]") for key, value in options.items(): lines.append(f"{key} {value}") return "\n".join(lines) def usage(): print("Usage: run_ptp_experiment.py ") print("Example: run_ptp_experiment.py 60 0 1 2 3 4") sys.exit(1) def get_hostname(node_id): return f"apu{str(node_id).zfill(2)}" def write_ptp_config_to_node(hostname, role, conf_str): if role == "grandmaster": file_name = GM_FILE elif role == "slave": file_name = SV_FILE elif role == "master": file_name = MA_FILE elif role == "boundary": file_name = BC_FILE else: raise ValueError("Invalid role. Must be 'grandmaster' or 'slave'.") try: subprocess.run( ["ssh", hostname, f"echo '{conf_str}' | sudo tee '{PTP_REMOTE_CONFIG_PATH}/{file_name}' > /dev/null"], shell=False, check=True ) print(f"[{hostname}] Wrote ptp4l config.") except subprocess.CalledProcessError as e: print(f"ERROR: Failed to write ptp4l.conf to {hostname}: {e}") def generate_output_filename(nodes, duration): timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") node_list = "-".join(str(n) for n in nodes) return f"{timestamp}_loop_ptp4l_{duration}s_nodes({node_list}).log" def delete_logs_on_node(hostname): log_files = [GRANDMASTER_LOG, SLAVE_LOG, MASTER_LOG, PHC2SYS_LOG_1, PHC2SYS_LOG_2, BOUNDARY_LOG] delete_cmd = "sudo rm -f " + " ".join(log_files) print(f"[{hostname}] Running: {delete_cmd}") try: result = subprocess.run( ["ssh", hostname, delete_cmd], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) print(f"[{hostname}] Log files deleted.") except subprocess.CalledProcessError as e: print(f"[{hostname}] Failed to delete log files.") print(f"STDERR:\n{e.stderr}") def start_ptp_on_node(node_id, nodes): # New version: generate boundary clock by two ptp4l processes # ---- configurable knobs for the GM84 special case ---- GM84_ID = 84 GM84_HOST = get_hostname(GM84_ID) # override with IP if DNS for apu84 is missing, e.g. "192.168.0.110" hostname = get_hostname(node_id) # min_node = min(nodes) # max_node = max(nodes) min_node = nodes[0] max_node = nodes[-1] single_node = (len(nodes) == 1) ETHTOOL_CMD = "ethtool -K eth0 gro off lro off" print(f"[{hostname}] ethtools tuning: {ETHTOOL_CMD}") subprocess.run(["ssh", hostname, ETHTOOL_CMD], check=True) # TUNING = "-R 4 -N 5 -P 0.3 -I 0.05" TUNING = " " delete_logs_on_node(hostname) # ========================= # SPECIAL CASE: apu84 (GM) # ========================= if node_id == GM84_ID: # Use fixed SSH target for apu84 gm_host = GM84_HOST # Push configurations config_gm = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_grandmaster.json")) config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json")) write_ptp_config_to_node(gm_host, "grandmaster", config_gm) write_ptp_config_to_node(gm_host, "slave", config_slave) # ptp4l: GM on eth2 cmd_gm = f"{PTP4L_INSTANCE} -i eth2 -m -H -f {PTP_REMOTE_CONFIG_PATH}{GM_FILE} > {GRANDMASTER_LOG} 2>&1 &" print(f"[apu84@{gm_host}] ptp4l GM (eth2): {cmd_gm}") subprocess.run(["ssh", gm_host, cmd_gm], check=True) # ptp4l: slave on eth1 cmd_sv = f"{PTP4L_INSTANCE} -i eth1 -m -H -f {PTP_REMOTE_CONFIG_PATH}{SV_FILE} > {SLAVE_LOG} 2>&1 &" print(f"[apu84@{gm_host}] ptp4l SLAVE (eth1): {cmd_sv}") subprocess.run(["ssh", gm_host, cmd_sv], check=True) # phc2sys mirrors (measurement PHCs) # outgoing path clock -> meas A cmd_pxc_a = f"phc2sys -s eth2 -c eth0 -O 0 -m {TUNING} -w --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" print(f"[apu84@{gm_host}] phc2sys (eth2 -> eth0): {cmd_pxc_a}") subprocess.run(["ssh", gm_host, cmd_pxc_a], check=True) # incoming path clock -> meas B cmd_pxc_b = f"phc2sys -s eth1 -c eth3 -O 0 -m {TUNING} -w --step_threshold=1 > {PHC2SYS_LOG_2} 2>&1 &" print(f"[apu84@{gm_host}] phc2sys (eth1 -> eth3): {cmd_pxc_b}") subprocess.run(["ssh", gm_host, cmd_pxc_b], check=True) # NOTE: receivers are started elsewhere by start_receivers(); nothing to do here. return # --- SINGLE NODE: eth2 directly connected to eth1 --- if single_node: # Grandmaster on eth2, Slave on eth1, phc2sys: eth2 -> eth0 config_gm = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_grandmaster.json")) config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json")) write_ptp_config_to_node(hostname, "grandmaster", config_gm) write_ptp_config_to_node(hostname, "slave", config_slave) cmd_grandmaster = f"{PTP4L_INSTANCE} -i eth2 -m -H -f {PTP_REMOTE_CONFIG_PATH}{GM_FILE} > {GRANDMASTER_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l (Grandmaster, single-node): {cmd_grandmaster}") subprocess.run(["ssh", hostname, cmd_grandmaster], check=True) # keep your 'no CLOCK_REALTIME' approach: sync GM port clock into eth0 cmd_phc2sys_1 = f"phc2sys -s eth2 -c eth0 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" print(f"[{hostname}] Starting phc2sys (eth2 -> eth0, single-node): {cmd_phc2sys_1}") subprocess.run(["ssh", hostname, cmd_phc2sys_1], check=True) cmd_slave = f"{PTP4L_INSTANCE} -i eth1 -m -H -f {PTP_REMOTE_CONFIG_PATH}{SV_FILE} > {SLAVE_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l (Slave on eth1, single-node): {cmd_slave}") subprocess.run(["ssh", hostname, cmd_slave], check=True) return # done for single-node # FIRST NODE if node_id == min_node: # Grandmaster node config_gm = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_grandmaster.json")) # slave to close loop config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json")) # Push configs to node write_ptp_config_to_node(hostname, "grandmaster", config_gm) write_ptp_config_to_node(hostname, "slave", config_slave) # Start Grandmaster PTP4l cmd_grandmaster = f"{PTP4L_INSTANCE} -i eth2 -m -H -f {PTP_REMOTE_CONFIG_PATH}{GM_FILE} > {GRANDMASTER_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l (Grandmaster): {cmd_grandmaster}") subprocess.run(["ssh", hostname, cmd_grandmaster], check=True) # -------------- NO REALTIME_CLOCK ----------------------- # cmd_phc2sys_1 = f"phc2sys -s eth2 -c eth0 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" print(f"[{hostname}] Starting ptp4l (Grandmaster): {cmd_phc2sys_1}") subprocess.run(["ssh", hostname, cmd_phc2sys_1], check=True) # -------------------------------------------------------- # # # -------------- NO REALTIME_CLOCK ----------------------- # # cmd_phc2sys_2 = f"phc2sys -s eth1 -c CLOCK_REALTIME -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" # print(f"[{hostname}] Starting ptp4l (Grandmaster): {cmd_phc2sys_2}") # subprocess.run(["ssh", hostname, cmd_phc2sys_2], check=True) # # -------------------------------------------------------- # # # Start phc2sys from outgoing interface to clock_realtime # # and wait for sync (jump when above 1 second) # cmd_phc2sys_1 = f"phc2sys -s eth2 -c CLOCK_REALTIME -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" # print(f"[{hostname}] Starting ptp4l (Grandmaster): {cmd_phc2sys_1}") # subprocess.run(["ssh", hostname, cmd_phc2sys_1], check=True) # # Start phc2sys from clock_realtime to test port (eth0) # # and wait for sync (jump when above 1 second) # cmd_phc2sys_2 = f"phc2sys -s CLOCK_REALTIME -c eth0 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_2} 2>&1 &" # print(f"[{hostname}] Starting ptp4l (Grandmaster): {cmd_phc2sys_2}") # subprocess.run(["ssh", hostname, cmd_phc2sys_2], check=True) # Build full command chain cmd_slave = f"{PTP4L_INSTANCE} -i eth1 -m -H -f {PTP_REMOTE_CONFIG_PATH}{SV_FILE} > {SLAVE_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l slave (Grandmaster): {cmd_slave}") subprocess.run(["ssh", hostname, cmd_slave], check=True) # LAST NODE elif node_id == max_node: # Last node (slave only) config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json")) # Master to close loop config_master = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_master.json")) write_ptp_config_to_node(hostname, "slave", config_slave) write_ptp_config_to_node(hostname, "master", config_master) # Start incoming PTP4l (eht1) cmd_slave = f"{PTP4L_INSTANCE} -i eth1 -m -H -f {PTP_REMOTE_CONFIG_PATH}{SV_FILE} > {SLAVE_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l slave (Last node in chain): {cmd_slave}") subprocess.run(["ssh", hostname, cmd_slave], check=True) # -------------- NO REALTIME_CLOCK ----------------------- # cmd_phc2sys_1 = f"phc2sys -s eth1 -c eth0 -c eth2 -O 0 -m {TUNING} -w --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" print(f"[{hostname}] Starting ptp4l (Last node in chain): {cmd_phc2sys_1}") subprocess.run(["ssh", hostname, cmd_phc2sys_1], check=True) # -------------------------------------------------------- # # # Start phc2sys from incoming interface (eth1) to clock_realtime # # and wait for sync (jump when above 1 second) # cmd_phc2sys_1 = f"phc2sys -s eth1 -c CLOCK_REALTIME -O 0 -m -w {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" # print(f"[{hostname}] Starting phc2sys1 (Last node in chain): {cmd_phc2sys_1}") # subprocess.run(["ssh", hostname, cmd_phc2sys_1], check=True) # # Start phc2sys from clock_realtime to interfaces eth0 and eth2 # # (jump when above 1 second) # cmd_phc2sys_2 = f"phc2sys -s CLOCK_REALTIME -c eth0 -c eth2 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_2} 2>&1 &" # print(f"[{hostname}] Starting phc2sys2 (Last node in chain): {cmd_phc2sys_2}") # subprocess.run(["ssh", hostname, cmd_phc2sys_2], check=True) # Master to close loop cmd_master = f"{PTP4L_INSTANCE} -i eth2 -m -H -f {PTP_REMOTE_CONFIG_PATH}{MA_FILE} > {MASTER_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l master (Last node in chain): {cmd_master}") subprocess.run(["ssh", hostname, cmd_master], check=True) # INTERMEDIATE NODES else: # Intermediate nodes: ptp4l (slave) + phc2sys + ptp4l (master) config_slave = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_slave.json")) config_master = load_ptp_config(os.path.join(PTP_CONFIG_DIR, "ptp4l_bc_master.json")) # Push configs to node write_ptp_config_to_node(hostname, "slave", config_slave) write_ptp_config_to_node(hostname, "master",config_master) # Build full command chain cmd_slave = f"{PTP4L_INSTANCE} -i eth1 -m -H -f {PTP_REMOTE_CONFIG_PATH}{SV_FILE} > {SLAVE_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l slave (Intermediate node): {cmd_slave}") subprocess.run(["ssh", hostname, cmd_slave], check=True) # -------------- NO REALTIME_CLOCK ----------------------- # cmd_phc2sys_1 = f"phc2sys -s eth1 -c eth0 -c eth2 -O 0 -m {TUNING} -w --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" print(f"[{hostname}] Starting ptp4l (Intermediate node): {cmd_phc2sys_1}") subprocess.run(["ssh", hostname, cmd_phc2sys_1], check=True) # -------------------------------------------------------- # # # Start phc2sys from incoming interface (eth1) to clock_realtime # # and wait for sync (jump when above 1 second) # cmd_phc2sys_1 = f"phc2sys -s eth1 -c CLOCK_REALTIME -O 0 -m -w {TUNING} --step_threshold=1 > {PHC2SYS_LOG_1} 2>&1 &" # print(f"[{hostname}] Starting phc2sys1 (Last node in chain): {cmd_phc2sys_1}") # subprocess.run(["ssh", hostname, cmd_phc2sys_1], check=True) # # Start phc2sys from clock_realtime to interfaces eth0 and eth2 # # (jump when above 1 second) # cmd_phc2sys_2 = f"phc2sys -s CLOCK_REALTIME -c eth0 -c eth2 -O 0 -m {TUNING} --step_threshold=1 > {PHC2SYS_LOG_2} 2>&1 &" # print(f"[{hostname}] Starting phc2sys2 (Last node in chain): {cmd_phc2sys_2}") # subprocess.run(["ssh", hostname, cmd_phc2sys_2], check=True) # Master for next node cmd_master = f"{PTP4L_INSTANCE} -i eth2 -m -H -f {PTP_REMOTE_CONFIG_PATH}{MA_FILE} > {MASTER_LOG} 2>&1 &" print(f"[{hostname}] Starting ptp4l master (Intermediate node): {cmd_master}") subprocess.run(["ssh", hostname, cmd_master], check=True) def is_ptp_running(hostname): try: subprocess.run( ["ssh", hostname, "pgrep -x ptp4l > /dev/null"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) return True except subprocess.CalledProcessError: return False def wait_for_all_ptp_processes(nodes, retries=10, delay=2): node_hostnames = [get_hostname(n) for n in nodes] print("Checking ptp4l processes on all nodes...") for attempt in range(1, retries + 1): not_running = [] for hostname in node_hostnames: if not is_ptp_running(hostname): not_running.append(hostname) if not not_running: print("All ptp4l processes are running.") return True else: print(f"Attempt {attempt}/{retries}: Waiting for ptp4l on: {', '.join(not_running)}") time.sleep(delay) print("Error: Some ptp4l processes did not start in time.") return False # def collect_logs(nodes, base_output_dir, duration): # # Use your function to create the folder name # experiment_folder_name = generate_output_filename(nodes, duration) # experiment_dir = os.path.join(base_output_dir, experiment_folder_name) # os.makedirs(experiment_dir, exist_ok=True) # for node_id in nodes: # hostname = get_hostname(node_id) # print(f"[{hostname}] Fetching logs...") # # Role-based source selection # if node_id == min(nodes): # log_sources = [ # ("ptp4l_grandmaster", GRANDMASTER_LOG), # ("phc2sys_1", PHC2SYS_LOG_1), # ("phc2sys_2", PHC2SYS_LOG_2), # ("ptp4l_slave", SLAVE_LOG) # ] # elif node_id == max(nodes): # log_sources = [ # ("ptp4l_master", MASTER_LOG), # ("phc2sys_1", PHC2SYS_LOG_1), # ("phc2sys_2", PHC2SYS_LOG_2), # ("ptp4l_slave", SLAVE_LOG) # ] # else: # log_sources = [ # ("ptp4l_slave", SLAVE_LOG), # ("phc2sys_1", PHC2SYS_LOG_1), # ("phc2sys_2", PHC2SYS_LOG_2), # ("ptp4l_master", MASTER_LOG) # ] # # Create node subfolder # node_dir = os.path.join(experiment_dir, hostname) # os.makedirs(node_dir, exist_ok=True) # # Fetch timestamp logs # tslog_dir = os.path.join(node_dir, "tslogs") # os.makedirs(tslog_dir, exist_ok=True) # remote_cmd = "ls -td /tmp/tslog_* | head -n1" # result = subprocess.run(["ssh", hostname, remote_cmd], capture_output=True, text=True) # folder = result.stdout.strip() # if folder: # subprocess.run(["scp", f"{hostname}:{folder}/*.csv", tslog_dir]) # else: # print(f"[WARN] No tslog folder found on {hostname}") # # Fetch ptp4l logs # for log_label, log_path in log_sources: # try: # result = subprocess.run( # ["ssh", hostname, f'bash -c "sync; sleep 1; cat \\"{log_path}\\""'], # check=True, # stdout=subprocess.PIPE, # stderr=subprocess.PIPE, # text=True # ) # cleaned = result.stdout.replace('\x00', '') # # Write CSV # log_file_path = os.path.join(node_dir, f"{log_label}.csv") # with open(log_file_path, "w") as f: # f.write(cleaned) # except subprocess.CalledProcessError as e: # print(f"ERROR: Failed to fetch {log_label} log from {hostname}: {e.stderr}") # error_path = os.path.join(node_dir, f"{log_label}_ERROR.txt") # with open(error_path, "w") as f: # f.write(e.stderr + "\n") def collect_logs(nodes, base_output_dir, duration): """ Collect receiver tslogs and ptp/phc2sys logs from all nodes. - Experiment directory name is generated by generate_output_filename(nodes, duration). - For apu84 (node_id==84), SSH/scp uses fixed IP 192.168.0.110. - tslogs: copy *.csv from the two newest /tmp/tslog_* folders on the node, merging into a single local tslogs/ dir. Newer folder wins on filename clashes. """ import os import subprocess # Prepare experiment directory experiment_folder_name = generate_output_filename(nodes, duration) experiment_dir = os.path.join(base_output_dir, experiment_folder_name) os.makedirs(experiment_dir, exist_ok=True) first_id, last_id = nodes[0], nodes[-1] for node_id in nodes: host = get_hostname(node_id) node_label = get_hostname(node_id) # folder naming; safe on controller print(f"[{host}] Fetching logs...") # Role-based source selection (unchanged) if node_id == first_id: log_sources = [ ("ptp4l_grandmaster", GRANDMASTER_LOG), ("phc2sys_1", PHC2SYS_LOG_1), ("phc2sys_2", PHC2SYS_LOG_2), ("ptp4l_slave", SLAVE_LOG), ] elif node_id == last_id: log_sources = [ ("ptp4l_master", MASTER_LOG), ("phc2sys_1", PHC2SYS_LOG_1), # ("phc2sys_2", PHC2SYS_LOG_2), ("ptp4l_slave", SLAVE_LOG), ] else: log_sources = [ ("ptp4l_slave", SLAVE_LOG), ("phc2sys_1", PHC2SYS_LOG_1), # ("phc2sys_2", PHC2SYS_LOG_2), ("ptp4l_master", MASTER_LOG), ] # Create node subfolder node_dir = os.path.join(experiment_dir, node_label) os.makedirs(node_dir, exist_ok=True) # === Fetch tslogs (two newest folders, merge unique filenames) === tslog_dir = os.path.join(node_dir, "tslogs") os.makedirs(tslog_dir, exist_ok=True) # Grab paths of the two newest /tmp/tslog_* folders (if any) res = subprocess.run( ["ssh", host, r'sh -lc "ls -td /tmp/tslog_* 2>/dev/null | head -n2"'], capture_output=True, text=True ) folders = [line.strip() for line in res.stdout.splitlines() if line.strip()] copied = set() # basenames already copied (prefer newer folder first) # Iterate folders in listed order: ls -t prints newest first for fld in folders: # List CSV files in that folder lsres = subprocess.run( ["ssh", host, f'sh -lc \'ls -1 "{fld}"/*.csv 2>/dev/null\''], capture_output=True, text=True ) files = [f.strip() for f in lsres.stdout.splitlines() if f.strip()] if not files: continue # Copy only files not already copied (so newer folder wins) for remote_file in files: base = os.path.basename(remote_file) if base in copied: continue subprocess.run( ["scp", "-q", f"{host}:{remote_file}", os.path.join(tslog_dir, base)], check=False ) copied.add(base) if not copied: print(f"[WARN] No CSVs found in the two newest tslog folders on {host}") # === Fetch ptp4l/phc2sys logs === for log_label, log_path in log_sources: try: result = subprocess.run( ["ssh", host, f'bash -lc "sync; sleep 1; [ -f \\"{log_path}\\" ] && cat \\"{log_path}\\""'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) cleaned = result.stdout.replace("\x00", "") with open(os.path.join(node_dir, f"{log_label}.csv"), "w") as f: f.write(cleaned) except subprocess.CalledProcessError as e: print(f"ERROR: Failed to fetch {log_label} from {host}: {e.stderr.strip()}") with open(os.path.join(node_dir, f"{log_label}_ERROR.txt"), "w") as f: f.write((e.stderr or "") + "\n") def stop_ptp_rbm_on_nodes(nodes): print("Stopping ptp4l on all nodes...") for node_id in nodes: hostname = get_hostname(node_id) try: subprocess.run(["ssh", hostname, "pkill -f ptp4l"], check=True) subprocess.run(["ssh", hostname, "sync"], check=True) print(f"[{hostname}] ptp4l stopped and logs flushed.") except subprocess.CalledProcessError: print(f"[{hostname}] ptp4l was not running or could not be stopped.") try: subprocess.run(["ssh", hostname, "pkill -f phc2sys"], check=True) subprocess.run(["ssh", hostname, "sync"], check=True) print(f"[{hostname}] phc2sys stopped and logs flushed.") except subprocess.CalledProcessError: print(f"[{hostname}] phc2sys was not running or could not be stopped.") try: subprocess.run(["ssh", hostname, "sudo timedatectl set-ntp false"], check=True) subprocess.run(["ssh", hostname, "sync"], check=True) print(f"[{hostname}] timedatectl stopped and logs flushed.") except subprocess.CalledProcessError: print(f"[{hostname}] timedatectl was not running or could not be stopped.") try: subprocess.run(["ssh", hostname, "sudo systemctl disable --now systemd-timesyncd 2>/dev/null || true"], check=True) subprocess.run(["ssh", hostname, "sync"], check=True) print(f"[{hostname}] systemd-timesyncd stopped and logs flushed.") except subprocess.CalledProcessError: print(f"[{hostname}] systemd-timesyncd was not running or could not be stopped.") try: subprocess.run(["ssh", hostname, "pkill -f receiver_hwts_logger"]) subprocess.run(["ssh", hostname, "sync"], check=True) print(f"[{hostname}] receiver_hwts_logger stopped and logs flushed.") except subprocess.CalledProcessError: print(f"[{hostname}] receiver_hwts_logger was not running or could not be stopped.") def remove_tslogs(nodes): print("Removing old tslogs on all nodes...") for node_id in nodes: hostname = get_hostname(node_id) try: subprocess.run(["ssh", hostname, "rm -r /tmp/tslog*"]) subprocess.run(["ssh", hostname, "sync"], check=True) print(f"[{hostname}] remmoved tslogs.") except subprocess.CalledProcessError: print(f"[{hostname}] could not remmove tslogs.") def start_receivers(nodes): for node_id in nodes: hostname = get_hostname(node_id) if node_id == 84: ifaces = ("eth0", "eth3") else: ifaces = ("eth0",) for iface in ifaces: cmd = f"sudo taskset -c 1 chrt -f 90 {RECEIVER_PATH} {iface} > /dev/null 2>&1 &" print(f"[{hostname}] Starting receiver on {iface}") subprocess.run(["ssh", hostname, cmd], check=True) def wait_for_receivers(nodes, timeout=10): for node_id in nodes: hostname = get_hostname(node_id) for _ in range(timeout): result = subprocess.run([ "ssh", hostname, "pgrep -f receiver_hwts_logger > /dev/null" ]) if result.returncode == 0: break time.sleep(1) else: print(f"ERROR: receiver not running on {hostname}") sys.exit(1) def wait_for_receivers(nodes, timeout=10): """ Wait until receivers are up. - Expect 2 processes on apu84 (eth0 + eth3) - Expect 1 on all other nodes (eth0) """ for node_id in nodes: hostname = get_hostname(node_id) if node_id == 84: expected = 2 else: expected = 1 ok = False for _ in range(timeout): # count running receiver processes on the remote host # (pgrep -fa lists full cmdlines; wc -l counts lines) result = subprocess.run( ["ssh", hostname, "pgrep -fa receiver_hwts_logger | wc -l"], capture_output=True, text=True ) try: count = int(result.stdout.strip()) except ValueError: count = 0 if count >= expected: ok = True break time.sleep(1) if not ok: # fetch details to help debug detail = subprocess.run( ["ssh", hostname, "pgrep -fa receiver_hwts_logger || true"], capture_output=True, text=True ).stdout.strip() print(f"ERROR: receiver(s) not running on {hostname} " f"(expected ≥{expected}, found {count}).\n{detail}") sys.exit(1) def start_broadcaster(): cmd = f"sudo taskset -c 1 chrt -f 90 {BROADCASTER_PATH} eth0 1000 > /dev/null 2>&1 & echo $!" proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True) return int(proc.stdout.strip()) def main(): if len(sys.argv) < 3: usage() try: duration = int(sys.argv[1]) nodes = [int(n) for n in sys.argv[2:]] except ValueError: usage() # STOPPING EVERYTHING print(f"Starting PTP loop experiment with nodes: {nodes}") print(f"Test duration: {duration} seconds") print(f"Killing all PTP and Reference Broadcast Measurement processes on nodes...") stop_ptp_rbm_on_nodes(nodes) # Remove old timestamps remove_tslogs(nodes) # Start reference broadcast measuring start_receivers(nodes) wait_for_receivers(nodes) broadcaster_pid = start_broadcaster() # Start ptp infrastructure for index, node_id in enumerate(nodes): start_ptp_on_node(node_id, nodes) if not wait_for_all_ptp_processes(nodes): print("Aborting experiment due to startup failure.") stop_ptp_rbm_on_nodes(nodes) sys.exit(1) print(f"Running loop experiment for {duration} seconds...") for remaining in range(duration, 0, -1): sys.stdout.write(f"\rTime left: {remaining:4d} s") sys.stdout.flush() time.sleep(1) print("\rTime left: 0 s\n") log_filename = generate_output_filename(nodes, duration) output_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), log_filename) print("Stopping PTP processes on all nodes...") stop_ptp_rbm_on_nodes(nodes) print("Stopping local reference broadcaster") subprocess.run(["sudo", "pkill", "broadcaster"]) print(f"Collecting logs into: {output_path}") # collect_logs(nodes, output_path) collect_logs( nodes=nodes, base_output_dir="logs", duration=duration # experiment duration in seconds ) print("Experiment complete. Logs saved.") if __name__ == "__main__": main()