#!/usr/bin/env python3
"""Publish APU mesh-testbed node telemetry to MQTT and execute broker commands.

Every PUBLISH_INTERVAL seconds this node publishes three JSON documents:

* ``apu_tb/<host>``           -- uptime, CPU temperature/utilisation, memory
  utilisation, ptp4l/phc2sys process counts, ping RTT stats, grid position
* ``apu_tb/<host>/neighbors`` -- ESTAB mesh peers on mesh0/mesh1 with metrics
* ``apu_tb/<host>/paths``     -- channel/frequency and ``iw mpath dump`` rows

Commands arrive as JSON on COMMAND_TOPIC (``reboot``, ``re-init``,
``hwmp_proactive``, ``hwmp_ping``); commands carrying an ``id`` are
acknowledged on ``apu_tb/ack/<id>``.
"""
import json
import os
import re
import signal
import socket
import subprocess
import threading
import time

# paho-mqtt is imported inside main() so the collection/parsing helpers in
# this module can be imported (e.g. by tests) without the MQTT client library.

# Offsets added to the grid coordinates of hosts apu0..apu24.
XmeshShift = 0
YmeshShift = 0
# This node's published grid position; set from the hostname further below.
XpubPos = -2
YpubPos = -2
# Module-level defaults; build_payload() computes its own local values.
mem_util = -1
cpu_util = -1
cpu_temp = -1
hostnumber = -1
# Last Toffset seen per neighbour MAC, reused when a station dump omits it.
last_toffsets = {}
hwmp_enabled = False
ping_enabled = False
ping_targets = {}
ping_procs = {"mesh0": None, "mesh1": None}
active_ping_targets = {"mesh0": None, "mesh1": None}
# Last executed command id per action, so a redelivered command runs once.
last_command_ids = {}
# Start time of this process; reboot/re-init commands stamped earlier are
# treated as stale (acknowledged but not executed).
boot_time = time.time()
ping_stats = {
    "mesh0": {"min": None, "avg": None, "max": None},
    "mesh1": {"min": None, "avg": None, "max": None},
}
ping_stats_internal = {
    "mesh0": {"sum": 0.0, "count": 0, "min": None, "max": None},
    "mesh1": {"sum": 0.0, "count": 0, "min": None, "max": None},
}
ping_threads = {"mesh0": None, "mesh1": None}
# Guards all ping_* state above. It is touched by the main loop, by the MQTT
# network thread (handle_command) and by the ping reader threads. Reentrant
# because update_ping_processes -> start_ping -> stop_ping nest.
_ping_lock = threading.RLock()

# --- parsers for `iw` and `ping` output -----------------------------------
_STATION_RE = re.compile(r"^Station ([0-9A-Fa-f:]+)")
# (station dict key, line pattern, converter for the captured group)
_STATION_FIELDS = (
    ("plink", re.compile(r"^\s*mesh plink:\s*(\w+)"), str),
    ("signal", re.compile(r"^\s*signal:\s*(-?\d+)"), int),
    ("tx_bitrate", re.compile(r"^\s*tx bitrate:\s*(.+)"), str.strip),
    ("toffset", re.compile(r"^\s*toffset:\s*(.+)", re.IGNORECASE), str.strip),
    ("tx_packets", re.compile(r"^\s*tx packets:\s*(\d+)"), int),
    ("tx_failed", re.compile(r"^\s*tx failed:\s*(\d+)"), int),
)
# Column names of `iw dev <if> mpath dump`, in output order (PATH_CHANGE,
# the 12th column, is optional and handled separately).
_MPATH_COLUMNS = (
    "dest", "next_hop", "iface", "sn", "metric", "qlen",
    "exptime", "dtim", "dret", "flags", "hop_count",
)
_CHANNEL_RE = re.compile(r"channel\s+(\d+)\s+\((\d+)\s+MHz\)")
_PING_RTT_RE = re.compile(r"time=([0-9.]+)\s*ms")
_PING_SUMMARY_RE = re.compile(r"min/avg/max[^=]*=\s*([0-9.]+)/([0-9.]+)/([0-9.]+)")

_SYSTEM_ACTIONS = ("reboot", "re-init")


