Source code for dexray_insight.modules.native.native_loader

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-
#
# # Copyright (C) {{ year }} Dexray Insight Contributors
# #
# # This file is part of Dexray Insight - Android APK Security Analysis Tool
# #
# # Licensed under the Apache License, Version 2.0 (the "License");
# # you may not use this file except in compliance with the License.
# # You may obtain a copy of the License at
# #
# #     http://www.apache.org/licenses/LICENSE-2.0
# #
# # Unless required by applicable law or agreed to in writing, software
# # distributed under the License is distributed on an "AS IS" BASIS,
# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# # See the License for the specific language governing permissions and
# # limitations under the License.

"""
Native Binary Analysis Loader Module.

This module serves as the main orchestrator for native binary analysis using radare2.
It discovers native binaries in unzipped APKs, manages r2pipe connections, and
coordinates the execution of native analysis modules.
"""

import shutil
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Optional

try:
    import r2pipe
except ImportError:
    r2pipe = None

# Import core framework components
from ...core.base_classes import AnalysisContext
from ...core.base_classes import AnalysisStatus
from ...core.base_classes import BaseAnalysisModule
from ...core.base_classes import BaseResult
from ...core.base_classes import register_module

# Import native analysis components
from .base_native_module import BaseNativeModule
from .base_native_module import NativeAnalysisResult
from .base_native_module import NativeBinaryInfo
from .base_native_module import NativeStringSource


