
Python API

friTap ships a stable, SemVer-guaranteed Python API for driving captures, consuming live events, and working with .tap files offline. The public surface is everything listed in the friTap package's __all__ (59 symbols); removing or breaking any of these symbols requires a friTap MAJOR version bump (see RELEASING.md).

This page leads with the modern API; the legacy SSL_Logger class is documented last, in a deprecated section.

CI-runnable vs. live-device examples

Examples on this page are labelled either live / device (they attach to or spawn a real process and cannot run in CI) or offline / CI-runnable (they operate on .tap files or in-memory objects and run anywhere friTap imports).


Quick start (FriTap builder)

Live / device example

FriTap(...).start() attaches to or spawns a real target through Frida. This cannot run in CI — it needs a device and the target app/process.

from friTap import FriTap

session = (
    FriTap("com.example.app")
    .mobile()                  # target a USB device
    .keylog("keys.log")        # write an SSLKEYLOGFILE
    .pcap("cap.pcapng")        # write decrypted traffic (pcapng → embedded DSB)
    .start()                   # returns a FriTapSession
)

session.wait()   # block until the target exits; or call session.stop()

With callbacks (also live / device):

from friTap import FriTap

session = (
    FriTap("com.example.app")
    .mobile("device-id")
    .pcap("capture.pcap")
    .on_keylog(lambda e: print(e.key_data))
    .on_data(lambda e: print(f"{e.src_addr}:{e.src_port} -> {e.dst_addr}:{e.dst_port}"))
    .start()
)

build_config() is the offline-safe half of the builder — it returns a FriTapConfig without touching a device, which is useful for tests and for inspecting/serializing configuration:

from friTap import FriTap

config = FriTap("com.example.app").mobile().keylog("keys.log").build_config()
print(config.target, config.output.keylog)   # offline / CI-runnable
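
Since the configuration classes are plain dataclasses, one way to serialize a built config is dataclasses.asdict. The sketch below uses trimmed stand-in classes (MiniConfig and MiniOutput are hypothetical, not friTap names) so that it runs without friTap installed. Passing the result of build_config() to asdict would presumably work the same way, but that is an assumption and has not been checked against the real classes.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class MiniOutput:
    """Hypothetical stand-in for a few OutputConfig fields."""
    pcap: Optional[str] = None
    keylog: Optional[str] = None


@dataclass
class MiniConfig:
    """Hypothetical stand-in for a few FriTapConfig fields."""
    target: str
    output: MiniOutput = field(default_factory=MiniOutput)
    protocol: str = "tls"


config = MiniConfig("com.example.app", MiniOutput(keylog="keys.log"))

# asdict() recurses into nested dataclasses, so the result is JSON-ready.
as_dict = asdict(config)
as_json = json.dumps(as_dict, sort_keys=True)
print(as_json)
```

A dictionary like this is convenient for snapshot tests: a CI job can compare it against a stored expectation without ever touching a device.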

FriTapSession

start() returns a FriTapSession handle:

| Member | Kind | Description |
| --- | --- | --- |
| event_bus | property → EventBus | The session's event bus; subscribe for live events. |
| is_running | property → bool | Whether the capture is still active. |
| stop() | method | Gracefully stop the capture session. |
| wait() | method | Block until the session ends (e.g. the process exits). |
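
A common pattern with a handle like this is a bounded capture: wait for the target to exit, but stop after a deadline. The sketch below shows one way to do that with threading.Timer. FakeSession is a hypothetical stand-in that only mimics the members listed above, and run_for is an illustrative helper, not part of friTap. It also assumes that stop() may be called from another thread and that it unblocks wait(), which this page does not state.

```python
import threading


class FakeSession:
    """Hypothetical stand-in exposing the FriTapSession members listed above."""

    def __init__(self) -> None:
        self._done = threading.Event()

    @property
    def is_running(self) -> bool:
        return not self._done.is_set()

    def stop(self) -> None:
        self._done.set()

    def wait(self) -> None:
        self._done.wait()


def run_for(session, seconds: float) -> None:
    """Stop `session` after `seconds` unless it ends earlier, then return."""
    timer = threading.Timer(seconds, session.stop)
    timer.daemon = True
    timer.start()
    try:
        session.wait()   # returns when the session ends or stop() fires
    finally:
        timer.cancel()   # harmless if the timer already ran


session = FakeSession()
run_for(session, 0.05)
print(session.is_running)
```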

Builder reference

Every method below returns self, so calls chain. Names are verbatim from friTap.api.FriTap. The builder is hand-documented (chained APIs render poorly under autodoc).

Device & target

| Method | Description |
| --- | --- |
| mobile(device_id=None) | Target a mobile device: USB if device_id is omitted, otherwise the given Frida device ID. |
| host(address) | Target a remote Frida device (host:port). |
| spawn(enable=True) | Spawn the target instead of attaching. |
| spawn_gating(enable=True, all_processes=False) | Enable spawn gating for multi-process apps. all_processes=True catches every spawn. |
| child_gating(enable=True) | Enable child-process gating. |
| timeout(seconds) | Set a timeout before resuming the suspended target. |

Output

| Method | Description |
| --- | --- |
| pcap(path) | Write decrypted traffic to a PCAP file. The extension is authoritative: .pcap writes classic libpcap, .pcapng writes pcapng (with embedded DSB when keys are extracted). |
| pcapng(path) | Write self-decrypting PCAPNG. Equivalent to pcap() with a .pcapng extension; the file extension still wins. |
| keylog(path) | Write key material in Wireshark-loadable form for the active protocol (NSS SSLKEYLOGFILE for TLS). With --protocol all/auto, the path is split per protocol as <stem>.<proto><ext>. |
| json_output(path) | Write session metadata as JSON. |
| verbose(enable=True) | Enable verbose console output. |
| live(enable=True) | Stream to Wireshark via a named pipe. |
| full_capture(enable=True) | Capture the full network stream, not just decrypted payload. |
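
The two naming rules above (the extension decides the capture format, and the per-protocol keylog split follows <stem>.<proto><ext>) can be sketched with pathlib. Both helper names are hypothetical and this is not friTap's implementation. How friTap treats extensions other than .pcap and .pcapng is not stated here, so the sketch rejects them.

```python
from pathlib import Path


def pcap_format(path: str) -> str:
    """Return the capture format implied by the file extension."""
    suffix = Path(path).suffix
    if suffix == ".pcapng":
        return "pcapng"
    if suffix == ".pcap":
        return "pcap"
    # Assumption: other extensions are left undefined in this sketch.
    raise ValueError(f"unrecognised capture extension: {suffix!r}")


def split_keylog_path(path: str, proto: str) -> str:
    """Apply the <stem>.<proto><ext> rule to a keylog path."""
    p = Path(path)
    return str(p.with_name(f"{p.stem}.{proto}{p.suffix}"))


print(pcap_format("cap.pcapng"))
print(split_keylog_path("keys.log", "tls"))
```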

Hooking

| Method | Description |
| --- | --- |
| patterns(path) | Use pattern-based (symbol-less) hooking from a JSON file. |
| offsets(path) | Use offset-based hooking from a JSON file. |
| experimental(enable=True) | Enable experimental features. |
| anti_root(enable=True) | Enable anti-root-detection hooks (Android). |
| payload_modification(enable=True) | Enable payload-modification capability. |
| force_scan(name) | Force-scan a module that Cronet-split-topology suppression would otherwise skip. Accepts a literal name, a stem prefix (name*) or a regex (re:...). May be called multiple times. |
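
The three forms accepted by force_scan() can be illustrated with a small matcher. This is a sketch of the documented syntax only: matches_force_scan is a hypothetical helper, the module names are made up, and the use of re.search (rather than a full match) for the re: form is an assumption.

```python
import re


def matches_force_scan(spec: str, module: str) -> bool:
    """Return True if `module` is selected by a force_scan-style `spec`."""
    if spec.startswith("re:"):
        # Assumption: the regex may match anywhere in the module name.
        return re.search(spec[3:], module) is not None
    if spec.endswith("*"):
        # A trailing '*' marks a stem prefix.
        return module.startswith(spec[:-1])
    # Otherwise the spec is a literal module name.
    return module == spec


print(matches_force_scan("libssl.so", "libssl.so"))
print(matches_force_scan("libcronet*", "libcronet.102.so"))
print(matches_force_scan(r"re:^libmonochrome.*\.so$", "libmonochrome_64.so"))
```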

Protocol & backend

| Method | Description |
| --- | --- |
| protocol(proto) | Set the target protocol: tls, ipsec, ssh, auto (and other registered values). Default tls. |
| backend(backend) | Set the instrumentation backend: frida (default), gdb, lldb, ebpf. |

Experimental backends

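Only frida is fully supported; gdb, lldb, and ebpf are experimental or future work. FriTapConfig.validate_protocol_backend() raises UnsupportedProtocolBackendError when the selected backend's support level for the protocol is anything other than FULL. The check is skipped when the protocol is auto.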

Debug & misc

| Method | Description |
| --- | --- |
| debug(enable=True) | Enable debug mode with Chrome Inspector (also turns on debug output). |
| debug_output(enable=True) | Enable debug output only (no inspector). |
| custom_script(path) | Load a custom Frida script before friTap's hooks. |
| add_script_plugin(plugin) | Register a ScriptPlugin to load when the session starts. |
| environment(path) | Provide an environment-variables JSON for spawn. |
| proxy(address) | Redirect connections to a host:port proxy and bypass cert pinning (requires fritap-proxy). |

Callbacks

Each registers a callback fired on the session's event bus.

| Method | Event delivered |
| --- | --- |
| on_keylog(callback) | KeylogEvent |
| on_data(callback) | DatalogEvent |
| on_library_detected(callback) | LibraryDetectedEvent |
| on_session(callback) | SessionEvent |
| on_flow(callback) | FlowEvent (wires up a FlowCollector automatically) |

Terminal methods

| Method | Description |
| --- | --- |
| build_config() | Build a FriTapConfig from the current builder state. Offline-safe: does not touch a device. |
| start() | Build the config, create the capture, wire up the event bus, and start. Returns a FriTapSession. Live / device. |

Events & EventBus

friTap dispatches typed events through a thread-safe publish-subscribe EventBus. Output handlers, the TUI, plugins, and your own callbacks all subscribe to event classes.

friTap.events.EventBus

Simple thread-safe publish-subscribe event bus.

Usage

bus = EventBus()
bus.subscribe(KeylogEvent, my_handler)
bus.emit(KeylogEvent(key_data="CLIENT_RANDOM ..."))

Subscribers can specify a priority (higher runs first). Plugins should use EventBus.PLUGIN_PRIORITY so they execute before the built-in output handlers.

Source code in friTap/events.py
class EventBus:
    """
    Simple thread-safe publish-subscribe event bus.

    Usage:
        bus = EventBus()
        bus.subscribe(KeylogEvent, my_handler)
        bus.emit(KeylogEvent(key_data="CLIENT_RANDOM ..."))

    Subscribers can specify a *priority* (higher runs first).  Plugins
    should use ``EventBus.PLUGIN_PRIORITY`` so they execute before the
    built-in output handlers.
    """

    MAX_HANDLER_FAILURES = 10
    PLUGIN_PRIORITY = 100

    def __init__(self, on_handler_error: Optional[Callable] = None) -> None:
        self._subscribers: Dict[Type[FriTapEvent], List[Tuple[int, Callable]]] = {}
        self._lock = threading.RLock()
        self._logger = logging.getLogger("friTap.events")
        self._failure_counts: Dict[str, int] = {}
        self._on_handler_error = on_handler_error

    def subscribe(self, event_type: Type[FriTapEvent], callback: Callable, *, priority: int = 0) -> None:
        """Register *callback* to be called whenever *event_type* is emitted.

        Higher *priority* values run first.  The default is 0; plugins
        should use ``EventBus.PLUGIN_PRIORITY`` (100).
        """
        with self._lock:
            subs = self._subscribers.setdefault(event_type, [])
            subs.append((priority, callback))
            subs.sort(key=lambda t: t[0], reverse=True)

    def unsubscribe(self, event_type: Type[FriTapEvent], callback: Callable) -> None:
        """Remove a previously registered callback."""
        with self._lock:
            subs = self._subscribers.get(event_type, [])
            for i, (_, cb) in enumerate(subs):
                if cb == callback:
                    subs.pop(i)
                    break

    def emit(self, event: FriTapEvent) -> None:
        """
        Dispatch *event* to all subscribers registered for its type.

        Also dispatches to subscribers of the base ``FriTapEvent`` type,
        allowing catch-all handlers.  The merged list is sorted by
        descending priority so higher-priority subscribers run first.
        """
        with self._lock:
            specific = self._subscribers.get(type(event), [])
            if type(event) is not FriTapEvent:
                catch_all = self._subscribers.get(FriTapEvent, [])
                if not catch_all:
                    # Fast path: no catch-all subscribers
                    handlers = list(specific)
                else:
                    # Merge two pre-sorted (descending priority) lists
                    handlers = []
                    i, j = 0, 0
                    while i < len(specific) and j < len(catch_all):
                        if specific[i][0] >= catch_all[j][0]:
                            handlers.append(specific[i])
                            i += 1
                        else:
                            handlers.append(catch_all[j])
                            j += 1
                    handlers.extend(specific[i:])
                    handlers.extend(catch_all[j:])
            else:
                handlers = list(specific)

        for _prio, cb in handlers:
            try:
                cb(event)
            except Exception as exc:
                handler_name = getattr(cb, "__qualname__", repr(cb))
                self._logger.exception(
                    "Error in event subscriber %s for %s",
                    handler_name,
                    type(event).__name__,
                )

                # Track failure count
                count = self._failure_counts.get(handler_name, 0) + 1
                self._failure_counts[handler_name] = count

                # Invoke optional error callback
                if self._on_handler_error is not None:
                    try:
                        self._on_handler_error(cb, event, exc)
                    except Exception:
                        pass

                # Emit ErrorEvent for handler failures (guard against recursion)
                if not isinstance(event, ErrorEvent):
                    try:
                        self.emit(ErrorEvent(
                            error=f"Handler {handler_name} failed",
                            description=str(exc),
                            severity=ERROR_SEVERITY_ERROR,
                        ))
                    except Exception:
                        pass

                # Auto-unsubscribe after too many failures
                if count >= self.MAX_HANDLER_FAILURES:
                    self._logger.warning(
                        "Auto-unsubscribing handler %s after %d failures",
                        handler_name, count,
                    )
                    self._remove_handler(cb)

    def _remove_handler(self, cb: Callable) -> None:
        """Remove *cb* from all event-type subscriber lists."""
        with self._lock:
            for subs in self._subscribers.values():
                for i, (_p, c) in enumerate(subs):
                    if c is cb:
                        subs.pop(i)
                        break

    @property
    def handler_failures(self) -> Dict[str, int]:
        """Return a copy of the handler failure counts."""
        return dict(self._failure_counts)

    def clear(self) -> None:
        """Remove all subscribers."""
        with self._lock:
            self._subscribers.clear()
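
One consequence of the listing above is worth spelling out: failure counts are keyed by the handler's __qualname__ (falling back to repr()), not by the handler object. The sketch below reproduces only that key expression; describe, on_key, first, and second are illustrative names. It suggests that two lambdas defined in the same scope would share one counter, so named functions are the safer choice for handlers that might raise.

```python
def describe(cb) -> str:
    """Mirror the listing's key: __qualname__ if present, else repr()."""
    return getattr(cb, "__qualname__", repr(cb))


def on_key(event) -> None:
    """A named handler gets its own, readable key."""


first = lambda event: None    # noqa: E731 - lambdas used deliberately
second = lambda event: None   # noqa: E731

print(describe(on_key))
# Both lambdas produce the same key, so their failures would be counted together.
print(describe(first) == describe(second))
```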

subscribe(event_type, callback, *, priority=0)

Register callback to be called whenever event_type is emitted.

Higher priority values run first. The default is 0; plugins should use EventBus.PLUGIN_PRIORITY (100).

unsubscribe(event_type, callback)

Remove a previously registered callback.

emit(event)

Dispatch event to all subscribers registered for its type.

Also dispatches to subscribers of the base FriTapEvent type, allowing catch-all handlers. The merged list is sorted by descending priority so higher-priority subscribers run first.
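
The merge step can be isolated as a small function. The sketch below follows the same two-pointer logic as emit(), but operates on (priority, label) pairs instead of callbacks. merge_by_priority and the labels are illustrative names. Because the comparison is >=, a type-specific subscriber runs before a catch-all subscriber of equal priority.

```python
from typing import List, Tuple

Entry = Tuple[int, str]  # (priority, label); labels stand in for callbacks


def merge_by_priority(specific: List[Entry], catch_all: List[Entry]) -> List[Entry]:
    """Merge two lists that are already sorted by descending priority."""
    merged: List[Entry] = []
    i = j = 0
    while i < len(specific) and j < len(catch_all):
        if specific[i][0] >= catch_all[j][0]:
            merged.append(specific[i])   # ties go to the type-specific list
            i += 1
        else:
            merged.append(catch_all[j])
            j += 1
    merged.extend(specific[i:])
    merged.extend(catch_all[j:])
    return merged


order = merge_by_priority(
    [(100, "plugin"), (0, "typed")],
    [(50, "audit"), (0, "catch_all")],
)
print([label for _prio, label in order])
```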

clear()

Remove all subscribers.

The bus also exposes the class constant PLUGIN_PRIORITY = 100; plugins should pass priority=EventBus.PLUGIN_PRIORITY to subscribe() so they run before the built-in output handlers.

A standalone bus can be created, subscribed to, and emitted into without any device — useful for testing handlers:

from friTap import EventBus, KeylogEvent

bus = EventBus()
bus.subscribe(KeylogEvent, lambda e: print("key:", e.key_data))
bus.emit(KeylogEvent(key_data="CLIENT_RANDOM abc def"))   # offline / CI-runnable
# -> key: CLIENT_RANDOM abc def

Event base class

All events subclass FriTapEvent, which provides:

| Field / member | Type | Description |
| --- | --- | --- |
| timestamp | float | Creation time (time.time()). |
| protocol | str | Protocol context (default "tls"). |
| cancel() | method | Advisory cancellation (DOM preventDefault style). |
| cancelled | property → bool | Whether cancel() was called. |
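
"Advisory" means the bus keeps dispatching after cancel(): the emit() listing never inspects cancelled, so it is up to later handlers to check it. The sketch below illustrates that contract with a hypothetical StubEvent that only mimics the members in the table. Whether friTap's built-in output handlers honour cancelled is not stated on this page.

```python
import time
from dataclasses import dataclass, field


@dataclass
class StubEvent:
    """Hypothetical stand-in with the FriTapEvent members listed above."""
    timestamp: float = field(default_factory=time.time)
    protocol: str = "tls"
    _cancelled: bool = field(default=False, repr=False)

    def cancel(self) -> None:
        self._cancelled = True

    @property
    def cancelled(self) -> bool:
        return self._cancelled


written = []


def redact(event) -> None:
    """Higher-priority handler: vetoes the event."""
    event.cancel()


def write_output(event) -> None:
    """Lower-priority handler: still called, so it checks the flag itself."""
    if event.cancelled:
        return
    written.append(event)


event = StubEvent()
for handler in (redact, write_output):   # dispatch order: highest priority first
    handler(event)
print(event.cancelled, len(written))
```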

KeylogEvent

Emitted when key material is extracted (on_keylog).

| Field | Type | Description |
| --- | --- | --- |
| key_data | str | Pre-formatted keylog line (e.g. CLIENT_RANDOM <hex> <hex>). |
| payload | dict \| None | Structured payload for protocols that need it (e.g. SSH KEX shared secret). |
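
For TLS, key_data follows the NSS key log layout of label, client random, and secret, separated by spaces. A handler that needs the raw bytes could split the line as sketched below. KeylogLine and parse_keylog_line are hypothetical names, the sample values are made up, and lines produced for other protocols may not fit this shape.

```python
from typing import NamedTuple


class KeylogLine(NamedTuple):
    label: str
    client_random: bytes
    secret: bytes


def parse_keylog_line(line: str) -> KeylogLine:
    """Split 'LABEL <hex> <hex>' into a label and two byte strings."""
    label, client_random_hex, secret_hex = line.split()
    return KeylogLine(
        label,
        bytes.fromhex(client_random_hex),
        bytes.fromhex(secret_hex),
    )


sample = "CLIENT_RANDOM " + "ab" * 32 + " " + "cd" * 48   # made-up values
parsed = parse_keylog_line(sample)
print(parsed.label, len(parsed.client_random), len(parsed.secret))
```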

DatalogEvent

Emitted when decrypted application data is captured (on_data).

| Field | Type | Description |
| --- | --- | --- |
| data | bytes | The decrypted payload. |
| function | str | The hooked function that produced it. |
| direction | str | "read" or "write". |
| src_addr / src_port | str / int | Source endpoint. |
| dst_addr / dst_port | str / int | Destination endpoint. |
| ss_family | str | Socket family (e.g. "AF_INET"). |
| ssl_session_id | str | TLS session identifier. |
| client_random | str | TLS client random. |
| transport | str | "tcp" or "udp" (QUIC is udp). |
| http3_headers | list \| None | Decoded HTTP/3 headers [[name, value], ...] (app-api QUIC mode). |
| stream_id | int \| None | QUIC stream id. |
| quic_scid / quic_dcid | str | QUIC source / destination connection IDs. |
| quic_stream_type | str | QUIC stream type. |
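
A typical on_data callback condenses these fields into one log line. The sketch below does so for a stand-in object built with SimpleNamespace; summarize is a hypothetical helper, and the sample values (including the function name SSL_write) are illustrative only.

```python
from types import SimpleNamespace


def summarize(event) -> str:
    """Format the endpoint, direction, and size of a DatalogEvent-like object."""
    return (
        f"[{event.transport}/{event.direction}] "
        f"{event.src_addr}:{event.src_port} -> {event.dst_addr}:{event.dst_port} "
        f"{len(event.data)} bytes via {event.function}"
    )


# Stand-in carrying only the fields the formatter reads.
sample = SimpleNamespace(
    data=b"GET / HTTP/1.1\r\n\r\n",
    function="SSL_write",
    direction="write",
    src_addr="10.0.0.2",
    src_port=50000,
    dst_addr="93.184.216.34",
    dst_port=443,
    transport="tcp",
)
line = summarize(sample)
print(line)
```

Because the helper only reads attributes, the same function could be handed to on_data() in a live session.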

LibraryDetectedEvent

Emitted when a TLS/SSL library is detected (on_library_detected).

| Field | Type | Description |
| --- | --- | --- |
| library | str | Logical library name (e.g. "openssl"). |
| module | str | The loaded module/file name. |
| path | str | Full module path in the target. |

SessionEvent

Emitted on TLS/QUIC session lifecycle changes (on_session).

| Field | Type | Description |
| --- | --- | --- |
| session_id | str | Session identifier. |
| event_type | str | One of SESSION_STARTED/SESSION_RESUMED/SESSION_ENDED/SESSION_DESTROYED (the string constants "started", "resumed", "ended", "destroyed"). |
| cipher_suite | str | Negotiated cipher suite. |
| cipher | property → str | Read-only alias for cipher_suite. |
| protocol_version | str | TLS version (e.g. "TLS 1.3"). |
| server_name | str | SNI server name. |
| alpn | str | Negotiated ALPN ("h2", "http/1.1", …). |
| quic_version | str | QUIC transport version (QUIC only). |
| client_random, connection_id, src_addr, src_port, dst_addr, dst_port | | Additional session/endpoint metadata. |
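
The four event_type values lend themselves to a small tracker of currently open sessions. The sketch below is one possible shape: SessionTracker is a hypothetical class, the events are SimpleNamespace stand-ins, and treating "resumed" as opening a session is an interpretation, not documented behaviour.

```python
from types import SimpleNamespace

OPENING = {"started", "resumed"}
CLOSING = {"ended", "destroyed"}


class SessionTracker:
    """Callable that keeps a map of open sessions: session_id -> server_name."""

    def __init__(self) -> None:
        self.active = {}

    def __call__(self, event) -> None:
        if event.event_type in OPENING:
            self.active[event.session_id] = event.server_name
        elif event.event_type in CLOSING:
            # pop() with a default tolerates "ended" followed by "destroyed".
            self.active.pop(event.session_id, None)


tracker = SessionTracker()
for kind, sid, sni in [
    ("started", "s1", "example.org"),
    ("started", "s2", "example.net"),
    ("ended", "s1", "example.org"),
    ("destroyed", "s1", "example.org"),
]:
    tracker(SimpleNamespace(event_type=kind, session_id=sid, server_name=sni))
print(tracker.active)
```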

FlowEvent

Emitted when a flow is created, updated, or completed (on_flow).

| Field | Type | Description |
| --- | --- | --- |
| flow | Flow | The flow object. |
| flow_event_type | str | "created", "updated", or "completed". |

Other events

These are part of the public surface and are emitted by the agent/output pipeline; subscribe to them the same way:

  • ConsoleEvent (message, level) — console log lines from the agent.
  • ErrorEvent (error, description, stack, file, line, severity) — agent/hooking errors; severity is "info"/"warning"/"error"/"fatal".
  • SocketTraceEvent (src_addr, src_port, dst_addr, dst_port, ss_family) — traced socket info.
  • DetachEvent (reason) — emitted when the target process detaches.

Configuration dataclasses

FriTap.build_config() returns a FriTapConfig. You can also construct it directly and pass it to the lower-level CoreController / SSL_Logger.

friTap.config.FriTapConfig dataclass

Top-level configuration for a friTap session.

Usage

config = FriTapConfig(
    target="com.example.app",
    device=DeviceConfig(mobile=True, spawn=True),
    output=OutputConfig(pcap="capture.pcap", keylog="keys.log"),
)

Source code in friTap/config.py
@dataclass
class FriTapConfig:
    """
    Top-level configuration for a friTap session.

    Usage:
        config = FriTapConfig(
            target="com.example.app",
            device=DeviceConfig(mobile=True, spawn=True),
            output=OutputConfig(pcap="capture.pcap", keylog="keys.log"),
        )
    """
    target: str
    device: DeviceConfig = field(default_factory=DeviceConfig)
    output: OutputConfig = field(default_factory=OutputConfig)
    hooking: HookingConfig = field(default_factory=HookingConfig)
    protocol: str = "tls"
    backend: str = BackendName.FRIDA
    debug: bool = False
    debug_output: bool = False
    custom_hook_script: Optional[str] = None
    environment_file: Optional[str] = None
    install_lsass_hook: bool = True
    proxy: Optional[str] = None  # "host:port" or "[ipv6]:port" format

    def __post_init__(self):
        if self.debug:
            self.debug_output = True

    def validate_protocol_backend(self, protocol_handler=None) -> None:
        """Validate that the selected backend supports the configured protocol.

        Parameters
        ----------
        protocol_handler
            A ProtocolHandler instance. If None, validation is skipped.

        Raises
        ------
        UnsupportedProtocolBackendError
            When the backend support level is STUB or UNSUPPORTED.
        """
        if self.protocol == "auto":
            return  # auto always starts with Frida default
        if protocol_handler is None:
            return
        from .protocols.base import BackendSupport
        level = protocol_handler.get_backend_support_level(self.backend)
        if level != BackendSupport.FULL:
            supported = [
                name for name, lvl in protocol_handler.supported_backends.items()
                if lvl == BackendSupport.FULL
            ]
            raise UnsupportedProtocolBackendError(
                f"Protocol '{self.protocol}' does not fully support the "
                f"'{self.backend}' backend (level: {level}). "
                f"Supported backends: {', '.join(supported)}"
            )

    @classmethod
    def from_legacy_params(
        cls,
        app: str,
        pcap_name: Optional[str] = None,
        verbose: bool = False,
        spawn: bool = False,
        keylog: bool | str = False,
        enable_spawn_gating: bool = False,
        spawn_gating_all: bool = False,
        enable_child_gating: bool = False,
        mobile: bool | str = False,
        live: bool = False,
        environment_file: Optional[str] = None,
        debug_mode: bool = False,
        full_capture: bool = False,
        socket_trace: bool | str = False,
        host: bool | str = False,
        offsets: Optional[str] = None,
        debug_output: bool = False,
        experimental: bool = False,
        anti_root: bool = False,
        payload_modification: bool = False,
        library_scan: bool = False,
        enable_default_fd: bool = False,
        patterns: Optional[str] = None,
        custom_hook_script: Optional[str] = None,
        json_output: Optional[str] = None,
        install_lsass_hook: bool = True,
        timeout: Optional[int] = None,
        backend: str = BackendName.FRIDA,
        protocol: str = "tls",
        proxy: Optional[str] = None,
        filter_expression: Optional[str] = None,
        filter_infrastructure: bool = True,
        include_loopback: bool = False,
        force_scan_modules: Optional[List[str]] = None,
        quic_capture_mode: str = "stream",
        quic_only: bool = False,
        no_loader_hook: bool = False,
        stealth_loader: bool = False,
        pairip_safe: bool = False,
        quic_egress_headers_layer: str = "auto",
        scan_keys_region: Optional[str] = None,
        scan: Optional[str] = None,
        scan_report: str = "table",
        scan_report_out: Optional[str] = None,
        scan_min_severity: str = "info",
        scan_min_confidence: float = 0.0,
        scan_source: Optional[str] = None,
        scan_category: Optional[str] = None,
        scan_show_pii: bool = False,
        scan_analyzer_path: Optional[List[str]] = None,
    ) -> "FriTapConfig":
        """
        Build a FriTapConfig from the legacy SSL_Logger constructor parameters.
        Ensures full backward compatibility.
        """
        return cls(
            target=app,
            device=DeviceConfig(
                mobile=mobile,
                host=host if host else None,
                spawn=spawn,
                enable_spawn_gating=enable_spawn_gating,
                spawn_gating_all=spawn_gating_all,
                enable_child_gating=enable_child_gating,
                timeout=timeout,
            ),
            output=OutputConfig(
                pcap=pcap_name,
                keylog=keylog if isinstance(keylog, str) else (keylog or None),
                json_output=json_output,
                live=live,
                verbose=verbose,
                full_capture=full_capture,
                socket_trace=socket_trace,
                filter_expression=filter_expression,
                filter_infrastructure=filter_infrastructure,
                include_loopback=include_loopback,
                scan=scan,
                scan_report=scan_report,
                scan_report_out=scan_report_out,
                scan_min_severity=scan_min_severity,
                scan_min_confidence=scan_min_confidence,
                scan_source=scan_source,
                scan_category=scan_category,
                scan_show_pii=scan_show_pii,
                scan_analyzer_path=scan_analyzer_path,
            ),
            hooking=HookingConfig(
                offsets=offsets,
                patterns=patterns,
                experimental=experimental,
                enable_default_fd=enable_default_fd,
                anti_root=anti_root,
                payload_modification=payload_modification,
                library_scan=library_scan,
                force_scan_modules=list(force_scan_modules or []),
                quic_capture_mode=quic_capture_mode,
                quic_only=quic_only,
                no_loader_hook=no_loader_hook,
                stealth_loader=stealth_loader,
                pairip_safe=pairip_safe,
                quic_egress_headers_layer=quic_egress_headers_layer,
                scan_keys_region=scan_keys_region,
            ),
            protocol=protocol,
            backend=backend,
            debug=debug_mode,
            debug_output=debug_output,
            custom_hook_script=custom_hook_script,
            environment_file=environment_file,
            install_lsass_hook=install_lsass_hook,
            proxy=proxy,
        )

friTap.config.DeviceConfig dataclass

Configuration for target device connection.

Source code in friTap/config.py
@dataclass
class DeviceConfig:
    """Configuration for target device connection."""
    device_id: Optional[str] = None  # Frida device ID (from TUI enumeration)
    mobile: bool | str = False
    host: Optional[str] = None
    spawn: bool = False
    enable_spawn_gating: bool = False
    spawn_gating_all: bool = False
    enable_child_gating: bool = False
    timeout: Optional[int] = None

friTap.config.OutputConfig dataclass

Configuration for output destinations and formats.

Source code in friTap/config.py
@dataclass
class OutputConfig:
    """Configuration for output destinations and formats."""
    pcap: Optional[str] = None
    keylog: Optional[str] = None
    json_output: Optional[str] = None
    output_format: str = "auto"
    live: bool = False
    live_mode: str = ""  # "", "wireshark", "live_pcapng"
    verbose: bool = False
    full_capture: bool = False
    socket_trace: bool | str = False
    filter_expression: Optional[str] = None  # Wireshark-like display filter
    # Drop frida/adb infrastructure traffic (ports 5037/5555/27042/27043) by default
    filter_infrastructure: bool = True
    # Include loopback/localhost traffic (e.g. Firefox NSS IPC) — off by default
    include_loopback: bool = False
    # Live passive-analysis ("scan") of observed traffic during capture.
    # ``scan`` is an analyzer spec (None disables; "all" / comma-list selects).
    scan: Optional[str] = None
    scan_report: str = "table"
    scan_report_out: Optional[str] = None
    scan_min_severity: str = "info"
    scan_min_confidence: float = 0.0
    scan_source: Optional[str] = None
    scan_category: Optional[str] = None
    scan_show_pii: bool = False
    # External analyzer references ("module" or "module:Class") to load for the
    # live scan, mirroring offline ``analyze --analyzer-path``. Repeatable.
    scan_analyzer_path: Optional[List[str]] = None

friTap.config.HookingConfig dataclass

Configuration for hooking strategies.

Source code in friTap/config.py
@dataclass
class HookingConfig:
    """Configuration for hooking strategies."""
    offsets: Optional[str] = None
    patterns: Optional[str] = None
    experimental: bool = False
    enable_default_fd: bool = False
    anti_root: bool = False
    payload_modification: bool = False
    library_scan: bool = False
    # QUIC plaintext capture boundary: "stream" (default lower-boundary
    # stream-level Readv hooks) or "app-api" (Boundary-4 decoded HTTP/3
    # headers; Chrome/Android Google QUICHE only).
    quic_capture_mode: str = "stream"
    # When True, the agent installs ONLY the Google QUICHE hooks and skips every
    # TLS-library hook (BoringSSL, Conscrypt, NSS, …), the Java hooks, OHTTP,
    # and the keylog scan-result hooks. Useful when the user only wants HTTP/3
    # capture: attach is dramatically lighter (no multi-megabyte Memory.scanSync
    # passes, no Java VM safepoint sync), which also helps fritap attach to a
    # target that is already in the middle of active QUIC traffic.
    quic_only: bool = False
    # When True, the agent skips the inline android_dlopen_ext loader hook. This
    # is the hook PairIP / anti-tamper runtimes detect and SIGSEGV on during a
    # spawn-time integrity scan (fkie-cad/friTap#64). Only already-loaded /
    # explicitly-selected TLS libraries are then hooked. The agent also auto-
    # skips it in spawn mode when an anti-tamper library is detected.
    no_loader_hook: bool = False
    # EXPERIMENTAL (Android). Watch android_dlopen_ext via a hardware breakpoint
    # (ARM64 debug registers, no linker code patch) instead of the inline
    # trampoline, so late-loaded TLS libs can be hooked on PairIP-protected apps
    # without tripping the anti-tamper scan. Unvalidated on-device; default OFF.
    stealth_loader: bool = False
    # --pairip-safe (Android; attach and spawn). Minimal, scan-free capture mode
    # for PairIP-protected apps: hook only a curated TLS-library allowlist
    # (libssl.so, libhttpengine.so, libjavacrypto.so, libconscrypt*,
    # libcommerce_http_client.so, offset-based libwebviewchromium.so; libunity.so
    # is opt-in via --offsets), resolved WITHOUT any Memory.scan (exports ->
    # symbols -> offsets). Skips the loader hook, WebView/Cronet pattern scan,
    # Java hooks, OHTTP and the library-scan pass — the broad footprint that trips
    # PairIP's periodic integrity check (an in-process SIGSEGV). Keys persist via
    # "blink" (hooks toggled so .text stays pristine between scans).
    # (fkie-cad/friTap#64). Default OFF.
    pairip_safe: bool = False
    # Override which layer of the HTTP/3 egress-headers fallback chain the
    # agent actually attaches to. "auto" (default) keeps the winner-takes-all
    # logic: quiche-internal QuicSpdyStream::WriteHeaders preferred, then
    # net::QuicChromiumClientStream::WriteHeaders, then
    # quic::QuicSpdySession::WriteHeadersOnHeadersStream as a last-resort gQUIC
    # fallback. Set to "chrome-shim" or "session-level" to FORCE a fallback
    # layer for testing — useful for validating chain behavior on builds where
    # the quiche-internal layer still resolves. Only effective in app-api mode.
    quic_egress_headers_layer: str = "auto"
    # Generic memory-region key-scan target (--scan-keys-region). None disables
    # the scan. Passed through to the agent via config_batch.extensions.scan_region;
    # protocol-agnostic (the public scan engine and any private scan binding both
    # read it). See agent/shared/scan/.
    scan_keys_region: Optional[str] = None
    encapsulated_protocols: Dict[str, bool] = field(
        default_factory=lambda: {"ohttp": True}
    )
    # Module names that should bypass the Cronet-split-topology suppression
    # check, even when friTap would otherwise treat them as covered by a
    # sibling library. Accepts literal names, prefixes (a value ending in '*'
    # is treated as a stem prefix), or regexes prefixed with "re:".
    force_scan_modules: List[str] = field(default_factory=list)

    @property
    def ohttp_enabled(self) -> bool:
        return self.encapsulated_protocols.get("ohttp", True)

    def __post_init__(self) -> None:
        env_value = os.environ.get("FRITAP_FORCE_SCAN")
        if env_value:
            extra = [item.strip() for item in env_value.split(",") if item.strip()]
            for item in extra:
                if item not in self.force_scan_modules:
                    self.force_scan_modules.append(item)

HookingConfig exposes the encapsulated-protocol toggle ohttp_enabled (OHTTP is on by default within --protocol tls) and the QUIC knobs quic_capture_mode, quic_only, and quic_egress_headers_layer.
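The `ohttp_enabled` property and the `FRITAP_FORCE_SCAN` merge in `__post_init__` can be tried offline. The following is a minimal sketch, not the real `HookingConfig`: `HookingSketch` is a hypothetical stand-in that copies only the two fields and the two methods shown in the listing above, so it runs without friTap installed.

```python
import os
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class HookingSketch:
    """Hypothetical stand-in with only the fields this example needs."""

    encapsulated_protocols: Dict[str, bool] = field(
        default_factory=lambda: {"ohttp": True}
    )
    force_scan_modules: List[str] = field(default_factory=list)

    @property
    def ohttp_enabled(self) -> bool:
        # A missing "ohttp" key counts as enabled.
        return self.encapsulated_protocols.get("ohttp", True)

    def __post_init__(self) -> None:
        # Append comma-separated FRITAP_FORCE_SCAN entries, skipping duplicates.
        env_value = os.environ.get("FRITAP_FORCE_SCAN")
        if env_value:
            for item in (part.strip() for part in env_value.split(",")):
                if item and item not in self.force_scan_modules:
                    self.force_scan_modules.append(item)


os.environ["FRITAP_FORCE_SCAN"] = "libfoo.so, re:^libbar, libfoo.so"
cfg = HookingSketch(force_scan_modules=["libcronet*"])
merged_modules = cfg.force_scan_modules      # explicit entries first, then env
ohttp_default = cfg.ohttp_enabled
ohttp_disabled = HookingSketch(encapsulated_protocols={"ohttp": False}).ohttp_enabled
del os.environ["FRITAP_FORCE_SCAN"]
```

Because the environment value is split on commas, a `re:` pattern that itself contains a comma cannot be passed this way; such a pattern would have to go into `force_scan_modules` directly.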

Migrating from legacy parameters

FriTapConfig.from_legacy_params(...) bridges the old flat SSL_Logger keyword arguments (app, pcap_name, keylog, mobile, patterns, …) into the structured dataclasses, so existing code can move incrementally:

from friTap import FriTapConfig

config = FriTapConfig.from_legacy_params(
    app="com.example.app",
    pcap_name="capture.pcap",
    keylog="keys.log",
    mobile=True,
)
print(config.target, config.device.mobile, config.output.pcap)   # offline / CI-runnable

Working with flows

A Flow is friTap's reconstructed picture of one connection: endpoints, timing, parsed request/response, a protocol layer stack, and any attached findings. Flows are produced live (via on_flow) and read back from .tap files (see Reading .tap).

friTap.flow.models.Flow dataclass

Source code in friTap/flow/models.py
@dataclass
class Flow:
    # Identity
    flow_id: str = ""
    connection_id: str = ""
    src_addr: str = ""
    src_port: int = 0
    dst_addr: str = ""
    dst_port: int = 0
    ssl_session_id: str = ""

    # Transport/encryption protocol of the underlying connection: "tls"
    # (TLS-over-TCP, the default) or "quic" (QUIC-over-UDP). Stamped from
    # ``event.protocol`` at flow creation (FlowCollector._stamp_metadata). The
    # Flow itself stores no other transport hint, so the LayerPipeline reads
    # this to decide whether the layer-0 transport is a TlsLayer or QuicLayer.
    transport: str = "tls"

    # State
    state: FlowState = FlowState.ACTIVE
    started: float = field(default_factory=time.time)
    ended: float = 0.0

    # Parsed (from parsers module)
    request: Optional[ParseResult] = None
    response: Optional[ParseResult] = None

    # OHTTP inner payloads (decrypted bhttp, from NSS HPKE hooks)
    ohttp_inner_request: Optional[ParseResult] = None
    ohttp_inner_response: Optional[ParseResult] = None

    # Trailing data (unconsumed bytes after valid WebSocket/protocol frames)
    trailing_bytes: Optional[bytes] = None
    trailing_protocol: str = ""
    trailing_parse: Optional[ParseResult] = None

    # Raw
    chunks: list[FlowChunk] = field(default_factory=list)

    # Cached byte total (incremented by FlowCollector when appending chunks)
    _total_bytes: int = 0

    # Protocol detected by the parser registry, populated even when no
    # ``request``/``response`` is produced yet (e.g. HTTP/2 control-frame-only
    # flows). Used as a display fallback before showing "unknown".
    detected_protocol: str = ""

    # ------------------------------------------------------------------
    # Schema v2 additive enrichment fields (all optional, back-compat safe)
    # ------------------------------------------------------------------

    # Explicit endpoint-role labeling (v2-reserved). ``src_*``/``dst_*`` remain
    # the canonical transport endpoints (used across display/collector/filters);
    # these name the same two ends by *role* — local (instrumented process) and
    # remote (peer). They are stored fields (not properties) because both the
    # FLOW decode path (tap_format.decode_flow) and the collector assign them,
    # and they are persisted as their own keys in the .tap meta. They are
    # populated from src/dst in exactly ONE place — FlowCollector._enrich_flow —
    # which is the single source of truth for the local==src / remote==dst
    # mapping; everything else only reads them. Kept as a v2 hook so a future
    # capture source that distinguishes role from src/dst order (e.g. inbound
    # server flows) can set them without changing readers.
    local_addr: str = ""
    local_port: int = 0
    remote_addr: str = ""
    remote_port: int = 0

    # Process / package identity (from the capture target / agent).
    process_name: str = ""
    package_name: str = ""

    # Ordered protocol layer stack (outermost transport -> innermost), e.g.
    # [tls, http2] or [quic, http3]. Per-flow LINEAR (HTTP/2-3 streams are
    # already separate flows). Resolved by name via __getattr__: ``flow.tls``,
    # ``flow.quic``, ``flow.ssh`` each return their typed layer (lazily created
    # empty if absent, so e.g. ``flow.tls.sni`` is never None — matching the
    # old TlsMetadata value-object ergonomics). Whole-layer writes go through
    # add_layer/set_layer; field mutation (``flow.tls.sni = ...``) works on the
    # cached instance returned by __getattr__.
    layers: list[ProtocolLayer] = field(default_factory=list, repr=False)

    # Hook origin. ``hook_function`` mirrors the dominant chunk.function;
    # ``hook_stack`` is reserved for full backtraces (NICE-later).
    hook_function: str = ""
    hook_stack: str = ""

    # Mutable analyst annotations.
    tags: list[str] = field(default_factory=list)
    notes: str = ""
    # Attached analysis findings. Typed loosely as ``list`` to avoid importing
    # ``friTap.analysis`` here (it imports ``flow.models`` under TYPE_CHECKING);
    # elements are ``friTap.analysis.Finding`` instances.
    findings: list = field(default_factory=list)

    # ------------------------------------------------------------------
    # Protocol layer access (flow.<protocol> -> typed ProtocolLayer)
    # ------------------------------------------------------------------

    def __getattr__(self, name: str):
        # Only invoked when normal attribute lookup fails — real dataclass
        # fields/methods never reach here. Resolve a registered protocol name
        # to its layer (lazily creating an empty one so flow.tls/flow.quic are
        # never-None, matching the old TlsMetadata ergonomics). Anything else
        # raises AttributeError so getattr(flow, x, default) (filter engine,
        # copy/pickle dunder probing) keeps working.
        #
        # This materializes-and-attaches by design: it is the writable
        # never-None ergonomic (``flow.tls.version = ...`` persists). PURE-READ
        # probes that must NOT grow the stack (serializers, snapshots) must use
        # the non-mutating :meth:`layer` instead of attribute access.
        if name.startswith("_"):
            raise AttributeError(name)
        layers = self.__dict__.get("layers")
        if layers is None:
            # Half-constructed (e.g. during copy/unpickle before fields set).
            raise AttributeError(name)
        desc = get_registry().get(name)
        if desc is None:
            raise AttributeError(name)
        for ly in layers:
            if ly.name == name:
                return ly
        return self._create_layer(name, desc)

    def _create_layer(self, name: str, desc) -> ProtocolLayer:
        layer = desc.layer_cls()
        # Stamp the per-instance name so generic layers (AppLayer, registered
        # under several names against one class) report the right ``name``.
        # Harmless for typed layers (their ``NAME`` already equals ``name``).
        layer._name = name
        layer.depth = len(self.layers)
        layer._flow = self
        if desc.data_source == "chunks":
            layer.data = LayerData(data_source="chunks", _owner=self)
        self.layers.append(layer)
        return layer

    def layer(self, name: str) -> Optional[ProtocolLayer]:
        """Return the layer named *name*, or None if not present."""
        for ly in self.layers:
            if ly.name == name:
                return ly
        return None

    def add_layer(self, layer: ProtocolLayer) -> ProtocolLayer:
        """Append *layer* to the stack, linking parent/child + binding data."""
        layer.depth = len(self.layers)
        if self.layers:
            parent = self.layers[-1]
            parent.child = layer
            layer.parent = parent
        layer._flow = self
        if layer.data.data_source == "chunks" and layer.data._owner is None:
            layer.data._owner = self
        self.layers.append(layer)
        return layer

    def set_layer(self, layer: ProtocolLayer) -> ProtocolLayer:
        """Replace an existing same-name layer in place, else append it."""
        for i, existing in enumerate(self.layers):
            if existing.name == layer.name:
                layer.depth = existing.depth
                layer.parent, layer.child = existing.parent, existing.child
                layer._flow = self
                if layer.data.data_source == "chunks" and layer.data._owner is None:
                    layer.data._owner = self
                self.layers[i] = layer
                return layer
        return self.add_layer(layer)

    def __copy__(self) -> "Flow":
        # Shallow copy that re-lists the mutable containers (chunks, layers) so
        # appending to a live flow does not mutate a snapshot's lists. Element
        # objects are shared, matching the pre-existing snapshot semantics
        # (the collector already shared the TlsMetadata/FlowChunk objects).
        new = Flow.__new__(Flow)
        new.__dict__.update(self.__dict__)
        new.chunks = list(self.chunks)
        new.layers = list(self.layers)
        return new

    @property
    def has_trailing_data(self) -> bool:
        return self.trailing_bytes is not None

    @property
    def segments(self) -> list[dict]:
        """Return segment descriptors for multi-protocol rendering.

        Each segment is a dict with keys: type ('parsed'|'raw'), protocol,
        parse_result (or None), data (raw bytes for 'raw' type), source.
        """
        segs: list[dict] = []
        if self.request is not None:
            segs.append({
                "type": "parsed",
                "protocol": self.display_protocol,
                "parse_result": self.request,
                "source": "primary",
            })
        if self.trailing_parse is not None:
            segs.append({
                "type": "parsed",
                "protocol": self.trailing_protocol,
                "parse_result": self.trailing_parse,
                "source": "trailing",
            })
        elif self.trailing_bytes:
            segs.append({
                "type": "raw",
                "protocol": self.trailing_protocol or "unknown",
                "data": self.trailing_bytes,
                "source": "trailing",
            })
        return segs

    @property
    def duration(self) -> float:
        if self.ended > 0:
            return self.ended - self.started
        return time.time() - self.started

    @property
    def display_method(self) -> str:
        return _display.display_method_layered(self)

    @property
    def display_host(self) -> str:
        return _display.display_host(self.request, self.dst_addr, self.dst_port)

    @property
    def display_status(self) -> str:
        return _display.display_status(self.response)

    @property
    def display_size(self) -> str:
        total_bytes = self._total_bytes
        if total_bytes == 0 and self.chunks:
            total_bytes = sum(len(c.data) for c in self.chunks)
        return _display.display_size(self.response, total_bytes)

    @property
    def display_protocol(self) -> str:
        return _display.display_protocol(self)

    @property
    def has_request_data(self) -> bool:
        if self.request is not None:
            return True
        return any(c.direction == "write" for c in self.chunks)

    @property
    def has_response_data(self) -> bool:
        if self.response is not None:
            return True
        return any(c.direction == "read" for c in self.chunks)

    def get_direction_bytes(self, direction: str, max_bytes: int = 0) -> bytes:
        """Get concatenated raw bytes for a given direction ('read' or 'write').

        If *max_bytes* > 0, stop accumulating after that many bytes.
        """
        if max_bytes <= 0:
            return b"".join(c.data for c in self.chunks if c.direction == direction)
        parts: list[bytes] = []
        total = 0
        for c in self.chunks:
            if c.direction != direction:
                continue
            parts.append(c.data)
            total += len(c.data)
            if total >= max_bytes:
                break
        return b"".join(parts)

    @property
    def display_source(self) -> str:
        return _display.display_source(self.src_addr, self.src_port)

    @property
    def display_connection(self) -> str:
        """Directional connection string: src -> dst, src <- dst, or src <-> dst."""
        return _display.display_connection(
            self.request if self.has_request_data else None,
            self.response if self.has_response_data else None,
            self.src_addr, self.src_port, self.dst_addr, self.dst_port,
        )

    def to_dict(self, include_bodies: bool = False) -> dict:
        """Return a JSON-safe dict view of this flow for API/web consumers.

        Covers identity, transport/timing, parsed request/response (via
        :meth:`ParseResult.to_dict`), the protocol layer names, analyst
        annotations and attached findings. Raw chunk bytes are never included;
        request/response bodies are included (hex-encoded) only when
        *include_bodies* is True. Uses the non-mutating :meth:`layer` lookup so
        serializing never grows the layer stack.
        """
        tls = self.layer("tls")
        total_bytes = self._total_bytes or sum(len(c.data) for c in self.chunks)
        return {
            "flow_id": self.flow_id,
            "connection_id": self.connection_id,
            "src_addr": self.src_addr,
            "src_port": self.src_port,
            "dst_addr": self.dst_addr,
            "dst_port": self.dst_port,
            "ssl_session_id": self.ssl_session_id,
            "transport": self.transport,
            "state": self.state.value if isinstance(self.state, FlowState) else str(self.state),
            "started": self.started,
            "ended": self.ended,
            # Deterministic: avoid the wall-clock `duration` property for an
            # in-progress flow so the serialized snapshot is stable across calls.
            "duration": (self.ended - self.started) if self.ended > 0 else 0.0,
            "protocol": self.display_protocol,
            "detected_protocol": self.detected_protocol,
            "total_bytes": total_bytes,
            "process_name": self.process_name,
            "package_name": self.package_name,
            "tls_sni": tls.sni if tls is not None else "",
            "tls_alpn": tls.alpn if tls is not None else "",
            "layers": [ly.name for ly in self.layers],
            "tags": list(self.tags),
            "notes": self.notes,
            "request": self.request.to_dict(include_body=include_bodies) if self.request else None,
            "response": self.response.to_dict(include_body=include_bodies) if self.response else None,
            "findings": [f.to_dict() for f in self.findings],
        }

    # ------------------------------------------------------------------
    # On-demand body reconstruction from raw chunks
    # ------------------------------------------------------------------

    # Cache for reconstructed bodies: {direction: bytes}
    _body_cache: dict = field(default_factory=dict, repr=False)

    def reconstruct_body(self, direction: str) -> bytes:
        """Reconstruct the body for *direction* ('read' or 'write') from raw chunks.

        Bodies are no longer accumulated in parsers — this method extracts
        them on demand from ``self.chunks``.  The result is cached so repeated
        calls are cheap.
        """
        if direction in self._body_cache:
            return self._body_cache[direction]

        proto = self.display_protocol.lower()
        if "http/2" in proto:
            body = self._extract_h2_body(direction)
        elif "http/3" in proto:
            body = self._extract_h3_body(direction)
        elif "http/1" in proto:
            body = self._extract_h1_body(direction)
        elif "websocket" in proto:
            body = self._extract_ws_body(direction)
        else:
            body = b""

        self._body_cache[direction] = body
        return body

    def invalidate_body_cache(self) -> None:
        """Clear cached body data (call when chunks change)."""
        self._body_cache.clear()

    def _extract_h1_body(self, direction: str) -> bytes:
        """Extract HTTP/1 body by re-parsing raw bytes with h11."""
        try:
            import h11
        except ImportError:
            return b""
        raw = self.get_direction_bytes(direction)
        if not raw:
            return b""
        conn = h11.Connection(
            h11.CLIENT if direction == "read" else h11.SERVER
        )
        conn.receive_data(raw)
        body_parts: list[bytes] = []
        while True:
            event = conn.next_event()
            if event is h11.NEED_DATA or event is h11.PAUSED:
                break
            if isinstance(event, h11.Data):
                body_parts.append(bytes(event.data))
            elif isinstance(event, h11.EndOfMessage):
                break
        return b"".join(body_parts)

    def _extract_h2_body(self, direction: str) -> bytes:
        """Extract HTTP/2 DATA frame payloads from raw chunks.

        Stateless scan — no HPACK needed for body-only extraction.
        """
        import struct
        stream_id = 0
        msg = self.request if direction == "write" else self.response
        if msg is not None:
            stream_id = msg.stream_id

        body_parts: list[bytes] = []
        buf = bytearray()
        for chunk in self.chunks:
            if chunk.direction != direction:
                continue
            buf.extend(chunk.data)

        offset = 0
        while offset + 9 <= len(buf):
            length = (buf[offset] << 16) | (buf[offset + 1] << 8) | buf[offset + 2]
            frame_type = buf[offset + 3]
            flags = buf[offset + 4]
            sid = struct.unpack_from("!I", buf, offset + 5)[0] & 0x7FFFFFFF
            offset += 9
            if offset + length > len(buf):
                break
            if frame_type == 0x00 and (stream_id == 0 or sid == stream_id):
                payload = buf[offset:offset + length]
                # Strip padding if PADDED flag (0x08) is set
                if flags & 0x08 and len(payload) >= 1:
                    pad_len = payload[0]
                    end = len(payload) - pad_len
                    payload = payload[1:end] if end > 1 else b""
                body_parts.append(bytes(payload))
            offset += length

        return b"".join(body_parts)

    def _extract_h3_body(self, direction: str) -> bytes:
        """Extract HTTP/3 DATA frame payloads using varint framing."""
        from friTap.parsers.varint import decode_varint

        body_parts: list[bytes] = []
        buf = bytearray()
        for chunk in self.chunks:
            if chunk.direction != direction:
                continue
            buf.extend(chunk.data)

        offset = 0
        while offset < len(buf):
            try:
                frame_type, type_len = decode_varint(buf, offset)
                frame_length, len_len = decode_varint(buf, offset + type_len)
            except (ValueError, IndexError):
                break
            header_size = type_len + len_len
            payload_start = offset + header_size
            payload_end = payload_start + frame_length
            if payload_end > len(buf):
                break
            if frame_type == 0x00:  # DATA frame
                body_parts.append(bytes(buf[payload_start:payload_end]))
            offset = payload_end

        return b"".join(body_parts)

    def _extract_ws_body(self, direction: str) -> bytes:
        """Extract WebSocket payload — body is already in ParseResult for WS."""
        msg = self.request if direction == "write" else self.response
        if msg is not None and msg.body:
            return msg.body
        return b""

    # ------------------------------------------------------------------
    # Header / body / content-type access
    # ------------------------------------------------------------------

    @staticmethod
    def _get_header(msg: "Optional[ParseResult]", name: str, default: str = "") -> str:
        if msg is None:
            return default
        lower_name = name.lower()
        for key, value in msg.headers.items():
            if key.lower() == lower_name:
                return value
        return default

    def _get_decompressed_body(self, msg: "Optional[ParseResult]", direction: str, encoding_override: str = "") -> bytes:
        if msg is None:
            return b""
        body = msg.body if msg.body else self.reconstruct_body(direction)
        if not body:
            return b""
        encoding = encoding_override or msg.content_encoding
        if not encoding:
            lower = "content-encoding"
            for key, value in msg.headers.items():
                if key.lower() == lower:
                    encoding = value
                    break
        if not encoding:
            return body
        from friTap.parsers.decompress import decompress_body
        data, _err = decompress_body(body, encoding)
        return data

    def get_request_header(self, name: str, default: str = "") -> str:
        """Get a request header value by name (case-insensitive)."""
        return self._get_header(self.request, name, default)

    def get_response_header(self, name: str, default: str = "") -> str:
        """Get a response header value by name (case-insensitive)."""
        return self._get_header(self.response, name, default)

    @property
    def request_body(self) -> bytes:
        """The request body (possibly compressed), or ``b""``."""
        if self.request and self.request.body:
            return self.request.body
        return self.reconstruct_body("write")

    @property
    def response_body(self) -> bytes:
        """The response body (possibly compressed), or ``b""``."""
        if self.response and self.response.body:
            return self.response.body
        return self.reconstruct_body("read")

    def get_decompressed_request_body(self) -> bytes:
        """Return the request body after decompressing (gzip/br/zstd/deflate)."""
        return self._get_decompressed_body(self.request, "write")

    def get_decompressed_response_body(self) -> bytes:
        """Return the response body after decompressing (gzip/br/zstd/deflate)."""
        return self._get_decompressed_body(self.response, "read")

    @property
    def response_content_type(self) -> str:
        """The response Content-Type, or ``""`` if unavailable."""
        if self.response is not None and self.response.content_type:
            return self.response.content_type
        return self.get_response_header("content-type")

    @property
    def request_content_type(self) -> str:
        """The request Content-Type, or ``""`` if unavailable."""
        if self.request is not None and self.request.content_type:
            return self.request.content_type
        return self.get_request_header("content-type")

    def decode_request_protobuf(
        self, schema_path: Optional[str] = None, force: bool = False
    ) -> Optional[list]:
        """Decode the request body as protobuf.

        Returns a list with one decoded message per extracted payload (each
        a ``{field_number: [ProtoField, ...]}`` mapping), or ``None`` if the
        body is not protobuf or decoding fails.

        Args:
            schema_path: Optional path to a compiled ``.desc`` file.
            force: If ``True``, attempt decoding regardless of content type.
        """
        return self._decode_protobuf(
            self.get_decompressed_request_body(),
            self.request_content_type,
            schema_path=schema_path,
            force=force,
        )

    def decode_response_protobuf(
        self, schema_path: Optional[str] = None, force: bool = False
    ) -> Optional[list]:
        """Decode the response body as protobuf.

        Returns a list with one decoded message per extracted payload (each
        a ``{field_number: [ProtoField, ...]}`` mapping), or ``None`` if the
        body is not protobuf or decoding fails.

        Args:
            schema_path: Optional path to a compiled ``.desc`` file.
            force: If ``True``, attempt decoding regardless of content type.
        """
        return self._decode_protobuf(
            self.get_decompressed_response_body(),
            self.response_content_type,
            schema_path=schema_path,
            force=force,
        )

    def _decode_protobuf(
        self,
        body: bytes,
        content_type: str,
        schema_path: Optional[str] = None,
        force: bool = False,
    ) -> Optional[list]:
        """Internal: decode protobuf body with auto gRPC framing detection."""
        if not body:
            return None

        from friTap.parsers.protobuf import (
            decode_raw,
            extract_grpc_messages,
            is_grpc_content_type,
            is_likely_protobuf,
            is_protobuf_content_type,
        )

        # Check if we should attempt decoding
        is_proto_ct = is_grpc_content_type(content_type) or is_protobuf_content_type(content_type)
        if not force and not is_proto_ct and not is_likely_protobuf(body):
            return None

        try:
            payloads = extract_grpc_messages(body, content_type)
            results = []
            for payload in payloads:
                if not payload:
                    continue
                msg = decode_raw(payload)
                results.append(msg)
            return results if results else None
        except Exception:
            return None
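The `_extract_h3_body` helper in the listing above walks HTTP/3 frames, each of which starts with two QUIC variable-length integers (frame type, then payload length). The sketch below illustrates that framing on its own. `read_varint` and `extract_h3_data` are hypothetical names; the `(value, bytes_consumed)` return shape is only inferred from how `decode_varint` is called in the listing, and the encoding follows RFC 9000, section 16.

```python
def read_varint(buf: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode one QUIC varint at *offset*; return (value, bytes_consumed)."""
    if offset >= len(buf):
        raise IndexError("offset past end of buffer")
    first = buf[offset]
    size = 1 << (first >> 6)  # top two bits select 1, 2, 4 or 8 bytes
    if offset + size > len(buf):
        raise ValueError("truncated varint")
    value = first & 0x3F      # remaining six bits start the value
    for byte in buf[offset + 1:offset + size]:
        value = (value << 8) | byte
    return value, size


def extract_h3_data(buf: bytes) -> bytes:
    """Concatenate the payloads of all DATA (type 0x00) frames in *buf*."""
    parts = []
    offset = 0
    while offset < len(buf):
        try:
            frame_type, type_len = read_varint(buf, offset)
            frame_len, len_len = read_varint(buf, offset + type_len)
        except (ValueError, IndexError):
            break  # incomplete frame header
        start = offset + type_len + len_len
        end = start + frame_len
        if end > len(buf):
            break  # payload not fully captured
        if frame_type == 0x00:
            parts.append(buf[start:end])
        offset = end
    return b"".join(parts)


wire = (
    b"\x01\x02\xaa\xbb"   # HEADERS frame, skipped
    + b"\x00\x06hello "   # DATA, 6 bytes
    + b"\x00\x05world"    # DATA, 5 bytes
    + b"\x00\x09trunc"    # DATA declaring 9 bytes but carrying 5: ignored
)
h3_body = extract_h3_data(wire)
```

Stopping at the first incomplete frame, rather than raising, matches the behavior visible in the listing: a partially captured stream still yields whatever complete DATA frames preceded the cut.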

request_body property

The request body (possibly compressed), or b"".

response_body property

The response body (possibly compressed), or b"".

to_dict(include_bodies=False)

Return a JSON-safe dict view of this flow for API/web consumers.

Covers identity, transport/timing, parsed request/response (via ParseResult.to_dict), the protocol layer names, analyst annotations and attached findings. Raw chunk bytes are never included; request/response bodies are included (hex-encoded) only when include_bodies is True. Uses the non-mutating layer lookup so serializing never grows the layer stack.

Source code in friTap/flow/models.py
def to_dict(self, include_bodies: bool = False) -> dict:
    """Return a JSON-safe dict view of this flow for API/web consumers.

    Covers identity, transport/timing, parsed request/response (via
    :meth:`ParseResult.to_dict`), the protocol layer names, analyst
    annotations and attached findings. Raw chunk bytes are never included;
    request/response bodies are included (hex-encoded) only when
    *include_bodies* is True. Uses the non-mutating :meth:`layer` lookup so
    serializing never grows the layer stack.
    """
    tls = self.layer("tls")
    total_bytes = self._total_bytes or sum(len(c.data) for c in self.chunks)
    return {
        "flow_id": self.flow_id,
        "connection_id": self.connection_id,
        "src_addr": self.src_addr,
        "src_port": self.src_port,
        "dst_addr": self.dst_addr,
        "dst_port": self.dst_port,
        "ssl_session_id": self.ssl_session_id,
        "transport": self.transport,
        "state": self.state.value if isinstance(self.state, FlowState) else str(self.state),
        "started": self.started,
        "ended": self.ended,
        # Deterministic: avoid the wall-clock `duration` property for an
        # in-progress flow so the serialized snapshot is stable across calls.
        "duration": (self.ended - self.started) if self.ended > 0 else 0.0,
        "protocol": self.display_protocol,
        "detected_protocol": self.detected_protocol,
        "total_bytes": total_bytes,
        "process_name": self.process_name,
        "package_name": self.package_name,
        "tls_sni": tls.sni if tls is not None else "",
        "tls_alpn": tls.alpn if tls is not None else "",
        "layers": [ly.name for ly in self.layers],
        "tags": list(self.tags),
        "notes": self.notes,
        "request": self.request.to_dict(include_body=include_bodies) if self.request else None,
        "response": self.response.to_dict(include_body=include_bodies) if self.response else None,
        "findings": [f.to_dict() for f in self.findings],
    }

layer(name)

Return the layer named name, or None if not present.

Source code in friTap/flow/models.py
def layer(self, name: str) -> Optional[ProtocolLayer]:
    """Return the layer named *name*, or None if not present."""
    for ly in self.layers:
        if ly.name == name:
            return ly
    return None

reconstruct_body(direction)

Reconstruct the body for direction ('read' or 'write') from raw chunks.

Bodies are no longer accumulated in parsers — this method extracts them on demand from self.chunks. The result is cached so repeated calls are cheap.

Source code in friTap/flow/models.py
def reconstruct_body(self, direction: str) -> bytes:
    """Reconstruct the body for *direction* ('read' or 'write') from raw chunks.

    Bodies are no longer accumulated in parsers — this method extracts
    them on demand from ``self.chunks``.  The result is cached so repeated
    calls are cheap.
    """
    if direction in self._body_cache:
        return self._body_cache[direction]

    proto = self.display_protocol.lower()
    if "http/2" in proto:
        body = self._extract_h2_body(direction)
    elif "http/3" in proto:
        body = self._extract_h3_body(direction)
    elif "http/1" in proto:
        body = self._extract_h1_body(direction)
    elif "websocket" in proto:
        body = self._extract_ws_body(direction)
    else:
        body = b""

    self._body_cache[direction] = body
    return body

Serializing a flow

Flow.to_dict(include_bodies=False) returns a JSON-safe view: identity, transport/timing, parsed request/response, layer names, tags/notes and findings. Raw chunk bytes are never included; request/response bodies are included (hex-encoded) only when include_bodies=True.

flow_dict = flow.to_dict()                  # bodies omitted
flow_dict = flow.to_dict(include_bodies=True)  # bodies hex-encoded

Layer access

Each flow carries a stack of protocol layers. Access them in two ways:

  • Attribute access: flow.tls, flow.quic and flow.ssh lazily materialize a typed layer if that protocol is registered (an unregistered name raises AttributeError). If the layer is not yet present, an empty one is attached to the stack. Typed layers expose fields such as flow.tls.sni and flow.tls.alpn.
  • flow.layer(name): a non-mutating lookup that returns the existing layer or None. Use this in serialization paths so reading never grows the stack: flow.layer("http2"), then inspect .parsed / .data.

sni = flow.tls.sni                  # typed accessor
http2 = flow.layer("http2")         # None if absent
if http2 is not None:
    parsed = http2.parsed
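
The difference between the two access paths is easiest to see on a toy model. The sketch below is an illustration only: MiniFlow and MiniLayer are hypothetical classes written for this example, and friTap's real Flow may implement lazy materialization differently.

```python
class MiniLayer:
    """Toy protocol layer with a single typed field."""

    def __init__(self, name):
        self.name = name
        self.sni = ""


class MiniFlow:
    """Toy model of the two access paths (not friTap's Flow)."""

    _registered = {"tls", "quic", "ssh"}   # assumed registry for the demo

    def __init__(self):
        self.layers = []

    def layer(self, name):
        # Non-mutating lookup: return the existing layer or None.
        for ly in self.layers:
            if ly.name == name:
                return ly
        return None

    def __getattr__(self, name):
        # Reached only when normal attribute lookup fails.
        if name in MiniFlow._registered:
            ly = self.layer(name)
            if ly is None:
                ly = MiniLayer(name)
                self.layers.append(ly)   # attribute access grows the stack
            return ly
        raise AttributeError(name)


flow = MiniFlow()
print(flow.layer("tls"))        # lookup leaves the stack untouched
flow.tls.sni = "example.com"    # attribute access materializes the layer
print(len(flow.layers), flow.layer("tls").sni)
```

In read-only code such as exporters, the layer() form avoids leaving empty layers behind as a side effect of inspection.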

Why pr.body may be empty

Bodies are no longer accumulated in the parser's ParseResult: to keep .tap files small, identical/large bodies are reconstructed on demand from the flow's raw chunks. A ParseResult (flow.request / flow.response) can therefore have an empty .body even when data was captured. To get the bytes, reconstruct them from the flow:

req = flow.request_body        # bytes, reconstructed + cached
resp = flow.response_body
# equivalently: flow.reconstruct_body("write") / flow.reconstruct_body("read")
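
Code that must cope with both populated and empty ParseResult bodies can wrap the fallback in one place. This is a sketch under assumptions: body_bytes is a hypothetical helper, the mapping of "write" to the request and "read" to the response is read off the comment above, and _StubFlow is a stand-in so the snippet runs without a capture.

```python
def body_bytes(flow, direction):
    """Return body bytes for 'write' (request) or 'read' (response).

    Hypothetical helper: prefers a populated ParseResult.body and
    otherwise falls back to Flow.reconstruct_body(direction).
    """
    if direction not in ("read", "write"):
        raise ValueError("direction must be 'read' or 'write'")
    pr = flow.request if direction == "write" else flow.response
    body = getattr(pr, "body", b"") if pr is not None else b""
    return body or flow.reconstruct_body(direction)


class _StubFlow:
    """Stand-in for a Flow whose parsed bodies are empty."""

    def __init__(self):
        self.request = None
        self.response = None
        self._chunks = {"write": b"ping", "read": b"pong"}

    def reconstruct_body(self, direction):
        return self._chunks[direction]


demo = _StubFlow()
print(body_bytes(demo, "write"), body_bytes(demo, "read"))
```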

FlowSummary

FlowSummary is the lightweight index entry used when listing flows without reading their full bodies (returned by read_flow_summaries() / get_summaries()).

friTap.flow.models.FlowSummary dataclass

Lightweight snapshot of a Flow for list display and filtering.

Contains only scalar metadata (~200 bytes) — no chunks, no body bytes. Compatible with the filter engine (has request/response stubs with the same attribute names as ParseResult).

Source code in friTap/flow/models.py
@dataclass(frozen=True, slots=True)
class FlowSummary:
    """Lightweight snapshot of a Flow for list display and filtering.

    Contains only scalar metadata (~200 bytes) — no chunks, no body bytes.
    Compatible with the filter engine (has ``request``/``response`` stubs
    with the same attribute names as ParseResult).
    """
    flow_id: str = ""
    connection_id: str = ""
    src_addr: str = ""
    src_port: int = 0
    dst_addr: str = ""
    dst_port: int = 0
    ssl_session_id: str = ""
    state: FlowState = FlowState.ACTIVE
    started: float = 0.0
    ended: float = 0.0
    # Transport/encryption protocol of the underlying connection ("tls", "quic",
    # "signal", ...). Mirrors Flow.transport so consumers can group/filter from
    # the cheap summary without materializing the full Flow.
    transport: str = "tls"
    request: Optional[_ParseStub] = None
    response: Optional[_ParseStub] = None
    has_ohttp: bool = False
    has_trailing_data: bool = False
    trailing_protocol: str = ""
    total_bytes: int = 0
    detected_protocol: str = ""
    # Schema v2 additive enrichment scalars (cheap to surface in list/filter views)
    process_name: str = ""
    tls_sni: str = ""
    tls_alpn: str = ""
    tag_count: int = 0
    finding_count: int = 0
    has_notes: bool = False
    # Layered protocol-display scalars (so the layered display works without a
    # live layer stack). See friTap.flow.display.display_protocol_layered.
    outer_app_protocol: str = ""
    inner_e2e_protocol: str = ""
    inner_summary: str = ""
    # Primary TL operation derived from the flow's message-bearing layers (e.g.
    # "sendMessage", "users", "updates"). Surfaced in the Method column without
    # loading full messages. See friTap.flow.display.method_from_messages.
    flow_method: str = ""

    @property
    def duration(self) -> float:
        if self.ended > 0:
            return self.ended - self.started
        return time.time() - self.started

    @property
    def display_protocol(self) -> str:
        return _display.display_protocol(self)

    @property
    def display_protocol_layered(self) -> str:
        return _display.display_protocol_layered(self)

    @property
    def display_method(self) -> str:
        return _display.display_method_layered(self)

    @property
    def display_host(self) -> str:
        return _display.display_host(self.request, self.dst_addr, self.dst_port)

    @property
    def display_status(self) -> str:
        return _display.display_status(self.response)

    @property
    def display_size(self) -> str:
        return _display.display_size(self.response, self.total_bytes)

    @property
    def display_source(self) -> str:
        return _display.display_source(self.src_addr, self.src_port)

    @property
    def display_connection(self) -> str:
        return _display.display_connection(
            self.request, self.response,
            self.src_addr, self.src_port, self.dst_addr, self.dst_port,
        )

    # Aliases for filter engine compatibility
    @property
    def _total_bytes(self) -> int:
        return self.total_bytes

    @property
    def ohttp_inner_request(self):
        # Filter engine checks `is not None` — return a truthy sentinel
        return _OHTTP_SENTINEL if self.has_ohttp else None

    @property
    def ohttp_inner_response(self):
        return _OHTTP_SENTINEL if self.has_ohttp else None

    @staticmethod
    def from_flow(flow: "Flow") -> "FlowSummary":
        """Create a summary snapshot from a full Flow."""
        req_stub = None
        if flow.request is not None:
            r = flow.request
            req_stub = _ParseStub(
                protocol=r.protocol, method=r.method, url=r.url,
                host=r.host, status_code=r.status_code, status_text=r.status_text,
                content_type=r.content_type, content_encoding=r.content_encoding,
                body_size=r.body_size, stream_id=r.stream_id,
                is_control_frame=r.is_control_frame,
            )
        resp_stub = None
        if flow.response is not None:
            r = flow.response
            resp_stub = _ParseStub(
                protocol=r.protocol, method=r.method, url=r.url,
                host=r.host, status_code=r.status_code, status_text=r.status_text,
                content_type=r.content_type, content_encoding=r.content_encoding,
                body_size=r.body_size, stream_id=r.stream_id,
                is_control_frame=r.is_control_frame,
            )
        # Non-mutating lookup: a summary is a pure read, so use layer() (None if
        # absent) rather than flow.tls, which would attach an empty layer.
        tls_layer = flow.layer("tls")
        # Prefer a scalar already set on the flow (e.g. a synthetic, layerless
        # flow rebuilt from a .tap summary in the TUI flow list), since
        # layered_scalars_from_flow returns "" for a flow with no layer stack.
        # Live flows have layers and no preset scalars, so they fall through to
        # the computed values and keep working unchanged.
        s_outer, s_inner, s_summary = _display.layered_scalars_from_flow(flow)
        outer_app = getattr(flow, "outer_app_protocol", "") or s_outer
        inner_e2e = getattr(flow, "inner_e2e_protocol", "") or s_inner
        inner_summary = getattr(flow, "inner_summary", "") or s_summary
        # Primary TL operation from the message-bearing layers; falls back to a
        # preset scalar (synthetic flows rebuilt from a .tap have no live layers).
        flow_method = getattr(flow, "flow_method", "") or _display.method_from_messages(flow)
        return FlowSummary(
            flow_id=flow.flow_id,
            connection_id=flow.connection_id,
            src_addr=flow.src_addr,
            src_port=flow.src_port,
            dst_addr=flow.dst_addr,
            dst_port=flow.dst_port,
            ssl_session_id=flow.ssl_session_id,
            state=flow.state,
            started=flow.started,
            ended=flow.ended,
            transport=getattr(flow, "transport", "tls") or "tls",
            request=req_stub,
            response=resp_stub,
            has_ohttp=(flow.ohttp_inner_request is not None
                       or flow.ohttp_inner_response is not None),
            has_trailing_data=flow.trailing_bytes is not None,
            trailing_protocol=flow.trailing_protocol,
            total_bytes=flow._total_bytes,
            detected_protocol=flow.detected_protocol,
            process_name=flow.process_name,
            tls_sni=(tls_layer.sni if tls_layer is not None else ""),
            tls_alpn=(tls_layer.alpn if tls_layer is not None else ""),
            tag_count=len(flow.tags),
            finding_count=len(flow.findings),
            has_notes=bool(flow.notes),
            outer_app_protocol=outer_app,
            inner_e2e_protocol=inner_e2e,
            inner_summary=inner_summary,
            flow_method=flow_method,
        )

    def to_dict(self) -> dict:
        """Return a JSON-safe, body-free dict for a high-level flow overview.

        Emits the canonical FlowSummary key set shared with
        :meth:`friTap.flow.tap_format.FlowSummary.to_dict` so that live
        summaries (built via :meth:`from_flow`) and offline summaries (read
        from a .tap) render identically in a web UI / TUI / CLI overview.
        """
        req, resp = self.request, self.response
        return {
            "flow_id": self.flow_id,
            "connection_id": self.connection_id,
            "src_addr": self.src_addr,
            "src_port": self.src_port,
            "dst_addr": self.dst_addr,
            "dst_port": self.dst_port,
            "ssl_session_id": self.ssl_session_id,
            "state": self.state.value if isinstance(self.state, FlowState) else str(self.state),
            "started": self.started,
            "ended": self.ended,
            "transport": self.transport,
            # Deterministic and parity-matched with tap_format.FlowSummary.to_dict:
            # the `duration` property returns wall-clock `time.time() - started`
            # for in-progress flows (ended == 0), which is non-deterministic and
            # would diverge from the offline summary shape. A serialized snapshot
            # uses 0.0 until the flow completes.
            "duration": (self.ended - self.started) if self.ended > 0 else 0.0,
            "protocol": (self.detected_protocol
                         or (req.protocol if req else "")
                         or (resp.protocol if resp else "")
                         or "unknown"),
            "method": req.method if req else "",
            "url": req.url if req else "",
            "host": req.host if req else "",
            "status_code": resp.status_code if resp else 0,
            "total_bytes": self.total_bytes,
            "detected_protocol": self.detected_protocol,
            "process_name": self.process_name,
            "tls_sni": self.tls_sni,
            "tls_alpn": self.tls_alpn,
            "tag_count": self.tag_count,
            "finding_count": self.finding_count,
            "has_notes": self.has_notes,
            "outer_app_protocol": self.outer_app_protocol,
            "inner_e2e_protocol": self.inner_e2e_protocol,
            "inner_summary": self.inner_summary,
            "flow_method": self.flow_method,
        }
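
Since FlowSummary.to_dict() emits plain scalars, overview statistics can be computed on the dicts alone. The sketch below uses only keys that appear in the listing above (transport, total_bytes, ended); the helper names and the sample rows are made up for illustration.

```python
from collections import defaultdict


def bytes_by_transport(summary_dicts):
    """Sum total_bytes per transport over FlowSummary.to_dict() rows."""
    totals = defaultdict(int)
    for row in summary_dicts:
        totals[row.get("transport", "tls")] += row.get("total_bytes", 0)
    return dict(totals)


def finished_only(summary_dicts):
    """Keep rows of completed flows; in-progress rows carry ended == 0."""
    return [row for row in summary_dicts if row.get("ended", 0) > 0]


# Hand-written sample rows standing in for real summaries.
rows = [
    {"transport": "tls", "total_bytes": 100, "started": 1.0, "ended": 2.5},
    {"transport": "quic", "total_bytes": 40, "started": 1.2, "ended": 0.0},
    {"transport": "tls", "total_bytes": 5, "started": 3.0, "ended": 3.1},
]
print(bytes_by_transport(rows))
print(len(finished_only(rows)))
```

Filtering on ended rather than duration matters here: a serialized snapshot reports a duration of 0.0 for every flow that is still in progress.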

Offline conversion

Reconstruct a .tap file from an existing packet capture plus its key material — no device, no live capture.

Requires Wireshark / tshark ≥ 4.x

pcap_to_tap() (and the --from-pcap CLI) shell out to tshark for dissection. If tshark is not on PATH, set tshark_path= (or the $FRITAP_TSHARK environment variable). The example below is offline but needs tshark installed.

from friTap import pcap_to_tap

result = pcap_to_tap(
    "chrome.pcap",
    keylog_path="chromekeys.log",   # SSLKEYLOGFILE; raises NoDecryptionKeysError if given but unusable
    tap_path="out.tap",
    run_scan=True,                  # also run analyzers and embed findings
)
print(result.to_dict())

Import path

pcap_to_tap is importable from the package root (from friTap import pcap_to_tap), but not from friTap.offline — re-exporting it there would shadow the friTap.offline.pcap_to_tap module. The no-manifest core is friTap.convert_pcap_to_tap(...).

pcap_to_tap() reads a manifest sidecar <pcap>.fritap.json (keys keylog/tls_ports/quic_ports) when use_manifest=True; explicit arguments win over the manifest, which wins over defaults. It returns a ConvertResult:

friTap.offline.pcap_to_tap.ConvertResult dataclass

Summary of a pcap-to-tap conversion.

Source code in friTap/offline/pcap_to_tap.py
@dataclass
class ConvertResult:
    """Summary of a pcap-to-tap conversion."""
    tap_path: str
    flow_count: int = 0
    decrypted_packet_count: int = 0
    stream_count: int = 0
    # QUIC per-packet drops (e.g. misaligned stream id/payload lists). Kept
    # distinct from dropped TLS streams below so the two are not conflated.
    dropped_packet_count: int = 0
    # TLS streams that could not be followed/decoded (whole-stream drops).
    dropped_stream_count: int = 0
    findings_count: int = 0
    # Streams skipped during a keyless (plaintext) conversion because they were
    # encrypted (TLS/QUIC) and therefore need keys. Drives the "looks encrypted —
    # pass --keylog" hint in the offline CLI.
    encrypted_streams_skipped: int = 0
    # MTProto (Telegram) offline-decryption counters (populated only when
    # --mtproto-keylog is supplied; the decryptor is friTap's own, not tshark).
    mtproto_messages: int = 0
    mtproto_streams: int = 0
    mtproto_records_undecryptable: int = 0
    mtproto_streams_degraded: int = 0
    # Protocol-generic counters keyed by counter_prefix, e.g.
    # ``{"mtproto": {"messages": 6, "streams": 2, "undecryptable": 0, "degraded": 0}}``.
    # The named ``mtproto_*`` fields above are kept as back-compat accessors and
    # stay mirrored; any OTHER registry-driven protocol (built-in or plugin) needs
    # only this dict (no new dataclass field). Populated via :meth:`record_protocol`.
    per_protocol: dict = field(default_factory=dict)

    def record_protocol(
        self,
        prefix: str,
        *,
        messages: int = 0,
        streams: int = 0,
        undecryptable: int = 0,
        degraded: int = 0,
    ) -> None:
        """Accumulate one protocol decryptor's counters (generic + back-compat).

        Writes the protocol-generic ``per_protocol[prefix]`` view AND, when a
        matching legacy named field exists (``<prefix>_messages`` etc.), mirrors
        the increment into it so existing readers of ``result.mtproto_messages`` /
        ``result.mtproto_streams`` keep working unchanged.
        """
        bucket = self.per_protocol.setdefault(
            prefix,
            {"messages": 0, "streams": 0, "undecryptable": 0, "degraded": 0},
        )
        bucket["messages"] += messages
        bucket["streams"] += streams
        bucket["undecryptable"] += undecryptable
        bucket["degraded"] += degraded
        for legacy_suffix, value in (
            ("messages", messages),
            ("streams", streams),
            ("records_undecryptable", undecryptable),
            ("streams_degraded", degraded),
        ):
            attr = f"{prefix}_{legacy_suffix}"
            if hasattr(self, attr):
                setattr(self, attr, getattr(self, attr) + value)

    def to_dict(self) -> dict:
        """Return a JSON-safe dict view of this conversion summary.

        Lets web/API callers serialize the result of a pcap-to-tap conversion
        without reaching into the dataclass fields by hand.
        """
        from dataclasses import asdict
        return asdict(self)

to_dict()

Return a JSON-safe dict view of this conversion summary.

Lets web/API callers serialize the result of a pcap-to-tap conversion without reaching into the dataclass fields by hand.

Source code in friTap/offline/pcap_to_tap.py
def to_dict(self) -> dict:
    """Return a JSON-safe dict view of this conversion summary.

    Lets web/API callers serialize the result of a pcap-to-tap conversion
    without reaching into the dataclass fields by hand.
    """
    from dataclasses import asdict
    return asdict(self)

Key fields: tap_path, flow_count, decrypted_packet_count, stream_count, dropped_packet_count, dropped_stream_count, findings_count, and encrypted_streams_skipped (streams that could not be decrypted and were tallied rather than emitted).
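
A caller can turn these counters into a short human-readable report. The sketch below works on the dict returned by ConvertResult.to_dict() and uses only field names from the listing above; conversion_report, its wording, and the sample values are hypothetical.

```python
def conversion_report(result):
    """Build report lines from a ConvertResult.to_dict() mapping."""
    lines = [
        f"{result['tap_path']}: {result['flow_count']} flows "
        f"from {result['stream_count']} streams"
    ]
    # QUIC per-packet drops and TLS whole-stream drops are reported
    # separately, mirroring the two distinct counters.
    if result.get("dropped_packet_count", 0):
        lines.append(f"dropped packets: {result['dropped_packet_count']}")
    if result.get("dropped_stream_count", 0):
        lines.append(f"dropped streams: {result['dropped_stream_count']}")
    if result.get("encrypted_streams_skipped", 0):
        lines.append(
            f"{result['encrypted_streams_skipped']} encrypted streams skipped; "
            "supply key material to decrypt them"
        )
    return lines


# Sample values invented for the demo.
sample = {
    "tap_path": "out.tap",
    "flow_count": 12,
    "stream_count": 9,
    "dropped_packet_count": 0,
    "dropped_stream_count": 1,
    "encrypted_streams_skipped": 3,
}
for line in conversion_report(sample):
    print(line)
```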

Caveats

QUIC captures that contain only 1-RTT packets cannot be detected without keys. Encrypted streams that cannot be decrypted are skipped and counted in encrypted_streams_skipped. SSH banners and HTTP/2 control frames become synthetic, metadata-only flows. NoDecryptionKeysError is raised when keylog_path is given but no usable keys (and no embedded DSB) are found.
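
The option precedence that pcap_to_tap() applies (explicit arguments, then the manifest sidecar, then defaults) can be sketched as follows. This is not friTap's implementation: resolve_options and load_manifest are hypothetical, the DEFAULTS values are placeholders chosen for the demo, and the sidecar name is assumed to be the full pcap file name with .fritap.json appended.

```python
import json
import tempfile
from pathlib import Path

# Placeholder defaults for illustration; friTap's real defaults may differ.
DEFAULTS = {"keylog": None, "tls_ports": [443], "quic_ports": [443]}


def load_manifest(pcap_path):
    """Read the sidecar next to the pcap, or return {} if there is none."""
    sidecar = Path(str(pcap_path) + ".fritap.json")   # assumed naming
    if not sidecar.is_file():
        return {}
    with sidecar.open(encoding="utf-8") as fh:
        return json.load(fh)


def resolve_options(pcap_path, use_manifest=True, **explicit):
    """Explicit arguments win over the manifest, which wins over defaults."""
    manifest = load_manifest(pcap_path) if use_manifest else {}
    resolved = {}
    for key, default in DEFAULTS.items():
        if explicit.get(key) is not None:
            resolved[key] = explicit[key]
        elif key in manifest:
            resolved[key] = manifest[key]
        else:
            resolved[key] = default
    return resolved


with tempfile.TemporaryDirectory() as tmp:
    pcap = Path(tmp) / "chrome.pcap"
    manifest_body = {"keylog": "from-manifest.log", "tls_ports": [8443]}
    Path(str(pcap) + ".fritap.json").write_text(
        json.dumps(manifest_body), encoding="utf-8"
    )
    from_manifest = resolve_options(pcap)
    overridden = resolve_options(pcap, keylog="explicit.log")
    ignored = resolve_options(pcap, use_manifest=False)

print(from_manifest, overridden, ignored, sep="\n")
```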


Reading .tap files

friTap captures persist as a binary .tap file (see the .tap binary format). Two readers are provided.

TapReader

TapReader is the low-level streaming reader.

friTap.flow.tap_reader.TapReader

Indexed reader for .tap capture files.

Usage::

reader = TapReader("capture.tap")
meta = reader.open()
summaries = reader.read_flow_summaries()   # fast, metadata only
flow = reader.read_flow("10.0.0.1:443-...:0")  # full load on demand
reader.close()
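
A common pattern is to list the cheap summaries first and fully load only the flows of interest. The sketch below shows that pattern for flows that carry findings; flows_with_findings is a hypothetical helper, and _StubReader is a stand-in so the snippet runs without a .tap file. With friTap installed, an opened TapReader would be passed instead, for example inside a `with TapReader("capture.tap") as reader:` block, since the class implements the context-manager protocol.

```python
from types import SimpleNamespace


def flows_with_findings(reader):
    """Fully load only the flows whose summary reports findings."""
    flows = []
    for summary in reader.read_flow_summaries():   # metadata only
        if summary.finding_count > 0:
            flow = reader.read_flow(summary.flow_id)   # full load on demand
            if flow is not None:   # read_flow returns None on failure
                flows.append(flow)
    return flows


class _StubReader:
    """Stand-in exposing the two TapReader methods used above."""

    def __init__(self):
        self._flows = {
            "a": SimpleNamespace(flow_id="a", findings=["weak-cipher"]),
            "b": SimpleNamespace(flow_id="b", findings=[]),
        }

    def read_flow_summaries(self):
        return [
            SimpleNamespace(flow_id=fid, finding_count=len(f.findings))
            for fid, f in self._flows.items()
        ]

    def read_flow(self, flow_id):
        return self._flows.get(flow_id)


selected = flows_with_findings(_StubReader())
print([f.flow_id for f in selected])
```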
Source code in friTap/flow/tap_reader.py
class TapReader:
    """Indexed reader for .tap capture files.

    Usage::

        reader = TapReader("capture.tap")
        meta = reader.open()
        summaries = reader.read_flow_summaries()   # fast, metadata only
        flow = reader.read_flow("10.0.0.1:443-...:0")  # full load on demand
        reader.close()
    """

    def __init__(self, path: str) -> None:
        self._path = str(Path(path).resolve())
        self._file = None
        self._header: Optional[TapHeader] = None
        self._meta: Optional[TapMeta] = None
        self._data_start: int = 0  # byte offset where records begin
        # Index: flow_id -> file_offset of the FLOW record envelope
        self._flow_offsets: dict[str, int] = {}
        # Lazily-built findings index: flow_id -> list[Finding]. None = not built.
        self._findings_index: Optional[dict[str, list]] = None
        # Finding-presence flag determined cheaply at open() time.
        #   None  = undetermined (footer-index path does not walk records, so we
        #           cannot know without a scan — fall back to lazy full scan)
        #   False = open-time scan walked every record and saw NO REC_FINDING
        #           (read_findings short-circuits to [] with no second scan)
        #   True  = at least one REC_FINDING was seen; offsets captured below
        self._saw_finding_record: Optional[bool] = None
        # Byte offsets of REC_FINDING record envelopes captured during the
        # open-time linear scan, so read_findings can seek directly to them
        # instead of re-scanning the whole file.
        self._finding_offsets: list[int] = []
        self._opened: bool = False

    @property
    def path(self) -> str:
        return self._path

    @property
    def header(self) -> Optional[TapHeader]:
        return self._header

    @property
    def meta(self) -> Optional[TapMeta]:
        return self._meta

    @property
    def flow_count(self) -> int:
        return len(self._flow_offsets)

    def open(self) -> TapMeta:
        """Open the .tap file, parse the header, and build the flow index.

        Returns the file-level TapMeta.
        """
        self._file = open(self._path, "rb")
        self._opened = True

        try:
            # Read header
            header_raw = self._file.read(_HEADER_STRUCT.size + 4096)  # read extra for ext
            self._header, self._data_start = decode_header(header_raw)

            # Try to read META record (should be the first record)
            self._file.seek(self._data_start)
            self._meta = self._try_read_meta()

            # Build flow index
            if self._header.flags & FLAG_HAS_INDEX:
                self._build_index_from_footer()
            else:
                logger.info("No index in .tap file, performing linear scan")
                self._build_index_linear_scan()
        except Exception:
            self.close()
            raise

        logger.info(
            "TapReader opened: %s (%d flows)",
            self._path, len(self._flow_offsets),
        )
        return self._meta or TapMeta()

    def read_flow_summaries(self) -> list[FlowSummary]:
        """Read lightweight flow metadata for all flows (no chunks/bodies).

        Returns summaries sorted by start timestamp.
        """
        self._ensure_open()
        summaries = []

        for flow_id, offset in self._flow_offsets.items():
            try:
                payload = self._read_record_payload_at(offset)
                if payload is not None:
                    summary = decode_flow_summary(payload, file_offset=offset)
                    summaries.append(summary)
            except Exception:
                logger.debug("Failed to read flow summary at offset %d", offset, exc_info=True)

        # Enrich finding_count from the findings index: findings are stored in
        # separate REC_FINDING records, not in FLOW meta, so decode_flow_summary
        # cannot know them and leaves the count at 0. has_findings() is a cheap
        # no-op short-circuit for findings-free captures (the common case); only
        # captures that actually carry findings pay for the (cached) index build.
        if self.has_findings():
            for summary in summaries:
                summary.finding_count = len(self.read_findings(summary.flow_id))

        summaries.sort(key=lambda s: s.started)
        return summaries

    def read_flow(self, flow_id: str) -> Optional["Flow"]:
        """Read a full Flow object by flow_id (on-demand, with chunks/bodies).

        Returns None if the flow_id is not in the index.
        """
        self._ensure_open()
        offset = self._flow_offsets.get(flow_id)
        if offset is None:
            return None

        try:
            payload = self._read_record_payload_at(offset)
            if payload is None:
                return None
            flow = decode_flow(payload)
        except Exception:
            logger.error("Failed to read flow %s at offset %d", flow_id, offset, exc_info=True)
            return None

        # Findings are best-effort enrichment: a corrupt/undecodable findings
        # index must NEVER suppress an already-decoded valid flow. Isolate it.
        try:
            findings = self.read_findings(flow_id)
            if findings:
                flow.findings = list(findings)
        except Exception:
            logger.warning(
                "Failed to read findings for flow %s; returning flow without findings",
                flow_id, exc_info=True,
            )
        return flow

    def read_all_flows(self) -> list["Flow"]:
        """Read all flows fully (with chunks and bodies).

        For large captures, prefer read_flow_summaries() + read_flow() on demand.
        """
        self._ensure_open()
        flows = []

        for flow_id, offset in self._flow_offsets.items():
            try:
                payload = self._read_record_payload_at(offset)
                if payload is None:
                    continue
                flow = decode_flow(payload)
            except Exception:
                logger.debug("Failed to read flow at offset %d", offset, exc_info=True)
                continue

            # Findings are best-effort enrichment: a corrupt/undecodable
            # findings index must NEVER drop an already-decoded valid flow.
            try:
                findings = self.read_findings(flow.flow_id)
                if findings:
                    flow.findings = list(findings)
            except Exception:
                logger.warning(
                    "Failed to read findings for flow %s; flow kept without findings",
                    flow.flow_id, exc_info=True,
                )
            flows.append(flow)

        flows.sort(key=lambda f: f.started)
        return flows

    def read_findings(self, flow_id: str) -> list:
        """Return analysis findings persisted for *flow_id* (REC_FINDING records).

        The findings index is built lazily on first call (one linear pass over
        the file) and cached. Returns an empty list if the file carries none.
        """
        self._ensure_open()
        if self._findings_index is None:
            self._build_findings_index()
        return self._findings_index.get(flow_id, [])

    def has_findings(self) -> bool:
        """True if the file contains any persisted findings."""
        self._ensure_open()
        if self._findings_index is None:
            self._build_findings_index()
        return bool(self._findings_index)

    def close(self) -> None:
        """Close the file. Safe to call multiple times."""
        if self._file is not None:
            try:
                self._file.close()
            except Exception:
                pass
            self._file = None
        self._opened = False
        self._flow_offsets.clear()
        self._findings_index = None
        self._saw_finding_record = None
        self._finding_offsets = []

    def __enter__(self) -> "TapReader":
        self.open()
        return self

    def __exit__(self, *args) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Internal: index building
    # ------------------------------------------------------------------

    def _build_index_from_footer(self) -> None:
        """Read the FLOW_INDEX from the footer pointer."""
        assert self._file is not None

        # Findings presence is recorded in the header flags at close() time, so
        # we can decide cheaply (no record walk) whether the findings scan is
        # worth running. A closed/indexed file (FLAG_HAS_INDEX, this path) that
        # lacks FLAG_HAS_FINDINGS provably contains no REC_FINDING records.
        if self._header is not None and self._header.flags & FLAG_HAS_FINDINGS:
            # Findings exist; offsets unknown on this path, so _build_findings_index
            # will scan once (lazily, only if findings are actually requested).
            self._saw_finding_record = True
        else:
            self._saw_finding_record = False

        # Read footer (last 16 bytes)
        self._file.seek(0, 2)  # seek to end
        file_size = self._file.tell()

        if file_size < _HEADER_STRUCT.size + _FOOTER_STRUCT.size:
            logger.warning("File too small for footer, falling back to linear scan")
            self._build_index_linear_scan()
            return

        self._file.seek(file_size - _FOOTER_STRUCT.size)
        footer_raw = self._file.read(_FOOTER_STRUCT.size)

        try:
            footer_magic, index_offset = _FOOTER_STRUCT.unpack(footer_raw)
        except Exception:
            logger.warning("Footer unpack failed, falling back to linear scan")
            self._build_index_linear_scan()
            return

        if footer_magic != FOOTER_MAGIC:
            logger.warning("Footer magic mismatch, falling back to linear scan")
            self._build_index_linear_scan()
            return

        # Read FLOW_INDEX record at index_offset
        payload = self._read_record_payload_at(index_offset)
        if payload is None:
            logger.warning("Failed to read FLOW_INDEX, falling back to linear scan")
            self._build_index_linear_scan()
            return

        entries = decode_flow_index(payload)
        for entry in entries:
            self._flow_offsets[entry["flow_id"]] = entry["offset"]

    def _build_index_linear_scan(self) -> None:
        """Scan all records sequentially to build the flow index.

        Used when the FLOW_INDEX is missing (partial capture / crash).
        Keeps only the last FLOW record per flow_id.
        """
        assert self._file is not None

        # This pass already visits every record, so capture finding presence
        # and offsets here for free. After this method completes, the flag is
        # authoritative (False if no REC_FINDING was seen), letting
        # _build_findings_index short-circuit instead of scanning a second time.
        self._saw_finding_record = False
        self._finding_offsets = []

        for offset, rec_type, payload, stored_crc in self._iter_records(self._data_start):
            # Record finding presence/offset cheaply (payload is decoded lazily
            # on first read_findings, not here).
            if rec_type == REC_FINDING:
                self._saw_finding_record = True
                self._finding_offsets.append(offset)
                continue

            if rec_type == REC_FLOW:
                if verify_payload_crc(payload, stored_crc):
                    try:
                        summary = decode_flow_summary(payload)
                        self._flow_offsets[summary.flow_id] = offset
                    except Exception:
                        logger.debug("Skipping corrupt FLOW at offset %d", offset)
                else:
                    logger.debug("CRC mismatch for FLOW at offset %d, skipping", offset)
            elif rec_type == REC_META and self._meta is None:
                if verify_payload_crc(payload, stored_crc):
                    try:
                        self._meta = decode_meta(payload)
                    except Exception:
                        pass

    def _build_findings_index(self) -> None:
        """Build the lazy flow_id -> [Finding] index from REC_FINDING records.

        Performance gate: the open-time scan records whether any REC_FINDING
        record exists (``self._saw_finding_record``) and, when so, their byte
        offsets (``self._finding_offsets``):

          * ``_saw_finding_record is False`` — the open-time linear scan walked
            every record and saw no findings, so we short-circuit to an empty
            index without re-reading the (potentially large) file. This is the
            common case for findings-free captures.
          * ``_finding_offsets`` populated — seek directly to each known finding
            record instead of scanning the whole file.
          * ``_saw_finding_record is None`` — undetermined (the footer-index
            path does not walk records, so presence is unknown without a scan).
            Fall back to the original full linear pass, preserving behaviour.

        The result is cached in ``self._findings_index``.
        """
        assert self._file is not None

        # Short-circuit: open-time scan proved there are no findings.
        if self._saw_finding_record is False:
            self._findings_index = {}
            return

        index: dict[str, list] = {}
        # Lazy import to avoid an import cycle (analysis imports flow.models).
        try:
            from friTap.analysis import Finding
        except Exception:
            Finding = None  # type: ignore

        def add_record(payload: bytes, stored_crc: int, offset: int) -> None:
            if not verify_payload_crc(payload, stored_crc):
                logger.debug("CRC mismatch for REC_FINDING at offset %d, skipping", offset)
                return
            try:
                flow_id, finding_dicts = decode_finding_record(payload)
            except Exception:
                logger.debug("Skipping corrupt REC_FINDING at offset %d", offset)
                return
            bucket = index.setdefault(flow_id, [])
            for fd in finding_dicts:
                bucket.append(Finding.from_dict(fd) if Finding is not None else fd)

        # Fast path: seek directly to the offsets captured at open() time.
        if self._saw_finding_record is True and self._finding_offsets:
            for offset in self._finding_offsets:
                self._file.seek(offset)
                envelope_raw = self._file.read(_RECORD_ENVELOPE.size)
                if len(envelope_raw) < _RECORD_ENVELOPE.size:
                    continue
                try:
                    rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
                except ValueError:
                    continue
                if rec_type != REC_FINDING or payload_len == 0:
                    continue
                payload = self._file.read(payload_len)
                if len(payload) < payload_len:
                    continue
                add_record(payload, stored_crc, offset)
            self._findings_index = index
            return

        # Fallback (presence undetermined, e.g. footer-index path): full scan.
        for offset, rec_type, payload, stored_crc in self._iter_records(self._data_start):
            if rec_type != REC_FINDING:
                continue
            add_record(payload, stored_crc, offset)

        self._findings_index = index

    def _iter_records(self, start_offset: int):
        """Yield ``(offset, rec_type, payload, stored_crc)`` for every record.

        Single shared sequential record iterator used by both
        ``_build_index_linear_scan`` and ``_build_findings_index`` (fallback
        path). It handles sync-marker recovery on a corrupt envelope and skips
        zero-length records exactly as the prior duplicated loops did. The
        CRC is yielded unverified so each caller can apply its own
        verify/decode policy per record type (behaviour-preserving).
        """
        assert self._file is not None
        self._file.seek(start_offset)

        while True:
            offset = self._file.tell()
            envelope_raw = self._file.read(_RECORD_ENVELOPE.size)

            if len(envelope_raw) < _RECORD_ENVELOPE.size:
                break  # EOF or truncated

            try:
                rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
            except ValueError:
                # Try to find next sync marker for recovery
                recovered = self._recover_from(offset + 1)
                if recovered < 0:
                    break
                continue

            if payload_len == 0:
                continue

            payload = self._file.read(payload_len)
            if len(payload) < payload_len:
                break  # Truncated payload at EOF

            yield offset, rec_type, payload, stored_crc

    def _recover_from(self, start_offset: int) -> int:
        """Scan forward from start_offset to find the next valid sync marker.

        Returns the offset of the next record, or -1 if not found.
        """
        assert self._file is not None
        self._file.seek(start_offset)

        # Read in 4KB chunks to find sync marker
        while True:
            chunk_start = self._file.tell()
            chunk = self._file.read(4096)
            if not chunk:
                return -1

            idx = find_sync_marker(chunk)
            if idx >= 0:
                recovered_offset = chunk_start + idx
                self._file.seek(recovered_offset)
                return recovered_offset

    # ------------------------------------------------------------------
    # Internal: record reading
    # ------------------------------------------------------------------

    def _read_record_payload_at(self, offset: int) -> Optional[bytes]:
        """Read and verify a single record's payload at the given file offset.

        Returns the raw payload bytes, or None if the record is invalid.
        """
        assert self._file is not None
        self._file.seek(offset)

        envelope_raw = self._file.read(_RECORD_ENVELOPE.size)
        if len(envelope_raw) < _RECORD_ENVELOPE.size:
            return None

        try:
            rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
        except ValueError:
            return None

        payload = self._file.read(payload_len)
        if len(payload) < payload_len:
            return None

        if not verify_payload_crc(payload, stored_crc):
            logger.warning("CRC mismatch at offset %d", offset)
            return None

        return payload

    def _try_read_meta(self) -> Optional[TapMeta]:
        """Try to read a META record at the current position."""
        assert self._file is not None
        pos = self._file.tell()

        envelope_raw = self._file.read(_RECORD_ENVELOPE.size)
        if len(envelope_raw) < _RECORD_ENVELOPE.size:
            self._file.seek(pos)
            return None

        try:
            rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
        except ValueError:
            self._file.seek(pos)
            return None

        if rec_type != REC_META:
            self._file.seek(pos)
            return None

        payload = self._file.read(payload_len)
        if len(payload) < payload_len:
            self._file.seek(pos)
            return None

        if not verify_payload_crc(payload, stored_crc):
            self._file.seek(pos)
            return None

        return decode_meta(payload)

    def _ensure_open(self) -> None:
        if not self._opened or self._file is None:
            raise RuntimeError("TapReader is not open. Call open() first.")
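
The sync-marker recovery performed by `_iter_records` and `_recover_from` in the listing above can be illustrated on a toy record format. The sketch below is not friTap's on-disk layout: the `SYNC` bytes, the envelope struct, and the helper names are all assumptions made for illustration. It also carries the last few bytes of each chunk over to the next read, so a marker that straddles a chunk boundary is still found; that is a design choice of this sketch, not a statement about friTap's `find_sync_marker`.

```python
import io
import struct
import zlib

SYNC = b"\xa5TAP"                    # hypothetical sync marker, not friTap's
ENVELOPE = struct.Struct("<4sBII")   # sync, record type, payload length, CRC32


def encode_record(rec_type: int, payload: bytes) -> bytes:
    """Frame a payload with the toy envelope."""
    return ENVELOPE.pack(SYNC, rec_type, len(payload), zlib.crc32(payload)) + payload


def find_next_sync(f, start: int, chunk_size: int = 4096) -> int:
    """Return the offset of the next SYNC at or after start, or -1 if none."""
    f.seek(start)
    carry = b""
    window_start = start             # file offset of the first byte in the window
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            return -1
        window = carry + chunk
        idx = window.find(SYNC)
        if idx >= 0:
            return window_start + idx
        # Keep the tail so a marker split across two chunks is still matched.
        carry = window[-(len(SYNC) - 1):]
        window_start += len(window) - len(carry)


def iter_records(f, start: int = 0):
    """Yield (rec_type, payload) for every record whose CRC checks out."""
    offset = start
    while True:
        f.seek(offset)
        raw = f.read(ENVELOPE.size)
        if len(raw) < ENVELOPE.size:
            return                   # EOF or truncated envelope
        sync, rec_type, length, crc = ENVELOPE.unpack(raw)
        if sync != SYNC:
            # Corrupt envelope: resynchronise on the next marker.
            offset = find_next_sync(f, offset + 1)
            if offset < 0:
                return
            continue
        payload = f.read(length)
        if len(payload) < length:
            return                   # truncated payload at EOF
        offset = f.tell()
        if zlib.crc32(payload) == crc:
            yield rec_type, payload


# Two valid records with junk bytes wedged between them.
damaged = encode_record(1, b"hello") + b"garbage!!" + encode_record(2, b"world")
records = list(iter_records(io.BytesIO(damaged)))
print(records)   # [(1, b'hello'), (2, b'world')]
```

Both records survive even though the bytes between them do not form a valid envelope, which is the property a partial or damaged capture relies on.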

open()

Open the .tap file, parse the header, and build the flow index.

Returns the file-level TapMeta.

Source code in friTap/flow/tap_reader.py
def open(self) -> TapMeta:
    """Open the .tap file, parse the header, and build the flow index.

    Returns the file-level TapMeta.
    """
    self._file = open(self._path, "rb")
    self._opened = True

    try:
        # Read header
        header_raw = self._file.read(_HEADER_STRUCT.size + 4096)  # read extra for ext
        self._header, self._data_start = decode_header(header_raw)

        # Try to read META record (should be the first record)
        self._file.seek(self._data_start)
        self._meta = self._try_read_meta()

        # Build flow index
        if self._header.flags & FLAG_HAS_INDEX:
            self._build_index_from_footer()
        else:
            logger.info("No index in .tap file, performing linear scan")
            self._build_index_linear_scan()
    except Exception:
        self.close()
        raise

    logger.info(
        "TapReader opened: %s (%d flows)",
        self._path, len(self._flow_offsets),
    )
    return self._meta or TapMeta()
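
The index strategy that `open()` follows (use the footer index when one is present, otherwise scan every record) can be sketched with a toy file format. Everything here is an assumption for illustration: the 16-byte footer layout, the `TOYINDEX` magic, the JSON payloads, and the function names are not friTap's real encoding.

```python
import io
import json
import struct

FOOTER = struct.Struct("<8sQ")   # hypothetical footer: magic + index offset (16 bytes)
FOOTER_MAGIC = b"TOYINDEX"
LEN = struct.Struct("<I")        # every toy record: 4-byte length + JSON payload


def write_capture(flows: dict, with_footer: bool = True) -> bytes:
    """Serialise flows; optionally append an index record and a footer."""
    out = io.BytesIO()
    offsets = {}
    for flow_id, body in flows.items():
        offsets[flow_id] = out.tell()
        payload = json.dumps({"flow_id": flow_id, "body": body}).encode()
        out.write(LEN.pack(len(payload)) + payload)
    if with_footer:
        index_offset = out.tell()
        index = json.dumps(offsets).encode()
        out.write(LEN.pack(len(index)) + index)
        out.write(FOOTER.pack(FOOTER_MAGIC, index_offset))
    return out.getvalue()


def linear_scan(f) -> dict:
    """Walk every record from the start; the last record per flow_id wins."""
    f.seek(0)
    offsets = {}
    while True:
        offset = f.tell()
        head = f.read(LEN.size)
        if len(head) < LEN.size:
            break
        (length,) = LEN.unpack(head)
        payload = f.read(length)
        if len(payload) < length:
            break                      # truncated record at EOF
        try:
            record = json.loads(payload)
        except ValueError:
            continue                   # skip an undecodable record
        if isinstance(record, dict) and "flow_id" in record:
            offsets[record["flow_id"]] = offset
    return offsets


def build_index(f):
    """Return (flow_id -> offset, strategy used)."""
    f.seek(0, io.SEEK_END)
    size = f.tell()
    if size >= FOOTER.size:
        f.seek(size - FOOTER.size)
        magic, index_offset = FOOTER.unpack(f.read(FOOTER.size))
        if magic == FOOTER_MAGIC and index_offset <= size - FOOTER.size - LEN.size:
            f.seek(index_offset)
            (length,) = LEN.unpack(f.read(LEN.size))
            try:
                return json.loads(f.read(length)), "footer"
            except ValueError:
                pass                   # unreadable index: fall through to the scan
    return linear_scan(f), "linear"


flows = {"flow-1": "GET /", "flow-2": "POST /login"}
complete = build_index(io.BytesIO(write_capture(flows)))
crashed = build_index(io.BytesIO(write_capture(flows, with_footer=False)))
print(complete[1], crashed[1])         # footer linear
```

Both paths produce the same offsets; the footer path simply avoids reading the whole file, which matters for large captures.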

read_flow_summaries()

Read lightweight flow metadata for all flows (no chunks/bodies).

Returns summaries sorted by start timestamp.

Source code in friTap/flow/tap_reader.py
def read_flow_summaries(self) -> list[FlowSummary]:
    """Read lightweight flow metadata for all flows (no chunks/bodies).

    Returns summaries sorted by start timestamp.
    """
    self._ensure_open()
    summaries = []

    for flow_id, offset in self._flow_offsets.items():
        try:
            payload = self._read_record_payload_at(offset)
            if payload is not None:
                summary = decode_flow_summary(payload, file_offset=offset)
                summaries.append(summary)
        except Exception:
            logger.debug("Failed to read flow summary at offset %d", offset, exc_info=True)

    # Enrich finding_count from the findings index: findings are stored in
    # separate REC_FINDING records, not in FLOW meta, so decode_flow_summary
    # cannot know them and leaves the count at 0. has_findings() is a cheap
    # no-op short-circuit for findings-free captures (the common case); only
    # captures that actually carry findings pay for the (cached) index build.
    if self.has_findings():
        for summary in summaries:
            summary.finding_count = len(self.read_findings(summary.flow_id))

    summaries.sort(key=lambda s: s.started)
    return summaries

read_flow(flow_id)

Read a full Flow object by flow_id (on-demand, with chunks/bodies).

Returns None if the flow_id is not in the index.

Source code in friTap/flow/tap_reader.py
def read_flow(self, flow_id: str) -> Optional["Flow"]:
    """Read a full Flow object by flow_id (on-demand, with chunks/bodies).

    Returns None if the flow_id is not in the index.
    """
    self._ensure_open()
    offset = self._flow_offsets.get(flow_id)
    if offset is None:
        return None

    try:
        payload = self._read_record_payload_at(offset)
        if payload is None:
            return None
        flow = decode_flow(payload)
    except Exception:
        logger.error("Failed to read flow %s at offset %d", flow_id, offset, exc_info=True)
        return None

    # Findings are best-effort enrichment: a corrupt/undecodable findings
    # index must NEVER suppress an already-decoded valid flow. Isolate it.
    try:
        findings = self.read_findings(flow_id)
        if findings:
            flow.findings = list(findings)
    except Exception:
        logger.warning(
            "Failed to read findings for flow %s; returning flow without findings",
            flow_id, exc_info=True,
        )
    return flow

read_all_flows()

Read all flows fully (with chunks and bodies).

For large captures, prefer read_flow_summaries() + read_flow() on demand.

Source code in friTap/flow/tap_reader.py
def read_all_flows(self) -> list["Flow"]:
    """Read all flows fully (with chunks and bodies).

    For large captures, prefer read_flow_summaries() + read_flow() on demand.
    """
    self._ensure_open()
    flows = []

    for flow_id, offset in self._flow_offsets.items():
        try:
            payload = self._read_record_payload_at(offset)
            if payload is None:
                continue
            flow = decode_flow(payload)
        except Exception:
            logger.debug("Failed to read flow at offset %d", offset, exc_info=True)
            continue

        # Findings are best-effort enrichment: a corrupt/undecodable
        # findings index must NEVER drop an already-decoded valid flow.
        try:
            findings = self.read_findings(flow.flow_id)
            if findings:
                flow.findings = list(findings)
        except Exception:
            logger.warning(
                "Failed to read findings for flow %s; flow kept without findings",
                flow.flow_id, exc_info=True,
            )
        flows.append(flow)

    flows.sort(key=lambda f: f.started)
    return flows

close()

Close the file. Safe to call multiple times.

Source code in friTap/flow/tap_reader.py
def close(self) -> None:
    """Close the file. Safe to call multiple times."""
    if self._file is not None:
        try:
            self._file.close()
        except Exception:
            pass
        self._file = None
    self._opened = False
    self._flow_offsets.clear()
    self._findings_index = None
    self._saw_finding_record = None
    self._finding_offsets = []

from friTap import TapReader

reader = TapReader("capture_20260507_153933.tap")   # offline / CI-runnable
meta = reader.open()
try:
    for summary in reader.read_flow_summaries():
        print(summary.flow_id, summary.host, summary.status_code)
finally:
    reader.close()   # release the file handle even if reading fails

ReplayController

ReplayController is the higher-level facade used by the TUI replay view. It adds an LRU cache (128 flows), a context-manager interface, and convenience properties. It implements IFlowSource (get_flows() / get_flow(id)).

friTap.flow.replay.ReplayController

Loads and serves flows from a .tap file for TUI replay mode.

Satisfies the IFlowSource protocol so MainScreen can use it interchangeably with FlowCollector via CaptureController.

Source code in friTap/flow/replay.py
class ReplayController:
    """Loads and serves flows from a .tap file for TUI replay mode.

    Satisfies the IFlowSource protocol so MainScreen can use it
    interchangeably with FlowCollector via CaptureController.
    """

    def __init__(self, path: str) -> None:
        self._path = path
        self._reader: Optional[TapReader] = None
        self._meta: Optional[TapMeta] = None
        self._summaries: list[FlowSummary] = []
        self._flow_cache: _LRUCache = _LRUCache(maxsize=128)
        self._reparse_results: dict[str, tuple[Optional["ParseResult"], Optional["ParseResult"]]] = {}

    @property
    def replay_file(self) -> str:
        return self._path

    @property
    def flow_count(self) -> int:
        return len(self._summaries)

    @property
    def meta(self) -> Optional[TapMeta]:
        return self._meta

    @property
    def header(self):
        return self._reader.header if self._reader else None

    def load(self) -> TapMeta:
        """Open the .tap file and load flow summaries.

        Returns the file-level TapMeta.
        """
        self._reader = TapReader(self._path)
        self._meta = self._reader.open()
        self._summaries = self._reader.read_flow_summaries()
        logger.info(
            "Replay loaded: %s (%d flows)", self._path, len(self._summaries)
        )
        return self._meta

    def get_summaries(self) -> list[FlowSummary]:
        """Return all flow summaries (lightweight, no chunk/body data)."""
        return list(self._summaries)

    def get_flows(self) -> list["Flow"]:
        """Return all flows as full Flow objects.

        Note: For large captures this loads everything into memory.
        Prefer get_summaries() + get_flow() for on-demand loading.
        """
        if self._reader is None:
            return []
        return self._reader.read_all_flows()

    def get_flow(self, flow_id: str) -> Optional["Flow"]:
        """Load a full Flow by ID (on-demand from disk, LRU-cached)."""
        flow = self._flow_cache.get(flow_id)
        if flow is None:
            if self._reader is None:
                return None
            flow = self._reader.read_flow(flow_id)
            if flow is None:
                return None
            self._flow_cache[flow_id] = flow
        # Re-apply reparse results that survive cache eviction
        if flow_id in self._reparse_results:
            req, resp = self._reparse_results[flow_id]
            if req is not None:
                flow.request = req
            if resp is not None:
                flow.response = resp
        return flow

    def store_reparse(
        self,
        flow_id: str,
        request: Optional["ParseResult"],
        response: Optional["ParseResult"],
    ) -> None:
        """Store reparse results so they survive cache eviction."""
        self._reparse_results[flow_id] = (request, response)

    def read_all_findings(self) -> list:
        """Return persisted findings (as dicts) across all flows in the .tap.

        Passthrough over :class:`~friTap.flow.tap_reader.TapReader` so callers
        (e.g. the TUI findings viewer) don't reach into the private reader.
        Iterates the loaded summaries and collects each flow's REC_FINDING
        records. Returns an empty list if the file carries none.
        """
        if self._reader is None:
            return []
        findings: list = []
        for summary in self._summaries:
            findings.extend(self._reader.read_findings(summary.flow_id))
        return findings

    def has_findings(self) -> bool:
        """True if the .tap file contains any persisted findings."""
        if self._reader is None:
            return False
        return self._reader.has_findings()

    def close(self) -> None:
        """Close the reader and release resources."""
        if self._reader is not None:
            self._reader.close()
            self._reader = None
        self._flow_cache.clear()
        self._reparse_results.clear()
        self._summaries.clear()

    def __enter__(self) -> "ReplayController":
        self.load()
        return self

    def __exit__(self, *args) -> None:
        self.close()

load()

Open the .tap file and load flow summaries.

Returns the file-level TapMeta.

Source code in friTap/flow/replay.py
def load(self) -> TapMeta:
    """Open the .tap file and load flow summaries.

    Returns the file-level TapMeta.
    """
    self._reader = TapReader(self._path)
    self._meta = self._reader.open()
    self._summaries = self._reader.read_flow_summaries()
    logger.info(
        "Replay loaded: %s (%d flows)", self._path, len(self._summaries)
    )
    return self._meta

get_summaries()

Return all flow summaries (lightweight, no chunk/body data).

Source code in friTap/flow/replay.py
def get_summaries(self) -> list[FlowSummary]:
    """Return all flow summaries (lightweight, no chunk/body data)."""
    return list(self._summaries)

get_flows()

Return all flows as full Flow objects.

Note: For large captures this loads everything into memory. Prefer get_summaries() + get_flow() for on-demand loading.

Source code in friTap/flow/replay.py
def get_flows(self) -> list["Flow"]:
    """Return all flows as full Flow objects.

    Note: For large captures this loads everything into memory.
    Prefer get_summaries() + get_flow() for on-demand loading.
    """
    if self._reader is None:
        return []
    return self._reader.read_all_flows()

get_flow(flow_id)

Load a full Flow by ID (on-demand from disk, LRU-cached).

Source code in friTap/flow/replay.py
def get_flow(self, flow_id: str) -> Optional["Flow"]:
    """Load a full Flow by ID (on-demand from disk, LRU-cached)."""
    flow = self._flow_cache.get(flow_id)
    if flow is None:
        if self._reader is None:
            return None
        flow = self._reader.read_flow(flow_id)
        if flow is None:
            return None
        self._flow_cache[flow_id] = flow
    # Re-apply reparse results that survive cache eviction
    if flow_id in self._reparse_results:
        req, resp = self._reparse_results[flow_id]
        if req is not None:
            flow.request = req
        if resp is not None:
            flow.response = resp
    return flow
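
The caching behaviour of `get_flow()` has two parts: a bounded LRU cache, and a separate side table of reparse results that is re-applied on every read so it outlives eviction. The sketch below imitates that shape with standard-library pieces only. `LRUCache`, `load_from_disk`, and the dict-based "flows" are hypothetical stand-ins; friTap's private `_LRUCache` may be implemented differently.

```python
from collections import OrderedDict


class LRUCache:
    """Minimal least-recently-used cache (illustrative stand-in)."""

    def __init__(self, maxsize: int = 128) -> None:
        self._maxsize = maxsize
        self._items = OrderedDict()

    def get(self, key, default=None):
        if key not in self._items:
            return default
        self._items.move_to_end(key)          # mark as most recently used
        return self._items[key]

    def __setitem__(self, key, value) -> None:
        self._items[key] = value
        self._items.move_to_end(key)
        while len(self._items) > self._maxsize:
            self._items.popitem(last=False)   # evict the least recently used

    def __contains__(self, key) -> bool:
        return key in self._items

    def clear(self) -> None:
        self._items.clear()


loads = []                                    # records every simulated disk read


def load_from_disk(flow_id: str) -> dict:
    loads.append(flow_id)
    return {"flow_id": flow_id, "request": "raw"}


cache = LRUCache(maxsize=1)                   # tiny on purpose, to force eviction
reparse_results = {"a": {"request": "reparsed"}}


def get_flow(flow_id: str) -> dict:
    flow = cache.get(flow_id)
    if flow is None:
        flow = load_from_disk(flow_id)
        cache[flow_id] = flow
    # Overrides live outside the cache, so they are re-applied after a reload.
    if flow_id in reparse_results:
        flow.update(reparse_results[flow_id])
    return flow


first = get_flow("a")
get_flow("b")                                 # evicts "a" from the one-slot cache
again = get_flow("a")                         # reloaded, override re-applied
print(loads, again["request"])                # ['a', 'b', 'a'] reparsed
```

Keeping the overrides in their own dictionary trades a little memory for the guarantee that user-triggered reparses are never silently lost when the cache cycles.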

close()

Close the reader and release resources.

Source code in friTap/flow/replay.py
def close(self) -> None:
    """Close the reader and release resources."""
    if self._reader is not None:
        self._reader.close()
        self._reader = None
    self._flow_cache.clear()
    self._reparse_results.clear()
    self._summaries.clear()

from friTap import ReplayController

with ReplayController("capture_20260507_153933.tap") as rc:   # offline / CI-runnable
    meta = rc.meta   # __enter__ already called load(); calling it again would reopen the file
    print("flows:", rc.flow_count)
    summaries = rc.get_summaries()
    for summary in summaries:
        print(summary.flow_id, summary.protocol, summary.host)
    if summaries:   # guard against an empty capture
        flow = rc.get_flow(summaries[0].flow_id)
        if flow is not None:
            print(flow.to_dict())

Traffic analysis

friTap can run its analyzers (credentials, ioc, privacy, protobuf, plus custom ones) over a captured .tap file — passive analysis, no network activity. Use analyze_tap_report() for programmatic access; it performs no I/O beyond reading the .tap and never calls sys.exit.

friTap.commands.analyze.analyze_tap_report(tap_file, *, scanners=None, analyzer_path=None, min_severity='info', report_format='table', include_private_ips=False, protobuf_schema=None, min_confidence=0.0, source=None, category=None, show_pii=False)

Run analyzers over tap_file and return findings + a rendered report.

Pure orchestration shared by the CLI and external tools: resolve analyzers (scanners is a comma-separated name list, or None/"all" for the built-ins) → run them → filter (severity/confidence/source/category) → render in report_format (one of the formats returned by list_report_formats()). It writes nothing to stdout, writes no sidecar file, and never calls sys.exit; callers decide how to surface the result.

The filter arguments are all additive and default to no-ops, so existing callers are unaffected:

  • min_confidence — keep findings with confidence at or above this value.
  • source — comma-separated analyzer source names to keep (which findings show; distinct from scanners, which selects analyzers that run).
  • category — comma-separated finding categories to keep (secret/pii/network/protocol).
  • show_pii — reveal PII/secret values instead of redacting them (forwarded to analyzers as reveal_pii; default redacts).

Raises ValueError for an unknown report_format or an unresolvable analyzer spec, ImportError for a bad analyzer_path; any .tap read/analyze failure propagates.

Source code in friTap/commands/analyze.py
def analyze_tap_report(
    tap_file: str,
    *,
    scanners: str | None = None,
    analyzer_path: str | None = None,
    min_severity: str = "info",
    report_format: str = "table",
    include_private_ips: bool = False,
    protobuf_schema: str | None = None,
    min_confidence: float = 0.0,
    source: str | None = None,
    category: str | None = None,
    show_pii: bool = False,
) -> AnalyzeReport:
    """Run analyzers over *tap_file* and return findings + a rendered report.

    Pure orchestration shared by the CLI and external tools: resolve analyzers
    (``scanners`` is a comma-separated name list, or ``None``/``"all"`` for the
    built-ins) → run them → filter (severity/confidence/source/category) →
    render in *report_format* (one of :func:`list_report_formats`). Performs no
    stdout, no sidecar write and never calls ``sys.exit`` — callers decide how
    to surface the result.

    The filter arguments are all additive and default to no-ops, so existing
    callers are unaffected:

    * ``min_confidence`` — keep findings with confidence at or above this value.
    * ``source`` — comma-separated analyzer source names to keep (which findings
      *show*; distinct from ``scanners``, which selects analyzers that *run*).
    * ``category`` — comma-separated finding categories to keep
      (``secret``/``pii``/``network``/``protocol``).
    * ``show_pii`` — reveal PII/secret values instead of redacting them
      (forwarded to analyzers as ``reveal_pii``; default redacts).

    Raises ``ValueError`` for an unknown *report_format* or an unresolvable
    analyzer spec, ``ImportError`` for a bad ``analyzer_path``; any .tap
    read/analyze failure propagates.
    """
    from friTap.analysis.filtering import FindingFilter, apply, split_csv

    if report_format not in _REPORTER_REGISTRY:
        raise ValueError(
            f"unknown report format {report_format!r}; "
            f"choose from {', '.join(sorted(_REPORTER_REGISTRY))}"
        )

    analyzers = resolve_analyzers(
        scanners,
        analyzer_path=analyzer_path,
        include_private_ips=include_private_ips,
        protobuf_schema=protobuf_schema,
        reveal_pii=show_pii,
    )
    findings = analyze_tap_multi(analyzers, tap_file)
    normalized_severity = min_severity if min_severity in _SEVERITY_ORDER else "info"
    finding_filter = FindingFilter(
        min_severity=normalized_severity,
        sources=split_csv(source),
        categories=split_csv(category),
        min_confidence=min_confidence if min_confidence > 0.0 else None,
    )
    filtered = apply(findings, finding_filter)
    analyzer_names = [a.name for a in analyzers]
    meta = {"tap_file": tap_file, "analyzers": analyzer_names}
    # AnalyzeReport.findings keep full fidelity; only the rendered report is
    # redacted (unless show_pii). The reporter is the redaction enforcement point.
    rendered = _REPORTER_REGISTRY[report_format](redact_pii=not show_pii).report(filtered, meta)
    return AnalyzeReport(
        findings=filtered,
        rendered=rendered,
        report_format=report_format,
        analyzer_names=analyzer_names,
        meta=meta,
    )

friTap.commands.analyze.AnalyzeReport dataclass

Result of a programmatic .tap analysis.

Carries both the structured findings (already filtered by severity, confidence, source, and category) and the report rendered in the requested format, so web / TUI / CLI callers can use whichever they need without re-running the analyzers. Returned by analyze_tap_report().

Source code in friTap/commands/analyze.py
@dataclass(frozen=True)
class AnalyzeReport:
    """Result of a programmatic ``.tap`` analysis.

    Carries both the structured (already severity-filtered) findings and the
    report rendered in the requested format, so web / TUI / CLI callers can use
    whichever they need without re-running the analyzers. Returned by
    :func:`analyze_tap_report`.
    """

    findings: list["Finding"]
    rendered: str
    report_format: str
    analyzer_names: list[str] = field(default_factory=list)
    meta: dict[str, Any] = field(default_factory=dict)
    gate_severity: str = _GATE_SEVERITY

    @property
    def gate_tripped(self) -> bool:
        """True iff any finding is at or above :attr:`gate_severity`.

        Mirrors the condition under which :func:`run_analyze_cli` returns the
        non-zero CI-gate exit code.
        """
        return bool(_filter_min_severity(self.findings, self.gate_severity))

    @property
    def exit_code(self) -> int:
        """CLI-parity exit code: 2 when :attr:`gate_tripped`, else 0."""
        return 2 if self.gate_tripped else 0

gate_tripped property

True iff any finding is at or above gate_severity.

Mirrors the condition under which run_analyze_cli() returns the non-zero CI-gate exit code.

exit_code property

CLI-parity exit code: 2 when gate_tripped, else 0.

from friTap import analyze_tap_report

report = analyze_tap_report(
    "capture_20260507_153933.tap",   # offline / CI-runnable
    scanners="credentials,ioc",      # None or "all" → built-ins (which analyzers RUN)
    min_severity="info",
    report_format="table",
    # report-side filters (additive, keyword-only; all default to no-op):
    min_confidence=0.0,              # drop findings below this confidence (0.0–1.0)
    source=None,                     # comma-separated source names to SHOW (e.g. "credentials,privacy")
    category=None,                   # "secret,pii,network,protocol" to SHOW
    show_pii=False,                  # reveal PII/secret values instead of redacting
)

for finding in report.findings:
    print(finding.severity.name, finding.category, finding.title, finding.flow_id)

print(report.rendered)               # the rendered table/json/csv/md
raise SystemExit(report.exit_code)   # 2 if any finding ≥ medium, else 0 (CI gate)

min_confidence, source, category, and show_pii are additive, keyword-only parameters — older calls that omit them keep working unchanged. scanners chooses which analyzers run; source/category filter which of the resulting findings are shown.

Findings & severity

Each Finding has: severity (a Severity enum), title, description, source, flow_id, confidence (default 1.0), timestamp, and the dicts evidence and metadata. Severity is CRITICAL/HIGH/MEDIUM/LOW/INFO (rank 0 is most severe).

Every finding now carries a category in metadata["category"], surfaced as the Finding.category property: secret (credentials), pii (privacy), network (IOC), or protocol (protobuf). PII findings additionally carry metadata["compliance"] (e.g. GDPR, CCPA, PCI-DSS, HIPAA).

Filtering findings

FindingFilter (exported from the top-level friTap package) is the reusable filter behind the CLI's --source/--category/--min-confidence flags. Apply it to any iterable of Finding objects:

from friTap import FindingFilter

flt = FindingFilter(
    min_severity="info",                 # floor: this severity and above
    min_confidence=0.8,
    sources={"credentials", "privacy"},
    categories={"secret", "pii"},
)
kept = [f for f in report.findings if flt.matches(f)]

Use severities for an exact-bucket match instead of the min_severity floor — FindingFilter(severities={"high"}) keeps only high findings (this is what the TUI's per-severity dashboard chips use, so a chip's count matches its result):

high_only = [f for f in report.findings
             if FindingFilter(severities={"high"}).matches(f)]
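
The difference between a severity floor and an exact-bucket match is easy to see on a handful of findings. The sketch below uses stand-in types only: `Severity`, `ToyFinding`, `at_or_above`, and `in_buckets` are hypothetical names, not friTap's classes. The ranks follow the convention stated earlier (rank 0 is most severe).

```python
from dataclasses import dataclass
from enum import IntEnum


class Severity(IntEnum):          # stand-in enum; lower rank means more severe
    CRITICAL = 0
    HIGH = 1
    MEDIUM = 2
    LOW = 3
    INFO = 4


@dataclass(frozen=True)
class ToyFinding:
    title: str
    severity: Severity


def at_or_above(findings, floor: Severity):
    """Floor semantics: keep the floor severity and everything more severe."""
    return [f for f in findings if f.severity <= floor]


def in_buckets(findings, buckets):
    """Exact-bucket semantics: keep only the listed severities."""
    return [f for f in findings if f.severity in buckets]


findings = [
    ToyFinding("hard-coded API key", Severity.CRITICAL),
    ToyFinding("password in form body", Severity.HIGH),
    ToyFinding("session token in URL", Severity.HIGH),
    ToyFinding("advertising ID", Severity.LOW),
]

floor_high = at_or_above(findings, Severity.HIGH)
exact_high = in_buckets(findings, {Severity.HIGH})
print(len(floor_high), len(exact_high))   # 3 2
```

A "high" counter fed by the floor variant would report 3 while showing a critical finding among the results, which is why a per-severity counter needs the exact-bucket form.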

PrivacyAnalyzer import path

The built-in analyzer classes live under friTap.analysis.*. The privacy analyzer is imported with from friTap.analysis.privacy import PrivacyAnalyzer; it is not re-exported from the top-level friTap package. Select it by name via scanners="privacy" rather than importing it directly.

CI gate

AnalyzeReport.gate_tripped is True when any finding is at or above the gate severity (medium), and exit_code returns 2 in that case (else 0) — mirroring the fritap analyze CLI exit code. Use it to fail a pipeline when a capture contains sensitive findings.

Discovery helpers

  • list_analyzers() → analyzer names (built-in plus any discovered).
  • list_analyzers_detailed() → richer AnalyzerInfo records (name, source/origin, description) for every analyzer, including discovered ones. Use this to render an analyzer catalog in your own tooling.
  • list_report_formats() → available report formats (json, csv, md, table).

from friTap import list_analyzers, list_analyzers_detailed

print(list_analyzers())              # ['credentials', 'ioc', 'privacy', 'protobuf', ...]
for info in list_analyzers_detailed():
    print(info.name, "-", info.description)

Custom analyzers can be loaded via the CLI --analyzer-path module:Class (the module:Class form skips the is_fritap_analyzer marker requirement), or made ambiently discoverable through the drop-in directory, the fritap.analyzers entry-points group, or a plugin's register_analyzers() hook — see Traffic Analysis → Ship it everywhere.

Discovery executes code

Discovered analyzers (drop-in directory, fritap.analyzers entry points, plugin bridge) execute as soon as list_analyzers()/list_analyzers_detailed() or any resolution runs. Set FRITAP_DISABLE_ANALYZER_DISCOVERY=1 to disable ambient discovery; explicit --analyzer-path / .analyzer_path() references still work.
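
A minimal sketch of opting out from within Python follows. It assumes the variable is read when discovery runs, so it is set before friTap is imported to be safe; the exact point at which friTap reads it is not stated here. The try/except only keeps the snippet runnable where friTap is not installed.

```python
import os

# Disable ambient analyzer discovery for this process (and any children
# that inherit the environment) before any analyzer resolution happens.
os.environ["FRITAP_DISABLE_ANALYZER_DISCOVERY"] = "1"

try:
    from friTap import list_analyzers
except ImportError:
    # friTap is not installed in this environment.
    list_analyzers = None

if list_analyzers is not None:
    # With discovery disabled, drop-in, entry-point, and plugin analyzers
    # should not be loaded or executed by this call.
    print(list_analyzers())
```

Setting the variable in the CI job definition instead achieves the same thing without touching code.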

Live scan from the builder

The FriTap builder exposes live passive analysis directly. scan() enables it, analyzer_path() is repeatable, and scan_report(), scan_min_severity(), and scan_show_pii() set the corresponding --scan-* options:

from friTap import FriTap

session = (
    FriTap("com.example.app")
    .mobile()
    .scan("all")                                  # or "credentials,ioc"
    .analyzer_path("my_analyzer:InternalHostAnalyzer")
    .scan_report("table")
    .scan_min_severity("medium")
    .start()
)

Protobuf utilities

Schema-less protobuf wire decoding with zero external dependencies — handy for inspecting captured gRPC/protobuf bodies.

friTap.parsers.protobuf.decode_raw(data, max_depth=16)

Decode raw protobuf wire format without a schema.

Recursively attempts sub-message parsing for LENGTH_DELIMITED fields.

Parameters:

  • data (bytes, required): Raw protobuf bytes.
  • max_depth (int, default 16): Maximum nesting depth.

Returns:

  • ProtobufMessage: A ProtobufMessage tree.

Raises:

  • ValueError: If the data is malformed.

Source code in friTap/parsers/protobuf/wire.py
def decode_raw(data: bytes, max_depth: int = 16) -> ProtobufMessage:
    """Decode raw protobuf wire format without a schema.

    Recursively attempts sub-message parsing for ``LENGTH_DELIMITED`` fields.

    Args:
        data: Raw protobuf bytes.
        max_depth: Maximum nesting depth (default 16).

    Returns:
        A :class:`ProtobufMessage` tree.

    Raises:
        ValueError: If the data is malformed.
    """
    if not data:
        return ProtobufMessage()
    return _decode_raw_impl(data, max_depth)

friTap.parsers.protobuf.format_message(msg, indent=0)

Format a ProtobufMessage as human-readable indented text.

Output resembles protoc --decode_raw.

Source code in friTap/parsers/protobuf/wire.py
def format_message(msg: ProtobufMessage, indent: int = 0) -> str:
    """Format a :class:`ProtobufMessage` as human-readable indented text.

    Output resembles ``protoc --decode_raw``.
    """
    lines: list[str] = []
    prefix = "  " * indent

    for f in msg.fields:
        if f.wire_type == WireType.VARINT and f.varint is not None:
            zigzag = _zigzag_decode(f.varint)
            if zigzag != f.varint and zigzag < 0:
                lines.append(f"{prefix}{f.field_number}: {f.varint} (signed: {zigzag})")
            else:
                lines.append(f"{prefix}{f.field_number}: {f.varint}")

        elif f.wire_type == WireType.FIXED64 and f.fixed64 is not None:
            as_double = struct.unpack("<d", f.fixed64)[0]
            as_uint64 = struct.unpack("<Q", f.fixed64)[0]
            as_int64 = struct.unpack("<q", f.fixed64)[0]
            lines.append(
                f"{prefix}{f.field_number}: 0x{f.fixed64.hex()} "
                f"(uint64: {as_uint64}, int64: {as_int64}, double: {as_double})"
            )

        elif f.wire_type == WireType.FIXED32 and f.fixed32 is not None:
            as_float = struct.unpack("<f", f.fixed32)[0]
            as_uint32 = struct.unpack("<I", f.fixed32)[0]
            as_int32 = struct.unpack("<i", f.fixed32)[0]
            lines.append(
                f"{prefix}{f.field_number}: 0x{f.fixed32.hex()} "
                f"(uint32: {as_uint32}, int32: {as_int32}, float: {as_float})"
            )

        elif f.wire_type == WireType.LENGTH_DELIMITED:
            if f.sub_message is not None:
                lines.append(f"{prefix}{f.field_number} {{")
                lines.append(format_message(f.sub_message, indent + 1))
                lines.append(f"{prefix}}}")
            elif f.packed_varints is not None:
                vals = ", ".join(str(v) for v in f.packed_varints)
                lines.append(f"{prefix}{f.field_number}: [packed] [{vals}]")
            elif f.length_delimited is not None:
                lines.append(
                    f"{prefix}{f.field_number}: "
                    f"{_format_bytes_as_string_or_hex(f.length_delimited)}"
                )

    return "\n".join(lines)

Both functions can also be imported from the top-level friTap package:

from friTap import decode_raw, format_message

msg = decode_raw(b"\x08\x96\x01")   # offline / CI-runnable
print(format_message(msg))
# 1: 150

decode_raw(data, max_depth=16) returns a ProtobufMessage (a tree of ProtobufField objects); format_message(msg, indent=0) renders it as human-readable text. ProtobufProcessor provides higher-level, schema-aware processing.
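
To see why b"\x08\x96\x01" renders as 1: 150, it helps to decode the three bytes by hand. The sketch below follows the standard protobuf wire format (a varint tag carrying the field number and wire type, then a varint value). The helper names are made up for this example and are not friTap's internal functions.

```python
def read_varint(buf, pos=0):
    """Read one base-128 varint from buf at pos; return (value, next_pos)."""
    value = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift  # low 7 bits carry payload
        if not byte & 0x80:              # high bit clear: last byte
            return value, pos
        shift += 7


def zigzag_decode(n):
    """Standard protobuf ZigZag mapping back to a signed integer."""
    return (n >> 1) ^ -(n & 1)


data = b"\x08\x96\x01"

tag, pos = read_varint(data)                     # 0x08
field_number, wire_type = tag >> 3, tag & 0x07   # field 1, wire type 0 (VARINT)
value, pos = read_varint(data, pos)              # 0x96 0x01 -> 22 + (1 << 7)

print(f"{field_number}: {value}")  # 1: 150
print(zigzag_decode(value))        # 75
```

The ZigZag interpretation of 150 is 75, which is not negative, so the format_message source shown above takes the plain branch and prints no "(signed: ...)" suffix for this field.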


Legacy SSL_Logger (deprecated)

Deprecated — will be removed in friTap 3.0

SSL_Logger is retained only for backward compatibility. New code should use the FriTap builder (or pass a FriTapConfig — build one from old kwargs with FriTapConfig.from_legacy_params(...)).

Minimal legacy usage (live / device):

from friTap import SSL_Logger
import time

logger = SSL_Logger(
    app="firefox",
    pcap_name="traffic.pcap",
    keylog="keys.log",
    verbose=True,
)
logger.install_signal_handler()
logger.start_fritap_session()

while logger.running:
    time.sleep(1)

To migrate, replace the flat constructor with the builder:

from friTap import FriTap

session = FriTap("firefox").pcap("traffic.pcap").keylog("keys.log").verbose().start()

Next steps