def kill_duplicate_publishers():
    """SIGKILL every other process whose command line contains ``mqtt_publisher.py``.

    Matches with ``pgrep -f``. This process and its direct parent are
    spared: a ``sudo``/shell wrapper that launched us has a matching command
    line too. SIGKILL is sent directly first. On PermissionError, the
    function falls back to a non-interactive ``sudo -n kill -9``.
    """
    try:
        result = subprocess.check_output(["pgrep", "-f", "mqtt_publisher.py"], encoding="utf-8")
    except (subprocess.CalledProcessError, OSError):
        # pgrep exits 1 when nothing matched; a missing pgrep means we cannot look.
        return
    spared = {os.getpid(), os.getppid()}
    for pid_str in result.split():
        if not pid_str.isdigit():
            continue
        pid = int(pid_str)
        if pid in spared:
            continue
        try:
            os.kill(pid, signal.SIGKILL)
            continue
        except ProcessLookupError:
            continue  # already gone
        except PermissionError:
            pass  # owned by another user: retry through sudo below
        try:
            subprocess.run(["sudo", "-n", "kill", "-9", str(pid)], check=False)
        except (subprocess.SubprocessError, OSError):
            continue


def count_process_instances(process_name):
    """Return how many running processes match ``process_name`` (a pgrep pattern).

    Uses ``pgrep -c``. pgrep exits with status 1 when nothing matched, and
    that means a count of 0, not an error. The function falls back to a
    ``ps aux`` scan only if pgrep cannot run or fails otherwise. The fallback
    counts lines containing ``process_name`` but not "grep", and it matches
    whole command lines, so it is looser than pgrep. Returns 0 if neither
    tool can run.
    """
    try:
        output = subprocess.check_output(["pgrep", "-c", process_name], encoding="utf-8")
        return int(output.strip())
    except subprocess.CalledProcessError as e:
        if e.returncode == 1:
            return 0
    except (OSError, ValueError):
        pass
    try:
        output = subprocess.check_output(["ps", "aux"], encoding="utf-8")
    except (subprocess.CalledProcessError, OSError):
        return 0
    return sum(1 for line in output.splitlines() if process_name in line and "grep" not in line)


def host2pos(hostname):
    """Map an ``apuNN`` hostname to ``(hostnumber, x, y)`` testbed coordinates.

    * apu0..apu24  -> 5x5 grid: x = n % 5 + XmeshShift, y = n // 5 + YmeshShift
    * apu50..apu54 -> column x = 0, y = n - 50
    * apu60..apu84 -> (-1, -1): the host exists but is not in the testbed
    * any other number (apu25..49, apu55..59, apu85+, negative) -> (-2, -2)

    If the hostname minus "apu" is not an integer, the function returns
    ``(-1, -2, -2)`` instead of raising.
    """
    try:
        number = int(hostname.replace("apu", ""))
    except ValueError:
        return -1, -2, -2
    if 0 <= number < 25:
        x, y = number % 5 + XmeshShift, number // 5 + YmeshShift
    elif 50 <= number < 55:
        x, y = 0, number - 50
    elif 60 <= number < 85:
        x, y = -1, -1
    else:
        x, y = -2, -2
    return number, int(x), int(y)


def _run_command_output(args):
    """Run ``args`` (no shell) and return its stdout, or ``""`` if it cannot start.

    A non-zero exit status is not treated as an error. Callers parse whatever
    was printed, so a missing interface simply yields empty output.
    """
    try:
        result = subprocess.run(args, capture_output=True, text=True, check=False)
    except OSError as e:
        print(f"Error executing {' '.join(args)}: {e}")
        return ""
    return result.stdout


def _parse_station_dump(output):
    """Parse ``iw dev <if> station dump`` text into one dict per station, in order.

    Each dict has ``mac`` plus ``plink``, ``signal`` (int dBm), ``tx_bitrate``,
    ``toffset`` (raw text), ``tx_packets`` and ``tx_failed`` (ints). Fields
    missing from a station's section are None. Fields that appear after the
    ``mesh plink`` line are captured as well.
    """
    stations = []
    current = None
    for line in output.splitlines():
        station_match = _STATION_RE.match(line)
        if station_match:
            current = {"mac": station_match.group(1)}
            current.update((key, None) for key, _, _ in _STATION_FIELDS)
            stations.append(current)
            continue
        if current is None:
            continue  # text before the first "Station" header
        for key, pattern, convert in _STATION_FIELDS:
            field_match = pattern.match(line)
            if field_match:
                current[key] = convert(field_match.group(1))
                break
    return stations


def _read_station_dump(interface):
    """Return the parsed station list of ``iw dev <interface> station dump``."""
    return _parse_station_dump(_run_command_output(["iw", "dev", interface, "station", "dump"]))