[docs] @dataclass class NativeAnalysisModuleResult(BaseResult): """Result container for the native analysis loader module.""" analyzed_binaries: list[NativeBinaryInfo] = None total_strings_extracted: int = 0 strings_by_source: dict[str, list[NativeStringSource]] = None module_results: dict[str, list[NativeAnalysisResult]] = None analysis_errors: list[str] = None radare2_available: bool = False
[docs] def __post_init__(self): """Initialize default values for optional fields.""" if self.analyzed_binaries is None: self.analyzed_binaries = [] if self.strings_by_source is None: self.strings_by_source = {} if self.module_results is None: self.module_results = {} if self.analysis_errors is None: self.analysis_errors = []
[docs] def to_dict(self) -> dict[str, Any]: """Convert result to dictionary for serialization.""" base_dict = super().to_dict() base_dict.update( { "analyzed_binaries": [ { "file_path": str(binary.file_path), "relative_path": binary.relative_path, "architecture": binary.architecture, "file_size": binary.file_size, "file_name": binary.file_name, } for binary in self.analyzed_binaries ], "total_strings_extracted": self.total_strings_extracted, "strings_by_source": { source: [ { "content": s.content, "source_type": s.source_type, "file_path": s.file_path, "extraction_method": s.extraction_method, "offset": s.offset, "encoding": s.encoding, "confidence": s.confidence, } for s in strings ] for source, strings in self.strings_by_source.items() }, "analysis_errors": self.analysis_errors, "radare2_available": self.radare2_available, "binaries_analyzed_count": len(self.analyzed_binaries), "architectures_found": list(set(b.architecture for b in self.analyzed_binaries)), } ) return base_dict
[docs] @register_module("native_analysis") class NativeAnalysisLoader(BaseAnalysisModule): """ Main native binary analysis module that orchestrates native analysis. This module: 1. Checks if temporal analysis is enabled and APK is unzipped 2. Discovers native binaries (.so files) in the unzipped APK 3. Filters binaries by configured architectures 4. Manages r2pipe connections to binaries 5. Executes registered native analysis modules 6. Collects and aggregates results 7. Integrates native strings with existing string analysis """
[docs] def __init__(self, config: dict[str, Any]): """Initialize NativeAnalysisLoaderModule with configuration.""" super().__init__(config) self.native_modules: list[BaseNativeModule] = [] self._initialize_native_modules()
def _initialize_native_modules(self): """Initialize and register native analysis modules.""" try: # Import and register native modules from .library_version_detection import NativeLibraryVersionModule from .string_extraction import NativeStringExtractionModule from .strings_fallback_detection import StringsFallbackDetectionModule # Get native module configurations native_config = self.config.get("modules", {}) # Initialize string extraction module if native_config.get("string_extraction", {}).get("enabled", True): string_module = NativeStringExtractionModule( config=native_config.get("string_extraction", {}), logger=self.logger ) if string_module.is_enabled(): self.native_modules.append(string_module) self.logger.debug("Registered NativeStringExtractionModule") # Initialize library version detection module if native_config.get("library_version_detection", {}).get("enabled", True): # Try radare2-based detection first version_module = NativeLibraryVersionModule( config=native_config.get("library_version_detection", {}), logger=self.logger ) # Check if radare2 is available and r2pipe works r2_available = r2pipe is not None and self._check_radare2_availability() if r2_available and version_module.is_enabled(): self.native_modules.append(version_module) self.logger.debug("Registered NativeLibraryVersionModule (radare2-based)") else: # Fallback to strings-based detection self.logger.info( "radare2 not available, using strings-based fallback for library version detection" ) fallback_module = StringsFallbackDetectionModule( config=native_config.get("library_version_detection", {}), logger=self.logger ) if fallback_module.is_enabled(): self.native_modules.append(fallback_module) self.logger.debug("Registered StringsFallbackDetectionModule") except ImportError as e: self.logger.warning(f"Failed to import native analysis modules: {e}") except Exception as e: self.logger.error(f"Error initializing native modules: {e}")
[docs] def analyze(self, apk_path: str, context: AnalysisContext) -> NativeAnalysisModuleResult: """ Perform native binary analysis on the APK. Args: apk_path: Path to the APK file context: Analysis context with shared data Returns: NativeAnalysisModuleResult with analysis results """ start_time = time.time() try: # Check if r2pipe is available if r2pipe is None: self.logger.warning("r2pipe not available - skipping native analysis") return NativeAnalysisModuleResult( module_name="native_analysis", status=AnalysisStatus.SKIPPED, execution_time=time.time() - start_time, error_message="r2pipe not available", radare2_available=False, ) # Check if radare2 binary is available if not self._check_radare2_availability(): self.logger.warning("radare2 binary not available - skipping native analysis") return NativeAnalysisModuleResult( module_name="native_analysis", status=AnalysisStatus.SKIPPED, execution_time=time.time() - start_time, error_message="radare2 binary not available", radare2_available=False, ) # Check if temporal analysis is enabled and APK is unzipped if not context.temporal_paths: self.logger.info("Temporal analysis not enabled - skipping native analysis") return NativeAnalysisModuleResult( module_name="native_analysis", status=AnalysisStatus.SKIPPED, execution_time=time.time() - start_time, error_message="Temporal analysis required but not enabled", radare2_available=True, ) # Discover native binaries self.logger.info("Discovering native binaries in unzipped APK...") native_binaries = self._discover_native_binaries(context.temporal_paths.unzipped_dir) if not native_binaries: self.logger.info("No native binaries found in APK") return NativeAnalysisModuleResult( module_name="native_analysis", status=AnalysisStatus.SUCCESS, execution_time=time.time() - start_time, radare2_available=True, ) self.logger.info(f"Found {len(native_binaries)} native binaries to analyze") # Filter binaries by architecture filtered_binaries = self._filter_binaries_by_architecture(native_binaries) self.logger.info(f"Analyzing {len(filtered_binaries)} binaries after architecture filtering") # Analyze binaries with native modules results = self._analyze_binaries(filtered_binaries) # Aggregate results and extract strings all_strings = [] strings_by_source = {} analysis_errors = [] detected_native_libraries = [] for binary_results in results.values(): for result in binary_results: if result.strings_found: all_strings.extend(result.strings_found) source_key = result.binary_info.relative_path strings_by_source[source_key] = result.strings_found # Extract library version detection results if ( result.module_name == "native_library_version" and result.additional_data and "detected_libraries" in result.additional_data ): detected_native_libraries.extend(result.additional_data["detected_libraries"]) if result.error_message: analysis_errors.append(f"{result.binary_info.file_name}: {result.error_message}") # Integrate native strings with context for other modules to use if all_strings: self._integrate_native_strings(context, all_strings) self.logger.info(f"Extracted {len(all_strings)} strings from native binaries") # Integrate native library detections with context for library detection module if detected_native_libraries: self._integrate_native_libraries(context, detected_native_libraries) self.logger.info(f"Detected {len(detected_native_libraries)} native libraries with versions") return NativeAnalysisModuleResult( module_name="native_analysis", status=AnalysisStatus.SUCCESS, execution_time=time.time() - start_time, analyzed_binaries=filtered_binaries, total_strings_extracted=len(all_strings), strings_by_source=strings_by_source, module_results=results, analysis_errors=analysis_errors, radare2_available=True, ) except Exception as e: self.logger.error(f"Native analysis failed: {str(e)}") return NativeAnalysisModuleResult( module_name="native_analysis", status=AnalysisStatus.FAILURE, execution_time=time.time() - start_time, error_message=str(e), radare2_available=r2pipe is not None, )
def _check_radare2_availability(self) -> bool: """Check if radare2 binary is available.""" try: # Get radare2 configuration from ...core.configuration import Configuration config = Configuration() radare2_config = config.get_tool_config("radare2") # First try configured path radare2_path = radare2_config.get("path") if radare2_path: if Path(radare2_path).exists(): return True else: self.logger.debug(f"Configured radare2 path does not exist: {radare2_path}") # Try common radare2 command names in PATH for command in ["r2", "radare2"]: if shutil.which(command) is not None: self.logger.debug(f"Found radare2 as '{command}' in PATH") return True # Try fallback paths from configuration fallback_paths = radare2_config.get("fallback_paths", []) for fallback_path in fallback_paths: if Path(fallback_path).exists(): self.logger.debug(f"Found radare2 at fallback path: {fallback_path}") return True self.logger.debug("radare2 not found in PATH or fallback paths") return False except Exception as e: self.logger.debug(f"Error checking radare2 availability: {e}") return False def _discover_native_binaries(self, unzipped_dir: Path) -> list[NativeBinaryInfo]: """ Discover native binaries in the unzipped APK directory. Args: unzipped_dir: Path to unzipped APK directory Returns: List of discovered native binaries """ binaries = [] # Get file patterns from configuration file_patterns = self.config.get("file_patterns", ["*.so"]) try: # Look for native libraries in lib/ directory lib_dir = unzipped_dir / "lib" if lib_dir.exists(): for pattern in file_patterns: for binary_file in lib_dir.rglob(pattern): if binary_file.is_file(): # Extract architecture from path (e.g., lib/arm64-v8a/libexample.so) path_parts = binary_file.relative_to(unzipped_dir).parts architecture = "unknown" if len(path_parts) >= 3 and path_parts[0] == "lib": architecture = path_parts[1] # e.g., "arm64-v8a" binary_info = NativeBinaryInfo( file_path=binary_file, relative_path=str(binary_file.relative_to(unzipped_dir)), architecture=architecture, file_size=binary_file.stat().st_size, file_name=binary_file.name, ) binaries.append(binary_info) except Exception as e: self.logger.error(f"Error discovering native binaries: {e}") return binaries def _filter_binaries_by_architecture(self, binaries: list[NativeBinaryInfo]) -> list[NativeBinaryInfo]: """ Filter binaries by configured architectures. Args: binaries: List of discovered binaries Returns: List of filtered binaries """ allowed_architectures = self.config.get("architectures", ["arm64-v8a"]) filtered = [] for binary in binaries: if binary.architecture in allowed_architectures: filtered.append(binary) else: self.logger.debug( f"Skipping {binary.file_name} - architecture {binary.architecture} not in allowed list" ) return filtered def _analyze_binaries(self, binaries: list[NativeBinaryInfo]) -> dict[str, list[NativeAnalysisResult]]: """ Analyze native binaries using registered native modules. Args: binaries: List of binaries to analyze Returns: Dictionary mapping module names to analysis results """ results = {} # Get radare2 configuration from ...core.configuration import Configuration config = Configuration() radare2_config = config.get_tool_config("radare2") timeout = radare2_config.get("timeout", 120) for binary in binaries: self.logger.debug(f"Analyzing native binary: {binary.relative_path}") try: # Open r2pipe connection r2 = self._open_r2pipe_connection(binary.file_path, timeout) if r2 is None: self.logger.warning(f"Failed to open r2pipe connection for {binary.file_name}") continue # Run each native analysis module for module in self.native_modules: if not module.can_analyze(binary): continue module_name = module.get_module_name() if module_name not in results: results[module_name] = [] try: self.logger.debug(f"Running {module_name} on {binary.file_name}") result = module.analyze_binary(binary, r2) results[module_name].append(result) except Exception as e: self.logger.error(f"Module {module_name} failed on {binary.file_name}: {e}") error_result = NativeAnalysisResult( binary_info=binary, module_name=module_name, success=False, error_message=str(e) ) results[module_name].append(error_result) # Close r2pipe connection try: r2.quit() except Exception: # Ignore r2pipe cleanup errors pass except Exception as e: self.logger.error(f"Error analyzing binary {binary.file_name}: {e}") return results def _open_r2pipe_connection(self, binary_path: Path, timeout: int) -> Optional[Any]: """ Open an r2pipe connection to a native binary. Args: binary_path: Path to the native binary timeout: Connection timeout Returns: r2pipe connection or None if failed """ import os try: # Get radare2 configuration from ...core.configuration import Configuration config = Configuration() radare2_config = config.get_tool_config("radare2") radare2_path = radare2_config.get("path") options = radare2_config.get("options", []) # If no configured path, try to find radare2 if not radare2_path: # Try common command names for command in ["r2", "radare2"]: if shutil.which(command): radare2_path = command self.logger.debug(f"Using radare2 command: {command}") break # If still not found, try fallback paths if not radare2_path: fallback_paths = radare2_config.get("fallback_paths", []) for fallback_path in fallback_paths: if Path(fallback_path).exists(): radare2_path = fallback_path self.logger.debug(f"Using radare2 from fallback path: {fallback_path}") break if not radare2_path: self.logger.warning("No radare2 binary found") return None # Ensure radare2 is accessible to r2pipe by modifying PATH if needed old_path = os.environ.get("PATH", "") path_modified = False if not shutil.which("r2") and not shutil.which("radare2"): # Add the directory containing radare2 to PATH radare2_dir = Path(radare2_path).parent os.environ["PATH"] = f"{radare2_dir}:{old_path}" path_modified = True self.logger.debug(f"Added {radare2_dir} to PATH for r2pipe") try: # Open connection with timeout handling r2 = r2pipe.open(str(binary_path), flags=options) # Basic initialization (skip auto-analysis for performance) # r2.cmd("aaa") # Auto-analyze all - commented out for speed return r2 finally: # Restore original PATH if we modified it if path_modified: os.environ["PATH"] = old_path except Exception as e: self.logger.debug(f"Failed to open r2pipe connection to {binary_path}: {e}") return None def _integrate_native_strings(self, context: AnalysisContext, native_strings: list[NativeStringSource]): """ Integrate native strings with the analysis context for other modules to use. Args: context: Analysis context native_strings: List of strings extracted from native binaries """ try: # Add native strings to context for other modules if "native_strings" not in context.module_results: context.module_results["native_strings"] = [] # Convert to format expected by string analysis modules string_contents = [s.content for s in native_strings] context.module_results["native_strings"].extend(string_contents) # Also store detailed native string information context.module_results["native_string_sources"] = native_strings self.logger.debug(f"Added {len(native_strings)} native strings to analysis context") except Exception as e: self.logger.error(f"Error integrating native strings: {e}") def _integrate_native_libraries(self, context: AnalysisContext, detected_native_libraries: list[dict[str, Any]]): """ Integrate native library detections with the analysis context for library detection module. Args: context: Analysis context detected_native_libraries: List of detected native libraries with version information """ try: # Add native library detections to context for library detection module if "native_libraries" not in context.module_results: context.module_results["native_libraries"] = [] # Convert native library detections to format expected by library detection system for native_lib in detected_native_libraries: # Create standardized library detection format library_detection = { "name": native_lib.get("library_name", ""), "version": native_lib.get("version", ""), "confidence": native_lib.get("confidence", 0.0), "category": "native", "detection_method": f"native_{native_lib.get('source_type', 'unknown')}", "source_evidence": native_lib.get("source_evidence", ""), "file_path": native_lib.get("file_path", ""), "additional_info": { "source_type": native_lib.get("source_type", ""), "architecture": self._extract_architecture_from_path(native_lib.get("file_path", "")), "native_detection": True, **native_lib.get("additional_info", {}), }, } context.module_results["native_libraries"].append(library_detection) self.logger.debug( f"Integrated native library: {native_lib.get('library_name')} {native_lib.get('version')} " f"(confidence: {native_lib.get('confidence', 0.0):.2f})" ) # Store count for statistics context.module_results["native_libraries_count"] = len(detected_native_libraries) self.logger.info( f"Successfully integrated {len(detected_native_libraries)} native libraries into analysis context" ) except Exception as e: self.logger.error(f"Error integrating native libraries: {e}") def _extract_architecture_from_path(self, file_path: str) -> str: """Extract architecture from native library file path.""" try: # Common Android architectures in lib paths architectures = ["arm64-v8a", "armeabi-v7a", "armeabi", "x86", "x86_64", "mips", "mips64"] for arch in architectures: if arch in file_path: return arch return "unknown" except Exception: return "unknown"
[docs] def get_dependencies(self) -> list[str]: """Get list of module dependencies.""" # Native analysis should run after basic analysis is done return ["apk_overview", "string_analysis"]