Source code for dexray_insight.core.base_classes

#!/usr/bin/env python3

"""Base classes and data structures for the Dexray Insight analysis framework."""

# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-
#
# # Copyright (C) {{ year }} Dexray Insight Contributors
# #
# # This file is part of Dexray Insight - Android APK Security Analysis Tool
# #
# # Licensed under the Apache License, Version 2.0 (the "License");
# # you may not use this file except in compliance with the License.
# # You may obtain a copy of the License at
# #
# #     http://www.apache.org/licenses/LICENSE-2.0
# #
# # Unless required by applicable law or agreed to in writing, software
# # distributed under the License is distributed on an "AS IS" BASIS,
# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# # See the License for the specific language governing permissions and
# # limitations under the License.

import json
import logging
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any
from typing import Optional


class AnalysisSeverity(Enum):
    """Severity levels that can be attached to a security finding.

    Members are declared from least to most severe. Each member's value is
    the lowercase string form of its name.

    Values:
        LOW: Informational findings or minor security concerns
        MEDIUM: Moderate security issues requiring attention
        HIGH: Serious security vulnerabilities needing prompt remediation
        CRITICAL: Severe security issues requiring immediate action
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class AnalysisStatus(Enum):
    """Outcome states reported by an analysis module run.

    Each member's value is the lowercase string form of its name.

    Values:
        SUCCESS: Module completed successfully with results
        FAILURE: Module failed to execute due to errors
        PARTIAL: Module completed with some issues or warnings
        SKIPPED: Module was not executed (disabled, missing dependencies, etc.)
    """

    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"
[docs] @dataclass class AnalysisContext: """Context object passed between modules containing shared data and results. The AnalysisContext serves as a shared data container that is passed between analysis modules during the analysis workflow. It contains APK information, configuration, and accumulated results from previous modules. This design enables: - Data sharing between dependent modules - Centralized configuration access - Progressive result accumulation - Temporal directory management Attributes: apk_path: File path to the APK being analyzed config: Configuration dictionary from the engine androguard_obj: Optional pre-loaded Androguard analysis object unzip_path: Legacy field for backwards compatibility (deprecated) module_results: Dictionary storing results from completed modules temporal_paths: Modern temporal directory management object jadx_available: Flag indicating JADX decompiler availability apktool_available: Flag indicating APKTool availability Design Pattern: Context Object (shares state between modules) SOLID Principles: Single Responsibility (data container and accessor) """ apk_path: str config: dict[str, Any] androguard_obj: Any | None = None unzip_path: str | None = None # Legacy field for backwards compatibility module_results: dict[str, Any] = None # Temporal directory paths (new) temporal_paths: Any | None = None # TemporalDirectoryPaths object jadx_available: bool = False apktool_available: bool = False
[docs] def __post_init__(self): """Initialize module results dictionary after dataclass creation.""" if self.module_results is None: self.module_results = {}
[docs] def add_result(self, module_name: str, result: Any): """Add a module result to the context for use by dependent modules. This method allows completed modules to store their results in the shared context where they can be accessed by dependent modules. Args: module_name: Name of the module storing the result result: Analysis result object or data structure Side Effects: Modifies self.module_results dictionary """ self.module_results[module_name] = result
[docs] def get_unzipped_dir(self) -> str | None: """Get path to unzipped APK directory (temporal or legacy). This method provides backwards compatibility by checking both modern temporal paths and legacy unzip paths. Returns: str: Path to unzipped APK directory, or None if not available Design Pattern: Facade (hides complexity of path resolution) """ if self.temporal_paths: return str(self.temporal_paths.unzipped_dir) return self.unzip_path
[docs] def get_jadx_dir(self) -> str | None: """Get path to JADX decompiled directory.""" if self.temporal_paths: return str(self.temporal_paths.jadx_dir) return None
[docs] def get_apktool_dir(self) -> str | None: """Get path to apktool results directory.""" if self.temporal_paths: return str(self.temporal_paths.apktool_dir) return None
[docs] def get_result(self, module_name: str) -> Any | None: """Get a result from a previously executed module.""" return self.module_results.get(module_name)
[docs] def create_file_location( self, file_path: str, line_number: int | None = None, offset: int | None = None, end_line: int | None = None, end_offset: int | None = None, ) -> "FileLocation": """Create a FileLocation object for security findings. This method creates file location objects with proper URI formatting and handles different file types appropriately: - Java/Smali files: Use line_number for precise location - Native libraries (.so): Use offset for binary location (base address 0x0) Args: file_path: Absolute or relative path to the file line_number: Line number for Java/Smali files (1-based) offset: Byte offset for native libraries (0x0 base address) end_line: Optional end line for multi-line findings end_offset: Optional end offset for native libraries Returns: FileLocation object with proper URI and location information. """ import os # Ensure absolute path for URI if not os.path.isabs(file_path): # Try to resolve relative to decompiled directories if file_path.endswith(".java") and self.get_jadx_dir(): abs_path = os.path.join(self.get_jadx_dir(), file_path) elif file_path.endswith(".smali") and self.get_apktool_dir(): abs_path = os.path.join(self.get_apktool_dir(), file_path) elif file_path.endswith(".so") and self.get_unzipped_dir(): abs_path = os.path.join(self.get_unzipped_dir(), "lib", file_path) else: abs_path = os.path.abspath(file_path) else: abs_path = file_path # Create file URI file_uri = f"file://{abs_path}" return FileLocation( uri=file_uri, start_line=line_number, start_offset=offset, end_line=end_line, end_offset=end_offset )
[docs] def create_java_file_location( self, java_file_path: str, line_number: int, end_line: int | None = None ) -> "FileLocation": """Create a FileLocation for decompiled Java files from JADX. Args: java_file_path: Path to Java file (relative to JADX directory) line_number: Line number in Java file (1-based) end_line: Optional end line for multi-line findings Returns: FileLocation object for Java file. """ return self.create_file_location(java_file_path, line_number=line_number, end_line=end_line)
[docs] def create_smali_file_location( self, smali_file_path: str, line_number: int, end_line: int | None = None ) -> "FileLocation": """Create a FileLocation for Smali files from APKTool. Args: smali_file_path: Path to Smali file (relative to APKTool directory) line_number: Line number in Smali file (1-based) end_line: Optional end line for multi-line findings Returns: FileLocation object for Smali file. """ return self.create_file_location(smali_file_path, line_number=line_number, end_line=end_line)
[docs] def create_native_file_location( self, so_file_path: str, offset: int, end_offset: int | None = None ) -> "FileLocation": """Create a FileLocation for native library (.so) files with byte offsets. For native libraries, we use byte offsets assuming base load address 0x0. This allows binary analysis tools to locate the exact position in the file. Args: so_file_path: Path to .so file (relative to lib directory) offset: Byte offset in the native library (base address 0x0) end_offset: Optional end offset for range findings Returns: FileLocation object for native library. """ return self.create_file_location(so_file_path, offset=offset, end_offset=end_offset)
@dataclass
class BaseResult:
    """Common envelope returned by every analysis module.

    Attributes:
        module_name: Name of the module that produced the result
        status: Execution status of the module
        execution_time: Recorded run time (defaults to 0.0)
        error_message: Optional error description
    """

    module_name: str
    status: AnalysisStatus
    execution_time: float = 0.0
    error_message: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a plain dict, with ``status`` reduced to its value."""
        serialized: dict[str, Any] = {}
        serialized["module_name"] = self.module_name
        serialized["status"] = self.status.value
        serialized["execution_time"] = self.execution_time
        serialized["error_message"] = self.error_message
        return serialized

    def to_json(self) -> str:
        """Return ``to_dict()`` rendered as JSON with two-space indentation."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)
@dataclass
class FileLocation:
    """Position of a security finding inside a file.

    Attributes:
        uri: File URI in format "file://<absolute_path>"
        start_line: Line number for Java/Smali files
        start_offset: Byte offset for native libraries (.so files)
        end_line: Optional end line for multi-line findings
        end_offset: Optional end offset for native libraries
    """

    uri: str
    start_line: int | None = None
    start_offset: int | None = None
    end_line: int | None = None
    end_offset: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a dict with ``uri`` plus a camelCase key for each field that is set."""
        serialized = {"uri": self.uri}
        optional_fields = (
            ("startLine", self.start_line),
            ("startOffset", self.start_offset),
            ("endLine", self.end_line),
            ("endOffset", self.end_offset),
        )
        # Fields left at None are omitted entirely rather than emitted as null.
        for key, value in optional_fields:
            if value is not None:
                serialized[key] = value
        return serialized


