diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d3892af --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.csv +plots/ +bin/ +__pycache__/ +.DS_Store diff --git a/README.md b/README.md index b5829d1..4b63803 100644 --- a/README.md +++ b/README.md @@ -23,12 +23,13 @@ Deploy the `server/` directory to the Ground Station machine. Deploy the `client/` directory to the Drone machine. ```bash # Blasts framed payloads to the ground station. -# You can specify the scheduler, the duration in seconds, and the payload size in bytes. +# You can specify the scheduler, the duration in seconds, the message size, and an optional chunk size. ./client/scripts/run.sh \ --addr :4242 \ --scheduler minrtt \ --duration 30 \ - --payload-size 2048 + --message-size 2048 \ + --chunk-size 512 ``` ### Analyzing the Results diff --git a/analysis/visualize.py b/analysis/visualize.py index f4e0a0d..a1db664 100755 --- a/analysis/visualize.py +++ b/analysis/visualize.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 """MP-QUIC Application & eBPF Results Visualizer. -This script takes the CSV outputs and plots their latency distributions and timelines. +This script takes the CSV outputs and plots their latency distributions and timelines, +with advanced network statistics. Usage: python visualize.py --app ../server/app_metrics.csv @@ -13,6 +14,7 @@ import pandas as pd import matplotlib.pyplot as plt import seaborn as sns import os +import numpy as np def load_ebpf_data(csv_path, label): if not os.path.exists(csv_path): @@ -24,7 +26,7 @@ def load_ebpf_data(csv_path, label): df = df.sort_values('timestamp') if not df.empty: first_event_time = df['timestamp'].iloc[0] - df['rel_time'] = df['timestamp'] - first_event_time + df['rel_time'] = (df['timestamp'] - first_event_time) / 1e9 else: df['rel_time'] = 0.0 df['value_us'] = df['value_ns'] / 1000.0 @@ -41,41 +43,98 @@ def plot_app_metrics(csv_path, output_dir): print("App metrics file is empty.") return - df = df.sort_values('sequence_number') + # Sort by message ID and then chunk index + df = df.sort_values(['message_id', 'chunk_index']) - # Calculate Jitter and Rel Latency + # Calculate Latency df['latency_ms'] = df['latency_ns'] / 1000000.0 + # Calculate Jitter (absolute difference between consecutive latencies) + df['jitter_ms'] = df['latency_ms'].diff().abs() + # Relative time from first packet received df['rel_time'] = (df['ground_recv_time'] - df['ground_recv_time'].min()) / 1e9 - plt.figure(figsize=(10, 5)) - sns.scatterplot(x='rel_time', y='latency_ms', data=df, s=15, alpha=0.6) - plt.title('App-Level End-to-End Latency Over Time (Glass-to-Glass)') - plt.ylabel('Relative Latency (ms)') - plt.xlabel('Time (s)') - out_path = os.path.join(output_dir, 'app_latency_timeline.png') - plt.savefig(out_path, dpi=300, bbox_inches='tight') + # Plot 1: Latency and Jitter Over Time + fig, ax1 = plt.subplots(figsize=(12, 6)) + + sns.scatterplot(x='rel_time', y='latency_ms', data=df, s=30, alpha=0.5, color='#1f77b4', label='Latency (ms)', ax=ax1, edgecolor='none') + + # Rolling average latency + df['rolling_latency'] = df['latency_ms'].rolling(window=20, min_periods=1).mean() + sns.lineplot(x='rel_time', y='rolling_latency', data=df, color='#d62728', linewidth=2.5, label='Moving Avg Latency', ax=ax1) + + ax1.set_title('App-Level Network Performance: Latency & Jitter Over Time', fontsize=16, fontweight='bold', pad=20) + ax1.set_ylabel('Latency (ms)', fontsize=13, fontweight='bold') + ax1.set_xlabel('Time (s)', fontsize=13, fontweight='bold') + ax1.grid(True, linestyle='--', alpha=0.7) + + # Add Jitter on a secondary y-axis + ax2 = ax1.twinx() + sns.lineplot(x='rel_time', y='jitter_ms', data=df, color='#2ca02c', alpha=0.4, linewidth=1.5, label='Jitter (ms)', ax=ax2) + ax2.set_ylabel('Jitter (ms)', fontsize=13, fontweight='bold', color='#2ca02c') + ax2.tick_params(axis='y', labelcolor='#2ca02c') + + # Combine legends + lines_1, labels_1 = ax1.get_legend_handles_labels() + lines_2, labels_2 = ax2.get_legend_handles_labels() + ax1.legend(lines_1 + lines_2, labels_1 + labels_2, loc='upper left', frameon=True, shadow=True) + + out_path_timeline = os.path.join(output_dir, 'app_performance_timeline.png') + plt.tight_layout() + plt.savefig(out_path_timeline, dpi=300, bbox_inches='tight') plt.close() - # Packet Loss Calculation - max_seq = df['sequence_number'].max() - min_seq = df['sequence_number'].min() - expected_packets = max_seq - min_seq + 1 - received_packets = len(df) - lost_packets = expected_packets - received_packets - reliability = (received_packets / expected_packets) * 100 if expected_packets > 0 else 0 - - print("\n" + "=" * 55) - print(" 🏆 APP-LEVEL RELIABILITY (Drone -> Ground)") - print("=" * 55) - print(f" Packets Sent (Expected): {expected_packets}") - print(f" Packets Received: {received_packets}") - print(f" Packets Lost: {lost_packets}") - print(f" Reliability: {reliability:.5f}%") - print("=" * 55) + # Plot 2: Latency CDF + plt.figure(figsize=(9, 6)) + sns.ecdfplot(data=df, x='latency_ms', color='#9467bd', linewidth=3) + plt.title('CDF of End-to-End App Latency', fontsize=16, fontweight='bold', pad=20) + plt.xlabel('Latency (ms)', fontsize=13, fontweight='bold') + plt.ylabel('Cumulative Probability', fontsize=13, fontweight='bold') + plt.grid(True, linestyle='--', alpha=0.7) - print(f"Saved app-level plot to {out_path}") + # Mark percentiles + p50, p90, p95, p99 = df['latency_ms'].quantile([0.5, 0.9, 0.95, 0.99]) + plt.axvline(p50, color='r', linestyle=':', linewidth=2, label=f'P50: {p50:.2f} ms') + plt.axvline(p90, color='orange', linestyle=':', linewidth=2, label=f'P90: {p90:.2f} ms') + plt.axvline(p99, color='green', linestyle=':', linewidth=2, label=f'P99: {p99:.2f} ms') + plt.legend(frameon=True, shadow=True, fontsize=11) + + out_path_cdf = os.path.join(output_dir, 'app_latency_cdf.png') + plt.tight_layout() + plt.savefig(out_path_cdf, dpi=300, bbox_inches='tight') + plt.close() + + # Packet Loss Calculation with Chunking Support + max_msg = df['message_id'].max() + min_msg = df['message_id'].min() + expected_messages = max_msg - min_msg + 1 + + chunks_per_msg = df['total_chunks'].max() if 'total_chunks' in df.columns else 1 + expected_chunks = expected_messages * chunks_per_msg + + received_chunks = len(df) + lost_chunks = expected_chunks - received_chunks + reliability = (received_chunks / expected_chunks) * 100 if expected_chunks > 0 else 0 + + print("\n" + "=" * 60) + print(" 📊 APP-LEVEL NETWORK STATISTICS (Drone -> Ground)") + print("=" * 60) + print(f" Messages Sent (Expected): {expected_messages}") + print(f" Chunks Sent (Expected): {expected_chunks}") + print(f" Chunks Received: {received_chunks}") + print(f" Chunks Lost: {lost_chunks}") + print(f" Reliability: {reliability:.5f}%") + print("-" * 60) + print(f" Latency P50 (Median): {p50:.2f} ms") + print(f" Latency P90: {p90:.2f} ms") + print(f" Latency P95: {p95:.2f} ms") + print(f" Latency P99 (Tail): {p99:.2f} ms") + print(f" Avg Jitter: {df['jitter_ms'].mean():.2f} ms") + print("=" * 60) + + print(f"Saved app-level timeline plot to {out_path_timeline}") + print(f"Saved app-level CDF plot to {out_path_cdf}") def plot_latency_distributions(df, output_dir): @@ -83,13 +142,20 @@ def plot_latency_distributions(df, output_dir): if latency_df.empty: return plt.figure(figsize=(10, 6)) - sns.violinplot(x='event_type', y='value_us', hue='node', data=latency_df, split=True, inner="quartile") + + # Use a custom color palette + palette = {"Client": "#4C72B0", "Server": "#C44E52"} + + sns.boxplot(x='event_type', y='value_us', hue='node', data=latency_df, palette=palette, showfliers=False, width=0.6) plt.yscale('log') - plt.title('Kernel Network Stack Latency Distribution (Log Scale)') - plt.ylabel('Latency (µs)') - plt.xlabel('Event Type') + plt.title('Kernel Network Stack Latency Distribution', fontsize=16, fontweight='bold', pad=20) + plt.ylabel('Latency (µs) [Log Scale]', fontsize=13, fontweight='bold') + plt.xlabel('Event Type', fontsize=13, fontweight='bold') + plt.grid(True, axis='y', linestyle='--', alpha=0.7) + plt.legend(title='Node', title_fontsize='13', fontsize='11', frameon=True, shadow=True) out_path = os.path.join(output_dir, 'kernel_latency_distribution.png') + plt.tight_layout() plt.savefig(out_path, dpi=300, bbox_inches='tight') plt.close() print(f"Saved kernel distribution plot to {out_path}") @@ -98,17 +164,22 @@ def plot_latency_timeline(df, output_dir): latency_df = df[df['event_type'].isin(['SEND_LATENCY', 'RECV_LATENCY'])] if latency_df.empty: return - g = sns.FacetGrid(latency_df, col="event_type", row="node", margin_titles=True, height=4, aspect=2) - g.map(sns.scatterplot, "rel_time", "value_us", alpha=0.5, s=10) + # Group by Event Type and Node + g = sns.FacetGrid(latency_df, col="event_type", row="node", margin_titles=True, height=4.5, aspect=2, sharey=False) + + # Scatter plot with reduced opacity for density + g.map(sns.scatterplot, "rel_time", "value_us", alpha=0.5, s=25, color="#55A868", edgecolor='none') g.set_axis_labels("Time (s)", "Latency (µs)") - g.set_titles(col_template="{col_name}", row_template="{row_name}") + g.set_titles(col_template="{col_name}", row_template="{row_name}", size=14, weight='bold') for ax in g.axes.flat: ax.set_yscale('log') + ax.grid(True, linestyle=':', alpha=0.7) - g.fig.suptitle('Kernel Latency Over Time', y=1.02) + g.fig.suptitle('Kernel Latency Over Time (Log Scale)', y=1.05, fontsize=18, fontweight='bold') out_path = os.path.join(output_dir, 'kernel_latency_timeline.png') + plt.tight_layout() plt.savefig(out_path, dpi=300, bbox_inches='tight') plt.close() print(f"Saved kernel timeline plot to {out_path}") @@ -122,6 +193,9 @@ def main(): args = parser.parse_args() os.makedirs(args.outdir, exist_ok=True) + + # Set global aesthetic for seaborn + sns.set_theme(style="whitegrid", context="notebook", font_scale=1.1) if args.app: plot_app_metrics(args.app, args.outdir) @@ -134,11 +208,10 @@ def main(): if dfs: df = pd.concat(dfs, ignore_index=True) - sns.set_theme(style="whitegrid") plot_latency_distributions(df, args.outdir) plot_latency_timeline(df, args.outdir) - print("\nVisualization complete! Check the '{}' directory.".format(args.outdir)) + print(f"\n✅ Visualization complete! Beautiful plots generated in '{args.outdir}' directory.") if __name__ == "__main__": main() diff --git a/client/client_bin b/client/client_bin new file mode 100755 index 0000000..622a125 Binary files /dev/null and b/client/client_bin differ diff --git a/client/main.go b/client/main.go index 86eb1dd..58f7d27 100644 --- a/client/main.go +++ b/client/main.go @@ -19,7 +19,8 @@ func main() { listScheds := flag.Bool("list-schedulers", false, "List available schedulers") addr := flag.String("addr", "127.0.0.1:4242", "Server address") duration := flag.Int("duration", 10, "Duration in seconds") - payloadSize := flag.Int("payload-size", 1024, "Size of the payload in bytes (min 20)") + messageSize := flag.Int("message-size", 1024, "Size of the full application message in bytes (min 36)") + chunkSize := flag.Int("chunk-size", 0, "Chunk size. If > 0, splits message-size into multiple chunks (min 36)") flag.Parse() if *listScheds { @@ -30,8 +31,8 @@ func main() { return } - if *payloadSize < 20 { - *payloadSize = 20 // 4 bytes length, 8 bytes seq, 8 bytes timestamp + if *messageSize < 36 { + *messageSize = 36 // 4 len + 8 seq + 8 ts + 8 chunk_idx + 8 tot_chunks } tlsConf := &tls.Config{ @@ -60,37 +61,52 @@ func main() { log.Fatal(err) } - fmt.Printf("Stream opened, sending data for %d seconds (Payload Size: %d bytes)...\n", *duration, *payloadSize) + fmt.Printf("Stream opened, sending data for %d seconds (Message Size: %d bytes, Chunk Size: %d)...\n", *duration, *messageSize, *chunkSize) end := time.Now().Add(time.Duration(*duration) * time.Second) - payload := make([]byte, *payloadSize) - // Frame structure: - // [0:4] uint32 Total Length - // [4:12] uint64 Sequence Number - // [12:20] uint64 Send Timestamp (nanoseconds) - // [20:] Padding (dummy data) - binary.BigEndian.PutUint32(payload[0:4], uint32(*payloadSize)) - - var seqNum uint64 = 0 + var msgId uint64 = 0 totalBytes := 0 + totalChunksSent := 0 + + actualChunkSize := *messageSize + if *chunkSize > 36 && *chunkSize <= *messageSize { + actualChunkSize = *chunkSize + } + + totalChunks := uint64((*messageSize + actualChunkSize - 1) / actualChunkSize) - // Target sending rate: we don't want to lock the CPU entirely in a busy loop. - // We yield slightly to allow the network stack to process. - // But to measure max throughput we just send as fast as stream.Write allows. for time.Now().Before(end) { - seqNum++ + msgId++ sendTime := uint64(time.Now().UnixNano()) - binary.BigEndian.PutUint64(payload[4:12], seqNum) - binary.BigEndian.PutUint64(payload[12:20], sendTime) + bytesLeft := *messageSize - n, err := stream.Write(payload) - if err != nil { - log.Fatal("Stream write error:", err) + for chunkIdx := uint64(0); chunkIdx < totalChunks; chunkIdx++ { + currentLength := actualChunkSize + if bytesLeft < actualChunkSize { + currentLength = bytesLeft + } + if currentLength < 36 { + currentLength = 36 + } + + payload := make([]byte, currentLength) + binary.BigEndian.PutUint32(payload[0:4], uint32(currentLength)) + binary.BigEndian.PutUint64(payload[4:12], msgId) + binary.BigEndian.PutUint64(payload[12:20], sendTime) + binary.BigEndian.PutUint64(payload[20:28], chunkIdx) + binary.BigEndian.PutUint64(payload[28:36], totalChunks) + + n, err := stream.Write(payload) + if err != nil { + log.Fatal("Stream write error:", err) + } + totalBytes += n + bytesLeft -= currentLength + totalChunksSent++ } - totalBytes += n } - fmt.Printf("Finished sending %d packets, %d bytes (%.2f MB)\n", seqNum, totalBytes, float64(totalBytes)/1024/1024) + fmt.Printf("Finished sending %d messages (%d chunks), %d bytes (%.2f MB)\n", msgId, totalChunksSent, totalBytes, float64(totalBytes)/1024/1024) } diff --git a/server/main.go b/server/main.go index c085e45..1949cbc 100644 --- a/server/main.go +++ b/server/main.go @@ -68,7 +68,7 @@ func main() { defer f.Close() writer := csv.NewWriter(f) - writer.Write([]string{"sequence_number", "drone_send_time", "ground_recv_time", "latency_ns", "bytes_received"}) + writer.Write([]string{"message_id", "chunk_index", "total_chunks", "drone_send_time", "ground_recv_time", "latency_ns", "bytes_received"}) writer.Flush() // Mutex to protect CSV writer if multiple streams are used @@ -99,7 +99,7 @@ func main() { } length := binary.BigEndian.Uint32(header) - if length < 20 { + if length < 36 { fmt.Printf("Warning: Invalid frame length %d\n", length) continue } @@ -114,20 +114,24 @@ func main() { recvTime := time.Now().UnixNano() - seqNum := binary.BigEndian.Uint64(payload[0:8]) + msgId := binary.BigEndian.Uint64(payload[0:8]) sendTime := binary.BigEndian.Uint64(payload[8:16]) + chunkIdx := binary.BigEndian.Uint64(payload[16:24]) + totalChunks := binary.BigEndian.Uint64(payload[24:32]) latency := recvTime - int64(sendTime) mu.Lock() writer.Write([]string{ - strconv.FormatUint(seqNum, 10), + strconv.FormatUint(msgId, 10), + strconv.FormatUint(chunkIdx, 10), + strconv.FormatUint(totalChunks, 10), strconv.FormatUint(sendTime, 10), strconv.FormatInt(recvTime, 10), strconv.FormatInt(latency, 10), strconv.FormatUint(uint64(length), 10), }) // Periodically flush? - if seqNum % 1000 == 0 { + if msgId % 1000 == 0 { writer.Flush() } mu.Unlock() diff --git a/server/server_bin b/server/server_bin new file mode 100755 index 0000000..41142b0 Binary files /dev/null and b/server/server_bin differ