Push current state to Gitea

This commit is contained in:
Martin Asprusten 2025-04-17 19:25:23 +02:00
commit 889b6546ff
16 changed files with 746 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.idea/*
**/__pycache__/**

49
classes/Crypto/CSPRNG.py Normal file
View File

@ -0,0 +1,49 @@
import math
import hashlib
# According to NIST Special Publication 800-90A, Revision 1, this should be a cryptographically secure pseudo-random
# number generator, provided I've implemented it properly, which is of course very possible I haven't
class CSPRNG:
def __init__(self, entropy: bytes, nonce: bytes=b'', personalization_string: bytes=b''):
self.V = hash_df(entropy + nonce + personalization_string, 888).to_bytes(111)
self.C = hash_df(int(0).to_bytes(0) + self.V, 888).to_bytes(111)
self.reseed_counter = 1
def hash_gen(self, requested_number_of_bits: int):
m = int(math.ceil(requested_number_of_bits / 512))
data = self.V
w = b''
for i in range(m):
hasher = hashlib.sha512()
hasher.update(data)
w += hasher.digest()
data = int.from_bytes(data)
data = (data + 1) % 2 ** 888
data = data.to_bytes(111)
w = int.from_bytes(w)
w = w >> (512 * m - requested_number_of_bits)
return w
def get_random_bytes(self, number_of_bytes: int):
return_bytes = self.hash_gen(number_of_bytes * 8).to_bytes(number_of_bytes)
hasher = hashlib.sha512()
hasher.update(int(3).to_bytes(1) + self.V)
h = hasher.digest()
new_v = (int.from_bytes(self.V) + int.from_bytes(h) + int.from_bytes(self.C) + self.reseed_counter) % 2 ** 888
self.V = new_v.to_bytes(111)
self.reseed_counter += 1
return return_bytes
# Hash derivation function as specified in section 10.3.1 of NIST Special Publication 800-90A, Revision 1
def hash_df(input_string: bytes, number_of_bits: int):
temp = b''
length = int(math.ceil(number_of_bits / 512))
for i in range(length):
hash_input = (i + 1).to_bytes(1) + number_of_bits.to_bytes(4) + input_string
m = hashlib.sha512()
m.update(hash_input)
temp += m.digest()
number = int.from_bytes(temp)
number = number >> (512 * length - number_of_bits)
return number

View File

@ -0,0 +1,92 @@
import base64
import secrets
import math
import Crypto.Util
# This commutative cipher is based on the SRA cryptographical system, which is just a modification of RSA where the
# modulus n is known, but both the encryption and decryption exponents are kept secret. As long as both keys use the
# same modulus, this cryptography system is commutative, i.e. Ea(Eb(x)) = Eb(Ea(x)) if encryption with key a is denoted
# as Ea() and encryption with key b is denoted as Eb.
class CommutativeCipher:
def __init__(self, p, q):
self.n = p*q
carmichael_function = (p-1) * (q-1)
# Make the exponent have almost as many bits as the modulus
number_of_bits = int(math.ceil(math.log(self.n) / math.log(2)))
self.e = Crypto.Util.number.getPrime(number_of_bits-10, randfunc=secrets.token_bytes)
self.d = pow(self.e, -1, carmichael_function)
def encode(self, message):
message_was_base64 = False
message_was_bytes = False
if isinstance(message, str):
message_bytes = base64.b64decode(message)
message = message_bytes
message_was_base64 = True
try:
message_int = int.from_bytes(message)
message = message_int
message_was_bytes = True
except TypeError:
# Assume message is already an integer
pass
if not isinstance(message, int):
raise Exception(
'The message to encrypt was not of the correct type (base64 string, bytes-like object, or integer'
)
if message >= self.n:
raise Exception(
'The message is equal to or larger than the modulus'
)
encrypted = pow(message, self.e, self.n)
if message_was_bytes:
# Find number of bits
number_of_bits = int(math.ceil(math.log(encrypted) / math.log(2)))
number_of_bytes = int(math.ceil(number_of_bits / 8))
encrypted = encrypted.to_bytes(number_of_bytes)
if message_was_base64:
encrypted = base64.b64encode(encrypted)
return encrypted
def decode(self, cipher):
cipher_was_base64 = False
cipher_was_bytes = False
if isinstance(cipher, str):
cipher_was_base64 = True
cipher = base64.b64decode(cipher)
try:
cipher_int = int.from_bytes(cipher)
cipher = cipher_int
cipher_was_bytes = True
except TypeError:
pass
if not isinstance(cipher, int):
raise Exception('The passed cipher was not a valid type (base64 string, bytes object or integer)')
if cipher >= self.n:
raise Exception('The passed cipher is equal to or larger than the modulus')
decrypted = pow(cipher, self.d, self.n)
if cipher_was_bytes:
number_of_bits = int(math.ceil(math.log(decrypted)/math.log(2)))
number_of_bytes = int(math.ceil(number_of_bits / 8))
decrypted = decrypted.to_bytes(number_of_bytes)
if cipher_was_base64:
decrypted = base64.b64encode(decrypted)
return decrypted

View File

41
classes/Message.py Normal file
View File

@ -0,0 +1,41 @@
import base64
import json
from Crypto.Hash import SHA256
from Crypto.PublicKey.ECC import EccKey
from Crypto.Signature import DSS
class Message:
def __init__(self, fields = None):
self.message_fields = {}
if fields is not None:
for field in fields:
self.message_fields[field] = fields[field]
def generate_and_sign(self, private_key: EccKey):
if 'signature' in self.message_fields:
del self.message_fields['signature']
message = json.dumps(self.message_fields)
h = SHA256.new(message.encode('utf-8'))
signer = DSS.new(private_key, 'fips-186-3')
signature = signer.sign(h)
self.message_fields['signature'] = base64.b64encode(signature).decode('utf-8')
return json.dumps(self.message_fields)
def check_signature(self, public_key: EccKey):
signature = base64.b64decode(self.message_fields['signature'])
message_copy = self.message_fields.copy()
del message_copy['signature']
message = json.dumps(message_copy)
h = SHA256.new(message.encode('utf-8'))
verifier = DSS.new(public_key, 'fips-186-3')
try:
verifier.verify(h, signature)
return True
except ValueError:
return False
def get_name(self) -> str:
# All subclasses have a get_name function, which tells who sent the message
pass

168
classes/MessageHandler.py Normal file
View File

@ -0,0 +1,168 @@
import base64
import binascii
import json
import logging
from collections.abc import Callable
from json import JSONDecodeError
from Crypto.PublicKey import ECC
from classes.Message import Message
from classes.MessageTypes.Announcement import AnnouncementMessage
from classes.MessageTypes.Introduction import IntroductionMessage
from classes.MessageTypes.Ready import ReadyMessage
from classes.MessageTypes.Shuffle import ShuffleMessage
logger = logging.getLogger(__name__)
class MessageHandler:
def __init__(self):
self.receivers: list[Callable[[Message], None]] = []
def send_message(self, message: Message):
# Must be implemented by child classes
pass
def add_message_receiver(self, message_receiver: Callable[[Message], None]):
self.receivers.append(message_receiver)
def remove_message_receiver(self, message_receiver: Callable[[Message], None]):
self.receivers.remove(message_receiver)
def decode_received_message(self, message_string: str):
try:
message_object = json.loads(message_string)
except JSONDecodeError | UnicodeDecodeError:
logger.error(f'Could not decode received string {message_string}')
return
message_type = message_object.get('type')
if message_type is None:
logger.error(f'Message type not found in message {message_string}')
return
message = None
if message_type == 'introduction':
name = message_object.get('name')
seed_commit = message_object.get('seed_commit')
key = message_object.get('key')
if None in [name, seed_commit, key]:
logger.error(f'Did not find expected fields for introduction message: {message_string}')
return
elif not isinstance(name, str) or not isinstance(seed_commit, str) or not isinstance(key, str):
logger.error(f'Received data of the wrong type for introduction message: {message_string}')
return
try:
key = ECC.import_key(key)
except ValueError:
logger.error(f'{key} is not a valid key')
return
try:
seed_commit = base64.b64decode(seed_commit, validate=True)
except binascii.Error:
logger.error(f'Seed commit {seed_commit} is not a valid base64 string')
return
message = IntroductionMessage(name, key, seed_commit)
elif message_type == 'ready':
name = message_object.get('name')
participants = message_object.get('participants')
random_seed = message_object.get('random_seed')
if None in [name, participants, random_seed]:
logger.error(f'Did not find expected fields for ready message {message_string}')
return
elif not isinstance(name, str) or not isinstance(participants, list) or not isinstance(random_seed, str):
logger.error(f'Received data of the wrong type for ready message: {message_string}')
return
elif not all(isinstance(participant, tuple) for participant in participants):
logger.error(f'Not all participants in participant list are tuples: {message_string}')
return
elif not all(len(participant_tuple) == 2 for participant_tuple in participants):
logger.error(f'Not all participant tuples are of length two in {message_string}')
return
elif not all(isinstance(name, str) and isinstance(key, bytes) for name, key in participants):
logger.error(f'Not all participant tuples contain a name and a key')
return
decoded_participants = []
for participant_name, key in participants:
try:
key = ECC.import_key(key)
decoded_participants.append((participant_name, key))
except ValueError:
logger.error(f'Could not decode public key {key} of participant {participant_name}')
return
try:
random_seed = base64.b64decode(random_seed, validate=True)
except binascii.Error:
logger.error(f'Random seed {random_seed} from {name} is not a valid base64 string')
return
message = ReadyMessage(name, participants, random_seed)
elif message_type == 'shuffle':
name = message_object.get('name')
cards = message_object.get('cards')
stage = message_object.get('stage')
if None in [name, cards, stage]:
logger.error(f'Did not receive all expected fields for shuffle message: {message_string}')
return
elif not isinstance(name, str) or not isinstance(cards, list) or not isinstance(stage, str):
logger.error(f'Received fields were not correct type for shuffle message: {message_string}')
return
elif not all(isinstance(card, str) for card in cards):
logger.error(f'All received cards were not of type string: {message_string}')
return
new_cards = []
for card in cards:
try:
new_cards.append(base64.b64decode(card, validate=True))
except binascii.Error:
logger.error(f'{card} is not a valid base64 string')
return
message = ShuffleMessage(name, new_cards, stage)
elif message_type == 'announcement':
name = message_object.get('name')
announcement = message_object.get('announcement')
if None in [name, announcement]:
logger.error(f'Did not receive all expected fields for announcement message: {message_string}')
return
elif not isinstance(name, str) or not isinstance(announcement, str):
logger.error(f'Received fields for announcement message are not of correct type: {message_string}')
return
try:
announcement = base64.b64decode(announcement, validate=True)
except binascii.Error:
logger.error(f'{announcement} is not a valid base64 string')
return
message = AnnouncementMessage(name, announcement)
if message is None:
logger.error(f'Message type {message_type} does not exist')
return
for message_receiver in self.receivers:
message_receiver(message)

View File

@ -0,0 +1,23 @@
import base64
from classes.Message import Message
class AnnouncementMessage(Message):
def __init__(self, name: str, announcement: bytes):
super().__init__()
self.message_fields['type'] = 'announcement'
self.message_fields['name'] = name
self.message_fields['announcement'] = announcement
def set_name(self, name: str):
self.message_fields['name'] = name
def get_name(self) -> str:
return self.message_fields['name']
def set_announcement(self, announcement: bytes):
self.message_fields['announcement'] = base64.b64encode(announcement).decode('utf-8')
def get_announcement(self) -> bytes:
return base64.b64decode(self.message_fields['announcement'])

View File

@ -0,0 +1,33 @@
import base64
from Crypto.PublicKey import ECC
from Crypto.PublicKey.ECC import EccKey
from classes.Message import Message
class IntroductionMessage(Message):
def __init__(self, name: str, public_key: EccKey, random_seed_commit: bytes):
super().__init__()
self.message_fields['type'] = 'introduction'
self.set_name(name)
self.set_seed_commit(random_seed_commit)
self.set_key(public_key)
def get_name(self) -> str:
return self.message_fields['name']
def set_name(self, name: str):
self.message_fields['name'] = name
def get_seed_commit(self) -> bytes:
return base64.b64decode(self.message_fields['seed_commit'])
def set_seed_commit(self, seed_commit: bytes):
self.message_fields['seed_commit'] = base64.b64encode(seed_commit).decode('utf-8')
def get_key(self) -> EccKey:
return ECC.import_key(self.message_fields['key'])
def set_key(self, key: EccKey):
self.message_fields['key'] = key.public_key().export_key(format="OpenSSH").strip()

View File

@ -0,0 +1,41 @@
import base64
from Crypto.PublicKey import ECC
from Crypto.PublicKey.ECC import EccKey
from classes.Message import Message
class ReadyMessage(Message):
def __init__(self, name: str, participants: list[tuple[str, EccKey]], random_seed: bytes):
super().__init__()
self.message_fields['type'] = 'ready'
self.set_name(name)
self.set_participants(participants)
self.set_random_seed(random_seed)
def set_name(self, name: str):
self.message_fields['name'] = name
def get_name(self) -> str:
return self.message_fields['name']
def set_participants(self, participants: list[tuple[str, EccKey]]):
self.message_fields['participants'] = []
for name, key in participants:
key = key.public_key().export_key(format='OpenSSH')
self.message_fields['participants'].append((name, key))
def get_participants(self) -> list[tuple[str, EccKey]]:
participant_list = []
for name, key in self.message_fields['participants']:
key = ECC.import_key(key)
participant_list.append((name, key))
return participant_list
def set_random_seed(self, random_seed: bytes):
self.message_fields['random_seed'] = base64.b64encode(random_seed).decode('utf-8')
def get_random_seed(self) -> bytes:
return base64.b64decode(self.message_fields['random_seed'])

View File

@ -0,0 +1,35 @@
import base64
from classes.Message import Message
class ShuffleMessage(Message):
def __init__(self, name: str, cards: list[bytes], stage: str):
super().__init__()
self.message_fields['type'] = 'shuffle'
self.set_name(name)
self.set_cards(cards)
self.set_stage(stage)
def set_name(self, name: str):
self.message_fields['name'] = name
def get_name(self) -> str:
return self.message_fields['name']
def set_cards(self, cards: list[bytes]):
self.message_fields['cards'] = []
for card in cards:
self.message_fields['cards'].append(base64.b64encode(card).decode('utf-8'))
def get_cards(self) -> list[bytes]:
cards = []
for card in self.message_fields['cards']:
cards.append(base64.b64decode(card))
return cards
def set_stage(self, stage: str):
self.message_fields['stage'] = stage
def get_stage(self) -> str:
return self.message_fields['stage']

View File

213
classes/SantasBrain.py Normal file
View File

@ -0,0 +1,213 @@
import hashlib
import logging
import secrets
import threading
from typing import Optional, Callable
from Crypto.PublicKey import ECC
from Crypto.PublicKey.ECC import EccKey
from Crypto.Util.number import getPrime
from classes import Message
from classes.Crypto.CSPRNG import CSPRNG
from classes.Crypto.CommutativeCipher import CommutativeCipher
from classes.MessageHandler import MessageHandler
from classes.MessageTypes.Introduction import IntroductionMessage
from classes.MessageTypes.Ready import ReadyMessage
from classes.UserInterface import UserInterface
logger = logging.getLogger(__name__)
class Brain:
def __init__(self, message_handler: MessageHandler, user_interface: UserInterface):
# We're going to need to do some
self.thread_lock = threading.Lock()
self.message_handler = message_handler
self.user_interface = user_interface
self.other_possible_participants: dict[str, tuple[EccKey, bytes]] = {}
self.name = None
self.extra_info_for_santa = None
self.key = ECC.generate(curve='p256')
self.random_seed = secrets.token_bytes(32)
self.message_handler.add_message_receiver(self.receive_message)
self.user_interface.add_user_info_listener(self.set_user_data)
self.user_interface.add_start_listener(self.receive_user_start_command)
# The following fields are used during the exchange itself
self.other_participants: dict[str, tuple[EccKey, bytes]] = {}
self.other_ready_participants: dict[str, tuple[list[str], bytes]] = {}
self.first_shuffle_key: CommutativeCipher = None
self.second_shuffle_key: CommutativeCipher = None
self.third_shuffle_key: CommutativeCipher = None
self.card_values: list[bytes] = []
def set_user_data(self, name: str, extra_info_for_santa: str):
with self.thread_lock:
if name in self.other_possible_participants:
logger.error(f'Participant already exists with username {name}. Please choose another.')
return
self.name = name
self.extra_info_for_santa = extra_info_for_santa
# Send our introductions out on the network
self.send_introduction_message()
def receive_user_start_command(self, other_participant_names: list[str]):
with self.thread_lock:
other_participants = {}
for other_participant_name in other_participant_names:
if other_participant_name not in self.other_possible_participants:
logger.error(f'Tried to start an exchange containing unknown participant {other_participant_name}')
return
other_participants[other_participant_name] = self.other_possible_participants[other_participant_name]
self.other_participants = other_participants
self.send_ready_message()
after_lock_command = None
with self.thread_lock:
if self.check_if_ready():
after_lock_command = self.start_exchange_process()
if after_lock_command is not None:
after_lock_command()
def receive_message(self, message: Message):
after_lock_call: Optional[Callable[[], None]] = None
with self.thread_lock:
# First, check if message signature is correct
sender_name = message.get_name()
key = None
if sender_name in self.other_possible_participants:
key, _ = self.other_possible_participants[sender_name]
elif isinstance(message, IntroductionMessage):
key = message.get_key()
if key is None:
logger.warning(f'Received message from participant {sender_name}, but there is no validation key.')
return
if not message.check_signature(key):
logger.warning(f'Received message from participant {sender_name} with invalid signature. Ignoring.')
return
if isinstance(message, IntroductionMessage):
after_lock_call = self.handle_introduction(message)
elif isinstance(message, ReadyMessage):
after_lock_call = self.receive_ready_message(message)
if after_lock_call is not None:
after_lock_call()
def handle_introduction(self, introduction: IntroductionMessage) -> Optional[Callable[[], None]]:
name = introduction.get_name()
key = introduction.get_key()
commit = introduction.get_seed_commit()
# Check if it's a participant we already know about
if name in self.other_possible_participants:
previous_key, previous_commit = self.other_possible_participants[name]
# Either it's a participant we already know about, or it's someone trying to use the same name
# as a previous participant. Either way, we don't really do anything
if previous_key != key or previous_commit != commit:
logger.warning(f'A second participant tried to register with the already used name {name}. Ignoring.')
return None
self.other_possible_participants[name] = (key, commit)
# Since this participant is new, they might not know about us yet. Send an introduction
# Also, tell the user interface about this new user
def post_introduction():
self.send_introduction_message()
self.user_interface.add_user(name)
return post_introduction
def send_introduction_message(self):
if self.name is None or self.key is None or self.random_seed is None:
return
# We need to commit to our random seed by hashing it, but we don't actually want to send the seed itself yet,
# to prevent others from crafting their seeds based on the value of ours
hasher = hashlib.sha512()
hasher.update(self.random_seed)
introduction_message = IntroductionMessage(self.name, self.key.public_key(), hasher.digest())
introduction_message.generate_and_sign(self.key)
self.message_handler.send_message(introduction_message)
def send_ready_message(self):
other_participants = [(name, self.other_possible_participants[name][0]) for name in self.other_possible_participants]
ready_message = ReadyMessage(self.name, other_participants, self.random_seed)
ready_message.generate_and_sign(self.key)
self.message_handler.send_message(ready_message)
def receive_ready_message(self, ready_message: ReadyMessage) -> Optional[Callable[[], None]]:
sender_name = ready_message.get_name()
sender_expected_participants = ready_message.get_participants()
sender_random_seed = ready_message.get_random_seed()
if sender_name not in self.other_possible_participants:
logger.warning(f'Received ready message from unknown participant {sender_name}')
return None
_, sender_commit = self.other_possible_participants[sender_name]
hasher = hashlib.sha512()
hasher.update(sender_random_seed)
seed_hash = hasher.digest()
if seed_hash != sender_commit:
logger.error(f'Participant {sender_name} sent random seed that did not match their initial commit!')
return None
for expected_participant_name, expected_participant_key in sender_expected_participants:
if expected_participant_name not in self.other_possible_participants:
logger.warning(f'Participant {sender_name} expects exchange with unknown participant {expected_participant_name}')
return None
our_key, _ = self.other_possible_participants[expected_participant_name]
if our_key != expected_participant_key:
logger.error(f'Participant {sender_name} has different public key for participant {expected_participant_name} than we have.')
return None
self.other_ready_participants[sender_name] = ([name for name, _ in sender_expected_participants], sender_random_seed)
if self.check_if_ready():
return self.start_exchange_process()
def check_if_ready(self):
list_of_names = set([name for name in self.other_participants] + [self.name])
for name in self.other_participants:
if name not in self.other_ready_participants:
return False
other_list_of_names = set(self.other_ready_participants[name][0] + [name])
if list_of_names != other_list_of_names:
logger.critical(f'Participant {name} does not have the same list of participants as us!')
return False
return True
def start_exchange_process(self) -> Optional[Callable[[], None]]:
# XOR all the seeds together
all_seeds = [self.random_seed]
for name in self.other_participants:
all_seeds.append(self.other_ready_participants[name][1])
longest_seed_length = max(len(seed) for seed in all_seeds)
total_seed = b'0' * longest_seed_length
for seed in all_seeds:
seed = b'0' * (longest_seed_length - len(seed)) + seed
total_seed = bytes(a ^ b for a, b in zip(total_seed, seed))
# Use the total random seed to initialize a cryptographically secure pseudo-random number generator
csprng = CSPRNG(total_seed)
# Since everyone is using the same seed, everyone should get same values for p and q
p = getPrime(1200, randfunc=csprng.get_random_bytes)
q = getPrime(800, randfunc=csprng.get_random_bytes)
self.first_shuffle_key = CommutativeCipher(p, q)
self.second_shuffle_key = CommutativeCipher(p, q)
self.third_shuffle_key = CommutativeCipher(p, q)
# In the same way, everyone should get the same values for each card
for i in range(len(self.other_participants) + 1):
self.card_values.append(csprng.get_random_bytes(16))

15
classes/UserInterface.py Normal file
View File

@ -0,0 +1,15 @@
from collections.abc import Callable
class UserInterface:
def add_user(self, name: str):
pass
def add_user_info_listener(self, callback: Callable[[str, str], None]):
pass
def add_start_listener(self, callback: Callable[[list[str]], None]):
pass
def announce_recipient(self, name: str, other_info: str):
pass

0
classes/__init__.py Normal file
View File

33
main.py Normal file
View File

@ -0,0 +1,33 @@
import secrets
from Crypto.Util.number import getPrime
from Crypto.PublicKey import ECC
from classes.Crypto.CommutativeCipher import CommutativeCipher
from classes.MessageTypes.Introduction import IntroductionMessage
p = getPrime(1200)
q = getPrime(800)
cipher1 = CommutativeCipher(p, q)
cipher2 = CommutativeCipher(p, q)
message = 'Hei på deg'.encode('utf-8 ')
c1 = cipher1.encode(message)
c2 = cipher2.encode(c1)
d1 = cipher1.decode(c2)
print(cipher2.decode(d1))
key = ECC.generate(curve='p256')
test = key.public_key().export_key(format='OpenSSH')
seed = secrets.randbits(256).to_bytes(32)
key1 = ECC.import_key(test)
key2 = ECC.import_key(test)
print(f'Keys are equal: {key1 == key2}')
test = IntroductionMessage('Martin', key.public_key(), seed)
print(test.generate_and_sign(key))
print(test.check_signature(key.public_key()))

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
pycryptodome~=3.21.0