friTap ships a stable, SemVer-guaranteed Python API for driving captures, consuming live events, and working with .tap files offline. The public surface is everything exported from the friTap package __all__ (59 symbols); removing or breaking any of them requires a friTap MAJOR bump (see RELEASING.md).
This page leads with the modern API:
the FriTap builder for configuring and starting captures,
the EventBus and typed events for consuming data programmatically,
The legacy SSL_Logger class is documented last, in a deprecated section.
CI-runnable vs. live-device examples
Examples on this page are labelled either live / device (they attach to or spawn a real process and cannot run in CI) or offline / CI-runnable (they operate on .tap files or in-memory objects and run anywhere friTap imports).
FriTap(...).start() attaches to or spawns a real target through Frida. This cannot run in CI — it needs a device and the target app/process.
    from friTap import FriTap

    session = (
        FriTap("com.example.app")
        .mobile()             # target a USB device
        .keylog("keys.log")   # write an SSLKEYLOGFILE
        .pcap("cap.pcapng")   # write decrypted traffic (pcapng → embedded DSB)
        .start()              # returns a FriTapSession
    )
    session.wait()  # block until the target exits; or call session.stop()
build_config() is the offline-safe half of the builder — it returns a FriTapConfig without touching a device, which is useful for tests and for inspecting/serializing configuration:
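The split between configuring and starting can be tried without friTap or a device. The sketch below is a stand-in, not friTap's implementation: `MiniBuilder` and `MiniConfig` are hypothetical names with a reduced field set, and only the shape (chainable setters that return `self`, plus a `build_config()` that touches no device) mirrors what this page describes.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class MiniConfig:
    """Hypothetical stand-in for FriTapConfig (three fields only)."""
    target: str
    mobile: bool = False
    keylog: Optional[str] = None


class MiniBuilder:
    """Hypothetical stand-in for the FriTap builder."""

    def __init__(self, target: str) -> None:
        self._config = MiniConfig(target=target)

    def mobile(self) -> "MiniBuilder":
        self._config.mobile = True
        return self  # returning self is what makes the calls chain

    def keylog(self, path: str) -> "MiniBuilder":
        self._config.keylog = path
        return self

    def build_config(self) -> MiniConfig:
        # No device access here: only the collected settings are returned.
        return self._config


config = MiniBuilder("com.example.app").mobile().keylog("keys.log").build_config()
print(asdict(config))  # plain dict, convenient for inspecting or serializing
```

Because the result is a plain dataclass, a test can assert on individual fields or serialize the whole object, which is the use case described for `build_config()`.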
Every method below returns self, so calls chain. Names are verbatim from friTap.api.FriTap. The builder is hand-documented (chained APIs render poorly under autodoc).
pcap(path)
Write decrypted traffic to a PCAP file. The extension is authoritative: .pcap writes classic libpcap, .pcapng writes pcapng (with embedded DSB when keys are extracted).
pcapng(path)
Write self-decrypting PCAPNG. Equivalent to pcap() with a .pcapng extension; the file extension still wins.
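The "extension wins" rule for pcap() and pcapng() can be pictured as a lookup on the file suffix. This is a minimal sketch under stated assumptions: `capture_format` is a hypothetical helper rather than a friTap function, and raising on any other suffix is an assumption, since this page only describes `.pcap` and `.pcapng`.

```python
from pathlib import Path


def capture_format(path: str) -> str:
    """Pick the capture format from the file extension (hypothetical helper)."""
    suffix = Path(path).suffix
    if suffix == ".pcapng":
        return "pcapng"  # pcapng; can carry an embedded DSB
    if suffix == ".pcap":
        return "pcap"    # classic libpcap
    # Assumption: behaviour for other suffixes is not described on this page.
    raise ValueError(f"unrecognised capture extension: {suffix!r}")


print(capture_format("cap.pcap"))
print(capture_format("cap.pcapng"))
```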
keylog(path)
Write key material in Wireshark-loadable form for the active protocol (NSS SSLKEYLOGFILE for TLS). With --protocol all/auto, the path is split per protocol as <stem>.<proto><ext>.
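The per-protocol naming `<stem>.<proto><ext>` is easy to misread, so here is a small sketch of how such a name could be derived. `per_protocol_keylog` is a hypothetical helper, not part of friTap, and the protocol names passed in are illustrative only.

```python
from pathlib import Path


def per_protocol_keylog(path: str, proto: str) -> str:
    """Insert the protocol name between stem and extension (hypothetical helper)."""
    p = Path(path)
    # "keys.log" has stem "keys" and suffix ".log".
    return str(p.with_name(f"{p.stem}.{proto}{p.suffix}"))


for proto in ("tls", "quic"):  # illustrative protocol names
    print(per_protocol_keylog("keys.log", proto))
```

With `keys.log` as the requested path, the sketch yields `keys.tls.log` and `keys.quic.log`.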
json_output(path)
Write session metadata as JSON.
verbose(enable=True)
Enable verbose console output.
live(enable=True)
Stream to Wireshark via a named pipe.
full_capture(enable=True)
Capture the full network stream, not just decrypted payload.
patterns(path)
Use pattern-based (symbol-less) hooking from a JSON file.
offsets(path)
Use offset-based hooking from a JSON file.
experimental(enable=True)
Enable experimental features.
anti_root(enable=True)
Enable anti-root-detection hooks (Android).
payload_modification(enable=True)
Enable payload-modification capability.
force_scan(name)
Force-scan a module that Cronet-split-topology suppression would otherwise skip. Accepts a literal name, a stem prefix (name*) or a regex (re:...). May be called multiple times.
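The three spec forms accepted by force_scan() can be sketched as a small matcher. This is an illustration, not friTap's matching code: `matches_force_scan` and the module names are hypothetical, and using `re.search` (rather than a full match) for `re:` specs is an assumption.

```python
import re


def matches_force_scan(spec: str, module_name: str) -> bool:
    """Return True if module_name is selected by spec (hypothetical helper)."""
    if spec.startswith("re:"):
        # Assumption: the regex may match anywhere in the module name.
        return re.search(spec[3:], module_name) is not None
    if spec.endswith("*"):
        # Stem prefix: everything before the trailing '*' must lead the name.
        return module_name.startswith(spec[:-1])
    # Literal name: exact comparison.
    return module_name == spec


print(matches_force_scan("libfoo.so", "libfoo.so"))
print(matches_force_scan("libfoo*", "libfoo_v2.so"))
print(matches_force_scan(r"re:^libfoo\d+\.so$", "libfoo12.so"))
```

Since force_scan() may be called multiple times, a module would be selected when any of the collected specs matches.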
friTap dispatches typed events through a thread-safe publish-subscribe EventBus. Output handlers, the TUI, plugins, and your own callbacks all subscribe to event classes.
    bus = EventBus()
    bus.subscribe(KeylogEvent, my_handler)
    bus.emit(KeylogEvent(key_data="CLIENT_RANDOM ..."))
Subscribers can specify a priority (higher runs first). Plugins should use EventBus.PLUGIN_PRIORITY so they execute before the built-in output handlers.
    class EventBus:
        """
        Simple thread-safe publish-subscribe event bus.

        Usage:
            bus = EventBus()
            bus.subscribe(KeylogEvent, my_handler)
            bus.emit(KeylogEvent(key_data="CLIENT_RANDOM ..."))

        Subscribers can specify a *priority* (higher runs first). Plugins
        should use ``EventBus.PLUGIN_PRIORITY`` so they execute before the
        built-in output handlers.
        """

        MAX_HANDLER_FAILURES = 10
        PLUGIN_PRIORITY = 100

        def __init__(self, on_handler_error: Optional[Callable] = None) -> None:
            self._subscribers: Dict[Type[FriTapEvent], List[Tuple[int, Callable]]] = {}
            self._lock = threading.RLock()
            self._logger = logging.getLogger("friTap.events")
            self._failure_counts: Dict[str, int] = {}
            self._on_handler_error = on_handler_error

        def subscribe(
            self,
            event_type: Type[FriTapEvent],
            callback: Callable,
            *,
            priority: int = 0,
        ) -> None:
            """Register *callback* to be called whenever *event_type* is emitted.

            Higher *priority* values run first. The default is 0; plugins
            should use ``EventBus.PLUGIN_PRIORITY`` (100).
            """
            with self._lock:
                subs = self._subscribers.setdefault(event_type, [])
                subs.append((priority, callback))
                subs.sort(key=lambda t: t[0], reverse=True)

        def unsubscribe(self, event_type: Type[FriTapEvent], callback: Callable) -> None:
            """Remove a previously registered callback."""
            with self._lock:
                subs = self._subscribers.get(event_type, [])
                for i, (_, cb) in enumerate(subs):
                    if cb == callback:
                        subs.pop(i)
                        break

        def emit(self, event: FriTapEvent) -> None:
            """
            Dispatch *event* to all subscribers registered for its type.

            Also dispatches to subscribers of the base ``FriTapEvent`` type,
            allowing catch-all handlers. The merged list is sorted by
            descending priority so higher-priority subscribers run first.
            """
            with self._lock:
                specific = self._subscribers.get(type(event), [])
                if type(event) is not FriTapEvent:
                    catch_all = self._subscribers.get(FriTapEvent, [])
                    if not catch_all:
                        # Fast path: no catch-all subscribers
                        handlers = list(specific)
                    else:
                        # Merge two pre-sorted (descending priority) lists
                        handlers = []
                        i, j = 0, 0
                        while i < len(specific) and j < len(catch_all):
                            if specific[i][0] >= catch_all[j][0]:
                                handlers.append(specific[i])
                                i += 1
                            else:
                                handlers.append(catch_all[j])
                                j += 1
                        handlers.extend(specific[i:])
                        handlers.extend(catch_all[j:])
                else:
                    handlers = list(specific)

            for _prio, cb in handlers:
                try:
                    cb(event)
                except Exception as exc:
                    handler_name = getattr(cb, "__qualname__", repr(cb))
                    self._logger.exception(
                        "Error in event subscriber %s for %s",
                        handler_name,
                        type(event).__name__,
                    )
                    # Track failure count
                    count = self._failure_counts.get(handler_name, 0) + 1
                    self._failure_counts[handler_name] = count
                    # Invoke optional error callback
                    if self._on_handler_error is not None:
                        try:
                            self._on_handler_error(cb, event, exc)
                        except Exception:
                            pass
                    # Emit ErrorEvent for handler failures (guard against recursion)
                    if not isinstance(event, ErrorEvent):
                        try:
                            self.emit(
                                ErrorEvent(
                                    error=f"Handler {handler_name} failed",
                                    description=str(exc),
                                    severity=ERROR_SEVERITY_ERROR,
                                )
                            )
                        except Exception:
                            pass
                    # Auto-unsubscribe after too many failures
                    if count >= self.MAX_HANDLER_FAILURES:
                        self._logger.warning(
                            "Auto-unsubscribing handler %s after %d failures",
                            handler_name,
                            count,
                        )
                        self._remove_handler(cb)

        def _remove_handler(self, cb: Callable) -> None:
            """Remove *cb* from all event-type subscriber lists."""
            with self._lock:
                for subs in self._subscribers.values():
                    for i, (_p, c) in enumerate(subs):
                        if c is cb:
                            subs.pop(i)
                            break

        @property
        def handler_failures(self) -> Dict[str, int]:
            """Return a copy of the handler failure counts."""
            return dict(self._failure_counts)

        def clear(self) -> None:
            """Remove all subscribers."""
            with self._lock:
                self._subscribers.clear()
Dispatch event to all subscribers registered for its type.
Also dispatches to subscribers of the base FriTapEvent type, allowing catch-all handlers. The merged list is sorted by descending priority so higher-priority subscribers run first.
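The merge described here can be tried in isolation. The sketch lifts the two-pointer merge from the emit source shown on this page into a standalone function; `merge_by_priority` is a hypothetical name, and strings stand in for the callbacks so the resulting order is easy to read.

```python
from typing import List, Tuple

Handler = Tuple[int, str]  # (priority, label); labels stand in for callbacks


def merge_by_priority(specific: List[Handler], catch_all: List[Handler]) -> List[Handler]:
    """Merge two lists that are each already sorted by descending priority."""
    merged: List[Handler] = []
    i, j = 0, 0
    while i < len(specific) and j < len(catch_all):
        # '>=' means a type-specific handler wins a priority tie.
        if specific[i][0] >= catch_all[j][0]:
            merged.append(specific[i])
            i += 1
        else:
            merged.append(catch_all[j])
            j += 1
    # One list is exhausted; the remainder of the other is already ordered.
    merged.extend(specific[i:])
    merged.extend(catch_all[j:])
    return merged


specific = [(100, "plugin"), (0, "writer")]
catch_all = [(50, "audit"), (0, "logger")]
print(merge_by_priority(specific, catch_all))
# [(100, 'plugin'), (50, 'audit'), (0, 'writer'), (0, 'logger')]
```

Because both inputs are kept sorted at subscribe time, the merge is linear in the number of handlers and needs no extra sort per emitted event.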
The bus also exposes the class constant PLUGIN_PRIORITY = 100; plugins should pass priority=EventBus.PLUGIN_PRIORITY to subscribe() so they run before the built-in output handlers.
A standalone bus can be created, subscribed to, and emitted into without any device — useful for testing handlers:
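To show the kind of device-free handler test this enables, here is a self-contained sketch. It does not import friTap: `MiniBus`, `Event` and `KeyEvent` are hypothetical stand-ins that copy only the subscribe/emit shape, the priority ordering and the catch-all dispatch described above, and they omit the locking and error handling of the real EventBus.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple, Type


@dataclass
class Event:
    """Hypothetical stand-in for FriTapEvent."""


@dataclass
class KeyEvent(Event):
    """Hypothetical stand-in for KeylogEvent."""
    key_data: str = ""


class MiniBus:
    """Reduced stand-in for EventBus: no locking, no failure tracking."""

    PLUGIN_PRIORITY = 100

    def __init__(self) -> None:
        self._subs: Dict[Type[Event], List[Tuple[int, Callable]]] = {}

    def subscribe(self, event_type: Type[Event], callback: Callable, *, priority: int = 0) -> None:
        self._subs.setdefault(event_type, []).append((priority, callback))

    def emit(self, event: Event) -> None:
        handlers = list(self._subs.get(type(event), []))
        if type(event) is not Event:
            handlers += self._subs.get(Event, [])  # catch-all subscribers
        # Stable sort: higher priority first, ties keep their existing order.
        for _prio, cb in sorted(handlers, key=lambda t: t[0], reverse=True):
            cb(event)


calls: List[str] = []
bus = MiniBus()
bus.subscribe(KeyEvent, lambda e: calls.append("output"))
bus.subscribe(Event, lambda e: calls.append("catch-all"), priority=50)
bus.subscribe(KeyEvent, lambda e: calls.append("plugin"), priority=MiniBus.PLUGIN_PRIORITY)
bus.emit(KeyEvent(key_data="CLIENT_RANDOM ..."))
print(calls)  # ['plugin', 'catch-all', 'output']
```

With the real bus the same test shape applies: subscribe a recording handler, emit an event, and assert on what was recorded.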
    @dataclass
    class FriTapConfig:
        """
        Top-level configuration for a friTap session.

        Usage:
            config = FriTapConfig(
                target="com.example.app",
                device=DeviceConfig(mobile=True, spawn=True),
                output=OutputConfig(pcap="capture.pcap", keylog="keys.log"),
            )
        """

        target: str
        device: DeviceConfig = field(default_factory=DeviceConfig)
        output: OutputConfig = field(default_factory=OutputConfig)
        hooking: HookingConfig = field(default_factory=HookingConfig)
        protocol: str = "tls"
        backend: str = BackendName.FRIDA
        debug: bool = False
        debug_output: bool = False
        custom_hook_script: Optional[str] = None
        environment_file: Optional[str] = None
        install_lsass_hook: bool = True
        proxy: Optional[str] = None  # "host:port" or "[ipv6]:port" format

        def __post_init__(self):
            if self.debug:
                self.debug_output = True

        def validate_protocol_backend(self, protocol_handler=None) -> None:
            """Validate that the selected backend supports the configured protocol.

            Parameters
            ----------
            protocol_handler
                A ProtocolHandler instance. If None, validation is skipped.

            Raises
            ------
            UnsupportedProtocolBackendError
                When the backend support level is STUB or UNSUPPORTED.
            """
            if self.protocol == "auto":
                return  # auto always starts with Frida default
            if protocol_handler is None:
                return
            from .protocols.base import BackendSupport

            level = protocol_handler.get_backend_support_level(self.backend)
            if level != BackendSupport.FULL:
                supported = [
                    name
                    for name, lvl in protocol_handler.supported_backends.items()
                    if lvl == BackendSupport.FULL
                ]
                raise UnsupportedProtocolBackendError(
                    f"Protocol '{self.protocol}' does not fully support the "
                    f"'{self.backend}' backend (level: {level}). "
                    f"Supported backends: {', '.join(supported)}"
                )

        @classmethod
        def from_legacy_params(
            cls,
            app: str,
            pcap_name: Optional[str] = None,
            verbose: bool = False,
            spawn: bool = False,
            keylog: bool | str = False,
            enable_spawn_gating: bool = False,
            spawn_gating_all: bool = False,
            enable_child_gating: bool = False,
            mobile: bool | str = False,
            live: bool = False,
            environment_file: Optional[str] = None,
            debug_mode: bool = False,
            full_capture: bool = False,
            socket_trace: bool | str = False,
            host: bool | str = False,
            offsets: Optional[str] = None,
            debug_output: bool = False,
            experimental: bool = False,
            anti_root: bool = False,
            payload_modification: bool = False,
            library_scan: bool = False,
            enable_default_fd: bool = False,
            patterns: Optional[str] = None,
            custom_hook_script: Optional[str] = None,
            json_output: Optional[str] = None,
            install_lsass_hook: bool = True,
            timeout: Optional[int] = None,
            backend: str = BackendName.FRIDA,
            protocol: str = "tls",
            proxy: Optional[str] = None,
            filter_expression: Optional[str] = None,
            filter_infrastructure: bool = True,
            include_loopback: bool = False,
            force_scan_modules: Optional[List[str]] = None,
            quic_capture_mode: str = "stream",
            quic_only: bool = False,
            no_loader_hook: bool = False,
            stealth_loader: bool = False,
            pairip_safe: bool = False,
            quic_egress_headers_layer: str = "auto",
            scan_keys_region: Optional[str] = None,
            scan: Optional[str] = None,
            scan_report: str = "table",
            scan_report_out: Optional[str] = None,
            scan_min_severity: str = "info",
            scan_min_confidence: float = 0.0,
            scan_source: Optional[str] = None,
            scan_category: Optional[str] = None,
            scan_show_pii: bool = False,
            scan_analyzer_path: Optional[List[str]] = None,
        ) -> "FriTapConfig":
            """
            Build a FriTapConfig from the legacy SSL_Logger constructor parameters.

            Ensures full backward compatibility.
            """
            return cls(
                target=app,
                device=DeviceConfig(
                    mobile=mobile,
                    host=host if host else None,
                    spawn=spawn,
                    enable_spawn_gating=enable_spawn_gating,
                    spawn_gating_all=spawn_gating_all,
                    enable_child_gating=enable_child_gating,
                    timeout=timeout,
                ),
                output=OutputConfig(
                    pcap=pcap_name,
                    keylog=keylog if isinstance(keylog, str) else (keylog or None),
                    json_output=json_output,
                    live=live,
                    verbose=verbose,
                    full_capture=full_capture,
                    socket_trace=socket_trace,
                    filter_expression=filter_expression,
                    filter_infrastructure=filter_infrastructure,
                    include_loopback=include_loopback,
                    scan=scan,
                    scan_report=scan_report,
                    scan_report_out=scan_report_out,
                    scan_min_severity=scan_min_severity,
                    scan_min_confidence=scan_min_confidence,
                    scan_source=scan_source,
                    scan_category=scan_category,
                    scan_show_pii=scan_show_pii,
                    scan_analyzer_path=scan_analyzer_path,
                ),
                hooking=HookingConfig(
                    offsets=offsets,
                    patterns=patterns,
                    experimental=experimental,
                    enable_default_fd=enable_default_fd,
                    anti_root=anti_root,
                    payload_modification=payload_modification,
                    library_scan=library_scan,
                    force_scan_modules=list(force_scan_modules or []),
                    quic_capture_mode=quic_capture_mode,
                    quic_only=quic_only,
                    no_loader_hook=no_loader_hook,
                    stealth_loader=stealth_loader,
                    pairip_safe=pairip_safe,
                    quic_egress_headers_layer=quic_egress_headers_layer,
                    scan_keys_region=scan_keys_region,
                ),
                protocol=protocol,
                backend=backend,
                debug=debug_mode,
                debug_output=debug_output,
                custom_hook_script=custom_hook_script,
                environment_file=environment_file,
                install_lsass_hook=install_lsass_hook,
                proxy=proxy,
            )
    @dataclass
    class OutputConfig:
        """Configuration for output destinations and formats."""

        pcap: Optional[str] = None
        keylog: Optional[str] = None
        json_output: Optional[str] = None
        output_format: str = "auto"
        live: bool = False
        live_mode: str = ""  # "", "wireshark", "live_pcapng"
        verbose: bool = False
        full_capture: bool = False
        socket_trace: bool | str = False
        filter_expression: Optional[str] = None  # Wireshark-like display filter
        # Drop frida/adb infrastructure traffic (ports 5037/5555/27042/27043) by default
        filter_infrastructure: bool = True
        # Include loopback/localhost traffic (e.g. Firefox NSS IPC) — off by default
        include_loopback: bool = False
        # Live passive-analysis ("scan") of observed traffic during capture.
        # ``scan`` is an analyzer spec (None disables; "all" / comma-list selects).
        scan: Optional[str] = None
        scan_report: str = "table"
        scan_report_out: Optional[str] = None
        scan_min_severity: str = "info"
        scan_min_confidence: float = 0.0
        scan_source: Optional[str] = None
        scan_category: Optional[str] = None
        scan_show_pii: bool = False
        # External analyzer references ("module" or "module:Class") to load for the
        # live scan, mirroring offline ``analyze --analyzer-path``. Repeatable.
        scan_analyzer_path: Optional[List[str]] = None
    @dataclass
    class HookingConfig:
        """Configuration for hooking strategies."""

        offsets: Optional[str] = None
        patterns: Optional[str] = None
        experimental: bool = False
        enable_default_fd: bool = False
        anti_root: bool = False
        payload_modification: bool = False
        library_scan: bool = False
        # QUIC plaintext capture boundary: "stream" (default lower-boundary
        # stream-level Readv hooks) or "app-api" (Boundary-4 decoded HTTP/3
        # headers; Chrome/Android Google QUICHE only).
        quic_capture_mode: str = "stream"
        # When True, the agent installs ONLY the Google QUICHE hooks and skips every
        # TLS-library hook (BoringSSL, Conscrypt, NSS, …), the Java hooks, OHTTP,
        # and the keylog scan-result hooks. Useful when the user only wants HTTP/3
        # capture: attach is dramatically lighter (no multi-megabyte Memory.scanSync
        # passes, no Java VM safepoint sync), which also helps fritap attach to a
        # target that is already in the middle of active QUIC traffic.
        quic_only: bool = False
        # When True, the agent skips the inline android_dlopen_ext loader hook. This
        # is the hook PairIP / anti-tamper runtimes detect and SIGSEGV on during a
        # spawn-time integrity scan (fkie-cad/friTap#64). Only already-loaded /
        # explicitly-selected TLS libraries are then hooked. The agent also auto-
        # skips it in spawn mode when an anti-tamper library is detected.
        no_loader_hook: bool = False
        # EXPERIMENTAL (Android). Watch android_dlopen_ext via a hardware breakpoint
        # (ARM64 debug registers, no linker code patch) instead of the inline
        # trampoline, so late-loaded TLS libs can be hooked on PairIP-protected apps
        # without tripping the anti-tamper scan. Unvalidated on-device; default OFF.
        stealth_loader: bool = False
        # --pairip-safe (Android; attach and spawn). Minimal, scan-free capture mode
        # for PairIP-protected apps: hook only a curated TLS-library allowlist
        # (libssl.so, libhttpengine.so, libjavacrypto.so, libconscrypt*,
        # libcommerce_http_client.so, offset-based libwebviewchromium.so; libunity.so
        # is opt-in via --offsets), resolved WITHOUT any Memory.scan (exports ->
        # symbols -> offsets). Skips the loader hook, WebView/Cronet pattern scan,
        # Java hooks, OHTTP and the library-scan pass — the broad footprint that trips
        # PairIP's periodic integrity check (an in-process SIGSEGV). Keys persist via
        # "blink" (hooks toggled so .text stays pristine between scans).
        # (fkie-cad/friTap#64). Default OFF.
        pairip_safe: bool = False
        # Override which layer of the HTTP/3 egress-headers fallback chain the
        # agent actually attaches to. "auto" (default) keeps the winner-takes-all
        # logic: quiche-internal QuicSpdyStream::WriteHeaders preferred, then
        # net::QuicChromiumClientStream::WriteHeaders, then
        # quic::QuicSpdySession::WriteHeadersOnHeadersStream as a last-resort gQUIC
        # fallback. Set to "chrome-shim" or "session-level" to FORCE a fallback
        # layer for testing — useful for validating chain behavior on builds where
        # the quiche-internal layer still resolves. Only effective in app-api mode.
        quic_egress_headers_layer: str = "auto"
        # Generic memory-region key-scan target (--scan-keys-region). None disables
        # the scan. Passed through to the agent via config_batch.extensions.scan_region;
        # protocol-agnostic (the public scan engine and any private scan binding both
        # read it). See agent/shared/scan/.
        scan_keys_region: Optional[str] = None
        encapsulated_protocols: Dict[str, bool] = field(
            default_factory=lambda: {"ohttp": True}
        )
        # Module names that should bypass the Cronet-split-topology suppression
        # check, even when friTap would otherwise treat them as covered by a
        # sibling library. Accepts literal names, prefixes (a value ending in '*'
        # is treated as a stem prefix), or regexes prefixed with "re:".
        force_scan_modules: List[str] = field(default_factory=list)

        @property
        def ohttp_enabled(self) -> bool:
            return self.encapsulated_protocols.get("ohttp", True)

        def __post_init__(self) -> None:
            env_value = os.environ.get("FRITAP_FORCE_SCAN")
            if env_value:
                extra = [item.strip() for item in env_value.split(",") if item.strip()]
                for item in extra:
                    if item not in self.force_scan_modules:
                        self.force_scan_modules.append(item)
HookingConfig exposes the encapsulated-protocol toggle ohttp_enabled (OHTTP is on by default within --protocol tls) and the QUIC knobs quic_capture_mode, quic_only, and quic_egress_headers_layer.
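Two HookingConfig behaviours are easy to check offline: the `ohttp_enabled` property and the merge of the `FRITAP_FORCE_SCAN` environment variable into `force_scan_modules`. The sketch below reproduces only those two pieces from the source shown on this page; `MiniHooking` is a hypothetical stand-in and the module names are made up.

```python
import os
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MiniHooking:
    """Hypothetical stand-in for HookingConfig (two fields only)."""
    encapsulated_protocols: Dict[str, bool] = field(
        default_factory=lambda: {"ohttp": True}
    )
    force_scan_modules: List[str] = field(default_factory=list)

    @property
    def ohttp_enabled(self) -> bool:
        # A missing key counts as enabled, matching the default-on behaviour.
        return self.encapsulated_protocols.get("ohttp", True)

    def __post_init__(self) -> None:
        # Comma-separated env entries are appended, skipping blanks and duplicates.
        env_value = os.environ.get("FRITAP_FORCE_SCAN")
        if env_value:
            for item in (part.strip() for part in env_value.split(",")):
                if item and item not in self.force_scan_modules:
                    self.force_scan_modules.append(item)


os.environ["FRITAP_FORCE_SCAN"] = "libfoo.so, libbar*"
cfg = MiniHooking(force_scan_modules=["libfoo.so"])
disabled = MiniHooking(encapsulated_protocols={"ohttp": False})
del os.environ["FRITAP_FORCE_SCAN"]  # keep the example from leaking state

print(cfg.force_scan_modules)   # ['libfoo.so', 'libbar*']
print(cfg.ohttp_enabled)        # True
print(disabled.ohttp_enabled)   # False
```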
FriTapConfig.from_legacy_params(...) bridges the old flat SSL_Logger keyword arguments (app, pcap_name, keylog, mobile, patterns, …) into the structured dataclasses, so existing code can move incrementally:
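The flat-to-structured mapping can be sketched with reduced stand-ins. Nothing below imports friTap: `MiniDevice`, `MiniOutput` and `MiniConfig` are hypothetical and carry only a handful of the real fields, but the keylog normalisation and the debug handling follow the source shown on this page.

```python
from dataclasses import dataclass, field
from typing import Optional, Union


@dataclass
class MiniDevice:
    """Hypothetical stand-in for DeviceConfig."""
    mobile: Union[bool, str] = False
    spawn: bool = False


@dataclass
class MiniOutput:
    """Hypothetical stand-in for OutputConfig."""
    pcap: Optional[str] = None
    keylog: Union[bool, str, None] = None


@dataclass
class MiniConfig:
    """Hypothetical stand-in for FriTapConfig."""
    target: str
    device: MiniDevice = field(default_factory=MiniDevice)
    output: MiniOutput = field(default_factory=MiniOutput)
    debug: bool = False
    debug_output: bool = False

    def __post_init__(self) -> None:
        if self.debug:
            self.debug_output = True  # debug implies debug output

    @classmethod
    def from_legacy_params(cls, app, pcap_name=None, keylog=False,
                           mobile=False, spawn=False, debug_mode=False):
        return cls(
            target=app,
            device=MiniDevice(mobile=mobile, spawn=spawn),
            output=MiniOutput(
                pcap=pcap_name,
                # A path string is kept; False collapses to None.
                keylog=keylog if isinstance(keylog, str) else (keylog or None),
            ),
            debug=debug_mode,
        )


config = MiniConfig.from_legacy_params(
    "com.example.app", pcap_name="cap.pcap", keylog="keys.log",
    mobile=True, debug_mode=True,
)
print(config.output.keylog, config.device.mobile, config.debug_output)
```

Existing call sites keep their flat keyword arguments, while new code reads the grouped `device`, `output` and `hooking` objects.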
A Flow is friTap's reconstructed picture of one connection: endpoints, timing, parsed request/response, a protocol layer stack, and any attached findings. Flows are produced live (via on_flow) and read back from .tap files (see Reading .tap).
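The layer stack has two access styles in the Flow source on this page: attribute access (`flow.tls`) lazily creates an empty layer so it is never None, while `flow.layer("tls")` is a pure read that returns None when the layer is absent. The sketch below illustrates that difference with hypothetical stand-ins (`MiniFlow`, `MiniLayer`, and a `KNOWN_LAYERS` set in place of the protocol registry); it is not the real Flow class.

```python
from dataclasses import dataclass, field
from typing import List, Optional

KNOWN_LAYERS = {"tls", "quic"}  # stand-in for the protocol registry


@dataclass
class MiniLayer:
    """Hypothetical stand-in for a typed ProtocolLayer."""
    name: str
    sni: str = ""


@dataclass
class MiniFlow:
    """Hypothetical stand-in for Flow (layer stack only)."""
    layers: List[MiniLayer] = field(default_factory=list)

    def layer(self, name: str) -> Optional[MiniLayer]:
        """Non-mutating lookup: never grows the stack."""
        for ly in self.layers:
            if ly.name == name:
                return ly
        return None

    def __getattr__(self, name: str):
        # Only reached when normal attribute lookup fails.
        if name.startswith("_") or name not in KNOWN_LAYERS:
            raise AttributeError(name)
        layers = self.__dict__.get("layers")
        if layers is None:  # half-constructed instance (copy/unpickle)
            raise AttributeError(name)
        for ly in layers:
            if ly.name == name:
                return ly
        created = MiniLayer(name=name)  # lazily create and attach
        layers.append(created)
        return created


flow = MiniFlow()
print(flow.layer("tls"))                  # None: the read-only probe adds nothing
flow.tls.sni = "example.com"              # attribute access creates the layer
print(flow.layer("tls").sni)              # example.com
print([ly.name for ly in flow.layers])    # ['tls']
```

This is why serializers and snapshots use the read-only lookup: probing through attribute access would silently add empty layers to the flow.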
    @dataclass
    class Flow:
        # Identity
        flow_id: str = ""
        connection_id: str = ""
        src_addr: str = ""
        src_port: int = 0
        dst_addr: str = ""
        dst_port: int = 0
        ssl_session_id: str = ""
        # Transport/encryption protocol of the underlying connection: "tls"
        # (TLS-over-TCP, the default) or "quic" (QUIC-over-UDP). Stamped from
        # ``event.protocol`` at flow creation (FlowCollector._stamp_metadata). The
        # Flow itself stores no other transport hint, so the LayerPipeline reads
        # this to decide whether the layer-0 transport is a TlsLayer or QuicLayer.
        transport: str = "tls"
        # State
        state: FlowState = FlowState.ACTIVE
        started: float = field(default_factory=time.time)
        ended: float = 0.0
        # Parsed (from parsers module)
        request: Optional[ParseResult] = None
        response: Optional[ParseResult] = None
        # OHTTP inner payloads (decrypted bhttp, from NSS HPKE hooks)
        ohttp_inner_request: Optional[ParseResult] = None
        ohttp_inner_response: Optional[ParseResult] = None
        # Trailing data (unconsumed bytes after valid WebSocket/protocol frames)
        trailing_bytes: Optional[bytes] = None
        trailing_protocol: str = ""
        trailing_parse: Optional[ParseResult] = None
        # Raw
        chunks: list[FlowChunk] = field(default_factory=list)
        # Cached byte total (incremented by FlowCollector when appending chunks)
        _total_bytes: int = 0
        # Protocol detected by the parser registry, populated even when no
        # ``request``/``response`` is produced yet (e.g. HTTP/2 control-frame-only
        # flows). Used as a display fallback before showing "unknown".
        detected_protocol: str = ""

        # ------------------------------------------------------------------
        # Schema v2 additive enrichment fields (all optional, back-compat safe)
        # ------------------------------------------------------------------
        # Explicit endpoint-role labeling (v2-reserved). ``src_*``/``dst_*`` remain
        # the canonical transport endpoints (used across display/collector/filters);
        # these name the same two ends by *role* — local (instrumented process) and
        # remote (peer). They are stored fields (not properties) because both the
        # FLOW decode path (tap_format.decode_flow) and the collector assign them,
        # and they are persisted as their own keys in the .tap meta. They are
        # populated from src/dst in exactly ONE place — FlowCollector._enrich_flow —
        # which is the single source of truth for the local==src / remote==dst
        # mapping; everything else only reads them. Kept as a v2 hook so a future
        # capture source that distinguishes role from src/dst order (e.g. inbound
        # server flows) can set them without changing readers.
        local_addr: str = ""
        local_port: int = 0
        remote_addr: str = ""
        remote_port: int = 0
        # Process / package identity (from the capture target / agent).
        process_name: str = ""
        package_name: str = ""
        # Ordered protocol layer stack (outermost transport -> innermost), e.g.
        # [tls, http2] or [quic, http3]. Per-flow LINEAR (HTTP/2-3 streams are
        # already separate flows). Resolved by name via __getattr__: ``flow.tls``,
        # ``flow.quic``, ``flow.ssh`` each return their typed layer (lazily created
        # empty if absent, so e.g. ``flow.tls.sni`` is never None — matching the
        # old TlsMetadata value-object ergonomics). Whole-layer writes go through
        # add_layer/set_layer; field mutation (``flow.tls.sni = ...``) works on the
        # cached instance returned by __getattr__.
        layers: list[ProtocolLayer] = field(default_factory=list, repr=False)
        # Hook origin. ``hook_function`` mirrors the dominant chunk.function;
        # ``hook_stack`` is reserved for full backtraces (NICE-later).
        hook_function: str = ""
        hook_stack: str = ""
        # Mutable analyst annotations.
        tags: list[str] = field(default_factory=list)
        notes: str = ""
        # Attached analysis findings. Typed loosely as ``list`` to avoid importing
        # ``friTap.analysis`` here (it imports ``flow.models`` under TYPE_CHECKING);
        # elements are ``friTap.analysis.Finding`` instances.
        findings: list = field(default_factory=list)

        # ------------------------------------------------------------------
        # Protocol layer access (flow.<protocol> -> typed ProtocolLayer)
        # ------------------------------------------------------------------
        def __getattr__(self, name: str):
            # Only invoked when normal attribute lookup fails — real dataclass
            # fields/methods never reach here. Resolve a registered protocol name
            # to its layer (lazily creating an empty one so flow.tls/flow.quic are
            # never-None, matching the old TlsMetadata ergonomics). Anything else
            # raises AttributeError so getattr(flow, x, default) (filter engine,
            # copy/pickle dunder probing) keeps working.
            #
            # This materializes-and-attaches by design: it is the writable
            # never-None ergonomic (``flow.tls.version = ...`` persists). PURE-READ
            # probes that must NOT grow the stack (serializers, snapshots) must use
            # the non-mutating :meth:`layer` instead of attribute access.
            if name.startswith("_"):
                raise AttributeError(name)
            layers = self.__dict__.get("layers")
            if layers is None:
                # Half-constructed (e.g. during copy/unpickle before fields set).
                raise AttributeError(name)
            desc = get_registry().get(name)
            if desc is None:
                raise AttributeError(name)
            for ly in layers:
                if ly.name == name:
                    return ly
            return self._create_layer(name, desc)

        def _create_layer(self, name: str, desc) -> ProtocolLayer:
            layer = desc.layer_cls()
            # Stamp the per-instance name so generic layers (AppLayer, registered
            # under several names against one class) report the right ``name``.
            # Harmless for typed layers (their ``NAME`` already equals ``name``).
            layer._name = name
            layer.depth = len(self.layers)
            layer._flow = self
            if desc.data_source == "chunks":
                layer.data = LayerData(data_source="chunks", _owner=self)
            self.layers.append(layer)
            return layer

        def layer(self, name: str) -> Optional[ProtocolLayer]:
            """Return the layer named *name*, or None if not present."""
            for ly in self.layers:
                if ly.name == name:
                    return ly
            return None

        def add_layer(self, layer: ProtocolLayer) -> ProtocolLayer:
            """Append *layer* to the stack, linking parent/child + binding data."""
            layer.depth = len(self.layers)
            if self.layers:
                parent = self.layers[-1]
                parent.child = layer
                layer.parent = parent
            layer._flow = self
            if layer.data.data_source == "chunks" and layer.data._owner is None:
                layer.data._owner = self
            self.layers.append(layer)
            return layer

        def set_layer(self, layer: ProtocolLayer) -> ProtocolLayer:
            """Replace an existing same-name layer in place, else append it."""
            for i, existing in enumerate(self.layers):
                if existing.name == layer.name:
                    layer.depth = existing.depth
                    layer.parent, layer.child = existing.parent, existing.child
                    layer._flow = self
                    if layer.data.data_source == "chunks" and layer.data._owner is None:
                        layer.data._owner = self
                    self.layers[i] = layer
                    return layer
            return self.add_layer(layer)

        def __copy__(self) -> "Flow":
            # Shallow copy that re-lists the mutable containers (chunks, layers) so
            # appending to a live flow does not mutate a snapshot's lists. Element
            # objects are shared, matching the pre-existing snapshot semantics
            # (the collector already shared the TlsMetadata/FlowChunk objects).
            new = Flow.__new__(Flow)
            new.__dict__.update(self.__dict__)
            new.chunks = list(self.chunks)
            new.layers = list(self.layers)
            return new

        @property
        def has_trailing_data(self) -> bool:
            return self.trailing_bytes is not None

        @property
        def segments(self) -> list[dict]:
            """Return segment descriptors for multi-protocol rendering.

            Each segment is a dict with keys: type ('parsed'|'raw'), protocol,
            parse_result (or None), data (raw bytes for 'raw' type), source.
            """
            segs: list[dict] = []
            if self.request is not None:
                segs.append({
                    "type": "parsed",
                    "protocol": self.display_protocol,
                    "parse_result": self.request,
                    "source": "primary",
                })
            if self.trailing_parse is not None:
                segs.append({
                    "type": "parsed",
                    "protocol": self.trailing_protocol,
                    "parse_result": self.trailing_parse,
                    "source": "trailing",
                })
            elif self.trailing_bytes:
                segs.append({
                    "type": "raw",
                    "protocol": self.trailing_protocol or "unknown",
                    "data": self.trailing_bytes,
                    "source": "trailing",
                })
            return segs

        @property
        def duration(self) -> float:
            if self.ended > 0:
                return self.ended - self.started
            return time.time() - self.started

        @property
        def display_method(self) -> str:
            return _display.display_method_layered(self)

        @property
        def display_host(self) -> str:
            return _display.display_host(self.request, self.dst_addr, self.dst_port)

        @property
        def display_status(self) -> str:
            return _display.display_status(self.response)

        @property
        def display_size(self) -> str:
            total_bytes = self._total_bytes
            if total_bytes == 0 and self.chunks:
                total_bytes = sum(len(c.data) for c in self.chunks)
            return _display.display_size(self.response, total_bytes)

        @property
        def display_protocol(self) -> str:
            return _display.display_protocol(self)

        @property
        def has_request_data(self) -> bool:
            if self.request is not None:
                return True
            return any(c.direction == "write" for c in self.chunks)

        @property
        def has_response_data(self) -> bool:
            if self.response is not None:
                return True
            return any(c.direction == "read" for c in self.chunks)

        def get_direction_bytes(self, direction: str, max_bytes: int = 0) -> bytes:
            """Get concatenated raw bytes for a given direction ('read' or 'write').

            If *max_bytes* > 0, stop accumulating after that many bytes.
            """
            if max_bytes <= 0:
                return b"".join(c.data for c in self.chunks if c.direction == direction)
            parts: list[bytes] = []
            total = 0
            for c in self.chunks:
                if c.direction != direction:
                    continue
                parts.append(c.data)
                total += len(c.data)
                if total >= max_bytes:
                    break
            return b"".join(parts)

        @property
        def display_source(self) -> str:
            return _display.display_source(self.src_addr, self.src_port)

        @property
        def display_connection(self) -> str:
            """Directional connection string: src -> dst, src <- dst, or src <-> dst."""
            return _display.display_connection(
                self.request if self.has_request_data else None,
                self.response if self.has_response_data else None,
                self.src_addr,
                self.src_port,
                self.dst_addr,
                self.dst_port,
            )

        def to_dict(self, include_bodies: bool = False) -> dict:
            """Return a JSON-safe dict view of this flow for API/web consumers.

            Covers identity, transport/timing, parsed request/response (via
            :meth:`ParseResult.to_dict`), the protocol layer names, analyst
            annotations and attached findings. Raw chunk bytes are never included;
            request/response bodies are included (hex-encoded) only when
            *include_bodies* is True. Uses the non-mutating :meth:`layer` lookup so
            serializing never grows the layer stack.
            """
            tls = self.layer("tls")
            total_bytes = self._total_bytes or sum(len(c.data) for c in self.chunks)
            return {
                "flow_id": self.flow_id,
                "connection_id": self.connection_id,
                "src_addr": self.src_addr,
                "src_port": self.src_port,
                "dst_addr": self.dst_addr,
                "dst_port": self.dst_port,
                "ssl_session_id": self.ssl_session_id,
                "transport": self.transport,
                "state": self.state.value if isinstance(self.state, FlowState) else str(self.state),
                "started": self.started,
                "ended": self.ended,
                # Deterministic: avoid the wall-clock `duration` property for an
                # in-progress flow so the serialized snapshot is stable across calls.
                "duration": (self.ended - self.started) if self.ended > 0 else 0.0,
                "protocol": self.display_protocol,
                "detected_protocol": self.detected_protocol,
                "total_bytes": total_bytes,
                "process_name": self.process_name,
                "package_name": self.package_name,
                "tls_sni": tls.sni if tls is not None else "",
                "tls_alpn": tls.alpn if tls is not None else "",
                "layers": [ly.name for ly in self.layers],
                "tags": list(self.tags),
                "notes": self.notes,
                "request": self.request.to_dict(include_body=include_bodies) if self.request else None,
                "response": self.response.to_dict(include_body=include_bodies) if self.response else None,
                "findings": [f.to_dict() for f in self.findings],
            }

        # ------------------------------------------------------------------
        # On-demand body reconstruction from raw chunks
        # ------------------------------------------------------------------
        # Cache for reconstructed bodies: {direction: bytes}
        _body_cache: dict = field(default_factory=dict, repr=False)

        def reconstruct_body(self, direction: str) -> bytes:
            """Reconstruct the body for *direction* ('read' or 'write') from raw chunks.

            Bodies are no longer accumulated in parsers — this method extracts them
            on demand from ``self.chunks``. The result is cached so repeated calls
            are cheap.
            """
            if direction in self._body_cache:
                return self._body_cache[direction]
            proto = self.display_protocol.lower()
            if "http/2" in proto:
                body = self._extract_h2_body(direction)
            elif "http/3" in proto:
                body = self._extract_h3_body(direction)
            elif "http/1" in proto:
                body = self._extract_h1_body(direction)
            elif "websocket" in proto:
                body = self._extract_ws_body(direction)
            else:
                body = b""
            self._body_cache[direction] = body
            return body

        def invalidate_body_cache(self) -> None:
            """Clear cached body data (call when chunks change)."""
            self._body_cache.clear()

        def _extract_h1_body(self, direction: str) -> bytes:
            """Extract HTTP/1 body by re-parsing raw bytes with h11."""
            try:
                import h11
            except ImportError:
                return b""
            raw = self.get_direction_bytes(direction)
            if not raw:
                return b""
            conn = h11.Connection(h11.CLIENT if direction == "read" else h11.SERVER)
            conn.receive_data(raw)
            body_parts: list[bytes] = []
            while True:
                event = conn.next_event()
                if event is h11.NEED_DATA or event is h11.PAUSED:
                    break
                if isinstance(event, h11.Data):
                    body_parts.append(bytes(event.data))
                elif isinstance(event, h11.EndOfMessage):
                    break
            return b"".join(body_parts)

        def _extract_h2_body(self, direction: str) -> bytes:
            """Extract HTTP/2 DATA frame payloads from raw chunks.

            Stateless scan — no HPACK needed for body-only extraction.
            """
            import struct

            stream_id = 0
            msg = self.request if direction == "write" else self.response
            if msg is not None:
                stream_id = msg.stream_id
            body_parts: list[bytes] = []
            buf = bytearray()
            for chunk in self.chunks:
                if chunk.direction != direction:
                    continue
                buf.extend(chunk.data)
            offset = 0
            while offset + 9 <= len(buf):
                length = (buf[offset] << 16) | (buf[offset + 1] << 8) | buf[offset + 2]
                frame_type = buf[offset + 3]
                flags = buf[offset + 4]
                sid = struct.unpack_from("!I", buf, offset + 5)[0] & 0x7FFFFFFF
                offset += 9
                if offset + length > len(buf):
                    break
                if frame_type == 0x00 and (stream_id == 0 or sid == stream_id):
                    payload = buf[offset:offset + length]
                    # Strip padding if PADDED flag (0x08) is set
                    if flags & 0x08 and len(payload) >= 1:
                        pad_len = payload[0]
                        end = len(payload) - pad_len
                        payload = payload[1:end] if end > 1 else b""
                    body_parts.append(bytes(payload))
                offset += length
            return b"".join(body_parts)

        def _extract_h3_body(self, direction: str) -> bytes:
            """Extract HTTP/3 DATA frame payloads using varint framing."""
            from friTap.parsers.varint import decode_varint

            body_parts: list[bytes] = []
            buf = bytearray()
            for chunk in self.chunks:
                if chunk.direction != direction:
                    continue
                buf.extend(chunk.data)
            offset = 0
            while offset < len(buf):
                try:
                    frame_type, type_len = decode_varint(buf, offset)
                    frame_length, len_len = decode_varint(buf, offset + type_len)
                except (ValueError, IndexError):
                    break
                header_size = type_len + len_len
                payload_start = offset + header_size
                payload_end = payload_start + frame_length
                if payload_end > len(buf):
                    break
                if frame_type == 0x00:  # DATA frame
                    body_parts.append(bytes(buf[payload_start:payload_end]))
                offset = payload_end
            return b"".join(body_parts)

        def _extract_ws_body(self, direction: str) -> bytes:
            """Extract WebSocket payload — body is already in ParseResult for WS."""
            msg = self.request if direction == "write" else self.response
            if msg is not None and msg.body:
                return msg.body
            return b""

        # ------------------------------------------------------------------
        # Header / body / content-type access
        # ------------------------------------------------------------------
        @staticmethod
        def _get_header(msg: "Optional[ParseResult]", name: str, default: str = "") -> str:
            if msg is None:
                return default
            lower_name = name.lower()
            for key, value in msg.headers.items():
                if key.lower() == lower_name:
                    return value
            return default

        def _get_decompressed_body(
            self,
            msg: "Optional[ParseResult]",
            direction: str,
            encoding_override: str = "",
        ) -> bytes:
            if msg is None:
                return b""
            body = msg.body if msg.body else self.reconstruct_body(direction)
            if not body:
                return b""
            encoding = encoding_override or msg.content_encoding
            if not encoding:
                lower = "content-encoding"
                for key, value in msg.headers.items():
                    if key.lower() == lower:
                        encoding = value
                        break
            if not encoding:
                return body
            from friTap.parsers.decompress import decompress_body

            data, _err = decompress_body(body, encoding)
            return data

        def get_request_header(self, name: str, default: str = "") -> str:
            """Get a request header value by name (case-insensitive)."""
            return self._get_header(self.request, name, default)

        def get_response_header(self, name: str, default: str = "") -> str:
            """Get a response header value by name (case-insensitive)."""
            return self._get_header(self.response, name, default)

        @property
        def request_body(self) -> bytes:
            """The request body (possibly compressed), or ``b""``."""
            if self.request and self.request.body:
                return self.request.body
            return self.reconstruct_body("write")

        @property
        def response_body(self) -> bytes:
            """The response body (possibly compressed), or ``b""``."""
            if self.response and self.response.body:
                return self.response.body
            return self.reconstruct_body("read")

        def get_decompressed_request_body(self) -> bytes:
            """Return the request body after decompressing (gzip/br/zstd/deflate)."""
            return self._get_decompressed_body(self.request, "write")

        def get_decompressed_response_body(self) -> bytes:
            """Return the response body after decompressing (gzip/br/zstd/deflate)."""
            return self._get_decompressed_body(self.response, "read")

        @property
        def response_content_type(self) -> str:
            """The response Content-Type, or ``""`` if
unavailable."""ifself.responseisnotNoneandself.response.content_type:returnself.response.content_typereturnself.get_response_header("content-type")@propertydefrequest_content_type(self)->str:"""The request Content-Type, or ``""`` if unavailable."""ifself.requestisnotNoneandself.request.content_type:returnself.request.content_typereturnself.get_request_header("content-type")defdecode_request_protobuf(self,schema_path:Optional[str]=None,force:bool=False)->Optional[list]:"""Decode the request body as protobuf. Returns a dict of ``{field_number: [ProtoField, ...]}`` or ``None`` if the body is not protobuf or decoding fails. Args: schema_path: Optional path to a compiled ``.desc`` file. force: If ``True``, attempt decoding regardless of content type. """returnself._decode_protobuf(self.get_decompressed_request_body(),self.request_content_type,schema_path=schema_path,force=force,)defdecode_response_protobuf(self,schema_path:Optional[str]=None,force:bool=False)->Optional[list]:"""Decode the response body as protobuf. Returns a dict of ``{field_number: [ProtoField, ...]}`` or ``None`` if the body is not protobuf or decoding fails. Args: schema_path: Optional path to a compiled ``.desc`` file. force: If ``True``, attempt decoding regardless of content type. 
"""returnself._decode_protobuf(self.get_decompressed_response_body(),self.response_content_type,schema_path=schema_path,force=force,)def_decode_protobuf(self,body:bytes,content_type:str,schema_path:Optional[str]=None,force:bool=False,)->Optional[list]:"""Internal: decode protobuf body with auto gRPC framing detection."""ifnotbody:returnNonefromfriTap.parsers.protobufimport(decode_raw,extract_grpc_messages,is_grpc_content_type,is_likely_protobuf,is_protobuf_content_type,)# Check if we should attempt decodingis_proto_ct=is_grpc_content_type(content_type)oris_protobuf_content_type(content_type)ifnotforceandnotis_proto_ctandnotis_likely_protobuf(body):returnNonetry:payloads=extract_grpc_messages(body,content_type)results=[]forpayloadinpayloads:ifnotpayload:continuemsg=decode_raw(payload)results.append(msg)returnresultsifresultselseNoneexceptException:returnNone
Return a JSON-safe dict view of this flow for API/web consumers.
Covers identity, transport/timing, parsed request/response (via :meth:ParseResult.to_dict), the protocol layer names, analyst annotations and attached findings. Raw chunk bytes are never included; request/response bodies are included (hex-encoded) only when include_bodies is True. Uses the non-mutating :meth:layer lookup so serializing never grows the layer stack.
def to_dict(self, include_bodies: bool = False) -> dict:
    """Return a JSON-safe dict view of this flow for API/web consumers.

    Covers identity, transport/timing, parsed request/response (via
    :meth:`ParseResult.to_dict`), the protocol layer names, analyst
    annotations and attached findings. Raw chunk bytes are never
    included; request/response bodies are included (hex-encoded) only
    when *include_bodies* is True. Uses the non-mutating :meth:`layer`
    lookup so serializing never grows the layer stack.
    """
    tls = self.layer("tls")
    total_bytes = self._total_bytes or sum(len(c.data) for c in self.chunks)
    return {
        "flow_id": self.flow_id,
        "connection_id": self.connection_id,
        "src_addr": self.src_addr,
        "src_port": self.src_port,
        "dst_addr": self.dst_addr,
        "dst_port": self.dst_port,
        "ssl_session_id": self.ssl_session_id,
        "transport": self.transport,
        "state": self.state.value if isinstance(self.state, FlowState) else str(self.state),
        "started": self.started,
        "ended": self.ended,
        # Deterministic: avoid the wall-clock `duration` property for an
        # in-progress flow so the serialized snapshot is stable across calls.
        "duration": (self.ended - self.started) if self.ended > 0 else 0.0,
        "protocol": self.display_protocol,
        "detected_protocol": self.detected_protocol,
        "total_bytes": total_bytes,
        "process_name": self.process_name,
        "package_name": self.package_name,
        "tls_sni": tls.sni if tls is not None else "",
        "tls_alpn": tls.alpn if tls is not None else "",
        "layers": [ly.name for ly in self.layers],
        "tags": list(self.tags),
        "notes": self.notes,
        "request": self.request.to_dict(include_body=include_bodies) if self.request else None,
        "response": self.response.to_dict(include_body=include_bodies) if self.response else None,
        "findings": [f.to_dict() for f in self.findings],
    }
def layer(self, name: str) -> Optional[ProtocolLayer]:
    """Return the layer named *name*, or None if not present."""
    for ly in self.layers:
        if ly.name == name:
            return ly
    return None
def reconstruct_body(self, direction: str) -> bytes:
    """Reconstruct the body for *direction* ('read' or 'write') from raw chunks.

    Bodies are no longer accumulated in parsers — this method extracts
    them on demand from ``self.chunks``. The result is cached so
    repeated calls are cheap.
    """
    if direction in self._body_cache:
        return self._body_cache[direction]

    proto = self.display_protocol.lower()
    if "http/2" in proto:
        body = self._extract_h2_body(direction)
    elif "http/3" in proto:
        body = self._extract_h3_body(direction)
    elif "http/1" in proto:
        body = self._extract_h1_body(direction)
    elif "websocket" in proto:
        body = self._extract_ws_body(direction)
    else:
        body = b""

    self._body_cache[direction] = body
    return body
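The HTTP/2 branch of reconstruct_body is described in the source as a stateless scan over DATA frames. The standalone sketch below illustrates that idea outside friTap. The names `make_frame` and `extract_h2_data` are illustrative helpers, not friTap APIs, and the sketch assumes the buffer starts at a frame boundary (for client bytes, after the connection preface).

```python
import struct

HEADERS, DATA = 0x01, 0x00  # HTTP/2 frame type codes
PADDED = 0x08               # DATA frame flag: payload carries padding


def make_frame(frame_type: int, flags: int, stream_id: int, payload: bytes) -> bytes:
    """Build one HTTP/2 frame: 24-bit length, type, flags, 31-bit stream id."""
    length = len(payload).to_bytes(3, "big")
    header = length + bytes([frame_type, flags]) + struct.pack("!I", stream_id & 0x7FFFFFFF)
    return header + payload


def extract_h2_data(buf: bytes, stream_id: int = 0) -> bytes:
    """Concatenate DATA payloads; stream_id 0 means "any stream"."""
    parts = []
    offset = 0
    while offset + 9 <= len(buf):
        length = int.from_bytes(buf[offset:offset + 3], "big")
        frame_type = buf[offset + 3]
        flags = buf[offset + 4]
        sid = struct.unpack_from("!I", buf, offset + 5)[0] & 0x7FFFFFFF
        offset += 9
        if offset + length > len(buf):
            break  # truncated final frame
        if frame_type == DATA and (stream_id == 0 or sid == stream_id):
            payload = buf[offset:offset + length]
            if flags & PADDED and payload:
                pad_len = payload[0]  # first byte is the pad length
                end = len(payload) - pad_len
                payload = payload[1:end] if end > 1 else b""
            parts.append(bytes(payload))
        offset += length
    return b"".join(parts)


stream = (
    make_frame(HEADERS, 0x04, 1, b"\x88")
    + make_frame(DATA, 0x00, 1, b"hello ")
    + make_frame(DATA, PADDED, 1, b"\x02world\x00\x00")  # two bytes of padding
    + make_frame(DATA, 0x01, 3, b"other")
)
body_stream_1 = extract_h2_data(stream, stream_id=1)
body_any = extract_h2_data(stream)
```

Because only frame headers are inspected, no HPACK state is needed; the trade-off is that the scan cannot tell which request a DATA frame belongs to beyond its stream id.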
Flow.to_dict(include_bodies=False) returns a JSON-safe view: identity, transport/timing, parsed request/response, layer names, tags/notes and findings. Raw chunk bytes are never included; request/response bodies are included (hex-encoded) only when include_bodies=True.
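The serialization rules above (stable duration for in-progress flows, bodies only on request and hex-encoded) can be illustrated with a small stand-in. `MiniFlow` and the `body_hex` key are hypothetical names chosen for this sketch; they are not the friTap Flow class or its actual key set.

```python
import json
import time
from dataclasses import dataclass


@dataclass
class MiniFlow:
    """Hypothetical stand-in for a flow; not the friTap Flow class."""
    flow_id: str
    started: float
    ended: float = 0.0  # 0.0 means the flow is still in progress
    body: bytes = b""

    @property
    def duration(self) -> float:
        # Live view: depends on the wall clock while the flow is open.
        end = self.ended if self.ended > 0 else time.time()
        return end - self.started

    def to_dict(self, include_bodies: bool = False) -> dict:
        out = {
            "flow_id": self.flow_id,
            "started": self.started,
            "ended": self.ended,
            # Snapshot view: a stable 0.0 until the flow has ended.
            "duration": (self.ended - self.started) if self.ended > 0 else 0.0,
        }
        if include_bodies:
            out["body_hex"] = self.body.hex()  # raw bytes are not JSON-serializable
        return out


flow = MiniFlow("f1", started=100.0, body=b"\x00\xffok")
first = json.dumps(flow.to_dict(), sort_keys=True)
second = json.dumps(flow.to_dict(), sort_keys=True)
with_body = flow.to_dict(include_bodies=True)
```

Two snapshots of the same in-progress flow serialize identically, which is what makes the output suitable for caching or diffing in a web consumer.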
Each flow carries a stack of protocol layers. Access them two ways:
Attribute access — flow.tls, flow.quic, flow.ssh lazily materialize a typed layer if that protocol is registered (raises AttributeError for an unregistered name). Typed layers expose fields such as flow.tls.sni and flow.tls.alpn.
flow.layer(name) — a non-mutating lookup returning the existing layer or None. Use this in serialization paths so reading never grows the stack: flow.layer("http2"), then inspect .parsed / .data.
sni = flow.tls.sni  # typed accessor
http2 = flow.layer("http2")  # None if absent
if http2 is not None:
    parsed = http2.parsed
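The difference between the two access styles is easiest to see on a stand-in object. The sketch below is a hypothetical model of the behaviour described above, not friTap's implementation: `MiniFlow`, `Layer`, and the `_registered` tuple are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Layer:
    name: str
    sni: str = ""


@dataclass
class MiniFlow:
    """Hypothetical stand-in illustrating lazy vs. non-mutating access."""
    layers: list = field(default_factory=list)
    _registered = ("tls", "quic", "ssh")  # assumed registry, class attribute

    def layer(self, name: str) -> Optional[Layer]:
        # Non-mutating: return the existing layer or None.
        for ly in self.layers:
            if ly.name == name:
                return ly
        return None

    def __getattr__(self, name: str) -> Layer:
        # Called only when normal attribute lookup fails.
        if name not in MiniFlow._registered:
            raise AttributeError(name)
        existing = self.layer(name)
        if existing is None:
            existing = Layer(name)
            self.layers.append(existing)  # lazy materialization grows the stack
        return existing


flow = MiniFlow()
before = flow.layer("tls")       # lookup only, stack untouched
count_before = len(flow.layers)
flow.tls.sni = "example.com"     # attribute access materializes a tls layer
count_after = len(flow.layers)
```

This is why read-only paths such as serialization prefer the lookup form: merely reading `flow.tls` would leave an empty layer behind.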
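Bodies are no longer accumulated in the parser's ParseResult. To keep .tap files small, identical/large bodies are reconstructed on demand from the flow's raw chunks, so a ParseResult (flow.request / flow.response) can have an empty .body even when data was captured. To get the bytes, call flow.reconstruct_body("write") for the request or flow.reconstruct_body("read") for the response; the flow.request_body and flow.response_body properties do this for you whenever the parsed body is empty. Results are cached per direction, and flow.invalidate_body_cache() clears the cache when the chunks change.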
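A minimal sketch of the on-demand pattern, using stand-in classes rather than friTap objects. `Chunk`, `MiniFlow`, `add_chunk`, and the `extractions` counter are hypothetical, and the extraction step simply joins raw bytes where the real code parses protocol framing.

```python
from dataclasses import dataclass, field


@dataclass
class Chunk:
    direction: str  # "read" or "write"
    data: bytes


@dataclass
class MiniFlow:
    """Hypothetical stand-in; the real extraction parses protocol framing."""
    chunks: list = field(default_factory=list)
    parsed_body: bytes = b""  # plays the role of ParseResult.body
    extractions: int = 0      # counts cache misses, for the demo only
    _cache: dict = field(default_factory=dict, repr=False)

    def reconstruct_body(self, direction: str) -> bytes:
        if direction in self._cache:
            return self._cache[direction]
        self.extractions += 1
        # Simplification: concatenate raw bytes for this direction.
        body = b"".join(c.data for c in self.chunks if c.direction == direction)
        self._cache[direction] = body
        return body

    def add_chunk(self, chunk: Chunk) -> None:
        self.chunks.append(chunk)
        self._cache.clear()  # cached bodies are stale once chunks change

    @property
    def response_body(self) -> bytes:
        # Prefer the parsed body; fall back to reconstruction when it is empty.
        return self.parsed_body or self.reconstruct_body("read")


flow = MiniFlow()
flow.add_chunk(Chunk("read", b"part1-"))
flow.add_chunk(Chunk("write", b"request"))
first = flow.response_body
second = flow.response_body          # served from the cache
misses_after_two_reads = flow.extractions
flow.add_chunk(Chunk("read", b"part2"))
third = flow.response_body           # cache was cleared, so this re-extracts
```

In this sketch the cache is cleared automatically by `add_chunk`; with the real Flow the source indicates the caller is expected to invoke invalidate_body_cache() when chunks change.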
FlowSummary is the lightweight index entry used when listing flows without reading their full bodies (returned by read_flow_summaries() / get_summaries()).
Lightweight snapshot of a Flow for list display and filtering.
Contains only scalar metadata (~200 bytes) — no chunks, no body bytes. Compatible with the filter engine (has request/response stubs with the same attribute names as ParseResult).
@dataclass(frozen=True, slots=True)
class FlowSummary:
    """Lightweight snapshot of a Flow for list display and filtering.

    Contains only scalar metadata (~200 bytes) — no chunks, no body bytes.
    Compatible with the filter engine (has ``request``/``response`` stubs
    with the same attribute names as ParseResult).
    """

    flow_id: str = ""
    connection_id: str = ""
    src_addr: str = ""
    src_port: int = 0
    dst_addr: str = ""
    dst_port: int = 0
    ssl_session_id: str = ""
    state: FlowState = FlowState.ACTIVE
    started: float = 0.0
    ended: float = 0.0
    # Transport/encryption protocol of the underlying connection ("tls", "quic",
    # "signal", ...). Mirrors Flow.transport so consumers can group/filter from
    # the cheap summary without materializing the full Flow.
    transport: str = "tls"
    request: Optional[_ParseStub] = None
    response: Optional[_ParseStub] = None
    has_ohttp: bool = False
    has_trailing_data: bool = False
    trailing_protocol: str = ""
    total_bytes: int = 0
    detected_protocol: str = ""
    # Schema v2 additive enrichment scalars (cheap to surface in list/filter views)
    process_name: str = ""
    tls_sni: str = ""
    tls_alpn: str = ""
    tag_count: int = 0
    finding_count: int = 0
    has_notes: bool = False
    # Layered protocol-display scalars (so the layered display works without a
    # live layer stack). See friTap.flow.display.display_protocol_layered.
    outer_app_protocol: str = ""
    inner_e2e_protocol: str = ""
    inner_summary: str = ""
    # Primary TL operation derived from the flow's message-bearing layers (e.g.
    # "sendMessage", "users", "updates"). Surfaced in the Method column without
    # loading full messages. See friTap.flow.display.method_from_messages.
    flow_method: str = ""

    @property
    def duration(self) -> float:
        if self.ended > 0:
            return self.ended - self.started
        return time.time() - self.started

    @property
    def display_protocol(self) -> str:
        return _display.display_protocol(self)

    @property
    def display_protocol_layered(self) -> str:
        return _display.display_protocol_layered(self)

    @property
    def display_method(self) -> str:
        return _display.display_method_layered(self)

    @property
    def display_host(self) -> str:
        return _display.display_host(self.request, self.dst_addr, self.dst_port)

    @property
    def display_status(self) -> str:
        return _display.display_status(self.response)

    @property
    def display_size(self) -> str:
        return _display.display_size(self.response, self.total_bytes)

    @property
    def display_source(self) -> str:
        return _display.display_source(self.src_addr, self.src_port)

    @property
    def display_connection(self) -> str:
        return _display.display_connection(
            self.request,
            self.response,
            self.src_addr,
            self.src_port,
            self.dst_addr,
            self.dst_port,
        )

    # Aliases for filter engine compatibility
    @property
    def _total_bytes(self) -> int:
        return self.total_bytes

    @property
    def ohttp_inner_request(self):
        # Filter engine checks `is not None` — return a truthy sentinel
        return _OHTTP_SENTINEL if self.has_ohttp else None

    @property
    def ohttp_inner_response(self):
        return _OHTTP_SENTINEL if self.has_ohttp else None

    @staticmethod
    def from_flow(flow: "Flow") -> "FlowSummary":
        """Create a summary snapshot from a full Flow."""
        req_stub = None
        if flow.request is not None:
            r = flow.request
            req_stub = _ParseStub(
                protocol=r.protocol,
                method=r.method,
                url=r.url,
                host=r.host,
                status_code=r.status_code,
                status_text=r.status_text,
                content_type=r.content_type,
                content_encoding=r.content_encoding,
                body_size=r.body_size,
                stream_id=r.stream_id,
                is_control_frame=r.is_control_frame,
            )
        resp_stub = None
        if flow.response is not None:
            r = flow.response
            resp_stub = _ParseStub(
                protocol=r.protocol,
                method=r.method,
                url=r.url,
                host=r.host,
                status_code=r.status_code,
                status_text=r.status_text,
                content_type=r.content_type,
                content_encoding=r.content_encoding,
                body_size=r.body_size,
                stream_id=r.stream_id,
                is_control_frame=r.is_control_frame,
            )
        # Non-mutating lookup: a summary is a pure read, so use layer() (None if
        # absent) rather than flow.tls, which would attach an empty layer.
        tls_layer = flow.layer("tls")
        # Prefer a scalar already set on the flow (e.g. a synthetic, layerless
        # flow rebuilt from a .tap summary in the TUI flow list), since
        # layered_scalars_from_flow returns "" for a flow with no layer stack.
        # Live flows have layers and no preset scalars, so they fall through to
        # the computed values and keep working unchanged.
        s_outer, s_inner, s_summary = _display.layered_scalars_from_flow(flow)
        outer_app = getattr(flow, "outer_app_protocol", "") or s_outer
        inner_e2e = getattr(flow, "inner_e2e_protocol", "") or s_inner
        inner_summary = getattr(flow, "inner_summary", "") or s_summary
        # Primary TL operation from the message-bearing layers; falls back to a
        # preset scalar (synthetic flows rebuilt from a .tap have no live layers).
        flow_method = getattr(flow, "flow_method", "") or _display.method_from_messages(flow)
        return FlowSummary(
            flow_id=flow.flow_id,
            connection_id=flow.connection_id,
            src_addr=flow.src_addr,
            src_port=flow.src_port,
            dst_addr=flow.dst_addr,
            dst_port=flow.dst_port,
            ssl_session_id=flow.ssl_session_id,
            state=flow.state,
            started=flow.started,
            ended=flow.ended,
            transport=getattr(flow, "transport", "tls") or "tls",
            request=req_stub,
            response=resp_stub,
            has_ohttp=(
                flow.ohttp_inner_request is not None
                or flow.ohttp_inner_response is not None
            ),
            has_trailing_data=flow.trailing_bytes is not None,
            trailing_protocol=flow.trailing_protocol,
            total_bytes=flow._total_bytes,
            detected_protocol=flow.detected_protocol,
            process_name=flow.process_name,
            tls_sni=(tls_layer.sni if tls_layer is not None else ""),
            tls_alpn=(tls_layer.alpn if tls_layer is not None else ""),
            tag_count=len(flow.tags),
            finding_count=len(flow.findings),
            has_notes=bool(flow.notes),
            outer_app_protocol=outer_app,
            inner_e2e_protocol=inner_e2e,
            inner_summary=inner_summary,
            flow_method=flow_method,
        )

    def to_dict(self) -> dict:
        """Return a JSON-safe, body-free dict for a high-level flow overview.

        Emits the canonical FlowSummary key set shared with
        :meth:`friTap.flow.tap_format.FlowSummary.to_dict` so that live
        summaries (built via :meth:`from_flow`) and offline summaries (read
        from a .tap) render identically in a web UI / TUI / CLI overview.
        """
        req, resp = self.request, self.response
        return {
            "flow_id": self.flow_id,
            "connection_id": self.connection_id,
            "src_addr": self.src_addr,
            "src_port": self.src_port,
            "dst_addr": self.dst_addr,
            "dst_port": self.dst_port,
            "ssl_session_id": self.ssl_session_id,
            "state": self.state.value if isinstance(self.state, FlowState) else str(self.state),
            "started": self.started,
            "ended": self.ended,
            "transport": self.transport,
            # Deterministic and parity-matched with tap_format.FlowSummary.to_dict:
            # the `duration` property returns wall-clock `time.time() - started`
            # for in-progress flows (ended == 0), which is non-deterministic and
            # would diverge from the offline summary shape. A serialized snapshot
            # uses 0.0 until the flow completes.
            "duration": (self.ended - self.started) if self.ended > 0 else 0.0,
            "protocol": (
                self.detected_protocol
                or (req.protocol if req else "")
                or (resp.protocol if resp else "")
                or "unknown"
            ),
            "method": req.method if req else "",
            "url": req.url if req else "",
            "host": req.host if req else "",
            "status_code": resp.status_code if resp else 0,
            "total_bytes": self.total_bytes,
            "detected_protocol": self.detected_protocol,
            "process_name": self.process_name,
            "tls_sni": self.tls_sni,
            "tls_alpn": self.tls_alpn,
            "tag_count": self.tag_count,
            "finding_count": self.finding_count,
            "has_notes": self.has_notes,
            "outer_app_protocol": self.outer_app_protocol,
            "inner_e2e_protocol": self.inner_e2e_protocol,
            "inner_summary": self.inner_summary,
            "flow_method": self.flow_method,
        }
Reconstruct a .tap file from an existing packet capture plus its key material — no device, no live capture.
Requires Wireshark / tshark ≥ 4.x
pcap_to_tap() (and the --from-pcap CLI) shell out to tshark for dissection. If tshark is not on PATH, set tshark_path= (or the $FRITAP_TSHARK environment variable). The example below is offline but needs tshark installed.
from friTap import pcap_to_tap

result = pcap_to_tap(
    "chrome.pcap",
    keylog_path="chromekeys.log",  # SSLKEYLOGFILE; raises NoDecryptionKeysError if given but unusable
    tap_path="out.tap",
    run_scan=True,  # also run analyzers and embed findings
)
print(result.to_dict())
Import path
pcap_to_tap is importable from the package root (from friTap import pcap_to_tap), but not from friTap.offline: re-exporting it there would shadow the friTap.offline.pcap_to_tap module. The no-manifest core is friTap.convert_pcap_to_tap(...).
pcap_to_tap() reads a manifest sidecar <pcap>.fritap.json (keys keylog/tls_ports/quic_ports) when use_manifest=True; explicit arguments win over the manifest, which wins over defaults. It returns a ConvertResult:
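The precedence rule (explicit arguments over manifest over defaults) can be sketched as a three-step merge. This is an illustration only, not friTap's code: `resolve_options`, the `DEFAULTS` values, and the reading of `<pcap>.fritap.json` as "pcap path plus suffix" are assumptions.

```python
import json
import tempfile
from pathlib import Path

# Assumed defaults, for illustration only.
DEFAULTS = {"keylog": None, "tls_ports": [443], "quic_ports": [443]}


def resolve_options(pcap_path: str, use_manifest: bool = True, **explicit) -> dict:
    """Merge defaults < manifest < explicit; None means "not given"."""
    options = dict(DEFAULTS)
    sidecar = Path(pcap_path + ".fritap.json")
    if use_manifest and sidecar.is_file():
        manifest = json.loads(sidecar.read_text(encoding="utf-8"))
        # Only the known manifest keys are honoured.
        options.update({k: v for k, v in manifest.items() if k in DEFAULTS})
    options.update({k: v for k, v in explicit.items() if v is not None})
    return options


with tempfile.TemporaryDirectory() as tmp:
    pcap = str(Path(tmp) / "chrome.pcap")
    Path(pcap + ".fritap.json").write_text(
        json.dumps({"keylog": "manifest.log", "tls_ports": [443, 8443]}),
        encoding="utf-8",
    )
    from_manifest = resolve_options(pcap)
    overridden = resolve_options(pcap, keylog="cli.log")
    ignored = resolve_options(pcap, use_manifest=False)
```

Treating `None` as "not given" lets a caller pass every option through unconditionally and still fall back to the manifest for the ones left unset.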
@dataclass
class ConvertResult:
    """Summary of a pcap-to-tap conversion."""

    tap_path: str
    flow_count: int = 0
    decrypted_packet_count: int = 0
    stream_count: int = 0
    # QUIC per-packet drops (e.g. misaligned stream id/payload lists). Kept
    # distinct from dropped TLS streams below so the two are not conflated.
    dropped_packet_count: int = 0
    # TLS streams that could not be followed/decoded (whole-stream drops).
    dropped_stream_count: int = 0
    findings_count: int = 0
    # Streams skipped during a keyless (plaintext) conversion because they were
    # encrypted (TLS/QUIC) and therefore need keys. Drives the "looks encrypted —
    # pass --keylog" hint in the offline CLI.
    encrypted_streams_skipped: int = 0
    # MTProto (Telegram) offline-decryption counters (populated only when
    # --mtproto-keylog is supplied; the decryptor is friTap's own, not tshark).
    mtproto_messages: int = 0
    mtproto_streams: int = 0
    mtproto_records_undecryptable: int = 0
    mtproto_streams_degraded: int = 0
    # Protocol-generic counters keyed by counter_prefix, e.g.
    # ``{"mtproto": {"messages": 6, "streams": 2, "undecryptable": 0, "degraded": 0}}``.
    # The named ``mtproto_*`` fields above are kept as back-compat accessors and
    # stay mirrored; any OTHER registry-driven protocol (built-in or plugin) needs
    # only this dict (no new dataclass field). Populated via :meth:`record_protocol`.
    per_protocol: dict = field(default_factory=dict)

    def record_protocol(
        self,
        prefix: str,
        *,
        messages: int = 0,
        streams: int = 0,
        undecryptable: int = 0,
        degraded: int = 0,
    ) -> None:
        """Accumulate one protocol decryptor's counters (generic + back-compat).

        Writes the protocol-generic ``per_protocol[prefix]`` view AND, when a
        matching legacy named field exists (``<prefix>_messages`` etc.), mirrors
        the increment into it so existing readers of ``result.mtproto_messages``
        / ``result.mtproto_streams`` keep working unchanged.
        """
        bucket = self.per_protocol.setdefault(
            prefix,
            {"messages": 0, "streams": 0, "undecryptable": 0, "degraded": 0},
        )
        bucket["messages"] += messages
        bucket["streams"] += streams
        bucket["undecryptable"] += undecryptable
        bucket["degraded"] += degraded
        for legacy_suffix, value in (
            ("messages", messages),
            ("streams", streams),
            ("records_undecryptable", undecryptable),
            ("streams_degraded", degraded),
        ):
            attr = f"{prefix}_{legacy_suffix}"
            if hasattr(self, attr):
                setattr(self, attr, getattr(self, attr) + value)

    def to_dict(self) -> dict:
        """Return a JSON-safe dict view of this conversion summary.

        Lets web/API callers serialize the result of a pcap-to-tap conversion
        without reaching into the dataclass fields by hand.
        """
        from dataclasses import asdict

        return asdict(self)
def to_dict(self) -> dict:
    """Return a JSON-safe dict view of this conversion summary.

    Lets web/API callers serialize the result of a pcap-to-tap conversion
    without reaching into the dataclass fields by hand.
    """
    from dataclasses import asdict

    return asdict(self)
Key fields: tap_path, flow_count, decrypted_packet_count, stream_count, dropped_packet_count, dropped_stream_count, findings_count, and encrypted_streams_skipped (streams that could not be decrypted and were tallied rather than emitted).
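The per-protocol counters follow a "generic dict plus mirrored legacy fields" pattern. The reduced stand-in below keeps only two counters to show how the mirroring behaves; `MiniResult` is hypothetical, and the "signal" prefix is used purely as an example of a protocol without legacy fields.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class MiniResult:
    """Reduced stand-in for ConvertResult (two MTProto counters only)."""
    mtproto_messages: int = 0
    mtproto_streams: int = 0
    per_protocol: dict = field(default_factory=dict)

    def record_protocol(self, prefix: str, *, messages: int = 0, streams: int = 0) -> None:
        # Generic view: one bucket per protocol prefix.
        bucket = self.per_protocol.setdefault(prefix, {"messages": 0, "streams": 0})
        bucket["messages"] += messages
        bucket["streams"] += streams
        # Back-compat view: mirror into a named field only when one exists.
        for suffix, value in (("messages", messages), ("streams", streams)):
            attr = f"{prefix}_{suffix}"
            if hasattr(self, attr):
                setattr(self, attr, getattr(self, attr) + value)


result = MiniResult()
result.record_protocol("mtproto", messages=6, streams=2)
result.record_protocol("mtproto", messages=1)
result.record_protocol("signal", messages=3)  # no legacy fields: dict only
summary = asdict(result)
```

New code can read `per_protocol` uniformly for every protocol, while older readers of the named `mtproto_*` fields see the same totals.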
Caveats
Keyless 1-RTT-only QUIC captures are undetectable without keys. Encrypted streams that cannot be decrypted are skipped and counted in encrypted_streams_skipped. SSH banners and HTTP/2 control frames become synthetic, metadata-only flows. NoDecryptionKeysError is raised when keylog_path is given but no usable keys (and no embedded DSB) are found.
class TapReader:
    """Indexed reader for .tap capture files.

    Usage::

        reader = TapReader("capture.tap")
        meta = reader.open()
        summaries = reader.read_flow_summaries()  # fast, metadata only
        flow = reader.read_flow("10.0.0.1:443-...:0")  # full load on demand
        reader.close()
    """

    def __init__(self, path: str) -> None:
        self._path = str(Path(path).resolve())
        self._file = None
        self._header: Optional[TapHeader] = None
        self._meta: Optional[TapMeta] = None
        self._data_start: int = 0  # byte offset where records begin
        # Index: flow_id -> file_offset of the FLOW record envelope
        self._flow_offsets: dict[str, int] = {}
        # Lazily-built findings index: flow_id -> list[Finding]. None = not built.
        self._findings_index: Optional[dict[str, list]] = None
        # Finding-presence flag determined cheaply at open() time.
        #   None  = undetermined (footer-index path does not walk records, so we
        #           cannot know without a scan — fall back to lazy full scan)
        #   False = open-time scan walked every record and saw NO REC_FINDING
        #           (read_findings short-circuits to [] with no second scan)
        #   True  = at least one REC_FINDING was seen; offsets captured below
        self._saw_finding_record: Optional[bool] = None
        # Byte offsets of REC_FINDING record envelopes captured during the
        # open-time linear scan, so read_findings can seek directly to them
        # instead of re-scanning the whole file.
        self._finding_offsets: list[int] = []
        self._opened: bool = False

    @property
    def path(self) -> str:
        return self._path

    @property
    def header(self) -> Optional[TapHeader]:
        return self._header

    @property
    def meta(self) -> Optional[TapMeta]:
        return self._meta

    @property
    def flow_count(self) -> int:
        return len(self._flow_offsets)

    def open(self) -> TapMeta:
        """Open the .tap file, parse the header, and build the flow index.

        Returns the file-level TapMeta.
        """
        self._file = open(self._path, "rb")
        self._opened = True
        try:
            # Read header
            header_raw = self._file.read(_HEADER_STRUCT.size + 4096)  # read extra for ext
            self._header, self._data_start = decode_header(header_raw)

            # Try to read META record (should be the first record)
            self._file.seek(self._data_start)
            self._meta = self._try_read_meta()

            # Build flow index
            if self._header.flags & FLAG_HAS_INDEX:
                self._build_index_from_footer()
            else:
                logger.info("No index in .tap file, performing linear scan")
                self._build_index_linear_scan()
        except Exception:
            self.close()
            raise

        logger.info(
            "TapReader opened: %s (%d flows)",
            self._path,
            len(self._flow_offsets),
        )
        return self._meta or TapMeta()

    def read_flow_summaries(self) -> list[FlowSummary]:
        """Read lightweight flow metadata for all flows (no chunks/bodies).

        Returns summaries sorted by start timestamp.
        """
        self._ensure_open()
        summaries = []
        for flow_id, offset in self._flow_offsets.items():
            try:
                payload = self._read_record_payload_at(offset)
                if payload is not None:
                    summary = decode_flow_summary(payload, file_offset=offset)
                    summaries.append(summary)
            except Exception:
                logger.debug("Failed to read flow summary at offset %d", offset, exc_info=True)
        # Enrich finding_count from the findings index: findings are stored in
        # separate REC_FINDING records, not in FLOW meta, so decode_flow_summary
        # cannot know them and leaves the count at 0. has_findings() is a cheap
        # no-op short-circuit for findings-free captures (the common case); only
        # captures that actually carry findings pay for the (cached) index build.
        if self.has_findings():
            for summary in summaries:
                summary.finding_count = len(self.read_findings(summary.flow_id))
        summaries.sort(key=lambda s: s.started)
        return summaries

    def read_flow(self, flow_id: str) -> Optional["Flow"]:
        """Read a full Flow object by flow_id (on-demand, with chunks/bodies).

        Returns None if the flow_id is not in the index.
        """
        self._ensure_open()
        offset = self._flow_offsets.get(flow_id)
        if offset is None:
            return None
        try:
            payload = self._read_record_payload_at(offset)
            if payload is None:
                return None
            flow = decode_flow(payload)
        except Exception:
            logger.error("Failed to read flow %s at offset %d", flow_id, offset, exc_info=True)
            return None
        # Findings are best-effort enrichment: a corrupt/undecodable findings
        # index must NEVER suppress an already-decoded valid flow. Isolate it.
        try:
            findings = self.read_findings(flow_id)
            if findings:
                flow.findings = list(findings)
        except Exception:
            logger.warning(
                "Failed to read findings for flow %s; returning flow without findings",
                flow_id,
                exc_info=True,
            )
        return flow

    def read_all_flows(self) -> list["Flow"]:
        """Read all flows fully (with chunks and bodies).

        For large captures, prefer read_flow_summaries() + read_flow() on demand.
        """
        self._ensure_open()
        flows = []
        for flow_id, offset in self._flow_offsets.items():
            try:
                payload = self._read_record_payload_at(offset)
                if payload is None:
                    continue
                flow = decode_flow(payload)
            except Exception:
                logger.debug("Failed to read flow at offset %d", offset, exc_info=True)
                continue
            # Findings are best-effort enrichment: a corrupt/undecodable
            # findings index must NEVER drop an already-decoded valid flow.
            try:
                findings = self.read_findings(flow.flow_id)
                if findings:
                    flow.findings = list(findings)
            except Exception:
                logger.warning(
                    "Failed to read findings for flow %s; flow kept without findings",
                    flow.flow_id,
                    exc_info=True,
                )
            flows.append(flow)
        flows.sort(key=lambda f: f.started)
        return flows

    def read_findings(self, flow_id: str) -> list:
        """Return analysis findings persisted for *flow_id* (REC_FINDING records).

        The findings index is built lazily on first call (one linear pass over
        the file) and cached. Returns an empty list if the file carries none.
        """
        self._ensure_open()
        if self._findings_index is None:
            self._build_findings_index()
        return self._findings_index.get(flow_id, [])

    def has_findings(self) -> bool:
        """True if the file contains any persisted findings."""
        self._ensure_open()
        if self._findings_index is None:
            self._build_findings_index()
        return bool(self._findings_index)

    def close(self) -> None:
        """Close the file. Safe to call multiple times."""
        if self._file is not None:
            try:
                self._file.close()
            except Exception:
                pass
            self._file = None
        self._opened = False
        self._flow_offsets.clear()
        self._findings_index = None
        self._saw_finding_record = None
        self._finding_offsets = []

    def __enter__(self) -> "TapReader":
        self.open()
        return self

    def __exit__(self, *args) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Internal: index building
    # ------------------------------------------------------------------

    def _build_index_from_footer(self) -> None:
        """Read the FLOW_INDEX from the footer pointer."""
        assert self._file is not None
        # Findings presence is recorded in the header flags at close() time, so
        # we can decide cheaply (no record walk) whether the findings scan is
        # worth running. A closed/indexed file (FLAG_HAS_INDEX, this path) that
        # lacks FLAG_HAS_FINDINGS provably contains no REC_FINDING records.
        if self._header is not None and self._header.flags & FLAG_HAS_FINDINGS:
            # Findings exist; offsets unknown on this path, so _build_findings_index
            # will scan once (lazily, only if findings are actually requested).
            self._saw_finding_record = True
        else:
            self._saw_finding_record = False

        # Read footer (last 16 bytes)
        self._file.seek(0, 2)  # seek to end
        file_size = self._file.tell()
        if file_size < _HEADER_STRUCT.size + _FOOTER_STRUCT.size:
            logger.warning("File too small for footer, falling back to linear scan")
            self._build_index_linear_scan()
            return

        self._file.seek(file_size - _FOOTER_STRUCT.size)
        footer_raw = self._file.read(_FOOTER_STRUCT.size)
        try:
            footer_magic, index_offset = _FOOTER_STRUCT.unpack(footer_raw)
        except Exception:
            logger.warning("Footer unpack failed, falling back to linear scan")
            self._build_index_linear_scan()
            return

        if footer_magic != FOOTER_MAGIC:
            logger.warning("Footer magic mismatch, falling back to linear scan")
            self._build_index_linear_scan()
            return

        # Read FLOW_INDEX record at index_offset
        payload = self._read_record_payload_at(index_offset)
        if payload is None:
            logger.warning("Failed to read FLOW_INDEX, falling back to linear scan")
            self._build_index_linear_scan()
            return

        entries = decode_flow_index(payload)
        for entry in entries:
            self._flow_offsets[entry["flow_id"]] = entry["offset"]

    def _build_index_linear_scan(self) -> None:
        """Scan all records sequentially to build the flow index.

        Used when the FLOW_INDEX is missing (partial capture / crash).
        Keeps only the last FLOW record per flow_id.
        """
        assert self._file is not None
        # This pass already visits every record, so capture finding presence
        # and offsets here for free. After this method completes, the flag is
        # authoritative (False if no REC_FINDING was seen), letting
        # _build_findings_index short-circuit instead of scanning a second time.
        self._saw_finding_record = False
        self._finding_offsets = []
        for offset, rec_type, payload, stored_crc in self._iter_records(self._data_start):
            # Record finding presence/offset cheaply (payload is decoded lazily
            # on first read_findings, not here).
            if rec_type == REC_FINDING:
                self._saw_finding_record = True
                self._finding_offsets.append(offset)
                continue
            if rec_type == REC_FLOW:
                if verify_payload_crc(payload, stored_crc):
                    try:
                        summary = decode_flow_summary(payload)
                        self._flow_offsets[summary.flow_id] = offset
                    except Exception:
                        logger.debug("Skipping corrupt FLOW at offset %d", offset)
                else:
                    logger.debug("CRC mismatch for FLOW at offset %d, skipping", offset)
            elif rec_type == REC_META and self._meta is None:
                if verify_payload_crc(payload, stored_crc):
                    try:
                        self._meta = decode_meta(payload)
                    except Exception:
                        pass

    def _build_findings_index(self) -> None:
        """Build the lazy flow_id -> [Finding] index from REC_FINDING records.

        Performance gate: the open-time scan records whether any REC_FINDING
        record exists (``self._saw_finding_record``) and, when so, their byte
        offsets (``self._finding_offsets``):

        * ``_saw_finding_record is False`` — the open-time linear scan walked
          every record and saw no findings, so we short-circuit to an empty
          index without re-reading the (potentially large) file. This is the
          common case for findings-free captures.
        * ``_finding_offsets`` populated — seek directly to each known finding
          record instead of scanning the whole file.
        * ``_saw_finding_record is None`` — undetermined (the footer-index path
          does not walk records, so presence is unknown without a scan). Fall
          back to the original full linear pass, preserving behaviour.

        The result is cached in ``self._findings_index``.
        """
        assert self._file is not None
        # Short-circuit: open-time scan proved there are no findings.
        if self._saw_finding_record is False:
            self._findings_index = {}
            return

        index: dict[str, list] = {}
        # Lazy import to avoid an import cycle (analysis imports flow.models).
        try:
            from friTap.analysis import Finding
        except Exception:
            Finding = None  # type: ignore

        def add_record(payload: bytes, stored_crc: int, offset: int) -> None:
            if not verify_payload_crc(payload, stored_crc):
                logger.debug("CRC mismatch for REC_FINDING at offset %d, skipping", offset)
                return
            try:
                flow_id, finding_dicts = decode_finding_record(payload)
            except Exception:
                logger.debug("Skipping corrupt REC_FINDING at offset %d", offset)
                return
            bucket = index.setdefault(flow_id, [])
            for fd in finding_dicts:
                bucket.append(Finding.from_dict(fd) if Finding is not None else fd)

        # Fast path: seek directly to the offsets captured at open() time.
        if self._saw_finding_record is True and self._finding_offsets:
            for offset in self._finding_offsets:
                self._file.seek(offset)
                envelope_raw = self._file.read(_RECORD_ENVELOPE.size)
                if len(envelope_raw) < _RECORD_ENVELOPE.size:
                    continue
                try:
                    rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
                except ValueError:
                    continue
                if rec_type != REC_FINDING or payload_len == 0:
                    continue
                payload = self._file.read(payload_len)
                if len(payload) < payload_len:
                    continue
                add_record(payload, stored_crc, offset)
            self._findings_index = index
            return

        # Fallback (presence undetermined, e.g. footer-index path): full scan.
        for offset, rec_type, payload, stored_crc in self._iter_records(self._data_start):
            if rec_type != REC_FINDING:
                continue
            add_record(payload, stored_crc, offset)
        self._findings_index = index

    def _iter_records(self, start_offset: int):
        """Yield ``(offset, rec_type, payload, stored_crc)`` for every record.

        Single shared sequential record iterator used by both
        ``_build_index_linear_scan`` and ``_build_findings_index`` (fallback
        path). It handles sync-marker recovery on a corrupt envelope and skips
        zero-length records exactly as the prior duplicated loops did. The CRC
        is yielded unverified so each caller can apply its own verify/decode
        policy per record type (behaviour-preserving).
        """
        assert self._file is not None
        self._file.seek(start_offset)
        while True:
            offset = self._file.tell()
            envelope_raw = self._file.read(_RECORD_ENVELOPE.size)
            if len(envelope_raw) < _RECORD_ENVELOPE.size:
                break  # EOF or truncated
            try:
                rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
            except ValueError:
                # Try to find next sync marker for recovery
                recovered = self._recover_from(offset + 1)
                if recovered < 0:
                    break
                continue
            if payload_len == 0:
                continue
            payload = self._file.read(payload_len)
            if len(payload) < payload_len:
                break  # Truncated payload at EOF
            yield offset, rec_type, payload, stored_crc

    def _recover_from(self, start_offset: int) -> int:
        """Scan forward from start_offset to find the next valid sync marker.

        Returns the offset of the next record, or -1 if not found.
        """
        assert self._file is not None
        self._file.seek(start_offset)
        # Read in 4KB chunks to find sync marker
        while True:
            chunk_start = self._file.tell()
            chunk = self._file.read(4096)
            if not chunk:
                return -1
            idx = find_sync_marker(chunk)
            if idx >= 0:
                recovered_offset = chunk_start + idx
                self._file.seek(recovered_offset)
                return recovered_offset

    # ------------------------------------------------------------------
    # Internal: record reading
    # ------------------------------------------------------------------

    def _read_record_payload_at(self, offset: int) -> Optional[bytes]:
        """Read and verify a single record's payload at the given file offset.

        Returns the raw payload bytes, or None if the record is invalid.
        """
        assert self._file is not None
        self._file.seek(offset)
        envelope_raw = self._file.read(_RECORD_ENVELOPE.size)
        if len(envelope_raw) < _RECORD_ENVELOPE.size:
            return None
        try:
            rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
        except ValueError:
            return None
        payload = self._file.read(payload_len)
        if len(payload) < payload_len:
            return None
        if not verify_payload_crc(payload, stored_crc):
            logger.warning("CRC mismatch at offset %d", offset)
            return None
        return payload

    def _try_read_meta(self) -> Optional[TapMeta]:
        """Try to read a META record at the current position."""
        assert self._file is not None
        pos = self._file.tell()
        envelope_raw = self._file.read(_RECORD_ENVELOPE.size)
        if len(envelope_raw) < _RECORD_ENVELOPE.size:
            self._file.seek(pos)
            return None
        try:
            rec_type, payload_len, stored_crc, _ = decode_record_envelope(envelope_raw)
        except ValueError:
            self._file.seek(pos)
            return None
        if rec_type != REC_META:
            self._file.seek(pos)
            return None
        payload = self._file.read(payload_len)
        if len(payload) < payload_len:
            self._file.seek(pos)
            return None
        if not verify_payload_crc(payload, stored_crc):
            self._file.seek(pos)
            return None
        return decode_meta(payload)

    def _ensure_open(self) -> None:
        if not self._opened or self._file is None:
            raise RuntimeError("TapReader is not open. Call open() first.")
def open(self) -> TapMeta:
    """Open the .tap file, parse the header, and build the flow index.

    Returns the file-level TapMeta.
    """
    self._file = open(self._path, "rb")
    self._opened = True
    try:
        # Read header
        header_raw = self._file.read(_HEADER_STRUCT.size + 4096)  # read extra for ext
        self._header, self._data_start = decode_header(header_raw)

        # Try to read META record (should be the first record)
        self._file.seek(self._data_start)
        self._meta = self._try_read_meta()

        # Build flow index
        if self._header.flags & FLAG_HAS_INDEX:
            self._build_index_from_footer()
        else:
            logger.info("No index in .tap file, performing linear scan")
            self._build_index_linear_scan()
    except Exception:
        self.close()
        raise

    logger.info(
        "TapReader opened: %s (%d flows)",
        self._path,
        len(self._flow_offsets),
    )
    return self._meta or TapMeta()
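When open() falls back to a linear scan, a corrupt envelope triggers a forward search for the next sync marker in 4 KB chunks. The following is a minimal, self-contained sketch of that idea, not friTap's implementation: the marker bytes (SYNC) and the helper name find_next_marker are hypothetical, and the overlap between chunks is an added assumption so that a marker straddling a chunk boundary is still found. Whether friTap's find_sync_marker covers that case is not shown on this page.

```python
import io

# Hypothetical 4-byte sync marker; the real .tap marker is not shown here.
SYNC = b"\xf7\x1a\x9e\x55"
CHUNK = 4096


def find_next_marker(fh, start_offset: int) -> int:
    """Return the absolute offset of the next SYNC at or after start_offset, or -1."""
    fh.seek(start_offset)
    tail = b""          # last len(SYNC) - 1 bytes carried over from the previous chunk
    pos = start_offset  # absolute offset of the next unread byte
    while True:
        chunk = fh.read(CHUNK)
        if not chunk:
            return -1
        window = tail + chunk
        idx = window.find(SYNC)
        if idx >= 0:
            found = pos - len(tail) + idx
            fh.seek(found)  # leave the stream positioned at the marker
            return found
        pos += len(chunk)
        tail = window[-(len(SYNC) - 1):]


# A marker deliberately placed across the 4096-byte chunk boundary.
data = b"\x00" * 4094 + SYNC + b"payload"
stream = io.BytesIO(data)
offset = find_next_marker(stream, 0)
```

In this sketch the stream is left positioned at the marker, mirroring how _recover_from seeks to the recovered offset before returning it.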
def read_flow_summaries(self) -> list[FlowSummary]:
    """Read lightweight flow metadata for all flows (no chunks/bodies).

    Returns summaries sorted by start timestamp.
    """
    self._ensure_open()
    summaries = []
    for flow_id, offset in self._flow_offsets.items():
        try:
            payload = self._read_record_payload_at(offset)
            if payload is not None:
                summary = decode_flow_summary(payload, file_offset=offset)
                summaries.append(summary)
        except Exception:
            logger.debug("Failed to read flow summary at offset %d", offset, exc_info=True)

    # Enrich finding_count from the findings index: findings are stored in
    # separate REC_FINDING records, not in FLOW meta, so decode_flow_summary
    # cannot know them and leaves the count at 0. has_findings() is a cheap
    # no-op short-circuit for findings-free captures (the common case); only
    # captures that actually carry findings pay for the (cached) index build.
    if self.has_findings():
        for summary in summaries:
            summary.finding_count = len(self.read_findings(summary.flow_id))

    summaries.sort(key=lambda s: s.started)
    return summaries
def read_flow(self, flow_id: str) -> Optional["Flow"]:
    """Read a full Flow object by flow_id (on-demand, with chunks/bodies).

    Returns None if the flow_id is not in the index.
    """
    self._ensure_open()
    offset = self._flow_offsets.get(flow_id)
    if offset is None:
        return None
    try:
        payload = self._read_record_payload_at(offset)
        if payload is None:
            return None
        flow = decode_flow(payload)
    except Exception:
        logger.error("Failed to read flow %s at offset %d", flow_id, offset, exc_info=True)
        return None

    # Findings are best-effort enrichment: a corrupt/undecodable findings
    # index must NEVER suppress an already-decoded valid flow. Isolate it.
    try:
        findings = self.read_findings(flow_id)
        if findings:
            flow.findings = list(findings)
    except Exception:
        logger.warning(
            "Failed to read findings for flow %s; returning flow without findings",
            flow_id,
            exc_info=True,
        )
    return flow
def read_all_flows(self) -> list["Flow"]:
    """Read all flows fully (with chunks and bodies).

    For large captures, prefer read_flow_summaries() + read_flow() on demand.
    """
    self._ensure_open()
    flows = []
    for flow_id, offset in self._flow_offsets.items():
        try:
            payload = self._read_record_payload_at(offset)
            if payload is None:
                continue
            flow = decode_flow(payload)
        except Exception:
            logger.debug("Failed to read flow at offset %d", offset, exc_info=True)
            continue

        # Findings are best-effort enrichment: a corrupt/undecodable
        # findings index must NEVER drop an already-decoded valid flow.
        try:
            findings = self.read_findings(flow.flow_id)
            if findings:
                flow.findings = list(findings)
        except Exception:
            logger.warning(
                "Failed to read findings for flow %s; flow kept without findings",
                flow.flow_id,
                exc_info=True,
            )
        flows.append(flow)

    flows.sort(key=lambda f: f.started)
    return flows
def close(self) -> None:
    """Close the file. Safe to call multiple times."""
    if self._file is not None:
        try:
            self._file.close()
        except Exception:
            pass
        self._file = None
    self._opened = False
    self._flow_offsets.clear()
    self._findings_index = None
    self._saw_finding_record = None
    self._finding_offsets = []
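The reader methods above share one pattern: read a fixed-size envelope, read the payload it announces, and let the caller decide what to do when the stored CRC does not match. The sketch below illustrates that pattern on an in-memory buffer. The envelope layout (ENVELOPE), the type code REC_FLOW, and the helpers pack_record and iter_records are all hypothetical; the real .tap envelope, which also carries a fourth field, is not specified on this page.

```python
import struct
import zlib

# Hypothetical envelope: record type (1 byte), payload length (4 bytes),
# CRC32 of the payload (4 bytes). The real .tap layout may differ.
ENVELOPE = struct.Struct("<BII")
REC_FLOW = 1  # hypothetical type code


def pack_record(rec_type: int, payload: bytes) -> bytes:
    """Serialize one record as envelope + payload."""
    return ENVELOPE.pack(rec_type, len(payload), zlib.crc32(payload)) + payload


def iter_records(buf: bytes):
    """Yield (offset, rec_type, payload, stored_crc) without verifying the CRC."""
    offset = 0
    while offset + ENVELOPE.size <= len(buf):
        rec_type, length, stored_crc = ENVELOPE.unpack_from(buf, offset)
        start = offset + ENVELOPE.size
        payload = buf[start:start + length]
        if len(payload) < length:
            break  # truncated payload at end of buffer
        yield offset, rec_type, payload, stored_crc
        offset = start + length


good = pack_record(REC_FLOW, b"flow-1")
bad = bytearray(pack_record(REC_FLOW, b"flow-2"))
bad[-1] ^= 0xFF  # corrupt one payload byte so the CRC no longer matches
blob = good + bytes(bad)

# Caller-side policy: keep only records whose payload matches the stored CRC.
valid = [
    payload
    for _, _, payload, crc in iter_records(blob)
    if zlib.crc32(payload) == crc
]
```

Yielding the CRC unverified, as _iter_records does, keeps the iterator generic: one caller can skip a corrupt record silently while another logs a warning.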
ReplayController is the higher-level facade used by the TUI replay view. It adds an LRU cache (128 flows), a context-manager interface, and convenience properties. It implements IFlowSource (get_flows() / get_flow(id)).
class ReplayController:
    """Loads and serves flows from a .tap file for TUI replay mode.

    Satisfies the IFlowSource protocol so MainScreen can use it
    interchangeably with FlowCollector via CaptureController.
    """

    def __init__(self, path: str) -> None:
        self._path = path
        self._reader: Optional[TapReader] = None
        self._meta: Optional[TapMeta] = None
        self._summaries: list[FlowSummary] = []
        self._flow_cache: _LRUCache = _LRUCache(maxsize=128)
        self._reparse_results: dict[str, tuple[Optional["ParseResult"], Optional["ParseResult"]]] = {}

    @property
    def replay_file(self) -> str:
        return self._path

    @property
    def flow_count(self) -> int:
        return len(self._summaries)

    @property
    def meta(self) -> Optional[TapMeta]:
        return self._meta

    @property
    def header(self):
        return self._reader.header if self._reader else None

    def load(self) -> TapMeta:
        """Open the .tap file and load flow summaries.

        Returns the file-level TapMeta.
        """
        self._reader = TapReader(self._path)
        self._meta = self._reader.open()
        self._summaries = self._reader.read_flow_summaries()
        logger.info("Replay loaded: %s (%d flows)", self._path, len(self._summaries))
        return self._meta

    def get_summaries(self) -> list[FlowSummary]:
        """Return all flow summaries (lightweight, no chunk/body data)."""
        return list(self._summaries)

    def get_flows(self) -> list["Flow"]:
        """Return all flows as full Flow objects.

        Note: For large captures this loads everything into memory.
        Prefer get_summaries() + get_flow() for on-demand loading.
        """
        if self._reader is None:
            return []
        return self._reader.read_all_flows()

    def get_flow(self, flow_id: str) -> Optional["Flow"]:
        """Load a full Flow by ID (on-demand from disk, LRU-cached)."""
        flow = self._flow_cache.get(flow_id)
        if flow is None:
            if self._reader is None:
                return None
            flow = self._reader.read_flow(flow_id)
            if flow is None:
                return None
            self._flow_cache[flow_id] = flow
        # Re-apply reparse results that survive cache eviction
        if flow_id in self._reparse_results:
            req, resp = self._reparse_results[flow_id]
            if req is not None:
                flow.request = req
            if resp is not None:
                flow.response = resp
        return flow

    def store_reparse(
        self,
        flow_id: str,
        request: Optional["ParseResult"],
        response: Optional["ParseResult"],
    ) -> None:
        """Store reparse results so they survive cache eviction."""
        self._reparse_results[flow_id] = (request, response)

    def read_all_findings(self) -> list:
        """Return persisted findings (as dicts) across all flows in the .tap.

        Passthrough over :class:`~friTap.flow.tap_reader.TapReader` so callers
        (e.g. the TUI findings viewer) don't reach into the private reader.
        Iterates the loaded summaries and collects each flow's REC_FINDING
        records. Returns an empty list if the file carries none.
        """
        if self._reader is None:
            return []
        findings: list = []
        for summary in self._summaries:
            findings.extend(self._reader.read_findings(summary.flow_id))
        return findings

    def has_findings(self) -> bool:
        """True if the .tap file contains any persisted findings."""
        if self._reader is None:
            return False
        return self._reader.has_findings()

    def close(self) -> None:
        """Close the reader and release resources."""
        if self._reader is not None:
            self._reader.close()
            self._reader = None
        self._flow_cache.clear()
        self._reparse_results.clear()
        self._summaries.clear()

    def __enter__(self) -> "ReplayController":
        self.load()
        return self

    def __exit__(self, *args) -> None:
        self.close()
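ReplayController keeps recently loaded flows in an LRU cache so that revisiting a flow does not go back to disk. The private _LRUCache class is not shown on this page; the sketch below is an assumed, minimal equivalent built on collections.OrderedDict under the hypothetical name TinyLRU, and it only illustrates the eviction behaviour that get_flow() relies on.

```python
from collections import OrderedDict
from typing import Any, Optional


class TinyLRU:
    """Hypothetical stand-in for a fixed-capacity LRU cache."""

    def __init__(self, maxsize: int = 128) -> None:
        self._maxsize = maxsize
        self._items: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self._maxsize:
            self._items.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._items)


cache = TinyLRU(maxsize=2)
cache["a"] = "flow-a"
cache["b"] = "flow-b"
cache.get("a")         # "a" becomes the most recently used entry
cache["c"] = "flow-c"  # capacity exceeded: "b" is evicted
```

Because an evicted flow is decoded again from disk on its next access, ReplayController stores reparse results outside the cache (store_reparse) and re-applies them in get_flow().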
friTap can run its analyzers (credentials, ioc, privacy, protobuf, plus custom ones) over a captured .tap file — passive analysis, no network activity. Use analyze_tap_report() for programmatic access; it performs no I/O beyond reading the .tap and never calls sys.exit.
Run analyzers over tap_file and return findings + a rendered report.
Pure orchestration shared by the CLI and external tools: resolve analyzers (scanners is a comma-separated name list, or None/"all" for the built-ins) → run them → filter (severity/confidence/source/category) → render in report_format (one of the formats returned by list_report_formats()). It writes nothing to stdout, writes no sidecar file, and never calls sys.exit; callers decide how to surface the result.
The filter arguments are all additive and default to no-ops, so existing callers are unaffected:
min_confidence — keep findings with confidence at or above this value.
source — comma-separated analyzer source names to keep (which findings show; distinct from scanners, which selects analyzers that run).
category — comma-separated finding categories to keep (secret/pii/network/protocol).
show_pii — reveal PII/secret values instead of redacting them (forwarded to analyzers as reveal_pii; default redacts).
Raises ValueError for an unknown report_format or an unresolvable analyzer spec, ImportError for a bad analyzer_path; any .tap read/analyze failure propagates.
def analyze_tap_report(
    tap_file: str,
    *,
    scanners: str | None = None,
    analyzer_path: str | None = None,
    min_severity: str = "info",
    report_format: str = "table",
    include_private_ips: bool = False,
    protobuf_schema: str | None = None,
    min_confidence: float = 0.0,
    source: str | None = None,
    category: str | None = None,
    show_pii: bool = False,
) -> AnalyzeReport:
    """Run analyzers over *tap_file* and return findings + a rendered report.

    Pure orchestration shared by the CLI and external tools: resolve analyzers
    (``scanners`` is a comma-separated name list, or ``None``/``"all"`` for the
    built-ins) → run them → filter (severity/confidence/source/category) →
    render in *report_format* (one of :func:`list_report_formats`). Performs no
    stdout, no sidecar write and never calls ``sys.exit`` — callers decide how
    to surface the result.

    The filter arguments are all additive and default to no-ops, so existing
    callers are unaffected:

    * ``min_confidence`` — keep findings with confidence at or above this value.
    * ``source`` — comma-separated analyzer source names to keep (which
      findings *show*; distinct from ``scanners``, which selects analyzers
      that *run*).
    * ``category`` — comma-separated finding categories to keep
      (``secret``/``pii``/``network``/``protocol``).
    * ``show_pii`` — reveal PII/secret values instead of redacting them
      (forwarded to analyzers as ``reveal_pii``; default redacts).

    Raises ``ValueError`` for an unknown *report_format* or an unresolvable
    analyzer spec, ``ImportError`` for a bad ``analyzer_path``; any .tap
    read/analyze failure propagates.
    """
    from friTap.analysis.filtering import FindingFilter, apply, split_csv

    if report_format not in _REPORTER_REGISTRY:
        raise ValueError(
            f"unknown report format {report_format!r}; "
            f"choose from {', '.join(sorted(_REPORTER_REGISTRY))}"
        )

    analyzers = resolve_analyzers(
        scanners,
        analyzer_path=analyzer_path,
        include_private_ips=include_private_ips,
        protobuf_schema=protobuf_schema,
        reveal_pii=show_pii,
    )
    findings = analyze_tap_multi(analyzers, tap_file)

    normalized_severity = min_severity if min_severity in _SEVERITY_ORDER else "info"
    finding_filter = FindingFilter(
        min_severity=normalized_severity,
        sources=split_csv(source),
        categories=split_csv(category),
        min_confidence=min_confidence if min_confidence > 0.0 else None,
    )
    filtered = apply(findings, finding_filter)

    analyzer_names = [a.name for a in analyzers]
    meta = {"tap_file": tap_file, "analyzers": analyzer_names}
    # AnalyzeReport.findings keep full fidelity; only the rendered report is
    # redacted (unless show_pii). The reporter is the redaction enforcement point.
    rendered = _REPORTER_REGISTRY[report_format](redact_pii=not show_pii).report(filtered, meta)
    return AnalyzeReport(
        findings=filtered,
        rendered=rendered,
        report_format=report_format,
        analyzer_names=analyzer_names,
        meta=meta,
    )
Carries both the structured findings (already filtered by severity and the other report-side filters) and the report rendered in the requested format, so web / TUI / CLI callers can use whichever they need without re-running the analyzers. Returned by analyze_tap_report().
@dataclass(frozen=True)
class AnalyzeReport:
    """Result of a programmatic ``.tap`` analysis.

    Carries both the structured (already severity-filtered) findings and the
    report rendered in the requested format, so web / TUI / CLI callers can
    use whichever they need without re-running the analyzers. Returned by
    :func:`analyze_tap_report`.
    """

    findings: list["Finding"]
    rendered: str
    report_format: str
    analyzer_names: list[str] = field(default_factory=list)
    meta: dict[str, Any] = field(default_factory=dict)
    gate_severity: str = _GATE_SEVERITY

    @property
    def gate_tripped(self) -> bool:
        """True iff any finding is at or above :attr:`gate_severity`.

        Mirrors the condition under which :func:`run_analyze_cli` returns the
        non-zero CI-gate exit code.
        """
        return bool(_filter_min_severity(self.findings, self.gate_severity))

    @property
    def exit_code(self) -> int:
        """CLI-parity exit code: 2 when :attr:`gate_tripped`, else 0."""
        return 2 if self.gate_tripped else 0
CLI-parity exit code: 2 when gate_tripped is True, else 0.
from friTap import analyze_tap_report

report = analyze_tap_report(
    "capture_20260507_153933.tap",  # offline / CI-runnable
    scanners="credentials,ioc",     # None or "all" → built-ins (which analyzers RUN)
    min_severity="info",
    report_format="table",
    # report-side filters (additive, keyword-only; all default to no-op):
    min_confidence=0.0,             # drop findings below this confidence (0.0–1.0)
    source=None,                    # comma-separated source names to SHOW (e.g. "credentials,privacy")
    category=None,                  # "secret,pii,network,protocol" to SHOW
    show_pii=False,                 # reveal PII/secret values instead of redacting
)

for finding in report.findings:
    print(finding.severity.name, finding.category, finding.title, finding.flow_id)

print(report.rendered)              # the rendered table/json/csv/md
raise SystemExit(report.exit_code)  # 2 if any finding ≥ medium, else 0 (CI gate)
min_confidence, source, category, and show_pii are additive, keyword-only parameters — older calls that omit them keep working unchanged. scanners chooses which analyzers run; source/category filter which of the resulting findings are shown.
Each Finding has: severity (a Severity enum), title, description, source, flow_id, confidence (default 1.0), timestamp, and the dicts evidence and metadata. Severity is CRITICAL/HIGH/MEDIUM/LOW/INFO (rank 0 is most severe).
Every finding now carries a category in metadata["category"], surfaced as the Finding.category property: secret (credentials), pii (privacy), network (IOC), or protocol (protobuf). PII findings additionally carry metadata["compliance"] (e.g. GDPR, CCPA, PCI-DSS, HIPAA).
FindingFilter (exported from the top-level friTap package) is the reusable filter behind the CLI's --source/--category/--min-confidence flags. Apply it to any iterable of Finding objects:
from friTap import FindingFilter

flt = FindingFilter(
    min_severity="info",  # floor: this severity and above
    min_confidence=0.8,
    sources={"credentials", "privacy"},
    categories={"secret", "pii"},
)
kept = [f for f in report.findings if flt.matches(f)]
Use severities for an exact-bucket match instead of the min_severity floor: FindingFilter(severities={"high"}) keeps only high findings. The TUI's per-severity dashboard chips use this, so a chip's count matches its result.
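The difference between a severity floor and an exact-bucket match can be illustrated without friTap installed. The sketch below uses hypothetical stand-ins (Sev, FakeFinding, keep) rather than the real Severity, Finding, and FindingFilter classes; only the severity ordering, with rank 0 as the most severe, is taken from this page.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import Iterable, Optional, Set


class Sev(IntEnum):
    """Stand-in severity scale; rank 0 is the most severe."""
    CRITICAL = 0
    HIGH = 1
    MEDIUM = 2
    LOW = 3
    INFO = 4


@dataclass
class FakeFinding:
    title: str
    severity: Sev


def keep(
    findings: Iterable[FakeFinding],
    *,
    min_severity: Optional[Sev] = None,
    severities: Optional[Set[Sev]] = None,
) -> list:
    """Apply a floor (min_severity) and/or an exact-bucket match (severities)."""
    out = []
    for f in findings:
        if min_severity is not None and f.severity > min_severity:
            continue  # less severe than the floor
        if severities is not None and f.severity not in severities:
            continue  # not in one of the requested buckets
        out.append(f)
    return out


findings = [
    FakeFinding("leaked token", Sev.CRITICAL),
    FakeFinding("basic auth", Sev.HIGH),
    FakeFinding("tracker host", Sev.LOW),
]
floor = keep(findings, min_severity=Sev.HIGH)  # HIGH and anything more severe
exact = keep(findings, severities={Sev.HIGH})  # HIGH only
```

With the floor, a "high" chip would also count critical findings; the exact-bucket form avoids that mismatch.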
The built-in analyzer classes are available under friTap.analysis.*; for example, the privacy analyzer is imported with from friTap.analysis.privacy import PrivacyAnalyzer (it is not re-exported from the top-level friTap package). Select it by name via scanners="privacy" rather than importing it directly.
AnalyzeReport.gate_tripped is True when any finding is at or above the gate severity (medium), and exit_code returns 2 in that case (else 0) — mirroring the fritap analyze CLI exit code. Use it to fail a pipeline when a capture contains sensitive findings.
list_analyzers() → analyzer names (built-in plus any discovered).
list_analyzers_detailed() → richer AnalyzerInfo records (name, source/origin, description) for every analyzer, including discovered ones. Use this to render an analyzer catalog in your own tooling.
list_report_formats() → available report formats (json, csv, md, table).
Custom analyzers can be loaded via the CLI --analyzer-path module:Class (the module:Class form skips the is_fritap_analyzer marker requirement), or made ambiently discoverable through the drop-in directory, the fritap.analyzers entry-points group, or a plugin's register_analyzers() hook — see Traffic Analysis → Ship it everywhere.
Discovery executes code
Discovered analyzers (drop-in directory, fritap.analyzers entry points, plugin bridge) execute as soon as list_analyzers()/list_analyzers_detailed() or any resolution runs. Set FRITAP_DISABLE_ANALYZER_DISCOVERY=1 to disable ambient discovery; explicit --analyzer-path / .analyzer_path() references still work.
The FriTap builder exposes live passive analysis directly. scan() enables it, analyzer_path() is repeatable, and scan_report(), scan_min_severity(), and scan_show_pii() set the corresponding --scan-* options:
from friTap import FriTap

session = (
    FriTap("com.example.app")
    .mobile()
    .scan("all")  # or "credentials,ioc"
    .analyzer_path("my_analyzer:InternalHostAnalyzer")
    .scan_report("table")
    .scan_min_severity("medium")
    .start()
)
def decode_raw(data: bytes, max_depth: int = 16) -> ProtobufMessage:
    """Decode raw protobuf wire format without a schema.

    Recursively attempts sub-message parsing for ``LENGTH_DELIMITED`` fields.

    Args:
        data: Raw protobuf bytes.
        max_depth: Maximum nesting depth (default 16).

    Returns:
        A :class:`ProtobufMessage` tree.

    Raises:
        ValueError: If the data is malformed.
    """
    if not data:
        return ProtobufMessage()
    return _decode_raw_impl(data, max_depth)
decode_raw(data, max_depth=16) returns a ProtobufMessage (a tree of ProtobufField objects); format_message(msg, indent=0) renders it as human-readable text. ProtobufProcessor provides higher-level, schema-aware processing.
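Schema-less decoding works because every protobuf field starts with a varint key that encodes the field number and the wire type. The sketch below is a minimal, standalone illustration of that wire format and is not friTap's _decode_raw_impl: the helper names read_varint and decode_fields are hypothetical, it returns plain tuples instead of ProtobufMessage/ProtobufField objects, and it does not recurse into sub-messages.

```python
from __future__ import annotations


def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode one base-128 varint; return (value, next_position)."""
    value = 0
    shift = 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            return value, pos
        shift += 7


def decode_fields(data: bytes) -> list[tuple[int, int, object]]:
    """Return (field_number, wire_type, value) for each top-level field."""
    fields: list[tuple[int, int, object]] = []
    pos = 0
    while pos < len(data):
        key, pos = read_varint(data, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:    # VARINT
            value, pos = read_varint(data, pos)
        elif wire_type == 2:  # LENGTH_DELIMITED: bytes, string, or sub-message
            length, pos = read_varint(data, pos)
            if pos + length > len(data):
                raise ValueError("truncated length-delimited field")
            value = data[pos:pos + length]
            pos += length
        elif wire_type in (1, 5):  # fixed 64-bit / fixed 32-bit
            width = 8 if wire_type == 1 else 4
            if pos + width > len(data):
                raise ValueError("truncated fixed-width field")
            value = int.from_bytes(data[pos:pos + width], "little")
            pos += width
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        fields.append((field_number, wire_type, value))
    return fields


# Field 1 = varint 150, field 2 = the string "testing".
sample = bytes.fromhex("089601") + bytes.fromhex("120774657374696e67")
decoded = decode_fields(sample)
```

A length-delimited value is ambiguous without a schema (bytes, string, or nested message), which is why decode_raw attempts sub-message parsing recursively and bounds the recursion with max_depth.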