|
|
|
@ -1,19 +1,19 @@
|
|
|
|
|
use alloc::boxed::Box; |
|
|
|
|
use alloc::string::{String, ToString}; |
|
|
|
|
use alloc::vec::Vec; |
|
|
|
|
use alloc::vec; |
|
|
|
|
use crate::interface::{Interface, TargetingData}; |
|
|
|
|
use crate::message::MessageBytes; |
|
|
|
|
use crate::res::IFResult; |
|
|
|
|
use alloc::boxed::Box; |
|
|
|
|
use alloc::string::{String, ToString}; |
|
|
|
|
use alloc::vec; |
|
|
|
|
use alloc::vec::Vec; |
|
|
|
|
use serde::{Deserialize, Serialize}; |
|
|
|
|
|
|
|
|
|
#[cfg(feature = "std")] |
|
|
|
|
use rayon::prelude::*; |
|
|
|
|
#[cfg(feature = "std")] |
|
|
|
|
use std::println; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// An identification of a peer - something that we can use to send a message to id
|
|
|
|
|
#[derive(Clone, Debug, PartialEq)] |
|
|
|
|
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] |
|
|
|
|
pub struct PeerInfo { |
|
|
|
|
/// Something to locally identify this peer
|
|
|
|
|
pub peer_id: u64, |
|
|
|
@ -33,31 +33,47 @@ impl PeerInfo {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// The struct that manages all the communication with peers
|
|
|
|
|
pub struct Transport { |
|
|
|
|
pub interfaces: Vec<Box<dyn Interface>>, |
|
|
|
|
peers: Vec<PeerInfo>, |
|
|
|
|
pub(crate) peers: Vec<PeerInfo>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Transport { |
|
|
|
|
/// Create new transport with given interfaces
|
|
|
|
|
pub fn new(interfaces: Vec<Box<dyn Interface>>) -> Self { |
|
|
|
|
#[cfg(not(feature = "std"))] |
|
|
|
|
if interfaces.iter().map(|interface| interface.has_blocking_main() as u8).sum::<u8>() > 1 { |
|
|
|
|
if interfaces |
|
|
|
|
.iter() |
|
|
|
|
.map(|interface| interface.has_blocking_main() as u8) |
|
|
|
|
.sum::<u8>() |
|
|
|
|
> 1 |
|
|
|
|
{ |
|
|
|
|
panic!("There is two interfaces with blocking main loops and we have no threads because this is no_std!"); |
|
|
|
|
} |
|
|
|
|
Self { interfaces, peers: vec![] } |
|
|
|
|
Self { |
|
|
|
|
interfaces, |
|
|
|
|
peers: vec![], |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Find a peer in `self.peers` by its id
|
|
|
|
|
fn get_peer_by_id(&self, peer_id: u64) -> Option<PeerInfo> { |
|
|
|
|
self.peers.iter().find(|peer| peer.peer_id == peer_id).cloned() |
|
|
|
|
self.peers |
|
|
|
|
.iter() |
|
|
|
|
.find(|peer| peer.peer_id == peer_id) |
|
|
|
|
.cloned() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Try to find a peer in `self.peers` by interface_id and targeting data
|
|
|
|
|
fn get_peer_by_parameters(&self, interface_id: &str, data: &str /*&TargetingData*/) -> Option<&PeerInfo> { |
|
|
|
|
self.peers.iter().find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data) |
|
|
|
|
fn get_peer_by_parameters( |
|
|
|
|
&self, |
|
|
|
|
interface_id: &str, |
|
|
|
|
data: &str, /*&TargetingData*/ |
|
|
|
|
) -> Option<&PeerInfo> { |
|
|
|
|
self.peers |
|
|
|
|
.iter() |
|
|
|
|
.find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Insert a new peer into out database and return its id or find an existing one with these parameters
|
|
|
|
@ -69,32 +85,42 @@ impl Transport {
|
|
|
|
|
self.peers.push(new_peer); |
|
|
|
|
peer_id |
|
|
|
|
} |
|
|
|
|
Some(peer) => peer.peer_id |
|
|
|
|
Some(peer) => peer.peer_id, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Get interface index by its ID
|
|
|
|
|
fn interface_index_by_id(&self, interface_id: &str) -> usize { |
|
|
|
|
self.interfaces.iter().position(|interface| interface.id() == interface_id).unwrap_or_else(|| panic!("Invalid interface id")) |
|
|
|
|
self.interfaces |
|
|
|
|
.iter() |
|
|
|
|
.position(|interface| interface.id() == interface_id) |
|
|
|
|
.unwrap_or_else(|| panic!("Invalid interface id")) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Send message bytes to a peer if data is provided or broadcast the data if `peer == None`
|
|
|
|
|
pub fn send_message(&mut self, message: MessageBytes, peer_id: Option<u64>) -> IFResult<()> { |
|
|
|
|
let peer = if let Some(peer_id) = peer_id { self.get_peer_by_id(peer_id) } else { None }; |
|
|
|
|
let peer = if let Some(peer_id) = peer_id { |
|
|
|
|
self.get_peer_by_id(peer_id) |
|
|
|
|
} else { |
|
|
|
|
None |
|
|
|
|
}; |
|
|
|
|
match peer { |
|
|
|
|
// Broadcast
|
|
|
|
|
None => { |
|
|
|
|
#[cfg(not(feature = "std"))] |
|
|
|
|
{ |
|
|
|
|
for interface in &mut self.interfaces { |
|
|
|
|
interface.send(&message, None)?; |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
for interface in &mut self.interfaces { |
|
|
|
|
interface.send(&message, None)?; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// If we have concurrency, we will do it concurrently
|
|
|
|
|
#[cfg(feature = "std")] |
|
|
|
|
{ |
|
|
|
|
self.interfaces.par_iter_mut().map(|interface| interface.send(&message, None)).for_each(drop); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
self.interfaces |
|
|
|
|
.par_iter_mut() |
|
|
|
|
.map(|interface| interface.send(&message, None)) |
|
|
|
|
.for_each(drop); |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
// Singlecast
|
|
|
|
@ -109,7 +135,8 @@ impl Transport {
|
|
|
|
|
///
|
|
|
|
|
/// Returns a result with an option of `(message, peer_id)`
|
|
|
|
|
pub fn receive(&mut self) -> Option<(MessageBytes, u64 /* peer id*/)> { |
|
|
|
|
if let Some((interface_id, (msg, peer_data))) = self.interfaces |
|
|
|
|
if let Some((interface_id, (msg, peer_data))) = self |
|
|
|
|
.interfaces |
|
|
|
|
.iter_mut() |
|
|
|
|
// For each interface return (interface id, message result)
|
|
|
|
|
.map(|interface| (interface.id().to_string(), interface.receive())) |
|
|
|
@ -120,30 +147,53 @@ impl Transport {
|
|
|
|
|
println!("An error occurred while receiving: {:?}", e); |
|
|
|
|
(id, Err(e)) |
|
|
|
|
} |
|
|
|
|
(id, Ok(r)) => (id, Ok(r)) |
|
|
|
|
(id, Ok(r)) => (id, Ok(r)), |
|
|
|
|
}) |
|
|
|
|
// Find a result where there is a message
|
|
|
|
|
.find(|r| matches!(r, (_, Ok(Some(_))))) |
|
|
|
|
// Safely unwrap this result (we already matched `Ok(Some(_))`)
|
|
|
|
|
.map(|(id, r)| (id, r.unwrap().unwrap())) { |
|
|
|
|
.map(|(id, r)| (id, r.unwrap().unwrap())) |
|
|
|
|
{ |
|
|
|
|
Some((msg, self.find_or_add_peer(interface_id, peer_data))) |
|
|
|
|
} else { None } |
|
|
|
|
} else { |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Run one iteration of the main loop
|
|
|
|
|
pub fn main_loop_iteration(&mut self) -> IFResult<()> { |
|
|
|
|
#[cfg(feature = "std")] |
|
|
|
|
self.interfaces.par_iter_mut().map(|interface| interface.main_loop_iteration()).collect::<IFResult<_>>()?; |
|
|
|
|
self.interfaces |
|
|
|
|
.par_iter_mut() |
|
|
|
|
.map(|interface| interface.main_loop_iteration()) |
|
|
|
|
.collect::<IFResult<_>>()?; |
|
|
|
|
#[cfg(not(feature = "std"))] |
|
|
|
|
{ |
|
|
|
|
self.interfaces.iter_mut().try_for_each(|interface| if !interface.has_blocking_main() { interface.main_loop_iteration() } else { Ok(()) })?; |
|
|
|
|
let blocking_interface_index = self.interfaces.iter().position(|interface| interface.has_blocking_main()); |
|
|
|
|
if let Some(ind) = blocking_interface_index { |
|
|
|
|
self.interfaces[ind].main_loop_iteration()?; |
|
|
|
|
{ |
|
|
|
|
self.interfaces.iter_mut().try_for_each(|interface| { |
|
|
|
|
if !interface.has_blocking_main() { |
|
|
|
|
interface.main_loop_iteration() |
|
|
|
|
} else { |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
})?; |
|
|
|
|
let blocking_interface_index = self |
|
|
|
|
.interfaces |
|
|
|
|
.iter() |
|
|
|
|
.position(|interface| interface.has_blocking_main()); |
|
|
|
|
if let Some(ind) = blocking_interface_index { |
|
|
|
|
self.interfaces[ind].main_loop_iteration()?; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn get_interfaces_data(&self) -> Vec<String> { |
|
|
|
|
self.interfaces.iter().map(|interface| interface.get_dump_data()).collect() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn restore(peers: Vec<PeerInfo>, interfaces_data: Vec<String>) -> IFResult<Self> { |
|
|
|
|
Ok(Transport { interfaces: crate::interfaces::restore_interfaces(interfaces_data)?, peers }) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(test)] |
|
|
|
@ -153,14 +203,20 @@ use crate::interface::test_interface::SimpleTestInterface;
|
|
|
|
|
fn test_adding_peer_to_transport() { |
|
|
|
|
let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); |
|
|
|
|
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); |
|
|
|
|
assert!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()).is_none()); |
|
|
|
|
let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); |
|
|
|
|
assert!(transport |
|
|
|
|
.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()) |
|
|
|
|
.is_none()); |
|
|
|
|
let peer_id = |
|
|
|
|
transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); |
|
|
|
|
let peer = PeerInfo { |
|
|
|
|
peer_id, |
|
|
|
|
interface_id: interface_id.clone(), |
|
|
|
|
interface_targeting_data: interface_targeting_data.clone(), |
|
|
|
|
}; |
|
|
|
|
assert_eq!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()), Some(&peer)); |
|
|
|
|
assert_eq!( |
|
|
|
|
transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()), |
|
|
|
|
Some(&peer) |
|
|
|
|
); |
|
|
|
|
assert_eq!(transport.get_peer_by_id(peer_id), Some(peer)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -170,10 +226,18 @@ fn test_transport_sending() {
|
|
|
|
|
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); |
|
|
|
|
let peer_id = transport.find_or_add_peer(interface_id, interface_targeting_data.clone()); |
|
|
|
|
transport.send_message(vec![239, 123], None).unwrap(); |
|
|
|
|
assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string()))); |
|
|
|
|
assert_eq!( |
|
|
|
|
transport.interfaces[0].receive().unwrap(), |
|
|
|
|
Some((vec![239u8, 123], "".to_string())) |
|
|
|
|
); |
|
|
|
|
assert!(transport.interfaces[0].receive() == IFResult::Ok(None)); |
|
|
|
|
transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); |
|
|
|
|
assert_eq!(transport.interfaces[0].receive(), IFResult::Ok(Some((vec![239, 123], interface_targeting_data)))); |
|
|
|
|
transport |
|
|
|
|
.send_message(vec![239, 123], Some(peer_id)) |
|
|
|
|
.unwrap(); |
|
|
|
|
assert_eq!( |
|
|
|
|
transport.interfaces[0].receive(), |
|
|
|
|
IFResult::Ok(Some((vec![239, 123], interface_targeting_data))) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
@ -182,8 +246,19 @@ fn test_transport_receiving() {
|
|
|
|
|
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); |
|
|
|
|
let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data); |
|
|
|
|
transport.send_message(vec![239, 123], None).unwrap(); |
|
|
|
|
assert_eq!(transport.receive(), Some((vec![239u8, 123], transport.get_peer_by_parameters(interface_id.as_str(), "").unwrap().peer_id))); |
|
|
|
|
assert_eq!( |
|
|
|
|
transport.receive(), |
|
|
|
|
Some(( |
|
|
|
|
vec![239u8, 123], |
|
|
|
|
transport |
|
|
|
|
.get_peer_by_parameters(interface_id.as_str(), "") |
|
|
|
|
.unwrap() |
|
|
|
|
.peer_id |
|
|
|
|
)) |
|
|
|
|
); |
|
|
|
|
assert!(transport.receive().is_none()); |
|
|
|
|
transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); |
|
|
|
|
transport |
|
|
|
|
.send_message(vec![239, 123], Some(peer_id)) |
|
|
|
|
.unwrap(); |
|
|
|
|
assert_eq!(transport.receive(), Some((vec![239, 123], peer_id))); |
|
|
|
|
} |
|
|
|
|