@dataclass
class SecurityFinding:
    """A single security finding from OWASP assessment, optionally tied to a file location.

    Attributes:
        category: Category label of the finding
        severity: Severity level of the finding
        title: Brief title
        description: Detailed description
        evidence: Evidence strings
        recommendations: Remediation recommendations
        cve_references: CVE identifiers (None becomes an empty list)
        additional_data: Extra metadata (None becomes an empty dict)
        file_location: Precise file and line/offset information, if known
    """

    category: str
    severity: AnalysisSeverity
    title: str
    description: str
    evidence: list[str]
    recommendations: list[str]
    cve_references: list[str] = None
    additional_data: dict[str, Any] = None
    file_location: FileLocation | None = None

    def __post_init__(self):
        """Swap the None placeholders for fresh per-instance containers."""
        self.cve_references = [] if self.cve_references is None else self.cve_references
        self.additional_data = {} if self.additional_data is None else self.additional_data

    def to_dict(self) -> dict[str, Any]:
        """Return the finding as a dict; ``fileLocation`` is added only when a location is set."""
        serialized = dict(
            category=self.category,
            severity=self.severity.value,
            title=self.title,
            description=self.description,
            evidence=self.evidence,
            recommendations=self.recommendations,
            cve_references=self.cve_references,
            additional_data=self.additional_data,
        )
        if not self.file_location:
            return serialized
        serialized["fileLocation"] = self.file_location.to_dict()
        return serialized
