467 lines
19 KiB
Python
467 lines
19 KiB
Python
import asyncio
|
|
import os
|
|
import threading
|
|
import subprocess
|
|
import concurrent.futures
|
|
import sys
|
|
import logging
|
|
import warnings
|
|
|
|
from pyshark import ek_field_mapping
|
|
from pyshark.packet.packet import Packet
|
|
from pyshark.tshark.output_parser import tshark_ek
|
|
from pyshark.tshark.output_parser import tshark_json
|
|
from pyshark.tshark.output_parser import tshark_xml
|
|
from pyshark.tshark.tshark import get_process_path, get_tshark_display_filter_flag, \
|
|
tshark_supports_json, TSharkVersionException, get_tshark_version, tshark_supports_duplicate_keys
|
|
|
|
|
|
# asyncio.wait_for() signals expiry with a different TimeoutError class depending on the
# interpreter: before Python 3.8 it is the concurrent.futures one, afterwards asyncio's own.
# Pick the matching class once so the rest of the module can catch a single name.
asyncTimeoutError = (
    concurrent.futures.TimeoutError
    if sys.version_info < (3, 8)
    else asyncio.exceptions.TimeoutError
)
|
|
|
|
|
|
class TSharkCrashException(Exception):
    """Raised when a tshark subprocess reports a return code that indicates a crash."""
|
|
|
|
|
|
class UnknownEncyptionStandardException(Exception):
    """Raised when the requested encryption type is not one of the supported standards."""
|
|
|
|
|
|
class RawMustUseJsonException(Exception):
    """Raised when include_raw is requested without enabling use_json or use_ek."""
|
|
|
|
|
|
class StopCapture(Exception):
    """Control-flow exception: raise it from any packet-handling code to end the capture."""