def get_established_neighbors(interface):
    """Return MACs of stations whose ``mesh plink`` state is ESTAB on ``interface``.

    Returns ``[]`` when ``iw`` cannot run or reports no such stations.
    """
    return [s["mac"] for s in _read_station_dump(interface) if s["plink"] == "ESTAB"]


def get_neighbor_metrics(interface):
    """Return per-MAC metrics of the ESTAB mesh peers on ``interface``.

    Returns:
        A tuple ``(signals, tx_bitrates, toffsets, tx_packets, tx_failed)``
        of dicts keyed by MAC. Values are None where the station dump lacked
        the field. All dicts are empty when ``iw`` cannot run.
    """
    established = [s for s in _read_station_dump(interface) if s["plink"] == "ESTAB"]
    return tuple(
        {s["mac"]: s[key] for s in established}
        for key in ("signal", "tx_bitrate", "toffset", "tx_packets", "tx_failed")
    )


def _parse_channel(output):
    """Return ``(channel, frequency_mhz)`` strings from ``iw dev <if> info`` text.

    Returns ``(None, None)`` if no line contains ``channel N (F MHz)``.
    """
    for line in output.splitlines():
        match = _CHANNEL_RE.search(line)
        if match:
            return match.group(1), match.group(2)
    return None, None


def get_interface_channel(interface):
    """Return ``(channel, frequency_mhz)`` of ``interface`` from ``iw dev info``."""
    return _parse_channel(_run_command_output(["iw", "dev", interface, "info"]))


def _parse_mpath_dump(output):
    """Parse ``iw dev <if> mpath dump`` text into a list of dicts, one per path.

    The first non-blank line is the column header and is skipped. Rows with
    fewer than 11 columns are ignored. ``path_change`` is None when the 12th
    column is absent. All values are kept as strings.
    """
    lines = [line for line in output.splitlines() if line.strip()]
    entries = []
    for line in lines[1:]:
        parts = line.split()
        if len(parts) < len(_MPATH_COLUMNS):
            continue
        entry = dict(zip(_MPATH_COLUMNS, parts))
        entry["path_change"] = parts[11] if len(parts) > 11 else None
        entries.append(entry)
    return entries


def get_mesh_paths(interface):
    """Return mesh path entries of ``interface`` from ``iw dev mpath dump``."""
    return _parse_mpath_dump(_run_command_output(["iw", "dev", interface, "mpath", "dump"]))


# MQTT Settings
MOSQUITTO_IP = "192.168.0.1"
MOSQUITTO_PORT = 1883
# Seconds between publishes; fractional values such as "0.5" are accepted.
PUBLISH_INTERVAL = float(os.environ.get("PUBLISH_INTERVAL", "1"))
COMMAND_TOPIC = "apu_tb/cmd/all"

# Hostname: the same value the `hostname` command prints.
hostname = socket.gethostname()
hostname_clean = re.sub(r"\s+", "", hostname)

# Grid position derived from the hostname.
hostnumber, XpubPos, YpubPos = host2pos(hostname_clean)


def _read_uptime():
    """Return system uptime in whole seconds from /proc/uptime."""
    with open("/proc/uptime", encoding="utf-8") as f:
        return int(float(f.read().split()[0]))


def _read_cpu_temp():
    """Return thermal_zone0 temperature in degrees, truncated (sysfs reports milli-degrees)."""
    with open("/sys/class/thermal/thermal_zone0/temp", encoding="utf-8") as f:
        return int(int(f.read()) / 1000)


def _run_int_script(path):
    """Run ``bash <path>`` and return its (whitespace-stripped) numeric output as int."""
    output = subprocess.check_output(["bash", path], encoding="utf-8")
    return int(float(re.sub(r"\s+", "", output)))


def _safe_metric(label, reader, *args):
    """Return ``reader(*args)``, or -1 (the file's 'unknown' sentinel) if it fails."""
    try:
        return reader(*args)
    except (OSError, ValueError, IndexError, subprocess.CalledProcessError) as e:
        print(f"Failed to read {label}: {e}")
        return -1


def _neighbor_entries(interface, stations):
    """Build the published neighbour dicts for the ESTAB ``stations`` of ``interface``.

    ``packet_loss`` is ``tx_failed / tx_packets`` in percent (1 decimal), or
    None without tx packets. A missing Toffset falls back to the last value
    recorded for that MAC in ``last_toffsets``.
    """
    entries = []
    for station in stations:
        if station["plink"] != "ESTAB":
            continue
        mac = station["mac"]
        tx_packets = station["tx_packets"]
        loss = round((station["tx_failed"] or 0) / tx_packets * 100, 1) if tx_packets else None
        toffset = station["toffset"]
        if toffset is None:
            toffset = last_toffsets.get(mac)
        else:
            last_toffsets[mac] = toffset
        entries.append({
            "interface": interface,
            "mac": mac,
            "signal": station["signal"],
            "tx_bitrate": station["tx_bitrate"],
            "toffset": toffset,
            "packet_loss": loss,
        })
    return entries