class BaseAnalysisModule(ABC):
    """Contract that every Dexray Insight analysis module implements.

    Subclasses must provide ``analyze()`` (the analysis itself) and
    ``get_dependencies()`` (names of modules that must run first). The base
    class stores the configuration, derives the module name from the class
    name, reads the ``enabled`` flag and sets up a per-class logger.

    Attributes:
        config: Configuration dictionary passed to the constructor
        name: Module class name for identification
        enabled: Value of ``config["enabled"]``, True when the key is absent
        logger: Logger named ``<module>.<class name>``
    """

    def __init__(self, config: dict[str, Any]):
        """Initialize the analysis module with configuration.

        Args:
            config: Configuration dictionary for this module.
        """
        module_type = self.__class__
        self.config = config
        self.name = module_type.__name__
        self.enabled = config.get("enabled", True)
        self.logger = logging.getLogger(f"{module_type.__module__}.{module_type.__name__}")

    @abstractmethod
    def analyze(self, apk_path: str, context: AnalysisContext) -> BaseResult:
        """Perform the core analysis logic for this module.

        Args:
            apk_path: Absolute path to the APK file being analyzed
            context: AnalysisContext containing shared data, configuration,
                and results from previously executed modules

        Returns:
            BaseResult: Analysis results with status and error information.

        Implementation Guidelines:
            - Handle internal exceptions and return a result with FAILURE
              status rather than propagating them to the engine
            - Use self.logger for logging and self.config for configuration
            - Use context to access shared data and previous results
        """

    @abstractmethod
    def get_dependencies(self) -> list[str]:
        """Return list of module names this module depends on.

        Returns:
            List of module names that must be executed before this module.
        """

    def validate_config(self) -> bool:
        """Validate module configuration.

        The base implementation accepts any configuration; subclasses may
        override it.

        Returns:
            True if configuration is valid, False otherwise.
        """
        return True

    def is_enabled(self) -> bool:
        """Check if module is enabled."""
        return self.enabled

    def get_priority(self) -> int:
        """Get execution priority (lower numbers = higher priority).

        Returns:
            ``config["priority"]``, or 100 when the key is absent.
        """
        default_priority = 100
        return self.config.get("priority", default_priority)
