From fa49945652052b103c8f5d9bf45c123b41445a01 Mon Sep 17 00:00:00 2001 From: ennucore Date: Mon, 6 Dec 2021 15:39:58 +0300 Subject: [PATCH] Wrote some logic in TestInterface, fixed some warnings --- src/interface.rs | 7 +-- src/interfaces/ip.rs | 104 ++++++++++++++++++++++++------------------- src/ironforce.rs | 59 +++++++++++++++++++++--- src/transport.rs | 4 +- src/tunnel.rs | 29 ++++++++---- 5 files changed, 138 insertions(+), 65 deletions(-) diff --git a/src/interface.rs b/src/interface.rs index 4a38a57..6dd7572 100644 --- a/src/interface.rs +++ b/src/interface.rs @@ -96,7 +96,7 @@ pub mod test_interface { let mut storage_locked = self.storage.lock(); while let Some(i) = storage_locked .iter() - .position(|msg| msg.1 == self.this_peer_id) + .position(|msg| msg.1 == self.this_peer_id || msg.1.is_empty()) { self.messages.push(storage_locked.remove(i)); } @@ -108,10 +108,11 @@ pub mod test_interface { } fn send(&mut self, message: &[u8], target: Option) -> IFResult<()> { - Ok(self + self .storage .lock() - .push((Vec::from(message), target.unwrap_or_default()))) + .push((Vec::from(message), target.unwrap_or_default())); + Ok(()) } fn receive(&mut self) -> IFResult> { diff --git a/src/interfaces/ip.rs b/src/interfaces/ip.rs index d68b791..990c310 100644 --- a/src/interfaces/ip.rs +++ b/src/interfaces/ip.rs @@ -1,16 +1,17 @@ use alloc::borrow::ToOwned; -use std::net; -use std::thread; +use alloc::string::{String, ToString}; use alloc::vec; use alloc::vec::Vec; -use alloc::string::{String, ToString}; use core::ops::RangeInclusive; use core::time::Duration; +use std::net; +#[cfg(test)] +use std::thread; use crate::interface::{Interface, InterfaceRequirements, TargetingData}; use crate::message::MessageBytes; -use crate::res::{IFError, IFResult}; use crate::res::IFError::General; +use crate::res::{IFError, IFResult}; use crate::std::io::{Read, Write}; use crate::std::println; @@ -22,7 +23,7 @@ pub struct IPInterface { connections: Vec, listener: net::TcpListener, peers: Vec, - package_queue: Vec + package_queue: Vec, } #[derive(Debug)] @@ -46,7 +47,7 @@ impl MessageType { 0 => Ok(MessageType::Service), 1 => Ok(MessageType::PeerRequest), 2 => Ok(MessageType::Common), - _ => Err(IFError::General("Incorrect message type".to_string())) + _ => Err(IFError::General("Incorrect message type".to_string())), } } @@ -54,7 +55,7 @@ impl MessageType { match self { MessageType::Service => 0, MessageType::PeerRequest => 1, - MessageType::Common => 2 + MessageType::Common => 2, } } } @@ -86,7 +87,12 @@ impl Interface for IPInterface { let mut message: Vec = vec![]; message_take.read_to_end(&mut message)?; - let package = IPPackage {version, package_type, size, message}; + let package = IPPackage { + version, + package_type, + size, + message, + }; self.package_queue.push(package); } @@ -102,43 +108,53 @@ impl Interface for IPInterface { fn send(&mut self, message: &[u8], interface_data: Option) -> IFResult<()> { let addr: net::SocketAddr = match interface_data { Some(ip_string) => ip_string.parse().expect("Unable to parse address"), - None => return Err(IFError::General(String::from("Not enough info to create connections"))) + None => { + return Err(IFError::General(String::from( + "Not enough info to create connections", + ))) + } }; println!("Connecting to {:?}", addr); - - let index = match self.connections.iter().position(|connection| connection.peer_addr().ok() == Some(addr)) { + let index = match self + .connections + .iter() + .position(|connection| connection.peer_addr().ok() == Some(addr)) + { None => { - let connection = match net::TcpStream::connect_timeout( - &addr, - Duration::new(1, 0), - ) { + let connection = match net::TcpStream::connect_timeout(&addr, Duration::new(1, 0)) { Ok(connection) => connection, - Err(_) => return Err(General(String::from("Can't connect to this port"))) + Err(_) => return Err(General(String::from("Can't connect to this port"))), }; self.connections.push(connection); self.connections.len() - 1 } - Some(i) => i + Some(i) => i, }; - println!("Sending message to {:?}", self.connections[index].peer_addr().unwrap()); + println!( + "Sending message to {:?}", + self.connections[index].peer_addr().unwrap() + ); self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?; - IPInterface::send_package(&mut self.connections[index], IPPackage { - version: 0, - package_type: MessageType::Common, - size: message.len() as u32, - message: Vec::from(message), - })?; + IPInterface::send_package( + &mut self.connections[index], + IPPackage { + version: 0, + package_type: MessageType::Common, + size: message.len() as u32, + message: Vec::from(message), + }, + )?; println!("Sent message"); Ok(()) } fn receive(&mut self) -> IFResult> { match self.package_queue.pop() { - Some(ip_package) => Ok(Some( (ip_package.message, "".to_string()) )), - None => Ok(None) + Some(ip_package) => Ok(Some((ip_package.message, "".to_string()))), + None => Ok(None), } } } @@ -148,7 +164,9 @@ impl IPInterface { let listener = match create_tcp_listener() { Some(listener) => listener, None => { - return Err(IFError::General(String::from("Unable to open TCP listener"))); + return Err(IFError::General(String::from( + "Unable to open TCP listener", + ))); } }; @@ -156,10 +174,10 @@ impl IPInterface { Ok(IPInterface { id: String::from("IP Interface"), - connections: vec!(), + connections: vec![], listener, - peers: vec!(), - package_queue: vec!() + peers: vec![], + package_queue: vec![], }) } pub fn dump(&self) -> IFResult> { @@ -173,9 +191,7 @@ impl IPInterface { } fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> { - let mut header: Vec = vec![]; - header.push(package.version); - header.push(package.package_type.to_u8()); + let mut header: Vec = vec![package.version, package.package_type.to_u8()]; for byte in size_to_bytes(package.size) { header.push(byte); } @@ -186,14 +202,13 @@ impl IPInterface { } } - fn create_tcp_listener() -> Option { for port in SOCKET_RANGE { match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) { Ok(listener) => return Some(listener), Err(_e) => {} } - }; + } None } @@ -206,7 +221,6 @@ fn parse_header(data: Vec) -> IFResult { }) } - fn size_to_bytes(mut a: u32) -> [u8; 4] { let mut arr: [u8; 4] = [0, 0, 0, 0]; for i in [3, 2, 1, 0] { @@ -232,7 +246,9 @@ fn test_creating_connection() -> IFResult<()> { let mut interface2 = IPInterface::new()?; let t1 = std::thread::spawn(move || { - interface1.send(&message, Some(String::from("127.0.0.1:50001"))).unwrap(); + interface1 + .send(&message, Some(String::from("127.0.0.1:50001"))) + .unwrap(); interface1 }); thread::sleep(Duration::from_millis(10)); @@ -244,9 +260,8 @@ fn test_creating_connection() -> IFResult<()> { match res1 { Ok(_res) => { println!("Thread Ok"); - } - Err(e) => println!("{:?}", e) + Err(e) => println!("{:?}", e), } let res2 = t2.join(); match res2 { @@ -255,18 +270,15 @@ fn test_creating_connection() -> IFResult<()> { match res.receive() { Ok(tmp) => match tmp { Some((message, _metadata)) => println!("Received {:?}", message), - None => println!("None") - } - Err(e) => println!("{:?}", e) + None => println!("None"), + }, + Err(e) => println!("{:?}", e), } } - Err(e) => println!("{:?}", e) + Err(e) => println!("{:?}", e), } Ok(()) } #[cfg(test)] pub mod test_ip_interface {} - - - diff --git a/src/ironforce.rs b/src/ironforce.rs index 34123b4..8c9a44c 100644 --- a/src/ironforce.rs +++ b/src/ironforce.rs @@ -2,7 +2,7 @@ use crate::crypto::PublicKey; use crate::message::{Message, MessageType, ServiceMessageType}; use crate::res::{IFError, IFResult}; use crate::transport::Transport; -use crate::tunnel::Tunnel; +use crate::tunnel::{Tunnel, TunnelPublic}; use alloc::vec; use alloc::vec::Vec; @@ -18,6 +18,12 @@ pub struct IronForce { additional_modules: Vec<()>, /// Non-service messages to give outside messages: Vec, + /// Tunnels that has not been confirmed yet (no backward spread) + /// + /// `[(Tunnel, Optional target node)]` + tunnels_pending: Vec<(TunnelPublic, Option /* target node */)>, + /// True if this instance has background thread + has_background_worker: bool, } impl IronForce { @@ -28,17 +34,26 @@ impl IronForce { tunnels: vec![], additional_modules: vec![], messages: vec![], + tunnels_pending: vec![], + has_background_worker: false, } } /// Create a new tunnel to another node - fn create_new_tunnel(&mut self, _destination: PublicKey) -> IFResult { - todo!() + fn initialize_tunnel_creation(&mut self, destination: PublicKey) -> IFResult<()> { + let tunnel = TunnelPublic::new_singlecast(); + self.tunnels_pending.push((tunnel.clone(), Some(destination))); + let message = Message::build() + .message_type(MessageType::Service(ServiceMessageType::TunnelBuilding( + tunnel + ))).build()?; + self.send_to_all(message)?; + Ok(()) } /// Send a multicast or broadcast message - fn send_to_all(&mut self, _message: Message) -> IFResult<()> { - todo!() + fn send_to_all(&mut self, message: Message) -> IFResult<()> { + self.transport.send_message(serde_cbor::to_vec(&message)?, None) } /// Send a message through tunnel @@ -74,7 +89,7 @@ impl IronForce { /// Process a message: if it's a service message, act accordingly. /// Otherwise, add to `self.messages` - fn process_message(&mut self, message: Message) { + fn process_message(&mut self, message: Message, _inc_peer: u64) -> IFResult<()> { match message.message_type { MessageType::Service(msg_type) => match msg_type { ServiceMessageType::TunnelBuilding(_tunnel) => { @@ -83,10 +98,42 @@ impl IronForce { }, MessageType::SingleCast | MessageType::Broadcast => self.messages.push(message), } + Ok(()) } /// Get a message from `self.messages` pub fn read_message(&mut self) -> Option { self.messages.pop() } + + pub fn main_loop_iteration(&mut self) -> IFResult<()> { + self.transport.main_loop_iteration()?; + while let Some((msg, inc_peer)) = self.transport.receive() { + self.process_message(serde_cbor::from_slice(msg.as_slice())?, inc_peer)? + } + Ok(()) + } +} + +#[cfg(test)] +mod if_testing { + use crate::ironforce::IronForce; + use alloc::vec::Vec; + use alloc::vec; + use alloc::boxed::Box; + use crate::interface::test_interface::create_test_interfaces; + use crate::transport::Transport; + + fn create_test_network() -> Vec { + let interfaces = create_test_interfaces(5); + let transports = interfaces.into_iter().map(|interface| Transport::new(vec![Box::new(interface)])); + transports.map(|tr| IronForce { + transport: tr, + tunnels: vec![], + additional_modules: vec![], + messages: vec![], + tunnels_pending: vec![], + has_background_worker: false + }).collect() + } } diff --git a/src/transport.rs b/src/transport.rs index 53ac106..5ad7d84 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -168,7 +168,7 @@ fn test_adding_peer_to_transport() { fn test_transport_sending() { let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); - let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); + let peer_id = transport.find_or_add_peer(interface_id, interface_targeting_data.clone()); transport.send_message(vec![239, 123], None).unwrap(); assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string()))); assert!(transport.interfaces[0].receive() == IFResult::Ok(None)); @@ -180,7 +180,7 @@ fn test_transport_sending() { fn test_transport_receiving() { let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); - let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); + let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data); transport.send_message(vec![239, 123], None).unwrap(); assert_eq!(transport.receive(), Some((vec![239u8, 123], transport.get_peer_by_parameters(interface_id.as_str(), "").unwrap().peer_id))); assert!(transport.receive().is_none()); diff --git a/src/tunnel.rs b/src/tunnel.rs index ed179b8..e322ffc 100644 --- a/src/tunnel.rs +++ b/src/tunnel.rs @@ -2,6 +2,7 @@ use alloc::vec::Vec; use crate::crypto::PublicKey; use serde::{Serialize, Deserialize}; use sha2::Digest; +use alloc::vec; /// A tunnel that is used for communication #[derive(Serialize, Clone, Deserialize)] @@ -27,16 +28,29 @@ pub struct Tunnel { #[derive(Serialize, Clone, Deserialize)] pub struct TunnelPublic { /// Tunnel's id - id: Option, + pub id: Option, /// Ids, each of them is just for local storage on each node until a final global id is created - local_ids: Vec, + pub local_ids: Vec, /// Time at which this tunnel should be destroyed (UNIX epoch) - ttd: u64, + pub ttd: u64, /// Public keys of nodes in the tunnel nodes_in_tunnel: Option>, + /// Is this tunnel used for multicast? + pub is_multicast: bool, } impl TunnelPublic { + pub fn new_singlecast() -> Self { + Self { + id: None, + local_ids: vec![rand::random()], + ttd: 0, + nodes_in_tunnel: None, + is_multicast: false, + } + } + + /// Get the hash of the tunnel for verification pub fn hash(&self) -> Vec { sha2::Sha224::new() .chain(serde_cbor::to_vec(self).unwrap().as_slice()) @@ -49,14 +63,12 @@ impl TunnelPublic { id: Some(56), local_ids: vec![5, 500, 120], ttd: 56, - nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]) + nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]), + is_multicast: true, } } } -#[cfg(test)] -use alloc::vec; - #[test] fn test_tunnel_hashing() { let tun = TunnelPublic::new_for_test(); @@ -65,6 +77,7 @@ fn test_tunnel_hashing() { id: Some(56), local_ids: vec![5, 500, 120], ttd: 56, - nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]) + nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]), + is_multicast: false, }.hash()); }