This commit is contained in:
Dongho Kim
2026-05-17 16:33:43 +02:00
commit a221870b70
20 changed files with 1285 additions and 0 deletions
+191
View File
@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""eBPF-based UDP tracer for MP-QUIC performance measurement.
Traces kernel-level UDP send/receive latency and packet drops.
Requires root privileges and Linux with BCC installed.
"""
from bcc import BPF
import time
import argparse
import signal
import sys
bpf_text = """
#include <uapi/linux/ptrace.h>
#include <net/sock.h>
#include <net/inet_sock.h>
#include <bcc/proto.h>
#define MPQUIC_PORT __TARGET_PORT__
BPF_HASH(send_start, u32, u64);
BPF_HASH(recv_start, u32, u64);
BPF_PERF_OUTPUT(events);
struct event_t {
u64 timestamp_ns;
u64 delta_ns;
u32 pid;
u16 sport;
u16 dport;
u8 event_type; // 0=SEND, 1=RECV, 2=DROP
};
int trace_udp_sendmsg(struct pt_regs *ctx, struct sock *sk) {
struct inet_sock *inet = (struct inet_sock *)sk;
u16 dport = 0;
bpf_probe_read_kernel(&dport, sizeof(dport), &inet->inet_dport);
dport = ntohs(dport);
// Filter: only trace traffic to/from MP-QUIC port
u16 sport = 0;
bpf_probe_read_kernel(&sport, sizeof(sport), &inet->inet_sport);
sport = ntohs(sport);
if (dport != MPQUIC_PORT && sport != MPQUIC_PORT)
return 0;
u32 pid = bpf_get_current_pid_tgid();
u64 ts = bpf_ktime_get_ns();
send_start.update(&pid, &ts);
return 0;
}
int trace_udp_sendmsg_ret(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
u64 *tsp = send_start.lookup(&pid);
if (tsp == 0)
return 0;
u64 delta = bpf_ktime_get_ns() - *tsp;
send_start.delete(&pid);
struct event_t event = {};
event.timestamp_ns = bpf_ktime_get_ns();
event.delta_ns = delta;
event.pid = pid;
event.event_type = 0; // SEND
events.perf_submit(ctx, &event, sizeof(event));
return 0;
}
int trace_udp_recvmsg(struct pt_regs *ctx, struct sock *sk) {
struct inet_sock *inet = (struct inet_sock *)sk;
u16 sport = 0;
bpf_probe_read_kernel(&sport, sizeof(sport), &inet->inet_sport);
sport = ntohs(sport);
u16 dport = 0;
bpf_probe_read_kernel(&dport, sizeof(dport), &inet->inet_dport);
dport = ntohs(dport);
if (sport != MPQUIC_PORT && dport != MPQUIC_PORT)
return 0;
u32 pid = bpf_get_current_pid_tgid();
u64 ts = bpf_ktime_get_ns();
recv_start.update(&pid, &ts);
return 0;
}
int trace_udp_recvmsg_ret(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
u64 *tsp = recv_start.lookup(&pid);
if (tsp == 0)
return 0;
u64 delta = bpf_ktime_get_ns() - *tsp;
recv_start.delete(&pid);
struct event_t event = {};
event.timestamp_ns = bpf_ktime_get_ns();
event.delta_ns = delta;
event.pid = pid;
event.event_type = 1; // RECV
events.perf_submit(ctx, &event, sizeof(event));
return 0;
}
TRACEPOINT_PROBE(skb, kfree_skb) {
struct event_t event = {};
event.timestamp_ns = bpf_ktime_get_ns();
event.delta_ns = 0;
event.pid = bpf_get_current_pid_tgid();
event.event_type = 2; // DROP
events.perf_submit(ctx, &event, sizeof(event));
return 0;
}
"""
EVENT_TYPES = {0: "SEND_LATENCY", 1: "RECV_LATENCY", 2: "DROP"}
# Counters for live summary
stats = {"send_count": 0, "recv_count": 0, "drop_count": 0,
"send_total_ns": 0, "recv_total_ns": 0}
def main():
parser = argparse.ArgumentParser(description="eBPF UDP Tracer for MP-QUIC")
parser.add_argument("--output", type=str, default="measurement.csv",
help="CSV output file")
parser.add_argument("--port", type=int, default=4242,
help="MP-QUIC port to filter on")
args = parser.parse_args()
program = bpf_text.replace("__TARGET_PORT__", str(args.port))
print(f"Compiling eBPF program (requires root)...")
print(f"Filtering on UDP port {args.port}")
b = BPF(text=program)
# Attach kprobes for send and receive paths
b.attach_kprobe(event="udp_sendmsg", fn_name="trace_udp_sendmsg")
b.attach_kretprobe(event="udp_sendmsg", fn_name="trace_udp_sendmsg_ret")
b.attach_kprobe(event="udp_recvmsg", fn_name="trace_udp_recvmsg")
b.attach_kretprobe(event="udp_recvmsg", fn_name="trace_udp_recvmsg_ret")
csv_file = open(args.output, "w")
csv_file.write("timestamp,event_type,value_ns,pid\n")
def handle_event(cpu, data, size):
event = b["events"].event(data)
etype = EVENT_TYPES.get(event.event_type, "UNKNOWN")
csv_file.write(f"{time.time()},{etype},{event.delta_ns},{event.pid}\n")
# Update live stats
if event.event_type == 0:
stats["send_count"] += 1
stats["send_total_ns"] += event.delta_ns
elif event.event_type == 1:
stats["recv_count"] += 1
stats["recv_total_ns"] += event.delta_ns
elif event.event_type == 2:
stats["drop_count"] += 1
b["events"].open_perf_buffer(handle_event)
def signal_handler(sig, frame):
print("\n--- Measurement Summary ---")
if stats["send_count"] > 0:
avg_send = stats["send_total_ns"] / stats["send_count"] / 1000
print(f" Send events: {stats['send_count']:>8} (avg {avg_send:.1f} µs)")
if stats["recv_count"] > 0:
avg_recv = stats["recv_total_ns"] / stats["recv_count"] / 1000
print(f" Recv events: {stats['recv_count']:>8} (avg {avg_recv:.1f} µs)")
print(f" Drop events: {stats['drop_count']:>8}")
print(f" Output file: {args.output}")
csv_file.close()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print(f"Tracing... Output → {args.output}. Ctrl-C to stop.")
while True:
b.perf_buffer_poll()
if __name__ == "__main__":
main()
+139
View File
@@ -0,0 +1,139 @@
package main
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/csv"
"encoding/pem"
"flag"
"fmt"
"io"
"log"
"math/big"
"os"
"strconv"
"sync"
"time"
quic "github.com/AeonDave/mp-quic-go"
)
func generateTLSConfig() *tls.Config {
key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
panic(err)
}
template := x509.Certificate{SerialNumber: big.NewInt(1)}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
if err != nil {
panic(err)
}
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
panic(err)
}
return &tls.Config{
Certificates: []tls.Certificate{tlsCert},
NextProtos: []string{"mpquic-exp"},
}
}
func main() {
addr := flag.String("addr", "0.0.0.0:4242", "Address to listen on")
maxPaths := flag.Int("maxpaths", 4, "Maximum number of paths")
outCSV := flag.String("output", "app_metrics.csv", "Output CSV for application-level metrics")
flag.Parse()
listener, err := quic.ListenAddr(*addr, generateTLSConfig(), &quic.Config{
MaxPaths: *maxPaths,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Server listening on %s with max %d paths\n", *addr, *maxPaths)
fmt.Printf("Logging app-level metrics to %s\n", *outCSV)
// Open CSV
f, err := os.OpenFile(*outCSV, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
log.Fatal("Failed to open CSV:", err)
}
defer f.Close()
writer := csv.NewWriter(f)
writer.Write([]string{"sequence_number", "drone_send_time", "ground_recv_time", "latency_ns", "bytes_received"})
writer.Flush()
// Mutex to protect CSV writer if multiple streams are used
var mu sync.Mutex
for {
conn, err := listener.Accept(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Println("Client connected:", conn.RemoteAddr())
go func(conn *quic.Conn) {
for {
stream, err := conn.AcceptStream(context.Background())
if err != nil {
fmt.Println("Session error:", err)
return
}
go func(stream *quic.Stream) {
header := make([]byte, 4)
for {
// Read 4-byte length prefix
_, err := io.ReadFull(stream, header)
if err != nil {
fmt.Printf("Stream closed\n")
return
}
length := binary.BigEndian.Uint32(header)
if length < 20 {
fmt.Printf("Warning: Invalid frame length %d\n", length)
continue
}
// Read rest of the payload
payload := make([]byte, length-4)
_, err = io.ReadFull(stream, payload)
if err != nil {
fmt.Printf("Stream read error: %v\n", err)
return
}
recvTime := time.Now().UnixNano()
seqNum := binary.BigEndian.Uint64(payload[0:8])
sendTime := binary.BigEndian.Uint64(payload[8:16])
latency := recvTime - int64(sendTime)
mu.Lock()
writer.Write([]string{
strconv.FormatUint(seqNum, 10),
strconv.FormatUint(sendTime, 10),
strconv.FormatInt(recvTime, 10),
strconv.FormatInt(latency, 10),
strconv.FormatUint(uint64(length), 10),
})
// Periodically flush?
if seqNum % 1000 == 0 {
writer.Flush()
}
mu.Unlock()
}
}(stream)
}
}(conn)
}
}
+11
View File
@@ -0,0 +1,11 @@
#!/bin/bash
set -euo pipefail
cd "$(dirname "$0")/.."
# Build the server
echo "Building server..."
go build -o server_bin main.go
echo "Starting server..."
./server_bin "$@"
+94
View File
@@ -0,0 +1,94 @@
#!/bin/bash
# Setup script for MP-QUIC experiment environment.
# Run on each Linux machine (server + client) before experiments.
set -euo pipefail
echo "╔═══════════════════════════════════════════╗"
echo "║ MP-QUIC Experiment Environment Setup ║"
echo "╚═══════════════════════════════════════════╝"
# --- OS Check ---
if [[ ! -f /etc/os-release ]]; then
echo "❌ Not running on Linux. eBPF tracing requires Linux."
echo " You can still build and run the Go binaries on this machine."
exit 1
fi
. /etc/os-release
echo "📋 Detected OS: $PRETTY_NAME"
# --- Install dependencies ---
if [[ "$ID" == "ubuntu" || "$ID" == "debian" ]]; then
echo ""
echo "📦 Installing packages..."
sudo apt-get update -qq
sudo apt-get install -y -qq \
bpfcc-tools \
python3-bpfcc \
linux-headers-$(uname -r) \
golang-go \
iperf3 \
net-tools \
iproute2
# Ensure Python3 BCC bindings work
python3 -c "from bcc import BPF; print(' ✓ BCC Python bindings OK')" 2>/dev/null || {
echo " ⚠ BCC Python import failed. Try: sudo apt install python3-bpfcc"
}
elif [[ "$ID" == "fedora" || "$ID" == "rhel" || "$ID" == "centos" ]]; then
echo ""
echo "📦 Installing packages (DNF)..."
sudo dnf install -y \
bcc-tools \
python3-bcc \
kernel-devel-$(uname -r) \
golang \
iperf3 \
iproute
else
echo "⚠ Unsupported distro: $ID"
echo " Please install manually: bcc-tools, python3-bcc, golang, linux-headers"
fi
# --- Verify Go ---
echo ""
if command -v go &>/dev/null; then
echo "✓ Go $(go version | awk '{print $3}')"
else
echo "❌ Go not found. Install from https://go.dev/dl/"
exit 1
fi
# --- Kernel config check ---
echo ""
echo "🔍 Checking kernel eBPF support..."
if [[ -d /sys/kernel/debug/tracing ]]; then
echo " ✓ debugfs mounted"
else
echo " ⚠ debugfs not mounted. Run: sudo mount -t debugfs debugfs /sys/kernel/debug"
fi
if grep -q CONFIG_BPF=y /boot/config-$(uname -r) 2>/dev/null; then
echo " ✓ CONFIG_BPF enabled"
else
echo " ⚠ Could not verify CONFIG_BPF (may still work)"
fi
# --- Network tuning (optional) ---
echo ""
echo "🔧 Applying network tuning..."
sudo sysctl -w net.core.rmem_max=26214400 2>/dev/null && echo " ✓ rmem_max=25MB" || true
sudo sysctl -w net.core.wmem_max=26214400 2>/dev/null && echo " ✓ wmem_max=25MB" || true
sudo sysctl -w net.core.rmem_default=1048576 2>/dev/null || true
sudo sysctl -w net.core.wmem_default=1048576 2>/dev/null || true
echo ""
echo "╔═══════════════════════════════════════════╗"
echo "║ ✓ Setup complete! ║"
echo "║ ║"
echo "║ Quick start: ║"
echo "║ python3 run_experiment.py --duration 10 ║"
echo "║ ║"
echo "║ With eBPF (needs sudo): ║"
echo "║ python3 run_experiment.py --ebpf ║"
echo "╚═══════════════════════════════════════════╝"