class BaseExternalTool(ABC):
    """Abstract base class for external tool integrations.

    Defines the interface used to plug external tools (the original notes
    mention APKID, Kavanoz, JADX and APKTool) into the analysis workflow.
    Subclasses must implement ``execute()`` and ``is_available()``.

    Attributes:
        config: Tool-specific configuration dictionary
        name: Tool class name for identification
        enabled: Value of ``config["enabled"]``, True when the key is absent
    """

    def __init__(self, config: dict[str, Any]):
        """Initialize external tool with configuration.

        Args:
            config: Tool configuration dictionary.
        """
        self.config = config
        self.name = self.__class__.__name__
        self.enabled = config.get("enabled", True)

    @abstractmethod
    def execute(self, apk_path: str, output_dir: str | None = None) -> dict[str, Any]:
        """Execute the external tool.

        Args:
            apk_path: Path to the APK file
            output_dir: Optional output directory for tool results

        Returns:
            Dictionary containing tool results.
        """

    @abstractmethod
    def is_available(self) -> bool:
        """Check if tool is available on the system.

        Returns:
            True if tool is available and can be executed.
        """

    def get_version(self) -> str | None:
        """Get tool version if available (the base implementation returns None)."""
        return None

    def is_enabled(self) -> bool:
        """Check if tool is enabled."""
        return self.enabled


