#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from .base_classes import (
AnalysisContext, BaseResult, AnalysisStatus,
registry
)
from .configuration import Configuration
from .security_engine import SecurityAssessmentEngine
from .temporal_directory import TemporalDirectoryManager
from ..results.FullAnalysisResults import FullAnalysisResults
# Import result classes for type hints only
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..results.apkOverviewResults import APKOverview
from ..results.InDepthAnalysisResults import Results
[docs]
@dataclass
class ExecutionPlan:
"""
Represents the execution plan for analysis modules with dependency ordering.
This dataclass encapsulates the complete execution strategy for a set of
analysis modules, including dependency-aware ordering and parallelization
opportunities.
Attributes:
modules: List of all modules to be executed (includes dependencies)
tools: List of external tools to be executed
execution_order: Dependency-ordered list of modules (topological sort)
parallel_groups: List of lists, each inner list contains modules
that can be executed in parallel with each other
Design Pattern: Data Transfer Object (DTO)
Usage: Created by DependencyResolver, consumed by AnalysisEngine
"""
modules: List[str]
tools: List[str]
execution_order: List[str]
parallel_groups: List[List[str]]
[docs]
class DependencyResolver:
"""
Resolves module dependencies and creates execution plans for analysis workflows.
This class analyzes the dependency graph between analysis modules and creates
optimized execution plans that respect dependencies while maximizing parallel
execution opportunities.
Responsibilities:
- Parse module dependencies from registered analysis modules
- Build dependency graphs and detect circular dependencies
- Perform topological sorting to determine execution order
- Identify modules that can be executed in parallel
- Create structured ExecutionPlan objects for the AnalysisEngine
Design Pattern: Dependency Injection (receives registry)
SOLID Principles: Single Responsibility (only handles dependency resolution)
"""
[docs]
def __init__(self, registry_instance):
self.registry = registry_instance
[docs]
def resolve_dependencies(self, requested_modules: List[str]) -> ExecutionPlan:
"""
Resolve module dependencies and create execution plan
Args:
requested_modules: List of module names to execute
Returns:
ExecutionPlan with proper execution order
"""
# Build dependency graph
dependency_graph = {}
all_modules = set(requested_modules)
# Add dependencies to the set of modules to execute
for module_name in list(all_modules):
module_class = self.registry.get_module(module_name)
if module_class:
instance = module_class({}) # Temporary instance for dependency info
deps = instance.get_dependencies()
dependency_graph[module_name] = deps
all_modules.update(deps)
# Topological sort to get execution order
execution_order = self._topological_sort(dependency_graph, all_modules)
# Identify modules that can run in parallel
parallel_groups = self._identify_parallel_groups(dependency_graph, execution_order)
return ExecutionPlan(
modules=list(all_modules),
tools=[], # Tools are handled separately
execution_order=execution_order,
parallel_groups=parallel_groups
)
def _topological_sort(self, graph: Dict[str, List[str]], nodes: set) -> List[str]:
"""Perform topological sort on dependency graph"""
visited = set()
temp_visited = set()
result = []
def visit(node):
if node in temp_visited:
raise ValueError(f"Circular dependency detected involving {node}")
if node in visited:
return
temp_visited.add(node)
for dependency in graph.get(node, []):
if dependency in nodes: # Only consider requested modules
visit(dependency)
temp_visited.remove(node)
visited.add(node)
result.append(node)
for node in nodes:
if node not in visited:
visit(node)
return result
def _identify_parallel_groups(self, graph: Dict[str, List[str]], execution_order: List[str]) -> List[List[str]]:
"""Identify modules that can be executed in parallel"""
parallel_groups = []
remaining = set(execution_order)
while remaining:
# Find modules with no remaining dependencies
ready = []
for module in execution_order:
if module not in remaining:
continue
deps = graph.get(module, [])
if all(dep not in remaining for dep in deps):
ready.append(module)
if not ready:
# This shouldn't happen if topological sort worked correctly
ready = [remaining.pop()]
parallel_groups.append(ready)
remaining -= set(ready)
return parallel_groups
[docs]
class AnalysisEngine:
"""
Main analysis engine that orchestrates all APK analysis activities.
The AnalysisEngine serves as the central coordinator for the entire analysis
workflow. It manages module execution, external tool integration, result
aggregation, and security assessment orchestration.
Key Features:
- Modular architecture with pluggable analysis modules
- Dependency-aware execution planning and parallel processing
- External tool integration (APKID, Kavanoz, JADX, etc.)
- Comprehensive error handling and resilience
- Security assessment engine integration
- Result aggregation and structured output generation
Architecture Patterns:
- Registry Pattern: For module discovery and management
- Strategy Pattern: For different analysis approaches
- Factory Pattern: For result object creation
- Template Method: For analysis workflow orchestration
SOLID Principles:
- Single Responsibility: Orchestrates analysis workflow
- Open/Closed: Extensible through module registration
- Dependency Inversion: Depends on abstractions (Configuration, modules)
Usage:
config = Configuration()
engine = AnalysisEngine(config)
results = engine.analyze_apk('/path/to/app.apk')
"""
[docs]
def __init__(self, config: Configuration):
self.config = config
self.registry = registry
self.dependency_resolver = DependencyResolver(self.registry)
self.security_engine = SecurityAssessmentEngine(config) if config.enable_security_assessment else None
self.logger = logging.getLogger(__name__)
[docs]
def analyze_apk(self, apk_path: str, requested_modules: Optional[List[str]] = None, androguard_obj: Optional[Any] = None, timestamp: Optional[str] = None) -> FullAnalysisResults:
"""
Perform comprehensive APK analysis
Args:
apk_path: Path to the APK file
requested_modules: Optional list of specific modules to run
androguard_obj: Optional pre-initialized Androguard object
timestamp: Optional timestamp for temporal directory naming
Returns:
FullAnalysisResults containing all analysis results
"""
start_time = time.time()
# Determine which modules to run
if requested_modules is None:
requested_modules = self._get_enabled_modules()
try:
# Set up analysis context (refactored)
context = self._setup_analysis_context(apk_path, androguard_obj, timestamp)
# Process APK with external tools if temporal analysis is enabled
tool_results = {}
if context.temporal_paths:
temporal_manager = TemporalDirectoryManager(self.config, self.logger)
# Process APK with external tools (unzip, JADX, apktool)
self.logger.info("Processing APK with external tools...")
external_tool_results = temporal_manager.process_apk_with_tools(apk_path, context.temporal_paths)
# Log tool execution results
for tool_name, success in external_tool_results.items():
if success:
self.logger.info(f"✓ {tool_name.upper()} completed successfully")
else:
self.logger.warning(f"✗ {tool_name.upper()} failed or was skipped")
tool_results['temporal_processing'] = {
'temporal_directory': str(context.temporal_paths.base_dir),
'tools_executed': external_tool_results
}
# Execute analysis pipeline (refactored)
module_results = self._execute_analysis_pipeline(context, requested_modules)
# Execute remaining external tools (apkid, kavanoz, etc.)
legacy_tool_results = self._execute_external_tools(apk_path)
tool_results.update(legacy_tool_results)
# Perform security assessment if enabled
security_results = None
if self.security_engine:
combined_results = {**module_results, **tool_results}
security_results = self.security_engine.assess(combined_results)
# Create combined results
results = self._create_full_results(module_results, tool_results, security_results, context)
total_time = time.time() - start_time
self.logger.info(f"Analysis completed in {total_time:.2f} seconds")
# Handle cleanup based on configuration (refactored)
if context.temporal_paths and self.config.get_temporal_analysis_config().get('cleanup_after_analysis', False):
self._handle_analysis_cleanup(context.temporal_paths, preserve_on_error=False)
elif context.temporal_paths:
self._handle_analysis_cleanup(context.temporal_paths, preserve_on_error=True)
return results
except Exception as e:
self.logger.error(f"Analysis failed: {str(e)}")
# Handle temporal directory cleanup on error (refactored)
if hasattr(context, 'temporal_paths') and context.temporal_paths:
preserve_on_error = self.config.get_temporal_analysis_config().get('preserve_on_error', True)
self._handle_analysis_cleanup(context.temporal_paths, preserve_on_error)
# Log the full traceback for debugging
import traceback
self.logger.debug(f"Full traceback:\n{traceback.format_exc()}")
raise
def _setup_analysis_context(self, apk_path: str, androguard_obj: Optional[Any] = None, timestamp: Optional[str] = None) -> AnalysisContext:
"""
Set up analysis context and temporal directories for APK analysis.
Single Responsibility: Create AnalysisContext with temporal directory setup
and tool availability checks.
Args:
apk_path: Path to the APK file
androguard_obj: Optional pre-initialized Androguard object
timestamp: Optional timestamp for temporal directory naming
Returns:
AnalysisContext configured for analysis
Raises:
FileNotFoundError: If APK file doesn't exist
"""
import os
if not os.path.exists(apk_path):
raise FileNotFoundError(f"APK file not found: {apk_path}")
# Initialize temporal directory manager
temporal_manager = TemporalDirectoryManager(self.config, self.logger)
temporal_paths = None
if self.config.get_temporal_analysis_config().get('enabled', True):
self.logger.info("Creating temporal directory structure...")
temporal_paths = temporal_manager.create_temporal_directory(apk_path, timestamp)
# Create analysis context
context = AnalysisContext(
apk_path=apk_path,
config=self.config.to_dict(),
androguard_obj=androguard_obj,
temporal_paths=temporal_paths,
jadx_available=temporal_manager.check_tool_availability('jadx'),
apktool_available=temporal_manager.check_tool_availability('apktool')
)
return context
def _execute_analysis_pipeline(self, context: AnalysisContext, requested_modules: List[str]) -> Dict[str, Any]:
"""
Execute the analysis pipeline with requested modules.
Single Responsibility: Execute analysis modules and coordinate their results.
Args:
context: Analysis context with APK and configuration data
requested_modules: List of modules to execute
Returns:
Dict containing analysis results from all executed modules
Raises:
Exception: If module execution fails critically
"""
# Execute analysis modules using existing method
module_results = self._execute_analysis_modules(context, requested_modules)
return module_results
def _handle_analysis_cleanup(self, temporal_paths: Optional[Any], preserve_on_error: bool = True):
"""
Handle cleanup of temporal analysis directories.
Single Responsibility: Manage cleanup of temporal directories based on
configuration and error state.
Args:
temporal_paths: Temporal directory paths object, can be None
preserve_on_error: Whether to preserve files when preserve_on_error is True
"""
if temporal_paths is None:
return
temporal_manager = TemporalDirectoryManager(self.config, self.logger)
if preserve_on_error:
# Don't cleanup when preserving on error
self.logger.info(f"Temporal directory preserved for debugging at: {temporal_paths.base_dir}")
else:
# Cleanup temporal directories - force=True for error scenarios
self.logger.info("Cleaning up temporal directory...")
temporal_manager.cleanup_temporal_directory(temporal_paths, force=True)
def _get_enabled_modules(self) -> List[str]:
"""Get list of enabled modules from configuration"""
enabled_modules = []
for module_name in self.registry.list_modules():
module_config = self.config.get_module_config(module_name)
if module_config.get('enabled', True):
enabled_modules.append(module_name)
return enabled_modules
def _execute_analysis_modules(self, context: AnalysisContext, requested_modules: List[str]) -> Dict[str, BaseResult]:
"""Execute analysis modules in dependency order"""
execution_plan = self.dependency_resolver.resolve_dependencies(requested_modules)
results = {}
self.logger.info(f"Executing modules in order: {execution_plan.execution_order}")
for parallel_group in execution_plan.parallel_groups:
if len(parallel_group) == 1:
# Single module - execute directly
module_name = parallel_group[0]
results[module_name] = self._execute_single_module(module_name, context)
else:
# Multiple modules - execute in parallel
parallel_results = self._execute_modules_parallel(parallel_group, context)
results.update(parallel_results)
# Update context with results for next group
for module_name, result in results.items():
if module_name in parallel_group:
context.add_result(module_name, result)
return results
def _execute_single_module(self, module_name: str, context: AnalysisContext) -> BaseResult:
"""Execute a single analysis module"""
start_time = time.time()
try:
module_class = self.registry.get_module(module_name)
if not module_class:
raise ValueError(f"Module {module_name} not found in registry")
module_config = self.config.get_module_config(module_name)
module = module_class(module_config)
if not module.is_enabled():
self.logger.info(f"Module {module_name} is disabled, skipping")
result = BaseResult(
module_name=module_name,
status=AnalysisStatus.SKIPPED,
execution_time=0
)
return result
self.logger.info(f"Executing module: {module_name}")
result = module.analyze(context.apk_path, context)
result.execution_time = time.time() - start_time
self.logger.info(f"Module {module_name} completed in {result.execution_time:.2f}s")
return result
except Exception as e:
execution_time = time.time() - start_time
import traceback
error_details = traceback.format_exc()
# Use colored error message
print(f"\033[91m[-] {module_name.title()} analysis failed: {str(e)}\033[0m")
self.logger.error(f"Module {module_name} failed: {str(e)}")
self.logger.debug(f"Module {module_name} error details:\n{error_details}")
return BaseResult(
module_name=module_name,
status=AnalysisStatus.FAILURE,
execution_time=execution_time,
error_message=str(e)
)
def _execute_modules_parallel(self, module_names: List[str], context: AnalysisContext) -> Dict[str, BaseResult]:
"""Execute multiple modules in parallel"""
results = {}
max_workers = self.config.max_workers
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all modules for execution
future_to_module = {
executor.submit(self._execute_single_module, module_name, context): module_name
for module_name in module_names
}
# Collect results as they complete
for future in as_completed(future_to_module):
module_name = future_to_module[future]
try:
result = future.result()
results[module_name] = result
except Exception as e:
self.logger.error(f"Parallel execution of {module_name} failed: {str(e)}")
results[module_name] = BaseResult(
module_name=module_name,
status=AnalysisStatus.FAILURE,
error_message=str(e)
)
return results
[docs]
def _build_apk_overview(self, module_results: Dict[str, BaseResult]) -> 'APKOverview':
"""
Build APK overview from analysis results.
Single Responsibility: Create APK overview object from module results.
Args:
module_results: Dictionary of module analysis results
Returns:
APKOverview object populated with analysis data
"""
from ..results.apkOverviewResults import APKOverview
apk_overview = APKOverview()
# Extract data from APK overview analysis if available
apk_overview_result = module_results.get('apk_overview')
if apk_overview_result and apk_overview_result.status.value == 'success':
# Copy fields from the APK overview result
for field_name in ['general_info', 'components', 'permissions', 'certificates',
'native_libs', 'directory_listing']:
if hasattr(apk_overview_result, field_name):
setattr(apk_overview, field_name, getattr(apk_overview_result, field_name))
# Handle cross-platform detection
if hasattr(apk_overview_result, 'is_cross_platform'):
apk_overview.is_cross_platform = apk_overview_result.is_cross_platform
apk_overview.cross_platform_framework = apk_overview_result.cross_platform_framework
else:
# Fallback: Extract basic data from manifest analysis for overview
self._apply_manifest_fallback_to_overview(apk_overview, module_results)
return apk_overview
def _apply_manifest_fallback_to_overview(self, apk_overview: 'APKOverview',
module_results: Dict[str, BaseResult]) -> None:
"""
Apply manifest analysis data as fallback for APK overview.
Single Responsibility: Handle fallback data extraction from manifest analysis.
Args:
apk_overview: APK overview object to populate
module_results: Dictionary of module analysis results
"""
manifest_result = module_results.get('manifest_analysis')
if manifest_result and manifest_result.status.value == 'success':
if hasattr(manifest_result, 'package_name'):
apk_overview.app_name = manifest_result.package_name
apk_overview.main_activity = manifest_result.main_activity
[docs]
def _build_in_depth_analysis(self, module_results: Dict[str, BaseResult],
context: AnalysisContext) -> 'Results':
"""
Build in-depth analysis results from module outputs.
Single Responsibility: Create in-depth analysis object from module results.
Args:
module_results: Dictionary of module analysis results
context: Analysis context for fallback operations
Returns:
Results object populated with in-depth analysis data
"""
from ..results.InDepthAnalysisResults import Results
in_depth_analysis = Results()
# Map module results to in-depth analysis structure
self._map_manifest_results(in_depth_analysis, module_results)
self._map_permission_results(in_depth_analysis, module_results)
self._map_signature_results(in_depth_analysis, module_results)
self._map_string_results(in_depth_analysis, module_results, context)
self._map_library_results(in_depth_analysis, module_results)
self._map_tracker_results(in_depth_analysis, module_results)
self._map_behavior_results(in_depth_analysis, module_results)
return in_depth_analysis
[docs]
def _map_manifest_results(self, in_depth_analysis: 'Results',
module_results: Dict[str, BaseResult]) -> None:
"""Map manifest analysis results to in-depth analysis."""
manifest_result = module_results.get('manifest_analysis')
if manifest_result and manifest_result.status.value == 'success':
if hasattr(manifest_result, 'intent_filters'):
in_depth_analysis.intents = manifest_result.intent_filters
[docs]
def _map_permission_results(self, in_depth_analysis: 'Results',
module_results: Dict[str, BaseResult]) -> None:
"""Map permission analysis results to in-depth analysis."""
permission_result = module_results.get('permission_analysis')
if permission_result and permission_result.status.value == 'success':
if hasattr(permission_result, 'critical_permissions'):
in_depth_analysis.filtered_permissions = permission_result.critical_permissions
[docs]
def _map_signature_results(self, in_depth_analysis: 'Results',
module_results: Dict[str, BaseResult]) -> None:
"""
Map signature detection results to in-depth analysis structure.
Single Responsibility: Extract and map signature detection data to Results object.
Args:
in_depth_analysis: Results object to populate with signature data
module_results: Dictionary containing module analysis results
Side Effects:
Modifies in_depth_analysis.signatures if signature analysis succeeded
"""
signature_result = module_results.get('signature_detection')
if signature_result and signature_result.status.value == 'success':
if hasattr(signature_result, 'signatures'):
in_depth_analysis.signatures = signature_result.signatures
[docs]
def _map_string_results(self, in_depth_analysis: 'Results',
module_results: Dict[str, BaseResult],
context: AnalysisContext) -> None:
"""
Map string analysis results to in-depth analysis with fallback support.
This method handles string analysis results with built-in fallback logic.
If string analysis module failed, it uses legacy string extraction methods
to ensure string data is always available.
Single Responsibility: Map string analysis data with fallback handling.
Args:
in_depth_analysis: Results object to populate with string data
module_results: Dictionary containing module analysis results
context: Analysis context for fallback string extraction
Side Effects:
Modifies in_depth_analysis string fields (strings_emails, strings_ip, etc.)
May trigger fallback string extraction if module analysis failed
"""
string_result = module_results.get('string_analysis')
self.logger.debug(f"String analysis result found: {string_result is not None}")
if string_result and string_result.status.value == 'success':
self._apply_successful_string_results(in_depth_analysis, string_result)
else:
# Fallback to old string analysis method if new module failed
self._apply_string_analysis_fallback(in_depth_analysis, context)
[docs]
def _apply_successful_string_results(self, in_depth_analysis: 'Results',
string_result: BaseResult) -> None:
"""
Apply successful string analysis results to in-depth analysis.
Single Responsibility: Map successful string analysis fields to Results object.
Args:
in_depth_analysis: Results object to populate
string_result: Successful string analysis result with extracted data
Side Effects:
Populates string fields in in_depth_analysis (emails, ip_addresses, urls, domains)
Logs debug information about string counts
"""
self.logger.debug("Processing successful string analysis results")
string_fields = ['emails', 'ip_addresses', 'urls', 'domains']
result_fields = ['strings_emails', 'strings_ip', 'strings_urls', 'strings_domain']
for string_field, result_field in zip(string_fields, result_fields):
if hasattr(string_result, string_field):
value = getattr(string_result, string_field)
setattr(in_depth_analysis, result_field, value)
self.logger.debug(f"Found {len(value)} {string_field}")
[docs]
def _apply_string_analysis_fallback(self, in_depth_analysis: 'Results',
context: AnalysisContext) -> None:
"""
Apply fallback string analysis when string module failed.
This method provides resilience by using legacy string extraction methods
when the string analysis module fails. It directly processes the APK using
Androguard objects to extract string data.
Single Responsibility: Handle fallback string extraction from Androguard objects.
Args:
in_depth_analysis: Results object to populate with fallback string data
context: Analysis context containing APK path and Androguard objects
Side Effects:
Populates string fields using legacy string_analysis_execute function
Logs fallback operation and any errors encountered
Raises:
None: Handles all exceptions gracefully and logs errors
"""
self.logger.debug("🔄 String analysis module failed, using fallback method")
try:
from ..string_analysis.string_analysis_module import string_analysis_execute
androguard_obj = context.androguard_obj
if androguard_obj:
self.logger.debug("📁 Running fallback string extraction from DEX objects")
old_results = string_analysis_execute(context.apk_path, androguard_obj)
if old_results and len(old_results) >= 5:
# Process fallback results
in_depth_analysis.strings_emails = list(old_results[0]) if old_results[0] else []
in_depth_analysis.strings_ip = list(old_results[1]) if old_results[1] else []
in_depth_analysis.strings_urls = list(old_results[2]) if old_results[2] else []
in_depth_analysis.strings_domain = list(old_results[3]) if old_results[3] else []
self.logger.debug(f"Fallback found: {len(in_depth_analysis.strings_emails)} emails, "
f"{len(in_depth_analysis.strings_ip)} IPs, "
f"{len(in_depth_analysis.strings_urls)} URLs, "
f"{len(in_depth_analysis.strings_domain)} domains")
except Exception as e:
self.logger.error(f"String analysis fallback failed: {str(e)}")
[docs]
def _map_library_results(self, in_depth_analysis: 'Results',
module_results: Dict[str, BaseResult]) -> None:
"""
Map library detection results to in-depth analysis structure.
Single Responsibility: Extract and map library detection data to Results object.
Args:
in_depth_analysis: Results object to populate with library data
module_results: Dictionary containing module analysis results
Side Effects:
Modifies in_depth_analysis.libraries if library detection succeeded
"""
library_result = module_results.get('library_detection')
if library_result and library_result.status.value == 'success':
if hasattr(library_result, 'detected_libraries'):
in_depth_analysis.libraries = library_result.detected_libraries
[docs]
def _map_tracker_results(self, in_depth_analysis: 'Results',
module_results: Dict[str, BaseResult]) -> None:
"""
Map tracker analysis results to in-depth analysis structure.
Single Responsibility: Extract and map tracker analysis data to Results object.
Args:
in_depth_analysis: Results object to populate with tracker data
module_results: Dictionary containing module analysis results
Side Effects:
Modifies in_depth_analysis.trackers if tracker analysis succeeded
"""
tracker_result = module_results.get('tracker_analysis')
if tracker_result and tracker_result.status.value == 'success':
if hasattr(tracker_result, 'detected_trackers'):
in_depth_analysis.trackers = tracker_result.detected_trackers
[docs]
def _map_behavior_results(self, in_depth_analysis: 'Results',
module_results: Dict[str, BaseResult]) -> None:
"""
Map behavior analysis results to in-depth analysis structure.
Single Responsibility: Extract and map behavior analysis data to Results object.
Args:
in_depth_analysis: Results object to populate with behavior data
module_results: Dictionary containing module analysis results
Side Effects:
Modifies in_depth_analysis.behaviors if behavior analysis succeeded
"""
behavior_result = module_results.get('behaviour_analysis')
if behavior_result and behavior_result.status.value == 'success':
if hasattr(behavior_result, 'behaviors'):
in_depth_analysis.behaviors = behavior_result.behaviors
def _execute_external_tools(self, apk_path: str) -> Dict[str, Any]:
"""Execute external tools"""
results = {}
enabled_tools = self._get_enabled_tools()
for tool_name in enabled_tools:
try:
tool_class = self.registry.get_tool(tool_name)
if not tool_class:
self.logger.warning(f"Tool {tool_name} not found in registry")
continue
tool_config = self.config.get_tool_config(tool_name)
tool = tool_class(tool_config)
if not tool.is_available():
self.logger.warning(f"Tool {tool_name} is not available on system")
continue
self.logger.info(f"Executing tool: {tool_name}")
start_time = time.time()
result = tool.execute(apk_path)
execution_time = time.time() - start_time
results[tool_name] = {
'result': result,
'execution_time': execution_time,
'status': 'success'
}
self.logger.info(f"Tool {tool_name} completed in {execution_time:.2f}s")
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"\033[93m[W] {tool_name} tool failed: {str(e)}\033[0m")
self.logger.error(f"Tool {tool_name} failed: {str(e)}")
self.logger.debug(f"Tool {tool_name} error details:\n{error_details}")
results[tool_name] = {
'result': None,
'execution_time': 0,
'status': 'failure',
'error': str(e)
}
return results
def _get_enabled_tools(self) -> List[str]:
"""Get list of enabled external tools"""
enabled_tools = []
for tool_name in self.registry.list_tools():
tool_config = self.config.get_tool_config(tool_name)
if tool_config.get('enabled', True):
enabled_tools.append(tool_name)
return enabled_tools
def _create_full_results(self, module_results: Dict[str, BaseResult],
tool_results: Dict[str, Any],
security_results: Optional[Any],
context: AnalysisContext) -> FullAnalysisResults:
"""
Create comprehensive results object using focused builder methods.
Single Responsibility: Orchestrate the creation of FullAnalysisResults
by delegating to specialized builder methods.
Args:
module_results: Dictionary of module analysis results
tool_results: Dictionary of tool execution results
security_results: Optional security assessment results
context: Analysis context for fallback operations
Returns:
FullAnalysisResults object with all analysis data
"""
# Build different result components using focused methods
apk_overview = self._build_apk_overview(module_results)
in_depth_analysis = self._build_in_depth_analysis(module_results, context)
apkid_results, kavanoz_results = self._build_tool_results(tool_results)
# Assemble final results object
full_results = FullAnalysisResults()
full_results.apk_overview = apk_overview
full_results.in_depth_analysis = in_depth_analysis
full_results.apkid_analysis = apkid_results
full_results.kavanoz_analysis = kavanoz_results
# Add individual module results for direct access (with proper conversions)
# Convert library detection result to the proper format for console display
library_result = module_results.get('library_detection')
if library_result:
from ..results.LibraryDetectionResults import LibraryDetectionResults
full_results.library_detection = LibraryDetectionResults(library_result)
else:
full_results.library_detection = None
# Convert tracker analysis result to the proper format for console display
tracker_result = module_results.get('tracker_analysis')
if tracker_result:
from ..results.TrackerAnalysisResults import TrackerAnalysisResults
full_results.tracker_analysis = TrackerAnalysisResults(tracker_result)
else:
full_results.tracker_analysis = None
full_results.behaviour_analysis = module_results.get('behaviour_analysis')
# Add security results if available
if security_results:
full_results.security_assessment = security_results.to_dict() if hasattr(security_results, 'to_dict') else security_results
return full_results