# netsec/Task pwn02/week02/easy/client.py
# Repository-viewer metadata: 2024-11-01 00:07:17 +09:00, 88 lines, 2.5 KiB, Python

import hashlib
import logging
import time
import threading
from scapy.config import conf
from scapy.layers.inet import TCP, IP
from scapy.packet import Packet
from scapy.sendrecv import send, sniff
from random import randrange
# Module-level logger; nothing in the visible code logs through it.
log = logging.getLogger(__name__)
# Declared but not read or written anywhere in the visible code.
TCP_CLIENTS = {} # ((IP, port) -> [sent_packets])
# Destination address for every packet this script sends. The first line is an
# alternative address that is currently disabled; the active one is loopback.
# SERVER_IP = '131.159.15.68' # don't use the domain name in this case
SERVER_IP = '127.0.0.1' # don't use the domain name in this case
# Destination TCP port; also used to build the capture filter.
SERVER_PORT = 20102
# Secret string fed into generate_syn_cookie() together with IP and port.
COOKIE_SECRET = 'TASTY_COOKIES123'
# Local TCP source port, drawn once at import time from [10000, 50000).
SRC_PORT = randrange(10000, 50000)
def generate_syn_cookie(client_ip: str, client_port: int, server_secret: str):
    """Derive a 32-bit cookie from an address, a port and a secret.

    The three arguments are formatted as text and concatenated without any
    separator (IP, then port, then secret). The UTF-8 bytes of that string
    are hashed with SHA-256 and the digest, read as a big-endian integer,
    is reduced modulo 2**32.

    Args:
        client_ip: IP address, inserted into the hash input as text.
        client_port: Port number, inserted into the hash input as decimal.
        server_secret: Secret string appended to the hash input.

    Returns:
        An integer in the range ``[0, 2**32)``.
    """
    material = '{}{}{}'.format(client_ip, client_port, server_secret).encode()
    digest = hashlib.sha256(material).digest()
    # The value of the big-endian digest modulo 2**32 is exactly the integer
    # formed by its last four bytes.
    return int.from_bytes(digest[-4:], 'big')
def handle_packet(packet: Packet):
    """Sniffer callback: answer a matching TCP packet with a crafted ACK.

    A packet is acted upon only when it carries a TCP layer, its destination
    port equals ``SRC_PORT`` and its TCP flags compare equal to ``"S"``.
    Every other packet is ignored.

    For a matching packet, an ACK segment is sent from ``SRC_PORT`` to
    ``SERVER_IP:SERVER_PORT``. Its sequence number is the acknowledgement
    number of the received packet and its acknowledgement number is the
    fixed value 1337.

    Args:
        packet: The captured packet passed in by ``sniff``.
    """
    if not packet.haslayer(TCP):
        return
    segment = packet[TCP]
    if segment.dport != SRC_PORT:
        return
    # NOTE(review): this equality test looks like it only accepts a bare SYN;
    # a SYN-ACK ("SA") would presumably be skipped - confirm that matches
    # what the peer actually sends.
    is_syn = "S" == segment.flags
    if not is_syn:
        return

    received_seq = segment.seq
    reply = IP(dst=SERVER_IP) / TCP(
        sport=SRC_PORT,
        dport=SERVER_PORT,
        flags='A',
        seq=segment.ack,
        ack=1337,
    )
    send(reply)
    # The value printed here is the sequence number of the received packet.
    print("Sent ACK packet with modified ACK number:", received_seq)
# Function to send the initial SYN packet
def send_initial_syn():
    """Send the opening SYN segment with the cookie as its sequence number.

    The cookie is computed by ``generate_syn_cookie`` from ``SERVER_IP``,
    ``SERVER_PORT`` and ``COOKIE_SECRET``. The segment goes from
    ``SRC_PORT`` to ``SERVER_IP:SERVER_PORT`` and the cookie value is
    printed afterwards.
    """
    initial_seq = generate_syn_cookie(SERVER_IP, SERVER_PORT, COOKIE_SECRET)
    # Stack the TCP SYN on top of an IP header addressed to the server.
    syn_packet = IP(dst=SERVER_IP) / TCP(
        sport=SRC_PORT,
        dport=SERVER_PORT,
        flags='S',
        seq=initial_seq,
    )
    send(syn_packet)
    print("Sent initial SYN packet with cookie:", initial_seq)
# Function to start the packet sniffing
def start_sniffing():
    """Capture TCP traffic on ``SERVER_PORT`` and feed it to ``handle_packet``.

    Calls ``sniff`` with no count or timeout. Captured packets are not
    stored; each one is only passed to the callback.
    """
    capture_options = {
        # This should filter all packets relevant for this challenge.
        'filter': f'tcp port {SERVER_PORT}',
        'prn': handle_packet,
        'store': False,
        'monitor': True,
        # Set to your interface. IMPORTANT: SET TO enX0 FOR AUTOGRADER!!!
        'iface': 'lo',
    }
    sniff(**capture_options)
# Cookie computed once at import time from the server address, port and secret.
# NOTE(review): nothing in the visible code reads COOKIE; send_initial_syn()
# recomputes the same value itself - confirm whether this binding is still needed.
COOKIE = generate_syn_cookie(SERVER_IP, SERVER_PORT, COOKIE_SECRET)
# Run the packet sniffer in a separate thread, then send the initial SYN
def main():
    """Start the sniffer thread, wait briefly, then send the initial SYN.

    ``conf.use_pcap`` is switched off before sniffing starts. The sniffer
    runs in a regular (non-daemon) thread, so it keeps running after this
    function returns.
    """
    conf.use_pcap = False

    sniffer_thread = threading.Thread(target=start_sniffing)
    sniffer_thread.start()

    # Fixed delay to give the sniffer time to start before any traffic is
    # generated.
    time.sleep(3)

    # Kick off the exchange with the first SYN segment.
    send_initial_syn()
if __name__ == '__main__':
    # Script entry point: configure logging first, then run the client.
    logging.basicConfig(
        format='%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s',
        level=logging.INFO,
    )
    # Only let warnings and above through from the 'asyncio' logger.
    logging.getLogger('asyncio').setLevel(logging.WARNING)
    main()