Lev
3 years ago
commit
26921a7477
5 changed files with 188 additions and 0 deletions
@ -0,0 +1,30 @@
|
||||
from dataclasses import dataclass, field |
||||
from channel import Channel, QChannelImpl |
||||
import random as r |
||||
import typing |
||||
from utils import bin_encode, bin_decode |
||||
|
||||
|
||||
@dataclass |
||||
class Alice: |
||||
channel: Channel |
||||
sent: typing.List[bool] = field(default_factory=list) |
||||
my_basises: typing.List[bool] = field(default_factory=list) |
||||
key: typing.List[bool] = field(default_factory=list) |
||||
|
||||
def generate_key_and_send(self, n: int = 100): |
||||
self.sent = [bool(r.randint(0, 1)) for _ in range(n)] |
||||
self.my_basises = [bool(r.randint(0, 1)) for _ in range(n)] |
||||
for key, basis in zip(self.sent, self.my_basises): |
||||
self.channel.send_bit(key, basis) |
||||
|
||||
def generate_key(self, n: int = 100): |
||||
self.generate_key_and_send(n) |
||||
bobs_basises, bobs_correctness = bin_decode(self.channel.recv_classical(n, is_bob=False)), bin_decode( |
||||
self.channel.recv_classical(n, is_bob=False)) |
||||
acceptance = [ |
||||
my_basis == bobs_basis and bobs_correctness |
||||
for my_basis, bobs_basis, bobs_correctness in |
||||
zip(self.my_basises, bobs_basises, bobs_correctness)] |
||||
self.channel.send_classical(bin_encode(acceptance), is_bob=False) |
||||
self.key = [k for k, c in zip(self.sent, acceptance) if c] |
@ -0,0 +1,29 @@
|
||||
from dataclasses import dataclass, field |
||||
from channel import Channel, QChannelImpl |
||||
import random as r |
||||
import typing |
||||
|
||||
from model.utils import bin_encode, bin_decode |
||||
|
||||
|
||||
@dataclass |
||||
class Bob: |
||||
channel: Channel |
||||
my_results: typing.List[typing.Tuple[bool, bool]] = field(default_factory=list) |
||||
key: typing.List[bool] = field(default_factory=list) |
||||
true_basises: typing.List[bool] = field(default_factory=list) |
||||
my_basises: typing.List[bool] = field(default_factory=list) |
||||
|
||||
def recv_photons(self, n: int = 100): |
||||
self.my_basises = [bool(r.randint(0, 1)) for _ in range(n)] |
||||
self.my_results = self.channel.get_photon_results(self.my_basises) |
||||
|
||||
def send_basises_and_correctness(self): |
||||
self.channel.send_classical(bin_encode(self.my_basises)) |
||||
self.channel.send_classical(bin_encode([left + right == 1 for left, right in self.my_results])) |
||||
|
||||
def generate_key(self, n: int = 100): |
||||
self.recv_photons(n) |
||||
self.send_basises_and_correctness() |
||||
correctness = bin_decode(self.channel.recv_classical(n)) |
||||
self.key = [(False if k[0] else True) for k, c in zip(self.my_results, correctness) if c] |
@ -0,0 +1,90 @@
|
||||
import random |
||||
from dataclasses import dataclass, field |
||||
import typing |
||||
import abc |
||||
import numpy as np |
||||
|
||||
|
||||
class Channel(abc.ABC): |
||||
@abc.abstractmethod |
||||
def send_classical(self, data: str, is_bob: bool = True): |
||||
pass |
||||
|
||||
@abc.abstractmethod |
||||
def send_bit(self, bit: bool, basis: bool): |
||||
pass |
||||
|
||||
@abc.abstractmethod |
||||
def recv_classical(self, n: int, is_bob: bool = True) -> str: |
||||
pass |
||||
|
||||
@abc.abstractmethod |
||||
def get_photon_results(self, basises: typing.Optional[typing.List[bool]]) -> typing.List[typing.Tuple[bool, bool]]: |
||||
""" |
||||
Returns tuple of clicks of 0 and 1 for each bit |
||||
""" |
||||
pass |
||||
|
||||
|
||||
@dataclass |
||||
class QChannelImpl(Channel): |
||||
# [[(value, basis)]] |
||||
bob_photons: typing.List[typing.List[typing.Tuple[bool, bool]]] = field(default_factory=list) |
||||
bob_classical_data_pool: str = '' |
||||
alice_classical_data_pool: str = '' |
||||
# probability for a photon to go to the wrong detector |
||||
p_opt: float = 0.05 |
||||
# dark count |
||||
p_dc: float = 0.05 |
||||
# the average number of emitted photons |
||||
mu: float = 1 |
||||
# eta - the probability for a detector to react to a photon |
||||
detector_sensitivity: float = 0.8 |
||||
|
||||
def get_number_of_photos(self) -> int: |
||||
return round(np.random.poisson(self.mu)) |
||||
|
||||
def send_classical(self, data: str, is_bob: bool = True): |
||||
# print(f'{["Alice", "Bob"][is_bob]} is sending data: {data}') |
||||
if is_bob: |
||||
self.bob_classical_data_pool += data |
||||
else: |
||||
self.alice_classical_data_pool += data |
||||
|
||||
def send_bit(self, bit: bool, basis: bool): |
||||
n_photons = self.get_number_of_photos() |
||||
delta = [(bit, basis) for _i in range(n_photons)] |
||||
self.bob_photons.append(delta) |
||||
|
||||
def recv_classical(self, n: int, is_bob: bool = True) -> str: |
||||
pool_name = ['alice_classical_data_pool', 'bob_classical_data_pool'][not is_bob] |
||||
if n == -1: |
||||
n = len(getattr(self, pool_name)) |
||||
# print(f'{["Alice", "Bob"][is_bob]} is receiving {n} ' |
||||
# f'characters of data. Length now: {len(getattr(self, pool_name))}') |
||||
while len(getattr(self, pool_name)) < n: |
||||
pass |
||||
data = getattr(self, pool_name)[:n] |
||||
setattr(self, pool_name, getattr(self, pool_name)[n:]) |
||||
# print(f'{["Alice", "Bob"][is_bob]} received {n} characters') |
||||
return data |
||||
|
||||
def get_photon_results(self, basises: typing.Optional[typing.List[bool]]) -> typing.List[typing.Tuple[bool, bool]]: |
||||
res = list() |
||||
for photons, basis in zip(self.bob_photons, basises): |
||||
clicks = [random.uniform(0, 1) <= self.p_dc, random.uniform(0, 1) <= self.p_dc] |
||||
for ph_bit, ph_basis in photons: |
||||
if random.uniform(0, 1) <= self.detector_sensitivity: # We detected everything correctly |
||||
if ph_basis != basis: |
||||
if random.uniform(0, 1) > 0.5: |
||||
clicks[1] = True |
||||
else: |
||||
clicks[0] = True |
||||
continue |
||||
if random.uniform(0, 1) > self.p_opt: # Photon doesn't flip |
||||
clicks[ph_bit] = True |
||||
else: |
||||
clicks[not ph_bit] = False |
||||
res.append((clicks[0], clicks[1])) |
||||
self.bob_photons = list() |
||||
return res |
@ -0,0 +1,30 @@
|
||||
from alice import Alice |
||||
from bob import Bob |
||||
from channel import QChannelImpl |
||||
from threading import Thread |
||||
|
||||
if __name__ == '__main__': |
||||
n = 1000 |
||||
channel = QChannelImpl() |
||||
alice, bob = Alice(channel), Bob(channel) |
||||
alice_thread = Thread(target=lambda: alice.generate_key(n)) |
||||
bob_thread = Thread(target=lambda: bob.generate_key(n)) |
||||
alice_thread.start() |
||||
bob_thread.start() |
||||
while alice_thread.is_alive() or bob_thread.is_alive(): |
||||
pass |
||||
print(list(map(int, alice.key))) |
||||
print(list(map(int, bob.key))) |
||||
print('Alice bits: ', *list(map(int, alice.sent))) |
||||
print('Bob bits: ', |
||||
*list(map(lambda t: {(0, 1): 1, (1, 0): 0, (0, 0): 2, (1, 1): 3}[(int(t[0]), int(t[1]))], bob.my_results))) |
||||
print('Alice basises:', *list(map(int, alice.my_basises))) |
||||
print('Bob basises: ', *list(map(int, bob.my_basises))) |
||||
bob_correctness = [left + right == 1 for left, right in bob.my_results] |
||||
print(' ', *[ |
||||
int(k) if c and b1 == b2 else ' ' |
||||
for c, k, b1, b2 in zip(bob_correctness, alice.sent, bob.my_basises, alice.my_basises)]) |
||||
print(' ', *[ |
||||
{(0, 1): 1, (1, 0): 0, (0, 0): 2, (1, 1): 3}[(int(k[0]), int(k[1]))] if c and b1 == b2 else ' ' |
||||
for c, k, b1, b2 in zip(bob_correctness, bob.my_results, bob.my_basises, alice.my_basises)]) |
||||
print(f'{100 * sum(k1 == k2 for k1, k2 in zip(alice.key, bob.key)) / len(alice.key):.2f}%, key length: {len(alice.key)}') |
Loading…
Reference in new issue