Source code for trigdroid.core.cleanup

"""Resource cleanup handlers for TrigDroid."""

import logging
import tempfile
import shutil
from typing import List, Optional, Callable, Any
from pathlib import Path
import atexit
import weakref


[docs] class CleanupManager: """Manages cleanup of resources during TrigDroid execution. This class ensures that temporary files, processes, and other resources are properly cleaned up when TrigDroid exits, even if execution is interrupted. Examples: # Register a cleanup function cleanup = CleanupManager() cleanup.register_file("/tmp/trigdroid_temp.log") cleanup.register_callback(lambda: print("Cleaning up...")) # Use as context manager with CleanupManager() as cleanup: cleanup.register_file(temp_file) # ... do work ... # Cleanup happens automatically """ _instances: List[weakref.ReferenceType] = []
[docs] def __init__(self, logger: Optional[logging.Logger] = None): self._logger = logger or logging.getLogger(__name__) self._temp_files: List[Path] = [] self._temp_dirs: List[Path] = [] self._callbacks: List[Callable[[], Any]] = [] self._processes: List[Any] = [] # Process objects to terminate self._frida_sessions: List[Any] = [] # Frida sessions to detach self._cleaned_up = False # Register this instance for global cleanup CleanupManager._instances.append(weakref.ref(self))
[docs] def __enter__(self) -> 'CleanupManager': """Enter context manager.""" return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """Exit context manager and perform cleanup.""" self.cleanup()
[docs] def register_file(self, file_path: str) -> None: """Register a file for cleanup. Args: file_path: Path to file to delete on cleanup """ path = Path(file_path) if path not in self._temp_files: self._temp_files.append(path) self._logger.debug(f"Registered file for cleanup: {path}")
[docs] def register_directory(self, dir_path: str) -> None: """Register a directory for cleanup. Args: dir_path: Path to directory to delete on cleanup """ path = Path(dir_path) if path not in self._temp_dirs: self._temp_dirs.append(path) self._logger.debug(f"Registered directory for cleanup: {path}")
[docs] def register_callback(self, callback: Callable[[], Any]) -> None: """Register a callback function for cleanup. Args: callback: Function to call during cleanup """ self._callbacks.append(callback) self._logger.debug(f"Registered callback for cleanup: {callback.__name__}")
[docs] def register_process(self, process: Any) -> None: """Register a process for termination. Args: process: Process object with terminate() method """ self._processes.append(process) self._logger.debug(f"Registered process for cleanup: {process}")
[docs] def register_frida_session(self, session: Any) -> None: """Register a Frida session for detachment. Args: session: Frida session object """ self._frida_sessions.append(session) self._logger.debug(f"Registered Frida session for cleanup: {session}")
[docs] def create_temp_file(self, suffix: str = "", prefix: str = "trigdroid_") -> Path: """Create a temporary file that will be cleaned up. Args: suffix: File suffix prefix: File prefix Returns: Path to temporary file """ fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix) import os os.close(fd) # Close the file descriptor path = Path(temp_path) self.register_file(path) return path
[docs] def create_temp_dir(self, prefix: str = "trigdroid_") -> Path: """Create a temporary directory that will be cleaned up. Args: prefix: Directory prefix Returns: Path to temporary directory """ temp_dir = tempfile.mkdtemp(prefix=prefix) path = Path(temp_dir) self.register_directory(path) return path
[docs] def cleanup(self) -> None: """Perform all registered cleanup operations.""" if self._cleaned_up: return self._logger.debug("Starting cleanup process...") # Clean up Frida sessions first for session in self._frida_sessions: try: if hasattr(session, 'detach'): session.detach() self._logger.debug(f"Detached Frida session: {session}") except Exception as e: self._logger.warning(f"Failed to detach Frida session {session}: {e}") # Terminate processes for process in self._processes: try: if hasattr(process, 'terminate'): process.terminate() self._logger.debug(f"Terminated process: {process}") elif hasattr(process, 'kill'): process.kill() self._logger.debug(f"Killed process: {process}") except Exception as e: self._logger.warning(f"Failed to terminate process {process}: {e}") # Run cleanup callbacks for callback in self._callbacks: try: callback() self._logger.debug(f"Executed cleanup callback: {callback.__name__}") except Exception as e: self._logger.warning(f"Cleanup callback {callback.__name__} failed: {e}") # Clean up temporary files for file_path in self._temp_files: try: if file_path.exists(): file_path.unlink() self._logger.debug(f"Deleted temporary file: {file_path}") except Exception as e: self._logger.warning(f"Failed to delete file {file_path}: {e}") # Clean up temporary directories for dir_path in self._temp_dirs: try: if dir_path.exists() and dir_path.is_dir(): shutil.rmtree(dir_path) self._logger.debug(f"Deleted temporary directory: {dir_path}") except Exception as e: self._logger.warning(f"Failed to delete directory {dir_path}: {e}") self._cleaned_up = True self._logger.debug("Cleanup process completed")
[docs] @classmethod def cleanup_all(cls) -> None: """Cleanup all active CleanupManager instances.""" for ref in cls._instances[:]: # Copy list to avoid modification during iteration instance = ref() if instance is not None: instance.cleanup() else: cls._instances.remove(ref)
# Global cleanup manager instance _global_cleanup_manager: Optional[CleanupManager] = None
[docs] def get_global_cleanup_manager() -> CleanupManager: """Get the global cleanup manager instance. Returns: Global CleanupManager instance """ global _global_cleanup_manager if _global_cleanup_manager is None: _global_cleanup_manager = CleanupManager() return _global_cleanup_manager
[docs] def register_cleanup_file(file_path: str) -> None: """Register a file for cleanup using the global manager. Args: file_path: Path to file to delete on cleanup """ get_global_cleanup_manager().register_file(file_path)
[docs] def register_cleanup_dir(dir_path: str) -> None: """Register a directory for cleanup using the global manager. Args: dir_path: Path to directory to delete on cleanup """ get_global_cleanup_manager().register_directory(dir_path)
[docs] def register_cleanup_callback(callback: Callable[[], Any]) -> None: """Register a callback for cleanup using the global manager. Args: callback: Function to call during cleanup """ get_global_cleanup_manager().register_callback(callback)
[docs] def create_temp_file(suffix: str = "", prefix: str = "trigdroid_") -> Path: """Create a temporary file using the global cleanup manager. Args: suffix: File suffix prefix: File prefix Returns: Path to temporary file """ return get_global_cleanup_manager().create_temp_file(suffix, prefix)
[docs] def create_temp_dir(prefix: str = "trigdroid_") -> Path: """Create a temporary directory using the global cleanup manager. Args: prefix: Directory prefix Returns: Path to temporary directory """ return get_global_cleanup_manager().create_temp_dir(prefix)
# Register global cleanup on exit def _global_cleanup_handler(): """Global cleanup handler for atexit.""" CleanupManager.cleanup_all() atexit.register(_global_cleanup_handler) # Context manager for temporary resources
[docs] class TempResource: """Context manager for temporary resources with automatic cleanup. Examples: # Temporary file with TempResource.file() as temp_file: temp_file.write_text("test data") # ... use file ... # File is automatically deleted # Temporary directory with TempResource.dir() as temp_dir: (temp_dir / "test.txt").write_text("test") # ... use directory ... # Directory is automatically deleted """
[docs] def __init__(self, cleanup_manager: Optional[CleanupManager] = None): self._cleanup_manager = cleanup_manager or get_global_cleanup_manager()
[docs] @classmethod def file(cls, suffix: str = "", prefix: str = "trigdroid_") -> 'TempFileResource': """Create a temporary file resource. Args: suffix: File suffix prefix: File prefix Returns: TempFileResource context manager """ return TempFileResource(suffix, prefix)
[docs] @classmethod def dir(cls, prefix: str = "trigdroid_") -> 'TempDirResource': """Create a temporary directory resource. Args: prefix: Directory prefix Returns: TempDirResource context manager """ return TempDirResource(prefix)
[docs] class TempFileResource: """Context manager for temporary files."""
[docs] def __init__(self, suffix: str = "", prefix: str = "trigdroid_"): self._suffix = suffix self._prefix = prefix self._path: Optional[Path] = None self._cleanup_manager = get_global_cleanup_manager()
def __enter__(self) -> Path: self._path = self._cleanup_manager.create_temp_file(self._suffix, self._prefix) return self._path def __exit__(self, exc_type, exc_val, exc_tb): if self._path and self._path.exists(): try: self._path.unlink() except Exception: pass # Cleanup manager will handle it
[docs] class TempDirResource: """Context manager for temporary directories."""
[docs] def __init__(self, prefix: str = "trigdroid_"): self._prefix = prefix self._path: Optional[Path] = None self._cleanup_manager = get_global_cleanup_manager()
def __enter__(self) -> Path: self._path = self._cleanup_manager.create_temp_dir(self._prefix) return self._path def __exit__(self, exc_type, exc_val, exc_tb): if self._path and self._path.exists(): try: shutil.rmtree(self._path) except Exception: pass # Cleanup manager will handle it