diff --git a/classes/MessageHandler.py b/classes/MessageHandler.py index 9e7af93..acb8db2 100644 --- a/classes/MessageHandler.py +++ b/classes/MessageHandler.py @@ -6,6 +6,7 @@ from collections.abc import Callable from json import JSONDecodeError from Crypto.PublicKey import ECC +from Crypto.PublicKey.ECC import EccKey from classes.Message import Message from classes.MessageTypes.Announcement import AnnouncementMessage @@ -20,7 +21,7 @@ class MessageHandler: def __init__(self): self.receivers: list[Callable[[Message], None]] = [] - def send_message(self, message: Message): + def send_message(self, message: Message, signing_key: EccKey): # Must be implemented by child classes pass diff --git a/classes/SantasBrain.py b/classes/SantasBrain.py index 16b7800..425553e 100644 --- a/classes/SantasBrain.py +++ b/classes/SantasBrain.py @@ -1,9 +1,13 @@ import hashlib import logging +import math +import random import secrets import threading +from collections import defaultdict from typing import Optional, Callable +import Crypto.Util.number from Crypto.PublicKey import ECC from Crypto.PublicKey.ECC import EccKey from Crypto.Util.number import getPrime @@ -14,200 +18,343 @@ from classes.Crypto.CommutativeCipher import CommutativeCipher from classes.MessageHandler import MessageHandler from classes.MessageTypes.Introduction import IntroductionMessage from classes.MessageTypes.Ready import ReadyMessage +from classes.MessageTypes.Shuffle import ShuffleMessage from classes.UserInterface import UserInterface logger = logging.getLogger(__name__) +SHUFFLE_CARDS_STAGE = 'shuffle_cards' +DECRYPT_CARDS_STAGE = 'decrypt_cards' +BUILD_ANONYMOUS_STAGE = 'build_announcement' +SHUFFLE_ANONYMOUS_STAGE = 'shuffle_announcement' +DECRYPT_ANONYMOUS_STAGE = 'decrypt_announcement' + class Brain: def __init__(self, message_handler: MessageHandler, user_interface: UserInterface): - # We're going to need to do some self.thread_lock = threading.Lock() self.message_handler = message_handler self.user_interface = user_interface - self.other_possible_participants: dict[str, tuple[EccKey, bytes]] = {} - self.name = None - self.extra_info_for_santa = None - self.key = ECC.generate(curve='p256') - self.random_seed = secrets.token_bytes(32) + # Store the names, public key and seed commits of each participant. We will assume that each participant uses + # a unique name. It is expected that the participants in the secret santa will make sure that this condition is + # met. If a non-unique name is received, throw an error to warn the user, and then ignore the new participant + # with the existing name. + self.known_participants: dict[str, tuple[EccKey, bytes]] = {} + # Store who we expect to take part in the secret santa + self.chosen_participants: Optional[list[str]] = None + # Store all received messages (except Introduction messages), by sender + self.received_messages: dict[str, list[Message]] = defaultdict(list) + # Our own info, that we will send out + self.own_name: Optional[str] = None + self.signing_key: Optional[EccKey] = None + self.random_seed: Optional[bytes] = None + self.introduction_message: Optional[IntroductionMessage] = None + self.info_for_santa: Optional[str] = None - self.message_handler.add_message_receiver(self.receive_message) - self.user_interface.add_user_info_listener(self.set_user_data) + # Secret key, used for receiving information about who your secret santa is + self.secret_key = ECC.generate(curve='p256') + + self.user_interface.add_user_info_listener(self.receive_user_info) self.user_interface.add_start_listener(self.receive_user_start_command) + self.message_handler.add_message_receiver(self.receive_message) - # The following fields are used during the exchange itself - self.other_participants: dict[str, tuple[EccKey, bytes]] = {} - self.other_ready_participants: dict[str, tuple[list[str], bytes]] = {} + # The variables we need through the secret santa process itself + self.process_failed = False + self.card_values: Optional[list[bytes]] = None + self.card_exchange_cipher: Optional[CommutativeCipher] = None + self.announcement_build_cipher: Optional[CommutativeCipher] = None + self.announcement_shuffle_cipher: Optional[CommutativeCipher] = None + self.sent_card_shuffling: bool = False + self.sent_card_decryption: bool = False + self.card_drawn: Optional[int] = None - self.first_shuffle_key: CommutativeCipher = None - self.second_shuffle_key: CommutativeCipher = None - self.third_shuffle_key: CommutativeCipher = None - self.card_values: list[bytes] = [] - - def set_user_data(self, name: str, extra_info_for_santa: str): - with self.thread_lock: - if name in self.other_possible_participants: - logger.error(f'Participant already exists with username {name}. Please choose another.') - return - self.name = name - self.extra_info_for_santa = extra_info_for_santa - - # Send our introductions out on the network - self.send_introduction_message() - - def receive_user_start_command(self, other_participant_names: list[str]): - with self.thread_lock: - other_participants = {} - for other_participant_name in other_participant_names: - if other_participant_name not in self.other_possible_participants: - logger.error(f'Tried to start an exchange containing unknown participant {other_participant_name}') - return - - other_participants[other_participant_name] = self.other_possible_participants[other_participant_name] - - self.other_participants = other_participants - self.send_ready_message() - after_lock_command = None - with self.thread_lock: - if self.check_if_ready(): - after_lock_command = self.start_exchange_process() - if after_lock_command is not None: - after_lock_command() - - def receive_message(self, message: Message): - after_lock_call: Optional[Callable[[], None]] = None - with self.thread_lock: - # First, check if message signature is correct - sender_name = message.get_name() - key = None - if sender_name in self.other_possible_participants: - key, _ = self.other_possible_participants[sender_name] - elif isinstance(message, IntroductionMessage): - key = message.get_key() - - if key is None: - logger.warning(f'Received message from participant {sender_name}, but there is no validation key.') - return - - if not message.check_signature(key): - logger.warning(f'Received message from participant {sender_name} with invalid signature. Ignoring.') - return - - if isinstance(message, IntroductionMessage): - after_lock_call = self.handle_introduction(message) - elif isinstance(message, ReadyMessage): - after_lock_call = self.receive_ready_message(message) - - if after_lock_call is not None: - after_lock_call() - - def handle_introduction(self, introduction: IntroductionMessage) -> Optional[Callable[[], None]]: - name = introduction.get_name() - key = introduction.get_key() - commit = introduction.get_seed_commit() - - # Check if it's a participant we already know about - if name in self.other_possible_participants: - previous_key, previous_commit = self.other_possible_participants[name] - - # Either it's a participant we already know about, or it's someone trying to use the same name - # as a previous participant. Either way, we don't really do anything - if previous_key != key or previous_commit != commit: - logger.warning(f'A second participant tried to register with the already used name {name}. Ignoring.') - return None - - self.other_possible_participants[name] = (key, commit) - # Since this participant is new, they might not know about us yet. Send an introduction - # Also, tell the user interface about this new user - def post_introduction(): - self.send_introduction_message() - self.user_interface.add_user(name) - return post_introduction - - def send_introduction_message(self): - if self.name is None or self.key is None or self.random_seed is None: + def receive_user_info(self, name: str, info_for_santa: str): + # We will only receive this once + if ( + self.own_name is not None + or self.signing_key is not None + or self.introduction_message is not None + or self.info_for_santa is not None + or self.random_seed is not None + ): return - # We need to commit to our random seed by hashing it, but we don't actually want to send the seed itself yet, - # to prevent others from crafting their seeds based on the value of ours + self.own_name = name + self.signing_key = ECC.generate(curve='p256') + self.random_seed = secrets.token_bytes(32) + self.info_for_santa = info_for_santa + + # Need to create a commit for our random seed hasher = hashlib.sha512() hasher.update(self.random_seed) - introduction_message = IntroductionMessage(self.name, self.key.public_key(), hasher.digest()) - introduction_message.generate_and_sign(self.key) - self.message_handler.send_message(introduction_message) + seed_commit = hasher.digest() + self.introduction_message = IntroductionMessage(name, self.signing_key.public_key(), seed_commit) + self.message_handler.send_message(self.introduction_message, self.signing_key) - def send_ready_message(self): - other_participants = [(name, self.other_possible_participants[name][0]) for name in self.other_possible_participants] - ready_message = ReadyMessage(self.name, other_participants, self.random_seed) - ready_message.generate_and_sign(self.key) - self.message_handler.send_message(ready_message) + def receive_user_start_command(self, chosen_participants: list[str]): + with self.thread_lock: + # If we have not introduced ourselves to the world and selected a random seed and all that, ignore command + if self.own_name is None or self.random_seed is None: + return + # If we have already selected who we want to exchange with, ignore this + if self.chosen_participants is not None: + return - def receive_ready_message(self, ready_message: ReadyMessage) -> Optional[Callable[[], None]]: - sender_name = ready_message.get_name() - sender_expected_participants = ready_message.get_participants() - sender_random_seed = ready_message.get_random_seed() + # Make sure that each participant we want to play with actually exists + confirmed_existing = [] - if sender_name not in self.other_possible_participants: - logger.warning(f'Received ready message from unknown participant {sender_name}') - return None + # Make sure that no participant is repeated in the list + chosen_participants = list(set(chosen_participants)) - _, sender_commit = self.other_possible_participants[sender_name] + for name in chosen_participants: + if name not in self.known_participants: + logger.error(f'Tried to start a secret santa with non-existing participant {name}') + return + key, _ = self.known_participants[name] + confirmed_existing.append((name, key)) - hasher = hashlib.sha512() - hasher.update(sender_random_seed) - seed_hash = hasher.digest() + ready_message = ReadyMessage(self.own_name, confirmed_existing, self.random_seed) + self.chosen_participants = sorted([name for name, key in confirmed_existing]) + self.message_handler.send_message(ready_message, self.signing_key) - if seed_hash != sender_commit: - logger.error(f'Participant {sender_name} sent random seed that did not match their initial commit!') - return None + def receive_message(self, message: Message): + # If this is an introduction message, it needs special handling + if isinstance(message, IntroductionMessage): + self.receive_introduction_message(message) + # For normal, non-introduction messages, check that they're from who they purport to be from, + # and add them to the list + else: + with self.thread_lock: + name = message.get_name() + if name not in self.known_participants: + logger.warning(f'Received message from unknown participant {name}. Ignoring.') + return + key, _ = self.known_participants[name] + if not message.check_signature(key): + logger.warning(f'Received message that purports to be from {name} with invalid signature. Ignoring.') + return + self.received_messages[name].append(message) - for expected_participant_name, expected_participant_key in sender_expected_participants: - if expected_participant_name not in self.other_possible_participants: - logger.warning(f'Participant {sender_name} expects exchange with unknown participant {expected_participant_name}') - return None + # Each received message triggers the main function of this class + messages_to_send = self.santa_loop() + for message in messages_to_send: + self.message_handler.send_message(message, self.signing_key) - our_key, _ = self.other_possible_participants[expected_participant_name] - if our_key != expected_participant_key: - logger.error(f'Participant {sender_name} has different public key for participant {expected_participant_name} than we have.') - return None - self.other_ready_participants[sender_name] = ([name for name, _ in sender_expected_participants], sender_random_seed) + def receive_introduction_message(self, message: IntroductionMessage): + discovered_new_participant = False + name = message.get_name() + key = message.get_key() - if self.check_if_ready(): - return self.start_exchange_process() + with self.thread_lock: + if name in self.known_participants and self.known_participants[name][0] != key: + logger.error( + f'There are two participants using the name {name}. All participants need unique name. ' + f'The second participant will be ignored, but this may lead to the santa exchange not being started.' + ) + return - def check_if_ready(self): - list_of_names = set([name for name in self.other_participants] + [self.name]) - for name in self.other_participants: - if name not in self.other_ready_participants: + if name not in self.known_participants: + discovered_new_participant = True + self.known_participants[name] = (key, message.get_seed_commit()) + + # If this a participant we don't already know about, chances are they don't know about us, either. + # Send an introduction message, if we are ready for that + if ( + discovered_new_participant + and self.introduction_message is not None + and self.signing_key is not None + ): + self.message_handler.send_message(self.introduction_message, self.signing_key) + + + # Santa's brain will be driven by receiving messages. We'll call this main method each time we receive a message. + # This is probably inefficient, but it makes it easier to follow what the code is doing + def santa_loop(self) -> list[Message]: + # We don't want to send messages while holding the thread lock, in case this leads to a deadlock + messages_to_send: list[Message] = [] + with self.thread_lock: + # If something has caused the process to fail, don't try to do it again + if self.process_failed: + return messages_to_send + # First of all, if the user hasn't pressed start yet, there is nothing to do here + if self.chosen_participants is None: + return messages_to_send + # Next, check that the user has actually provided all the information we need to perform this process + if self.own_name is None or self.info_for_santa is None or self.random_seed is None: + return messages_to_send + + # Next, if we haven't built our commutative ciphers and card values yet, attempt to do that first + if ( + self.card_values is None + or self.card_exchange_cipher is None + or self.announcement_build_cipher is None + or self.announcement_shuffle_cipher is None + ): + should_continue = self.build_ciphers() + if not should_continue: + return messages_to_send + + # We will give each participant a number, based on their alphabetical order + all_participants = sorted(list(set(self.chosen_participants + [self.own_name]))) + + # Shuffle cards + if not self.sent_card_shuffling: + shuffle_message = self.build_shuffle_message(all_participants) + if shuffle_message is None: + return messages_to_send + messages_to_send.append(shuffle_message) + self.sent_card_shuffling = True + + # Decrypt shuffled cards + if not self.sent_card_decryption: + decrypt_cards_message = self.build_decrypt_cards_message(all_participants) + if decrypt_cards_message is None: + return messages_to_send + messages_to_send.append(decrypt_cards_message) + self.sent_card_decryption = True + + # Find which card we drew + if self.card_drawn is None: + own_index = all_participants.index(self.own_name) + last_participant = all_participants[-1] + message = next( + ( + message for message in self.received_messages[last_participant] + if isinstance(message, ShuffleMessage) and message.get_stage() == DECRYPT_CARDS_STAGE + ), + None + ) + if message is None: + return messages_to_send + self.decrypt_card_value(message.get_cards()[own_index]) + + + + + return messages_to_send + + + def build_ciphers(self) -> bool: + received_seeds = [self.random_seed] + for name in self.chosen_participants: + _, seed_commit = self.known_participants[name] + ready_message = next( + (message for message in self.received_messages[name] if isinstance(message, ReadyMessage)), + None + ) + if ready_message is None: + # We haven't received ready messages from everyone yet, and so we cannot continue at the moment return False - other_list_of_names = set(self.other_ready_participants[name][0] + [name]) - if list_of_names != other_list_of_names: - logger.critical(f'Participant {name} does not have the same list of participants as us!') + + # Check that the seed we received from this user matches the seed they committed to sending + received_seed = ready_message.get_random_seed() + hasher = hashlib.sha512() + hasher.update(received_seed) + expected_commit = hasher.digest() + if expected_commit != seed_commit: + logger.critical( + f'Received random seed from user {name} that did not match their commit. ' + f'Cancelling secret santa exchange!' + ) + self.process_failed = True return False + + received_seeds.append(received_seed) + # Next, create a combined random seed from all the provided seeds + number_of_seed_bytes = max(len(seed) for seed in received_seeds) + total_seed = b'\0' * number_of_seed_bytes + for received_seed in received_seeds: + # Pad seed with leading zeros if not long enough + while len(received_seed) < number_of_seed_bytes: + received_seed = b'\0' + received_seed + # Xor seed with current seed so far + total_seed = bytes(a ^ b for a, b in zip(total_seed, received_seed)) + + # Use this seed in a cryptographically secure random number generator + random_generator = CSPRNG(total_seed) + + p = Crypto.Util.number.getPrime(1500, randfunc=random_generator.get_random_bytes) + q = Crypto.Util.number.getPrime(1000, randfunc=random_generator.get_random_bytes) + + self.card_exchange_cipher = CommutativeCipher(p, q) + self.announcement_build_cipher = CommutativeCipher(p, q) + self.announcement_shuffle_cipher = CommutativeCipher(p, q) + + self.card_values = [random_generator.get_random_bytes(8) for i in range(len(self.chosen_participants) + 1)] return True - def start_exchange_process(self) -> Optional[Callable[[], None]]: - # XOR all the seeds together - all_seeds = [self.random_seed] - for name in self.other_participants: - all_seeds.append(self.other_ready_participants[name][1]) + def build_shuffle_message(self, all_participants: list[str]) -> Optional[ShuffleMessage]: + own_index = all_participants.index(self.own_name) + if own_index == 0: + # If we are the first participant, we must create the list + card_deck = [card_value for card_value in self.card_values] + else: + previous_participant = all_participants[own_index - 1] + message = next( + ( + message for message in self.received_messages[previous_participant] + if isinstance(message, ShuffleMessage) and message.get_stage() == SHUFFLE_CARDS_STAGE + ), None + ) + # If we haven't received a shuffle message yet, we can't continue + if message is None: + return None - longest_seed_length = max(len(seed) for seed in all_seeds) - total_seed = b'0' * longest_seed_length - for seed in all_seeds: - seed = b'0' * (longest_seed_length - len(seed)) + seed - total_seed = bytes(a ^ b for a, b in zip(total_seed, seed)) + card_deck = message.get_cards() - # Use the total random seed to initialize a cryptographically secure pseudo-random number generator - csprng = CSPRNG(total_seed) - # Since everyone is using the same seed, everyone should get same values for p and q - p = getPrime(1200, randfunc=csprng.get_random_bytes) - q = getPrime(800, randfunc=csprng.get_random_bytes) + # Shuffle by drawing random numbers from secret + shuffled_deck = [] + while len(card_deck) > 0: + drawn_card = secrets.randbelow(len(card_deck)) + shuffled_deck.append(card_deck[drawn_card]) + del card_deck[drawn_card] - self.first_shuffle_key = CommutativeCipher(p, q) - self.second_shuffle_key = CommutativeCipher(p, q) - self.third_shuffle_key = CommutativeCipher(p, q) - # In the same way, everyone should get the same values for each card - for i in range(len(self.other_participants) + 1): - self.card_values.append(csprng.get_random_bytes(16)) + return ShuffleMessage(self.own_name, shuffled_deck, SHUFFLE_CARDS_STAGE) + def build_decrypt_cards_message(self, all_participants: list[str]) -> Optional[ShuffleMessage]: + own_index = all_participants.index(self.own_name) + # Again, special case if we are the first participant + message = None + previous_participant = all_participants[(own_index - 1) % len(all_participants)] + if own_index == 0: + message = next( + ( + message for message in self.received_messages[previous_participant] + if isinstance(message, ShuffleMessage) and message.get_stage() == SHUFFLE_CARDS_STAGE + ), + None + ) + else: + message = next( + ( + message for message in self.received_messages[previous_participant] + if isinstance(message, ShuffleMessage) and message.get_stage() == DECRYPT_CARDS_STAGE + ), + None + ) + if message is None: + return None + + card_deck = message.get_cards() + decrypted_cards = [] + for index, card_value in enumerate(card_deck): + # Don't decrypt our own card + if index == own_index: + decrypted_cards.append(card_value) + else: + decrypted_cards.append(self.card_exchange_cipher.decode(card_value)) + + # In case we are the last participant in the list, we are actually ready to fully decrypt + if own_index == len(all_participants) - 1: + self.decrypt_card_value(decrypted_cards[-1]) + + return ShuffleMessage(self.own_name, decrypted_cards, DECRYPT_CARDS_STAGE) + + def decrypt_card_value(self, card: bytes): + decrypted_card_bytes = self.card_exchange_cipher.decode(card) + if decrypted_card_bytes not in self.card_values: + logging.critical(f'Received an invalid card after shuffling. Secret santa exchange failed!') + self.process_failed = True + return + + self.card_drawn = self.card_values.index(decrypted_card_bytes) \ No newline at end of file diff --git a/main.py b/main.py index e7d19f8..35ff42b 100644 --- a/main.py +++ b/main.py @@ -1,13 +1,16 @@ +import base64 import secrets +from base64 import b64encode +import Crypto.Cipher.PKCS1_OAEP from Crypto.Util.number import getPrime -from Crypto.PublicKey import ECC +from Crypto.PublicKey import ECC, RSA from classes.Crypto.CommutativeCipher import CommutativeCipher from classes.MessageTypes.Introduction import IntroductionMessage -p = getPrime(1200) -q = getPrime(800) +p = getPrime(1500) +q = getPrime(1000) cipher1 = CommutativeCipher(p, q) cipher2 = CommutativeCipher(p, q) message = 'Hei på deg'.encode('utf-8 ') @@ -30,4 +33,13 @@ print(f'Keys are equal: {key1 == key2}') test = IntroductionMessage('Martin', key.public_key(), seed) print(test.generate_and_sign(key)) -print(test.check_signature(key.public_key())) \ No newline at end of file +print(test.check_signature(key.public_key())) + +rsa_key = RSA.generate(2048) +message = 'Keep it secret! Keep it safe!'.encode('utf-8') +cipher = Crypto.Cipher.PKCS1_OAEP.new(rsa_key.public_key()) +ciphertext = cipher.encrypt(message) +decoder = Crypto.Cipher.PKCS1_OAEP.new(rsa_key) +print(decoder.decrypt(ciphertext)) + +print(len(b64encode(rsa_key.public_key().export_key(format='DER')))) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f161669..75a967c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -pycryptodome~=3.21.0 \ No newline at end of file +pycryptodome>=3.21.0 \ No newline at end of file