def build_payload():
    """Collect telemetry and return ``(apu_json, neighbors_json, paths_json)`` strings.

    System metrics that cannot be read are reported as -1 instead of
    aborting the publish cycle. Each interface's station dump is read once,
    so the neighbour list and its metrics come from the same snapshot.
    """
    uptime = _safe_metric("uptime", _read_uptime)
    cpu_temp = _safe_metric("cpu_temp", _read_cpu_temp)
    mem_util = _safe_metric("mem_util", _run_int_script, "/opt/scripts/mem_util.sh")
    cpu_util = _safe_metric("cpu_util", _run_int_script, "/opt/scripts/cpu_util.sh")

    ptp4l_count = count_process_instances("ptp4l")
    phc2sys_count = count_process_instances("phc2sys")

    neighbors = []
    paths = {}
    for iface in ("mesh0", "mesh1"):
        neighbors.extend(_neighbor_entries(iface, _read_station_dump(iface)))
        channel, freq = get_interface_channel(iface)
        paths[iface] = {"channel": channel, "frequency": freq, "entries": get_mesh_paths(iface)}

    apu_json = json.dumps([
        {
            "uptime": uptime,
            "cpu_temp": cpu_temp,
            "mem_util": mem_util,
            "cpu_util": cpu_util,
            "ptp4l_count": ptp4l_count,
            "phc2sys_count": phc2sys_count,
            "ping_stats": ping_stats,
            "XPosition": XpubPos,
            "YPosition": YpubPos,
        },
        {"id": hostname_clean},
    ])
    neighbors_json = json.dumps({"id": hostname_clean, "neighbors": neighbors})
    paths_json = json.dumps({"id": hostname_clean, "paths": paths})
    return apu_json, neighbors_json, paths_json


def _publish_ack(client, cmd_id, action):
    """Publish a QoS-1 ack on ``apu_tb/ack/<cmd_id>``; no-op without a command id."""
    if not cmd_id:
        return
    ack_payload = json.dumps({
        "id": cmd_id,
        "action": action,
        "node": hostname_clean,
        "ts": int(time.time()),
    })
    try:
        client.publish(f"apu_tb/ack/{cmd_id}", ack_payload, qos=1)
    except Exception as e:  # best effort: a failed ack must not break command handling
        print(f"Failed to publish ack: {e}")


def _handle_system_action(client, payload, action, cmd_id):
    """Execute ``reboot``/``re-init`` if addressed to this node and not already run.

    The command is addressed to this node when ``scope == "all"`` or when
    ``target`` equals this hostname. A repeated ``id`` is acknowledged again
    but not re-executed.
    """
    target_id = payload.get("target")
    addressed = payload.get("scope") == "all" or (bool(target_id) and target_id == hostname_clean)
    if not addressed:
        return
    if cmd_id and last_command_ids.get(action) == cmd_id:
        _publish_ack(client, cmd_id, action)
        return
    if cmd_id:
        last_command_ids[action] = cmd_id
    _publish_ack(client, cmd_id, action)

    if action == "reboot":
        try:
            subprocess.run(["sudo", "reboot"], check=False)
        except (subprocess.SubprocessError, OSError) as e:
            print(f"Failed to reboot: {e}")
        return

    try:
        kill_duplicate_publishers()
        subprocess.run(["sudo", "/opt/start_script.sh"], check=False)
        subprocess.run(["sudo", "systemctl", "restart", "mqtt_cyclic_publish.service"], check=False)
    except (subprocess.SubprocessError, OSError) as e:
        print(f"Failed to re-init: {e}")


def _set_ping_state(enabled, targets):
    """Atomically replace the ping configuration and apply it."""
    global ping_enabled, ping_targets
    with _ping_lock:
        ping_enabled = enabled
        ping_targets = targets
        update_ping_processes()