class BaseSecurityAssessment(ABC):
    """Abstract base class for OWASP Top 10 security assessments with file location support.

    Subclasses implement ``assess()``. The ``create_*_finding`` helpers build
    ``SecurityFinding`` objects and attach a file location when one can be
    derived from the supplied context.

    Attributes:
        config: Assessment configuration dictionary
        name: Assessment class name for identification
        owasp_category: Category string, empty until a subclass sets it
        enabled: Value of ``config["enabled"]``, True when the key is absent
        logger: Logger named ``<module>.<class name>``
    """

    # Optional SecurityFinding fields that the create_java/smali/native_finding
    # helpers accept through **kwargs; any other keyword is ignored.
    _FINDING_EXTRAS = ("cve_references", "additional_data")

    def __init__(self, config: dict[str, Any]):
        """Initialize security assessment with configuration.

        Args:
            config: Assessment configuration dictionary.
        """
        self.config = config
        self.name = self.__class__.__name__
        self.owasp_category = ""
        self.enabled = config.get("enabled", True)
        self.logger = logging.getLogger(self.__class__.__module__ + "." + self.__class__.__name__)

    @abstractmethod
    def assess(
        self, analysis_results: dict[str, Any], context: Optional["AnalysisContext"] = None
    ) -> list[SecurityFinding]:
        """Perform security assessment with file location tracking.

        Args:
            analysis_results: Combined results from all analysis modules
            context: Analysis context for file location creation (optional for backward compatibility)

        Returns:
            List of security findings with precise file locations where possible.
        """

    def _locate(self, context: Any, factory_name: str, kind: str, *args: Any) -> Any:
        """Call ``context.<factory_name>(*args)``; log a warning and return None on failure."""
        try:
            # The attribute lookup stays inside the try so that a context
            # lacking the factory degrades to "no location" instead of raising.
            return getattr(context, factory_name)(*args)
        except Exception as e:
            self.logger.warning(f"Could not create {kind} file location: {e}")
            return None

    def _build_finding(
        self,
        category: str,
        severity: AnalysisSeverity,
        title: str,
        description: str,
        evidence: list[str],
        recommendations: list[str],
        file_location: Any,
        extras: dict[str, Any],
    ) -> SecurityFinding:
        """Assemble a SecurityFinding, forwarding only the recognised keys of ``extras``."""
        optional_fields = {key: value for key, value in extras.items() if key in self._FINDING_EXTRAS}
        return SecurityFinding(
            category=category,
            severity=severity,
            title=title,
            description=description,
            evidence=evidence,
            recommendations=recommendations,
            file_location=file_location,
            **optional_fields,
        )

    def create_finding_with_location(
        self,
        category: str,
        severity: AnalysisSeverity,
        title: str,
        description: str,
        evidence: list[str],
        recommendations: list[str],
        context: Optional["AnalysisContext"] = None,
        file_path: str | None = None,
        line_number: int | None = None,
        offset: int | None = None,
        end_line: int | None = None,
        cve_references: list[str] | None = None,
        additional_data: dict[str, Any] | None = None,
        end_offset: int | None = None,
    ) -> SecurityFinding:
        """Create a security finding with precise file location information.

        A location is attached only when both ``context`` and ``file_path``
        are given. If creating it fails, a warning is logged and the finding
        is returned without a location.

        Args:
            category: OWASP category (e.g., "A03:2021 - Sensitive Data")
            severity: Severity level of the finding
            title: Brief title of the finding
            description: Detailed description of the issue
            evidence: List of evidence strings showing the problem
            recommendations: List of remediation recommendations
            context: Analysis context for file location creation
            file_path: Path to file containing the issue
            line_number: Line number for Java/Smali files
            offset: Byte offset for native libraries
            end_line: Optional end line for multi-line issues
            cve_references: Optional CVE references
            additional_data: Optional additional metadata
            end_offset: Optional end byte offset for native libraries

        Returns:
            SecurityFinding with file location information.
        """
        file_location = None
        if context and file_path:
            location_kwargs = {"line_number": line_number, "offset": offset, "end_line": end_line}
            # Forward end_offset only when supplied, so calls that do not use
            # it reach the context exactly as they did before.
            if end_offset is not None:
                location_kwargs["end_offset"] = end_offset
            try:
                file_location = context.create_file_location(file_path, **location_kwargs)
            except Exception as e:
                self.logger.warning(f"Could not create file location for {file_path}: {e}")

        return SecurityFinding(
            category=category,
            severity=severity,
            title=title,
            description=description,
            evidence=evidence,
            recommendations=recommendations,
            cve_references=cve_references or [],
            additional_data=additional_data or {},
            file_location=file_location,
        )

    def create_java_finding(
        self,
        category: str,
        severity: AnalysisSeverity,
        title: str,
        description: str,
        evidence: list[str],
        recommendations: list[str],
        context: Optional["AnalysisContext"] = None,
        java_file: str | None = None,
        line_number: int | None = None,
        end_line: int | None = None,
        **kwargs,
    ) -> SecurityFinding:
        """Create a security finding for Java decompiled code.

        A location is attached only when ``context``, ``java_file`` and
        ``line_number`` are all truthy.

        Args:
            category: Category label of the finding
            severity: Severity level of the finding
            title: Brief title of the finding
            description: Detailed description of the issue
            evidence: List of evidence strings
            recommendations: List of remediation recommendations
            context: Analysis context for file location creation
            java_file: Path to Java file relative to JADX directory
            line_number: Line number in Java file (1-based)
            end_line: Optional end line for multi-line findings
            **kwargs: Only ``cve_references`` and ``additional_data`` are
                forwarded to the finding; any other keyword is ignored.

        Returns:
            SecurityFinding with Java file location
        """
        file_location = None
        if context and java_file and line_number:
            file_location = self._locate(
                context, "create_java_file_location", "Java", java_file, line_number, end_line
            )
        return self._build_finding(
            category, severity, title, description, evidence, recommendations, file_location, kwargs
        )

    def create_smali_finding(
        self,
        category: str,
        severity: AnalysisSeverity,
        title: str,
        description: str,
        evidence: list[str],
        recommendations: list[str],
        context: Optional["AnalysisContext"] = None,
        smali_file: str | None = None,
        line_number: int | None = None,
        end_line: int | None = None,
        **kwargs,
    ) -> SecurityFinding:
        """Create a security finding for Smali code from APKTool.

        A location is attached only when ``context``, ``smali_file`` and
        ``line_number`` are all truthy.

        Args:
            category: Category label of the finding
            severity: Severity level of the finding
            title: Brief title of the finding
            description: Detailed description of the issue
            evidence: List of evidence strings
            recommendations: List of remediation recommendations
            context: Analysis context for file location creation
            smali_file: Path to Smali file relative to APKTool directory
            line_number: Line number in Smali file (1-based)
            end_line: Optional end line for multi-line findings
            **kwargs: Only ``cve_references`` and ``additional_data`` are
                forwarded to the finding; any other keyword is ignored.

        Returns:
            SecurityFinding with Smali file location
        """
        file_location = None
        if context and smali_file and line_number:
            file_location = self._locate(
                context, "create_smali_file_location", "Smali", smali_file, line_number, end_line
            )
        return self._build_finding(
            category, severity, title, description, evidence, recommendations, file_location, kwargs
        )

    def create_native_finding(
        self,
        category: str,
        severity: AnalysisSeverity,
        title: str,
        description: str,
        evidence: list[str],
        recommendations: list[str],
        context: Optional["AnalysisContext"] = None,
        so_file: str | None = None,
        offset: int | None = None,
        end_offset: int | None = None,
        **kwargs,
    ) -> SecurityFinding:
        """Create a security finding for native library (.so) files.

        A location is attached when ``context`` and ``so_file`` are truthy
        and ``offset`` is not None (offset 0 is a valid position).

        Args:
            category: Category label of the finding
            severity: Severity level of the finding
            title: Brief title of the finding
            description: Detailed description of the issue
            evidence: List of evidence strings
            recommendations: List of remediation recommendations
            context: Analysis context for file location creation
            so_file: Path to .so file relative to lib directory
            offset: Byte offset in native library (base address 0x0)
            end_offset: Optional end offset for range findings
            **kwargs: Only ``cve_references`` and ``additional_data`` are
                forwarded to the finding; any other keyword is ignored.

        Returns:
            SecurityFinding with native library file location
        """
        file_location = None
        if context and so_file and offset is not None:
            file_location = self._locate(
                context, "create_native_file_location", "native", so_file, offset, end_offset
            )
        return self._build_finding(
            category, severity, title, description, evidence, recommendations, file_location, kwargs
        )

    def get_owasp_category(self) -> str:
        """Get OWASP Top 10 category this assessment covers."""
        return self.owasp_category

    def is_enabled(self) -> bool:
        """Check if assessment is enabled."""
        return self.enabled


