diff --git a/week05/easy/__pycache__/pwn_utils.cpython-312.pyc b/week05/easy/__pycache__/pwn_utils.cpython-312.pyc new file mode 100644 index 0000000..0346b95 Binary files /dev/null and b/week05/easy/__pycache__/pwn_utils.cpython-312.pyc differ diff --git a/week05/easy/client.py b/week05/easy/client.py index 2179c00..c41929c 100644 --- a/week05/easy/client.py +++ b/week05/easy/client.py @@ -1,8 +1,79 @@ import socket - +import base64 +import hashlib +from Crypto.Cipher import AES +import binascii # Fill in the right target here -HOST = 'this.is.not.a.valid.domain' # TODO -PORT = 0 # TODO +HOST = 'netsec.net.in.tum.de' # TODO +PORT = 20105 # TODO +KEY = b'1337133713371337' + +def pkcs7(message: bytes, block_size: int = 16) -> bytes: + gap_size = block_size - (len(message) % block_size) + return message + bytes([gap_size] * gap_size) + +def calc_hmac(key: bytes, message: bytes ) -> bytes: + block_size = 64 + if len(key) > block_size: + key = hashlib.sha256(key).digest() + if len(key) < block_size: + key += b'\x00' * (block_size - len(key)) + + o_key_pad = bytes([k ^ 0x5C for k in key]) + i_key_pad = bytes([k ^ 0x36 for k in key]) + + inner_hash = hashlib.sha256(i_key_pad + message).digest() + return hashlib.sha256(o_key_pad + inner_hash).digest() + +def calc_cbc_mac( key: bytes, iv: bytes, message: bytes) -> bytes: + message = pkcs7(message) + cipher = AES.new(key, AES.MODE_CBC, iv) + encrypted_message = cipher.encrypt(message) + return encrypted_message[-16:] + +def cmac_padding(message: bytes, block_size: int = 16) -> bytes: + if len(message) % block_size == 0: + return message # Full block, no padding needed + else: + padded_message = message + b'\x80' # Add the 0x80 byte + return padded_message.ljust((len(message) // block_size + 1) * block_size, b'\x00') + +def calc_cmac(key: bytes, message: bytes) -> bytes: + from Crypto.Cipher import AES + + def shift_left(block: bytes) -> bytes: + result = int.from_bytes(block, byteorder="big") << 1 + return (result & ((1 << 128) - 1)).to_bytes(16, byteorder="big") + + const_rb = 0x87 + cipher = AES.new(key, AES.MODE_ECB) + zero_block = b'\x00' * 16 + l_block = cipher.encrypt(zero_block) + + k1 = shift_left(l_block) + if l_block[0] & 0x80: + k1 = (int.from_bytes(k1, byteorder="big") ^ const_rb).to_bytes(16, byteorder="big") + + k2 = shift_left(k1) + if k1[0] & 0x80: + k2 = (int.from_bytes(k2, byteorder="big") ^ const_rb).to_bytes(16, byteorder="big") + + padded_message = cmac_padding(message) + if len(message) % 16 == 0: + last_block = bytes([b ^ k1[i] for i, b in enumerate(padded_message[-16:])]) + else: + last_block = bytes([b ^ k2[i] for i, b in enumerate(padded_message[-16:])]) + + previous_block = zero_block + for i in range(0, len(padded_message) - 16, 16): + block = padded_message[i:i + 16] + previous_block = cipher.encrypt(bytes([b ^ p for b, p in zip(block, previous_block)])) + + cmac_result = cipher.encrypt(bytes([b ^ p for b, p in zip(last_block, previous_block)])) + return cmac_result + +def decode_message(msg): + return binascii.unhexlify(msg) def get_flag(): @@ -12,8 +83,16 @@ def get_flag(): sf = s.makefile('rw') # we use a file abstraction for the sockets message1 = sf.readline().rstrip('\n') - # TODO - + print(message1) + challenge = decode_message(message1) + hmac = base64.b64encode(calc_hmac(KEY, challenge)).decode() + cmac = base64.b64encode(calc_cmac(KEY, challenge)).decode() + cbc_mac = base64.b64encode(calc_cbc_mac(KEY, b'\x00' * 16, challenge)).decode() + answer = f"{hmac};{cbc_mac};{cmac}" + print(f"Calcualted the answer {answer}") + sf.write(f"{answer}\n") + sf.flush() + print(sf.readline().rstrip('\n')) sf.close() s.close() diff --git a/week05/easy/no_test.py b/week05/easy/no_test.py deleted file mode 100644 index 326d68b..0000000 --- a/week05/easy/no_test.py +++ /dev/null @@ -1,79 +0,0 @@ -import random -import binascii -from Crypto.Cipher import AES -from Crypto.Hash import CMAC, HMAC, SHA256 -import hashlib - -KEY = b'1337133713371337' - - -def pkcs7(message: bytes, block_size: int = 16) -> bytes: - gap_size = block_size - (len(message) % block_size) - return message + bytes([gap_size] * gap_size) - - -def calc_cbc_mac(message: bytes, iv: bytes, key: bytes) -> bytes: - cipher = AES.new(key, AES.MODE_CBC, iv) - message = pkcs7(message) - last_block = cipher.encrypt(message)[-16:] - return last_block - -def calc_hmac(message: bytes, key: bytes) -> bytes: - # Create HMAC object with the key and message using SHA-256 - hmac = HMAC.new(KEY, msg=message, digestmod=SHA256) - - # Calculate the HMAC - mac = hmac.hexdigest() - return mac - - - -def calc_cmac(message: bytes, key: bytes) -> bytes: - c = CMAC.new(key, ciphermod=AES) - c.update(message) - return c.digest() - - -def calc_pure_cmac(message: bytes, key: bytes) -> bytes: - def left_shift(k): - """Perform left shift on a byte string with overflow handling""" - result = bytearray(k) - overflow = result[0] & 0x80 - for i in range(len(result)): - result[i] <<= 1 - if i > 0: - result[i] |= (1 if result[i-1] & 0x80 else 0) - - if overflow: - result[-1] ^= 0x87 - - return bytes(result) - - - def xor_bytes(a, b): - """XOR two byte strings""" - return bytes(x ^ y for x, y in zip(a, b)) - - - - - -def check_challenge(challenge: bytes): - return ( - calc_cbc_mac(challenge, b'\x00' * 16, KEY), - calc_cmac(challenge, KEY), - calc_hmac(challenge, KEY) - ) - -def decode_message(msg): - return binascii.unhexlify(msg) - - -def main(): - challenge = "f681e8625406c40419ae7771eac8f8a2eb6a6fcb1fc0396ab8fdca793a27c93a6dafbb" - challenge = decode_message(challenge) - print(check_challenge(challenge)) - - -if __name__ == "__main__": - main() diff --git a/week05/easy/pwn_utils.py b/week05/easy/pwn_utils.py new file mode 100644 index 0000000..a0d1c2f --- /dev/null +++ b/week05/easy/pwn_utils.py @@ -0,0 +1,27 @@ +import asyncio + + +class utils: + @staticmethod + async def read_line_safe(reader): + """ + Simple implementation to read a line from an async reader + Mimics the original read_line_safe functionality + """ + try: + line = await reader.readline() + return line.decode().strip() + except Exception: + return None + + +def log_error(e, client_writer=None): + """ + Basic error logging function + """ + print(f"Error occurred: {e}") + if client_writer: + try: + client_writer.write(f"Error: {str(e)}\n".encode()) + except Exception: + print("Could not send error to client") diff --git a/week05/easy/server.py b/week05/easy/server.py index d2d5e0e..8c127a3 100644 --- a/week05/easy/server.py +++ b/week05/easy/server.py @@ -68,6 +68,7 @@ async def handle_client(client_reader: StreamReader, client_writer: StreamWriter hmac = base64.b64decode(hmac) cbc_mac = base64.b64decode(cbc_mac) cmac = base64.b64decode(cmac) + print(hmac, cbc_mac, cmac) if check_challenge(challenge, hmac, cbc_mac, cmac): client_writer.write(subprocess.check_output('flag')) else: diff --git a/week05/easy/test.py b/week05/easy/test.py deleted file mode 100644 index 4a572c2..0000000 --- a/week05/easy/test.py +++ /dev/null @@ -1,55 +0,0 @@ -import random -import binascii -import base64 -from Crypto.Cipher import AES -from Crypto.Hash import CMAC, HMAC, SHA256 -# Crypto.Hash and hmac modules are forbidden so needs to replace them - -KEY = b'1337133713371337' - - -def pkcs7(message: bytes, block_size: int = 16) -> bytes: - gap_size = block_size - (len(message) % block_size) - return message + bytes([gap_size] * gap_size) - - -def calc_cbc_mac(message: bytes, iv: bytes, key: bytes) -> bytes: - cipher = AES.new(key, AES.MODE_CBC, iv) - message = pkcs7(message) - last_block = cipher.encrypt(message)[-16:] - last_block = base64.b64encode(last_block) - return last_block - -def calc_hmac(message: bytes, key: bytes) -> bytes: - # Create HMAC object with the key and message using SHA-256 - hmac = HMAC.new(KEY, msg=message, digestmod=SHA256) - hmac = base64.b64encode(hmac.digest()) - return hmac - - -def calc_cmac(message: bytes, key: bytes) -> bytes: - c = CMAC.new(key, ciphermod=AES) - c.update(message) - cmac = base64.b64encode(c.digest()) - return cmac - - -def check_challenge(challenge: bytes): - return ( - calc_cbc_mac(challenge, b'\x00' * 16, KEY), - calc_cmac(challenge, KEY), - calc_hmac(challenge, KEY) - ) - -def decode_message(msg): - return binascii.unhexlify(msg) - - -def main(): - challenge = "f681e8625406c40419ae7771eac8f8a2eb6a6fcb1fc0396ab8fdca793a27c93a6dafbb" - challenge = decode_message(challenge) - print(check_challenge(challenge)) - - -if __name__ == "__main__": - main() diff --git a/week05/hard/client.py b/week05/hard/client.py new file mode 100644 index 0000000..8b8039c --- /dev/null +++ b/week05/hard/client.py @@ -0,0 +1,21 @@ +import socket + +# Fill in the right target here +HOST = 'this.is.not.a.valid.domain' # TODO +PORT = 0 # TODO + + +def get_flag(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + s.connect((HOST, PORT)) + sf = s.makefile('rw') # we use a file abstraction for the sockets + + # TODO + + sf.close() + s.close() + + +if __name__ == '__main__': + get_flag() diff --git a/week05/hard/description/description.pdf b/week05/hard/description/description.pdf new file mode 100644 index 0000000..6903158 Binary files /dev/null and b/week05/hard/description/description.pdf differ diff --git a/week05/hard/server.py b/week05/hard/server.py new file mode 100644 index 0000000..78bc2da --- /dev/null +++ b/week05/hard/server.py @@ -0,0 +1,124 @@ +import asyncio +import base64 +import logging +import random +import subprocess +from asyncio import StreamReader, StreamWriter + +from Crypto.Cipher import AES + +from pwn_utils import utils + +log = logging.getLogger(__name__) +clients = {} # task -> (reader, writer) + +KEY = random.randbytes(16) +FACT_STORE = { + 1: 'Rumors say that the most important secret is secret number 1337', + 2: 'The 6 security goals are Confidentiality, Integrity, Availability, Authenticity, Accountability and Controlled Access.', + 3: 'One of the first Computer Worms was the Morris Worm in 1988 (https://en.wikipedia.org/wiki/Morris_worm).', + 4: 'Cloudflare uses a wall of lava lamps to generate randomness (https://blog.cloudflare.com/lavarand-in-production-the-nitty-gritty-technical-details/).', +} + +SECRET_STORE = { + 42: 'Answer to the Ultimate Question of Life, The Universe, and Everything', +} + + +def pkcs7(message: bytes, block_size: int = 16) -> bytes: + gap_size = block_size - (len(message) % block_size) + return message + bytes([gap_size] * gap_size) + + +def cbc_mac(message: bytes, iv: bytes, key: bytes) -> bytes: + cipher = AES.new(key, AES.MODE_CBC, iv) + message = pkcs7(message) + last_block = cipher.encrypt(message)[-16:] + return last_block + + +def handle_message(message: bytes, iv: bytes, mac: bytes) -> str: + kvs = [term.split(b'=') for term in message.split(b'&')] + args = {key: value for key, value in kvs} + type = args.get(b'type', b'') + number = int(args.get(b'number', b'0')) + + expected_mac = cbc_mac(message, iv, KEY) + if expected_mac != mac: + if type == b'secrets': + # don't give any info for secrets + return 'MAC verification failed' + return f'MAC verification failed: expected {expected_mac.hex()}, got {mac.hex()}' + if type == b'funfact': + return FACT_STORE.get(number, 'No funfact available') + if type == b'secrets': + if number == 1337: + return subprocess.check_output('flag').decode() + return SECRET_STORE.get(number, 'No secret available') + return 'Unknown type' + + +async def handle_client(client_reader: StreamReader, client_writer: StreamWriter): + try: + remote = client_writer.get_extra_info('peername') + if remote is None: + log.error('Could not get ip of client') + return + remote = '%s:%s' % (remote[0], remote[1]) + log.info('new connection from: %s' % remote) + except Exception as e: + log.error('EXCEPTION (get peername): %s (%s)' % (e, type(e))) + return + + try: + while True: + message = await utils.read_line_safe(client_reader) + if message is None: + return + match message.split(';'): + case [m, iv, mac]: + m = base64.b64decode(m) + iv = base64.b64decode(iv) + mac = base64.b64decode(mac) + answer = handle_message(m, iv, mac) + client_writer.write((answer + '\n').encode()) + continue + case _: + client_writer.write( + 'Invalid message, expected format "message;iv;mac"\n'.encode() + ) + except Exception as e: + utils.log_error(e, client_writer) + + +def accept_client(client_reader: StreamReader, client_writer: StreamWriter): + task = asyncio.Task(handle_client(client_reader, client_writer)) + clients[task] = (client_reader, client_writer) + + def client_done(task): + del clients[task] + client_writer.close() + log.info('connection closed') + + task.add_done_callback(client_done) + + +def main(): + # start server + loop = asyncio.get_event_loop() + f = asyncio.start_server(accept_client, host=None, port=20205) + log.info('Server waiting for connections') + loop.run_until_complete(f) + loop.run_forever() + + +if __name__ == '__main__': + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s', + ) + + # "INFO:asyncio:poll took 25.960 seconds" is annyoing + logging.getLogger('asyncio').setLevel(logging.WARNING) + + main()