import os import pty import time import subprocess import signal def run_test(): # Create pseudo-terminal master, slave = pty.openpty() # Start bash in interactive mode p = subprocess.Popen( ['bash', '-i'], stdin=slave, stdout=slave, stderr=slave, close_fds=True, preexec_fn=os.setsid # Create a new session/process group ) # Close slave end in parent os.close(slave) # Set master to non-blocking os.set_blocking(master, False) # Helper to write to bash def write_to_bash(cmd): os.write(master, cmd.encode()) time.sleep(0.5) # Define function in the interactive shell func_def = """ my_func() { sleep 100 & local bg_pid=$! echo "ACTUAL_BG_PID:$bg_pid" sleep 10 echo "RUNNING CLEANUP" kill $bg_pid 2>/dev/null wait $bg_pid 2>/dev/null echo "CLEANUP DONE" } """ write_to_bash(func_def + "\n") # Run the function write_to_bash("my_func\n") # Read output to wait for "ACTUAL_BG_PID:" output = b"" start_time = time.time() bg_pid = None while time.time() - start_time < 5: try: r = os.read(master, 1024) if r: output += r except BlockingIOError: pass if b"ACTUAL_BG_PID:" in output: parts = output.split(b"ACTUAL_BG_PID:") if len(parts) > 1: # Get the part after ACTUAL_BG_PID: rest = parts[-1].strip() # Split by newline or carriage return or spaces potential_pid = rest.split(b"\r")[0].split(b"\n")[0].split(b" ")[0].decode().strip() if potential_pid.isdigit(): bg_pid = int(potential_pid) break time.sleep(0.1) print("Found BG PID:", bg_pid) print("Output so far:", output.decode('utf-8', errors='ignore')) if bg_pid is None: print("Error: Could not find BG PID.") p.terminate() return # Wait a bit to ensure it is sleeping time.sleep(1) # In a terminal, Ctrl+C sends SIGINT to the foreground process group. # The shell sets the foreground process group of the terminal to the active job. # Let's get the foreground process group of the master terminal. # We can use os.tcgetpgrp(master) to get the process group currently in the foreground! try: fore_pgid = os.tcgetpgrp(master) print(f"Foreground process group of tty: {fore_pgid}") # Send SIGINT to the foreground process group os.killpg(fore_pgid, signal.SIGINT) except Exception as e: print("Failed to get foreground pgid via tcgetpgrp or killpg:", e) # Fallback: kill the bash process group pgid = os.getpgid(p.pid) print(f"Fallback: sending SIGINT to bash process group {pgid}") os.killpg(pgid, signal.SIGINT) # Give it some time to process time.sleep(2) # Let's read the remaining output to see if "RUNNING CLEANUP" was printed try: time.sleep(1) remaining = b"" start_time = time.time() while time.time() - start_time < 2: try: r = os.read(master, 4096) if r: remaining += r except BlockingIOError: pass time.sleep(0.1) print("Remaining output:", remaining.decode('utf-8', errors='ignore')) except Exception as e: print("Read error or no more output:", e) # Check if bg_pid is still running try: os.kill(bg_pid, 0) print(f"VERDICT: Process {bg_pid} is STILL RUNNING! It was orphaned!") # Clean it up os.kill(bg_pid, signal.SIGKILL) except OSError: print(f"VERDICT: Process {bg_pid} is NOT running. Cleanup worked!") # Clean up bash p.terminate() p.wait() if __name__ == '__main__': run_test()