From 6c69c80d7a08dd03f7f2b1ee428eced5fc5210cd Mon Sep 17 00:00:00 2001 From: ennucore Date: Fri, 10 Dec 2021 22:46:21 +0300 Subject: [PATCH] Tunnel generation (it's even working) --- src/interface.rs | 2 + src/interfaces/ip.rs | 24 ++++--- src/ironforce.rs | 155 ++++++++++++++++++++++++++++++++++++------- src/message.rs | 42 ++++++++---- src/tunnel.rs | 4 +- 5 files changed, 175 insertions(+), 52 deletions(-) diff --git a/src/interface.rs b/src/interface.rs index 6dd7572..405ecdc 100644 --- a/src/interface.rs +++ b/src/interface.rs @@ -53,6 +53,7 @@ pub mod test_interface { use alloc::vec; use alloc::string::ToString; use alloc::sync::Arc; + use std::println; use spin::Mutex; #[derive(Default)] @@ -112,6 +113,7 @@ pub mod test_interface { .storage .lock() .push((Vec::from(message), target.unwrap_or_default())); + println!("Sending in testinterface. Storage: {:?}", self.storage.lock()); Ok(()) } diff --git a/src/interfaces/ip.rs b/src/interfaces/ip.rs index b5532aa..a3cea9b 100644 --- a/src/interfaces/ip.rs +++ b/src/interfaces/ip.rs @@ -22,12 +22,14 @@ const SOCKET_RANGE: RangeInclusive = 50000..=50010; /// The threshold for the number of peers below which we are desperate const PEER_THRESHOLD: usize = 70; +type Peer = (net::IpAddr, u16); + /// Interface for interactions using tcp sockets pub struct IPInterface { id: String, connections: Vec, listener: net::TcpListener, - peers: Vec, + peers: Vec, package_queue: Vec, } @@ -75,8 +77,8 @@ impl Interface for IPInterface { match self.listener.accept() { Ok((stream, addr)) => { println!("New client: {:?}", addr); - if !self.peers.contains(&addr.ip()) { - self.peers.push(addr.ip()); + if self.peers.iter().all(|(ip, _)| *ip != addr.ip()) { + self.peers.push((addr.ip(), addr.port())); } self.connections.push(stream) } @@ -124,7 +126,7 @@ impl Interface for IPInterface { self.package_queue.push(package); } MessageType::PeersShared => { - let peers: Vec = serde_cbor::from_slice(message.as_slice())?; + let peers: Vec = serde_cbor::from_slice(message.as_slice())?; for peer in peers { if !self.peers.contains(&peer) { if let Some(conn) = IPInterface::new_connection(&peer)? { @@ -205,7 +207,7 @@ impl Interface for IPInterface { } impl IPInterface { - pub fn new(peers: Vec) -> IFResult { + pub fn new(peers: Vec) -> IFResult { let listener = match create_tcp_listener() { Some(listener) => listener, None => { @@ -242,7 +244,7 @@ impl IPInterface { } pub fn load(&mut self, data: Vec) -> IFResult<()> { - let peers: Vec = serde_cbor::from_slice(&data)?; + let peers: Vec = serde_cbor::from_slice(&data)?; self.peers = peers; Ok(()) } @@ -278,11 +280,11 @@ impl IPInterface { Ok(()) } - fn obtain_connection(&mut self, addr: &net::IpAddr) -> IFResult<()> { + fn obtain_connection(&mut self, addr: &Peer) -> IFResult<()> { if self .connections .iter() - .any(|con| con.peer_addr().is_ok() && &con.peer_addr().unwrap().ip() == addr) + .any(|con| con.peer_addr().is_ok() && con.peer_addr().unwrap().ip() == addr.0) { return Ok(()); } @@ -292,10 +294,10 @@ impl IPInterface { Ok(()) } - fn new_connection(addr: &net::IpAddr) -> IFResult> { - for port in SOCKET_RANGE { + fn new_connection(addr: &Peer) -> IFResult> { + for port in addr.1 - 5..addr.1 + 5 { match net::TcpStream::connect_timeout( - &net::SocketAddr::new(*addr, port as u16), + &net::SocketAddr::new(addr.0, port as u16), Duration::from_millis(500), ) { Ok(connection) => { diff --git a/src/ironforce.rs b/src/ironforce.rs index 8c9a44c..d2042fd 100644 --- a/src/ironforce.rs +++ b/src/ironforce.rs @@ -1,4 +1,4 @@ -use crate::crypto::PublicKey; +use crate::crypto::{Keys, PublicKey}; use crate::message::{Message, MessageType, ServiceMessageType}; use crate::res::{IFError, IFResult}; use crate::transport::Transport; @@ -8,6 +8,8 @@ use alloc::vec::Vec; /// Main worker pub struct IronForce { + /// Keys for this instance + keys: Keys, /// the struct that manages communicating with neighbor nodes transport: Transport, /// Tunnels that are known to this node @@ -20,40 +22,52 @@ pub struct IronForce { messages: Vec, /// Tunnels that has not been confirmed yet (no backward spread) /// - /// `[(Tunnel, Optional target node)]` - tunnels_pending: Vec<(TunnelPublic, Option /* target node */)>, + /// `[(Tunnel, Optional target node, local peer ids)]` + tunnels_pending: Vec<(TunnelPublic, Option /* target node */, (u64, u64) /* local peer ids */)>, /// True if this instance has background thread has_background_worker: bool, + /// Messages that were already processed (stored to avoid "echo chambers") + processed_messages: Vec, } impl IronForce { /// Create new worker pub fn new() -> Self { Self { + keys: Keys::generate(), transport: Transport::new(crate::interfaces::get_interfaces()), tunnels: vec![], additional_modules: vec![], messages: vec![], tunnels_pending: vec![], has_background_worker: false, + processed_messages: vec![], } } /// Create a new tunnel to another node fn initialize_tunnel_creation(&mut self, destination: PublicKey) -> IFResult<()> { let tunnel = TunnelPublic::new_singlecast(); - self.tunnels_pending.push((tunnel.clone(), Some(destination))); + self.tunnels_pending + .push((tunnel.clone(), Some(destination.clone()), (0, 0))); let message = Message::build() - .message_type(MessageType::Service(ServiceMessageType::TunnelBuilding( - tunnel - ))).build()?; + .message_type(MessageType::Service( + ServiceMessageType::TunnelBuildingForwardMovement( + tunnel, + destination.encrypt_data(&self.keys.get_public().to_vec())?, + ), + )) + .recipient(destination) + .sign(&self.keys) + .build()?; self.send_to_all(message)?; Ok(()) } /// Send a multicast or broadcast message fn send_to_all(&mut self, message: Message) -> IFResult<()> { - self.transport.send_message(serde_cbor::to_vec(&message)?, None) + self.transport + .send_message(serde_cbor::to_vec(&message)?, None) } /// Send a message through tunnel @@ -89,11 +103,81 @@ impl IronForce { /// Process a message: if it's a service message, act accordingly. /// Otherwise, add to `self.messages` - fn process_message(&mut self, message: Message, _inc_peer: u64) -> IFResult<()> { - match message.message_type { + fn process_message(&mut self, message: Message, inc_peer: u64) -> IFResult<()> { + if self.processed_messages.contains(&message.message_id) { + return Ok(()); + } + self.processed_messages.push(message.message_id); + match &message.message_type { MessageType::Service(msg_type) => match msg_type { - ServiceMessageType::TunnelBuilding(_tunnel) => { - todo!() + ServiceMessageType::TunnelBuildingForwardMovement(tunnel, sender_enc) => { + if message.check_recipient(&self.keys) { + let mut tunnel_pub = tunnel.clone(); + tunnel_pub.id = Some(rand::random()); + let sender = PublicKey::from_vec(self.keys.decrypt_data(sender_enc)?)?; + let tunnel = Tunnel { + id: tunnel_pub.id, + local_ids: tunnel_pub.local_ids.clone(), + peer_ids: (inc_peer, 0), + ttd: 0, + nodes_in_tunnel: None, + is_multicast: false, + target_node: Some(sender.clone()), + }; + self.tunnels.push(tunnel); + self.send_to_all( + Message::build() + .message_type(MessageType::Service( + ServiceMessageType::TunnelBuildingBackwardMovement(tunnel_pub.clone()), + )) + .tunnel(tunnel_pub.id.unwrap()) + .sign(&self.keys) + .build()? + )?; + } else { + let mut tunnel = tunnel.clone(); + tunnel.local_ids.push(rand::random()); + self.tunnels_pending.push((tunnel.clone(), None, (inc_peer, 0))); + self.send_to_all(message)?; + } + } + ServiceMessageType::TunnelBuildingBackwardMovement(tunnel_p) => { + match self + .tunnels_pending + .iter() + .find(|tun| tunnel_p.local_ids.contains(tun.0.local_ids.last().unwrap())) + { + // This doesn't concern us + None => {} + // This is a tunnel initialization proposed by us (and we got it back, yay) + Some((_, Some(target), peers)) => { + let tunnel = Tunnel { + id: tunnel_p.id, + local_ids: tunnel_p.local_ids.clone(), + peer_ids: (peers.0, inc_peer), + ttd: 0, + nodes_in_tunnel: None, + is_multicast: false, + target_node: Some(target.clone()), + }; + self.tunnels.push(tunnel); + // Send some initialization message or something + } + // This is a tunnel initialization proposed by someone else that has passed through us on its forward movement + Some((_, None, peers)) => { + let tunnel = Tunnel { + id: tunnel_p.id, + local_ids: tunnel_p.local_ids.clone(), + peer_ids: (peers.0, inc_peer), + ttd: 0, + nodes_in_tunnel: None, + is_multicast: false, + target_node: None + }; + self.tunnels.push(tunnel); + self.transport.send_message(serde_cbor::to_vec(&message)?, Some(peers.0))?; + } + } } }, MessageType::SingleCast | MessageType::Broadcast => self.messages.push(message), @@ -117,23 +201,44 @@ impl IronForce { #[cfg(test)] mod if_testing { - use crate::ironforce::IronForce; - use alloc::vec::Vec; - use alloc::vec; - use alloc::boxed::Box; + use crate::crypto::Keys; use crate::interface::test_interface::create_test_interfaces; + use crate::ironforce::IronForce; + use crate::res::IFResult; use crate::transport::Transport; + use alloc::boxed::Box; + use alloc::vec; + use alloc::vec::Vec; fn create_test_network() -> Vec { let interfaces = create_test_interfaces(5); - let transports = interfaces.into_iter().map(|interface| Transport::new(vec![Box::new(interface)])); - transports.map(|tr| IronForce { - transport: tr, - tunnels: vec![], - additional_modules: vec![], - messages: vec![], - tunnels_pending: vec![], - has_background_worker: false - }).collect() + let transports = interfaces + .into_iter() + .map(|interface| Transport::new(vec![Box::new(interface)])); + transports + .map(|tr| IronForce { + keys: Keys::generate(), + transport: tr, + tunnels: vec![], + additional_modules: vec![], + messages: vec![], + tunnels_pending: vec![], + has_background_worker: false, + processed_messages: vec![], + }) + .collect() + } + + #[test] + fn test_creating_a_tunnel() -> IFResult<()> { + let mut network = create_test_network(); + let key_1 = network[1].keys.get_public(); + network[0].initialize_tunnel_creation(key_1)?; + network[0].main_loop_iteration()?; + network[1].main_loop_iteration()?; + network[0].main_loop_iteration()?; + assert!(!network[0].tunnels.is_empty()); + // println!("T: {:?}", network[0].tunnels); + Ok(()) } } diff --git a/src/message.rs b/src/message.rs index 53ccd9e..312c73f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -10,7 +10,7 @@ use sha2::Digest; pub(crate) type MessageBytes = Vec; /// Signature of the message: optional and optionally encrypted sender's key and signed hash -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum Signature { /// The message is signed. Author is unknown NotSigned, @@ -40,7 +40,7 @@ impl Signature { } /// Network name and version -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct NetworkInfo { network_name: String, version: String, @@ -55,7 +55,7 @@ impl Default for NetworkInfo { } } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum MessageType { SingleCast, Broadcast, @@ -67,19 +67,32 @@ impl MessageType { match self { MessageType::SingleCast => Vec::from([0]), MessageType::Broadcast => Vec::from([1]), - MessageType::Service(ServiceMessageType::TunnelBuilding(tunnel)) => { - [2, 0].iter().chain(tunnel.hash().iter()).copied().collect() + MessageType::Service(ServiceMessageType::TunnelBuildingForwardMovement( + tunnel, + sender_enc, + )) => [2, 0] + .iter() + .chain(tunnel.hash().iter()) + .chain(sender_enc) + .copied() + .collect(), + MessageType::Service(ServiceMessageType::TunnelBuildingBackwardMovement(tunnel)) => { + [3, 0].iter().chain(tunnel.hash().iter()).copied().collect() } } } } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum ServiceMessageType { - TunnelBuilding(TunnelPublic), + /// Creating a tunnel - stage 1 + /// + /// (tunnel to be created, sending node encrypted for the recipient) + TunnelBuildingForwardMovement(TunnelPublic, Vec), + TunnelBuildingBackwardMovement(TunnelPublic), } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum MessageContent { /// Just plaintext message content Plain(Vec), @@ -108,7 +121,7 @@ impl MessageContent { } /// The struct for messages that are sent in the network -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct Message { /// Content of the message (not to be confused with the bytes that we are sending through interfaces) /// @@ -119,7 +132,7 @@ pub struct Message { /// Sender's signature pub signature: Signature, /// A random number that is used in hash together with the content - salt: u64, + pub message_id: u64, /// Hash of message content and the salt hash: Vec, /// Optional: hash of the message encrypted for the recipient, so that the recipient can know that this message is for them, but nobody else @@ -166,8 +179,8 @@ impl Message { } /// Check if this message is for this set of keys - pub fn check_recipient(&self, keys: Keys) -> bool { - keys.decrypt_data(&self.recipient_verification.clone().unwrap_or_default()) + pub fn check_recipient(&self, keys: &Keys) -> bool { + keys.decrypt_data(&self.recipient_verification.clone().unwrap()) .is_ok() } @@ -327,7 +340,7 @@ impl MessageBuilder { content: self.content, message_type: self.message_type.unwrap(), signature, - salt, + message_id: salt, hash, recipient_verification, tunnel_id: self.tunnel_id, @@ -342,8 +355,9 @@ use alloc::vec; #[test] fn test_hashing_message_type() { let msg_type_1 = MessageType::Broadcast; - let msg_type_2 = MessageType::Service(ServiceMessageType::TunnelBuilding( + let msg_type_2 = MessageType::Service(ServiceMessageType::TunnelBuildingForwardMovement( TunnelPublic::new_for_test(), + vec![1, 2, 3], )); assert_eq!(msg_type_1.hash(), msg_type_1.hash()); assert_eq!(msg_type_2.hash(), msg_type_2.hash()); diff --git a/src/tunnel.rs b/src/tunnel.rs index e322ffc..07d52f8 100644 --- a/src/tunnel.rs +++ b/src/tunnel.rs @@ -5,7 +5,7 @@ use sha2::Digest; use alloc::vec; /// A tunnel that is used for communication -#[derive(Serialize, Clone, Deserialize)] +#[derive(Serialize, Clone, Deserialize, Debug)] pub struct Tunnel { /// Tunnel's id. /// By the way, this id is `None` until the tunnel is validated in the backward movement @@ -25,7 +25,7 @@ pub struct Tunnel { } /// Tunnel, but only the fields that are ok to share -#[derive(Serialize, Clone, Deserialize)] +#[derive(Serialize, Clone, Deserialize, Debug)] pub struct TunnelPublic { /// Tunnel's id pub id: Option,