use alloc::boxed::Box; use alloc::string::String; use alloc::vec::Vec; use alloc::vec; use crate::interface::{Interface, TargetingData}; use crate::message::MessageBytes; use crate::res::IFResult; #[cfg(std)] use rayon::prelude::*; /// An identification of a peer - something that we can use to send a message to id #[derive(Clone, Debug, PartialEq)] pub struct PeerInfo { /// Something to locally identify this peer pub peer_id: u64, /// ID of the interface through which we communicate with this peer pub interface_id: String, /// Data that should be passed to the interface to send a message to this peer pub interface_targeting_data: TargetingData, } impl PeerInfo { fn new(interface_id: String, interface_targeting_data: TargetingData) -> Self { Self { peer_id: rand::random(), interface_id, interface_targeting_data, } } } /// The struct that manages all the communication with peers pub struct Transport { pub interfaces: Vec>, peers: Vec, } impl Transport { /// Create new transport with given interfaces pub fn new(interfaces: Vec>) -> Self { Self { interfaces, peers: vec![] } } /// Find a peer in `self.peers` by its id fn get_peer_by_id(&self, peer_id: u64) -> Option { self.peers.iter().find(|peer| peer.peer_id == peer_id).cloned() } /// Try to find a peer in `self.peers` by interface_id and targeting data fn get_peer_by_parameters(&self, interface_id: &str, data: &str /*&TargetingData*/) -> Option<&PeerInfo> { self.peers.iter().find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data) } /// Insert a new peer into out database and return its id or find an existing one with these parameters fn find_or_add_peer(&mut self, interface_id: String, data: TargetingData) -> u64 { match self.get_peer_by_parameters(interface_id.as_str(), &data) { None => { let new_peer = PeerInfo::new(interface_id, data); let peer_id = new_peer.peer_id; self.peers.push(new_peer); peer_id } Some(peer) => peer.peer_id } } /// Get interface index by its ID fn interface_index_by_id(&self, interface_id: &str) -> usize { self.interfaces.iter().position(|interface| interface.id() == interface_id).unwrap_or_else(|| panic!("Invalid interface id")) } /// Send message bytes to a peer if data is provided or broadcast the data if `peer == None` pub fn send_message(&mut self, message: MessageBytes, peer_id: Option) -> IFResult<()> { let peer = if let Some(peer_id) = peer_id { self.get_peer_by_id(peer_id) } else { None }; match peer { // Broadcast None => { #[cfg(not(std))] { for interface in &mut self.interfaces { interface.send(&message, None)?; } } // If we have concurrency, we will do it concurrently #[cfg(std)] { self.interfaces.par_iter_mut().map(|interface| interface.send(&message, None)).for_each(drop); } Ok(()) } // Singlecast Some(peer) => { let interface_ind = self.interface_index_by_id(peer.interface_id.as_str()); self.interfaces[interface_ind].send(&message, Some(peer.interface_targeting_data)) } } } } #[cfg(test)] use crate::interface::test_interface::TestInterface; #[cfg(test)] use alloc::string::ToString; #[test] fn test_adding_peer_to_transport() { let mut transport = Transport::new(vec![Box::new(TestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); assert!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()).is_none()); let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); let peer = PeerInfo { peer_id, interface_id: interface_id.clone(), interface_targeting_data: interface_targeting_data.clone(), }; assert_eq!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()), Some(&peer)); assert_eq!(transport.get_peer_by_id(peer_id), Some(peer)); } #[test] fn test_transport_sending() { let mut transport = Transport::new(vec![Box::new(TestInterface::default())]); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); transport.send_message(vec![239, 123], None).unwrap(); assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string()))); assert!(transport.interfaces[0].receive() == IFResult::Ok(None)); transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); assert_eq!(transport.interfaces[0].receive(), IFResult::Ok(Some((vec![239, 123], interface_targeting_data)))); }