Source code for iops_profiler.collector

"""
Data collection module for IOPS Profiler.

This module contains all the data collection functionality including:
- Parsing strace and fs_usage output
- Platform-specific measurement methods (Linux, macOS, Windows)
- Helper functions for collecting I/O statistics
"""

import os
import re
import subprocess
import sys
import tempfile
import time

try:
    import psutil
except ImportError:
[docs] psutil = None
# Timing constants for strace attachment and capture
[docs] STRACE_ATTACH_DELAY = 0.5 # seconds to wait for strace to attach to process
[docs] STRACE_CAPTURE_DELAY = 0.5 # seconds to wait for strace to capture final I/O
# I/O syscalls to trace with strace
[docs] STRACE_IO_SYSCALLS = [ "read", "write", # Basic I/O "pread64", "pwrite64", # Positional I/O "readv", "writev", # Vectored I/O "preadv", "pwritev", # Positional vectored I/O "preadv2", "pwritev2", # Extended vectored I/O ]
# Regex pattern for extracting byte count from fs_usage output
[docs] FS_USAGE_BYTE_PATTERN = r"B=0x([0-9a-fA-F]+)"
[docs] class Collector: """Collector class for I/O profiling data collection. This class encapsulates all data collection functionality and maintains necessary state like the IPython shell, strace patterns, and syscall sets. """ def __init__(self, shell): """Initialize the Collector with IPython shell context. Args: shell: IPython shell instance for executing code """
[docs] self.shell = shell
[docs] self.platform = sys.platform
# Compile regex patterns for better performance # Pattern matches: PID syscall(args) = result
[docs] self._strace_pattern = re.compile(r"^\s*(\d+)\s+(\w+)\([^)]+\)\s*=\s*(-?\d+)")
# Pattern matches: B=0x[hex] in fs_usage output
[docs] self._fs_usage_byte_pattern = re.compile(FS_USAGE_BYTE_PATTERN)
# Set of syscall names for I/O operations (lowercase)
[docs] self._io_syscalls = set(STRACE_IO_SYSCALLS)
@staticmethod
[docs] def parse_fs_usage_line_static(line, byte_pattern=None, collect_ops=False): """Parse a single fs_usage output line for I/O operations (static version) Args: line: The line to parse byte_pattern: Compiled regex pattern for extracting byte count (optional) collect_ops: If True, return full operation info for histogram collection Returns: If collect_ops is False: (op_type, bytes_transferred) If collect_ops is True: {'type': op_type, 'bytes': bytes_transferred} """ parts = line.split() if len(parts) < 2: return None if collect_ops else (None, 0) syscall = parts[1].lower() is_read = "read" in syscall is_write = "write" in syscall if not (is_read or is_write): return None if collect_ops else (None, 0) # Extract byte count from B=0x[hex] pattern using compiled regex if byte_pattern is None: # Fallback to inline regex if no pattern provided (for backward compatibility) byte_match = re.search(FS_USAGE_BYTE_PATTERN, line) else: byte_match = byte_pattern.search(line) bytes_transferred = int(byte_match.group(1), 16) if byte_match else 0 op_type = "read" if is_read else "write" if collect_ops: return {"type": op_type, "bytes": bytes_transferred} return op_type, bytes_transferred
[docs] def parse_fs_usage_line(self, line, collect_ops=False): """Parse a single fs_usage output line for I/O operations (instance method) This is a convenience wrapper that uses the instance's compiled byte pattern. """ return self.parse_fs_usage_line_static(line, self._fs_usage_byte_pattern, collect_ops)
@staticmethod
[docs] def parse_strace_line_static(line, strace_pattern, io_syscalls, collect_ops=False): """Parse a single strace output line for I/O operations (static version) Example strace lines: 3385 write(3, "Hello World...", 1100) = 1100 3385 read(3, "data", 4096) = 133 3385 pread64(3, "...", 1024, 0) = 1024 Note: Lines with <unfinished ...> or <... resumed> are not matched as they don't contain complete result information in a single line. Args: line: The line to parse strace_pattern: Compiled regex pattern for strace output io_syscalls: Set of I/O syscall names to track collect_ops: If True, return full operation info for histogram collection Returns: If collect_ops is False: (op_type, bytes_transferred) If collect_ops is True: {'type': op_type, 'bytes': bytes_transferred} """ # Match patterns like: PID syscall(fd, ..., size) = result match = strace_pattern.match(line) if not match: return None if collect_ops else (None, 0) pid, syscall, result = match.groups() syscall = syscall.lower() # Check if it's one of the I/O syscalls we're tracking if syscall not in io_syscalls: return None if collect_ops else (None, 0) # Determine if it's a read or write operation based on syscall name if "read" in syscall: is_read = True elif "write" in syscall: is_read = False else: return None if collect_ops else (None, 0) # The return value is the number of bytes transferred (or -1 on error) bytes_transferred = int(result) if bytes_transferred < 0: return None if collect_ops else (None, 0) op_type = "read" if is_read else "write" if collect_ops: return {"type": op_type, "bytes": bytes_transferred} return op_type, bytes_transferred
[docs] def parse_strace_line(self, line, collect_ops=False): """Parse a single strace output line for I/O operations (instance method) This is a convenience wrapper that uses the instance's strace pattern and syscalls. """ return self.parse_strace_line_static(line, self._strace_pattern, self._io_syscalls, collect_ops)
@staticmethod
[docs] def _create_helper_script(pid, output_file, control_file): """Create a bash helper script that runs fs_usage with elevated privileges""" script_content = f"""#!/bin/bash PID={pid} OUTPUT_FILE="{output_file}" CONTROL_FILE="{control_file}" ERROR_FILE="${{OUTPUT_FILE}}.err" # Try to clean up any existing fs_usage processes first killall -9 fs_usage 2>/dev/null sleep 0.5 # Start fs_usage and capture stderr separately fs_usage -f filesystem -w "$PID" > "$OUTPUT_FILE" 2> "$ERROR_FILE" & FS_USAGE_PID=$! # Give fs_usage a moment to initialize sleep 1 if ! kill -0 "$FS_USAGE_PID" 2>/dev/null; then exit 1 fi echo "$FS_USAGE_PID" > "${{CONTROL_FILE}}.pid" # Wait for stop signal while [ "$(cat "$CONTROL_FILE" 2>/dev/null)" != "STOP" ]; do if ! kill -0 "$FS_USAGE_PID" 2>/dev/null; then exit 1 fi sleep 0.1 done # Terminate fs_usage kill -TERM "$FS_USAGE_PID" 2>/dev/null sleep 0.5 if kill -0 "$FS_USAGE_PID" 2>/dev/null; then kill -9 "$FS_USAGE_PID" 2>/dev/null fi exit 0 """ return script_content
[docs] def _launch_helper_via_osascript(self, helper_script_path): """Launch helper script with sudo via osascript (prompts for password)""" applescript = f""" do shell script "bash {helper_script_path}" with administrator privileges """ proc = subprocess.Popen( ["osascript", "-e", applescript], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) return proc
[docs] def measure_macos_osascript(self, code, collect_ops=False): """Measure IOPS on macOS using fs_usage via osascript Args: code: The code to profile collect_ops: If True, collect individual operation sizes for histogram """ pid = os.getpid() # Create temporary files # Create temporary files - we need the names, not file handles output_file = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False).name # noqa: SIM115 control_file = tempfile.NamedTemporaryFile(mode="w", suffix=".ctrl", delete=False).name # noqa: SIM115 helper_script = tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False).name # noqa: SIM115 try: # Write the helper script script_content = self._create_helper_script(pid, output_file, control_file) with open(helper_script, "w") as f: f.write(script_content) os.chmod(helper_script, 0o755) # Initialize control file with open(control_file, "w") as f: f.write("INIT") print("⚠️ A password dialog will appear - please enter your password to enable I/O monitoring.") # Launch helper via osascript helper_proc = self._launch_helper_via_osascript(helper_script) # Wait for fs_usage to be ready pid_file = f"{control_file}.pid" max_wait = 30 waited = 0 while not os.path.exists(pid_file) and waited < max_wait: time.sleep(0.5) waited += 0.5 if helper_proc.poll() is not None: raise RuntimeError("Helper script failed to start fs_usage (may be Resource busy)") if not os.path.exists(pid_file): raise RuntimeError("Timeout waiting for fs_usage to start") # Execute the code start_time = time.time() self.shell.run_cell(code) elapsed_time = time.time() - start_time # Give fs_usage a moment to capture final I/O time.sleep(0.5) # Signal helper to stop with open(control_file, "w") as f: f.write("STOP") # Wait for helper to finish try: helper_proc.communicate(timeout=5) except subprocess.TimeoutExpired: helper_proc.kill() helper_proc.wait() # Parse the output read_count = 0 write_count = 0 read_bytes = 0 write_bytes = 0 operations = [] if collect_ops else None if os.path.exists(output_file): with open(output_file, "r") as f: for line in f: if collect_ops: op = self.parse_fs_usage_line(line, collect_ops=True) if op: operations.append(op) if op["type"] == "read": read_count += 1 read_bytes += op["bytes"] elif op["type"] == "write": write_count += 1 write_bytes += op["bytes"] else: op_type, bytes_transferred = self.parse_fs_usage_line(line) if op_type == "read": read_count += 1 read_bytes += bytes_transferred elif op_type == "write": write_count += 1 write_bytes += bytes_transferred result = { "read_count": read_count, "write_count": write_count, "read_bytes": read_bytes, "write_bytes": write_bytes, "elapsed_time": elapsed_time, "method": "fs_usage (per-process)", } if collect_ops: result["operations"] = operations return result finally: # Cleanup - try to kill fs_usage processes try: # noqa: SIM105 subprocess.run( ["sudo", "killall", "-9", "fs_usage"], capture_output=True, timeout=2, check=False ) except (subprocess.TimeoutExpired, FileNotFoundError, OSError): pass # sudo or killall not available or timed out for filepath in [ output_file, control_file, helper_script, f"{control_file}.pid", f"{output_file}.err", ]: try: if os.path.exists(filepath): os.remove(filepath) except OSError: pass # File already deleted or permission issue
[docs] def measure_linux_strace(self, code, collect_ops=False): """Measure IOPS on Linux using strace (no elevated privileges required) Args: code: The code to profile collect_ops: If True, collect individual operation sizes for histogram """ pid = os.getpid() # Allow this process to be ptraced (required on systems with Yama LSM) try: import ctypes import ctypes.util libc = ctypes.CDLL(ctypes.util.find_library("c")) PR_SET_PTRACER = 0x59616D61 # noqa: N806 - system constant PR_SET_PTRACER_ANY = -1 # noqa: N806 - system constant libc.prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0) except Exception: pass # Create temporary file for strace output - we need the name, not file handle output_file = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False).name # noqa: SIM115 try: # Start strace in the background syscalls_to_trace = ",".join(STRACE_IO_SYSCALLS) strace_cmd = [ "strace", "-f", # Follow forks "-e", f"trace={syscalls_to_trace}", "-o", output_file, "-p", str(pid), ] # Start strace process strace_proc = subprocess.Popen( strace_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Give strace a moment to attach time.sleep(STRACE_ATTACH_DELAY) # Check if strace started successfully if strace_proc.poll() is not None: stdout, stderr = strace_proc.communicate() if "Operation not permitted" in stderr: raise RuntimeError( "strace failed - ptrace not permitted. This may be due to kernel security settings." ) raise RuntimeError(f"Failed to start strace: {stderr}") # Execute the code start_time = time.time() self.shell.run_cell(code) elapsed_time = time.time() - start_time # Give strace a moment to capture final I/O time.sleep(STRACE_CAPTURE_DELAY) # Terminate strace strace_proc.terminate() try: strace_proc.wait(timeout=2) except subprocess.TimeoutExpired: strace_proc.kill() strace_proc.wait() # Parse the output read_count = 0 write_count = 0 read_bytes = 0 write_bytes = 0 operations = [] if collect_ops else None if os.path.exists(output_file): try: with open(output_file, "r", errors="ignore") as f: for line in f: if collect_ops: op = self.parse_strace_line(line, collect_ops=True) if op: operations.append(op) if op["type"] == "read": read_count += 1 read_bytes += op["bytes"] elif op["type"] == "write": write_count += 1 write_bytes += op["bytes"] else: op_type, bytes_transferred = self.parse_strace_line(line) if op_type == "read": read_count += 1 read_bytes += bytes_transferred elif op_type == "write": write_count += 1 write_bytes += bytes_transferred except OSError: pass result = { "read_count": read_count, "write_count": write_count, "read_bytes": read_bytes, "write_bytes": write_bytes, "elapsed_time": elapsed_time, "method": "strace (per-process)", } if collect_ops: result["operations"] = operations return result finally: # Cleanup try: if os.path.exists(output_file): os.remove(output_file) except OSError: # File cleanup may fail, not critical pass
[docs] def measure_linux_windows(self, code): """Measure IOPS on Linux/Windows using psutil""" if not psutil: raise RuntimeError("psutil not installed. Run: pip install psutil") process = psutil.Process() # Get initial I/O counters try: io_before = process.io_counters() except AttributeError as e: raise RuntimeError(f"psutil.Process.io_counters() not supported on {self.platform}") from e # noqa: B904 # Execute the code start_time = time.time() self.shell.run_cell(code) elapsed_time = time.time() - start_time # Get final I/O counters io_after = process.io_counters() # Calculate differences read_count = io_after.read_count - io_before.read_count write_count = io_after.write_count - io_before.write_count read_bytes = io_after.read_bytes - io_before.read_bytes write_bytes = io_after.write_bytes - io_before.write_bytes return { "read_count": read_count, "write_count": write_count, "read_bytes": read_bytes, "write_bytes": write_bytes, "elapsed_time": elapsed_time, "method": "psutil (per-process)", }
[docs] def measure_systemwide_fallback(self, code): """Fallback: system-wide I/O measurement using psutil""" if not psutil: raise RuntimeError("psutil not installed. Run: pip install psutil") # Get initial system-wide I/O counters io_before = psutil.disk_io_counters() if io_before is None: raise RuntimeError("System-wide disk I/O counters not available") # Execute the code start_time = time.time() self.shell.run_cell(code) elapsed_time = time.time() - start_time # Get final system-wide I/O counters io_after = psutil.disk_io_counters() # Calculate differences read_count = io_after.read_count - io_before.read_count write_count = io_after.write_count - io_before.write_count read_bytes = io_after.read_bytes - io_before.read_bytes write_bytes = io_after.write_bytes - io_before.write_bytes return { "read_count": read_count, "write_count": write_count, "read_bytes": read_bytes, "write_bytes": write_bytes, "elapsed_time": elapsed_time, "method": "⚠️ SYSTEM-WIDE (includes all processes)", }
# Module-level functions for backward compatibility with tests # These directly reference the static methods to avoid code duplication
[docs] parse_fs_usage_line = Collector.parse_fs_usage_line_static
[docs] parse_strace_line = Collector.parse_strace_line_static
[docs] create_helper_script = Collector._create_helper_script