From 2574061bed990981338da28a4e575c54a0a022a9 Mon Sep 17 00:00:00 2001 From: ennucore Date: Sat, 11 Dec 2021 18:54:36 +0300 Subject: [PATCH] Serialization + main loop starter --- Cargo.lock | 24 +++++++ Cargo.toml | 1 + src/interface.rs | 54 ++++++++++++--- src/interfaces/ip.rs | 65 +++++++++++++++-- src/interfaces/mod.rs | 28 +++++++- src/ironforce.rs | 123 ++++++++++++++++++++++++++++++--- src/lib.rs | 1 + src/res.rs | 6 ++ src/transport.rs | 157 +++++++++++++++++++++++++++++++----------- 9 files changed, 392 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22c61f0..774888e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,10 +236,17 @@ dependencies = [ "rsa", "serde", "serde_cbor", + "serde_json", "sha2", "spin 0.9.2", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "lazy_static" version = "1.4.0" @@ -507,6 +514,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ryu" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "254df5081ce98661a883445175e52efe99d1cb2a5552891d965d2f5d0cad1c16" + [[package]] name = "scopeguard" version = "1.1.0" @@ -543,6 +556,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 5b67582..395ed48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ serde = { version = "1.0", features = ["derive", "alloc"], default-features = fa rayon = { version = "1.5.1", optional = true } core-error = "0.0.1-rc4" serde_cbor = "0.11.2" +serde_json = "1.0.72" spin = "0.9.2" [profile.dev.package.num-bigint-dig] diff --git a/src/interface.rs b/src/interface.rs index 6dd7572..f797781 100644 --- a/src/interface.rs +++ b/src/interface.rs @@ -24,12 +24,15 @@ pub trait Interface: InterfaceRequirements { /// For systems that don't support concurrency, there can be only one interface in this function waits for a message (to avoid blocking). /// That's why it's necessary to check if it is the case for this interface, and it's done using function `Interface::has_blocking_main()` fn main_loop_iteration(&mut self) -> IFResult<()>; + /// Check if `main_loop_iteration` stops execution and waits for a message fn has_blocking_main(&self) -> bool { false // hopefully... } + /// Get some way of identification for this interface fn id(&self) -> &str; + /// Send a message. If no `interface_data` is provided, we should consider it to be a broadcast. /// If, on the other hand, `interface_data` is not `None`, it should be used to send the message to the target. fn send( @@ -37,10 +40,17 @@ pub trait Interface: InterfaceRequirements { message: &[u8], /*MessageBytes*/ interface_data: Option, ) -> IFResult<()>; + /// Receive a message through this interface. Returns a result with an option of (message bytes, target). /// `None` means there is no message available at the time. /// The implementations of this function shouldn't wait for new messages, but fn receive(&mut self) -> IFResult>; + + /// Dump the interface to string + fn get_dump_data(&self) -> String; + + /// Create the interface from dumped data + fn from_dump(data: String) -> IFResult where Self: Sized; } #[cfg(test)] @@ -49,10 +59,10 @@ pub mod test_interface { use crate::message::MessageBytes; use crate::res::IFResult; use alloc::string::String; - use alloc::vec::Vec; - use alloc::vec; use alloc::string::ToString; use alloc::sync::Arc; + use alloc::vec; + use alloc::vec::Vec; use spin::Mutex; #[derive(Default)] @@ -80,6 +90,14 @@ pub mod test_interface { fn receive(&mut self) -> IFResult> { Ok(self.messages.pop()) } + + fn get_dump_data(&self) -> String { + "".to_string() + } + + fn from_dump(_data: String) -> IFResult { + Ok(Default::default()) + } } pub type Storage = Vec<(Vec, TargetingData)>; @@ -108,8 +126,7 @@ pub mod test_interface { } fn send(&mut self, message: &[u8], target: Option) -> IFResult<()> { - self - .storage + self.storage .lock() .push((Vec::from(message), target.unwrap_or_default())); Ok(()) @@ -118,17 +135,31 @@ pub mod test_interface { fn receive(&mut self) -> IFResult> { Ok(self.messages.pop()) } + + fn get_dump_data(&self) -> String { + "".to_string() + } + + fn from_dump(_data: String) -> IFResult { + Ok(TestInterface { + this_peer_id: "".to_string(), + storage: Arc::new(Default::default()), + messages: vec![], + }) + } } impl InterfaceRequirements for TestInterface {} pub fn create_test_interfaces(n: usize) -> Vec { let storage_mutex = Arc::new(Mutex::new(vec![])); - (0..n).map(|i| TestInterface { - this_peer_id: i.to_string(), - storage: storage_mutex.clone(), - messages: vec![], - }).collect() + (0..n) + .map(|i| TestInterface { + this_peer_id: i.to_string(), + storage: storage_mutex.clone(), + messages: vec![], + }) + .collect() } #[test] @@ -136,6 +167,9 @@ pub mod test_interface { let mut interfaces = create_test_interfaces(2); interfaces[0].send(b"123", Some("1".to_string())).unwrap(); interfaces[1].main_loop_iteration().unwrap(); - assert_eq!(interfaces[1].receive().unwrap().unwrap().0.as_slice(), b"123"); + assert_eq!( + interfaces[1].receive().unwrap().unwrap().0.as_slice(), + b"123" + ); } } diff --git a/src/interfaces/ip.rs b/src/interfaces/ip.rs index 6c61734..c196d65 100644 --- a/src/interfaces/ip.rs +++ b/src/interfaces/ip.rs @@ -7,6 +7,7 @@ use core::ops::RangeInclusive; use core::str::FromStr; use core::time::Duration; use rayon::prelude::*; +use serde::{Deserialize, Serialize}; use std::net::TcpStream; use std::{format, net}; @@ -16,6 +17,7 @@ use crate::res::{IFError, IFResult}; use crate::std::io::{Read, Write}; use crate::std::println; +pub const DEFAULT_PORT: u16 = 50000; const SOCKET_RANGE: RangeInclusive = 50000..=50010; /// The threshold for the number of peers below which we are desperate @@ -33,6 +35,13 @@ pub struct IPInterface { main_loop_iterations: u64, } +/// Data for the serialization of `IPInterface` +#[derive(Serialize, Deserialize)] +pub struct SerData { + pub peers: Vec, + pub port: u16, +} + #[derive(Debug, Clone)] struct IPPackage { version: u8, @@ -67,7 +76,6 @@ impl MessageType { } } - fn compare_addrs(peer: &Peer, addr: net::SocketAddr) -> bool { addr.ip() == peer.0 && addr.port() == peer.1 } @@ -171,13 +179,31 @@ impl Interface for IPInterface { self.main_loop_iterations += 1; // Every 50 iterations we connect to everyone we know if self.main_loop_iterations % 50 == 0 { - let connected_addresses = self.connections.iter().filter_map(|conn| conn.peer_addr().ok()).collect::>(); - let peers_we_do_not_have_connections_with = self.peers.iter().filter(|p| !connected_addresses.iter().any(|addr| compare_addrs(p, *addr))).copied().collect::>(); - self.connections.extend(IPInterface::get_connections_to_peers(&peers_we_do_not_have_connections_with, self.peers.len() < PEER_THRESHOLD * 2)); + let connected_addresses = self + .connections + .iter() + .filter_map(|conn| conn.peer_addr().ok()) + .collect::>(); + let peers_we_do_not_have_connections_with = self + .peers + .iter() + .filter(|p| { + !connected_addresses + .iter() + .any(|addr| compare_addrs(p, *addr)) + }) + .copied() + .collect::>(); + self.connections + .extend(IPInterface::get_connections_to_peers( + &peers_we_do_not_have_connections_with, + self.peers.len() < PEER_THRESHOLD * 2, + )); } // We do a peer exchange every 30 iterations if self.main_loop_iterations % 30 == 0 { - let connection_index = (self.main_loop_iterations / 30) as usize % self.connections.len(); + let connection_index = + (self.main_loop_iterations / 30) as usize % self.connections.len(); IPInterface::request_peers(&mut self.connections[connection_index])?; } Ok(()) @@ -238,6 +264,35 @@ impl Interface for IPInterface { None => Ok(None), } } + + fn get_dump_data(&self) -> String { + let data = SerData { + peers: self.peers.clone(), + port: self.listener.local_addr().unwrap().port(), + }; + serde_json::to_string(&data).unwrap() + } + + fn from_dump(data: String) -> IFResult { + if !data.is_empty() { + let data: SerData = serde_json::from_str(data.as_str()).unwrap(); + IPInterface::new(data.port, data.peers) + } else { + let ip_path = std::path::Path::new(".if_ip_peers"); + let peers = if ip_path.exists() { + std::fs::read_to_string(ip_path) + .unwrap() + .split('\n') + .filter_map(|line| net::SocketAddr::from_str(line).ok()) + .map(|addr| (addr.ip(), addr.port())) + .collect() + } else { + println!("Warning: there are no peers in IP, which makes it essentially useless"); + vec![] + }; + IPInterface::new(DEFAULT_PORT, peers) + } + } } impl IPInterface { diff --git a/src/interfaces/mod.rs b/src/interfaces/mod.rs index 7b6c990..898c31e 100644 --- a/src/interfaces/mod.rs +++ b/src/interfaces/mod.rs @@ -3,7 +3,33 @@ pub mod ip; use crate::interface::Interface; use alloc::vec; +use alloc::vec::Vec; +use alloc::boxed::Box; +use alloc::string::String; +#[cfg(feature = "std")] +use crate::interfaces::ip::IPInterface; +use crate::res::IFResult; -pub fn get_interfaces() -> alloc::vec::Vec> { +#[cfg(not(feature = "std"))] +pub fn get_interfaces() -> Vec> { vec![] } + +#[cfg(feature = "std")] +pub fn get_interfaces() -> Vec> { + vec![Box::new(IPInterface::from_dump(Default::default()).unwrap())] +} + +#[cfg(not(feature = "std"))] +pub fn restore_interfaces(_data: Vec) -> IFResult>> { + Ok(vec![]) +} + +#[cfg(feature = "std")] +pub fn restore_interfaces(data: Vec) -> IFResult>> { + if data.is_empty() { + Ok(get_interfaces()) + } else { + Ok(vec![Box::new(IPInterface::from_dump(data[0].clone())?)]) + } +} diff --git a/src/ironforce.rs b/src/ironforce.rs index 4e61b0d..84428c8 100644 --- a/src/ironforce.rs +++ b/src/ironforce.rs @@ -1,15 +1,20 @@ use crate::crypto::{Keys, PublicKey}; use crate::message::{Message, MessageType, ServiceMessageType}; use crate::res::{IFError, IFResult}; -use crate::transport::Transport; +use crate::transport::{PeerInfo, Transport}; use crate::tunnel::{Tunnel, TunnelPublic}; use alloc::collections::BTreeMap; +#[cfg(feature = "std")] +use alloc::string::ToString; use alloc::vec; use alloc::vec::Vec; +use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use std::println; const TUNNEL_MAX_REPEAT_COUNT: u32 = 3; +#[cfg(feature = "std")] +const DEFAULT_FILE: &str = ".if_data.json"; /// Main worker pub struct IronForce { @@ -41,6 +46,28 @@ pub struct IronForce { /// /// Maps tunnel's first local_id to the number tunnel_counters: BTreeMap, + /// Auto save + auto_save: bool, +} + +/// Data for the serialization of IF +#[derive(Serialize, Deserialize)] +pub struct IFSerializationData { + pub keys: Keys, + pub tunnels: Vec, + pub peers: Vec, + pub interfaces_data: Vec, +} + +impl IFSerializationData { + pub fn default() -> IFSerializationData { + IFSerializationData { + keys: Keys::generate(), + tunnels: vec![], + peers: vec![], + interfaces_data: vec![], + } + } } impl IronForce { @@ -56,6 +83,7 @@ impl IronForce { has_background_worker: false, processed_messages: vec![], tunnel_counters: Default::default(), + auto_save: false, } } @@ -249,17 +277,28 @@ impl IronForce { } } } - MessageType::SingleCast if message.check_recipient(&self.keys) => self.messages.push(message.clone()), + MessageType::SingleCast if message.check_recipient(&self.keys) => { + self.messages.push(message.clone()) + } MessageType::SingleCast => { - if let Some(tunnel) = self.tunnels.iter().find(|tun| tun.id == Some(message.tunnel_id.0)) { - let peer_id = if message.tunnel_id.1 { tunnel.peer_ids.0 } else { tunnel.peer_ids.1 }; - self.transport.send_message(serde_cbor::to_vec(&message)?, Some(peer_id))?; + if let Some(tunnel) = self + .tunnels + .iter() + .find(|tun| tun.id == Some(message.tunnel_id.0)) + { + let peer_id = if message.tunnel_id.1 { + tunnel.peer_ids.0 + } else { + tunnel.peer_ids.1 + }; + self.transport + .send_message(serde_cbor::to_vec(&message)?, Some(peer_id))?; } } MessageType::Broadcast => { self.messages.push(message.clone()); self.send_to_all(message)?; - }, + } } Ok(()) } @@ -289,6 +328,67 @@ impl IronForce { ) .unwrap() } + + pub fn get_serialization_data(&self) -> IFSerializationData { + IFSerializationData { + keys: self.keys.clone(), + tunnels: self.tunnels.clone(), + peers: self.transport.peers.clone(), + interfaces_data: self.transport.get_interfaces_data(), + } + } + + pub fn from_serialization_data(data: IFSerializationData) -> IFResult { + Ok(Self { + keys: data.keys, + transport: Transport::restore(data.peers.clone(), data.interfaces_data.clone())?, + tunnels: data.tunnels, + additional_modules: vec![], + messages: vec![], + tunnels_pending: vec![], + has_background_worker: false, + processed_messages: vec![], + tunnel_counters: Default::default(), + auto_save: true, + }) + } + + #[cfg(feature = "std")] + pub fn save_to_file(&self, filename: Option) -> IFResult<()> { + std::fs::write( + filename.unwrap_or_else(|| DEFAULT_FILE.to_string()), + serde_json::to_string(&self.get_serialization_data())?, + )?; + Ok(()) + } + + #[cfg(feature = "std")] + pub fn launch_main_loop( + mut self, + sleep_millis: u64, + ) -> ( + std::thread::JoinHandle, + std::sync::Arc>, + ) { + self.has_background_worker = true; + let container = std::sync::Arc::new(std::sync::Mutex::new(self)); + let container_clone = container.clone(); + let thread = std::thread::spawn(move || { + let mut counter: u64 = 0; + loop { + match container_clone.lock().unwrap().main_loop_iteration() { + Ok(_) => {} + Err(e) => println!("An error happened in the main loop: {:?}", e), + } + counter += 1; + std::thread::sleep(std::time::Duration::from_millis(sleep_millis)); + if counter % 50 == 0 { + container_clone.lock().unwrap().save_to_file(None).unwrap() + } + } + }); + (thread, container) + } } #[cfg(test)] @@ -319,6 +419,7 @@ mod if_testing { has_background_worker: false, processed_messages: vec![], tunnel_counters: Default::default(), + auto_save: false, }) .collect() } @@ -371,15 +472,15 @@ mod if_testing { #[cfg(feature = "std")] mod test_with_ip { use crate::crypto::Keys; + use crate::interfaces::ip::create_test_interfaces; use crate::ironforce::IronForce; + use crate::message::{Message, MessageType}; use crate::res::IFResult; use crate::transport::Transport; use alloc::boxed::Box; use alloc::vec; use alloc::vec::Vec; use std::println; - use crate::interfaces::ip::create_test_interfaces; - use crate::message::{Message, MessageType}; // fn create_test_interfaces(n: usize) -> impl Iterator { // let ip_addr = std::net::IpAddr::from_str("127.0.0.1").unwrap(); @@ -410,7 +511,8 @@ mod test_with_ip { tunnels_pending: vec![], has_background_worker: false, processed_messages: vec![], - tunnel_counters: Default::default() + tunnel_counters: Default::default(), + auto_save: false, }) .collect() } @@ -475,7 +577,8 @@ mod test_with_ip { .recipient(&key_1) .sign(&node0_keys) .build()?, - &key_1)?; + &key_1, + )?; let t2 = std::thread::spawn(move || { for _ in 0..18 { node1.main_loop_iteration().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 06b35b5..de516c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ #![no_std] #![allow(dead_code)] #![feature(trait_alias)] +#![feature(never_type)] #[cfg(feature = "std")] extern crate std; diff --git a/src/res.rs b/src/res.rs index e2ef4cf..5d2dda1 100644 --- a/src/res.rs +++ b/src/res.rs @@ -40,6 +40,12 @@ impl From for IFError { } } +impl From for IFError { + fn from(e: serde_json::Error) -> Self { + Self::SerializationError(format!("{:?}", e)) + } +} + impl From for IFError { fn from(e: rsa::errors::Error) -> Self { Self::CryptoError(format!("{:?}", e)) diff --git a/src/transport.rs b/src/transport.rs index 5ad7d84..702bf6f 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,19 +1,19 @@ -use alloc::boxed::Box; -use alloc::string::{String, ToString}; -use alloc::vec::Vec; -use alloc::vec; use crate::interface::{Interface, TargetingData}; use crate::message::MessageBytes; use crate::res::IFResult; +use alloc::boxed::Box; +use alloc::string::{String, ToString}; +use alloc::vec; +use alloc::vec::Vec; +use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use rayon::prelude::*; #[cfg(feature = "std")] use std::println; - /// An identification of a peer - something that we can use to send a message to id -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct PeerInfo { /// Something to locally identify this peer pub peer_id: u64, @@ -33,31 +33,47 @@ impl PeerInfo { } } - /// The struct that manages all the communication with peers pub struct Transport { pub interfaces: Vec>, - peers: Vec, + pub(crate) peers: Vec, } impl Transport { /// Create new transport with given interfaces pub fn new(interfaces: Vec>) -> Self { #[cfg(not(feature = "std"))] - if interfaces.iter().map(|interface| interface.has_blocking_main() as u8).sum::() > 1 { + if interfaces + .iter() + .map(|interface| interface.has_blocking_main() as u8) + .sum::() + > 1 + { panic!("There is two interfaces with blocking main loops and we have no threads because this is no_std!"); } - Self { interfaces, peers: vec![] } + Self { + interfaces, + peers: vec![], + } } /// Find a peer in `self.peers` by its id fn get_peer_by_id(&self, peer_id: u64) -> Option { - self.peers.iter().find(|peer| peer.peer_id == peer_id).cloned() + self.peers + .iter() + .find(|peer| peer.peer_id == peer_id) + .cloned() } /// Try to find a peer in `self.peers` by interface_id and targeting data - fn get_peer_by_parameters(&self, interface_id: &str, data: &str /*&TargetingData*/) -> Option<&PeerInfo> { - self.peers.iter().find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data) + fn get_peer_by_parameters( + &self, + interface_id: &str, + data: &str, /*&TargetingData*/ + ) -> Option<&PeerInfo> { + self.peers + .iter() + .find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data) } /// Insert a new peer into out database and return its id or find an existing one with these parameters @@ -69,32 +85,42 @@ impl Transport { self.peers.push(new_peer); peer_id } - Some(peer) => peer.peer_id + Some(peer) => peer.peer_id, } } /// Get interface index by its ID fn interface_index_by_id(&self, interface_id: &str) -> usize { - self.interfaces.iter().position(|interface| interface.id() == interface_id).unwrap_or_else(|| panic!("Invalid interface id")) + self.interfaces + .iter() + .position(|interface| interface.id() == interface_id) + .unwrap_or_else(|| panic!("Invalid interface id")) } /// Send message bytes to a peer if data is provided or broadcast the data if `peer == None` pub fn send_message(&mut self, message: MessageBytes, peer_id: Option) -> IFResult<()> { - let peer = if let Some(peer_id) = peer_id { self.get_peer_by_id(peer_id) } else { None }; + let peer = if let Some(peer_id) = peer_id { + self.get_peer_by_id(peer_id) + } else { + None + }; match peer { // Broadcast None => { #[cfg(not(feature = "std"))] - { - for interface in &mut self.interfaces { - interface.send(&message, None)?; - } + { + for interface in &mut self.interfaces { + interface.send(&message, None)?; } + } // If we have concurrency, we will do it concurrently #[cfg(feature = "std")] - { - self.interfaces.par_iter_mut().map(|interface| interface.send(&message, None)).for_each(drop); - } + { + self.interfaces + .par_iter_mut() + .map(|interface| interface.send(&message, None)) + .for_each(drop); + } Ok(()) } // Singlecast @@ -109,7 +135,8 @@ impl Transport { /// /// Returns a result with an option of `(message, peer_id)` pub fn receive(&mut self) -> Option<(MessageBytes, u64 /* peer id*/)> { - if let Some((interface_id, (msg, peer_data))) = self.interfaces + if let Some((interface_id, (msg, peer_data))) = self + .interfaces .iter_mut() // For each interface return (interface id, message result) .map(|interface| (interface.id().to_string(), interface.receive())) @@ -120,30 +147,53 @@ impl Transport { println!("An error occurred while receiving: {:?}", e); (id, Err(e)) } - (id, Ok(r)) => (id, Ok(r)) + (id, Ok(r)) => (id, Ok(r)), }) // Find a result where there is a message .find(|r| matches!(r, (_, Ok(Some(_))))) // Safely unwrap this result (we already matched `Ok(Some(_))`) - .map(|(id, r)| (id, r.unwrap().unwrap())) { + .map(|(id, r)| (id, r.unwrap().unwrap())) + { Some((msg, self.find_or_add_peer(interface_id, peer_data))) - } else { None } + } else { + None + } } /// Run one iteration of the main loop pub fn main_loop_iteration(&mut self) -> IFResult<()> { #[cfg(feature = "std")] - self.interfaces.par_iter_mut().map(|interface| interface.main_loop_iteration()).collect::>()?; + self.interfaces + .par_iter_mut() + .map(|interface| interface.main_loop_iteration()) + .collect::>()?; #[cfg(not(feature = "std"))] - { - self.interfaces.iter_mut().try_for_each(|interface| if !interface.has_blocking_main() { interface.main_loop_iteration() } else { Ok(()) })?; - let blocking_interface_index = self.interfaces.iter().position(|interface| interface.has_blocking_main()); - if let Some(ind) = blocking_interface_index { - self.interfaces[ind].main_loop_iteration()?; + { + self.interfaces.iter_mut().try_for_each(|interface| { + if !interface.has_blocking_main() { + interface.main_loop_iteration() + } else { + Ok(()) } + })?; + let blocking_interface_index = self + .interfaces + .iter() + .position(|interface| interface.has_blocking_main()); + if let Some(ind) = blocking_interface_index { + self.interfaces[ind].main_loop_iteration()?; } + } Ok(()) } + + pub fn get_interfaces_data(&self) -> Vec { + self.interfaces.iter().map(|interface| interface.get_dump_data()).collect() + } + + pub fn restore(peers: Vec, interfaces_data: Vec) -> IFResult { + Ok(Transport { interfaces: crate::interfaces::restore_interfaces(interfaces_data)?, peers }) + } } #[cfg(test)] @@ -153,14 +203,20 @@ use crate::interface::test_interface::SimpleTestInterface; fn test_adding_peer_to_transport() { let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); - assert!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()).is_none()); - let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); + assert!(transport + .get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()) + .is_none()); + let peer_id = + transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); let peer = PeerInfo { peer_id, interface_id: interface_id.clone(), interface_targeting_data: interface_targeting_data.clone(), }; - assert_eq!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()), Some(&peer)); + assert_eq!( + transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()), + Some(&peer) + ); assert_eq!(transport.get_peer_by_id(peer_id), Some(peer)); } @@ -170,10 +226,18 @@ fn test_transport_sending() { let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let peer_id = transport.find_or_add_peer(interface_id, interface_targeting_data.clone()); transport.send_message(vec![239, 123], None).unwrap(); - assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string()))); + assert_eq!( + transport.interfaces[0].receive().unwrap(), + Some((vec![239u8, 123], "".to_string())) + ); assert!(transport.interfaces[0].receive() == IFResult::Ok(None)); - transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); - assert_eq!(transport.interfaces[0].receive(), IFResult::Ok(Some((vec![239, 123], interface_targeting_data)))); + transport + .send_message(vec![239, 123], Some(peer_id)) + .unwrap(); + assert_eq!( + transport.interfaces[0].receive(), + IFResult::Ok(Some((vec![239, 123], interface_targeting_data))) + ); } #[test] @@ -182,8 +246,19 @@ fn test_transport_receiving() { let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data); transport.send_message(vec![239, 123], None).unwrap(); - assert_eq!(transport.receive(), Some((vec![239u8, 123], transport.get_peer_by_parameters(interface_id.as_str(), "").unwrap().peer_id))); + assert_eq!( + transport.receive(), + Some(( + vec![239u8, 123], + transport + .get_peer_by_parameters(interface_id.as_str(), "") + .unwrap() + .peer_id + )) + ); assert!(transport.receive().is_none()); - transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); + transport + .send_message(vec![239, 123], Some(peer_id)) + .unwrap(); assert_eq!(transport.receive(), Some((vec![239, 123], peer_id))); }