#!/usr/bin/env python3
"""Watch a folder tree and report newly created or modified files.

Two modes are supported:

* Plain monitoring (default): every non-hidden file that is created or
  modified below the observed folder is reported until the user interrupts
  the script.
* Test mode (``--test``): a range of expected subfolders (``apu00`` ..
  ``apuNN``) is generated, and the script waits until each of them has
  received at least one file created or modified after the script started.
"""
import argparse
import os
import time
import threading
from typing import Iterable

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Prefix shared by all generated target subfolder names (e.g. "apu07").
SUBFOLDER_PREFIX = "apu"


def _strip_prefix(name: str) -> str:
    """Return *name* without the leading ``SUBFOLDER_PREFIX``, if present."""
    if name.startswith(SUBFOLDER_PREFIX):
        return name[len(SUBFOLDER_PREFIX):]
    return name


class FileEventHandler(FileSystemEventHandler):
    """Track which target subfolders have received a fresh file.

    Args:
        target_subfolders: Names of the directories (direct parent of the
            file) that are expected to receive a new or modified file. When
            empty, the handler does not track anything and simply reports
            every qualifying file event.
        script_start_time: Epoch timestamp; files whose last change is not
            newer than this are ignored.
    """

    def __init__(self, target_subfolders: Iterable[str], script_start_time: float):
        super().__init__()
        self.target_subfolders = set(target_subfolders)
        # Count the de-duplicated set, otherwise duplicated names in the
        # input would keep the counter above zero forever.
        self.missing_subfolders_counter = len(self.target_subfolders)
        # No targets at all means "report everything" rather than
        # "everything has already been found".
        self.watch_all = not self.target_subfolders
        self.script_start_time = script_start_time
        self.stop_event = threading.Event()

    def on_created(self, event):
        """Forward creation events to the common handler."""
        self.handle_event(event)

    def on_modified(self, event):
        """Forward modification events to the common handler."""
        self.handle_event(event)

    def handle_event(self, event):
        """Report a file event and update the set of missing subfolders.

        Directory events, hidden files, files that no longer exist and files
        whose last change predates ``script_start_time`` are ignored.
        """
        if event.is_directory or self.is_hidden(event.src_path):
            return

        try:
            file_stat = os.stat(event.src_path)
        except OSError:
            # The file vanished (or is a dangling link) between the event
            # being emitted and us looking at it; nothing to report.
            return

        # st_ctime is the creation time on Windows but the metadata-change
        # time on Unix; taking the max with st_mtime covers both "created"
        # and "modified" on every platform.
        last_change = max(file_stat.st_ctime, file_stat.st_mtime)
        if last_change <= self.script_start_time:
            return

        subfolder = os.path.basename(os.path.dirname(event.src_path))

        if self.watch_all:
            print(f"File created or modified in '{subfolder}': {event.src_path}")
            return

        if subfolder not in self.target_subfolders:
            return

        self.target_subfolders.remove(subfolder)
        self.missing_subfolders_counter -= 1
        print(f"File created or modified in '{subfolder}': {event.src_path}")
        print(f"Remaining missing subfolders: {self.missing_subfolders_counter}")
        if self.missing_subfolders_counter > 0:
            remaining = [_strip_prefix(name) for name in sorted(self.target_subfolders)]
            print("Missing subfolders:", ", ".join(remaining))

    def is_hidden(self, path: str) -> bool:
        """Return True when the final component of *path* starts with a dot."""
        return os.path.basename(path).startswith('.')

    def stop(self):
        """Ask the observation loop to terminate."""
        self.stop_event.set()


def generate_subfolder_names(start: int, end: int):
    """Return the names ``apu<start>`` .. ``apu<end>`` (inclusive, zero-padded to 2 digits)."""
    return [f"{SUBFOLDER_PREFIX}{i:02}" for i in range(start, end + 1)]


def observe_folder(folder_path, target_subfolders, start, end, script_start_time):
    """Observe *folder_path* recursively until done or interrupted.

    Args:
        folder_path: Directory to watch (recursively).
        target_subfolders: Subfolder names that must each receive a fresh
            file. If empty, all file events are reported and observation
            only ends on a stop request or Ctrl-C.
        start: Unused; kept for interface compatibility with callers.
        end: Unused; kept for interface compatibility with callers.
        script_start_time: Epoch timestamp; older files are ignored.
    """
    event_handler = FileEventHandler(target_subfolders, script_start_time)
    observer = Observer()
    observer.schedule(event_handler, path=folder_path, recursive=True)
    observer.start()

    try:
        if not event_handler.watch_all:
            print(f"Overall missing new files at the beginning: {event_handler.missing_subfolders_counter}")
        while True:
            # Only a non-empty target list can be "completed"; with no
            # targets we keep monitoring instead of exiting immediately.
            if not event_handler.watch_all and event_handler.missing_subfolders_counter == 0:
                print("All target subfolders found. Stopping observation.")
                break
            # A single timed wait doubles as the poll interval and reacts
            # promptly to stop() without an additional sleep.
            if event_handler.stop_event.wait(timeout=1):
                print("Stopping observation by user request.")
                break
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to end plain monitoring; exit quietly.
        pass
    finally:
        observer.stop()
        observer.join()


def run_test(folder_path, start, end):
    """Wait until every subfolder ``apu<start>``..``apu<end>`` gets a fresh file."""
    script_start_time = time.time()
    target_subfolders = generate_subfolder_names(start, end)
    print(f"Running test for files created or modified after script start time: {time.ctime(script_start_time)}")
    observe_folder(folder_path, target_subfolders, start, end, script_start_time)


def main():
    """Parse command-line arguments and start monitoring."""
    parser = argparse.ArgumentParser(description="Monitor a folder and its subfolders for new or modified file events (excluding hidden files).")
    parser.add_argument("folder_path", help="Path of the folder to observe")
    parser.add_argument("--test", action="store_true", help="Run a test for specific subfolders")
    parser.add_argument("--start", type=int, default=0, help="Start index for subfolder range")
    parser.add_argument("--end", type=int, default=24, help="End index for subfolder range")

    args = parser.parse_args()
    folder_to_observe = args.folder_path

    # Fail early with a usage error instead of a traceback from the
    # observer thread when the path cannot be watched.
    if not os.path.isdir(folder_to_observe):
        parser.error(f"folder_path is not an existing directory: {folder_to_observe}")

    if args.test:
        if args.start > args.end:
            parser.error("--start must not be greater than --end")
        run_test(folder_to_observe, args.start, args.end)
    else:
        observe_folder(folder_to_observe, [], args.start, args.end, 0)


if __name__ == "__main__":
    main()