From 889b6546ff7e5d3e1777c1b1a30abbe7812bc7e2 Mon Sep 17 00:00:00 2001 From: Martin Asprusten Date: Thu, 17 Apr 2025 19:25:23 +0200 Subject: [PATCH] Push current state to Gitea --- .gitignore | 2 + classes/Crypto/CSPRNG.py | 49 ++++++ classes/Crypto/CommutativeCipher.py | 92 ++++++++++++ classes/Crypto/__init__.py | 0 classes/Message.py | 41 ++++++ classes/MessageHandler.py | 168 +++++++++++++++++++++ classes/MessageTypes/Announcement.py | 23 +++ classes/MessageTypes/Introduction.py | 33 +++++ classes/MessageTypes/Ready.py | 41 ++++++ classes/MessageTypes/Shuffle.py | 35 +++++ classes/MessageTypes/__init__.py | 0 classes/SantasBrain.py | 213 +++++++++++++++++++++++++++ classes/UserInterface.py | 15 ++ classes/__init__.py | 0 main.py | 33 +++++ requirements.txt | 1 + 16 files changed, 746 insertions(+) create mode 100644 .gitignore create mode 100644 classes/Crypto/CSPRNG.py create mode 100644 classes/Crypto/CommutativeCipher.py create mode 100644 classes/Crypto/__init__.py create mode 100644 classes/Message.py create mode 100644 classes/MessageHandler.py create mode 100644 classes/MessageTypes/Announcement.py create mode 100644 classes/MessageTypes/Introduction.py create mode 100644 classes/MessageTypes/Ready.py create mode 100644 classes/MessageTypes/Shuffle.py create mode 100644 classes/MessageTypes/__init__.py create mode 100644 classes/SantasBrain.py create mode 100644 classes/UserInterface.py create mode 100644 classes/__init__.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d436ecc --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/* +**/__pycache__/** diff --git a/classes/Crypto/CSPRNG.py b/classes/Crypto/CSPRNG.py new file mode 100644 index 0000000..bbed1d4 --- /dev/null +++ b/classes/Crypto/CSPRNG.py @@ -0,0 +1,49 @@ +import math +import hashlib + +# According to NIST Special Publication 800-90A, Revision 1, this should be a cryptographically secure pseudo-random +# number generator, provided I've implemented it properly, which is of course very possible I haven't +class CSPRNG: + def __init__(self, entropy: bytes, nonce: bytes=b'', personalization_string: bytes=b''): + self.V = hash_df(entropy + nonce + personalization_string, 888).to_bytes(111) + self.C = hash_df(int(0).to_bytes(0) + self.V, 888).to_bytes(111) + self.reseed_counter = 1 + + def hash_gen(self, requested_number_of_bits: int): + m = int(math.ceil(requested_number_of_bits / 512)) + data = self.V + w = b'' + for i in range(m): + hasher = hashlib.sha512() + hasher.update(data) + w += hasher.digest() + data = int.from_bytes(data) + data = (data + 1) % 2 ** 888 + data = data.to_bytes(111) + w = int.from_bytes(w) + w = w >> (512 * m - requested_number_of_bits) + return w + + def get_random_bytes(self, number_of_bytes: int): + return_bytes = self.hash_gen(number_of_bytes * 8).to_bytes(number_of_bytes) + hasher = hashlib.sha512() + hasher.update(int(3).to_bytes(1) + self.V) + h = hasher.digest() + new_v = (int.from_bytes(self.V) + int.from_bytes(h) + int.from_bytes(self.C) + self.reseed_counter) % 2 ** 888 + self.V = new_v.to_bytes(111) + self.reseed_counter += 1 + return return_bytes + + +# Hash derivation function as specified in section 10.3.1 of NIST Special Publication 800-90A, Revision 1 +def hash_df(input_string: bytes, number_of_bits: int): + temp = b'' + length = int(math.ceil(number_of_bits / 512)) + for i in range(length): + hash_input = (i + 1).to_bytes(1) + number_of_bits.to_bytes(4) + input_string + m = hashlib.sha512() + m.update(hash_input) + temp += m.digest() + number = int.from_bytes(temp) + number = number >> (512 * length - number_of_bits) + return number \ No newline at end of file diff --git a/classes/Crypto/CommutativeCipher.py b/classes/Crypto/CommutativeCipher.py new file mode 100644 index 0000000..982df6a --- /dev/null +++ b/classes/Crypto/CommutativeCipher.py @@ -0,0 +1,92 @@ +import base64 +import secrets +import math +import Crypto.Util + + +# This commutative cipher is based on the SRA cryptographical system, which is just a modification of RSA where the +# modulus n is known, but both the encryption and decryption exponents are kept secret. As long as both keys use the +# same modulus, this cryptography system is commutative, i.e. Ea(Eb(x)) = Eb(Ea(x)) if encryption with key a is denoted +# as Ea() and encryption with key b is denoted as Eb. +class CommutativeCipher: + def __init__(self, p, q): + self.n = p*q + carmichael_function = (p-1) * (q-1) + + # Make the exponent have almost as many bits as the modulus + number_of_bits = int(math.ceil(math.log(self.n) / math.log(2))) + self.e = Crypto.Util.number.getPrime(number_of_bits-10, randfunc=secrets.token_bytes) + + self.d = pow(self.e, -1, carmichael_function) + + def encode(self, message): + message_was_base64 = False + message_was_bytes = False + + if isinstance(message, str): + message_bytes = base64.b64decode(message) + message = message_bytes + message_was_base64 = True + + try: + message_int = int.from_bytes(message) + message = message_int + message_was_bytes = True + except TypeError: + # Assume message is already an integer + pass + + if not isinstance(message, int): + raise Exception( + 'The message to encrypt was not of the correct type (base64 string, bytes-like object, or integer' + ) + + if message >= self.n: + raise Exception( + 'The message is equal to or larger than the modulus' + ) + + encrypted = pow(message, self.e, self.n) + + if message_was_bytes: + # Find number of bits + number_of_bits = int(math.ceil(math.log(encrypted) / math.log(2))) + number_of_bytes = int(math.ceil(number_of_bits / 8)) + encrypted = encrypted.to_bytes(number_of_bytes) + + if message_was_base64: + encrypted = base64.b64encode(encrypted) + + return encrypted + + def decode(self, cipher): + cipher_was_base64 = False + cipher_was_bytes = False + + if isinstance(cipher, str): + cipher_was_base64 = True + cipher = base64.b64decode(cipher) + + try: + cipher_int = int.from_bytes(cipher) + cipher = cipher_int + cipher_was_bytes = True + except TypeError: + pass + + if not isinstance(cipher, int): + raise Exception('The passed cipher was not a valid type (base64 string, bytes object or integer)') + + if cipher >= self.n: + raise Exception('The passed cipher is equal to or larger than the modulus') + + decrypted = pow(cipher, self.d, self.n) + if cipher_was_bytes: + number_of_bits = int(math.ceil(math.log(decrypted)/math.log(2))) + number_of_bytes = int(math.ceil(number_of_bits / 8)) + decrypted = decrypted.to_bytes(number_of_bytes) + + if cipher_was_base64: + decrypted = base64.b64encode(decrypted) + + return decrypted \ No newline at end of file diff --git a/classes/Crypto/__init__.py b/classes/Crypto/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/classes/Message.py b/classes/Message.py new file mode 100644 index 0000000..ad64c8e --- /dev/null +++ b/classes/Message.py @@ -0,0 +1,41 @@ +import base64 +import json +from Crypto.Hash import SHA256 +from Crypto.PublicKey.ECC import EccKey +from Crypto.Signature import DSS + +class Message: + def __init__(self, fields = None): + self.message_fields = {} + if fields is not None: + for field in fields: + self.message_fields[field] = fields[field] + + def generate_and_sign(self, private_key: EccKey): + if 'signature' in self.message_fields: + del self.message_fields['signature'] + + message = json.dumps(self.message_fields) + h = SHA256.new(message.encode('utf-8')) + signer = DSS.new(private_key, 'fips-186-3') + signature = signer.sign(h) + self.message_fields['signature'] = base64.b64encode(signature).decode('utf-8') + return json.dumps(self.message_fields) + + def check_signature(self, public_key: EccKey): + signature = base64.b64decode(self.message_fields['signature']) + message_copy = self.message_fields.copy() + del message_copy['signature'] + message = json.dumps(message_copy) + h = SHA256.new(message.encode('utf-8')) + verifier = DSS.new(public_key, 'fips-186-3') + + try: + verifier.verify(h, signature) + return True + except ValueError: + return False + + def get_name(self) -> str: + # All subclasses have a get_name function, which tells who sent the message + pass \ No newline at end of file diff --git a/classes/MessageHandler.py b/classes/MessageHandler.py new file mode 100644 index 0000000..9e7af93 --- /dev/null +++ b/classes/MessageHandler.py @@ -0,0 +1,168 @@ +import base64 +import binascii +import json +import logging +from collections.abc import Callable +from json import JSONDecodeError + +from Crypto.PublicKey import ECC + +from classes.Message import Message +from classes.MessageTypes.Announcement import AnnouncementMessage +from classes.MessageTypes.Introduction import IntroductionMessage +from classes.MessageTypes.Ready import ReadyMessage +from classes.MessageTypes.Shuffle import ShuffleMessage + +logger = logging.getLogger(__name__) + + +class MessageHandler: + def __init__(self): + self.receivers: list[Callable[[Message], None]] = [] + + def send_message(self, message: Message): + # Must be implemented by child classes + pass + + def add_message_receiver(self, message_receiver: Callable[[Message], None]): + self.receivers.append(message_receiver) + + def remove_message_receiver(self, message_receiver: Callable[[Message], None]): + self.receivers.remove(message_receiver) + + def decode_received_message(self, message_string: str): + try: + message_object = json.loads(message_string) + except JSONDecodeError | UnicodeDecodeError: + logger.error(f'Could not decode received string {message_string}') + return + + message_type = message_object.get('type') + if message_type is None: + logger.error(f'Message type not found in message {message_string}') + return + + message = None + + if message_type == 'introduction': + name = message_object.get('name') + seed_commit = message_object.get('seed_commit') + key = message_object.get('key') + + if None in [name, seed_commit, key]: + logger.error(f'Did not find expected fields for introduction message: {message_string}') + return + + elif not isinstance(name, str) or not isinstance(seed_commit, str) or not isinstance(key, str): + logger.error(f'Received data of the wrong type for introduction message: {message_string}') + return + + try: + key = ECC.import_key(key) + except ValueError: + logger.error(f'{key} is not a valid key') + return + + try: + seed_commit = base64.b64decode(seed_commit, validate=True) + except binascii.Error: + logger.error(f'Seed commit {seed_commit} is not a valid base64 string') + return + + message = IntroductionMessage(name, key, seed_commit) + + elif message_type == 'ready': + name = message_object.get('name') + participants = message_object.get('participants') + random_seed = message_object.get('random_seed') + + if None in [name, participants, random_seed]: + logger.error(f'Did not find expected fields for ready message {message_string}') + return + + elif not isinstance(name, str) or not isinstance(participants, list) or not isinstance(random_seed, str): + logger.error(f'Received data of the wrong type for ready message: {message_string}') + return + + elif not all(isinstance(participant, tuple) for participant in participants): + logger.error(f'Not all participants in participant list are tuples: {message_string}') + return + + elif not all(len(participant_tuple) == 2 for participant_tuple in participants): + logger.error(f'Not all participant tuples are of length two in {message_string}') + return + + elif not all(isinstance(name, str) and isinstance(key, bytes) for name, key in participants): + logger.error(f'Not all participant tuples contain a name and a key') + return + + decoded_participants = [] + for participant_name, key in participants: + try: + key = ECC.import_key(key) + decoded_participants.append((participant_name, key)) + except ValueError: + logger.error(f'Could not decode public key {key} of participant {participant_name}') + return + + try: + random_seed = base64.b64decode(random_seed, validate=True) + except binascii.Error: + logger.error(f'Random seed {random_seed} from {name} is not a valid base64 string') + return + + message = ReadyMessage(name, participants, random_seed) + + elif message_type == 'shuffle': + name = message_object.get('name') + cards = message_object.get('cards') + stage = message_object.get('stage') + + if None in [name, cards, stage]: + logger.error(f'Did not receive all expected fields for shuffle message: {message_string}') + return + + elif not isinstance(name, str) or not isinstance(cards, list) or not isinstance(stage, str): + logger.error(f'Received fields were not correct type for shuffle message: {message_string}') + return + + elif not all(isinstance(card, str) for card in cards): + logger.error(f'All received cards were not of type string: {message_string}') + return + + new_cards = [] + for card in cards: + try: + new_cards.append(base64.b64decode(card, validate=True)) + except binascii.Error: + logger.error(f'{card} is not a valid base64 string') + return + + message = ShuffleMessage(name, new_cards, stage) + + elif message_type == 'announcement': + name = message_object.get('name') + announcement = message_object.get('announcement') + + if None in [name, announcement]: + logger.error(f'Did not receive all expected fields for announcement message: {message_string}') + return + + elif not isinstance(name, str) or not isinstance(announcement, str): + logger.error(f'Received fields for announcement message are not of correct type: {message_string}') + return + + try: + announcement = base64.b64decode(announcement, validate=True) + except binascii.Error: + logger.error(f'{announcement} is not a valid base64 string') + return + + message = AnnouncementMessage(name, announcement) + + if message is None: + logger.error(f'Message type {message_type} does not exist') + return + + for message_receiver in self.receivers: + message_receiver(message) diff --git a/classes/MessageTypes/Announcement.py b/classes/MessageTypes/Announcement.py new file mode 100644 index 0000000..2e5870e --- /dev/null +++ b/classes/MessageTypes/Announcement.py @@ -0,0 +1,23 @@ +import base64 + +from classes.Message import Message + + +class AnnouncementMessage(Message): + def __init__(self, name: str, announcement: bytes): + super().__init__() + self.message_fields['type'] = 'announcement' + self.message_fields['name'] = name + self.message_fields['announcement'] = announcement + + def set_name(self, name: str): + self.message_fields['name'] = name + + def get_name(self) -> str: + return self.message_fields['name'] + + def set_announcement(self, announcement: bytes): + self.message_fields['announcement'] = base64.b64encode(announcement).decode('utf-8') + + def get_announcement(self) -> bytes: + return base64.b64decode(self.message_fields['announcement']) diff --git a/classes/MessageTypes/Introduction.py b/classes/MessageTypes/Introduction.py new file mode 100644 index 0000000..639f264 --- /dev/null +++ b/classes/MessageTypes/Introduction.py @@ -0,0 +1,33 @@ +import base64 + +from Crypto.PublicKey import ECC +from Crypto.PublicKey.ECC import EccKey + +from classes.Message import Message + + +class IntroductionMessage(Message): + def __init__(self, name: str, public_key: EccKey, random_seed_commit: bytes): + super().__init__() + self.message_fields['type'] = 'introduction' + self.set_name(name) + self.set_seed_commit(random_seed_commit) + self.set_key(public_key) + + def get_name(self) -> str: + return self.message_fields['name'] + + def set_name(self, name: str): + self.message_fields['name'] = name + + def get_seed_commit(self) -> bytes: + return base64.b64decode(self.message_fields['seed_commit']) + + def set_seed_commit(self, seed_commit: bytes): + self.message_fields['seed_commit'] = base64.b64encode(seed_commit).decode('utf-8') + + def get_key(self) -> EccKey: + return ECC.import_key(self.message_fields['key']) + + def set_key(self, key: EccKey): + self.message_fields['key'] = key.public_key().export_key(format="OpenSSH").strip() diff --git a/classes/MessageTypes/Ready.py b/classes/MessageTypes/Ready.py new file mode 100644 index 0000000..c801b9d --- /dev/null +++ b/classes/MessageTypes/Ready.py @@ -0,0 +1,41 @@ +import base64 + +from Crypto.PublicKey import ECC +from Crypto.PublicKey.ECC import EccKey + +from classes.Message import Message + + +class ReadyMessage(Message): + def __init__(self, name: str, participants: list[tuple[str, EccKey]], random_seed: bytes): + super().__init__() + self.message_fields['type'] = 'ready' + self.set_name(name) + self.set_participants(participants) + self.set_random_seed(random_seed) + + def set_name(self, name: str): + self.message_fields['name'] = name + + def get_name(self) -> str: + return self.message_fields['name'] + + def set_participants(self, participants: list[tuple[str, EccKey]]): + self.message_fields['participants'] = [] + for name, key in participants: + key = key.public_key().export_key(format='OpenSSH') + self.message_fields['participants'].append((name, key)) + + def get_participants(self) -> list[tuple[str, EccKey]]: + participant_list = [] + for name, key in self.message_fields['participants']: + key = ECC.import_key(key) + participant_list.append((name, key)) + + return participant_list + + def set_random_seed(self, random_seed: bytes): + self.message_fields['random_seed'] = base64.b64encode(random_seed).decode('utf-8') + + def get_random_seed(self) -> bytes: + return base64.b64decode(self.message_fields['random_seed']) \ No newline at end of file diff --git a/classes/MessageTypes/Shuffle.py b/classes/MessageTypes/Shuffle.py new file mode 100644 index 0000000..c4b2136 --- /dev/null +++ b/classes/MessageTypes/Shuffle.py @@ -0,0 +1,35 @@ +import base64 + +from classes.Message import Message + + +class ShuffleMessage(Message): + def __init__(self, name: str, cards: list[bytes], stage: str): + super().__init__() + self.message_fields['type'] = 'shuffle' + self.set_name(name) + self.set_cards(cards) + self.set_stage(stage) + + def set_name(self, name: str): + self.message_fields['name'] = name + + def get_name(self) -> str: + return self.message_fields['name'] + + def set_cards(self, cards: list[bytes]): + self.message_fields['cards'] = [] + for card in cards: + self.message_fields['cards'].append(base64.b64encode(card).decode('utf-8')) + + def get_cards(self) -> list[bytes]: + cards = [] + for card in self.message_fields['cards']: + cards.append(base64.b64decode(card)) + return cards + + def set_stage(self, stage: str): + self.message_fields['stage'] = stage + + def get_stage(self) -> str: + return self.message_fields['stage'] \ No newline at end of file diff --git a/classes/MessageTypes/__init__.py b/classes/MessageTypes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/classes/SantasBrain.py b/classes/SantasBrain.py new file mode 100644 index 0000000..16b7800 --- /dev/null +++ b/classes/SantasBrain.py @@ -0,0 +1,213 @@ +import hashlib +import logging +import secrets +import threading +from typing import Optional, Callable + +from Crypto.PublicKey import ECC +from Crypto.PublicKey.ECC import EccKey +from Crypto.Util.number import getPrime + +from classes import Message +from classes.Crypto.CSPRNG import CSPRNG +from classes.Crypto.CommutativeCipher import CommutativeCipher +from classes.MessageHandler import MessageHandler +from classes.MessageTypes.Introduction import IntroductionMessage +from classes.MessageTypes.Ready import ReadyMessage +from classes.UserInterface import UserInterface + +logger = logging.getLogger(__name__) + +class Brain: + def __init__(self, message_handler: MessageHandler, user_interface: UserInterface): + # We're going to need to do some + self.thread_lock = threading.Lock() + self.message_handler = message_handler + self.user_interface = user_interface + + self.other_possible_participants: dict[str, tuple[EccKey, bytes]] = {} + self.name = None + self.extra_info_for_santa = None + self.key = ECC.generate(curve='p256') + self.random_seed = secrets.token_bytes(32) + + self.message_handler.add_message_receiver(self.receive_message) + self.user_interface.add_user_info_listener(self.set_user_data) + self.user_interface.add_start_listener(self.receive_user_start_command) + + # The following fields are used during the exchange itself + self.other_participants: dict[str, tuple[EccKey, bytes]] = {} + self.other_ready_participants: dict[str, tuple[list[str], bytes]] = {} + + self.first_shuffle_key: CommutativeCipher = None + self.second_shuffle_key: CommutativeCipher = None + self.third_shuffle_key: CommutativeCipher = None + self.card_values: list[bytes] = [] + + def set_user_data(self, name: str, extra_info_for_santa: str): + with self.thread_lock: + if name in self.other_possible_participants: + logger.error(f'Participant already exists with username {name}. Please choose another.') + return + self.name = name + self.extra_info_for_santa = extra_info_for_santa + + # Send our introductions out on the network + self.send_introduction_message() + + def receive_user_start_command(self, other_participant_names: list[str]): + with self.thread_lock: + other_participants = {} + for other_participant_name in other_participant_names: + if other_participant_name not in self.other_possible_participants: + logger.error(f'Tried to start an exchange containing unknown participant {other_participant_name}') + return + + other_participants[other_participant_name] = self.other_possible_participants[other_participant_name] + + self.other_participants = other_participants + self.send_ready_message() + after_lock_command = None + with self.thread_lock: + if self.check_if_ready(): + after_lock_command = self.start_exchange_process() + if after_lock_command is not None: + after_lock_command() + + def receive_message(self, message: Message): + after_lock_call: Optional[Callable[[], None]] = None + with self.thread_lock: + # First, check if message signature is correct + sender_name = message.get_name() + key = None + if sender_name in self.other_possible_participants: + key, _ = self.other_possible_participants[sender_name] + elif isinstance(message, IntroductionMessage): + key = message.get_key() + + if key is None: + logger.warning(f'Received message from participant {sender_name}, but there is no validation key.') + return + + if not message.check_signature(key): + logger.warning(f'Received message from participant {sender_name} with invalid signature. Ignoring.') + return + + if isinstance(message, IntroductionMessage): + after_lock_call = self.handle_introduction(message) + elif isinstance(message, ReadyMessage): + after_lock_call = self.receive_ready_message(message) + + if after_lock_call is not None: + after_lock_call() + + def handle_introduction(self, introduction: IntroductionMessage) -> Optional[Callable[[], None]]: + name = introduction.get_name() + key = introduction.get_key() + commit = introduction.get_seed_commit() + + # Check if it's a participant we already know about + if name in self.other_possible_participants: + previous_key, previous_commit = self.other_possible_participants[name] + + # Either it's a participant we already know about, or it's someone trying to use the same name + # as a previous participant. Either way, we don't really do anything + if previous_key != key or previous_commit != commit: + logger.warning(f'A second participant tried to register with the already used name {name}. Ignoring.') + return None + + self.other_possible_participants[name] = (key, commit) + # Since this participant is new, they might not know about us yet. Send an introduction + # Also, tell the user interface about this new user + def post_introduction(): + self.send_introduction_message() + self.user_interface.add_user(name) + return post_introduction + + def send_introduction_message(self): + if self.name is None or self.key is None or self.random_seed is None: + return + + # We need to commit to our random seed by hashing it, but we don't actually want to send the seed itself yet, + # to prevent others from crafting their seeds based on the value of ours + hasher = hashlib.sha512() + hasher.update(self.random_seed) + introduction_message = IntroductionMessage(self.name, self.key.public_key(), hasher.digest()) + introduction_message.generate_and_sign(self.key) + self.message_handler.send_message(introduction_message) + + def send_ready_message(self): + other_participants = [(name, self.other_possible_participants[name][0]) for name in self.other_possible_participants] + ready_message = ReadyMessage(self.name, other_participants, self.random_seed) + ready_message.generate_and_sign(self.key) + self.message_handler.send_message(ready_message) + + def receive_ready_message(self, ready_message: ReadyMessage) -> Optional[Callable[[], None]]: + sender_name = ready_message.get_name() + sender_expected_participants = ready_message.get_participants() + sender_random_seed = ready_message.get_random_seed() + + if sender_name not in self.other_possible_participants: + logger.warning(f'Received ready message from unknown participant {sender_name}') + return None + + _, sender_commit = self.other_possible_participants[sender_name] + + hasher = hashlib.sha512() + hasher.update(sender_random_seed) + seed_hash = hasher.digest() + + if seed_hash != sender_commit: + logger.error(f'Participant {sender_name} sent random seed that did not match their initial commit!') + return None + + for expected_participant_name, expected_participant_key in sender_expected_participants: + if expected_participant_name not in self.other_possible_participants: + logger.warning(f'Participant {sender_name} expects exchange with unknown participant {expected_participant_name}') + return None + + our_key, _ = self.other_possible_participants[expected_participant_name] + if our_key != expected_participant_key: + logger.error(f'Participant {sender_name} has different public key for participant {expected_participant_name} than we have.') + return None + self.other_ready_participants[sender_name] = ([name for name, _ in sender_expected_participants], sender_random_seed) + + if self.check_if_ready(): + return self.start_exchange_process() + + def check_if_ready(self): + list_of_names = set([name for name in self.other_participants] + [self.name]) + for name in self.other_participants: + if name not in self.other_ready_participants: + return False + other_list_of_names = set(self.other_ready_participants[name][0] + [name]) + if list_of_names != other_list_of_names: + logger.critical(f'Participant {name} does not have the same list of participants as us!') + return False + return True + + def start_exchange_process(self) -> Optional[Callable[[], None]]: + # XOR all the seeds together + all_seeds = [self.random_seed] + for name in self.other_participants: + all_seeds.append(self.other_ready_participants[name][1]) + + longest_seed_length = max(len(seed) for seed in all_seeds) + total_seed = b'0' * longest_seed_length + for seed in all_seeds: + seed = b'0' * (longest_seed_length - len(seed)) + seed + total_seed = bytes(a ^ b for a, b in zip(total_seed, seed)) + + # Use the total random seed to initialize a cryptographically secure pseudo-random number generator + csprng = CSPRNG(total_seed) + # Since everyone is using the same seed, everyone should get same values for p and q + p = getPrime(1200, randfunc=csprng.get_random_bytes) + q = getPrime(800, randfunc=csprng.get_random_bytes) + + self.first_shuffle_key = CommutativeCipher(p, q) + self.second_shuffle_key = CommutativeCipher(p, q) + self.third_shuffle_key = CommutativeCipher(p, q) + # In the same way, everyone should get the same values for each card + for i in range(len(self.other_participants) + 1): + self.card_values.append(csprng.get_random_bytes(16)) + diff --git a/classes/UserInterface.py b/classes/UserInterface.py new file mode 100644 index 0000000..7ce4aeb --- /dev/null +++ b/classes/UserInterface.py @@ -0,0 +1,15 @@ +from collections.abc import Callable + + +class UserInterface: + def add_user(self, name: str): + pass + + def add_user_info_listener(self, callback: Callable[[str, str], None]): + pass + + def add_start_listener(self, callback: Callable[[list[str]], None]): + pass + + def announce_recipient(self, name: str, other_info: str): + pass diff --git a/classes/__init__.py b/classes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py new file mode 100644 index 0000000..e7d19f8 --- /dev/null +++ b/main.py @@ -0,0 +1,33 @@ +import secrets + +from Crypto.Util.number import getPrime +from Crypto.PublicKey import ECC + +from classes.Crypto.CommutativeCipher import CommutativeCipher +from classes.MessageTypes.Introduction import IntroductionMessage + +p = getPrime(1200) +q = getPrime(800) +cipher1 = CommutativeCipher(p, q) +cipher2 = CommutativeCipher(p, q) +message = 'Hei på deg'.encode('utf-8 ') +c1 = cipher1.encode(message) +c2 = cipher2.encode(c1) + +d1 = cipher1.decode(c2) +print(cipher2.decode(d1)) + +key = ECC.generate(curve='p256') + +test = key.public_key().export_key(format='OpenSSH') + +seed = secrets.randbits(256).to_bytes(32) + +key1 = ECC.import_key(test) +key2 = ECC.import_key(test) + +print(f'Keys are equal: {key1 == key2}') + +test = IntroductionMessage('Martin', key.public_key(), seed) +print(test.generate_and_sign(key)) +print(test.check_signature(key.public_key())) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f161669 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pycryptodome~=3.21.0 \ No newline at end of file