def _handle_hwmp_proactive(payload):
    """Set HWMP root mode on mesh0/mesh1 (2 = proactive, 0 = off) and RANN interval 1000."""
    global hwmp_enabled
    enabled = bool(payload.get("enabled", False))
    value = "2" if enabled else "0"
    try:
        for iface in ("mesh0", "mesh1"):
            subprocess.run(
                ["iw", "dev", iface, "set", "mesh_param", "mesh_hwmp_rootmode", value],
                check=True,
            )
            subprocess.run(
                ["iw", "dev", iface, "set", "mesh_param", "mesh_hwmp_rann_interval", "1000"],
                check=True,
            )
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Failed to set HWMP proactive: {e}")
        return
    print(f"HWMP proactive set to {value} on mesh0/mesh1")
    hwmp_enabled = enabled
    if not enabled:
        # Pinging is only meaningful while proactive HWMP is on.
        _set_ping_state(False, {})


def _is_valid_ping_target(target):
    """Return True for a non-empty string that ``ping`` cannot mistake for an option."""
    return isinstance(target, str) and bool(target) and not target.startswith("-")


def _handle_hwmp_ping(payload):
    """Enable/disable pinging according to an ``hwmp_ping`` command.

    Only the node named in ``ping.source`` pings. Every other node stops any
    ping it has running. ``ping.targets`` maps an interface name to a target
    host. Malformed or option-like targets are dropped.
    """
    enabled_flag = payload.get("enabled")
    enabled = True if enabled_flag is None else bool(enabled_flag)
    if not enabled:
        _set_ping_state(False, {})
        return

    ping = payload.get("ping") or {}
    if not isinstance(ping, dict):
        ping = {}
    if ping.get("source") != hostname_clean:
        if ping_enabled or ping_targets:
            _set_ping_state(False, {})
        return

    raw_targets = ping.get("targets") or {}
    if not isinstance(raw_targets, dict):
        print(f"Ignoring malformed ping targets: {raw_targets!r}")
        raw_targets = {}
    targets = {iface: t for iface, t in raw_targets.items() if _is_valid_ping_target(t)}
    _set_ping_state(True, targets)


