use alloc::boxed::Box; use alloc::string::{String, ToString}; use alloc::vec::Vec; use alloc::vec; use crate::interface::{Interface, TargetingData}; use crate::message::MessageBytes; use crate::res::IFResult; #[cfg(feature = "std")] use rayon::prelude::*; #[cfg(feature = "std")] use std::println; /// An identification of a peer - something that we can use to send a message to id #[derive(Clone, Debug, PartialEq)] pub struct PeerInfo { /// Something to locally identify this peer pub peer_id: u64, /// ID of the interface through which we communicate with this peer pub interface_id: String, /// Data that should be passed to the interface to send a message to this peer pub interface_targeting_data: TargetingData, } impl PeerInfo { fn new(interface_id: String, interface_targeting_data: TargetingData) -> Self { Self { peer_id: rand::random(), interface_id, interface_targeting_data, } } } /// The struct that manages all the communication with peers pub struct Transport { pub interfaces: Vec>, peers: Vec, } impl Transport { /// Create new transport with given interfaces pub fn new(interfaces: Vec>) -> Self { #[cfg(not(feature = "std"))] if interfaces.iter().map(|interface| interface.has_blocking_main() as u8).sum::() > 1 { panic!("There is two interfaces with blocking main loops and we have no threads because this is no_std!"); } Self { interfaces, peers: vec![] } } /// Find a peer in `self.peers` by its id fn get_peer_by_id(&self, peer_id: u64) -> Option { self.peers.iter().find(|peer| peer.peer_id == peer_id).cloned() } /// Try to find a peer in `self.peers` by interface_id and targeting data fn get_peer_by_parameters(&self, interface_id: &str, data: &str /*&TargetingData*/) -> Option<&PeerInfo> { self.peers.iter().find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data) } /// Insert a new peer into out database and return its id or find an existing one with these parameters fn find_or_add_peer(&mut self, interface_id: String, data: TargetingData) -> u64 { match self.get_peer_by_parameters(interface_id.as_str(), &data) { None => { let new_peer = PeerInfo::new(interface_id, data); let peer_id = new_peer.peer_id; self.peers.push(new_peer); peer_id } Some(peer) => peer.peer_id } } /// Get interface index by its ID fn interface_index_by_id(&self, interface_id: &str) -> usize { self.interfaces.iter().position(|interface| interface.id() == interface_id).unwrap_or_else(|| panic!("Invalid interface id")) } /// Send message bytes to a peer if data is provided or broadcast the data if `peer == None` pub fn send_message(&mut self, message: MessageBytes, peer_id: Option) -> IFResult<()> { let peer = if let Some(peer_id) = peer_id { self.get_peer_by_id(peer_id) } else { None }; match peer { // Broadcast None => { #[cfg(not(feature = "std"))] { for interface in &mut self.interfaces { interface.send(&message, None)?; } } // If we have concurrency, we will do it concurrently #[cfg(feature = "std")] { self.interfaces.par_iter_mut().map(|interface| interface.send(&message, None)).for_each(drop); } Ok(()) } // Singlecast Some(peer) => { let interface_ind = self.interface_index_by_id(peer.interface_id.as_str()); self.interfaces[interface_ind].send(&message, Some(peer.interface_targeting_data)) } } } /// Poll all the interfaces and receive a message /// /// Returns a result with an option of `(message, peer_id)` pub fn receive(&mut self) -> Option<(MessageBytes, u64 /* peer id*/)> { if let Some((interface_id, (msg, peer_data))) = self.interfaces .iter_mut() // For each interface return (interface id, message result) .map(|interface| (interface.id().to_string(), interface.receive())) // If there was an error, print it .map(|res| match res { (id, Err(e)) => { #[cfg(feature = "std")] println!("An error occurred while receiving: {:?}", e); (id, Err(e)) } (id, Ok(r)) => (id, Ok(r)) }) // Find a result where there is a message .find(|r| matches!(r, (_, Ok(Some(_))))) // Safely unwrap this result (we already matched `Ok(Some(_))`) .map(|(id, r)| (id, r.unwrap().unwrap())) { Some((msg, self.find_or_add_peer(interface_id, peer_data))) } else { None } } /// Run one iteration of the main loop pub fn main_loop_iteration(&mut self) -> IFResult<()> { #[cfg(feature = "std")] self.interfaces.par_iter_mut().map(|interface| interface.main_loop_iteration()).collect::>()?; #[cfg(not(feature = "std"))] { self.interfaces.iter_mut().try_for_each(|interface| if !interface.has_blocking_main() { interface.main_loop_iteration() } else { Ok(()) })?; let blocking_interface_index = self.interfaces.iter().position(|interface| interface.has_blocking_main()); if let Some(ind) = blocking_interface_index { self.interfaces[ind].main_loop_iteration()?; } } Ok(()) } } #[cfg(test)] use crate::interface::test_interface::SimpleTestInterface; #[test] fn test_adding_peer_to_transport() { let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); assert!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()).is_none()); let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); let peer = PeerInfo { peer_id, interface_id: interface_id.clone(), interface_targeting_data: interface_targeting_data.clone(), }; assert_eq!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()), Some(&peer)); assert_eq!(transport.get_peer_by_id(peer_id), Some(peer)); } #[test] fn test_transport_sending() { let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let peer_id = transport.find_or_add_peer(interface_id, interface_targeting_data.clone()); transport.send_message(vec![239, 123], None).unwrap(); assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string()))); assert!(transport.interfaces[0].receive() == IFResult::Ok(None)); transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); assert_eq!(transport.interfaces[0].receive(), IFResult::Ok(Some((vec![239, 123], interface_targeting_data)))); } #[test] fn test_transport_receiving() { let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data); transport.send_message(vec![239, 123], None).unwrap(); assert_eq!(transport.receive(), Some((vec![239u8, 123], transport.get_peer_by_parameters(interface_id.as_str(), "").unwrap().peer_id))); assert!(transport.receive().is_none()); transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); assert_eq!(transport.receive(), Some((vec![239, 123], peer_id))); }