#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) {{ year }} Dexray Insight Contributors
#
# This file is part of Dexray Insight - Android APK Security Analysis Tool
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Library Detection Module - Refactored Main Module.

Third-party library detection module using multi-stage analysis with specialized engines.
Refactored to use submodules following the Single Responsibility Principle.
"""
import logging
import re
from dataclasses import dataclass
from typing import Any
from typing import Optional
from dexray_insight.core.base_classes import AnalysisContext
from dexray_insight.core.base_classes import BaseAnalysisModule
from dexray_insight.core.base_classes import BaseResult
from dexray_insight.core.base_classes import register_module
from dexray_insight.results.LibraryDetectionResults import DetectedLibrary
from dexray_insight.results.LibraryDetectionResults import LibraryCategory
from dexray_insight.results.LibraryDetectionResults import LibraryDetectionMethod
from dexray_insight.results.LibraryDetectionResults import LibrarySource
from dexray_insight.results.LibraryDetectionResults import LibraryType
from .engines import LibraryDetectionCoordinator
# Import from submodules
from .patterns import LIBRARY_PATTERNS
from .signatures import ClassSignatureExtractor
from .signatures import SignatureMatcher


@dataclass
class LibraryDetectionResult(BaseResult):
    """Result class for library detection analysis."""

    detected_libraries: list[DetectedLibrary] = None
    total_libraries: int = 0
    heuristic_libraries: list[DetectedLibrary] = None
    similarity_libraries: list[DetectedLibrary] = None
    analysis_errors: list[str] = None
    stage1_time: float = 0.0
    stage2_time: float = 0.0

    def __post_init__(self):
        """Initialize default values for optional fields."""
        if self.detected_libraries is None:
            self.detected_libraries = []
        if self.heuristic_libraries is None:
            self.heuristic_libraries = []
        if self.similarity_libraries is None:
            self.similarity_libraries = []
        if self.analysis_errors is None:
            self.analysis_errors = []
        self.total_libraries = len(self.detected_libraries)

    def to_dict(self) -> dict[str, Any]:
        """Convert result to dictionary."""
        base_dict = super().to_dict()
        base_dict.update(
            {
                "detected_libraries": [lib.to_dict() for lib in self.detected_libraries],
                "total_libraries": self.total_libraries,
                "heuristic_libraries": [lib.to_dict() for lib in self.heuristic_libraries],
                "similarity_libraries": [lib.to_dict() for lib in self.similarity_libraries],
                "analysis_errors": self.analysis_errors,
                "stage1_time": self.stage1_time,
                "stage2_time": self.stage2_time,
            }
        )
        return base_dict

    def export_to_dict(self) -> dict[str, Any]:
        """Export all results to dictionary format for CVE scanning compatibility."""
        return {
            "detected_libraries": [lib.to_dict() for lib in self.detected_libraries],
            "total_libraries": self.total_libraries,
            "heuristic_detections": [lib.to_dict() for lib in self.heuristic_libraries],
            "similarity_detections": [lib.to_dict() for lib in self.similarity_libraries],
            "analysis_errors": self.analysis_errors,
            "execution_time": getattr(self, "execution_time", 0.0),
            "stage1_time": self.stage1_time,
            "stage2_time": self.stage2_time,
        }
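
# Minimal usage sketch (illustrative only): any fields required by BaseResult are
# omitted, and DetectedLibrary is shown with only the kwargs this module already
# uses elsewhere, so the real constructors may need more arguments.
#
#     result = LibraryDetectionResult(
#         detected_libraries=[DetectedLibrary(name="OkHttp", confidence=0.9)],
#     )
#     result.export_to_dict()["total_libraries"]  # -> 1, set in __post_init__
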
@register_module("library_detection")
class LibraryDetectionModule(BaseAnalysisModule):
"""
Third-party library detection module using multi-stage analysis.
Phase 6.5 TDD Refactoring: Refactored to use specialized engines and
patterns/signatures from dedicated submodules following SRP.
"""

    def __init__(self, config: dict[str, Any]):
        """Initialize LibraryDetectionModule with configuration."""
        super().__init__(config)
        self.logger = logging.getLogger(__name__)
        # Configuration options
        self.enable_stage1 = config.get("enable_heuristic", True)
        self.enable_stage2 = config.get("enable_similarity", True)
        self.confidence_threshold = config.get("confidence_threshold", 0.7)
        self.similarity_threshold = config.get("similarity_threshold", 0.85)
        self.class_similarity_threshold = config.get("class_similarity_threshold", 0.7)
        # Import library patterns from patterns submodule
        self.LIBRARY_PATTERNS = LIBRARY_PATTERNS.copy()
        # Custom library patterns from config
        self.custom_patterns = config.get("custom_patterns", {})
        if self.custom_patterns:
            self.LIBRARY_PATTERNS.update(self.custom_patterns)
        # Initialize specialized signature components
        self.signature_extractor = ClassSignatureExtractor()
        self.signature_matcher = SignatureMatcher(self.similarity_threshold)
        # Phase 6 TDD Refactoring: Initialize detection coordinator
        self.detection_coordinator = LibraryDetectionCoordinator(self)
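
    # Example configuration (illustrative values; only the keys read above are real):
    #
    #     config = {
    #         "enable_heuristic": True,
    #         "enable_similarity": True,
    #         "confidence_threshold": 0.7,
    #         "similarity_threshold": 0.85,
    #         "class_similarity_threshold": 0.7,
    #         "custom_patterns": {
    #             # Hypothetical entry; the exact schema is defined by
    #             # LIBRARY_PATTERNS in the patterns submodule.
    #             "My SDK": {"packages": ["com.example.mysdk"], "classes": ["MySdkClient"]},
    #         },
    #     }
    #     module = LibraryDetectionModule(config)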

    def get_dependencies(self) -> list[str]:
        """Dependencies: string analysis for class names, manifest analysis for
        permissions/services, native analysis for native library integration."""
        return ["string_analysis", "manifest_analysis", "native_analysis"]

    def analyze(self, apk_path: str, context: AnalysisContext) -> LibraryDetectionResult:
        """
        Perform comprehensive library detection analysis using specialized detection engines.

        Refactored coordinator function that delegates to specialized detection engines
        following the Single Responsibility Principle. Each detection concern is handled
        by a dedicated engine with its own timing and error management.

        Args:
            apk_path: Path to the APK file
            context: Analysis context

        Returns:
            LibraryDetectionResult with comprehensive detection results
        """
        # Phase 6.5 TDD Refactoring: Delegate to specialized coordinator from engines submodule
        return self.detection_coordinator.execute_full_analysis(apk_path, context)
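
    # Hypothetical invocation sketch: how an AnalysisContext is constructed and
    # populated is defined elsewhere in dexray_insight.core, so the names and the
    # APK path below are assumptions.
    #
    #     module = LibraryDetectionModule(config)
    #     result = module.analyze("app.apk", context)  # context: populated AnalysisContext
    #     print(result.total_libraries, result.stage1_time, result.stage2_time)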

    # Legacy detection methods - kept for backward compatibility
    # These will be called by the engines
    def _perform_heuristic_detection(self, context: AnalysisContext, errors: list[str]) -> list[DetectedLibrary]:
        """
        Stage 1: Heuristic-based library detection using known patterns.

        Args:
            context: Analysis context with existing results
            errors: List to append any analysis errors

        Returns:
            List of detected libraries using heuristic methods
        """
        detected_libraries = []
        try:
            # Get existing analysis results
            string_results = context.get_result("string_analysis")
            manifest_results = context.get_result("manifest_analysis")
            if not string_results:
                errors.append("String analysis results not available for heuristic detection")
                return detected_libraries
            # Extract all strings for pattern matching
            all_strings = getattr(string_results, "all_strings", [])
            if not all_strings:
                self.logger.warning("No strings available from string analysis")
                all_strings = []
            # Extract package names from class names
            package_names = self._extract_package_names(all_strings)
            class_names = self._extract_class_names(all_strings)
            self.logger.debug(f"Found {len(package_names)} unique package names and {len(class_names)} class names")
            # Check each known library pattern
            for lib_name, pattern in self.LIBRARY_PATTERNS.items():
                library = self._check_library_pattern(lib_name, pattern, package_names, class_names, manifest_results)
                if library:
                    detected_libraries.append(library)
                    self.logger.debug(f"Detected {lib_name} via heuristic analysis")
        except Exception as e:
            error_msg = f"Error in heuristic detection: {str(e)}"
            self.logger.error(error_msg)
            errors.append(error_msg)
        return detected_libraries

    def _perform_similarity_detection(
        self, context: AnalysisContext, errors: list[str], existing_libraries: list[DetectedLibrary]
    ) -> list[DetectedLibrary]:
        """Stage 2: Similarity-based detection using a LibScan-inspired approach."""
        detected_libraries = []
        try:
            if not context.androguard_obj:
                self.logger.warning("Androguard object not available for similarity detection")
                return detected_libraries
            # Get DEX objects for class analysis
            dex_objects = context.androguard_obj.get_androguard_dex()
            if not dex_objects:
                self.logger.warning("No DEX objects available for similarity analysis")
                return detected_libraries
            self.logger.debug("Building class dependency graph and extracting signatures...")
            # Extract comprehensive class features using the signature extractor
            _ = self.signature_extractor.build_class_dependency_graph(dex_objects)
            _ = self.signature_extractor.extract_method_opcode_patterns(dex_objects)
            _ = self.signature_extractor.extract_call_chain_patterns(dex_objects)
            # Perform LibScan-style similarity matching using the signature matcher
            class_signatures = self.signature_extractor.extract_class_signatures(dex_objects)
            similarity_libraries = self.signature_matcher.match_class_signatures(class_signatures, existing_libraries)
            detected_libraries.extend(similarity_libraries)
            self.logger.debug(f"Similarity detection found {len(similarity_libraries)} additional libraries")
        except Exception as e:
            error_msg = f"Error in similarity detection: {str(e)}"
            self.logger.error(error_msg)
            errors.append(error_msg)
        return detected_libraries

    def _extract_package_names(self, strings: list[str]) -> set[str]:
        """Extract package names from string data."""
        package_names = set()
        # Pattern for Java package names (at least 2 segments with dots)
        package_pattern = re.compile(r"^[a-z][a-z0-9_]*(?:\.[a-z][a-z0-9_]*)+$")
        for string in strings:
            if isinstance(string, str) and package_pattern.match(string):
                # Exclude very common Android packages to reduce noise
                if not string.startswith(("android.", "java.", "javax.", "org.w3c.", "org.xml.")):
                    package_names.add(string)
        return package_names
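
    # Illustrative behavior of the package-name regex above (example inputs):
    #
    #     "com.squareup.okhttp3"  -> kept    (two+ lowercase segments, not filtered)
    #     "android.view"          -> dropped (filtered as a common Android package)
    #     "OkHttpClient"          -> dropped (does not match the lowercase pattern)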

    def _extract_class_names(self, strings: list[str]) -> set[str]:
        """Extract class names from string data."""
        class_names = set()
        # Pattern for class names (CamelCase, possibly with package prefix)
        class_pattern = re.compile(r"(?:^|\.)[A-Z][a-zA-Z0-9]*(?:\$[A-Z][a-zA-Z0-9]*)*$")
        for string in strings:
            if isinstance(string, str) and class_pattern.search(string):
                # Extract just the class name part
                parts = string.split(".")
                for part in parts:
                    if re.match(r"^[A-Z][a-zA-Z0-9]*", part):
                        class_names.add(part.split("$")[0])  # Remove inner class suffix
        return class_names
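
    # Illustrative behavior of the class-name extraction above (example input):
    #
    #     "com.example.net.HttpClientFactory$Builder"
    #         -> the regex matches the trailing CamelCase segment,
    #         -> the per-part scan adds "HttpClientFactory" ("$Builder" is stripped)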

    def _check_library_pattern(
        self,
        lib_name: str,
        pattern: dict[str, Any],
        package_names: set[str],
        class_names: set[str],
        manifest_results: Any,
    ) -> Optional[DetectedLibrary]:
        """Check if a library pattern matches the detected packages and classes."""
        # This method contains the original pattern matching logic
        # Keeping it here for backward compatibility with existing detection logic
        matches = []
        confidence = 0.0
        # Check package matches
        required_packages = pattern.get("packages", [])
        package_matches = 0
        for package in required_packages:
            for detected_package in package_names:
                if package in detected_package or detected_package.startswith(package):
                    matches.append(f"Package: {package}")
                    package_matches += 1
                    break
        # Check class matches
        required_classes = pattern.get("classes", [])
        class_matches = 0
        for class_name in required_classes:
            if class_name in class_names:
                matches.append(f"Class: {class_name}")
                class_matches += 1
        # Check permission matches
        if manifest_results and hasattr(manifest_results, "permissions"):
            required_permissions = pattern.get("permissions", [])
            permission_matches = 0
            for permission in required_permissions:
                if permission in manifest_results.permissions:
                    matches.append(f"Permission: {permission}")
                    permission_matches += 1
        # Calculate confidence based on matches
        total_criteria = len(required_packages) + len(required_classes) + len(pattern.get("permissions", []))
        if total_criteria > 0:
            confidence = len(matches) / total_criteria
        # Require minimum confidence threshold
        if confidence >= self.confidence_threshold:
            return DetectedLibrary(
                name=lib_name,
                detection_method=LibraryDetectionMethod.HEURISTIC,
                category=pattern.get("category", LibraryCategory.UNKNOWN),
                confidence=confidence,
                evidence=matches,
            )
        return None
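
    # Worked confidence example (hypothetical pattern; the keys mirror those read
    # above: "packages", "classes", "permissions"):
    #
    #     pattern = {"packages": ["com.squareup.okhttp3"], "classes": ["OkHttpClient"],
    #                "permissions": ["android.permission.INTERNET"]}
    #     # If only the package and the class match, confidence = 2 / 3 ≈ 0.67, which
    #     # is below the default confidence_threshold of 0.7, so no DetectedLibrary is
    #     # returned; matching the permission as well gives 3 / 3 = 1.0.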

    def _detect_native_libraries(self, context: AnalysisContext) -> list[DetectedLibrary]:
        """Detect native (.so) libraries from lib/ directories."""
        native_libraries = []
        try:
            if not context.androguard_obj:
                return native_libraries
            apk = context.androguard_obj.get_androguard_apk()
            if not apk:
                return native_libraries
            # Get all files in the APK
            files = apk.get_files()
            lib_files = [f for f in files if f.startswith("lib/") and f.endswith(".so")]
            # Group by library name and collect architectures
            lib_groups = {}
            for lib_file in lib_files:
                parts = lib_file.split("/")
                if len(parts) >= 3:
                    arch = parts[1]  # e.g., 'arm64-v8a'
                    lib_name = parts[-1]  # e.g., 'libffmpeg.so'
                    if lib_name not in lib_groups:
                        lib_groups[lib_name] = {"architectures": [], "paths": [], "size": 0}
                    lib_groups[lib_name]["architectures"].append(arch)
                    lib_groups[lib_name]["paths"].append(lib_file)
                    # Try to get file size
                    try:
                        lib_data = apk.get_file(lib_file)
                        if lib_data:
                            lib_groups[lib_name]["size"] += len(lib_data)
                    except Exception:
                        pass
            # Create DetectedLibrary objects for each native library
            for lib_name, lib_info in lib_groups.items():
                detected_library = DetectedLibrary(
                    name=lib_name,
                    detection_method=LibraryDetectionMethod.NATIVE,
                    category=LibraryCategory.UTILITY,  # Default for native libs
                    confidence=1.0,  # High confidence for native detection
                    evidence=[f"Found in {len(lib_info['paths'])} architecture(s)"],
                    architectures=lib_info["architectures"],
                    file_paths=lib_info["paths"],
                    size_bytes=lib_info["size"],
                    source=LibrarySource.NATIVE_LIBS,
                )
                native_libraries.append(detected_library)
        except Exception as e:
            self.logger.error(f"Error detecting native libraries: {str(e)}")
        return native_libraries
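
    # Grouping example (hypothetical APK contents): the two paths below share the
    # same basename, so they collapse into a single DetectedLibrary entry.
    #
    #     "lib/arm64-v8a/libffmpeg.so" and "lib/armeabi-v7a/libffmpeg.so"
    #     -> one entry: name="libffmpeg.so",
    #        architectures=["arm64-v8a", "armeabi-v7a"],
    #        size_bytes = combined size of both files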

    def _detect_androidx_libraries(self, context: AnalysisContext) -> list[DetectedLibrary]:
        """Detect AndroidX libraries from package analysis."""
        androidx_libraries = []
        try:
            # Get existing analysis results
            string_results = context.get_result("string_analysis")
            if not string_results:
                return androidx_libraries
            all_strings = getattr(string_results, "all_strings", [])
            # Look for AndroidX packages
            androidx_packages = set()
            for string in all_strings:
                if isinstance(string, str) and string.startswith("androidx."):
                    # Extract main AndroidX component
                    parts = string.split(".")
                    if len(parts) >= 2:
                        component = f"androidx.{parts[1]}"
                        androidx_packages.add(component)
            # Map AndroidX packages to library names
            androidx_mapping = {
                "androidx.appcompat": "AndroidX AppCompat",
                "androidx.core": "AndroidX Core",
                "androidx.lifecycle": "AndroidX Lifecycle",
                "androidx.room": "AndroidX Room",
                "androidx.work": "AndroidX WorkManager",
                "androidx.recyclerview": "AndroidX RecyclerView",
                "androidx.fragment": "AndroidX Fragments",
                "androidx.navigation": "AndroidX Navigation",
                "androidx.databinding": "AndroidX Data Binding",
                "androidx.constraintlayout": "AndroidX ConstraintLayout",
            }
            # Create detected libraries for each AndroidX component
            for package, lib_name in androidx_mapping.items():
                if package in androidx_packages:
                    detected_library = DetectedLibrary(
                        name=lib_name,
                        package_name=package,
                        detection_method=LibraryDetectionMethod.HEURISTIC,
                        category=LibraryCategory.ANDROIDX,
                        library_type=LibraryType.ANDROIDX,
                        confidence=0.9,
                        evidence=[f"Package: {package}"],
                        source=LibrarySource.SMALI_CLASSES,
                    )
                    androidx_libraries.append(detected_library)
        except Exception as e:
            self.logger.error(f"Error detecting AndroidX libraries: {str(e)}")
        return androidx_libraries
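
    # Mapping example (hypothetical input string): a class reference such as
    # "androidx.room.RoomDatabase" yields the component "androidx.room", which the
    # table above reports as "AndroidX Room" with confidence 0.9.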

    def _deduplicate_libraries(self, libraries: list[DetectedLibrary]) -> list[DetectedLibrary]:
        """Remove duplicate libraries based on name and package."""
        seen = {}
        deduplicated = []
        for library in libraries:
            # Use name as primary key, package as secondary
            key = (library.name, library.package_name)
            if key not in seen:
                seen[key] = library
                deduplicated.append(library)
            else:
                # Keep the one with higher confidence
                existing = seen[key]
                if library.confidence > existing.confidence:
                    deduplicated.remove(existing)
                    deduplicated.append(library)
                    seen[key] = library
        return deduplicated
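
    # Deduplication example (hypothetical entries with the same (name, package_name)
    # key): only the higher-confidence detection survives.
    #
    #     [DetectedLibrary(name="OkHttp", confidence=0.7),
    #      DetectedLibrary(name="OkHttp", confidence=0.9)]
    #     -> [DetectedLibrary(name="OkHttp", confidence=0.9)]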

    def _validate_config(self) -> bool:
        """Validate module configuration."""
        if not isinstance(self.confidence_threshold, (int, float)) or not (0 <= self.confidence_threshold <= 1):
            self.logger.error("confidence_threshold must be a number between 0 and 1")
            return False
        if not isinstance(self.similarity_threshold, (int, float)) or not (0 <= self.similarity_threshold <= 1):
            self.logger.error("similarity_threshold must be a number between 0 and 1")
            return False
        return True
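
    # Validation example (illustrative): a config such as {"confidence_threshold": 1.5}
    # makes _validate_config() log an error and return False, while the defaults
    # (0.7 and 0.85) pass.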