def handle_command(client, userdata, message):
    """paho ``on_message`` callback: dispatch a JSON command from COMMAND_TOPIC.

    Malformed payloads (undecodable or not a JSON object) are logged and
    ignored. A ``reboot``/``re-init`` command whose numeric ``ts`` predates
    this process's start is acknowledged but not executed. This prevents a
    redelivered reboot command from looping the node.
    """
    try:
        payload = json.loads(message.payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        print("Invalid command payload")
        return
    if not isinstance(payload, dict):
        print("Invalid command payload")
        return

    action = payload.get("action")
    cmd_id = payload.get("id")
    if cmd_id:
        cmd_id = str(cmd_id)

    # Tuple membership compares with ==, so an unhashable "action" is harmless.
    if action in _SYSTEM_ACTIONS:
        payload_ts = payload.get("ts")
        if isinstance(payload_ts, (int, float)) and payload_ts < boot_time:
            _publish_ack(client, cmd_id, action)
            return
        _handle_system_action(client, payload, action, cmd_id)
    elif action == "hwmp_proactive":
        _handle_hwmp_proactive(payload)
    elif action == "hwmp_ping":
        _handle_hwmp_ping(payload)


def stop_ping(iface):
    """Stop the ping on ``iface`` (SIGTERM, SIGKILL after 2 s) and clear its target/stats.

    The process is detached from ``ping_procs`` before it is signalled, so
    its reader thread stops updating statistics.
    """
    with _ping_lock:
        proc = ping_procs.get(iface)
        ping_procs[iface] = None
        active_ping_targets[iface] = None
        ping_stats[iface] = {"min": None, "avg": None, "max": None}
        ping_stats_internal[iface] = {"sum": 0.0, "count": 0, "min": None, "max": None}
        if proc is not None and proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap so no zombie is left behind


def _record_ping_sample(iface, value):
    """Fold one RTT sample (ms) into the running min/avg/max of ``iface``. Caller holds _ping_lock."""
    stats = ping_stats_internal[iface]
    stats["sum"] += value
    stats["count"] += 1
    stats["min"] = value if stats["min"] is None else min(stats["min"], value)
    stats["max"] = value if stats["max"] is None else max(stats["max"], value)
    avg = stats["sum"] / stats["count"]
    ping_stats[iface] = {
        "min": round(stats["min"], 1),
        "avg": round(avg, 1),
        "max": round(stats["max"], 1),
    }


def _ping_reader(iface, proc):
    """Consume ``proc``'s ping output until EOF and keep ``ping_stats[iface]`` current.

    Per-reply ``time=... ms`` samples update running min/avg/max. A final
    ``min/avg/max`` summary line overrides them. Reading stops once ``proc``
    is no longer ``ping_procs[iface]`` (stopped or replaced), so its output
    cannot leak into the statistics of a successor.
    """
    try:
        for line in proc.stdout:
            summary = _PING_SUMMARY_RE.search(line)
            sample = None if summary else _PING_RTT_RE.search(line)
            if summary is None and sample is None:
                continue
            try:
                values = [float(v) for v in (summary or sample).groups()]
            except ValueError:
                continue
            with _ping_lock:
                if ping_procs.get(iface) is not proc:
                    break
                if summary is not None:
                    min_v, avg_v, max_v = values
                    ping_stats[iface] = {
                        "min": round(min_v, 1),
                        "avg": round(avg_v, 1),
                        "max": round(max_v, 1),
                    }
                else:
                    _record_ping_sample(iface, values[0])
    except (OSError, ValueError):
        pass  # pipe closed underneath us; nothing left to read
    finally:
        try:
            proc.stdout.close()
        except OSError:
            pass


def start_ping(iface, target):
    """Ensure exactly one ``ping -I iface target`` runs, with a reader thread.

    A falsy or invalid target stops the ping on ``iface``. A ping already
    running to the same target is left alone. Otherwise, any previous process
    (a different target, or one that exited) is stopped first, which also
    resets the statistics.
    """
    with _ping_lock:
        if not _is_valid_ping_target(target):
            stop_ping(iface)
            return
        proc = ping_procs.get(iface)
        if active_ping_targets.get(iface) == target and proc is not None and proc.poll() is None:
            return
        stop_ping(iface)
        try:
            proc = subprocess.Popen(
                ["ping", "-I", iface, target],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
            )
        except OSError as e:
            print(f"Failed to start ping on {iface}: {e}")
            return
        ping_procs[iface] = proc
        active_ping_targets[iface] = target
        ping_threads[iface] = threading.Thread(
            target=_ping_reader, args=(iface, proc), daemon=True
        )
        ping_threads[iface].start()


def update_ping_processes():
    """Reconcile running ping processes with ``ping_enabled``/``ping_targets``."""
    with _ping_lock:
        if not ping_enabled:
            stop_ping("mesh0")
            stop_ping("mesh1")
            return
        targets = ping_targets if isinstance(ping_targets, dict) else {}
        start_ping("mesh0", targets.get("mesh0"))
        start_ping("mesh1", targets.get("mesh1"))


def main():
    """Connect to the broker in the background and publish telemetry every PUBLISH_INTERVAL s."""
    import paho.mqtt.client as mqtt

    def on_connect(client, userdata, flags, *rest):
        # Subscribe on every (re)connect. Subscriptions of a clean session
        # do not survive a reconnect, so subscribing once would silently
        # stop command delivery after the first network blip.
        client.subscribe(COMMAND_TOPIC)

    def on_sigterm(signum, frame):
        # Turn SIGTERM (e.g. from systemctl) into SystemExit so the finally
        # block below stops ping children and disconnects cleanly.
        raise SystemExit(0)

    signal.signal(signal.SIGTERM, on_sigterm)

    # paho-mqtt 2.x takes a callback API version as first argument; 1.x has
    # no such argument. on_message has the same signature in both.
    api_version = getattr(mqtt, "CallbackAPIVersion", None)
    client = mqtt.Client(api_version.VERSION2) if api_version is not None else mqtt.Client()
    client.on_connect = on_connect
    client.on_message = handle_command
    # connect_async + loop_start keeps retrying in the background instead of
    # raising when the broker is not reachable yet (e.g. during boot).
    client.connect_async(MOSQUITTO_IP, MOSQUITTO_PORT, 60)
    client.loop_start()
    try:
        while True:
            try:
                update_ping_processes()
                apu_json, neighbors_json, paths_json = build_payload()
                client.publish("apu_tb/" + hostname_clean, apu_json)
                client.publish("apu_tb/" + hostname_clean + "/neighbors", neighbors_json)
                client.publish("apu_tb/" + hostname_clean + "/paths", paths_json)
            except Exception as e:  # top-level boundary: log and keep publishing
                print(f"Publish cycle failed: {e}")
            time.sleep(PUBLISH_INTERVAL)
    except KeyboardInterrupt:
        pass
    finally:
        with _ping_lock:
            stop_ping("mesh0")
            stop_ping("mesh1")
        client.disconnect()
        client.loop_stop()


if __name__ == "__main__":
    main()