Worked a bit further. Not in a working state
This commit is contained in:
parent
889b6546ff
commit
e8e1fce791
@ -6,6 +6,7 @@ from collections.abc import Callable
|
||||
from json import JSONDecodeError
|
||||
|
||||
from Crypto.PublicKey import ECC
|
||||
from Crypto.PublicKey.ECC import EccKey
|
||||
|
||||
from classes.Message import Message
|
||||
from classes.MessageTypes.Announcement import AnnouncementMessage
|
||||
@ -20,7 +21,7 @@ class MessageHandler:
|
||||
def __init__(self):
|
||||
self.receivers: list[Callable[[Message], None]] = []
|
||||
|
||||
def send_message(self, message: Message):
|
||||
def send_message(self, message: Message, signing_key: EccKey):
|
||||
# Must be implemented by child classes
|
||||
pass
|
||||
|
||||
|
||||
@ -1,9 +1,13 @@
|
||||
import hashlib
|
||||
import logging
|
||||
import math
|
||||
import random
|
||||
import secrets
|
||||
import threading
|
||||
from collections import defaultdict
|
||||
from typing import Optional, Callable
|
||||
|
||||
import Crypto.Util.number
|
||||
from Crypto.PublicKey import ECC
|
||||
from Crypto.PublicKey.ECC import EccKey
|
||||
from Crypto.Util.number import getPrime
|
||||
@ -14,200 +18,343 @@ from classes.Crypto.CommutativeCipher import CommutativeCipher
|
||||
from classes.MessageHandler import MessageHandler
|
||||
from classes.MessageTypes.Introduction import IntroductionMessage
|
||||
from classes.MessageTypes.Ready import ReadyMessage
|
||||
from classes.MessageTypes.Shuffle import ShuffleMessage
|
||||
from classes.UserInterface import UserInterface
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SHUFFLE_CARDS_STAGE = 'shuffle_cards'
|
||||
DECRYPT_CARDS_STAGE = 'decrypt_cards'
|
||||
BUILD_ANONYMOUS_STAGE = 'build_announcement'
|
||||
SHUFFLE_ANONYMOUS_STAGE = 'shuffle_announcement'
|
||||
DECRYPT_ANONYMOUS_STAGE = 'decrypt_announcement'
|
||||
|
||||
class Brain:
|
||||
def __init__(self, message_handler: MessageHandler, user_interface: UserInterface):
|
||||
# We're going to need to do some
|
||||
self.thread_lock = threading.Lock()
|
||||
self.message_handler = message_handler
|
||||
self.user_interface = user_interface
|
||||
|
||||
self.other_possible_participants: dict[str, tuple[EccKey, bytes]] = {}
|
||||
self.name = None
|
||||
self.extra_info_for_santa = None
|
||||
self.key = ECC.generate(curve='p256')
|
||||
self.random_seed = secrets.token_bytes(32)
|
||||
# Store the names, public key and seed commits of each participant. We will assume that each participant uses
|
||||
# a unique name. It is expected that the participants in the secret santa will make sure that this condition is
|
||||
# met. If a non-unique name is received, throw an error to warn the user, and then ignore the new participant
|
||||
# with the existing name.
|
||||
self.known_participants: dict[str, tuple[EccKey, bytes]] = {}
|
||||
# Store who we expect to take part in the secret santa
|
||||
self.chosen_participants: Optional[list[str]] = None
|
||||
# Store all received messages (except Introduction messages), by sender
|
||||
self.received_messages: dict[str, list[Message]] = defaultdict(list)
|
||||
# Our own info, that we will send out
|
||||
self.own_name: Optional[str] = None
|
||||
self.signing_key: Optional[EccKey] = None
|
||||
self.random_seed: Optional[bytes] = None
|
||||
self.introduction_message: Optional[IntroductionMessage] = None
|
||||
self.info_for_santa: Optional[str] = None
|
||||
|
||||
self.message_handler.add_message_receiver(self.receive_message)
|
||||
self.user_interface.add_user_info_listener(self.set_user_data)
|
||||
# Secret key, used for receiving information about who your secret santa is
|
||||
self.secret_key = ECC.generate(curve='p256')
|
||||
|
||||
self.user_interface.add_user_info_listener(self.receive_user_info)
|
||||
self.user_interface.add_start_listener(self.receive_user_start_command)
|
||||
self.message_handler.add_message_receiver(self.receive_message)
|
||||
|
||||
# The following fields are used during the exchange itself
|
||||
self.other_participants: dict[str, tuple[EccKey, bytes]] = {}
|
||||
self.other_ready_participants: dict[str, tuple[list[str], bytes]] = {}
|
||||
# The variables we need through the secret santa process itself
|
||||
self.process_failed = False
|
||||
self.card_values: Optional[list[bytes]] = None
|
||||
self.card_exchange_cipher: Optional[CommutativeCipher] = None
|
||||
self.announcement_build_cipher: Optional[CommutativeCipher] = None
|
||||
self.announcement_shuffle_cipher: Optional[CommutativeCipher] = None
|
||||
self.sent_card_shuffling: bool = False
|
||||
self.sent_card_decryption: bool = False
|
||||
self.card_drawn: Optional[int] = None
|
||||
|
||||
self.first_shuffle_key: CommutativeCipher = None
|
||||
self.second_shuffle_key: CommutativeCipher = None
|
||||
self.third_shuffle_key: CommutativeCipher = None
|
||||
self.card_values: list[bytes] = []
|
||||
|
||||
def set_user_data(self, name: str, extra_info_for_santa: str):
|
||||
with self.thread_lock:
|
||||
if name in self.other_possible_participants:
|
||||
logger.error(f'Participant already exists with username {name}. Please choose another.')
|
||||
return
|
||||
self.name = name
|
||||
self.extra_info_for_santa = extra_info_for_santa
|
||||
|
||||
# Send our introductions out on the network
|
||||
self.send_introduction_message()
|
||||
|
||||
def receive_user_start_command(self, other_participant_names: list[str]):
|
||||
with self.thread_lock:
|
||||
other_participants = {}
|
||||
for other_participant_name in other_participant_names:
|
||||
if other_participant_name not in self.other_possible_participants:
|
||||
logger.error(f'Tried to start an exchange containing unknown participant {other_participant_name}')
|
||||
return
|
||||
|
||||
other_participants[other_participant_name] = self.other_possible_participants[other_participant_name]
|
||||
|
||||
self.other_participants = other_participants
|
||||
self.send_ready_message()
|
||||
after_lock_command = None
|
||||
with self.thread_lock:
|
||||
if self.check_if_ready():
|
||||
after_lock_command = self.start_exchange_process()
|
||||
if after_lock_command is not None:
|
||||
after_lock_command()
|
||||
|
||||
def receive_message(self, message: Message):
|
||||
after_lock_call: Optional[Callable[[], None]] = None
|
||||
with self.thread_lock:
|
||||
# First, check if message signature is correct
|
||||
sender_name = message.get_name()
|
||||
key = None
|
||||
if sender_name in self.other_possible_participants:
|
||||
key, _ = self.other_possible_participants[sender_name]
|
||||
elif isinstance(message, IntroductionMessage):
|
||||
key = message.get_key()
|
||||
|
||||
if key is None:
|
||||
logger.warning(f'Received message from participant {sender_name}, but there is no validation key.')
|
||||
return
|
||||
|
||||
if not message.check_signature(key):
|
||||
logger.warning(f'Received message from participant {sender_name} with invalid signature. Ignoring.')
|
||||
return
|
||||
|
||||
if isinstance(message, IntroductionMessage):
|
||||
after_lock_call = self.handle_introduction(message)
|
||||
elif isinstance(message, ReadyMessage):
|
||||
after_lock_call = self.receive_ready_message(message)
|
||||
|
||||
if after_lock_call is not None:
|
||||
after_lock_call()
|
||||
|
||||
def handle_introduction(self, introduction: IntroductionMessage) -> Optional[Callable[[], None]]:
|
||||
name = introduction.get_name()
|
||||
key = introduction.get_key()
|
||||
commit = introduction.get_seed_commit()
|
||||
|
||||
# Check if it's a participant we already know about
|
||||
if name in self.other_possible_participants:
|
||||
previous_key, previous_commit = self.other_possible_participants[name]
|
||||
|
||||
# Either it's a participant we already know about, or it's someone trying to use the same name
|
||||
# as a previous participant. Either way, we don't really do anything
|
||||
if previous_key != key or previous_commit != commit:
|
||||
logger.warning(f'A second participant tried to register with the already used name {name}. Ignoring.')
|
||||
return None
|
||||
|
||||
self.other_possible_participants[name] = (key, commit)
|
||||
# Since this participant is new, they might not know about us yet. Send an introduction
|
||||
# Also, tell the user interface about this new user
|
||||
def post_introduction():
|
||||
self.send_introduction_message()
|
||||
self.user_interface.add_user(name)
|
||||
return post_introduction
|
||||
|
||||
def send_introduction_message(self):
|
||||
if self.name is None or self.key is None or self.random_seed is None:
|
||||
def receive_user_info(self, name: str, info_for_santa: str):
|
||||
# We will only receive this once
|
||||
if (
|
||||
self.own_name is not None
|
||||
or self.signing_key is not None
|
||||
or self.introduction_message is not None
|
||||
or self.info_for_santa is not None
|
||||
or self.random_seed is not None
|
||||
):
|
||||
return
|
||||
|
||||
# We need to commit to our random seed by hashing it, but we don't actually want to send the seed itself yet,
|
||||
# to prevent others from crafting their seeds based on the value of ours
|
||||
self.own_name = name
|
||||
self.signing_key = ECC.generate(curve='p256')
|
||||
self.random_seed = secrets.token_bytes(32)
|
||||
self.info_for_santa = info_for_santa
|
||||
|
||||
# Need to create a commit for our random seed
|
||||
hasher = hashlib.sha512()
|
||||
hasher.update(self.random_seed)
|
||||
introduction_message = IntroductionMessage(self.name, self.key.public_key(), hasher.digest())
|
||||
introduction_message.generate_and_sign(self.key)
|
||||
self.message_handler.send_message(introduction_message)
|
||||
seed_commit = hasher.digest()
|
||||
self.introduction_message = IntroductionMessage(name, self.signing_key.public_key(), seed_commit)
|
||||
self.message_handler.send_message(self.introduction_message, self.signing_key)
|
||||
|
||||
def send_ready_message(self):
|
||||
other_participants = [(name, self.other_possible_participants[name][0]) for name in self.other_possible_participants]
|
||||
ready_message = ReadyMessage(self.name, other_participants, self.random_seed)
|
||||
ready_message.generate_and_sign(self.key)
|
||||
self.message_handler.send_message(ready_message)
|
||||
def receive_user_start_command(self, chosen_participants: list[str]):
|
||||
with self.thread_lock:
|
||||
# If we have not introduced ourselves to the world and selected a random seed and all that, ignore command
|
||||
if self.own_name is None or self.random_seed is None:
|
||||
return
|
||||
# If we have already selected who we want to exchange with, ignore this
|
||||
if self.chosen_participants is not None:
|
||||
return
|
||||
|
||||
def receive_ready_message(self, ready_message: ReadyMessage) -> Optional[Callable[[], None]]:
|
||||
sender_name = ready_message.get_name()
|
||||
sender_expected_participants = ready_message.get_participants()
|
||||
sender_random_seed = ready_message.get_random_seed()
|
||||
# Make sure that each participant we want to play with actually exists
|
||||
confirmed_existing = []
|
||||
|
||||
if sender_name not in self.other_possible_participants:
|
||||
logger.warning(f'Received ready message from unknown participant {sender_name}')
|
||||
return None
|
||||
# Make sure that no participant is repeated in the list
|
||||
chosen_participants = list(set(chosen_participants))
|
||||
|
||||
_, sender_commit = self.other_possible_participants[sender_name]
|
||||
for name in chosen_participants:
|
||||
if name not in self.known_participants:
|
||||
logger.error(f'Tried to start a secret santa with non-existing participant {name}')
|
||||
return
|
||||
key, _ = self.known_participants[name]
|
||||
confirmed_existing.append((name, key))
|
||||
|
||||
hasher = hashlib.sha512()
|
||||
hasher.update(sender_random_seed)
|
||||
seed_hash = hasher.digest()
|
||||
ready_message = ReadyMessage(self.own_name, confirmed_existing, self.random_seed)
|
||||
self.chosen_participants = sorted([name for name, key in confirmed_existing])
|
||||
self.message_handler.send_message(ready_message, self.signing_key)
|
||||
|
||||
if seed_hash != sender_commit:
|
||||
logger.error(f'Participant {sender_name} sent random seed that did not match their initial commit!')
|
||||
return None
|
||||
def receive_message(self, message: Message):
|
||||
# If this is an introduction message, it needs special handling
|
||||
if isinstance(message, IntroductionMessage):
|
||||
self.receive_introduction_message(message)
|
||||
# For normal, non-introduction messages, check that they're from who they purport to be from,
|
||||
# and add them to the list
|
||||
else:
|
||||
with self.thread_lock:
|
||||
name = message.get_name()
|
||||
if name not in self.known_participants:
|
||||
logger.warning(f'Received message from unknown participant {name}. Ignoring.')
|
||||
return
|
||||
key, _ = self.known_participants[name]
|
||||
if not message.check_signature(key):
|
||||
logger.warning(f'Received message that purports to be from {name} with invalid signature. Ignoring.')
|
||||
return
|
||||
self.received_messages[name].append(message)
|
||||
|
||||
for expected_participant_name, expected_participant_key in sender_expected_participants:
|
||||
if expected_participant_name not in self.other_possible_participants:
|
||||
logger.warning(f'Participant {sender_name} expects exchange with unknown participant {expected_participant_name}')
|
||||
return None
|
||||
# Each received message triggers the main function of this class
|
||||
messages_to_send = self.santa_loop()
|
||||
for message in messages_to_send:
|
||||
self.message_handler.send_message(message, self.signing_key)
|
||||
|
||||
our_key, _ = self.other_possible_participants[expected_participant_name]
|
||||
if our_key != expected_participant_key:
|
||||
logger.error(f'Participant {sender_name} has different public key for participant {expected_participant_name} than we have.')
|
||||
return None
|
||||
self.other_ready_participants[sender_name] = ([name for name, _ in sender_expected_participants], sender_random_seed)
|
||||
def receive_introduction_message(self, message: IntroductionMessage):
|
||||
discovered_new_participant = False
|
||||
name = message.get_name()
|
||||
key = message.get_key()
|
||||
|
||||
if self.check_if_ready():
|
||||
return self.start_exchange_process()
|
||||
with self.thread_lock:
|
||||
if name in self.known_participants and self.known_participants[name][0] != key:
|
||||
logger.error(
|
||||
f'There are two participants using the name {name}. All participants need unique name. '
|
||||
f'The second participant will be ignored, but this may lead to the santa exchange not being started.'
|
||||
)
|
||||
return
|
||||
|
||||
def check_if_ready(self):
|
||||
list_of_names = set([name for name in self.other_participants] + [self.name])
|
||||
for name in self.other_participants:
|
||||
if name not in self.other_ready_participants:
|
||||
if name not in self.known_participants:
|
||||
discovered_new_participant = True
|
||||
self.known_participants[name] = (key, message.get_seed_commit())
|
||||
|
||||
# If this a participant we don't already know about, chances are they don't know about us, either.
|
||||
# Send an introduction message, if we are ready for that
|
||||
if (
|
||||
discovered_new_participant
|
||||
and self.introduction_message is not None
|
||||
and self.signing_key is not None
|
||||
):
|
||||
self.message_handler.send_message(self.introduction_message, self.signing_key)
|
||||
|
||||
|
||||
# Santa's brain will be driven by receiving messages. We'll call this main method each time we receive a message.
|
||||
# This is probably inefficient, but it makes it easier to follow what the code is doing
|
||||
def santa_loop(self) -> list[Message]:
|
||||
# We don't want to send messages while holding the thread lock, in case this leads to a deadlock
|
||||
messages_to_send: list[Message] = []
|
||||
with self.thread_lock:
|
||||
# If something has caused the process to fail, don't try to do it again
|
||||
if self.process_failed:
|
||||
return messages_to_send
|
||||
# First of all, if the user hasn't pressed start yet, there is nothing to do here
|
||||
if self.chosen_participants is None:
|
||||
return messages_to_send
|
||||
# Next, check that the user has actually provided all the information we need to perform this process
|
||||
if self.own_name is None or self.info_for_santa is None or self.random_seed is None:
|
||||
return messages_to_send
|
||||
|
||||
# Next, if we haven't built our commutative ciphers and card values yet, attempt to do that first
|
||||
if (
|
||||
self.card_values is None
|
||||
or self.card_exchange_cipher is None
|
||||
or self.announcement_build_cipher is None
|
||||
or self.announcement_shuffle_cipher is None
|
||||
):
|
||||
should_continue = self.build_ciphers()
|
||||
if not should_continue:
|
||||
return messages_to_send
|
||||
|
||||
# We will give each participant a number, based on their alphabetical order
|
||||
all_participants = sorted(list(set(self.chosen_participants + [self.own_name])))
|
||||
|
||||
# Shuffle cards
|
||||
if not self.sent_card_shuffling:
|
||||
shuffle_message = self.build_shuffle_message(all_participants)
|
||||
if shuffle_message is None:
|
||||
return messages_to_send
|
||||
messages_to_send.append(shuffle_message)
|
||||
self.sent_card_shuffling = True
|
||||
|
||||
# Decrypt shuffled cards
|
||||
if not self.sent_card_decryption:
|
||||
decrypt_cards_message = self.build_decrypt_cards_message(all_participants)
|
||||
if decrypt_cards_message is None:
|
||||
return messages_to_send
|
||||
messages_to_send.append(decrypt_cards_message)
|
||||
self.sent_card_decryption = True
|
||||
|
||||
# Find which card we drew
|
||||
if self.card_drawn is None:
|
||||
own_index = all_participants.index(self.own_name)
|
||||
last_participant = all_participants[-1]
|
||||
message = next(
|
||||
(
|
||||
message for message in self.received_messages[last_participant]
|
||||
if isinstance(message, ShuffleMessage) and message.get_stage() == DECRYPT_CARDS_STAGE
|
||||
),
|
||||
None
|
||||
)
|
||||
if message is None:
|
||||
return messages_to_send
|
||||
self.decrypt_card_value(message.get_cards()[own_index])
|
||||
|
||||
|
||||
|
||||
|
||||
return messages_to_send
|
||||
|
||||
|
||||
def build_ciphers(self) -> bool:
|
||||
received_seeds = [self.random_seed]
|
||||
for name in self.chosen_participants:
|
||||
_, seed_commit = self.known_participants[name]
|
||||
ready_message = next(
|
||||
(message for message in self.received_messages[name] if isinstance(message, ReadyMessage)),
|
||||
None
|
||||
)
|
||||
if ready_message is None:
|
||||
# We haven't received ready messages from everyone yet, and so we cannot continue at the moment
|
||||
return False
|
||||
other_list_of_names = set(self.other_ready_participants[name][0] + [name])
|
||||
if list_of_names != other_list_of_names:
|
||||
logger.critical(f'Participant {name} does not have the same list of participants as us!')
|
||||
|
||||
# Check that the seed we received from this user matches the seed they committed to sending
|
||||
received_seed = ready_message.get_random_seed()
|
||||
hasher = hashlib.sha512()
|
||||
hasher.update(received_seed)
|
||||
expected_commit = hasher.digest()
|
||||
if expected_commit != seed_commit:
|
||||
logger.critical(
|
||||
f'Received random seed from user {name} that did not match their commit. '
|
||||
f'Cancelling secret santa exchange!'
|
||||
)
|
||||
self.process_failed = True
|
||||
return False
|
||||
|
||||
received_seeds.append(received_seed)
|
||||
# Next, create a combined random seed from all the provided seeds
|
||||
number_of_seed_bytes = max(len(seed) for seed in received_seeds)
|
||||
total_seed = b'\0' * number_of_seed_bytes
|
||||
for received_seed in received_seeds:
|
||||
# Pad seed with leading zeros if not long enough
|
||||
while len(received_seed) < number_of_seed_bytes:
|
||||
received_seed = b'\0' + received_seed
|
||||
# Xor seed with current seed so far
|
||||
total_seed = bytes(a ^ b for a, b in zip(total_seed, received_seed))
|
||||
|
||||
# Use this seed in a cryptographically secure random number generator
|
||||
random_generator = CSPRNG(total_seed)
|
||||
|
||||
p = Crypto.Util.number.getPrime(1500, randfunc=random_generator.get_random_bytes)
|
||||
q = Crypto.Util.number.getPrime(1000, randfunc=random_generator.get_random_bytes)
|
||||
|
||||
self.card_exchange_cipher = CommutativeCipher(p, q)
|
||||
self.announcement_build_cipher = CommutativeCipher(p, q)
|
||||
self.announcement_shuffle_cipher = CommutativeCipher(p, q)
|
||||
|
||||
self.card_values = [random_generator.get_random_bytes(8) for i in range(len(self.chosen_participants) + 1)]
|
||||
return True
|
||||
|
||||
def start_exchange_process(self) -> Optional[Callable[[], None]]:
|
||||
# XOR all the seeds together
|
||||
all_seeds = [self.random_seed]
|
||||
for name in self.other_participants:
|
||||
all_seeds.append(self.other_ready_participants[name][1])
|
||||
def build_shuffle_message(self, all_participants: list[str]) -> Optional[ShuffleMessage]:
|
||||
own_index = all_participants.index(self.own_name)
|
||||
if own_index == 0:
|
||||
# If we are the first participant, we must create the list
|
||||
card_deck = [card_value for card_value in self.card_values]
|
||||
else:
|
||||
previous_participant = all_participants[own_index - 1]
|
||||
message = next(
|
||||
(
|
||||
message for message in self.received_messages[previous_participant]
|
||||
if isinstance(message, ShuffleMessage) and message.get_stage() == SHUFFLE_CARDS_STAGE
|
||||
), None
|
||||
)
|
||||
# If we haven't received a shuffle message yet, we can't continue
|
||||
if message is None:
|
||||
return None
|
||||
|
||||
longest_seed_length = max(len(seed) for seed in all_seeds)
|
||||
total_seed = b'0' * longest_seed_length
|
||||
for seed in all_seeds:
|
||||
seed = b'0' * (longest_seed_length - len(seed)) + seed
|
||||
total_seed = bytes(a ^ b for a, b in zip(total_seed, seed))
|
||||
card_deck = message.get_cards()
|
||||
|
||||
# Use the total random seed to initialize a cryptographically secure pseudo-random number generator
|
||||
csprng = CSPRNG(total_seed)
|
||||
# Since everyone is using the same seed, everyone should get same values for p and q
|
||||
p = getPrime(1200, randfunc=csprng.get_random_bytes)
|
||||
q = getPrime(800, randfunc=csprng.get_random_bytes)
|
||||
# Shuffle by drawing random numbers from secret
|
||||
shuffled_deck = []
|
||||
while len(card_deck) > 0:
|
||||
drawn_card = secrets.randbelow(len(card_deck))
|
||||
shuffled_deck.append(card_deck[drawn_card])
|
||||
del card_deck[drawn_card]
|
||||
|
||||
self.first_shuffle_key = CommutativeCipher(p, q)
|
||||
self.second_shuffle_key = CommutativeCipher(p, q)
|
||||
self.third_shuffle_key = CommutativeCipher(p, q)
|
||||
# In the same way, everyone should get the same values for each card
|
||||
for i in range(len(self.other_participants) + 1):
|
||||
self.card_values.append(csprng.get_random_bytes(16))
|
||||
return ShuffleMessage(self.own_name, shuffled_deck, SHUFFLE_CARDS_STAGE)
|
||||
|
||||
def build_decrypt_cards_message(self, all_participants: list[str]) -> Optional[ShuffleMessage]:
|
||||
own_index = all_participants.index(self.own_name)
|
||||
# Again, special case if we are the first participant
|
||||
message = None
|
||||
previous_participant = all_participants[(own_index - 1) % len(all_participants)]
|
||||
if own_index == 0:
|
||||
message = next(
|
||||
(
|
||||
message for message in self.received_messages[previous_participant]
|
||||
if isinstance(message, ShuffleMessage) and message.get_stage() == SHUFFLE_CARDS_STAGE
|
||||
),
|
||||
None
|
||||
)
|
||||
else:
|
||||
message = next(
|
||||
(
|
||||
message for message in self.received_messages[previous_participant]
|
||||
if isinstance(message, ShuffleMessage) and message.get_stage() == DECRYPT_CARDS_STAGE
|
||||
),
|
||||
None
|
||||
)
|
||||
if message is None:
|
||||
return None
|
||||
|
||||
card_deck = message.get_cards()
|
||||
decrypted_cards = []
|
||||
for index, card_value in enumerate(card_deck):
|
||||
# Don't decrypt our own card
|
||||
if index == own_index:
|
||||
decrypted_cards.append(card_value)
|
||||
else:
|
||||
decrypted_cards.append(self.card_exchange_cipher.decode(card_value))
|
||||
|
||||
# In case we are the last participant in the list, we are actually ready to fully decrypt
|
||||
if own_index == len(all_participants) - 1:
|
||||
self.decrypt_card_value(decrypted_cards[-1])
|
||||
|
||||
return ShuffleMessage(self.own_name, decrypted_cards, DECRYPT_CARDS_STAGE)
|
||||
|
||||
def decrypt_card_value(self, card: bytes):
|
||||
decrypted_card_bytes = self.card_exchange_cipher.decode(card)
|
||||
if decrypted_card_bytes not in self.card_values:
|
||||
logging.critical(f'Received an invalid card after shuffling. Secret santa exchange failed!')
|
||||
self.process_failed = True
|
||||
return
|
||||
|
||||
self.card_drawn = self.card_values.index(decrypted_card_bytes)
|
||||
18
main.py
18
main.py
@ -1,13 +1,16 @@
|
||||
import base64
|
||||
import secrets
|
||||
from base64 import b64encode
|
||||
|
||||
import Crypto.Cipher.PKCS1_OAEP
|
||||
from Crypto.Util.number import getPrime
|
||||
from Crypto.PublicKey import ECC
|
||||
from Crypto.PublicKey import ECC, RSA
|
||||
|
||||
from classes.Crypto.CommutativeCipher import CommutativeCipher
|
||||
from classes.MessageTypes.Introduction import IntroductionMessage
|
||||
|
||||
p = getPrime(1200)
|
||||
q = getPrime(800)
|
||||
p = getPrime(1500)
|
||||
q = getPrime(1000)
|
||||
cipher1 = CommutativeCipher(p, q)
|
||||
cipher2 = CommutativeCipher(p, q)
|
||||
message = 'Hei på deg'.encode('utf-8 ')
|
||||
@ -31,3 +34,12 @@ print(f'Keys are equal: {key1 == key2}')
|
||||
test = IntroductionMessage('Martin', key.public_key(), seed)
|
||||
print(test.generate_and_sign(key))
|
||||
print(test.check_signature(key.public_key()))
|
||||
|
||||
rsa_key = RSA.generate(2048)
|
||||
message = 'Keep it secret! Keep it safe!'.encode('utf-8')
|
||||
cipher = Crypto.Cipher.PKCS1_OAEP.new(rsa_key.public_key())
|
||||
ciphertext = cipher.encrypt(message)
|
||||
decoder = Crypto.Cipher.PKCS1_OAEP.new(rsa_key)
|
||||
print(decoder.decrypt(ciphertext))
|
||||
|
||||
print(len(b64encode(rsa_key.public_key().export_key(format='DER'))))
|
||||
@ -1 +1 @@
|
||||
pycryptodome~=3.21.0
|
||||
pycryptodome>=3.21.0
|
||||
Loading…
x
Reference in New Issue
Block a user