|
|
|
|
|
|
class Capture:
    """Base class for packet captures.

    Holds the capture configuration, builds the tshark command line from it, spawns tshark through
    asyncio and parses its output into packets. Packets kept by load_packets() are stored in an
    internal list that backs indexing, len() and next().
    """

    SUMMARIES_BATCH_SIZE = 64
    DEFAULT_LOG_LEVEL = logging.CRITICAL
    SUPPORTED_ENCRYPTION_STANDARDS = ["wep", "wpa-pwk", "wpa-pwd", "wpa-psk"]

    def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
                 decryption_key=None, encryption_type="wpa-pwd", output_file=None,
                 decode_as=None, disable_protocol=None, tshark_path=None,
                 override_prefs=None, capture_filter=None, use_json=False, include_raw=False,
                 use_ek=False, custom_parameters=None, debug=False):
        """Stores the capture configuration and prepares the event loop.

        :param display_filter: Display filter passed to tshark (flag depends on the tshark version).
        :param only_summaries: Request psml output instead of pdml when not using JSON/EK.
        :param eventloop: Event loop to drive tshark with; one is set up when None.
        :param decryption_key: Key used for 802.11 decryption (only used together with encryption_type).
        :param encryption_type: One of SUPPORTED_ENCRYPTION_STANDARDS (case-insensitive).
        :param output_file: File tshark should write the capture to ("-w").
        :param decode_as: Mapping of criterion -> protocol, passed as "-d criterion,protocol".
        :param disable_protocol: Protocol name passed to "--disable-protocol".
        :param tshark_path: Custom path of the tshark executable.
        :param override_prefs: Mapping of preference name -> value, passed as "-o name:value".
        :param capture_filter: Capture filter passed to tshark ("-f").
        :param use_json: Use tshark's JSON output.
        :param include_raw: Add "-x" to the tshark parameters; requires use_json or use_ek.
        :param use_ek: Use tshark's EK output.
        :param custom_parameters: Extra tshark parameters, as a list or as a dict of flag -> value.
        :param debug: Enable debug logging to stdout.
        :raises RawMustUseJsonException: include_raw was given without use_json/use_ek.
        :raises UnknownEncyptionStandardException: encryption_type is not supported.
        """
        self.loaded = False
        self.tshark_path = tshark_path
        self._override_prefs = override_prefs
        self.debug = debug
        self.use_json = use_json
        self._use_ek = use_ek
        self.include_raw = include_raw
        self._packets = []
        self._current_packet = 0
        self._display_filter = display_filter
        self._capture_filter = capture_filter
        self._only_summaries = only_summaries
        self._output_file = output_file
        self._running_processes = set()
        self._decode_as = decode_as
        self._disable_protocol = disable_protocol
        self._log = logging.Logger(
            self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
        # Handler installed by set_debug(); tracked so it is added once and can be removed again.
        self._debug_handler = None
        self._closed = False
        self._custom_parameters = custom_parameters
        self._eof_reached = False
        self._last_error_line = None
        self._stderr_handling_tasks = []
        self.__tshark_version = None

        if include_raw and not (use_json or use_ek):
            raise RawMustUseJsonException(
                "use_json/use_ek must be True if include_raw")

        # Validate the encryption settings before touching global asyncio state, so a rejected
        # constructor call does not replace the current event loop / child watcher as a side effect.
        if encryption_type and encryption_type.lower() in self.SUPPORTED_ENCRYPTION_STANDARDS:
            self.encryption = (decryption_key, encryption_type.lower())
        else:
            standards = ", ".join(self.SUPPORTED_ENCRYPTION_STANDARDS)
            raise UnknownEncyptionStandardException(f"Only the following standards are supported: {standards}.")

        if self.debug:
            self.set_debug()

        self.eventloop = eventloop
        if self.eventloop is None:
            self._setup_eventloop()

    def __getitem__(self, item):
        """Gets the packet in the given index.

        :param item: packet index
        :return: Packet object.
        """
        return self._packets[item]

    def __len__(self):
        """Returns the number of packets currently stored in the capture."""
        return len(self._packets)

    def next(self) -> "Packet":
        """Returns the next stored packet; see next_packet()."""
        return self.next_packet()

    # Allows for child classes to call next() from super() without 2to3 "fixing"
    # the call
    def next_packet(self) -> "Packet":
        """Returns the stored packet at the iteration cursor and advances the cursor.

        :raises StopIteration: when the cursor is past the last stored packet.
        """
        if self._current_packet >= len(self._packets):
            raise StopIteration()
        cur_packet = self._packets[self._current_packet]
        self._current_packet += 1
        return cur_packet

    def clear(self):
        """Empties the capture of any saved packets."""
        self._packets = []
        self._current_packet = 0

    def reset(self):
        """Starts iterating packets from the first one."""
        self._current_packet = 0

    def load_packets(self, packet_count=0, timeout=None):
        """Reads the packets from the source (cap, interface, etc.) and adds it to the internal list.

        If 0 as the packet_count is given, reads forever

        :param packet_count: The amount of packets to add to the packet list (0 to read forever)
        :param timeout: If given, automatically stops after a given amount of time.
        """
        initial_packet_amount = len(self._packets)

        def keep_packet(pkt):
            self._packets.append(pkt)

            if packet_count != 0 and len(self._packets) - initial_packet_amount >= packet_count:
                raise StopCapture()

        try:
            self.apply_on_packets(
                keep_packet, timeout=timeout, packet_count=packet_count)
            self.loaded = True
        except asyncTimeoutError:
            # A timeout just ends the load; packets read so far are kept, "loaded" stays unset.
            pass

    def set_debug(self, set_to=True, log_level=logging.DEBUG):
        """Sets the capture to debug mode (or turns it off if specified).

        Enabling installs a single stdout handler (repeated calls do not stack handlers) and
        applies log_level. Disabling removes that handler and restores DEFAULT_LOG_LEVEL.

        :param set_to: True to enable debug logging, False to disable it.
        :param log_level: Level applied to the capture's logger when enabling.
        """
        handler = getattr(self, "_debug_handler", None)
        if set_to:
            if handler is None:
                handler = logging.StreamHandler(sys.stdout)
                handler.setFormatter(logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
                self._log.addHandler(handler)
                self._debug_handler = handler
            self._set_log_level(log_level)
        else:
            if handler is not None:
                self._log.removeHandler(handler)
                self._debug_handler = None
            self._set_log_level(self.DEFAULT_LOG_LEVEL)
        self.debug = set_to

    def _set_log_level(self, level):
        """Applies a level to the capture's logger and drops its cached isEnabledFor() answers."""
        self._log.setLevel(level)
        # The logger is instantiated directly instead of through logging.getLogger(), so it is not
        # registered with the logging manager and its per-level cache is not invalidated for us.
        cache = getattr(self._log, "_cache", None)
        if cache is not None:
            cache.clear()

    def _verify_capture_parameters(self):
        """Optionally verify that the capture's parameters are valid.

        Should raise an exception if they are not valid.
        """
        pass

    def _setup_eventloop(self):
        """Sets up a new eventloop as the current one according to the OS."""
        policy = asyncio.get_event_loop_policy()
        if os.name == "nt":
            try:
                current_eventloop = policy.get_event_loop()
            except RuntimeError:
                # No current loop in this thread; a Proactor loop is created below.
                current_eventloop = None
            if isinstance(current_eventloop, asyncio.ProactorEventLoop):
                self.eventloop = current_eventloop
            else:
                # On Python before 3.8, Proactor is not the default eventloop type, so we have to create a new one.
                # If there was an existing eventloop this can create issues, since we effectively disable it here.
                # all_tasks() must be given the loop explicitly: without an argument it requires a
                # *running* loop and raises RuntimeError otherwise.
                if current_eventloop is not None and asyncio.all_tasks(current_eventloop):
                    warnings.warn("The running eventloop has tasks but pyshark must set a new eventloop to continue. "
                                  "Existing tasks may not run.")
                self.eventloop = asyncio.ProactorEventLoop()
                asyncio.set_event_loop(self.eventloop)
        else:
            try:
                self.eventloop = policy.get_event_loop()
            except RuntimeError:
                # Raised in non-main threads, but also in the main thread once the current loop was
                # unset (e.g. after asyncio.run() returned) and on interpreters where
                # get_event_loop() no longer creates a loop implicitly. Make a new one in all cases.
                self.eventloop = asyncio.new_event_loop()
                asyncio.set_event_loop(self.eventloop)
        if os.name == "posix" and threading.current_thread() is threading.main_thread():
            # The default child watchers (ThreadedChildWatcher) attach_loop method is empty!
            # While using pyshark with ThreadedChildWatcher, asyncio could raise a ChildProcessError
            # "Unknown child process pid %d, will report returncode 255"
            # This led to a TSharkCrashException in _cleanup_subprocess.
            # Using the SafeChildWatcher fixes this issue, but it is slower.
            # SafeChildWatcher O(n) -> large numbers of processes are slow
            # ThreadedChildWatcher O(1) -> independent of process number
            # The child watcher API does not exist on every Python version, so only use it if present.
            if hasattr(asyncio, "SafeChildWatcher") and hasattr(asyncio, "set_child_watcher"):
                watcher = asyncio.SafeChildWatcher()
                asyncio.set_child_watcher(watcher)
                watcher.attach_loop(self.eventloop)

    def _packets_from_tshark_sync(self, packet_count=None, existing_process=None):
        """Returns a generator of packets.

        This is the sync version of packets_from_tshark. It waits for the completion of each coroutine and
        reimplements reading packets in a sync way, yielding each packet as it arrives.

        :param packet_count: If given, stops after this amount of packets is captured.
        :param existing_process: An already-running tshark process to read from instead of spawning one.
        """
        # NOTE: This has code duplication with the async version, think about how to solve this
        tshark_process = existing_process or self.eventloop.run_until_complete(
            self._get_tshark_process())
        parser = self._setup_tshark_output_parser()
        packets_captured = 0

        data = b""
        try:
            while True:
                try:
                    packet, data = self.eventloop.run_until_complete(
                        parser.get_packets_from_stream(tshark_process.stdout, data,
                                                       got_first_packet=packets_captured > 0))

                except EOFError:
                    self._log.debug("EOF reached (sync)")
                    self._eof_reached = True
                    break

                if packet:
                    packets_captured += 1
                    yield packet
                if packet_count and packets_captured >= packet_count:
                    break
        finally:
            # Also runs when the generator is closed early by the consumer.
            if tshark_process in self._running_processes:
                self.eventloop.run_until_complete(
                    self._cleanup_subprocess(tshark_process))

    def apply_on_packets(self, callback, timeout=None, packet_count=None):
        """Runs through all packets and calls the given callback (a function) with each one as it is read.

        If the capture is infinite (i.e. a live capture), it will run forever, otherwise it will complete after all
        packets have been read.

        Example usage:
        def print_callback(pkt):
            print(pkt)
        capture.apply_on_packets(print_callback)

        If a timeout is given, raises a Timeout error if not complete before the timeout (in seconds)
        """
        coro = self.packets_from_tshark(callback, packet_count=packet_count)
        if timeout is not None:
            coro = asyncio.wait_for(coro, timeout)
        return self.eventloop.run_until_complete(coro)

    async def packets_from_tshark(self, packet_callback, packet_count=None, close_tshark=True):
        """
        A coroutine which creates a tshark process, runs the given callback on each packet that is received from it and
        closes the process when it is done.

        Do not use interactively. Can be used in order to insert packets into your own eventloop.
        """
        tshark_process = await self._get_tshark_process(packet_count=packet_count)
        try:
            await self._go_through_packets_from_fd(tshark_process.stdout, packet_callback, packet_count=packet_count)
        except StopCapture:
            pass
        finally:
            if close_tshark:
                await self.close_async()

    async def _go_through_packets_from_fd(self, fd, packet_callback, packet_count=None):
        """A coroutine which goes through a stream and calls a given callback for each packet parsed from it.

        Stops at end of stream, when the callback raises StopCapture, or once packet_count packets
        were handled (if packet_count is given).
        """
        packets_captured = 0
        self._log.debug("Starting to go through packets")

        parser = self._setup_tshark_output_parser()
        data = b""

        while True:
            try:
                packet, data = await parser.get_packets_from_stream(fd, data,
                                                                    got_first_packet=packets_captured > 0)
            except EOFError:
                self._log.debug("EOF reached")
                self._eof_reached = True
                break

            if packet:
                packets_captured += 1
                try:
                    packet_callback(packet)
                except StopCapture:
                    self._log.debug("User-initiated capture stop in callback")
                    break

            if packet_count and packets_captured >= packet_count:
                break

    def _create_stderr_handling_task(self, stderr):
        """Schedules a task that drains the given stderr stream and remembers it for close_async()."""
        self._stderr_handling_tasks.append(asyncio.ensure_future(self._handle_process_stderr_forever(stderr)))

    async def _handle_process_stderr_forever(self, stderr):
        """Reads stderr line by line until EOF, logging each line and keeping the last one."""
        while True:
            stderr_line = await stderr.readline()
            if not stderr_line:
                break
            # Undecodable bytes must not kill this task: its exception would otherwise resurface
            # from close_async() and hide the real error.
            stderr_line = stderr_line.decode(errors="replace").strip()
            self._last_error_line = stderr_line
            self._log.debug(stderr_line)

    def _get_tshark_path(self):
        """Returns the path of the tshark executable, honouring a custom tshark_path."""
        return get_process_path(self.tshark_path)

    def _get_tshark_version(self):
        """Returns the tshark version, querying it once and caching the result."""
        if self.__tshark_version is None:
            self.__tshark_version = get_tshark_version(self.tshark_path)
        return self.__tshark_version

    async def _get_tshark_process(self, packet_count=None, stdin=None):
        """Returns a new tshark process with previously-set parameters.

        :param packet_count: Forwarded to get_parameters() (becomes "-c" when truthy).
        :param stdin: Passed as the subprocess' stdin.
        :raises TSharkVersionException: JSON/EK output was requested but tshark does not support it.
        """
        self._verify_capture_parameters()

        output_parameters = []
        if self.use_json or self._use_ek:
            if not tshark_supports_json(self._get_tshark_version()):
                raise TSharkVersionException(
                    "JSON only supported on Wireshark >= 2.2.0")

        if self.use_json:
            output_type = "json"
            if tshark_supports_duplicate_keys(self._get_tshark_version()):
                output_parameters.append("--no-duplicate-keys")
        elif self._use_ek:
            output_type = "ek"
        else:
            output_type = "psml" if self._only_summaries else "pdml"
        parameters = [self._get_tshark_path(), "-l", "-n", "-T", output_type] + \
            self.get_parameters(packet_count=packet_count) + output_parameters

        self._log.debug(
            "Creating TShark subprocess with parameters: %s", " ".join(parameters))
        self._log.debug("Executable: %s", parameters[0])
        tshark_process = await asyncio.create_subprocess_exec(*parameters,
                                                              stdout=subprocess.PIPE,
                                                              stderr=subprocess.PIPE,
                                                              stdin=stdin)
        self._create_stderr_handling_task(tshark_process.stderr)
        self._created_new_process(parameters, tshark_process)
        return tshark_process

    def _created_new_process(self, parameters, process, process_name="TShark"):
        """Registers a freshly spawned process, failing if it already exited with an error.

        :raises TSharkCrashException: the process already has a non-zero return code.
        """
        self._log.debug(
            "%s subprocess (pid %s) created", process_name, process.pid)
        if process.returncode is not None and process.returncode != 0:
            raise TSharkCrashException(
                f"{process_name} seems to have crashed. Try updating it. (command ran: '{' '.join(parameters)}')")
        self._running_processes.add(process)

    async def _cleanup_subprocess(self, process):
        """Kill the given process and properly closes any pipes connected to it.

        :raises TSharkCrashException: the process had already exited with a positive return code
            other than 1, or with 1 after the end of its output was reached.
        """
        self._log.debug("Cleanup Subprocess (pid %s)", process.pid)
        if process.returncode is None:
            try:
                process.kill()
                return await asyncio.wait_for(process.wait(), 1)
            except asyncTimeoutError:
                self._log.debug(
                    "Waiting for process to close failed, may have zombie process.")
            except ProcessLookupError:
                # Already gone, nothing left to kill.
                pass
            except OSError:
                if os.name != "nt":
                    raise
        elif process.returncode > 0:
            if process.returncode != 1 or self._eof_reached:
                raise TSharkCrashException(f"TShark (pid {process.pid}) seems to have crashed (retcode: {process.returncode}).\n"
                                           f"Last error line: {self._last_error_line}\n"
                                           "Try rerunning in debug mode [ capture_obj.set_debug() ] or try updating tshark.")

    def _setup_tshark_output_parser(self):
        """Returns the output parser matching the configured tshark output type (JSON, EK or XML)."""
        if self.use_json:
            return tshark_json.TsharkJsonParser(self._get_tshark_version())
        if self._use_ek:
            ek_field_mapping.MAPPING.load_mapping(str(self._get_tshark_version()),
                                                  tshark_path=self.tshark_path)
            return tshark_ek.TsharkEkJsonParser()
        return tshark_xml.TsharkXmlParser(parse_summaries=self._only_summaries)

    def close(self):
        """Synchronously cleans up all running processes by running close_async() on the event loop."""
        self.eventloop.run_until_complete(self.close_async())

    async def close_async(self):
        """Cleans up every running subprocess and waits for their stderr readers to finish."""
        for process in self._running_processes.copy():
            await self._cleanup_subprocess(process)
        self._running_processes.clear()

        # Wait for all stderr handling to finish. The list is reset first so finished tasks are
        # not kept (and awaited again) on every later close.
        stderr_tasks, self._stderr_handling_tasks = self._stderr_handling_tasks, []
        await asyncio.gather(*stderr_tasks)

    def __del__(self):
        # getattr: the attribute is missing when construction failed before it was assigned.
        if getattr(self, "_running_processes", None):
            self.close()

    def __enter__(self):
        return self

    async def __aenter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close_async()

    def get_parameters(self, packet_count=None):
        """Returns the special tshark parameters to be used according to the configuration of this class.

        :param packet_count: If truthy, limits tshark to that many packets ("-c").
        :raises TypeError: custom_parameters is neither a list nor a dict.
        """
        params = []
        if self._capture_filter:
            params += ["-f", self._capture_filter]
        if self._display_filter:
            params += [get_tshark_display_filter_flag(self._get_tshark_version()),
                       self._display_filter]
        # Raw is only enabled when JSON is also enabled.
        if self.include_raw:
            params += ["-x"]
        if packet_count:
            params += ["-c", str(packet_count)]

        if self._custom_parameters:
            if isinstance(self._custom_parameters, list):
                params += self._custom_parameters
            elif isinstance(self._custom_parameters, dict):
                for key, val in self._custom_parameters.items():
                    params += [key, val]
            else:
                raise TypeError("Custom parameters type not supported.")

        # Decryption is only configured when both a key and an encryption type are set.
        decryption_enabled = all(self.encryption)
        if decryption_enabled:
            params += ["-o", "wlan.enable_decryption:TRUE",
                       "-o", f'uat:80211_keys:"{self.encryption[1]}","{self.encryption[0]}"']
        if self._override_prefs:
            for preference_name, preference_value in self._override_prefs.items():
                if decryption_enabled and preference_name in ("wlan.enable_decryption", "uat:80211_keys"):
                    continue  # skip if override preferences also given via --encryption options
                params += ["-o", f"{preference_name}:{preference_value}"]

        if self._output_file:
            params += ["-w", self._output_file]

        if self._decode_as:
            for criterion, decode_as_proto in self._decode_as.items():
                params += ["-d",
                           ",".join([criterion.strip(), decode_as_proto.strip()])]

        if self._disable_protocol:
            params += ["--disable-protocol", self._disable_protocol.strip()]

        return params

    def __iter__(self):
        """Iterates the stored packets once loaded, otherwise streams packets from a new tshark run."""
        if self.loaded:
            return iter(self._packets)
        else:
            return self._packets_from_tshark_sync()

    def __repr__(self):
        return f"<{self.__class__.__name__} ({len(self._packets)} packets)>"
|