From 3a915dd278c46a5cd25380a3c58f853379d11972 Mon Sep 17 00:00:00 2001 From: Dongho Kim Date: Sun, 24 Nov 2024 22:41:48 +0900 Subject: [PATCH] first commit --- .gitignore | 13 + LICENSE.md | 13 + README.md | 141 ++++ aggregate.py | 82 +++ cert_config.txt | 10 + certs.sh | 59 ++ fetch_artifacts.py | 103 +++ implementations.json | 5 + implementations.py | 30 + interop.py | 1679 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 6 + result.py | 7 + run.py | 394 ++++++++++ testcases.py | 1072 +++++++++++++++++++++++++++ trace.py | 197 +++++ 15 files changed, 3811 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE.md create mode 100644 README.md create mode 100644 aggregate.py create mode 100644 cert_config.txt create mode 100755 certs.sh create mode 100644 fetch_artifacts.py create mode 100644 implementations.json create mode 100644 implementations.py create mode 100644 interop.py create mode 100644 requirements.txt create mode 100644 result.py create mode 100755 run.py create mode 100644 testcases.py create mode 100644 trace.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0dc3061 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +certs/ +logs/ +logs_*/ +*.json +!implementations.json +!testbed/*.json +web/latest + +*.egg-info/ +__pycache__ +build/ +dist/ +out/ diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..0e5aa06 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,13 @@ +Copyright 2019 Jana Iyengar, Marten Seemann + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..664fd4b --- /dev/null +++ b/README.md @@ -0,0 +1,141 @@ +# Interop Test Runner - ACN + +The Interop Test Runner aims to automatically generate an interop matrix by running multiple **test cases** using different QUIC implementations. +Note, this runner is an adaptation of the original implementation. +Instead of relying on docker-compose and simulating a network, it executes the server and client from a testbed management host on testbed machines or locally. + +The testbed mode is not important for you, but used by us to test your implementations on real hardware in later phases of the project. + +## Requirements +The Interop Runner is written in Python 3. You'll need to install a few Python modules to run it. A virtual environment would be optimal, especially when installing it on a testbed management host. + +```bash +pip3 install -r requirements.txt +``` + +* The client is given URLs including a hostname. To be able to resolve this hostname, the /etc/hosts file has to be updated. The hostname is "server". The IP address has to be set to 127.0.0.1 in local mode. + +- Optional: For several testcases which inspect packet traces you need to install development version of Wireshark (version 4.0.6 or newer). The installed version on your ACN VM already supports all tests. + +## Building a QUIC endpoint + +To include your QUIC implementations in the Interop Runner, three scripts are required: +* setup-env.sh +* run-client.sh +* run-server.sh + +Typically, a test case will require a server to serve files from a directory, and a client to download files. Different test cases will specify the behavior to be tested. For example, the Retry test case expects the server to use a Retry before accepting the connection from the client. +All configuration information from the test framework to your implementation is fed to the scripts run-client.sh and run-server.sh. +You can use them in your respective implementations as enviorenment variables or use the script to transform them into command line parameters. + +The test case is passed into your Docker container using the `TESTCASE` environment variable. If your implementation doesn't support a test case, it MUST exit with status code 127. This will allow us to add new test cases in the future, and correctly report test failures und successes, even if some implementations have not yet implented support for this new test case. + +After the transfer is completed, the client container is expected to exit with exit status 0. If an error occurred during the transfer, the client is expected to exit with exit status 1. +After completion of the test case, the Interop Runner will verify that the client downloaded the files it was expected to transfer, and that the file contents match. Additionally, for certain test cases, the Interop Runner will use the pcap of the transfer to verify that the implementations fulfilled the requirements of the test (for example, for the Retry test case, the pcap should show that a Retry packet was sent, and that the client used the Token provided in that packet). + +### Server Variables +The following variables will be given to the server and should be supported by your implementation +| Var | Description | +| -------- | -------- | +| SSLKEYLOGFILE | The variable contains the path + name of the keylog file. The output is required to decrypt traces and verify tests. The file has to be in the [NSS Key Log format](https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/Key_Log_Format) | +| QLOGDIR | qlog results are not required but might help to debug your output. However they have a negativ impact on performance so you might want to deactivate it for some tests| +| LOGS | It contains the path to a directory the server can use for its general logs. These will be uploaded as part of the results artifact. | +| TESTCASE | The name of the test case. You have to make sure a random string can be handled by your implementation. | +| WWW | It contains the directory that will contain one or more randomly generated files. Your server implementation is expected to run on the given port 443 and serve files from this directory. | +| CERTS | The runner will create an X.509 certificate and chain to be used by the server during the handshake. The variable contains the path to a directory that contains a priv.key and cert.pem file. | +| IP | The IP the server has to listen on. | +| PORT | The port the server has to listen on. | +| SERVERNAME | The servername a client might send using SNI. The name relates to the provided certificate and might be necessary for some QUIC implementations. | + +### Client Variables +The following variables will be given to the server and should be supported by your implementation +| Var | Description | +| -------- | -------- | +| SSLKEYLOGFILE | The variable contains the path + name of the keylog file. The output is required to decrypt traces and verify tests. The file has to be in the [NSS Key Log format](https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/Key_Log_Format). | +| QLOGDIR | qlog results are not required but might help to debug your output. However they have a negativ impact on performance so you might want to deactivate it for some tests | +| LOGS| It contains the path to a directory the client can use for its general logs. These will be uploaded as part of the results artifact. | +| TESTCASE | The name of the test case. You have to make sure a random string can be handled by your implementation. | +| DOWNLOADS | The directory is initially empty, and your client implementation is expected to store downloaded files into this directory. Served and downloaded files are compared to check the test. | +| REQUESTS | A space seperated list of requests a client should execute one by one. (e.g., https://server:4433/xyz) | + +### implementations.json +The implementations.json file contains a simple json with an object for each implementation. +Implementations as simply represented as a named object with a path variable. +The path should point to the folder containing the three required scripts. +Scripsts themselves should be able to execute at any location. Paths inside scripts (e.g., to your binaries) should be relative to the script. + +### ACN Example + +We offer an example implementation on the ACN material [website](https://acn.net.in.tum.de/). +It supports all required tests and can be used to test your implementations. + +### Logs + +To facilitate debugging, the Interop Runner saves the log files to the logs directory. + +Implementations that implement [qlog](https://github.com/quiclog/internet-drafts) should export the log files to the directory specified by the `QLOGDIR` environment variable. + +## Test cases + +The Interop Runner implements the following test cases. Unless noted otherwise, test cases use HTTP/3 for file transfers. More test cases will be added in the future, to test more protocol features. The name in parentheses is the value of the `TESTCASE` environment variable passed into your Docker container. + +* **Handshake** (`handshake`): +The client requests a single file and the server should serve the file. The test +is successful if there is exactly one QUIC handshake and no retries within the +packet trace. Additionally, the downloaded file must be equal to the file served +by the server. + +* **Transfer** (`transfer`): +The client needs to send multiple requests and download all files using a single +connection. All files have to match and only a single handshake should be visible +to pass the test. + +* **Multi Handshake** (`multihandshake`): +The client needs to send multiple requests and download all files using new +connections for each request. All files have to match and for each file, a +handshake needs to be visible to pass the test. + +* **Version Negotiation** (`versionnegotiation`): +Tests whether a server sends a valid version negotiation packet in response to +an unknown QUIC version number. The client should start a connection using an +unsupported version number (it can use a reserved version number to do so), and +has to abort the connection attempt when receiving the Version Negotiation packet. + +* **Transport Parameter** (`transportparameter`): +Tests whether the server is able to set an initial_max_streams_bidi value of < 11 +during the handshake. The client has to download all files with a single connection. + +* **Follow** (`follow`): +The client requests a single file from the server, which serves two files. While the +first file contains the path of the second file, the second file contains random data. +The client only receives one request but has to download both files by parsing the +content of the first file and constructing a second request by replacing the path with +the one retrieved. + +* **ChaCha20** (`chacha20`): +In this test, client and server are expected to offer only +`TLS_CHACHA20_POLY1305_SHA256` as a cipher suite. The client then downloads the files. + +* **Retry** (`retry`): +Tests that the server can generate a Retry, and that the client can act upon it +(i.e. use the Token provided in the Retry packet in the Initial packet). Only a +single handshake should be visible. + +* **Resumption** (`resumption`): +Tests QUIC session resumption (**without** 0-RTT). The client is expected to establish +a connection and download the first file (first value in the REQUESTS variable). +The server is expected to provide the client with a session ticket that allows it +to resume the connection. After downloading the first file, the client has to close +the first connection, establish a resumed connection using the session ticket, and +use this connection to download the remaining file(s). + +* **0-RTT** (`zerortt`): +Tests QUIC 0-RTT. The client is expected to establish a connection and download the +first file. The server is expected to provide the client with a session ticket that +allows the client to establish a 0-RTT connection on the next connection attempt. +After downloading the first file, the client has to close the first connection, +establish and request the remaining file(s) in 0-RTT. + +* **Multiplexing** (`multiplexing`): +Tests whether the server is able to set an `initial_max_streams_bidi` value of < 11 +during the handshake. The client has to download all files with a single connection. diff --git a/aggregate.py b/aggregate.py new file mode 100644 index 0000000..9fa5639 --- /dev/null +++ b/aggregate.py @@ -0,0 +1,82 @@ +import argparse +import json +import sys +import glob +import os + +from implementations import IMPLEMENTATIONS + + +def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-s", "--server", help="server implementations (comma-separated)", default=','.join(IMPLEMENTATIONS.keys()) + ) + parser.add_argument( + "-c", "--client", help="client implementations (comma-separated)", default=','.join(IMPLEMENTATIONS.keys()) + ) + parser.add_argument("-l", "--log-dir", help="results directory. In this directory we will search for the logs recursively", default='.') + parser.add_argument("-o", "--output", help="output file (stdout if not set)") + return parser.parse_args() + + +STAR_TIME = None + +servers = get_args().server.split(",") +clients = get_args().client.split(",") +result = { + "servers": servers, + "clients": clients, + "log_dir": get_args().log_dir, + "results": [], + "measurements": [], + "tests": {}, + "urls": {}, +} + + +def parse(server: str, client: str, cat: str): + filename = server + "_" + client + "_" + cat + ".json" + + files = glob.glob(os.path.join(get_args().log_dir, "**", filename), recursive=True) + if len(files) > 0: + with open(files[0]) as f: + data = json.load(f) + else: + print("Warning: Couldn't open file " + filename) + result[cat].append([]) + return + parse_data(server, client, cat, data) + + +def parse_data(server: str, client: str, cat: str, data: object): + if len(data["servers"]) != 1: + sys.exit("expected exactly one server") + if data["servers"][0] != server: + sys.exit("inconsistent server") + if len(data["clients"]) != 1: + sys.exit("expected exactly one client") + if data["clients"][0] != client: + sys.exit("inconsistent client") + if "end_time" not in result or data["end_time"] > result["end_time"]: + result["end_time"] = data["end_time"] + if "start_time" not in result or data["start_time"] < result["start_time"]: + result["start_time"] = data["start_time"] + result[cat].append(data[cat][0]) + result["quic_draft"] = data["quic_draft"] + result["quic_version"] = data["quic_version"] + #result["urls"].update(data["urls"]) + result["tests"].update(data["tests"]) + + +for client in clients: + for server in servers: + parse(server, client, "results") + parse(server, client, "measurements") + +if get_args().output: + f = open(get_args().output, "w") + json.dump(result, f) + f.close() +else: + print(json.dumps(result)) \ No newline at end of file diff --git a/cert_config.txt b/cert_config.txt new file mode 100644 index 0000000..9ef33b9 --- /dev/null +++ b/cert_config.txt @@ -0,0 +1,10 @@ +[ req ] +distinguished_name = req_distinguished_name +x509_extensions = v3_ca +dirstring_type = nobmp +[ req_distinguished_name ] +[ v3_ca ] +keyUsage=critical, keyCertSign +subjectKeyIdentifier=hash +authorityKeyIdentifier=keyid:always,issuer:always +basicConstraints=critical,CA:TRUE,pathlen:100 diff --git a/certs.sh b/certs.sh new file mode 100755 index 0000000..b26b2f8 --- /dev/null +++ b/certs.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +set -e + +if [ -z "$1" ] || [ -z "$2" ] ; then + echo "$0 " + exit 1 +fi + +CERTDIR=$1 +CHAINLEN=$2 + +mkdir -p $CERTDIR || true + +# Generate Root CA and certificate +openssl req -x509 -sha256 -nodes -days 365 -newkey rsa:2048 \ + -keyout $CERTDIR/ca_0.key -out $CERTDIR/cert_0.pem \ + -subj "/O=interop runner Root Certificate Authority/" \ + -config cert_config.txt \ + -extensions v3_ca \ + 2> /dev/null + +for i in $(seq 1 $CHAINLEN); do + # Generate a CSR + SUBJ="interop runner intermediate $i" + if [[ $i == $CHAINLEN ]]; then + SUBJ="interop runner leaf" + fi + openssl req -out $CERTDIR/cert.csr -new -newkey rsa:2048 -nodes -keyout $CERTDIR/ca_$i.key \ + -subj "/O=$SUBJ/" \ + 2> /dev/null + + # Sign the certificate + j=$(($i-1)) + if [[ $i < $CHAINLEN ]]; then + openssl x509 -req -sha256 -days 365 -in $CERTDIR/cert.csr -out $CERTDIR/cert_$i.pem \ + -CA $CERTDIR/cert_$j.pem -CAkey $CERTDIR/ca_$j.key -CAcreateserial \ + -extfile cert_config.txt \ + -extensions v3_ca \ + 2> /dev/null + else + openssl x509 -req -sha256 -days 365 -in $CERTDIR/cert.csr -out $CERTDIR/cert_$i.pem \ + -CA $CERTDIR/cert_$j.pem -CAkey $CERTDIR/ca_$j.key -CAcreateserial \ + -extfile <(printf "subjectAltName=DNS:server,DNS:server4,DNS:server6,DNS:server46") \ + 2> /dev/null + fi +done + +mv $CERTDIR/cert_0.pem $CERTDIR/ca.pem +cp $CERTDIR/ca_$CHAINLEN.key $CERTDIR/priv.key + +# combine certificates +for i in $(seq $CHAINLEN -1 1); do + cat $CERTDIR/cert_$i.pem >> $CERTDIR/cert.pem + rm $CERTDIR/cert_$i.pem $CERTDIR/ca_$i.key +done +rm $CERTDIR/*.srl $CERTDIR/ca_0.key $CERTDIR/cert.csr + + diff --git a/fetch_artifacts.py b/fetch_artifacts.py new file mode 100644 index 0000000..87fd76c --- /dev/null +++ b/fetch_artifacts.py @@ -0,0 +1,103 @@ +import argparse +import os +import sys +import gitlab +import zipfile +import io +from termcolor import colored +import logging + +from implementations import IMPLEMENTATIONS + +logging.basicConfig( + format='%(asctime)s %(levelname)s %(message)s', + datefmt='%m-%d %H:%M:%S' +) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + +class ZipFileWithPermissions(zipfile.ZipFile): + """ Custom ZipFile class handling file permissions. + From https://stackoverflow.com/a/54748564 """ + def _extract_member(self, member, targetpath, pwd): + if not isinstance(member, zipfile.ZipInfo): + member = self.getinfo(member) + + targetpath = super()._extract_member(member, targetpath, pwd) + + attr = member.external_attr >> 16 + if attr != 0: + os.chmod(targetpath, attr) + return targetpath + + +def main(args): + GITLAB_TOKEN = os.getenv('GITLAB_TOKEN') + CI_JOB_TOKEN = os.getenv('CI_JOB_TOKEN') + gitlab_url = 'https://gitlab.lrz.de' + + if GITLAB_TOKEN: + logger.info('Using GITLAB_TOKEN') + gl = gitlab.Gitlab(gitlab_url, private_token=GITLAB_TOKEN) + elif CI_JOB_TOKEN: + logger.info('Using CI_JOB_TOKEN') + gl = gitlab.Gitlab(gitlab_url, job_token=os.environ['CI_JOB_TOKEN']) + else: + logger.error('Set GITLAB_TOKEN or CI_JOB_TOKEN') + exit(1) + + implementations = {} + if args.implementations: + for s in args.implementations: + if s not in [n for n, _ in IMPLEMENTATIONS.items()]: + sys.exit("implementation " + s + " not found.") + implementations[s] = IMPLEMENTATIONS[s] + else: + implementations = IMPLEMENTATIONS + + successful = 0 + errors = 0 + + for name, value in implementations.items(): + project_id = value.get("project_id") + + if not project_id: + logger.info(colored(f'{name}: no Gitlab project id specified, skipping.', 'yellow')) + continue + + outpath = os.path.join(args.output_directory, name) + os.makedirs(outpath, exist_ok=True) + + # Get project + project = gl.projects.get(project_id, lazy=True) + + # Get branch, use main if not set + ref = value.get('branch', 'main') + + # Get latest build artifact and extract + try: + for job in project.jobs.list(all=True): + if job.ref == ref and job.name == 'build' and job.status == 'success': + artifacts = job.artifact(path='/artifact.zip') + ZipFileWithPermissions(io.BytesIO(artifacts)).extractall(path=outpath) + logger.info(colored(f'{name}: artifacts pulled successfully for {ref}.', 'green')) + successful += 1 + break + except gitlab.exceptions.GitlabGetError: + logger.info(colored(f'{name}: failed to pull artifacts.', 'red')) + errors += 1 + except zipfile.BadZipFile: + logger.info(colored(f'{name}: failed to pull artifacts.', 'red')) + errors += 1 + logger.info(f'{successful}/{successful + errors} artifacts downloaded.') + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--implementations", help="implementations to pull", nargs='*') + parser.add_argument("-o", "--output_directory", help="write output to this directory", default='out') + args = parser.parse_args() + main(args) diff --git a/implementations.json b/implementations.json new file mode 100644 index 0000000..37abffb --- /dev/null +++ b/implementations.json @@ -0,0 +1,5 @@ +{ + "name": { + "path": "/dev/null" + } +} diff --git a/implementations.py b/implementations.py new file mode 100644 index 0000000..2af625a --- /dev/null +++ b/implementations.py @@ -0,0 +1,30 @@ +import json +import re +from enum import Enum + +IMPLEMENTATIONS = {} + + +class Role(Enum): + BOTH = "both" + SERVER = "server" + CLIENT = "client" + + +def parse_filesize(input: str, default_unit="B"): + units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9, "TB": 10 ** 12, + "KiB": 2 ** 10, "MiB": 2 ** 20, "GiB": 2 ** 30, "TiB": 2 ** 40} + m = re.match(fr'^(\d+(?:\.\d+)?)\s*({"|".join(units.keys())})?$', input) + units[None] = units[default_unit] + if m: + number, unit = m.groups() + return int(float(number) * units[unit]) + raise ValueError("Invalid file size") + + +with open("implementations.json", "r") as f: + data = json.load(f) + for name, val in data.items(): + if 'max_filesize' in val.keys(): + val['max_filesize'] = parse_filesize(val['max_filesize']) + IMPLEMENTATIONS[name] = val diff --git a/interop.py b/interop.py new file mode 100644 index 0000000..37ce82e --- /dev/null +++ b/interop.py @@ -0,0 +1,1679 @@ +import json +import logging +import os +import pathlib +import random +import re +import shutil +import statistics +import string +import subprocess +import sys +import tempfile +import time +import traceback +from datetime import datetime +from typing import Callable, List, Tuple + +import prettytable +import psutil +import venv +from termcolor import colored + +import testcases +from result import TestResult +from testcases import Perspective + + +TESTBED_ENABLED = True +try: + from poslib import api as pos + from poslib import restapi +except: + TESTBED_ENABLED = False + + +def on_terminate(proc): + logging.debug("process {} terminated with exit code {}".format(proc, proc.returncode)) + + +def random_string(length: int): + """ Generate a random string of fixed length """ + letters = string.ascii_lowercase + return "".join(random.choice(letters) for i in range(length)) + + +def kill(proc_pid): + process = psutil.Process(proc_pid) + procs = process.children(recursive=True) + for proc in procs: + proc.terminate() + gone, alive = psutil.wait_procs(procs, timeout=3, callback=on_terminate) + for p in alive: + p.kill() + process.terminate() + + +class MeasurementResult: + result = TestResult + details = str + all_infos: [float] = [] + + +class LogFileFormatter(logging.Formatter): + def format(self, record): + msg = super(LogFileFormatter, self).format(record) + # remove color control characters + return re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]").sub("", msg) + + +class LogConsoleFormatter(logging.Formatter): + pass + + +def log_process(stdout, stderr, name): + if stdout: + logging.debug(colored(f"{name} stdout", 'yellow')) + logging.debug(stdout.decode("utf-8")) + if stderr: + logging.debug(colored(f"{name} stderr", 'yellow')) + logging.debug(stderr.decode("utf-8")) + + +class InteropRunner: + _start_time = 0 + test_results = {} + measurement_results = {} + compliant = { + 'server': {}, + 'client': {} + } + testcase_is_unsupported = { + 'server': {}, + 'client': {} + } + _prepared_envs = { + 'server': [], + 'client': [] + } + _implementations = {} + _servers = [] + _clients = [] + _tests = [] + _measurements = [] + _output = "" + _log_dir = "" + _save_files = False + _venv_dir = "" + _testbed = None + _bandwidth = None + _server_pre_scripts = [] + _server_pre_hot_scripts = [] + _server_post_hot_scripts = [] + _server_post_scripts = [] + _client_pre_scripts = [] + _client_pre_hot_scripts = [] + _client_post_hot_scripts = [] + _client_post_scripts = [] + _client_implementation_params = {} + _server_implementation_params = {} + _disable_server_aes_offload = False + _disable_client_aes_offload = False + _args = {} + _delay = None + _reorder_packets = [] + _corruption = None + _loss = None + _use_v6 = False + + def __init__( + self, + client_implementation_params, + server_implementation_params, + implementations: dict, + servers: List[str], + clients: List[str], + tests: List[testcases.TestCase], + measurements: List[testcases.Measurement], + output: str, + debug: bool, + manual_mode: bool, + reorder_packets: List[str]=[], + save_files=False, + log_dir="", + venv_dir="", + testbed=None, + bandwidth: str=None, + implementations_directory: str='.', + delay: str=None, + corruption: str=None, + loss: str=None, + server_pre_scripts: [str]=[], + server_pre_hot_scripts: [str]=[], + server_post_hot_scripts: [str]=[], + server_post_scripts: [str]=[], + client_pre_scripts: [str]=[], + client_pre_hot_scripts: [str]=[], + client_post_hot_scripts: [str]=[], + client_post_scripts: [str]=[], + disable_server_aes_offload = False, + disable_client_aes_offload = False, + continue_on_error=False, + use_client_timestamps=False, + only_same_implementation=False, + use_v6: bool = False, + args={} + ): + self.logger = logging.getLogger() + self.logger.setLevel(logging.DEBUG) + console = logging.StreamHandler(stream=sys.stderr) + formatter = LogConsoleFormatter("%(message)s") + console.setFormatter(formatter) + if debug: + console.setLevel(logging.DEBUG) + else: + console.setLevel(logging.INFO) + self._manual_mode = manual_mode + self.logger.addHandler(console) + self._start_time = datetime.now() + self._tests = tests + self._measurements = measurements + self._servers = servers + self._clients = clients + self._implementations = implementations + self._implementations_directory = implementations_directory + self._output = output + self._log_dir = log_dir + self._save_files = save_files + self._venv_dir = venv_dir + self._testbed = testbed + self._use_v6 = use_v6 + self._testbed_server_pci_id = None + self._testbed_server_mac = None + self._testbed_client_pci_id = None + self.logger.debug(f"Running {self._testbed} {bool(self._testbed)}") + if self._testbed: + with open(self._testbed) as f: + data = json.load(f) + self._testbed_server = data['server']['host'] + self._testbed_server_ip = data['server']['ip'] + if self._use_v6: + self.logger.debug(f"RUNNING WiTH IPv6 {data['server']['ipv6']}") + self._testbed_server_ip = data['server']['ipv6'] # formatting to correctly bracket it + server_interface = data['server']['interface'] + self._testbed_server_interface = server_interface['name'] + if 'pci_id' in server_interface: + self._testbed_server_pci_id = server_interface['pci_id'] + if 'mac' in server_interface: + self._testbed_server_mac = server_interface['mac'] + self._testbed_client = data['client']['host'] + self._testbed_client_ip = data['client']['ip'] + client_interface = data['client']['interface'] + self._testbed_client_interface = client_interface['name'] + if 'pci_id' in client_interface: + self._testbed_client_pci_id = client_interface['pci_id'] + self.logger.debug(f"Running in testbed mode") + self.logger.debug(f"Server: {self._testbed_server} Client: {self._testbed_client}") + self._testbed = True + + if len(self._venv_dir) == 0: + self._venv_dir = "/tmp" + if len(self._log_dir) == 0: + self._log_dir = "logs_{:%Y-%m-%dT%H:%M:%S}".format(self._start_time) + if os.path.exists(self._log_dir): + sys.exit("Log dir " + self._log_dir + " already exists.") + for server in servers: + self.test_results[server] = {} + self.measurement_results[server] = {} + for client in clients: + self.test_results[server][client] = {} + for test in self._tests: + self.test_results[server][client][test] = {} + self.measurement_results[server][client] = {} + for measurement in measurements: + self.measurement_results[server][client][measurement] = {} + # tc settings + self._bandwidth = bandwidth + # Parse other Measurement settings + self._delay = delay + self._reorder_packets = reorder_packets + self._corruption = corruption + self._loss = loss + # Parse Pre and Postscripts of server and client + self._server_pre_scripts = server_pre_scripts + self._server_pre_hot_scripts = server_pre_hot_scripts + self._server_post_hot_scripts = server_post_hot_scripts + self._server_post_scripts = server_post_scripts + self._client_pre_scripts = client_pre_scripts + self._client_pre_hot_scripts = client_pre_hot_scripts + self._client_post_hot_scripts = client_post_hot_scripts + self._client_post_scripts = client_post_scripts + # implementation parameters + self._client_implementation_params = client_implementation_params + self._server_implementation_params = server_implementation_params + # AES offload + self._disable_server_aes_offload = disable_server_aes_offload + self._disable_client_aes_offload = disable_client_aes_offload + self._continue_on_error = continue_on_error + self._use_client_timestamps = use_client_timestamps + self._only_same_implementation = only_same_implementation + self._args = args + + def _set_variables_with_pos_on_machine(self, host: str, dictionary: dict): + """loads the variables and their value given in as a dict on to the + host using pos allocations. + These can later be used on the host using + pos command scripts. + """ + self.logger.debug(f"Setting the pos variables:\n{dictionary}\non the host {host}") + # Create a temporary json file to store the dict in, + # as pos currently only allows to read vars using a data file. + + tmp_file = tempfile.NamedTemporaryFile(dir="/tmp", prefix="interop-temp-pos-data-", mode="w+") + json.dump(dictionary, tmp_file, ensure_ascii=False, indent=4) + tmp_file.flush() + tmp_file.seek(0) + pos.allocations.set_variables( + allocation=host, + datafile=tmp_file, + extension="json", + as_global=None, + as_loop=None, + print_variables=None + ) + tmp_file.close() + return + + def _run_script_with_pos_on_machine(self, host: str, script_path: str): + """Execute a given script on the given machine using pos. + """ + self.logger.debug(f'Run {script_path} on {host}') + with open(script_path, "r") as f: + pos.commands.launch( + node=host, + infile=f, + blocking=True + ) + return + + def _is_client_or_server_solomode(self, client, server) -> bool: + if "solo" in self._implementations[client].keys(): + return self._implementations[client]["solo"] + elif "solo" in self._implementations[server].keys(): + return self._implementations[server]["solo"] + else: + return False + + @staticmethod + def _get_node_image() -> str: + p = subprocess.Popen( + "STR=$(grep IMAGE= setup-hosts.sh);" + + "echo ${STR#*=}", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + stdout, stderr = p.communicate(timeout=10) + return stdout.decode("utf-8").rstrip("\n").strip("\"") + + @staticmethod + def _get_commit_hash() -> str: + p = subprocess.Popen( + "git rev-parse HEAD", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + stdout, stderr = p.communicate(timeout=10) + return stdout.decode("utf-8").rstrip("\n") + + def _remove_server_bandwidth_limit(self): + self.logger.debug(f'Removing server bandwidth limit') + pos.commands.launch( + node=self._testbed_server, + command=[ + 'tc', 'qdisc', 'del', 'dev', + self._testbed_server_interface, 'root' + ], + blocking=True, + ) + return + + def _set_server_bandwidth_limit(self, bandwidth: str): + self.logger.debug(f'Limiting bandwidth on server {self._testbed_server} to {self._bandwidth}') + pos.commands.launch( + node=self._testbed_server, + command=[ + 'tc', 'qdisc', 'add', 'dev', self._testbed_server_interface, + 'root', 'tbf', 'rate', bandwidth, 'latency', '50ms', 'burst', + '1540' + ], + blocking=True, + ) + return + + def _remove_client_network_emulation_incoming(self): + self.logger.debug(f'Removing client network emulation on {self._testbed_client}') + # remove ifb + pos.commands.launch( + node=self._testbed_client, + command=[ + 'modprobe', '-r', 'ifb' + ], + blocking=True, + ) + # remove ingress qdisc + pos.commands.launch( + node=self._testbed_client, + command=[ + 'tc', 'qdisc', 'del', 'dev', self._testbed_client_interface, 'ingress' + ], + blocking=True, + ) + return + + def build_netem_command(self, interface, delay, reorder_packets, corruption, loss): + command: List[str] = ['tc', 'qdisc', 'add', 'dev', interface, 'root', 'netem', 'limit', '100000'] + + if delay is not None: + command.append('delay') + command.append(delay) + + if reorder_packets: + command.append('reorder') + command.append(reorder_packets[0]) + command.append(reorder_packets[1]) + + if corruption is not None: + command.append('corrupt') + command.append(corruption) + + if loss is not None: + command.append('loss') + command.append(loss) + return command + + def _set_client_network_emulation_incoming(self, delay: str, reorder_packets: List[str], corruption: str, loss: str): + """ Sets a queueing discipline for ingress traffic on client using an Intermediate Functional Block (IFB) + pseudo-device and tc. Can set delay, packet reordering, bit corruption and/or packet loss. + """ + + # Create IFB + pos.commands.launch( + node=self._testbed_client, + command=[ + 'modprobe', 'ifb', 'numifbs=1' + ], + blocking=True, + ) + pos.commands.launch( + node=self._testbed_client, + command=[ + 'ip', 'link', 'set', 'dev', 'ifb0', 'up' + ], + blocking=True, + ) + + # Redirect ingress traffic from the client interface to the IFB device + pos.commands.launch( + node=self._testbed_client, + command=[ + 'tc', 'qdisc', 'add', 'dev', self._testbed_client_interface, 'ingress' + ], + blocking=True, + ) + pos.commands.launch( + node=self._testbed_client, + command=[ + 'tc', 'filter', 'add', 'dev', self._testbed_client_interface, 'parent', 'ffff:', 'protocol', 'ip', + 'u32', 'match', 'u32', '0', '0', 'flowid', '1:1', 'action', 'mirred', 'egress', 'redirect', 'dev', + 'ifb0' + ], + blocking=True, + ) + + command = self.build_netem_command('ifb0', delay, reorder_packets, corruption, loss) + + self.logger.debug(f"Adding netem in client {self._testbed_client} on ingress traffic" + f" on interface {self._testbed_client_interface} " + f"with the following command: {command}") + pos.commands.launch( + node=self._testbed_client, + command=command, + blocking=True, + ) + return + + def _set_client_network_emulation_outgoing(self, delay: str, reorder_packets: List[str], corruption: str, loss: str): + + command = self.build_netem_command(self._testbed_client_interface, delay, reorder_packets, corruption, loss) + + self.logger.debug(f"Adding netem in client {self._testbed_client} on egress traffic" + f" on interface {self._testbed_client_interface} " + f"with the following command: {command}") + pos.commands.launch( + node=self._testbed_client, + command=command, + blocking=True, + ) + return + + def _remove_client_network_emulation_outgoing(self): + self.logger.debug(f'Removing client network emulation on {self._testbed_client}') + + # remove ingress qdisc + pos.commands.launch( + node=self._testbed_client, + command=[ + 'tc', 'qdisc', 'del', 'dev', self._testbed_client_interface, 'root' + ], + blocking=True, + ) + return + + def _create_venv_on_remote_host(self, host, venv_dir_path): + self.logger.debug(f"Venv Setup: Creating venv on host {host} at \ + {venv_dir_path}") + pos.commands.launch( + node=host, + command=[ + 'python3', '-m', 'venv', venv_dir_path + ], + blocking=True, + ) + return + + def _does_remote_file_exist(self, host, path): + self.logger.debug( + f"Checking if file {path} exists") + try: + pos.commands.launch( + node=host, + command=[ + 'test', '-f', path + ], + blocking=True, + ) + return True + except restapi.RESTError: + return False + + def _get_venv(self, name, role): + """Creates the venv directory for the specified role either locally or + copied to the host in testbed mode should it not exist. + + Return: the path of the + with a prepended bash 'source' command. + """ + venv_dir = os.path.join(self._venv_dir, name + "-" + role) + venv_activate = os.path.join(venv_dir, "bin/activate") + + if self._testbed: + host = self._testbed_server if role == 'server' else self._testbed_client + if not self._does_remote_file_exist(host, venv_activate): + self._create_venv_on_remote_host(host, venv_dir) + else: + if not os.path.exists(venv_activate): + self.logger.debug(f'Create venv: {venv_dir}') + venv.create(venv_dir, with_pip=True) + return ". " + venv_activate + + def get_implementation_version(self, name): + try: + with open(os.path.join(self._implementations_directory, self._implementations[name]['path'], 'VERSION')) as f: + version = f.readline().rstrip('\n') + return version + except Exception as e: + self.logger.error(f'Failed to get version of {name}: {e}') + return None + + def _copy_implementations(self): + """ + Copies the implementations to the remote hosts, if in testbed mode. + """ + if self._testbed: + for client in self._clients: + self._push_directory_to_remote( + self._testbed_client, + os.path.join( + self._implementations_directory, + self._implementations[client]['path'], + '' # This prevents that rsync copies the directory into itself, adds trailing slash + ), + self._implementations[client]['path'], + normalize=False + ) + for server in self._servers: + self._push_directory_to_remote( + self._testbed_server, + os.path.join( + self._implementations_directory, + self._implementations[server]['path'], + '' + ), + self._implementations[server]['path'], + normalize=False + ) + + def _push_directory_to_remote(self, host, src, dst=None, normalize=True): + """Copies a directory from the machine it is executed on + (management host) to a given host to path using rsync. + """ + if normalize: + src = os.path.normpath(src) + + if not dst: + dst = str(pathlib.Path(src).parent) + self.logger.debug(f"Copy {src} to {host}:{dst}") + + #api.nodes.copy(host, src, dst, recursive=True) + cmd = f'rsync -r {src} {host}:{dst}' + p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + try: + # large timeout as copied file could be very large + p.wait(2000) + except subprocess.TimeoutExpired: + self.logger.debug( + f'Timeout when moving files {src} to host {host}') + return + + + def _pull_directory_from_remote(self, host, src, dst=None): + src = os.path.normpath(src) + if not dst: + dst = str(pathlib.Path(src).parent) + self.logger.debug(f"Copy {host}:{src} to {dst}") + + #api.commands.launch(host, f'pos_upload {src} -o {dst}') + cmd = f'rsync -r {host}:{src} {dst}' + p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + try: + # large timeout as copied file could be very large + p.wait(2000) + except subprocess.TimeoutExpired: + self.logger.debug( + f'Timeout when copying files {src} from host {host}') + return + + def _delete_remote_directory(self, host, directory): + cmd = f'ssh {host} "rm -rf {directory}"' + self.logger.debug(f"Deleting {host}:{directory}") + + #api.commands.launch(host, f'rm -rf {directory}') + setup_env = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + def _add_remote_shell_config(self, host, prep_cmds): + """ Add shell environment configuration to a remote host. Useful when executing tests manually via SSH. """ + cmd = f'ssh {host} \'echo "{prep_cmds}" >> ~/.profile-interop;' \ + f'grep -qxF ". ~/.profile-interop" .profile || echo ". ~/.profile-interop" >> .profile\'' + self.logger.debug(f'Adding commands "{prep_cmds}" to the shell startup script of host {host}') + + p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + p.wait() + + def _remove_remote_shell_config(self, host): + """ Remove shell environment configuration added via _add_remote_shell_config to a remote host. """ + cmd = f'ssh {host} \'echo "" > ~/.profile-interop\'' + self.logger.debug(f'Removing interop config from the shell startup script of host {host}') + + p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + p.wait() + + def _is_unsupported(self, lines: List[str]) -> bool: + return any("exited with code 127" in str(line) for line in lines) or any( + "exit status 127" in str(line) for line in lines + ) + + def _check_impl_is_compliant(self, name: str, role: str) -> bool: + """ check if an implementation return UNSUPPORTED for unknown test cases """ + assert role in ['client', 'server'] + + if name in self.compliant[role]: + self.logger.debug( + f"{name} ({role}) already tested for compliance: {str(self.compliant[role][name])}" + ) + return self.compliant[role][name] + + log_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="logs_") + www_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="compliance_www_") + certs_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="compliance_certs_") + downloads_dir = tempfile.TemporaryDirectory( + dir="/tmp", prefix="compliance_downloads_" + ) + + run_script = self._implementations[name]['path'] + f"/run-{role}.sh" + + # Check if runscript.sh exists + if self._testbed: + host = self._testbed_server if role == 'server' else self._testbed_client + if not self._does_remote_file_exist(host, run_script): + self.logger.error(colored(f"{run_script} does not exist", 'red')) + self.compliant[role][name] = False + return False + else: + run_script = os.path.join(self._implementations_directory, run_script) + if not os.path.isfile(run_script): + self.logger.error(colored(f"{run_script} does not exist", 'red')) + self.compliant[role][name] = False + return False + + testcases.generate_cert_chain(certs_dir.name) + + if self._testbed: + for dir in [log_dir.name, www_dir.name, certs_dir.name, downloads_dir.name]: + self._push_directory_to_remote( + self._testbed_server if role == 'server' else self._testbed_client, + dir + ) + + venv_script = self._get_venv(name, role) + + params = " ".join([ + f"TESTCASE={random_string(6)}", + f"DOWNLOADS={downloads_dir.name}", + f"LOGS={log_dir.name}", + f"QLOGDIR={log_dir.name}", + f"SSLKEYLOGFILE={log_dir.name}/keys.log", + f"IP=localhost", + f"PORT=4433", + f"CERTS={certs_dir.name}", + f"WWW={www_dir.name}", + ]) + cmd = f"{venv_script}; {params} ./run-{role}.sh" + + if self._testbed: + cmd = f'ssh {self._testbed_server if role == "server" else self._testbed_client} \'cd {self._implementations[name]["path"]}; {cmd}\'' + + self.logger.debug(cmd) + + # Do not set cwd for testbed mode + if self._testbed: + proc = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + else: + proc = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=os.path.join(self._implementations_directory, self._implementations[name]['path']) + ) + + try: + stdout, stderr = proc.communicate(timeout=10) + except subprocess.TimeoutExpired: + self.logger.error(colored(f"{name} {role} compliance check timeout", 'red')) + self.logger.error(colored(f"{name} {role} not compliant", 'red')) + self.compliant[role][name] = False + return False + finally: + if self._testbed: + host = self._testbed_server if role == "server" else self._testbed_client + self._delete_remote_directory(host, log_dir.name) + self._delete_remote_directory(host, www_dir.name) + self._delete_remote_directory(host, certs_dir.name) + self._delete_remote_directory(host, downloads_dir.name) + + if not proc.returncode == 127 and not self._is_unsupported(stdout.decode("utf-8").splitlines()): + self.logger.error(colored(f"{name} {role} not compliant", 'red')) + self.logger.debug("%s", stdout.decode("utf-8")) + self.compliant[role][name] = False + return False + self.logger.debug(f"{name} {role} compliant.") + + # remember compliance test outcome + self.compliant[role][name] = True + return True + + def _check_test_is_unsupported(self, name: str, role: str, testcase: str) -> bool: + """ check if a testcase returns UNSUPPORTED """ + return testcase in self.testcase_is_unsupported[role].get(name, []) + + def _set_testcase_unsupported(self, name: str, role: str, testcase: str) -> bool: + if not name in self.testcase_is_unsupported[role].keys(): + self.testcase_is_unsupported[role][name] = [] + self.testcase_is_unsupported[role][name].append(testcase) + + def _print_results(self): + """print the interop table""" + self.logger.info("\n\nRun took %s", datetime.now() - self._start_time) + + def get_letters(result): + return ",".join( + [test.abbreviation() for test in cell if cell[test] is result] + ) + + if len(self._tests) > 0: + t = prettytable.PrettyTable() + t.hrules = prettytable.ALL + t.vrules = prettytable.ALL + t.field_names = [""] + [name for name in self._servers] + for client in self._clients: + row = [client] + for server in self._servers: + cell = self.test_results[server][client] + res = colored(get_letters(TestResult.SUCCEEDED), "green") + "\n" + res += colored(get_letters(TestResult.UNSUPPORTED), "yellow") + "\n" + res += colored(get_letters(TestResult.FAILED), "red") + row += [res] + t.add_row(row) + print("\n\u2193clients/servers\u2192") + print(t) + + if len(self._measurements) > 0: + t = prettytable.PrettyTable() + t.hrules = prettytable.ALL + t.vrules = prettytable.ALL + t.field_names = [""] + [name for name in self._servers] + for client in self._clients: + row = [client] + for server in self._servers: + cell = self.measurement_results[server][client] + results = [] + for measurement in self._measurements: + res = cell[measurement] + if not hasattr(res, "result"): + continue + if res.result == TestResult.SUCCEEDED: + results.append( + colored( + measurement.abbreviation() + ": " + res.details, + "green", + ) + ) + elif res.result == TestResult.UNSUPPORTED: + results.append(colored(measurement.abbreviation(), "yellow")) + elif res.result == TestResult.FAILED: + results.append(colored(measurement.abbreviation(), "red")) + row += ["\n".join(results)] + t.add_row(row) + print(t) + + def _export_results(self): + if not self._output: + self._output = os.path.join(self._log_dir, 'result.json') + if not os.path.exists(self._log_dir): + os.makedirs(self._log_dir) + + self.logger.info(f'Exporting results to {self._output}') + + out = { + "interop_commit_hash": self._get_commit_hash(), + "interop_start_time_unix_timestamp": self._start_time.timestamp(), + "interop_end_time_unix_timestamp": datetime.now().timestamp(), + "log_dir": self._log_dir, + "server_node_name": self._testbed_server if self._testbed else None, + "client_node_name": self._testbed_client if self._testbed else None, + "node_image": self._get_node_image()if self._testbed else None, + "server_implementations": {name: self.get_implementation_version(name) for name in self._servers}, + "client_implementations": {name: self.get_implementation_version(name) for name in self._clients}, + "bandwidth_limit": str(self._bandwidth), + "delay": str(self._delay), + "loss": str(self._loss), + "reorder_packets": str(self._reorder_packets), + "corruption": str(self._corruption), + "tests": { + x.abbreviation(): { + "name": x.name(), + "desc": x.desc(), + } + for x in self._tests + self._measurements + }, + "quic_draft": testcases.QUIC_DRAFT, + "quic_version": testcases.QUIC_VERSION, + "results": [], + "measurements": [], + "args": self._args, + } + + for client in self._clients: + for server in self._servers: + results = [] + for test in self._tests: + r = None + if hasattr(self.test_results[server][client][test], "value"): + r = self.test_results[server][client][test].value + results.append( + { + "abbr": test.abbreviation(), + "name": test.name(), # TODO: remove + "result": r, + } + ) + out["results"].append(results) + + measurements = [] + for measurement in self._measurements: + res = self.measurement_results[server][client][measurement] + if not hasattr(res, "result"): + continue + measurements.append( + { + "name": measurement.name(), # TODO: remove + "abbr": measurement.abbreviation(), + "filesize": min(measurement.FILESIZE, self._get_max_filesize(client, server)) if self._get_max_filesize(client, server) is not None else measurement.FILESIZE, + "result": res.result.value, + "average": res.details, + "details": res.all_infos, + "server": server, + "client": client, + } + ) + out["measurements"].append(measurements) + + f = open(self._output, "w") + json.dump(out, f) + f.close() + + # Copy server and client pre- and postscripts into logdir root + all_scripts = [ + ("spre", self._server_pre_scripts), + ("spost", self._server_post_scripts), + ("cpre", self._client_pre_scripts), + ("cpost", self._client_post_scripts), + ("sprehot", self._server_pre_hot_scripts), + ("sposthot", self._server_post_hot_scripts), + ("cprehot", self._client_pre_hot_scripts), + ("cposthot", self._client_post_hot_scripts) + ] + + for prefix, script_list in all_scripts: + for script in script_list: + shutil.copyfile(script, os.path.join(self._log_dir, prefix + "_" + script.split("/")[1])) + + return + + def _copy_hot_scripts(self): + #Upload demon.py as it executes the hot scripts + + if not self._testbed: + return + + if (self._server_pre_hot_scripts or self._server_post_hot_scripts or + self._client_pre_hot_scripts or self._client_post_hot_scripts): + + script_dir = pathlib.Path(__file__).parent.resolve() + server_script = os.path.join(script_dir, "demon.py") + self._push_directory_to_remote( + self._testbed_client, server_script, "demon.py", normalize=False + ) + self._push_directory_to_remote( + self._testbed_server, server_script, "demon.py", normalize=False + ) + + all_hot_scripts = self._server_pre_hot_scripts + self._server_post_hot_scripts + \ + self._client_pre_hot_scripts + self._client_post_hot_scripts + for hot_script in all_hot_scripts: + path = pathlib.Path(hot_script) + # Create directory + mkdir = f"mkdir -p {path.parent}" + mkdir_c = f"ssh {self._testbed_client} {mkdir}" + mkdir_s = f"ssh {self._testbed_server} {mkdir}" + mkdir_pc = subprocess.Popen(mkdir_c, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + mkdir_ps = subprocess.Popen(mkdir_s, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + mkdir_pc.communicate() + mkdir_ps.communicate() + # Copy script + self._push_directory_to_remote( + self._testbed_client, hot_script, path.parent, normalize=False + ) + self._push_directory_to_remote( + self._testbed_server, hot_script, path.parent, normalize=False + ) + + def _run_testcase( + self, server: str, client: str, test: Callable[[], testcases.TestCase] + ) -> TestResult: + return self._run_test(server, client, None, test)[0] + + def _run_test( + self, + server: str, + client: str, + log_dir_prefix: None, + test: Callable[[], testcases.TestCase], + ) -> Tuple[TestResult, float]: + + if self._check_test_is_unsupported(name=client, role='client', testcase=test.name()): + self.logger.info(colored(f"Client {client} does not support {test.name()}", "red")) + return TestResult.UNSUPPORTED, None + + if self._check_test_is_unsupported(name=server, role='server', testcase=test.name()): + self.logger.info(colored(f"Server {server} does not support {test.name()}", "red")) + return TestResult.UNSUPPORTED, None + + start_time = datetime.now() + sim_log_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="logs_sim_") + server_log_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="logs_server_") + client_log_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="logs_client_") + log_file = tempfile.NamedTemporaryFile(dir="/tmp", prefix="output_log_") + log_handler = logging.FileHandler(log_file.name) + log_handler.setLevel(logging.DEBUG) + + formatter = LogFileFormatter("%(asctime)s %(message)s") + log_handler.setFormatter(formatter) + self.logger.addHandler(log_handler) + + client_keylog = os.path.join(client_log_dir.name, 'keys.log') + server_keylog = os.path.join(server_log_dir.name, 'keys.log') + + client_qlog_dir = os.path.join(client_log_dir.name, 'client_qlog/') + server_qlog_dir = os.path.join(server_log_dir.name, 'server_qlog/') + + server_ip = "127.0.0.1" + server_name = "server" if not self._use_v6 else "server6" + + # Check if test object is of MEASUREMENTS or TESTCASES type + # a Measurement Object has additional bandwidth parameters + testcase = test( + sim_log_dir=sim_log_dir, + client_keylog_file=client_keylog, + server_keylog_file=server_keylog, + client_log_dir=client_log_dir.name, + server_log_dir=server_log_dir.name, + client_qlog_dir=client_qlog_dir, + server_qlog_dir=server_qlog_dir, + server_ip=server_ip if not self._testbed else self._testbed_server_ip, + server_name=server_name, + link_bandwidth=self._bandwidth, + delay=self._delay, + packet_reorder=self._reorder_packets, + loss=self._loss, + corruption=self._corruption + ) + + if self._testbed: + for dir in [server_log_dir.name, testcase.www_dir(), testcase.certs_dir()]: + self._push_directory_to_remote(self._testbed_server, dir) + for dir in [sim_log_dir.name, client_log_dir.name, testcase.download_dir(), testcase.certs_dir()]: + self._push_directory_to_remote(self._testbed_client, dir) + + paths = testcase.get_paths( + max_size=self._get_max_filesize(client, server), + host=self._testbed_server if self._testbed else None + ) + + reqs = " ".join([testcase.urlprefix() + p for p in paths]) + self.logger.debug("Requests: %s", reqs) + + server_params = " ".join([ + f"SSLKEYLOGFILE={server_keylog}", + f"QLOGDIR={server_qlog_dir}" if testcase.use_qlog() else "", + f"LOGS={server_log_dir.name}", + f"TESTCASE={testcase.testname(Perspective.SERVER)}", + f"WWW={testcase.www_dir()}", + f"CERTS={testcase.certs_dir()}", + f"IP={testcase.ip()}", + f"PORT={testcase.port()}", + f"SERVERNAME={testcase.servername()}", + ]) + if self._disable_server_aes_offload: + server_params = " ".join([ + 'OPENSSL_ia32cap="~0x200000200000000"', + server_params + ]) + + client_params = " ".join([ + f"SSLKEYLOGFILE={client_keylog}", + f"QLOGDIR={client_qlog_dir}" if testcase.use_qlog() else "", + f"LOGS={client_log_dir.name}", + f"TESTCASE={testcase.testname(Perspective.CLIENT)}", + f"DOWNLOADS={testcase.download_dir()}", + f"CERTS={testcase.certs_dir()}", + f"REQUESTS=\"{reqs}\"", + ]) + if self._disable_client_aes_offload: + client_params = " ".join([ + 'OPENSSL_ia32cap="~0x200000200000000"', + client_params + ]) + + server_run_script = "./run-server.sh" + server_venv_script = self._get_venv(server, "server") + client_run_script = "./run-client.sh" + client_venv_script = self._get_venv(client, "client") + + #TODO: Add notes that these tools are not supported when using DPDK + interface = 'lo' if not sys.platform == "darwin" else 'lo0' + interface = interface if not self._testbed else self._testbed_client_interface + + trace_cmd = f"tcpdump -i {interface} -U -w {sim_log_dir.name}/trace.pcap" + ifstat_cmd = f"ifstat -i {interface} -bn -t > {sim_log_dir.name}/interface_status.txt" + + server_cmd = f"{server_venv_script}; {server_params} {server_run_script}" + client_cmd = f"{client_venv_script}; {client_params} {client_run_script}" + + if self._testbed: + trace_cmd = f'ssh {self._testbed_client} "{trace_cmd}"' + ifstat_cmd = f'ssh {self._testbed_client} "{ifstat_cmd}"' + + server_cmd = f'ssh {self._testbed_server} \'cd {self._implementations[server]["path"]}; {server_cmd}\'' + client_cmd = f'ssh {self._testbed_client} \'cd {self._implementations[client]["path"]}; {client_cmd}\'' + + expired = False + try: + if testcase.use_tcpdump(): + self.logger.debug(f'Starting tcpdump on {interface}') + trace = subprocess.Popen( + trace_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + if testcase.use_ifstat(): + self.logger.debug(f'Starting ifstat on {interface}') + ifstat = subprocess.Popen( + ifstat_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + if testcase.use_tcpdump() or testcase.use_ifstat(): + # Wait until processes are started + time.sleep(2) + + # Limit bandwidth using tc for measurements if was set + if self._testbed and testcase.is_bandwidth_limited(): + self._set_server_bandwidth_limit( + testcase.bandwidth() + ) + + # Emulate network on client interface + if self._testbed and (testcase.is_delay_added() or testcase.is_loss_added() + or testcase.is_corruption_added() or testcase.is_packet_reorder_added()): + self._set_client_network_emulation_incoming( + delay=testcase.delay(), + reorder_packets=testcase.packet_reorder(), + corruption=testcase.corruption(), + loss=testcase.loss() + ) + self._set_client_network_emulation_outgoing( + delay=testcase.delay(), + reorder_packets=testcase.packet_reorder(), + corruption=testcase.corruption(), + loss=testcase.loss() + ) + + # Set client and server pos variables for pre- and postscripts + # + # Only do this when in testbed mode. Implementation parameters will + # also be set here. Variables have to be set every iteration as + # we want to offer temporary log directories to the scripts as well. + if self._testbed: + server_variables: dict = { + "implementation": server, + "interface": self._testbed_server_interface if self._testbed else interface, + "hostname": self._testbed_server if self._testbed else 'local', + "log_dir": server_log_dir.name, + "www_dir": testcase.www_dir(), + "certs_dir": testcase.certs_dir(), + "role": "server" + } + client_variables: dict = { + "implementation": client, + "interface": self._testbed_client_interface if self._testbed else interface, + "hostname": self._testbed_client if self._testbed else 'local', + "log_dir": client_log_dir.name, + "sim_log_dir": sim_log_dir.name, + "download_dir": testcase.download_dir(), + "certs_dir": testcase.certs_dir(), + "role": "client", + "ip": self._testbed_client_ip if self._testbed else "127.0.0.1" + } + + # Only set pci_id and server_mac in the variables if those were specified in the testbed file + if self._testbed_server_pci_id is not None: + server_variables["pci_id"] = self._testbed_server_pci_id + if self._testbed_client_pci_id is not None: + client_variables["pci_id"] = self._testbed_client_pci_id + if self._testbed_server_mac is not None: + client_variables["server_mac"] = self._testbed_server_mac + + if (self._server_pre_hot_scripts or self._client_pre_hot_scripts + or self._server_post_hot_scripts or self._client_post_hot_scripts): + client_variables["client_socket"] = "/tmp/client" + client_variables["server_socket"] = "/tmp/server" + server_variables["client_socket"] = "/tmp/client" + server_variables["server_socket"] = "/tmp/server" + + # Include implementation params into variables + role_tuples = [ + (server_variables, self._server_implementation_params), + (client_variables, self._client_implementation_params) + ] + for (dct, impl_vars) in role_tuples: + for (k, v) in impl_vars.items(): + dct[k] = v + + self._set_variables_with_pos_on_machine( + self._testbed_server, + server_variables + ) + self._set_variables_with_pos_on_machine( + self._testbed_client, + client_variables + ) + + # Execute list of server and client pre run scripts given if in testbed mode + if self._testbed: + for host, scripts in [(self._testbed_server, self._server_pre_scripts), (self._testbed_client, self._client_pre_scripts)]: + if len(scripts) != 0: + for script in scripts: + self._run_script_with_pos_on_machine(host, script) + + dhs = None + dhc = None + if (self._server_pre_hot_scripts or self._client_pre_hot_scripts + or self._server_post_hot_scripts or self._client_post_hot_scripts): + # Run the demon.py + self.logger.debug(f'Starting demon.py') + demon_hot_cmd_s = f'./demon.py -c /tmp/client -s /tmp/server' + demon_hot_cmd_c = f'./demon.py -c /tmp/client -s /tmp/server' + for server_script in self._server_pre_hot_scripts: + demon_hot_cmd_s += f' -p {server_script}' + for server_script in self._server_post_hot_scripts: + demon_hot_cmd_s += f' -P {server_script}' + for client_script in self._client_pre_hot_scripts: + demon_hot_cmd_c += f' -p {client_script}' + for client_script in self._client_post_hot_scripts: + demon_hot_cmd_c += f' -P {client_script}' + + if self._testbed: + demon_hot_cmd_s = f'ssh {self._testbed_server} \'{demon_hot_cmd_s}\'' + demon_hot_cmd_c = f'ssh {self._testbed_client} \'{demon_hot_cmd_c}\'' + + self.logger.debug(f'Starting demon.py on server:\n {demon_hot_cmd_s}\n') + self.logger.debug(f'Starting demon.py on client:\n {demon_hot_cmd_c}\n') + dhs = subprocess.Popen( + demon_hot_cmd_s, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + dhc = subprocess.Popen( + demon_hot_cmd_c, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + # If in manual mode, don't run the server and client cmds, but print them out + if self._manual_mode: + if self._testbed: + # Prepare server shell + self._add_remote_shell_config( + self._testbed_server, + f'{server_venv_script}; export {server_params}' + ) + + print(f'Server commands:\n') + # TODO: also support non-testbed mode + print(f'\tssh -t {self._testbed_server} \"cd {self._implementations[server]["path"]}; exec \\$SHELL -l\"\n' + f'\t{server_run_script}\n') + + if self._testbed: + # Prepare client shell + self._add_remote_shell_config( + self._testbed_client, + f'{client_venv_script}; export {client_params}' + ) + + print(f'Client commands:\n') + # TODO: also support non-testbed mode + print(f'\tssh -t {self._testbed_client} \"cd {self._implementations[client]["path"]}; exec \\$SHELL -l\"\n' + f'\t{client_run_script}\n') + + # Wait for user input before continuing + input("Press [Enter] to continue...") + print("Cleaning up this testcase...") + + if self._testbed: + # Remove config from the server and client shells + self._remove_remote_shell_config(self._testbed_server) + self._remove_remote_shell_config(self._testbed_client) + else: + # Run Server + self.logger.debug(f'Starting server:\n {server_cmd}\n') + if self._testbed: + s = subprocess.Popen( + server_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + else: + s = subprocess.Popen( + server_cmd, + shell=True, + cwd=os.path.join(self._implementations_directory, self._implementations[server]['path']), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + time.sleep(2) + + # Run Client + self.logger.debug(f'Starting client:\n {client_cmd}\n') + if self._testbed: + testcase._start_time = datetime.now() + c = subprocess.Popen( + client_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + else: + testcase._start_time = datetime.now() + c = subprocess.Popen( + client_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=os.path.join(self._implementations_directory, self._implementations[client]['path']) + ) + c_stdout, c_stderr = c.communicate(timeout=testcase.timeout()) + testcase._end_time = datetime.now() + output = (c_stdout.decode("utf-8") if c_stdout else '') + \ + (c_stderr.decode("utf-8") if c_stderr else '') + + if dhs: + self.logger.debug(f'Killing demon.py on server') + if self._testbed: + subprocess.Popen(f'ssh {self._testbed_server} pkill -f demon.py', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + dhs.kill() + if dhc: + self.logger.debug(f'Killing demon.py on client') + if self._testbed: + subprocess.Popen(f'ssh {self._testbed_client} pkill -f demon.py', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + dhc.kill() + + except subprocess.TimeoutExpired as ex: + self.logger.error(colored(f"Client expired: {ex}", 'red')) + expired = True + finally: + # Remove bandwidth limit if was set + if self._testbed and testcase.is_bandwidth_limited(): + self._remove_server_bandwidth_limit() + + # Remove network emulation if was set + if self._testbed and (testcase.is_delay_added() or testcase.is_loss_added() + or testcase.is_corruption_added() or testcase.is_packet_reorder_added()): + self._remove_client_network_emulation_incoming() + self._remove_client_network_emulation_outgoing() + + # Execute list of server and client post run scripts given if in testbed mode + if self._testbed: + for host, scripts in [(self._testbed_server, self._server_post_scripts), (self._testbed_client, self._client_post_scripts)]: + if len(scripts) != 0: + for script in scripts: + self._run_script_with_pos_on_machine(host, script) + + time.sleep(1) + if self._testbed: + if testcase.use_tcpdump(): + subprocess.Popen(f'ssh {self._testbed_client} pkill -f tcpdump', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if testcase.use_ifstat(): + subprocess.Popen(f'ssh {self._testbed_client} pkill -f ifstat', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + #Does only kill the server if its process name is "server"? + #Use s.kill() instead? Killing ssh session kills processes if not in background + subprocess.Popen(f'ssh {self._testbed_server} pkill -f server', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + if testcase.use_tcpdump(): + kill(trace.pid) + if testcase.use_ifstat(): + kill(ifstat.pid) + kill(s.pid) + + # Unset client and server pos variables + # + # This step is necessary as otherwise set pos variables will stay + # for future testcases/measurements if not set to another value + # + # Set the values to "" to trigger default variables in bash + # + if self._testbed: + self._set_variables_with_pos_on_machine( + self._testbed_server, + {k: "" for k in server_variables.keys()} + ) + self._set_variables_with_pos_on_machine( + self._testbed_client, + {k: "" for k in client_variables.keys()} + ) + + # End execution here when in manual mode (i.e., skip logging and performance calculations) + if self._manual_mode: + return TestResult.UNSUPPORTED, float('nan') + + if not expired: + log_process(c_stdout, c_stderr, 'Client') + + if testcase.use_tcpdump(): + log_process(*trace.communicate(), 'tcpdump') + if testcase.use_ifstat(): + log_process(*ifstat.communicate(), 'ifstat') + s_output = s.communicate() + log_process(*s_output, 'Server') + + # TODO: + # instead of pulling the download dir from the client, + # calculate the hash there already + if self._testbed: + for dir in [server_log_dir.name, testcase.certs_dir()]: + self._pull_directory_from_remote(self._testbed_server, dir) + for dir in [sim_log_dir.name, client_log_dir.name]: + self._pull_directory_from_remote(self._testbed_client, dir) + + # Try to compute transmission time from client logs + if self._use_client_timestamps: + try: + with open(os.path.join(client_log_dir.name, 'time.json')) as f: + data = json.load(f) + + new_start = datetime.fromtimestamp(data['start'] / 10**9) + new_end = datetime.fromtimestamp(data['end'] / 10**9) + + old_duration = testcase._end_time - testcase._start_time + new_duration = new_end - new_start + + self.logger.debug(f'Interop duration: {old_duration}') + self.logger.debug(f'Client duration: {new_duration}') + self.logger.debug('Difference: {:.2f}%'.format((old_duration - new_duration) / old_duration * 100)) + + testcase._start_time = new_start + testcase._end_time = new_end + + except Exception as e: + self.logger.error(f'Failed to read time.json: {e}') + + # Todo End + if s.returncode == 127 \ + or self._is_unsupported(s_output[0].decode("utf-8").splitlines()) \ + or self._is_unsupported(s_output[1].decode("utf-8").splitlines()): + self.logger.error(colored(f"server does not support the test", 'red')) + self._set_testcase_unsupported(name=server, role='server', testcase=testcase.name()) + status = TestResult.UNSUPPORTED + elif not expired: + lines = output.splitlines() + if c.returncode == 127 or self._is_unsupported(lines): + self.logger.error(colored(f"client does not support the test", 'red')) + self._set_testcase_unsupported(name=client, role='client', testcase=testcase.name()) + status = TestResult.UNSUPPORTED + elif c.returncode == 0 or any("client exited with code 0" in str(line) for line in lines): + try: + status = testcase.check() if not self._testbed else testcase.check(self._testbed_client, self._testbed_server) + except Exception: + self.logger.error(colored(f"testcase.check() threw Exception: {traceback.format_exc()}", 'red')) + status = TestResult.FAILED + else: + self.logger.error(colored(f"Client or server failed", 'red')) + status = TestResult.FAILED + else: + self.logger.error(colored(f"Client or server expired", 'red')) + status = TestResult.FAILED + + if status == TestResult.SUCCEEDED: + self.logger.info(colored(f"\u2713 Test successful", 'green')) + elif status == TestResult.FAILED: + self.logger.info(colored(f"\u2620 Test failed", 'red')) + elif status == TestResult.UNSUPPORTED: + self.logger.info(colored(f"? Test unsupported", 'yellow')) + + # save logs + self.logger.removeHandler(log_handler) + log_handler.close() + if status == TestResult.FAILED or status == TestResult.SUCCEEDED: + log_dir = self._log_dir + "/" + server + "_" + client + "/" + str(testcase) + if log_dir_prefix: + log_dir += "/" + log_dir_prefix + shutil.copytree(server_log_dir.name, log_dir + "/server") + shutil.copytree(client_log_dir.name, log_dir + "/client") + shutil.copytree(sim_log_dir.name, log_dir + "/sim") + shutil.copyfile(log_file.name, log_dir + "/output.txt") + if self._save_files and status == TestResult.FAILED: + shutil.copytree(testcase.www_dir(), log_dir + "/www") + try: + shutil.copytree(testcase.download_dir(), log_dir + "/downloads") + except Exception as exception: + self.logger.info("Could not copy downloaded files: %s", exception) + + if self._testbed: + self._delete_remote_directory(self._testbed_server, server_log_dir.name) + self._delete_remote_directory(self._testbed_server, testcase.www_dir()) + self._delete_remote_directory(self._testbed_server, testcase.certs_dir()) + self._delete_remote_directory(self._testbed_client, client_log_dir.name) + self._delete_remote_directory(self._testbed_client, sim_log_dir.name) + self._delete_remote_directory(self._testbed_client, testcase.download_dir()) + self._delete_remote_directory(self._testbed_client, testcase.certs_dir()) + + testcase.cleanup() + server_log_dir.cleanup() + client_log_dir.cleanup() + self.logger.debug("Test took %ss", (datetime.now() - start_time).total_seconds()) + + # measurements also have a value + if hasattr(testcase, "result"): + value = testcase.result() + else: + value = None + + return status, value + + def _run_measurement( + self, server: str, client: str, test: Callable[[], testcases.Measurement] + ) -> MeasurementResult: + values = [] + for i in range(0, test.repetitions()): + self.logger.info(f"Run measurement {i + 1}/{test.repetitions()}") + result, value = self._run_test(server, client, "%d" % (i + 1), test) + if result != TestResult.SUCCEEDED: + if self._continue_on_error: + continue + res = MeasurementResult() + res.result = result + res.details = "" + return res + values.append(value) + + self.logger.debug(values) + res = MeasurementResult() + res.result = TestResult.SUCCEEDED + res.all_infos = values + res.details = "" + + if len(values) > 0: + mean = statistics.mean(values) + stdev = statistics.stdev(values) if len(values) > 1 else 0 + + res.details = "{:.2f} (± {:.2f}) {}".format( + mean, stdev, test.unit() + ) + else: + res.result = TestResult.FAILED + return res + + def _setup_env(self, path, name, role): + """Creates a python venv for this role and + executes the implementations setup-env.sh script in this python + venv; both tasks either locally or remotely using ssh in testbed mode. + """ + try: + if name in self._prepared_envs[role]: + return + + venv_command = self._get_venv(name, role) + cmd = venv_command + "; ./setup-env.sh" + + if self._testbed: + cmd = f'ssh {self._testbed_server if role == "server" else self._testbed_client} "cd {path}; {cmd}"' + + self.logger.debug(f'Setup:\n {cmd}\n') + + if self._testbed: + setup_env = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + else: + setup_env = subprocess.Popen( + cmd, + cwd=os.path.join(self._implementations_directory, self._implementations[name]['path']), + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + log_process(*setup_env.communicate(timeout=120), 'setup_env') + self._prepared_envs[role].append(name) + except subprocess.TimeoutExpired as ex: + self.logger.error(colored(f"Setup environment timeout for {name} ({role})", 'red')) + return ex + return + + def _get_max_filesize(self, client: str, server: str): + """Check if there is a file size limitation for the given client server implementation""" + + sizes = list(filter(None, [ + self._implementations[client].get('max_filesize', None), + self._implementations[server].get('max_filesize', None) + ])) + return min(sizes, default=None) + + def run(self): + """run the interop test suite and output the table""" + + if self._testbed: + if not TESTBED_ENABLED: + self.logger.info('Failed to load pos, testbed mode disabled.') + return 1 + self.logger.info(colored(f'Testbed mode: {self._testbed_server}-{self._testbed_client}', 'white', attrs=['bold'])) + + nr_failed = 0 + self.logger.info(colored(f"Saving logs to {self._log_dir}", "yellow", attrs=['bold'])) + self.logger.info(colored(f'Servers: {" ".join(self._servers)}', 'yellow', attrs=['bold'])) + self.logger.info(colored(f'Clients: {" ".join(self._clients)}', 'yellow', attrs=['bold'])) + if len(self._tests) > 0: + self.logger.info(colored(f'Testcases: {" ".join(map(lambda x: x.name(), self._tests))}', 'yellow', attrs=['bold'])) + if len(self._measurements) > 0: + self.logger.info(colored(f'Measurements: {" ".join(map(lambda x: x.name(), self._measurements))}', 'yellow', attrs=['bold'])) + + total_tests = len(self._servers) * len(self._clients) * (len(self._tests) + len(self._measurements)) + finished_tests = 0 + + # Copy implementations to remote hosts + self._copy_implementations() + # Copy everything needed for hot scripts to remote hosts + self._copy_hot_scripts() + + for server in self._servers: + path = self._implementations[server]["path"] + if self._setup_env(path, name=server, role="server") is not None: + continue + + if not self._check_impl_is_compliant(server, role='server'): + self.logger.info(colored(f"Server {server} is not compliant, skipping", "red")) + finished_tests += (len(self._tests) + len(self._measurements)) * len(self._clients) + continue + + for client in self._clients: + + if self._only_same_implementation or self._is_client_or_server_solomode(client, server): + if client != server: + finished_tests += len(self._tests) + len(self._measurements) + continue + + path = self._implementations[client]["path"] + if self._setup_env(path, name=client, role="client") is not None: + continue + + if not self._check_impl_is_compliant(client, role='client'): + self.logger.info(colored(f"Client {client} is not compliant, skipping", "red")) + finished_tests += len(self._tests) + len(self._measurements) + continue + + self.logger.debug( + "Running with server %s (%s) and client %s (%s)", + server, + self._implementations[server]["path"], + client, + self._implementations[client]["path"], + ) + + # run the test cases + for testcase in self._tests: + finished_tests += 1 + + self.logger.info( + colored( + "\n---\n" + + f"{finished_tests}/{total_tests}\n" + + f"Test: {testcase.name()}\n" + + f"Server: {server} " + + f"Client: {client}\n" + + "---", + 'cyan', + attrs=['bold'] + ) + ) + + status = self._run_testcase(server, client, testcase) + self.test_results[server][client][testcase] = status + if status == TestResult.FAILED: + nr_failed += 1 + + # run the measurements + for measurement in self._measurements: + finished_tests += 1 + + self.logger.info( + colored( + "\n---\n" + + f"{finished_tests}/{total_tests}\n" + + f"Measurement: {measurement.name()}\n" + + f"Server: {server}\n" + + f"Client: {client}\n" + + "---", + 'magenta', + attrs=['bold'] + ) + ) + + res = self._run_measurement(server, client, measurement) + self.measurement_results[server][client][measurement] = res + + self._print_results() + self._export_results() + return nr_failed diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a3af80e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +psutil +termcolor +prettytable +pyshark +python-gitlab +pyyaml diff --git a/result.py b/result.py new file mode 100644 index 0000000..24fb193 --- /dev/null +++ b/result.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class TestResult(Enum): + SUCCEEDED = "succeeded" + FAILED = "failed" + UNSUPPORTED = "unsupported" diff --git a/run.py b/run.py new file mode 100755 index 0000000..e463982 --- /dev/null +++ b/run.py @@ -0,0 +1,394 @@ +#!/usr/bin/env python3 + +import argparse +import ast +import sys +import yaml + +from typing import List, Tuple +from yaml.scanner import ScannerError + +import testcases + +from implementations import IMPLEMENTATIONS +from implementations import parse_filesize +from interop import InteropRunner +from testcases import MEASUREMENTS, TESTCASES + + +def main(): + def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-d", + "--debug", + action="store_const", + const=True, + default=False, + help="turn on debug logs", + ) + parser.add_argument( + "-m", + "--manual-mode", + action="store_true", + help="only prepare the tests and print out the server and client run commands (to be executed manually)" + ) + parser.add_argument( + "--config", + metavar="config.yml", + help="File containing argument values" + ) + parser.add_argument( + "-6", "--enable-ipv6", action="store_true", default=False, dest="v6", # dest allows accessing it + help="Enables IPv6 execution in the interop runner" + ) + parser.add_argument( + "-s", "--server", help="server implementations (comma-separated)" + ) + parser.add_argument( + "-c", "--client", help="client implementations (comma-separated)" + ) + parser.add_argument( + "-t", + "--test", + help="test cases (comma-separatated). Valid test cases are: " + + ", ".join([x.name() for x in TESTCASES + MEASUREMENTS]), + ) + parser.add_argument( + "-l", + "--log-dir", + help="log directory", + default="", + ) + parser.add_argument( + "-f", "--save-files", help="save downloaded files if a test fails" + ) + parser.add_argument( + "-i", "--implementation-directory", + help="Directory containing the implementations." + "This is prepended to the 'path' in the implementations.json file." + "Default: .", + default='.' + ) + parser.add_argument( + "-j", "--json", help="output the matrix to file in json format" + ) + parser.add_argument( + "--venv-dir", + help="dir to store venvs", + default="", + ) + parser.add_argument( + "--testbed", + help="Runs the measurement in testbed mode. Requires a json file with client/server information" + ) + parser.add_argument( + "--bandwidth", + help="Set a link bandwidth value which will be enforced using tc. Is only set in testbed mode on the remote hosts. Set values in tc syntax, e.g. 100mbit, 1gbit" + ) + parser.add_argument( + "--delay", + help="Add the chosen delay to packets sent to the client interface using tc. Set values in milliseconds, " + "e.g. --delay 10ms" + ) + parser.add_argument( + "--reorder", + nargs=2, + help="Add random reordering to packets sent to the client interface using tc. It is required to set a " + "delay value for using this option. Two percentage values are required for this option. The first is " + "the percentage of packets immediately sent and the second is the correlation ," + "e.g. --reorder-packets 25%% 50%%" + ) + parser.add_argument( + "--corruption", + help="Add random noise corruption using tc. This option introduces a single bit error at a random offset " + "in the packet. Set value in percentage, e.g. --corruption 0.1%%" + ) + parser.add_argument( + "--loss", + help="Add random packet loss specified in the 'tc' command in percentage, e.g. --loss 0.1%%" + ) + # TODO: Maybe add option for packet duplication too + + # Handle Pre-/Postscripts for server and client + script_variable_help_msg = ("Available pos variables to use: " + "interface (the interface we send/receive on, e.g enp123test), " + "hostname, log_dir (the directory where all logs are saved to)") + script_server_vars = ", www_dir (server root when serving files), certs_dir (folder of the certificates)" + script_client_vars = ", sim_log_dir, download_dir" + parser.add_argument( + "-spre", + "--server-prerunscript", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed before a test run on the server using pos. " + script_variable_help_msg + script_server_vars, + ) + parser.add_argument( + "-sprehot", + "--server-prerunscript-hot", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed at the first hold stage (client/server called begin()) " + script_variable_help_msg + script_server_vars, + ) + parser.add_argument( + "-sposthot", + "--server-postrunscript-hot", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed at the second hold stage (client/server called end()) " + script_variable_help_msg + script_server_vars, + ) + parser.add_argument( + "-spost", + "--server-postrunscript", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed after a test run on the server using pos. " + script_variable_help_msg + script_server_vars, + ) + parser.add_argument( + "-cpre", + "--client-prerunscript", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed before a test run on the client using pos. " + script_variable_help_msg + script_client_vars, + ) + parser.add_argument( + "-cprehot", + "--client-prerunscript-hot", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed at the first hold stage (client/server called begin()) " + script_variable_help_msg + script_client_vars, + ) + parser.add_argument( + "-cposthot", + "--client-postrunscript-hot", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed at the second hold stage (client/server called end()) " + script_variable_help_msg + script_client_vars, + ) + parser.add_argument( + "-cpost", + "--client-postrunscript", + default=[], + nargs="*", + metavar="SCRIPT", + help="Add a bash script which should be executed after a test run on the client using pos. " + script_variable_help_msg + script_client_vars, + ) + parser.add_argument( + "--client-implementation-params", + nargs="*", + metavar="KEY=VALUE", + help="", + default=[] + ) + parser.add_argument( + "--server-implementation-params", + nargs="*", + metavar="KEY=VALUE", + help="", + default=[] + ) + parser.add_argument( + "--disable-server-aes-offload", + action="store_const", + const=True, + default=False, + help="turn server aes offload off", + ) + parser.add_argument( + "--disable-client-aes-offload", + action="store_const", + const=True, + default=False, + help="turn client aes offload off", + ) + parser.add_argument( + "--filesize", + help="Set the filesize of the transmitted file for all measurements. If no unit is specified MiB is assumed." + ) + parser.add_argument( + "--repetitions", + metavar="N", + type=int, + help="Set the number of repetitions for all measurements." + ) + parser.add_argument( + "--continue-on-error", + action="store_true", + help="Continue measurement even if a measurement fails." + ) + parser.add_argument( + "--use-client-timestamps", + action="store_true", + help="Try to parse timestamps written by the client for computing goodput." + ) + parser.add_argument( + "--only-same-implementation", + action="store_true", + help="Test implementations only against their counterpart." + ) + + args = parser.parse_args() + if args.config: + config_args = parse_config(args.config) + parser.set_defaults(**config_args) + # Ensure that every config file argument is defined in the parser + for k, _ in config_args.items(): + if k not in args: + sys.exit(f"Argument '{k}' from config file was not recognized by the parser.") + args = parser.parse_args() + + return args + + def parse_config(config_file): + try: + with open(config_file, 'r') as f: + model_config = yaml.safe_load(f) + return model_config + except ScannerError: + sys.exit("config file syntax error!") + except FileNotFoundError: + sys.exit("config file not found!") + + def get_dict_arg(arg): + """Given: list containing one KV pair + per entry + Return: dict containing all KV pairs + from the list + """ + output = {} + if not arg: + return output + for item in arg: + if type(item) is dict: + output = {**output, **item} + else: + try: + k, v = item.split('=', 1) + except ValueError: + # handle entries without equals symbol as bool set to True + output[item] = True + continue + try: + output[k] = ast.literal_eval(v) + except (ValueError, SyntaxError): + output[k] = v + return output + + def get_impls(arg, availableImpls, role) -> List[str]: + if not arg: + return availableImpls + impls = [] + arg = arg.replace(" ", "").replace("\n", "") + for s in arg.replace(" ", "").split(","): + if s not in availableImpls: + sys.exit(role + " implementation " + s + " not found.") + impls.append(s) + return impls + + def get_tests_and_measurements( + arg, + filesize, + repetitions, + ) -> Tuple[List[testcases.TestCase], List[testcases.Measurement]]: + if arg is None: + return TESTCASES, MEASUREMENTS + elif arg == "onlyTests": + return TESTCASES, [] + elif arg == "onlyMeasurements": + return [], MEASUREMENTS + elif not arg: + return [], [] + tests = [] + measurements = [] + for t in arg.split(","): + if t in [tc.name() for tc in TESTCASES]: + tests += [tc for tc in TESTCASES if tc.name() == t] + elif t in [tc.name() for tc in MEASUREMENTS]: + measurement = [tc for tc in MEASUREMENTS if tc.name() == t] + if filesize: + measurement[0].FILESIZE = parse_filesize(str(filesize), default_unit="MiB") + if repetitions: + measurement[0].REPETITIONS = int(repetitions) + measurements += measurement + else: + print( + ( + "Test case {} not found.\n" + "Available testcases: {}\n" + "Available measurements: {}" + ).format( + t, + ", ".join([t.name() for t in TESTCASES]), + ", ".join([t.name() for t in MEASUREMENTS]), + ) + ) + sys.exit() + return tests, measurements + + args = get_args() + args.server_implementation_params = get_dict_arg(args.server_implementation_params) + args.client_implementation_params = get_dict_arg(args.client_implementation_params) + + tests, measurements = get_tests_and_measurements( + args.test, + args.filesize, + args.repetitions + ) + + # Check if reorder packets option is set without delay + if args.reorder and (args.delay is None): + print("--reorder requires --delay") + return 1 + + if args.manual_mode and not args.testbed: + print("Manual mode is currently only supported in testbed mode!") + return 1 + + return InteropRunner( + implementations=IMPLEMENTATIONS, + implementations_directory=args.implementation_directory, + servers=get_impls(args.server, IMPLEMENTATIONS, "Server"), + clients=get_impls(args.client, IMPLEMENTATIONS, "Client"), + tests=tests, + measurements=measurements, + output=args.json, + debug=args.debug, + manual_mode=args.manual_mode, + log_dir=args.log_dir, + save_files=args.save_files, + venv_dir=args.venv_dir, + testbed=args.testbed, + bandwidth=args.bandwidth, + server_pre_scripts=args.server_prerunscript, + server_pre_hot_scripts=args.server_prerunscript_hot, + server_post_hot_scripts=args.server_postrunscript_hot, + server_post_scripts=args.server_postrunscript, + client_pre_scripts=args.client_prerunscript, + client_pre_hot_scripts=args.client_prerunscript_hot, + client_post_hot_scripts=args.client_postrunscript_hot, + client_post_scripts=args.client_postrunscript, + reorder_packets=args.reorder, + delay=args.delay, + corruption=args.corruption, + loss=args.loss, + client_implementation_params=args.client_implementation_params, + server_implementation_params=args.server_implementation_params, + disable_server_aes_offload=args.disable_server_aes_offload, + disable_client_aes_offload=args.disable_client_aes_offload, + continue_on_error=args.continue_on_error, + use_client_timestamps=args.use_client_timestamps, + only_same_implementation=args.only_same_implementation, + use_v6=args.v6, + args=vars(args), + ).run() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/testcases.py b/testcases.py new file mode 100644 index 0000000..d119221 --- /dev/null +++ b/testcases.py @@ -0,0 +1,1072 @@ +import abc +import filecmp +import logging +import os +import random +import re +import string +import subprocess +import sys +import tempfile +from datetime import timedelta +from enum import Enum, IntEnum +from trace import Direction, PacketType, TraceAnalyzer +from typing import List + +from result import TestResult + +KiB = 1 << 10 +MiB = 1 << 20 +GiB = 1 << 30 + +QUIC_DRAFT = 34 # draft-34 +QUIC_VERSION = hex(0x1) + + +class Perspective(Enum): + SERVER = "server" + CLIENT = "client" + + +class ECN(IntEnum): + NONE = 0 + ECT1 = 1 + ECT0 = 2 + CE = 3 + + +def random_string(length: int): + """Generate a random string of fixed length """ + letters = string.ascii_lowercase + return "".join(random.choice(letters) for i in range(length)) + + +def generate_cert_chain(directory: str, length: int = 1): + cmd = "./certs.sh " + directory + " " + str(length) + r = subprocess.run( + cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + logging.debug("%s", r.stdout.decode("utf-8")) + if r.returncode != 0: + logging.info("Unable to create certificates") + sys.exit(1) + + +class TestCase(abc.ABC): + _files = [] + _www_dir = None + _client_keylog_file = None + _server_keylog_file = None + _download_dir = None + _sim_log_dir = None + _cert_dir = None + _cached_server_trace = None + _cached_client_trace = None + _start_time = None + _end_time = None + _server_ip = None + _server_port = None + _server_name = None + _link_bandwidth = None + _delay = None + _packet_reorder: [str] = [] + _loss = None + _corruption = None + + def __init__( + self, + sim_log_dir: tempfile.TemporaryDirectory, + client_keylog_file: str, + server_keylog_file: str, + client_log_dir: str, + server_log_dir: str, + client_qlog_dir: str, + server_qlog_dir: str, + link_bandwidth: str, + delay: str, + packet_reorder: [str], + loss: str, + corruption: str, + server_ip: str = "127.0.0.2", + server_name: str = "server", + server_port: int = 4433, + ): + self._server_keylog_file = server_keylog_file + self._client_keylog_file = client_keylog_file + self._server_log_dir = server_log_dir + self._client_log_dir = client_log_dir + self._server_qlog_dir = server_qlog_dir + self._client_qlog_dir = client_qlog_dir + self._files = [] + self._sim_log_dir = sim_log_dir + self._server_ip = server_ip + self._server_port = server_port + self._server_name = server_name + self._link_bandwidth = link_bandwidth + self._delay = delay + self._packet_reorder = packet_reorder + self._loss = loss + self._corruption = corruption + + @abc.abstractmethod + def name(self): + pass + + @abc.abstractmethod + def desc(self): + pass + + def __str__(self): + return self.name() + + def testname(self, p: Perspective): + """ The name of testcase presented to the endpoint Docker images""" + return self.name() + + @staticmethod + def scenario() -> str: + """ Scenario for the ns3 simulator """ + return "simple-p2p --delay=15ms --bandwidth=10Mbps --queue=25" + + @staticmethod + def timeout() -> int: + """ timeout in s """ + return 120 + + @staticmethod + def additional_envs() -> List[str]: + return [""] + + @staticmethod + def additional_containers() -> List[str]: + return [""] + + @staticmethod + def use_tcpdump() ->bool: + return True + + @staticmethod + def use_ifstat() -> bool: + return False + + @staticmethod + def use_qlog() -> bool: + return True + + def urlprefix(self) -> str: + """ URL prefix """ + return f"https://{self.servername()}:{self.port()}/" + + def ip(self): + return self._server_ip + + def port(self): + return str(self._server_port) + + def servername(self): + return self._server_name + + def www_dir(self): + if not self._www_dir: + self._www_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="www_") + return self._www_dir.name + "/" + + def download_dir(self): + if not self._download_dir: + self._download_dir = tempfile.TemporaryDirectory( + dir="/tmp", prefix="download_" + ) + return self._download_dir.name + "/" + + def certs_dir(self): + if not self._cert_dir: + self._cert_dir = tempfile.TemporaryDirectory(dir="/tmp", prefix="certs_") + generate_cert_chain(self._cert_dir.name) + return self._cert_dir.name + "/" + + def _is_valid_keylog(self, filename) -> bool: + if not os.path.isfile(filename) or os.path.getsize(filename) == 0: + return False + with open(filename, "r") as file: + if not re.search( + r"^SERVER_HANDSHAKE_TRAFFIC_SECRET", file.read(), re.MULTILINE + ): + logging.info("Key log file %s is using incorrect format.", filename) + return False + return True + + def _keylog_file(self) -> str: + if self._is_valid_keylog(self._client_keylog_file): + logging.debug("Using the client's key log file.") + return self._client_keylog_file + elif self._is_valid_keylog(self._server_keylog_file): + logging.debug("Using the server's key log file.") + return self._server_keylog_file + logging.debug("No key log file found.") + + def is_bandwidth_limited(self) -> bool: + return self._link_bandwidth is not None + + def is_delay_added(self) -> bool: + return self._delay is not None + + def is_packet_reorder_added(self) -> bool: + return self._packet_reorder + + def is_loss_added(self) -> bool: + return self._loss is not None + + def is_corruption_added(self) -> bool: + return self._corruption is not None + + def bandwidth(self) -> str: + return self._link_bandwidth + + def delay(self) -> str: + return self._delay + + def packet_reorder(self) -> [str]: + return self._packet_reorder + + def loss(self) -> str: + return self._loss + + def corruption(self) -> str: + return self._corruption + + def _client_trace(self): + if self._cached_client_trace is None: + self._cached_client_trace = TraceAnalyzer( + self._sim_log_dir.name + "/trace.pcap", self._keylog_file(), + ip4_server=self._server_ip, + port_server=self._server_port, + ) + return self._cached_client_trace + + def _server_trace(self): + if self._cached_server_trace is None: + self._cached_server_trace = TraceAnalyzer( + self._sim_log_dir.name + "/trace.pcap", self._keylog_file(), + ip4_server=self._server_ip, + port_server=self._server_port, + ) + return self._cached_server_trace + + def _generate_random_file(self, size, filename_len=10, host=None) -> str: + filename = random_string(filename_len) + path = self.www_dir() + filename + + if host: # testbed mode + # https://superuser.com/questions/792427/creating-a-large-file-of-random-bytes-quickly + cmd = f'ssh {host} \'touch {path} && shred -n 1 -s {size} {path}\'' + logging.debug(cmd) + p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + stdout, stderr = p.communicate() + + else: + # See https://realpython.com/python-random/ for byte generation + # with urandom + random_bytes = os.urandom(size) + with open(path, "wb") as f: + f.write(random_bytes) + logging.debug("Generated random file: %s of size: %d", path, size) + return filename + + def _generate_file(self, content, filename_len=10, host=None) -> str: + filename = random_string(filename_len) + path = self.www_dir() + filename + + if host: # testbed mode + cmd = f'ssh {host} \'touch {path} && echo "{content}" > {path}\'' + logging.debug(cmd) + p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + p.communicate() + else: + with open(path, "w") as f: + f.write(content) + logging.debug("Generated file: %s with content: %s", path, content) + return filename + + def _retry_sent(self) -> bool: + return len(self._client_trace().get_retry()) > 0 + + def _check_version(self) -> bool: + versions = [hex(int(v, 0)) for v in self._get_versions()] + if len(versions) != 1: + logging.info("Expected exactly one version. Got %s", versions) + return False + if QUIC_VERSION not in versions: + logging.info("Wrong version. Expected %s, got %s", QUIC_VERSION, versions) + return False + return True + + def _check_files(self, client=None, server=None) -> bool: + if len(self._files) == 0: + raise Exception("No test files generated.") + + if client and server: # testbed mode + + for file in self._files: + + cmd = f'ssh {client} \'stat -c %s {self.download_dir() + file}\'' + logging.debug(cmd) + client_p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + client_stdout, client_stderr = client_p.communicate(timeout=10) + + cmd = f'ssh {server} \'stat -c %s {self.www_dir() + file}\'' + logging.debug(cmd) + server_p = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + server_stdout, server_stderr = server_p.communicate(timeout=10) + + if client_p.returncode > 0 or server_p.returncode > 0: + logging.debug(f'An error occured while comparing the filesize!') + logging.debug(f'Client stderr: {client_stderr.decode()}') + logging.debug(f'Server stderr: {server_stderr.decode()}') + return False + + client_size = float(client_stdout.decode()) + server_size = float(server_stdout.decode()) + + deviation = abs(client_size - server_size) / max(client_size, server_size) + + if client_size != server_size: + logging.debug(f'Different filesize: {client_size} | {server_size}') + + # We allow differences in the filesize < 1% + if deviation < 0.01: + logging.debug(f'Different filesize tolerated (less than 1%)') + else: + logging.debug(f'Different filesize not tolerated: {deviation * 100:.2f}%') + return False + + else: + files = [ + n + for n in os.listdir(self.download_dir()) + if os.path.isfile(os.path.join(self.download_dir(), n)) + ] + too_many = [f for f in files if f not in self._files] + if len(too_many) != 0: + logging.info("Found unexpected downloaded files: %s", too_many) + too_few = [f for f in self._files if f not in files] + if len(too_few) != 0: + logging.info("Missing files: %s", too_few) + if len(too_many) != 0 or len(too_few) != 0: + return False + for f in self._files: + fp = self.download_dir() + f + if not os.path.isfile(fp): + logging.info("File %s does not exist.", fp) + return False + try: + size = os.path.getsize(self.www_dir() + f) + downloaded_size = os.path.getsize(fp) + + deviation = abs(downloaded_size - size) / max(downloaded_size, size) + + # We allow differences in the filesize < 1% + if deviation < 0.01: + logging.debug(f'Different filesize tolerated (less than 1%)') + else: + logging.debug(f'Different filesize not tolerated: {deviation * 100:.2f}%') + logging.debug(f'File size of {fp} doesn\'t match. Original: {size} bytes, downloaded: {downloaded_size} bytes.') + return False + + except Exception as exception: + logging.info( + "Could not compare files %s and %s: %s", + self.www_dir() + f, + fp, + exception, + ) + return False + logging.debug("Check of downloaded files succeeded.") + return True + + def _check_version_and_files(self) -> bool: + + if not self._check_version(): + return False + return self._check_files() + + + def _count_handshakes(self) -> int: + """ Count the number of QUIC handshakes """ + tr = self._server_trace() + # Determine the number of handshakes by looking at Initial packets. + # This is easier, since the SCID of Initial packets doesn't changes. + return len(set([p.scid for p in tr.get_initial(Direction.FROM_SERVER)])) + + def _get_versions(self) -> set: + """ Get the QUIC versions """ + tr = self._server_trace() + return set([p.version for p in tr.get_initial(Direction.FROM_SERVER)]) + + def _payload_size(self, packets: List) -> int: + """ Get the sum of the payload sizes of all packets """ + size = 0 + for p in packets: + if hasattr(p, "long_packet_type"): + if hasattr(p, "payload"): # when keys are available + size += len(p.payload.split(":")) + else: + size += len(p.remaining_payload.split(":")) + else: + if hasattr(p, "protected_payload"): + size += len(p.protected_payload.split(":")) + return size + + def cleanup(self): + if self._www_dir: + self._www_dir.cleanup() + self._www_dir = None + if self._download_dir: + self._download_dir.cleanup() + self._download_dir = None + + @abc.abstractmethod + def get_paths(self, max_size=None, host=None): + pass + + @abc.abstractmethod + def check(self, client=None, server=None) -> TestResult: + pass + + +class Measurement(TestCase): + REPETITIONS = 10 + + @abc.abstractmethod + def result(self) -> float: + pass + + @staticmethod + @abc.abstractmethod + def unit() -> str: + pass + + @classmethod + def repetitions(cls) -> int: + return cls.REPETITIONS + + @staticmethod + def use_tcpdump() ->bool: + return False + + @staticmethod + def use_ifstat() -> bool: + return True + + @staticmethod + def use_qlog() -> bool: + return False + + +class TestCaseHandshake(TestCase): + @staticmethod + def name(): + return "handshake" + + @staticmethod + def abbreviation(): + return "H" + + @staticmethod + def desc(): + return "Handshake completes successfully." + + def get_paths(self, max_size=None, host=None): + self._files = [self._generate_random_file(1 * KiB)] + return self._files + + def check(self, client=None, server=None) -> TestResult: + if not self._check_version_and_files(): + return TestResult.FAILED + if self._retry_sent(): + logging.info("Didn't expect a Retry to be sent.") + return TestResult.FAILED + num_handshakes = self._count_handshakes() + if num_handshakes != 1: + logging.info("Expected exactly 1 handshake. Got: %d", num_handshakes) + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class TestCaseVersionNegotiation(TestCase): + @staticmethod + def name(): + return "versionnegotiation" + + @staticmethod + def abbreviation(): + return "V" + + @staticmethod + def desc(): + return "A version negotiation packet is elicited and acted on." + + def get_paths(self, max_size=None, host=None): + return [""] + + def check(self, client=None, server=None) -> TestResult: + tr = self._client_trace() + initials = tr.get_initial(Direction.FROM_CLIENT) + dcid = "" + for p in initials: + dcid = p.dcid + break + if dcid == "": + logging.info("Didn't find an Initial / a DCID.") + return TestResult.FAILED + vnps = tr.get_vnp() + for p in vnps: + if hasattr(p.quic, "scid"): + if p.quic.scid == dcid: + return TestResult.SUCCEEDED + logging.info("Didn't find a Version Negotiation Packet with matching SCID.") + return TestResult.FAILED + + +class TestCaseMulti(TestCase): + @staticmethod + def name(): + return "multihandshake" + + def testname(self, p: Perspective): + return "multihandshake" + + @staticmethod + def abbreviation(): + return "MHS" + + @staticmethod + def desc(): + return "Stream data is being sent and received correctly. Connection close completes with a zero error code." + + def get_paths(self, max_size=None, host=None): + self._files = [ + self._generate_random_file(2 * KiB), + self._generate_random_file(3 * KiB), + self._generate_random_file(5 * KiB), + ] + return self._files + + def check(self, client=None, server=None) -> TestResult: + num_handshakes = self._count_handshakes() + if num_handshakes != 3: + logging.info("Expected exactly 3 handshake. Got: %d", num_handshakes) + return TestResult.FAILED + if not self._check_version_and_files(): + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class TestCaseTransfer(TestCase): + @staticmethod + def name(): + return "transfer" + + def testname(self, p: Perspective): + return "transfer" + + @staticmethod + def abbreviation(): + return "T" + + @staticmethod + def desc(): + return "Stream data is being sent and received correctly. Connection close completes with a zero error code." + + def get_paths(self, max_size=None, host=None): + self._files = [ + self._generate_random_file(2 * KiB), + self._generate_random_file(3 * KiB), + self._generate_random_file(5 * KiB), + ] + return self._files + + def check(self, client=None, server=None) -> TestResult: + num_handshakes = self._count_handshakes() + if num_handshakes != 1: + logging.info("Expected exactly 1 handshake. Got: %d", num_handshakes) + return TestResult.FAILED + if not self._check_version_and_files(): + return TestResult.FAILED + return TestResult.SUCCEEDED + +class TestCaseFollow(TestCase): + @staticmethod + def name(): + return "follow" + + def testname(self, p: Perspective): + return "follow" + + @staticmethod + def abbreviation(): + return "F" + + @staticmethod + def desc(): + return "Two files are created but the name of the second file is in the first file. The client only has one REQUEST but should download both files." + + def get_paths(self, max_size=None, host=None): + second_file = self._generate_random_file(5 * KiB, host=host) + first_file = self._generate_file(second_file, host=host) + self._files = [first_file, second_file] + return self._files[:1] + + def check(self, client=None, server=None) -> TestResult: + if not self._check_files(client=client, server=server): + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class TestCaseChaCha20(TestCase): + @staticmethod + def name(): + return "chacha20" + + @staticmethod + def testname(p: Perspective): + return "chacha20" + + @staticmethod + def abbreviation(): + return "C20" + + @staticmethod + def desc(): + return "Handshake completes using ChaCha20." + + def get_paths(self, max_size=None, host=None): + self._files = [self._generate_random_file(3 * KiB)] + return self._files + + def check(self, client=None, server=None) -> TestResult: + num_handshakes = self._count_handshakes() + if num_handshakes != 1: + logging.info("Expected exactly 1 handshake. Got: %d", num_handshakes) + return TestResult.FAILED + ciphersuites = [] + for p in self._client_trace().get_initial(Direction.FROM_CLIENT): + if hasattr(p, "tls_handshake_ciphersuite"): + ciphersuites.append(p.tls_handshake_ciphersuite) + if len(set(ciphersuites)) != 1 or (ciphersuites[0] != "4867" and ciphersuites [0] != "0x1303"): + logging.info( + "Expected only ChaCha20 cipher suite to be offered. Got: %s", + set(ciphersuites), + ) + return TestResult.FAILED + if not self._check_version_and_files(): + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class TestCaseTransportParameter(TestCase): + @staticmethod + def name(): + return "transportparameter" + + @staticmethod + def testname(p: Perspective): + return "transportparameter" + + @staticmethod + def abbreviation(): + return "TP" + + @staticmethod + def desc(): + return "Hundreds of files are transferred over a single connection, and server increased stream limits to accommodate client requests." + + def get_paths(self, max_size=None, host=None): + for _ in range(1, 5): + self._files.append(self._generate_random_file(32)) + return self._files + + def check(self, client=None, server=None) -> TestResult: + if not self._keylog_file(): + logging.info("Can't check test result. SSLKEYLOG required.") + return TestResult.UNSUPPORTED + num_handshakes = self._count_handshakes() + if num_handshakes != 1: + logging.info("Expected exactly 1 handshake. Got: %d", num_handshakes) + return TestResult.FAILED + if not self._check_version_and_files(): + return TestResult.FAILED + # Check that the server set a bidirectional stream limit <= 1000 + checked_stream_limit = False + for p in self._client_trace().get_handshake(Direction.FROM_SERVER): + if hasattr(p, "tls.quic.parameter.initial_max_streams_bidi"): + checked_stream_limit = True + stream_limit = int( + getattr(p, "tls.quic.parameter.initial_max_streams_bidi") + ) + logging.debug("Server set bidirectional stream limit: %d", stream_limit) + if stream_limit > 10: + logging.info("Server set a stream limit > 10.") + return TestResult.FAILED + if not checked_stream_limit: + logging.info("Couldn't check stream limit.") + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class TestCaseRetry(TestCase): + @staticmethod + def name(): + return "retry" + + @staticmethod + def abbreviation(): + return "S" + + @staticmethod + def desc(): + return "Server sends a Retry, and a subsequent connection using the Retry token completes successfully." + + def get_paths(self, max_size=None, host=None): + self._files = [ + self._generate_random_file(10 * KiB), + ] + return self._files + + def _check_trace(self) -> bool: + # check that (at least) one Retry packet was actually sent + tr = self._client_trace() + tokens = [] + retries = tr.get_retry(Direction.FROM_SERVER) + for p in retries: + if not hasattr(p, "retry_token"): + logging.info("Retry packet doesn't have a retry_token") + logging.info(p) + return False + tokens += [p.retry_token.replace(":", "")] + if len(tokens) == 0: + logging.info("Didn't find any Retry packets.") + return False + + # check that an Initial packet uses a token sent in the Retry packet(s) + highest_pn_before_retry = -1 + for p in tr.get_initial(Direction.FROM_CLIENT): + pn = int(p.packet_number) + if p.token_length == "0": + highest_pn_before_retry = max(highest_pn_before_retry, pn) + continue + if pn <= highest_pn_before_retry: + logging.debug( + "Client reset the packet number. Check failed for PN %d", pn + ) + return False + token = p.token.replace(":", "") + if token in tokens: + logging.debug("Check of Retry succeeded. Token used: %s", token) + return True + logging.info("Didn't find any Initial packet using a Retry token.") + return False + + def check(self, client=None, server=None) -> TestResult: + num_handshakes = self._count_handshakes() + if num_handshakes != 1: + logging.info("Expected exactly 1 handshake. Got: %d", num_handshakes) + return TestResult.FAILED + if not self._check_version_and_files(): + return TestResult.FAILED + if not self._check_trace(): + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class TestCaseResumption(TestCase): + @staticmethod + def name(): + return "resumption" + + @staticmethod + def abbreviation(): + return "R" + + @staticmethod + def desc(): + return "Connection is established using TLS Session Resumption." + + def get_paths(self, max_size=None, host=None): + self._files = [ + self._generate_random_file(5 * KiB), + self._generate_random_file(10 * KiB), + ] + return self._files + + def check(self, client=None, server=None) -> TestResult: + if not self._keylog_file(): + logging.info("Can't check test result. SSLKEYLOG required.") + return TestResult.UNSUPPORTED + num_handshakes = self._count_handshakes() + if num_handshakes != 2: + logging.info("Expected exactly 2 handshake. Got: %d", num_handshakes) + return TestResult.FAILED + + handshake_packets = self._client_trace().get_handshake(Direction.FROM_SERVER) + cids = [p.scid for p in handshake_packets] + first_handshake_has_cert = False + for p in handshake_packets: + if p.scid == cids[0]: + if hasattr(p, "tls_handshake_certificates_length"): + first_handshake_has_cert = True + elif p.scid == cids[len(cids) - 1]: # second handshake + if hasattr(p, "tls_handshake_certificates_length"): + logging.info( + "Server sent a Certificate message in the second handshake." + ) + return TestResult.FAILED + else: + logging.info( + "Found handshake packet that neither belongs to the first nor the second handshake." + ) + return TestResult.FAILED + if not first_handshake_has_cert: + logging.info( + "Didn't find a Certificate message in the first handshake. That's weird." + ) + return TestResult.FAILED + if not self._check_version_and_files(): + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class TestCaseZeroRTT(TestCase): + NUM_FILES = 2 + FILESIZE = 10 * KiB # in bytes + FILENAMELEN = 10 + + @staticmethod + def name(): + return "zerortt" + + @staticmethod + def abbreviation(): + return "Z" + + @staticmethod + def desc(): + return "0-RTT data is being sent and acted on." + + def get_paths(self, max_size=None, host=None): + for _ in range(self.NUM_FILES): + self._files.append( + self._generate_random_file(self.FILESIZE, self.FILENAMELEN) + ) + return self._files + + def check(self, client=None, server=None) -> TestResult: + num_handshakes = self._count_handshakes() + if num_handshakes != 2: + logging.info("Expected exactly 2 handshakes. Got: %d", num_handshakes) + return TestResult.FAILED + if not self._check_version_and_files(): + return TestResult.FAILED + tr = self._client_trace() + zeroRTTSize = self._payload_size(tr.get_0rtt()) + oneRTTSize = self._payload_size(tr.get_1rtt(Direction.FROM_CLIENT)) + logging.debug("0-RTT size: %d", zeroRTTSize) + logging.debug("1-RTT size: %d", oneRTTSize) + if zeroRTTSize == 0: + logging.info("Client didn't send any 0-RTT data.") + return TestResult.FAILED + return TestResult.SUCCEEDED + + +class MeasurementGoodput(Measurement): + FILESIZE = 1 * GiB + _result = 0.0 + + @staticmethod + def name(): + return "goodput" + + @staticmethod + def timeout(): + return 300 + + @staticmethod + def unit() -> str: + return "Mbps" + + @staticmethod + def testname(p: Perspective): + return "goodput" + + @staticmethod + def abbreviation(): + return "G" + + @staticmethod + def desc(): + return "Measures connection goodput as baseline." + + def get_paths(self, max_size=None, host=None): + if max_size and max_size < self.FILESIZE: + logging.debug(f'Limit filesize for {self.name()} to {max_size}') + self.FILESIZE = max_size + self._files = [ + self._generate_random_file( + self.FILESIZE, + host=host + ) + ] + return self._files + + def check(self, client=None, server=None) -> TestResult: + if not self._check_files(client=client, server=server): + return TestResult.FAILED + + time = (self._end_time - self._start_time) / timedelta(seconds=1) + goodput = (8 * self.FILESIZE) / time / 10**6 + logging.info( + f"Transferring {self.FILESIZE / 10**6:.2f} MB took {time:.3f} s. Goodput: {goodput:.3f} {self.unit()}", + ) + self._result = goodput + + return TestResult.SUCCEEDED + + def result(self) -> float: + return self._result + + +class MeasurementQlog(Measurement): + FILESIZE = 200 * MiB + _result = 0.0 + + @staticmethod + def name(): + return "qlog" + + @staticmethod + def timeout(): + return 80 + + @staticmethod + def unit() -> str: + return "Mbps" + + @staticmethod + def testname(p: Perspective): + return "qlog" + + @staticmethod + def abbreviation(): + return "Q" + + @staticmethod + def desc(): + return "Measures connection goodput while running qlog." + + @staticmethod + def use_qlog() -> bool: + return True + + def get_paths(self, max_size=None, host=None): + self._files = [self._generate_random_file(min(self.FILESIZE, max_size) if max_size else self.FILESIZE )] + return self._files + + def check(self, client=None, server=None) -> TestResult: + + result_status = TestResult.SUCCEEDED + + # Check if qlog file exists + client_qlogs = [os.path.join(self._client_qlog_dir, name) for name in os.listdir(self._client_qlog_dir)] + server_qlogs = [os.path.join(self._server_qlog_dir, name) for name in os.listdir(self._server_qlog_dir)] + + if len(client_qlogs) < 1: + logging.info(f"Expected at least 1 qlog file from client. Got: {len(client_qlogs)}") + result_status = TestResult.FAILED + + if len(server_qlogs) < 1: + logging.info(f"Expected at least 1 qlog file from server. Got: {len(server_qlogs)}") + result_status = TestResult.FAILED + + logging.debug(f"Deleting {len(client_qlogs + server_qlogs)} qlogs") + for f in client_qlogs + server_qlogs: + os.remove(f) + + if not self._check_files(): + result_status = TestResult.FAILED + + if result_status == TestResult.FAILED: + return result_status + + time = (self._end_time - self._start_time) / timedelta(seconds=1) + goodput = (8 * self.FILESIZE) / time / 10**6 + logging.info( + f"Transferring {self.FILESIZE / 10**6:.2f} MB took {time:.3f} s. Goodput (with qlog): {goodput:.3f} {self.unit()}", + ) + self._result = goodput + return TestResult.SUCCEEDED + + def result(self) -> float: + return self._result + + +class MeasurementOptimize(MeasurementGoodput): + + @staticmethod + def name(): + return "optimize" + + @staticmethod + def timeout(): + return 80 + + @staticmethod + def testname(p: Perspective): + return "optimize" + + @staticmethod + def abbreviation(): + return "Opt" + + @staticmethod + def desc(): + return "Measures connection goodput with optimizations." + + +TESTCASES = [ + TestCaseHandshake, + TestCaseTransfer, + TestCaseMulti, + TestCaseChaCha20, + TestCaseTransportParameter, + TestCaseRetry, + TestCaseResumption, + TestCaseZeroRTT, + TestCaseFollow +] + +MEASUREMENTS = [ + MeasurementGoodput, + MeasurementQlog, + MeasurementOptimize, +] diff --git a/trace.py b/trace.py new file mode 100644 index 0000000..c5ca2af --- /dev/null +++ b/trace.py @@ -0,0 +1,197 @@ +import datetime +import logging +from enum import Enum +from typing import List, Optional, Tuple + +import pyshark + +IP4_SERVER = "127.0.0.1" +IP6_SERVER = "fd00:cafe:cafe:100::100" +PORT_SERVER = 4433 +QUIC_V2 = hex(0x6B3343CF) + + +class Direction(Enum): + ALL = 0 + FROM_CLIENT = 1 + FROM_SERVER = 2 + INVALID = 3 + + +class PacketType(Enum): + INITIAL = 1 + HANDSHAKE = 2 + ZERORTT = 3 + RETRY = 4 + ONERTT = 5 + VERSIONNEGOTIATION = 6 + INVALID = 7 + + +WIRESHARK_PACKET_TYPES = { + PacketType.INITIAL: "0", + PacketType.ZERORTT: "1", + PacketType.HANDSHAKE: "2", + PacketType.RETRY: "3", +} + + +WIRESHARK_PACKET_TYPES_V2 = { + PacketType.INITIAL: "1", + PacketType.ZERORTT: "2", + PacketType.HANDSHAKE: "3", + PacketType.RETRY: "0", +} + + +def get_packet_type(p) -> PacketType: + if p.quic.header_form == "0": + return PacketType.ONERTT + if p.quic.version == "0x00000000": + return PacketType.VERSIONNEGOTIATION + if p.quic.version == QUIC_V2: + for t, num in WIRESHARK_PACKET_TYPES_V2.items(): + if p.quic.long_packet_type_v2 == num: + return t + return PacketType.INVALID + for t, num in WIRESHARK_PACKET_TYPES.items(): + if p.quic.long_packet_type == num: + return t + return PacketType.INVALID + + +class TraceAnalyzer: + _filename = "" + + def __init__( + self, + filename: str, + keylog_file: Optional[str] = None, + ip4_server: str=None, + ip6_server: str = None, + port_server: int = None, + ): + self._filename = filename + self._keylog_file = keylog_file + self._ip4_server = ip4_server or IP4_SERVER + self._ip6_server = ip6_server or IP6_SERVER + self._port_server = port_server or PORT_SERVER + + def _get_direction_filter(self, d: Direction) -> str: + f = "(quic && !icmp) && " + if d == Direction.FROM_CLIENT: + return ( + f + "((ip.dst==" + self._ip4_server + " || ipv6.dst==" + self._ip6_server + ") && udp.dstport==" + str(self._port_server) + ") && " + ) + elif d == Direction.FROM_SERVER: + return ( + f + "((ip.src==" + self._ip4_server + " || ipv6.src==" + self._ip6_server + ") && udp.srcport==" + str(self._port_server) + ") && " + ) + else: + return f + + def _get_packets(self, f: str) -> List: + override_prefs = {} + if self._keylog_file is not None: + override_prefs["tls.keylog_file"] = self._keylog_file + cap = pyshark.FileCapture( + self._filename, + display_filter=f, + override_prefs=override_prefs, + disable_protocol="http3", # see https://github.com/marten-seemann/quic-interop-runner/pull/179/ + decode_as={"udp.port==443": "quic"}, + ) + packets = [] + # If the pcap has been cut short in the middle of the packet, pyshark will crash. + # See https://github.com/KimiNewt/pyshark/issues/390. + try: + for p in cap: + packets.append(p) + cap.close() + except Exception as e: + logging.debug(e) + + if self._keylog_file is not None: + for p in packets: + if hasattr(p["quic"], "decryption_failed"): + logging.info("At least one QUIC packet could not be decrypted") + logging.debug(p) + break + return packets + + def get_raw_packets(self, direction: Direction = Direction.ALL) -> List: + packets = [] + for packet in self._get_packets(self._get_direction_filter(direction) + "quic"): + packets.append(packet) + return packets + + def get_1rtt(self, direction: Direction = Direction.ALL) -> List: + """Get all QUIC packets, one or both directions.""" + packets, _, _ = self.get_1rtt_sniff_times(direction) + return packets + + def get_1rtt_sniff_times( + self, direction: Direction = Direction.ALL + ) -> Tuple[List, datetime.datetime, datetime.datetime]: + """Get all QUIC packets, one or both directions, and first and last sniff times.""" + packets = [] + first, last = 0, 0 + for packet in self._get_packets( + self._get_direction_filter(direction) + "quic.header_form==0" + ): + for layer in packet.layers: + if ( + layer.layer_name == "quic" + and not hasattr(layer, "long_packet_type") + and not hasattr(layer, "long_packet_type_v2") + ): + if first == 0: + first = packet.sniff_time + last = packet.sniff_time + packets.append(layer) + return packets, first, last + + def get_vnp(self, direction: Direction = Direction.ALL) -> List: + return self._get_packets( + self._get_direction_filter(direction) + "quic.version==0" + ) + + def _get_long_header_packets( + self, packet_type: PacketType, direction: Direction + ) -> List: + packets = [] + for packet in self._get_packets( + self._get_direction_filter(direction) + + "(quic.long.packet_type || quic.long.packet_type_v2)" + ): + for layer in packet.layers: + if layer.layer_name == "quic" and ( + ( + hasattr(layer, "long_packet_type") + and layer.long_packet_type + == WIRESHARK_PACKET_TYPES[packet_type] + ) + or ( + hasattr(layer, "long_packet_type_v2") + and layer.long_packet_type_v2 + == WIRESHARK_PACKET_TYPES_V2[packet_type] + ) + ): + packets.append(layer) + return packets + + def get_initial(self, direction: Direction = Direction.ALL) -> List: + """ Get all Initial packets. """ + return self._get_long_header_packets(PacketType.INITIAL, direction) + + def get_retry(self, direction: Direction = Direction.ALL) -> List: + """ Get all Retry packets. """ + return self._get_long_header_packets(PacketType.RETRY, direction) + + def get_handshake(self, direction: Direction = Direction.ALL) -> List: + """ Get all Handshake packets. """ + return self._get_long_header_packets(PacketType.HANDSHAKE, direction) + + def get_0rtt(self) -> List: + """ Get all 0-RTT packets. """ + return self._get_long_header_packets(PacketType.ZERORTT, Direction.FROM_CLIENT)