Source code for dexray_insight.modules.behaviour_analysis.behaviour_analysis_module

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Behaviour Analysis Module - Refactored

Main coordinator for behavioral analysis with fast/deep modes.
Delegates specific analysis tasks to specialized submodules.
"""

import time
import logging
from typing import List, Dict, Any

from dexray_insight.core.base_classes import BaseAnalysisModule, AnalysisContext, AnalysisStatus, register_module
from dexray_insight.results.BehaviourAnalysisResults import BehaviourAnalysisResults

# Import specialized analyzers
from .analyzers.device_analyzer import DeviceAnalyzer
from .analyzers.telephony_analyzer import TelephonyAnalyzer
from .analyzers.system_analyzer import SystemAnalyzer
from .analyzers.media_analyzer import MediaAnalyzer
from .analyzers.reflection_analyzer import ReflectionAnalyzer

# Import modes and engines
from .modes.mode_manager import ModeManager
from .modes.fast_mode_analyzer import FastModeAnalyzer

# Import models


[docs] @register_module('behaviour_analysis') class BehaviourAnalysisModule(BaseAnalysisModule): """Refactored module for behavioral analysis with specialized components"""
[docs] def __init__(self, config: Dict[str, Any]): super().__init__(config) self.logger = logging.getLogger(__name__) # Initialize specialized components self.mode_manager = ModeManager(config, self.logger) self.fast_mode_analyzer = FastModeAnalyzer(self.logger) # Initialize deep mode analyzers self.device_analyzer = DeviceAnalyzer(self.logger) self.telephony_analyzer = TelephonyAnalyzer(self.logger) self.system_analyzer = SystemAnalyzer(self.logger) self.media_analyzer = MediaAnalyzer(self.logger) self.reflection_analyzer = ReflectionAnalyzer(self.logger)
[docs] def get_name(self) -> str: return "Behaviour Analysis"
[docs] def get_description(self) -> str: return "Performs behavioral analysis to detect privacy-sensitive behaviors. Supports fast mode (APK only) and deep mode (full DEX analysis)"
[docs] def get_dependencies(self) -> List[str]: return ["apk_overview"]
[docs] def get_priority(self) -> int: return 1000
[docs] def analyze(self, apk_path: str, context: AnalysisContext) -> BehaviourAnalysisResults: """ Coordinate behavioral analysis using specialized components Args: apk_path: Path to APK file context: Analysis context Returns: BehaviourAnalysisResults with behavioral findings """ start_time = time.time() try: # Check if module is enabled if not self.mode_manager.is_module_enabled(context): return BehaviourAnalysisResults( module_name="behaviour_analysis", status=AnalysisStatus.SKIPPED, error_message="Behaviour analysis module disabled in configuration", execution_time=time.time() - start_time ) # Determine analysis mode is_deep_mode, mode_str = self.mode_manager.determine_analysis_mode(context) self.logger.info(f"Starting behaviour analysis in {mode_str} mode...") # Validate androguard object availability if not context.androguard_obj: return BehaviourAnalysisResults( module_name="behaviour_analysis", status=AnalysisStatus.FAILURE, error_message="Androguard object not available in context", execution_time=time.time() - start_time ) # Prepare analysis objects analysis_objects = self.mode_manager.prepare_analysis_objects(context, is_deep_mode) # Initialize result result = BehaviourAnalysisResults( module_name="behaviour_analysis", status=AnalysisStatus.SUCCESS, execution_time=0.0 ) # Store analysis objects in result for security analysis access self.mode_manager.store_analysis_objects_in_result(result, analysis_objects) # Perform analysis based on mode if is_deep_mode: self._perform_deep_analysis(analysis_objects, result) else: self._perform_fast_analysis(analysis_objects, result) # Generate summary result.summary = self.mode_manager.generate_analysis_summary(result, is_deep_mode) result.execution_time = time.time() - start_time detected_count = len(result.get_detected_features()) total_count = len(result.findings) self.logger.info(f"Behaviour analysis ({mode_str} mode) completed in {result.execution_time:.2f}s - {detected_count}/{total_count} behaviors detected") return result except Exception as e: execution_time = time.time() - start_time self.logger.error(f"Behaviour analysis failed: {str(e)}") return BehaviourAnalysisResults( module_name="behaviour_analysis", status=AnalysisStatus.FAILURE, error_message=str(e), execution_time=execution_time )
def _perform_deep_analysis(self, analysis_objects: Dict[str, Any], result: BehaviourAnalysisResults) -> None: """Perform deep analysis using all available analyzers""" apk_obj = analysis_objects['apk_obj'] dex_obj = analysis_objects['dex_obj'] dx_obj = analysis_objects['dx_obj'] try: # Device information analysis self.device_analyzer.analyze_device_model_access(apk_obj, dex_obj, dx_obj, result) self.device_analyzer.analyze_android_version_access(apk_obj, dex_obj, dx_obj, result) # Telephony analysis self.telephony_analyzer.analyze_imei_access(apk_obj, dex_obj, dx_obj, result) self.telephony_analyzer.analyze_phone_number_access(apk_obj, dex_obj, dx_obj, result) # System analysis self.system_analyzer.analyze_clipboard_usage(apk_obj, dex_obj, dx_obj, result) self.system_analyzer.analyze_dynamic_receivers(apk_obj, dex_obj, dx_obj, result) self.system_analyzer.analyze_running_services_access(apk_obj, dex_obj, dx_obj, result) self.system_analyzer.analyze_installed_applications(apk_obj, dex_obj, dx_obj, result) self.system_analyzer.analyze_installed_packages(apk_obj, dex_obj, dx_obj, result) # Media analysis self.media_analyzer.analyze_camera_access(apk_obj, dex_obj, dx_obj, result) # Reflection analysis self.reflection_analyzer.analyze_reflection_usage(apk_obj, dex_obj, dx_obj, result) except Exception as e: self.logger.error(f"Deep analysis failed: {e}") raise def _perform_fast_analysis(self, analysis_objects: Dict[str, Any], result: BehaviourAnalysisResults) -> None: """Perform fast analysis using only APK object""" apk_obj = analysis_objects['apk_obj'] try: # Basic permission analysis self.fast_mode_analyzer.analyze_basic_permissions(apk_obj, result) # Basic component analysis self.fast_mode_analyzer.analyze_basic_components(apk_obj, result) # App metadata analysis self.fast_mode_analyzer.analyze_app_metadata(apk_obj, result) except Exception as e: self.logger.error(f"Fast analysis failed: {e}") raise