Source code for pyqt_reactive.core.log_utils

"""
Core Log Utilities for pyqt-reactor.

Unified log discovery, classification, and monitoring utilities
shared between UI implementations.
"""

import logging
import time
from pathlib import Path
from typing import Optional, List
from dataclasses import dataclass

from pyqt_reactive.protocols import get_form_config

logger = logging.getLogger(__name__)


def _get_log_dir() -> Path:
    """Return configured log directory or default."""
    config = get_form_config()
    if config.log_dir:
        return Path(config.log_dir)
    return Path.home() / ".local" / "share" / "pyqt_reactive" / "logs"


def _get_log_prefixes() -> List[str]:
    """Return configured log prefixes or default."""
    config = get_form_config()
    return config.log_prefixes or ["pyqt_reactive_"]


def _match_prefixed(file_name: str, suffix: str) -> Optional[str]:
    """Return matching prefix for a file name if it starts with prefix+suffix."""
    for prefix in _get_log_prefixes():
        if file_name.startswith(f"{prefix}{suffix}"):
            return prefix
    return None


[docs] def get_current_log_file_path() -> str: """Get the current log file path from the logging system.""" try: # Get the root logger and find the FileHandler root_logger = logging.getLogger() for handler in root_logger.handlers: if isinstance(handler, logging.FileHandler): return handler.baseFilename # Fallback: try to get from configured logger name config = get_form_config() if config.log_root_logger_name: root_named_logger = logging.getLogger(config.log_root_logger_name) for handler in root_named_logger.handlers: if isinstance(handler, logging.FileHandler): return handler.baseFilename # Last resort: create a default path log_dir = _get_log_dir() log_dir.mkdir(parents=True, exist_ok=True) prefix = (_get_log_prefixes() or ["pyqt_reactive_"])[0] return str(log_dir / f"{prefix}subprocess_{int(time.time())}.log") except Exception as e: logger.error(f"Failed to get current log file path: {e}") raise RuntimeError(f"Could not determine log file path: {e}")
[docs] @dataclass class LogFileInfo: """Information about a discovered log file.""" path: Path log_type: str # "tui", "main", "worker", "unknown" worker_id: Optional[str] = None display_name: Optional[str] = None
[docs] def __post_init__(self): """Generate display name if not provided.""" if not self.display_name: if self.log_type == "tui": self.display_name = "Main Process" elif self.log_type == "main": self.display_name = "Main Subprocess" elif self.log_type == "worker" and self.worker_id: self.display_name = f"Worker {self.worker_id}" else: self.display_name = self.path.name
[docs] def discover_logs(base_log_path: Optional[str] = None, include_main_log: bool = True, log_directory: Optional[Path] = None) -> List[LogFileInfo]: """ Discover application log files and return as classified LogFileInfo objects. Args: base_log_path: Base path for specific subprocess logs (optional) include_main_log: Whether to include the current main process log log_directory: Directory to search (defaults to configured log directory) Returns: List of LogFileInfo objects for discovered log files """ discovered_logs = [] # Include current main process log if requested if include_main_log: try: main_log_path = get_current_log_file_path() main_log = Path(main_log_path) if main_log.exists(): log_info = classify_log_file(main_log, base_log_path, include_main_log) discovered_logs.append(log_info) except Exception: pass # Main log not available, continue # Discover subprocess logs if base_log_path is provided if base_log_path: base_path = Path(base_log_path) log_dir = base_path.parent if log_dir.exists(): for log_file in log_dir.glob("*.log"): if is_relevant_log_file(log_file, base_log_path): log_info = classify_log_file(log_file, base_log_path, include_main_log) discovered_logs.append(log_info) # Discover all logs if no specific base_log_path elif log_directory or not base_log_path: if log_directory is None: log_directory = _get_log_dir() if log_directory.exists(): for log_file in log_directory.glob("*.log"): if is_app_log_file(log_file) and log_file not in [log.path for log in discovered_logs]: # Infer base_log_path for proper classification inferred_base = infer_base_log_path(log_file) if 'subprocess_' in log_file.name else None log_info = classify_log_file(log_file, inferred_base, include_main_log) discovered_logs.append(log_info) return discovered_logs
[docs] def classify_log_file(log_path: Path, base_log_path: Optional[str] = None, include_tui_log: bool = True) -> LogFileInfo: """ Pure function: Classify a log file and extract metadata. Args: log_path: Path to log file base_log_path: Base path for subprocess log files include_tui_log: Whether to check for TUI log classification Returns: LogFileInfo with classification and metadata """ file_name = log_path.name # Check if it's the current TUI log if include_tui_log: try: tui_log_path = get_current_log_file_path() if log_path == Path(tui_log_path): return LogFileInfo(log_path, "tui", display_name="Main Process") except RuntimeError: pass # TUI log not found, continue with other classification # Check for ZMQ server logs (<prefix>zmq_server_port_{port}_{timestamp}.log) prefix = _match_prefixed(file_name, "zmq_server_port_") if prefix: # Extract port from filename parts = file_name.replace(f'{prefix}zmq_server_port_', '').replace('.log', '').split('_') port = parts[0] if parts else 'unknown' return LogFileInfo(log_path, "zmq_server", display_name=f"ZMQ Server (port {port})") # Check for ZMQ worker logs if file_name.startswith('zmq_worker_exec_'): # Extract execution ID and worker PID parts = file_name.replace('zmq_worker_exec_', '').replace('.log', '').split('_worker_') if len(parts) == 2: exec_id_short = parts[0][:8] # First 8 chars of UUID worker_pid = parts[1].split('_')[0] # PID is first part after _worker_ return LogFileInfo(log_path, "zmq_worker", worker_pid, display_name=f"ZMQ Worker {worker_pid}") # Check for Napari viewer logs if file_name.startswith('napari_detached_port_'): port = file_name.replace('napari_detached_port_', '').replace('.log', '') return LogFileInfo(log_path, "napari", display_name=f"Napari Viewer (port {port})") # Check subprocess logs if base_log_path is provided if base_log_path: base_name = Path(base_log_path).name # Check if it's the main subprocess log: exact match if file_name == f"{base_name}.log": return LogFileInfo(log_path, "main", display_name="Main Subprocess") # Check if it's a worker log: {base_name}_worker_*.log if file_name.startswith(f"{base_name}_worker_") and file_name.endswith('.log'): # Extract worker ID (everything between _worker_ and .log) worker_part = file_name[len(f"{base_name}_worker_"):-4] # Remove .log suffix worker_id = worker_part.split('_')[0] # Take first part before any additional underscores return LogFileInfo(log_path, "worker", worker_id, display_name=f"Worker {worker_id}") # Unknown or malformed log file logger.debug(f"Unrecognized log file pattern: {file_name}") return LogFileInfo(log_path, "unknown")
[docs] def is_relevant_log_file(file_path: Path, base_log_path: Optional[str]) -> bool: """ Check if file is a relevant log file for monitoring. Args: file_path: Path to file to check base_log_path: Base path for subprocess log files Returns: bool: True if file is relevant for monitoring """ if not base_log_path: return False base_name = Path(base_log_path).name file_name = file_path.name # Check if it matches our patterns if file_name == f"{base_name}.log": return True if file_name.startswith(f"{base_name}_worker_") and file_name.endswith('.log'): return True return False
[docs] def is_app_log_file(file_path: Path) -> bool: """ Check if a file is a recognized application log file. Args: file_path: Path to file to check Returns: bool: True if file matches configured log prefixes """ if not file_path.name.endswith('.log'): return False file_name = file_path.name # App log patterns based on configured prefixes, plus common auxiliary logs prefixes = _get_log_prefixes() extra_patterns = [ "pyqt_gui_subprocess_", "zmq_worker_", "napari_detached_", ] patterns = [*prefixes, *extra_patterns] return any(file_name.startswith(pattern) for pattern in patterns)
[docs] def infer_base_log_path(file_path: Path) -> str: """ Infer the base log path from a subprocess log file name. Args: file_path: Path to subprocess log file Returns: str: Inferred base log path """ file_name = file_path.name # Handle worker logs: remove _worker_* suffix if '_worker_' in file_name: base_name = file_name.split('_worker_')[0] else: # Handle main subprocess logs: remove .log extension base_name = file_path.stem return str(file_path.parent / base_name)
# Backward compatibility alias is_openhcs_log_file = is_app_log_file