commit 26921a74779c7fd80358c9713484c08468a51df0 Author: ennucore Date: Sun Oct 31 15:18:27 2021 +0300 Init diff --git a/model/alice.py b/model/alice.py new file mode 100644 index 0000000..1508057 --- /dev/null +++ b/model/alice.py @@ -0,0 +1,30 @@ +from dataclasses import dataclass, field +from channel import Channel, QChannelImpl +import random as r +import typing +from utils import bin_encode, bin_decode + + +@dataclass +class Alice: + channel: Channel + sent: typing.List[bool] = field(default_factory=list) + my_basises: typing.List[bool] = field(default_factory=list) + key: typing.List[bool] = field(default_factory=list) + + def generate_key_and_send(self, n: int = 100): + self.sent = [bool(r.randint(0, 1)) for _ in range(n)] + self.my_basises = [bool(r.randint(0, 1)) for _ in range(n)] + for key, basis in zip(self.sent, self.my_basises): + self.channel.send_bit(key, basis) + + def generate_key(self, n: int = 100): + self.generate_key_and_send(n) + bobs_basises, bobs_correctness = bin_decode(self.channel.recv_classical(n, is_bob=False)), bin_decode( + self.channel.recv_classical(n, is_bob=False)) + acceptance = [ + my_basis == bobs_basis and bobs_correctness + for my_basis, bobs_basis, bobs_correctness in + zip(self.my_basises, bobs_basises, bobs_correctness)] + self.channel.send_classical(bin_encode(acceptance), is_bob=False) + self.key = [k for k, c in zip(self.sent, acceptance) if c] diff --git a/model/bob.py b/model/bob.py new file mode 100644 index 0000000..4906156 --- /dev/null +++ b/model/bob.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass, field +from channel import Channel, QChannelImpl +import random as r +import typing + +from model.utils import bin_encode, bin_decode + + +@dataclass +class Bob: + channel: Channel + my_results: typing.List[typing.Tuple[bool, bool]] = field(default_factory=list) + key: typing.List[bool] = field(default_factory=list) + true_basises: typing.List[bool] = field(default_factory=list) + my_basises: typing.List[bool] = field(default_factory=list) + + def recv_photons(self, n: int = 100): + self.my_basises = [bool(r.randint(0, 1)) for _ in range(n)] + self.my_results = self.channel.get_photon_results(self.my_basises) + + def send_basises_and_correctness(self): + self.channel.send_classical(bin_encode(self.my_basises)) + self.channel.send_classical(bin_encode([left + right == 1 for left, right in self.my_results])) + + def generate_key(self, n: int = 100): + self.recv_photons(n) + self.send_basises_and_correctness() + correctness = bin_decode(self.channel.recv_classical(n)) + self.key = [(False if k[0] else True) for k, c in zip(self.my_results, correctness) if c] diff --git a/model/channel.py b/model/channel.py new file mode 100644 index 0000000..7fea19e --- /dev/null +++ b/model/channel.py @@ -0,0 +1,90 @@ +import random +from dataclasses import dataclass, field +import typing +import abc +import numpy as np + + +class Channel(abc.ABC): + @abc.abstractmethod + def send_classical(self, data: str, is_bob: bool = True): + pass + + @abc.abstractmethod + def send_bit(self, bit: bool, basis: bool): + pass + + @abc.abstractmethod + def recv_classical(self, n: int, is_bob: bool = True) -> str: + pass + + @abc.abstractmethod + def get_photon_results(self, basises: typing.Optional[typing.List[bool]]) -> typing.List[typing.Tuple[bool, bool]]: + """ + Returns tuple of clicks of 0 and 1 for each bit + """ + pass + + +@dataclass +class QChannelImpl(Channel): + # [[(value, basis)]] + bob_photons: typing.List[typing.List[typing.Tuple[bool, bool]]] = field(default_factory=list) + bob_classical_data_pool: str = '' + alice_classical_data_pool: str = '' + # probability for a photon to go to the wrong detector + p_opt: float = 0.05 + # dark count + p_dc: float = 0.05 + # the average number of emitted photons + mu: float = 1 + # eta - the probability for a detector to react to a photon + detector_sensitivity: float = 0.8 + + def get_number_of_photos(self) -> int: + return round(np.random.poisson(self.mu)) + + def send_classical(self, data: str, is_bob: bool = True): + # print(f'{["Alice", "Bob"][is_bob]} is sending data: {data}') + if is_bob: + self.bob_classical_data_pool += data + else: + self.alice_classical_data_pool += data + + def send_bit(self, bit: bool, basis: bool): + n_photons = self.get_number_of_photos() + delta = [(bit, basis) for _i in range(n_photons)] + self.bob_photons.append(delta) + + def recv_classical(self, n: int, is_bob: bool = True) -> str: + pool_name = ['alice_classical_data_pool', 'bob_classical_data_pool'][not is_bob] + if n == -1: + n = len(getattr(self, pool_name)) + # print(f'{["Alice", "Bob"][is_bob]} is receiving {n} ' + # f'characters of data. Length now: {len(getattr(self, pool_name))}') + while len(getattr(self, pool_name)) < n: + pass + data = getattr(self, pool_name)[:n] + setattr(self, pool_name, getattr(self, pool_name)[n:]) + # print(f'{["Alice", "Bob"][is_bob]} received {n} characters') + return data + + def get_photon_results(self, basises: typing.Optional[typing.List[bool]]) -> typing.List[typing.Tuple[bool, bool]]: + res = list() + for photons, basis in zip(self.bob_photons, basises): + clicks = [random.uniform(0, 1) <= self.p_dc, random.uniform(0, 1) <= self.p_dc] + for ph_bit, ph_basis in photons: + if random.uniform(0, 1) <= self.detector_sensitivity: # We detected everything correctly + if ph_basis != basis: + if random.uniform(0, 1) > 0.5: + clicks[1] = True + else: + clicks[0] = True + continue + if random.uniform(0, 1) > self.p_opt: # Photon doesn't flip + clicks[ph_bit] = True + else: + clicks[not ph_bit] = False + res.append((clicks[0], clicks[1])) + self.bob_photons = list() + return res diff --git a/model/main.py b/model/main.py new file mode 100644 index 0000000..c7b1037 --- /dev/null +++ b/model/main.py @@ -0,0 +1,30 @@ +from alice import Alice +from bob import Bob +from channel import QChannelImpl +from threading import Thread + +if __name__ == '__main__': + n = 1000 + channel = QChannelImpl() + alice, bob = Alice(channel), Bob(channel) + alice_thread = Thread(target=lambda: alice.generate_key(n)) + bob_thread = Thread(target=lambda: bob.generate_key(n)) + alice_thread.start() + bob_thread.start() + while alice_thread.is_alive() or bob_thread.is_alive(): + pass + print(list(map(int, alice.key))) + print(list(map(int, bob.key))) + print('Alice bits: ', *list(map(int, alice.sent))) + print('Bob bits: ', + *list(map(lambda t: {(0, 1): 1, (1, 0): 0, (0, 0): 2, (1, 1): 3}[(int(t[0]), int(t[1]))], bob.my_results))) + print('Alice basises:', *list(map(int, alice.my_basises))) + print('Bob basises: ', *list(map(int, bob.my_basises))) + bob_correctness = [left + right == 1 for left, right in bob.my_results] + print(' ', *[ + int(k) if c and b1 == b2 else ' ' + for c, k, b1, b2 in zip(bob_correctness, alice.sent, bob.my_basises, alice.my_basises)]) + print(' ', *[ + {(0, 1): 1, (1, 0): 0, (0, 0): 2, (1, 1): 3}[(int(k[0]), int(k[1]))] if c and b1 == b2 else ' ' + for c, k, b1, b2 in zip(bob_correctness, bob.my_results, bob.my_basises, alice.my_basises)]) + print(f'{100 * sum(k1 == k2 for k1, k2 in zip(alice.key, bob.key)) / len(alice.key):.2f}%, key length: {len(alice.key)}') diff --git a/model/utils.py b/model/utils.py new file mode 100644 index 0000000..58928ee --- /dev/null +++ b/model/utils.py @@ -0,0 +1,9 @@ +import typing + + +def bin_encode(data: typing.List[bool]) -> str: + return ''.join(map(lambda c: str(int(c)), data)) + + +def bin_decode(data: str) -> typing.List[bool]: + return [c == '1' for c in data]