class ModuleRegistry:
    """Registry for managing analysis modules, tools, and security assessments.

    Each kind lives in its own name -> class mapping. Registering a name
    that already exists replaces the earlier class.
    """

    def __init__(self):
        """Initialize module registry with empty collections."""
        self._modules: dict[str, type] = {}
        self._tools: dict[str, type] = {}
        self._assessments: dict[str, type] = {}

    def register_module(self, name: str, module_class: type):
        """Register an analysis module.

        Raises:
            ValueError: If ``module_class`` does not inherit from BaseAnalysisModule.
        """
        if not issubclass(module_class, BaseAnalysisModule):
            raise ValueError(f"Module {name} must inherit from BaseAnalysisModule")
        self._modules[name] = module_class

    def register_tool(self, name: str, tool_class: type):
        """Register an external tool.

        Raises:
            ValueError: If ``tool_class`` does not inherit from BaseExternalTool.
        """
        if not issubclass(tool_class, BaseExternalTool):
            raise ValueError(f"Tool {name} must inherit from BaseExternalTool")
        self._tools[name] = tool_class

    def register_assessment(self, name: str, assessment_class: type):
        """Register a security assessment.

        Raises:
            ValueError: If ``assessment_class`` does not inherit from BaseSecurityAssessment.
        """
        if not issubclass(assessment_class, BaseSecurityAssessment):
            raise ValueError(f"Assessment {name} must inherit from BaseSecurityAssessment")
        self._assessments[name] = assessment_class

    def get_module(self, name: str) -> type | None:
        """Get a registered module class (None if the name is unknown)."""
        return self._modules.get(name)

    def get_tool(self, name: str) -> type | None:
        """Get a registered tool class (None if the name is unknown)."""
        return self._tools.get(name)

    def get_assessment(self, name: str) -> type | None:
        """Get a registered assessment class (None if the name is unknown)."""
        return self._assessments.get(name)

    def list_modules(self) -> list[str]:
        """List all registered modules."""
        return list(self._modules.keys())

    def list_tools(self) -> list[str]:
        """List all registered tools."""
        return list(self._tools.keys())

    def list_assessments(self) -> list[str]:
        """List all registered assessments."""
        return list(self._assessments.keys())


# Global registry instance
registry = ModuleRegistry()
def register_module(name: str):
    """Register analysis modules via decorator.

    Args:
        name: Key under which the decorated class is stored in the global registry.

    Returns:
        A class decorator that registers the class and returns it unchanged.
    """

    def _register(cls):
        registry.register_module(name, cls)
        return cls

    return _register
def register_tool(name: str):
    """Register external tools via decorator.

    Args:
        name: Key under which the decorated class is stored in the global registry.

    Returns:
        A class decorator that registers the class and returns it unchanged.
    """

    def _register(cls):
        registry.register_tool(name, cls)
        return cls

    return _register


def register_assessment(name: str):
    """Register security assessments via decorator.

    Args:
        name: Key under which the decorated class is stored in the global registry.

    Returns:
        A class decorator that registers the class and returns it unchanged.
    """

    def _register(cls):
        registry.register_assessment(name, cls)
        return cls

    return _register