201 lines
8.9 KiB
Python
201 lines
8.9 KiB
Python
import asyncio
|
|
import datetime
|
|
import itertools
|
|
import subprocess
|
|
import os
|
|
import struct
|
|
import time
|
|
import warnings
|
|
from packaging import version
|
|
|
|
from pyshark.capture.capture import Capture, StopCapture
|
|
|
|
DEFAULT_TIMEOUT = 30
|
|
|
|
|
|
class LinkTypes(object):
|
|
NULL = 0
|
|
ETHERNET = 1
|
|
IEEE802_5 = 6
|
|
PPP = 9
|
|
IEEE802_11 = 105
|
|
|
|
|
|
class InMemCapture(Capture):
|
|
|
|
def __init__(self, bpf_filter=None, display_filter=None, only_summaries=False,
|
|
decryption_key=None, encryption_type='wpa-pwk', decode_as=None,
|
|
disable_protocol=None, tshark_path=None, override_prefs=None, use_json=False, use_ek=False,
|
|
linktype=LinkTypes.ETHERNET, include_raw=False, eventloop=None, custom_parameters=None,
|
|
debug=False):
|
|
"""Creates a new in-mem capture, a capture capable of receiving binary packets and parsing them using tshark.
|
|
|
|
Significantly faster if packets are added in a batch.
|
|
|
|
:param bpf_filter: BPF filter to use on packets.
|
|
:param display_filter: Display (wireshark) filter to use.
|
|
:param only_summaries: Only produce packet summaries, much faster but includes very little information
|
|
:param decryption_key: Key used to encrypt and decrypt captured traffic.
|
|
:param encryption_type: Standard of encryption used in captured traffic (must be either 'WEP', 'WPA-PWD',
|
|
or 'WPA-PWK'. Defaults to WPA-PWK).
|
|
:param decode_as: A dictionary of {decode_criterion_string: decode_as_protocol} that are used to tell tshark
|
|
to decode protocols in situations it wouldn't usually, for instance {'tcp.port==8888': 'http'} would make
|
|
it attempt to decode any port 8888 traffic as HTTP. See tshark documentation for details.
|
|
:param tshark_path: Path of the tshark binary
|
|
:param override_prefs: A dictionary of tshark preferences to override, {PREFERENCE_NAME: PREFERENCE_VALUE, ...}.
|
|
:param disable_protocol: Tells tshark to remove a dissector for a specifc protocol.
|
|
:param custom_parameters: A dict of custom parameters to pass to tshark, i.e. {"--param": "value"}
|
|
or else a list of parameters in the format ["--foo", "bar", "--baz", "foo"].
|
|
"""
|
|
super(InMemCapture, self).__init__(display_filter=display_filter, only_summaries=only_summaries,
|
|
decryption_key=decryption_key, encryption_type=encryption_type,
|
|
decode_as=decode_as, disable_protocol=disable_protocol,
|
|
tshark_path=tshark_path, override_prefs=override_prefs,
|
|
use_json=use_json, use_ek=use_ek,
|
|
include_raw=include_raw, eventloop=eventloop,
|
|
custom_parameters=custom_parameters, debug=debug)
|
|
self.bpf_filter = bpf_filter
|
|
self._packets_to_write = None
|
|
self._current_linktype = linktype
|
|
self._current_tshark = None
|
|
|
|
def get_parameters(self, packet_count=None):
|
|
"""Returns the special tshark parameters to be used according to the configuration of this class."""
|
|
params = super(InMemCapture, self).get_parameters(
|
|
packet_count=packet_count)
|
|
params += ['-i', '-']
|
|
return params
|
|
|
|
async def _get_tshark_process(self, packet_count=None):
|
|
if self._current_tshark:
|
|
return self._current_tshark
|
|
proc = await super(InMemCapture, self)._get_tshark_process(packet_count=packet_count, stdin=subprocess.PIPE)
|
|
self._current_tshark = proc
|
|
|
|
# Create PCAP header
|
|
header = struct.pack("IHHIIII", 0xa1b2c3d4, 2, 4,
|
|
0, 0, 0x7fff, self._current_linktype)
|
|
proc.stdin.write(header)
|
|
|
|
return proc
|
|
|
|
def _get_json_separators(self):
|
|
""""Returns the separators between packets in a JSON output
|
|
|
|
Returns a tuple of (packet_separator, end_of_file_separator, characters_to_disregard).
|
|
The latter variable being the number of characters to ignore in order to pass the packet (i.e. extra newlines,
|
|
commas, parenthesis).
|
|
"""
|
|
if self._get_tshark_version() >= version.parse("2.6.7"):
|
|
return f"{os.linesep} }}".encode(), f"}}{os.linesep}]".encode(), 0
|
|
else:
|
|
return f'}}{os.linesep}{os.linesep}'.encode(), f"}}{os.linesep}{os.linesep}]", 1
|
|
|
|
def _write_packet(self, packet, sniff_time):
|
|
if sniff_time is None:
|
|
now = time.time()
|
|
elif isinstance(sniff_time, datetime.datetime):
|
|
now = sniff_time.timestamp()
|
|
else:
|
|
now = float(sniff_time)
|
|
secs = int(now)
|
|
usecs = int((now * 1000000) % 1000000)
|
|
# Write packet header
|
|
self._current_tshark.stdin.write(struct.pack(
|
|
"IIII", secs, usecs, len(packet), len(packet)))
|
|
self._current_tshark.stdin.write(packet)
|
|
|
|
def parse_packet(self, binary_packet, sniff_time=None, timeout=DEFAULT_TIMEOUT):
|
|
"""Parses a single binary packet and returns its parsed version.
|
|
|
|
DOES NOT CLOSE tshark. It must be closed manually by calling close() when you're done
|
|
working with it.
|
|
Use parse_packets when parsing multiple packets for faster parsing
|
|
"""
|
|
if sniff_time is not None:
|
|
sniff_time = [sniff_time]
|
|
return self.parse_packets([binary_packet], sniff_time, timeout)[0]
|
|
|
|
def parse_packets(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOUT):
|
|
"""Parses binary packets and return a list of parsed packets.
|
|
|
|
DOES NOT CLOSE tshark. It must be closed manually by calling close() when you're done
|
|
working with it.
|
|
"""
|
|
if self.eventloop is None:
|
|
self._setup_eventloop()
|
|
return self.eventloop.run_until_complete(self.parse_packets_async(binary_packets, sniff_times, timeout))
|
|
|
|
async def parse_packets_async(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOUT):
|
|
"""A coroutine which parses binary packets and return a list of parsed packets.
|
|
|
|
DOES NOT CLOSE tshark. It must be closed manually by calling close() when you're done
|
|
working with it.
|
|
"""
|
|
parsed_packets = []
|
|
if sniff_times is None:
|
|
sniff_times = []
|
|
if not self._current_tshark:
|
|
await self._get_tshark_process()
|
|
for binary_packet, sniff_time in itertools.zip_longest(binary_packets, sniff_times):
|
|
self._write_packet(binary_packet, sniff_time)
|
|
|
|
def callback(pkt):
|
|
parsed_packets.append(pkt)
|
|
if len(parsed_packets) == len(binary_packets):
|
|
raise StopCapture()
|
|
|
|
await self._get_parsed_packet_from_tshark(callback, timeout)
|
|
return parsed_packets
|
|
|
|
async def _get_parsed_packet_from_tshark(self, callback, timeout):
|
|
await self._current_tshark.stdin.drain()
|
|
try:
|
|
await asyncio.wait_for(self.packets_from_tshark(callback, close_tshark=False), timeout)
|
|
except asyncio.TimeoutError:
|
|
await self.close_async()
|
|
raise asyncio.TimeoutError("Timed out while waiting for tshark to parse packet. "
|
|
"Try rerunning with cap.set_debug() to see tshark errors. "
|
|
"Closing tshark..")
|
|
|
|
async def close_async(self):
|
|
self._current_tshark = None
|
|
await super(InMemCapture, self).close_async()
|
|
|
|
def feed_packet(self, binary_packet, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT):
|
|
"""
|
|
DEPRECATED. Use parse_packet instead.
|
|
This function adds the packet to the packets list, and also closes and reopens tshark for
|
|
each packet.
|
|
==============
|
|
|
|
Gets a binary (string) packet and parses & adds it to this capture.
|
|
Returns the added packet.
|
|
|
|
Use feed_packets if you have multiple packets to insert.
|
|
|
|
By default, assumes the packet is an ethernet packet. For another link type, supply the linktype argument (most
|
|
can be found in the class LinkTypes)
|
|
"""
|
|
warnings.warn(
|
|
"Deprecated method. Use InMemCapture.parse_packet() instead.")
|
|
self._current_linktype = linktype
|
|
pkt = self.parse_packet(binary_packet, timeout=timeout)
|
|
self.close()
|
|
self._packets.append(pkt)
|
|
return pkt
|
|
|
|
def feed_packets(self, binary_packets, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT):
|
|
"""Gets a list of binary packets, parses them using tshark and returns their parsed values.
|
|
|
|
Keeps the packets in the internal packet list as well.
|
|
|
|
By default, assumes the packets are ethernet packets. For another link type, supply the linktype argument (most
|
|
can be found in the class LinkTypes)
|
|
"""
|
|
self._current_linktype = linktype
|
|
parsed_packets = self.parse_packets(binary_packets, timeout=timeout)
|
|
self._packets.extend(parsed_packets)
|
|
self.close()
|
|
return parsed_packets
|