Source code for dexray_insight.modules.tracker_analysis.tracker_analysis_module

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Tracker Analysis Module - Refactored Main Module

Advertising and analytics tracker detection module using specialized detectors.
Refactored to use submodules following the Single Responsibility Principle.

Phase 7 TDD Refactoring: Main module now delegates to specialized detectors
and imports databases from dedicated submodules.
"""

import time
import logging
from typing import List, Dict, Any, Set
from dataclasses import dataclass

from dexray_insight.core.base_classes import BaseAnalysisModule, BaseResult, AnalysisContext, AnalysisStatus, register_module

# Import from submodules
from .models import DetectedTracker
from .databases import TrackerDatabase, ExodusAPIClient
from .detectors import PatternDetector, VersionExtractor, TrackerDeduplicator
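
# Note (assumption, for orientation only): the relative imports above imply a
# package layout roughly like
#
#     tracker_analysis/
#         models      -> DetectedTracker
#         databases   -> TrackerDatabase, ExodusAPIClient
#         detectors   -> PatternDetector, VersionExtractor, TrackerDeduplicator
#
# Whether these are single modules or sub-packages is not visible from this file.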


@dataclass
class TrackerAnalysisResult(BaseResult):
    """Result class for tracker analysis"""
    detected_trackers: List[DetectedTracker] = None
    total_trackers: int = 0
    exodus_trackers: List[Dict[str, Any]] = None
    custom_detections: List[DetectedTracker] = None
    analysis_errors: List[str] = None
    
    def __post_init__(self):
        if self.detected_trackers is None:
            self.detected_trackers = []
        if self.custom_detections is None:
            self.custom_detections = []
        if self.analysis_errors is None:
            self.analysis_errors = []
        self.total_trackers = len(self.detected_trackers)
    
    def to_dict(self) -> Dict[str, Any]:
        base_dict = super().to_dict()
        base_dict.update({
            'detected_trackers': [tracker.to_dict() for tracker in self.detected_trackers],
            'total_trackers': self.total_trackers,
            'custom_detections': [tracker.to_dict() for tracker in self.custom_detections],
            'analysis_errors': self.analysis_errors
        })
        return base_dict

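# Illustrative result shape (a sketch; the concrete values are hypothetical and
# the keys contributed by BaseResult.to_dict() are not shown here):
#
#     TrackerAnalysisResult(module_name="tracker_analysis",
#                           status=AnalysisStatus.SUCCESS,
#                           execution_time=1.2,
#                           detected_trackers=[...]).to_dict()
#     # -> {
#     #        ...,                         # fields inherited from BaseResult
#     #        'detected_trackers': [...],  # one dict per DetectedTracker
#     #        'total_trackers': 1,
#     #        'custom_detections': [...],
#     #        'analysis_errors': [],
#     #    }
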

@register_module('tracker_analysis')
class TrackerAnalysisModule(BaseAnalysisModule):
    """
    Advertising and analytics tracker detection module.

    Phase 7 TDD Refactoring: Refactored to use specialized detectors
    and databases from dedicated submodules following SRP.
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.logger = logging.getLogger(__name__)

        # Initialize specialized components
        self.tracker_database = TrackerDatabase()
        self.exodus_client = ExodusAPIClient(config)
        self.pattern_detector = PatternDetector()
        self.version_extractor = VersionExtractor()
        self.deduplicator = TrackerDeduplicator()

        # Configuration
        self.fetch_exodus_trackers = config.get('fetch_exodus_trackers', True)

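    # Configuration consumed here (a sketch; only 'fetch_exodus_trackers' is read
    # directly in this module - any keys ExodusAPIClient expects from the same
    # dict are defined in the databases submodule and are not shown here):
    #
    #     config = {"fetch_exodus_trackers": True}
    #     module = TrackerAnalysisModule(config)
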
    def get_dependencies(self) -> List[str]:
        """Dependencies: string analysis for pattern matching"""
        return ['string_analysis']

    def analyze(self, apk_path: str, context: AnalysisContext) -> TrackerAnalysisResult:
        """
        Perform tracker detection analysis using specialized detectors.

        Refactored coordinator function that delegates to specialized detection
        components following the Single Responsibility Principle. Each detection
        concern is handled by a dedicated detector with its own logic and error
        management.

        Args:
            apk_path: Path to the APK file
            context: Analysis context

        Returns:
            TrackerAnalysisResult with comprehensive detection results
        """
        start_time = time.time()
        self.logger.info(f"Starting tracker analysis for {apk_path}")

        try:
            detected_trackers = []
            analysis_errors = []
            exodus_trackers = []
            custom_detections = []

            # Extract strings from analysis context
            all_strings = self._extract_strings_from_context(context, analysis_errors)
            self.logger.debug(f"Analyzing {len(all_strings)} strings for tracker patterns")

            # Phase 1: Fetch Exodus Privacy trackers if enabled
            if self.fetch_exodus_trackers and self.exodus_client.is_enabled():
                try:
                    exodus_trackers = self.exodus_client.fetch_trackers()
                    self.logger.debug(f"Loaded {len(exodus_trackers)} trackers from Exodus Privacy")
                except Exception as e:
                    error_msg = f"Failed to fetch Exodus Privacy trackers: {str(e)}"
                    self.logger.warning(error_msg)
                    analysis_errors.append(error_msg)

            # Phase 2: Detect trackers using built-in database
            custom_detections = self._detect_custom_trackers(all_strings, context)
            detected_trackers.extend(custom_detections)

            # Phase 3: Detect trackers using Exodus Privacy patterns
            if exodus_trackers:
                exodus_detections = self._detect_exodus_trackers(all_strings, exodus_trackers, context)
                detected_trackers.extend(exodus_detections)

            # Phase 4: Remove duplicates and finalize results
            unique_trackers = self.deduplicator.deduplicate_trackers(detected_trackers)

            execution_time = time.time() - start_time

            # Log summary
            self._log_detection_summary(unique_trackers)

            return TrackerAnalysisResult(
                module_name=self.name,
                status=AnalysisStatus.SUCCESS,
                execution_time=execution_time,
                detected_trackers=unique_trackers,
                total_trackers=len(unique_trackers),
                custom_detections=custom_detections,
                analysis_errors=analysis_errors
            )

        except Exception as e:
            execution_time = time.time() - start_time
            self.logger.error(f"Tracker analysis failed: {str(e)}")

            return TrackerAnalysisResult(
                module_name=self.name,
                status=AnalysisStatus.FAILURE,
                execution_time=execution_time,
                error_message=str(e),
                total_trackers=0,
                analysis_errors=[str(e)]
            )

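    # Illustrative use of the result (a sketch; the AnalysisContext comes from the
    # core pipeline and must already contain 'string_analysis' results):
    #
    #     result = module.analyze("/path/to/app.apk", context)
    #     for tracker in result.detected_trackers:
    #         # name/version/category are the DetectedTracker fields also used
    #         # in _log_detection_summary() below
    #         print(tracker.name, tracker.version, tracker.category)
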
    def _extract_strings_from_context(self, context: AnalysisContext, analysis_errors: List[str]) -> Set[str]:
        """
        Extract all available strings from the analysis context.

        Args:
            context: Analysis context with string analysis results
            analysis_errors: List to append any errors to

        Returns:
            Set of all strings for pattern matching
        """
        all_strings = set()

        # Get strings from string analysis module
        string_analysis = context.get_result('string_analysis')
        if not string_analysis:
            self.logger.warning("String analysis results not available, limited tracker detection")
            return all_strings

        # Collect strings from different categories
        if hasattr(string_analysis, 'urls') and string_analysis.urls:
            all_strings.update(string_analysis.urls)
        if hasattr(string_analysis, 'domains') and string_analysis.domains:
            all_strings.update(string_analysis.domains)
        if hasattr(string_analysis, 'emails') and string_analysis.emails:
            all_strings.update(string_analysis.emails)

        # Extract raw strings from androguard if available
        string_locations = {}
        if context.androguard_obj:
            try:
                dex_obj = context.androguard_obj.get_androguard_dex()
                if dex_obj:
                    for dex in dex_obj:
                        # Extract strings with class/method context
                        for class_analysis in dex.get_classes():
                            class_name = class_analysis.get_name()
                            for method in class_analysis.get_methods():
                                method_name = method.get_name()
                                method_full_name = f"{class_name}->{method_name}"

                                # Get strings from method bytecode
                                try:
                                    for instruction in method.get_instructions():
                                        if hasattr(instruction, 'get_operands'):
                                            for operand in instruction.get_operands():
                                                if hasattr(operand, 'get_value'):
                                                    operand_value = operand.get_value()
                                                    if isinstance(operand_value, str) and len(operand_value) > 3:
                                                        all_strings.add(operand_value)
                                                        if operand_value not in string_locations:
                                                            string_locations[operand_value] = []
                                                        string_locations[operand_value].append(method_full_name)
                                except Exception:
                                    pass  # Skip errors in instruction parsing

                        # Also get all strings from DEX (fallback)
                        for string in dex.get_strings():
                            string_value = str(string)
                            all_strings.add(string_value)
                            # If no specific location found, mark as generic
                            if string_value not in string_locations:
                                string_locations[string_value] = ["DEX strings pool"]
            except Exception as e:
                self.logger.warning(f"Error extracting raw strings: {str(e)}")
                # Fallback to simple string extraction
                try:
                    dex_obj = context.androguard_obj.get_androguard_dex()
                    if dex_obj:
                        for dex in dex_obj:
                            for string in dex.get_strings():
                                string_value = str(string)
                                all_strings.add(string_value)
                                string_locations[string_value] = ["DEX strings pool"]
                except Exception:
                    pass

        # Store string locations in context for use in pattern matching
        context.string_locations = string_locations

        return all_strings

    def _detect_custom_trackers(self, strings: Set[str], context: AnalysisContext) -> List[DetectedTracker]:
        """Detect trackers using built-in tracker database"""
        detected = []
        tracker_database = self.tracker_database.get_tracker_database()

        for tracker_name, tracker_info in tracker_database.items():
            detection_results = self.pattern_detector.detect_tracker_patterns(
                tracker_name, tracker_info, strings, context
            )
            if detection_results:
                detected.extend(detection_results)

        return detected

    def _detect_exodus_trackers(self, strings: Set[str], exodus_trackers: List[Dict[str, Any]],
                                context: AnalysisContext) -> List[DetectedTracker]:
        """Detect trackers using Exodus Privacy patterns"""
        detected = []

        for tracker_info in exodus_trackers:
            detection_results = self.pattern_detector.detect_exodus_patterns(
                tracker_info, strings, context
            )
            if detection_results:
                detected.extend(detection_results)

        return detected

    def _log_detection_summary(self, trackers: List[DetectedTracker]):
        """Log a summary of detected trackers"""
        self.logger.info(f"Tracker analysis completed: {len(trackers)} trackers detected")

        for tracker in trackers:
            version_info = f" (v{tracker.version})" if tracker.version else ""
            self.logger.info(f"📍 {tracker.name}{version_info} - {tracker.category}")

        # Log category breakdown
        if trackers:
            categories = self.deduplicator.group_by_category(trackers)
            self.logger.debug(f"Trackers by category: {dict((k, len(v)) for k, v in categories.items())}")

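    # The context.string_locations mapping built in _extract_strings_from_context()
    # associates each extracted string with where it was seen, roughly
    # (hypothetical values):
    #
    #     {
    #         "https://app-measurement.com/a": ["Lcom/example/Ads;->init"],
    #         "some_other_string": ["DEX strings pool"],
    #     }
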
    def validate_config(self) -> bool:
        """Validate module configuration"""
        # Validate Exodus client configuration
        if self.fetch_exodus_trackers and not self.exodus_client.is_enabled():
            self.logger.warning("Exodus tracker fetching enabled but client is disabled due to invalid configuration")

        return True
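

# Minimal manual-run sketch (not part of the original module): constructs the
# module and checks its configuration only. A full analyze() call additionally
# needs an AnalysisContext built by the dexray_insight core engine, which is not
# shown here; the config below and the base-class construction requirements are
# assumptions made from this file alone.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    module = TrackerAnalysisModule({"fetch_exodus_trackers": False})
    print("dependencies:", module.get_dependencies())
    print("config valid:", module.validate_config())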