Source code for dexray_insight.core.base_classes

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import json
import logging

[docs] class AnalysisSeverity(Enum): """ Enumeration of analysis severity levels for security findings. Used throughout the security assessment framework to classify the severity of detected vulnerabilities and security issues. Values: LOW: Informational findings or minor security concerns MEDIUM: Moderate security issues requiring attention HIGH: Serious security vulnerabilities needing prompt remediation CRITICAL: Severe security issues requiring immediate action """ LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical"
[docs] class AnalysisStatus(Enum): """ Enumeration of analysis module execution statuses. Used to track the execution state of individual analysis modules and provide consistent status reporting across the framework. Values: SUCCESS: Module completed successfully with results FAILURE: Module failed to execute due to errors PARTIAL: Module completed with some issues or warnings SKIPPED: Module was not executed (disabled, missing dependencies, etc.) """ SUCCESS = "success" FAILURE = "failure" PARTIAL = "partial" SKIPPED = "skipped"
[docs] @dataclass class AnalysisContext: """ Context object passed between modules containing shared data and results. The AnalysisContext serves as a shared data container that is passed between analysis modules during the analysis workflow. It contains APK information, configuration, and accumulated results from previous modules. This design enables: - Data sharing between dependent modules - Centralized configuration access - Progressive result accumulation - Temporal directory management Attributes: apk_path: File path to the APK being analyzed config: Configuration dictionary from the engine androguard_obj: Optional pre-loaded Androguard analysis object unzip_path: Legacy field for backwards compatibility (deprecated) module_results: Dictionary storing results from completed modules temporal_paths: Modern temporal directory management object jadx_available: Flag indicating JADX decompiler availability apktool_available: Flag indicating APKTool availability Design Pattern: Context Object (shares state between modules) SOLID Principles: Single Responsibility (data container and accessor) """ apk_path: str config: Dict[str, Any] androguard_obj: Optional[Any] = None unzip_path: Optional[str] = None # Legacy field for backwards compatibility module_results: Dict[str, Any] = None # Temporal directory paths (new) temporal_paths: Optional[Any] = None # TemporalDirectoryPaths object jadx_available: bool = False apktool_available: bool = False def __post_init__(self): if self.module_results is None: self.module_results = {}
[docs] def add_result(self, module_name: str, result: Any): """ Add a module result to the context for use by dependent modules. This method allows completed modules to store their results in the shared context where they can be accessed by dependent modules. Args: module_name: Name of the module storing the result result: Analysis result object or data structure Side Effects: Modifies self.module_results dictionary """ self.module_results[module_name] = result
[docs] def get_unzipped_dir(self) -> Optional[str]: """ Get path to unzipped APK directory (temporal or legacy). This method provides backwards compatibility by checking both modern temporal paths and legacy unzip paths. Returns: str: Path to unzipped APK directory, or None if not available Design Pattern: Facade (hides complexity of path resolution) """ if self.temporal_paths: return str(self.temporal_paths.unzipped_dir) return self.unzip_path
[docs] def get_jadx_dir(self) -> Optional[str]: """Get path to JADX decompiled directory""" if self.temporal_paths: return str(self.temporal_paths.jadx_dir) return None
[docs] def get_apktool_dir(self) -> Optional[str]: """Get path to apktool results directory""" if self.temporal_paths: return str(self.temporal_paths.apktool_dir) return None
[docs] def get_result(self, module_name: str) -> Optional[Any]: """Get a result from a previously executed module""" return self.module_results.get(module_name)
[docs] @dataclass class BaseResult: """Base class for all analysis results""" module_name: str status: AnalysisStatus execution_time: float = 0.0 error_message: Optional[str] = None
[docs] def to_dict(self) -> Dict[str, Any]: """Convert result to dictionary for serialization""" return { 'module_name': self.module_name, 'status': self.status.value, 'execution_time': self.execution_time, 'error_message': self.error_message }
[docs] def to_json(self) -> str: """Convert result to JSON string""" return json.dumps(self.to_dict(), indent=2)
@dataclass class SecurityFinding: """Represents a security finding from OWASP assessment""" category: str severity: AnalysisSeverity title: str description: str evidence: List[str] recommendations: List[str] cve_references: List[str] = None additional_data: Dict[str, Any] = None def __post_init__(self): if self.cve_references is None: self.cve_references = [] if self.additional_data is None: self.additional_data = {}
[docs] class BaseAnalysisModule(ABC): """ Abstract base class for all analysis modules in the Dexray Insight framework. This class defines the standard interface that all analysis modules must implement. It provides common functionality like configuration handling, logging setup, and standardized method signatures for the analysis workflow. Responsibilities: - Define the contract for analysis modules (analyze, get_dependencies) - Provide common initialization and configuration handling - Set up standardized logging for all modules - Enforce consistent return types (BaseResult) Design Pattern: Template Method (defines algorithm structure) SOLID Principles: - Interface Segregation (focused interface for analysis modules) - Liskov Substitution (all modules can be used interchangeably) Implementation Requirements: - Must implement analyze() method for core analysis logic - Must implement get_dependencies() to declare module dependencies - Should return results wrapped in BaseResult or its subclasses Attributes: config: Configuration dictionary passed from AnalysisEngine name: Module class name for identification enabled: Flag indicating if module is enabled for execution logger: Configured logger instance for this module """
[docs] def __init__(self, config: Dict[str, Any]): self.config = config self.name = self.__class__.__name__ self.enabled = config.get('enabled', True) self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
[docs] @abstractmethod def analyze(self, apk_path: str, context: AnalysisContext) -> BaseResult: """ Perform the core analysis logic for this module. This is the main entry point for module execution. Implementations should perform their specific analysis tasks and return structured results wrapped in a BaseResult object. Args: apk_path: Absolute path to the APK file being analyzed context: AnalysisContext containing shared data, configuration, and results from previously executed modules Returns: BaseResult: Analysis results with status, data, and error information. Should include all relevant findings from this module's analysis. Raises: Should handle internal exceptions and return results with FAILURE status rather than propagating exceptions to the engine. Implementation Guidelines: - Use self.logger for consistent logging - Access configuration via self.config - Use context to access shared data and previous results - Return meaningful error messages in BaseResult on failure - Follow single responsibility principle in analysis logic """ pass
[docs] @abstractmethod def get_dependencies(self) -> List[str]: """ Return list of module names this module depends on Returns: List of module names that must be executed before this module """ pass
[docs] def validate_config(self) -> bool: """ Validate module configuration Returns: True if configuration is valid, False otherwise """ return True
[docs] def is_enabled(self) -> bool: """Check if module is enabled""" return self.enabled
[docs] def get_priority(self) -> int: """Get execution priority (lower numbers = higher priority)""" return self.config.get('priority', 100)
class BaseExternalTool(ABC): """ Abstract base class for external tool integrations. This class defines the interface for integrating external tools like APKID, Kavanoz, JADX, and APKTool into the analysis workflow. Responsibilities: - Define standard interface for external tool execution - Provide configuration management for tool-specific settings - Standardize tool availability checking and execution - Handle tool-specific result processing and error handling Design Pattern: Adapter (adapts external tools to framework interface) SOLID Principles: Interface Segregation (focused interface for tools) Attributes: config: Tool-specific configuration dictionary name: Tool class name for identification enabled: Flag indicating if tool is enabled for execution """ def __init__(self, config: Dict[str, Any]): self.config = config self.name = self.__class__.__name__ self.enabled = config.get('enabled', True) @abstractmethod def execute(self, apk_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]: """ Execute the external tool Args: apk_path: Path to the APK file output_dir: Optional output directory for tool results Returns: Dictionary containing tool results """ pass @abstractmethod def is_available(self) -> bool: """ Check if tool is available on the system Returns: True if tool is available and can be executed """ pass def get_version(self) -> Optional[str]: """Get tool version if available""" return None def is_enabled(self) -> bool: """Check if tool is enabled""" return self.enabled class BaseSecurityAssessment(ABC): """Abstract base class for OWASP Top 10 security assessments""" def __init__(self, config: Dict[str, Any]): self.config = config self.name = self.__class__.__name__ self.owasp_category = "" self.enabled = config.get('enabled', True) @abstractmethod def assess(self, analysis_results: Dict[str, Any]) -> List[SecurityFinding]: """ Perform security assessment Args: analysis_results: Combined results from all analysis modules Returns: List of security findings """ pass def get_owasp_category(self) -> str: """Get OWASP Top 10 category this assessment covers""" return self.owasp_category def is_enabled(self) -> bool: """Check if assessment is enabled""" return self.enabled class ModuleRegistry: """Registry for managing analysis modules, tools, and security assessments""" def __init__(self): self._modules: Dict[str, type] = {} self._tools: Dict[str, type] = {} self._assessments: Dict[str, type] = {} def register_module(self, name: str, module_class: type): """Register an analysis module""" if not issubclass(module_class, BaseAnalysisModule): raise ValueError(f"Module {name} must inherit from BaseAnalysisModule") self._modules[name] = module_class def register_tool(self, name: str, tool_class: type): """Register an external tool""" if not issubclass(tool_class, BaseExternalTool): raise ValueError(f"Tool {name} must inherit from BaseExternalTool") self._tools[name] = tool_class def register_assessment(self, name: str, assessment_class: type): """Register a security assessment""" if not issubclass(assessment_class, BaseSecurityAssessment): raise ValueError(f"Assessment {name} must inherit from BaseSecurityAssessment") self._assessments[name] = assessment_class def get_module(self, name: str) -> Optional[type]: """Get a registered module class""" return self._modules.get(name) def get_tool(self, name: str) -> Optional[type]: """Get a registered tool class""" return self._tools.get(name) def get_assessment(self, name: str) -> Optional[type]: """Get a registered assessment class""" return self._assessments.get(name) def list_modules(self) -> List[str]: """List all registered modules""" return list(self._modules.keys()) def list_tools(self) -> List[str]: """List all registered tools""" return list(self._tools.keys()) def list_assessments(self) -> List[str]: """List all registered assessments""" return list(self._assessments.keys()) # Global registry instance registry = ModuleRegistry()
[docs] def register_module(name: str): """Decorator for registering analysis modules""" def decorator(cls): registry.register_module(name, cls) return cls return decorator
def register_tool(name: str): """Decorator for registering external tools""" def decorator(cls): registry.register_tool(name, cls) return cls return decorator def register_assessment(name: str): """Decorator for registering security assessments""" def decorator(cls): registry.register_assessment(name, cls) return cls return decorator