IronForce is a decentralized network; Degeon is a messenger built on it.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
8.0 KiB

use alloc::boxed::Box;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use alloc::vec;
use crate::interface::{Interface, TargetingData};
use crate::message::MessageBytes;
use crate::res::IFResult;
#[cfg(std)]
use rayon::prelude::*;
/// An identification of a peer - something that we can use to send a message to it
///
/// The derived `PartialEq` compares all three fields, so two records are equal only
/// when their `peer_id`, `interface_id` and `interface_targeting_data` all match.
#[derive(Clone, Debug, PartialEq)]
pub struct PeerInfo {
/// Something to locally identify this peer (`PeerInfo::new` fills it with a random value)
pub peer_id: u64,
/// ID of the interface through which we communicate with this peer
pub interface_id: String,
/// Data that should be passed to the interface to send a message to this peer
pub interface_targeting_data: TargetingData,
}
impl PeerInfo {
    /// Build a peer record for the given interface and targeting data.
    ///
    /// The `peer_id` is drawn from `rand::random()`, so every call produces a
    /// fresh local identifier.
    fn new(interface_id: String, interface_targeting_data: TargetingData) -> Self {
        let peer_id: u64 = rand::random();
        Self { peer_id, interface_id, interface_targeting_data }
    }
}
/// The struct that manages all the communication with peers
///
/// It owns the interfaces and keeps the list of peers that have been registered so far.
pub struct Transport {
/// Interfaces used for sending and receiving; a peer's `interface_id` is matched against their `id()`
pub interfaces: Vec<Box<dyn Interface>>,
/// Known peers, searched either by `peer_id` or by (interface id, targeting data)
peers: Vec<PeerInfo>,
}
impl Transport {
/// Create a new transport that owns the given interfaces.
///
/// The peer list starts out empty.
///
/// # Panics
///
/// In builds without the `std` cfg, panics if more than one interface reports
/// `has_blocking_main()`.
pub fn new(interfaces: Vec<Box<dyn Interface>>) -> Self {
    #[cfg(not(std))]
    {
        // Count with `usize` via `count()` rather than summing into a `u8`:
        // a `u8` sum overflows once 256 interfaces report a blocking main loop.
        let blocking_mains = interfaces
            .iter()
            .filter(|interface| interface.has_blocking_main())
            .count();
        if blocking_mains > 1 {
            panic!("There is two interfaces with blocking main loops and we have no threads because this is no_std!");
        }
    }
    Self { interfaces, peers: Vec::new() }
}
/// Look up a peer in `self.peers` by its local id.
///
/// Returns a clone of the first matching record, or `None` if no peer has that id.
fn get_peer_by_id(&self, peer_id: u64) -> Option<PeerInfo> {
    for candidate in &self.peers {
        if candidate.peer_id == peer_id {
            return Some(candidate.clone());
        }
    }
    None
}
/// Look up a peer in `self.peers` by the interface it belongs to and its targeting data.
///
/// Returns a reference to the first record whose `interface_id` and
/// `interface_targeting_data` both match, or `None` if there is no such peer.
fn get_peer_by_parameters(&self, interface_id: &str, data: &str /*&TargetingData*/) -> Option<&PeerInfo> {
    for peer in &self.peers {
        let same_interface = peer.interface_id == interface_id;
        if same_interface && peer.interface_targeting_data == data {
            return Some(peer);
        }
    }
    None
}
/// Return the id of the peer with these parameters, inserting it into our database first if needed.
///
/// When a peer with the same interface id and targeting data already exists its id is
/// returned unchanged; otherwise a new `PeerInfo` is created, stored and its id returned.
fn find_or_add_peer(&mut self, interface_id: String, data: TargetingData) -> u64 {
    // Copy the id out so the shared borrow of `self.peers` ends before we may push.
    let known_id = self
        .get_peer_by_parameters(interface_id.as_str(), &data)
        .map(|peer| peer.peer_id);
    if let Some(peer_id) = known_id {
        return peer_id;
    }

    let fresh_peer = PeerInfo::new(interface_id, data);
    let peer_id = fresh_peer.peer_id;
    self.peers.push(fresh_peer);
    peer_id
}
/// Get the position in `self.interfaces` of the interface with the given ID.
///
/// # Panics
///
/// Panics with "Invalid interface id" if no interface reports that id.
fn interface_index_by_id(&self, interface_id: &str) -> usize {
    for (index, interface) in self.interfaces.iter().enumerate() {
        if interface.id() == interface_id {
            return index;
        }
    }
    panic!("Invalid interface id")
}
/// Send message bytes to a single peer, or broadcast them on every interface.
///
/// * `peer_id == Some(id)` and the id is known: the message is sent through that
///   peer's interface, using the peer's targeting data.
/// * otherwise: the message is passed to every interface with no target.
///
/// # Errors
///
/// Returns the error reported by an interface's `send`. This holds for both the
/// sequential (no `std`) and the parallel (`std`) broadcast paths.
///
/// # Panics
///
/// Panics if the peer's `interface_id` does not match any interface
/// (see `interface_index_by_id`).
pub fn send_message(&mut self, message: MessageBytes, peer_id: Option<u64>) -> IFResult<()> {
    // NOTE(review): an id that is not in `self.peers` yields `None` here and therefore
    // falls through to the broadcast branch - confirm that this is intended.
    let peer = peer_id.and_then(|id| self.get_peer_by_id(id));
    match peer {
        // Broadcast
        None => {
            #[cfg(not(std))]
            {
                for interface in &mut self.interfaces {
                    interface.send(&message, None)?;
                }
            }
            // If we have concurrency, we will do it concurrently
            #[cfg(std)]
            {
                // Collect into a `Result` so that a failed send is reported to the
                // caller, exactly like the sequential branch does with `?`.
                self.interfaces
                    .par_iter_mut()
                    .map(|interface| interface.send(&message, None))
                    .collect::<IFResult<()>>()?;
            }
            Ok(())
        }
        // Singlecast
        Some(peer) => {
            let interface_ind = self.interface_index_by_id(peer.interface_id.as_str());
            self.interfaces[interface_ind].send(&message, Some(peer.interface_targeting_data))
        }
    }
}
/// Poll the interfaces in order and return the first message that is available.
///
/// Returns `Some((message, peer_id))` for the first interface that yields a message,
/// registering the sender as a peer if it was not known yet. Returns `None` when no
/// polled interface had a message. Interfaces after the first one that yields a
/// message are not polled during this call.
///
/// Errors from `receive()` on an interface are not propagated: with `std` they are
/// printed, and polling continues with the next interface.
pub fn receive(&mut self) -> Option<(MessageBytes, u64 /* peer id*/)> {
    let (interface_id, (msg, peer_data)) = self.interfaces.iter_mut().find_map(|interface| {
        let id = interface.id().to_string();
        match interface.receive() {
            // Got a message: stop polling and hand it back with the interface id
            Ok(Some(received)) => Some((id, received)),
            // Nothing pending on this interface
            Ok(None) => None,
            Err(e) => {
                #[cfg(std)]
                println!("An error occurred while receiving: {:?}", e);
                // Without `std` there is nowhere to print the error, so just discard it
                #[cfg(not(std))]
                drop(e);
                None
            }
        }
    })?;
    Some((msg, self.find_or_add_peer(interface_id, peer_data)))
}
/// Run one iteration of the main loop of every interface.
///
/// With `std`, all interfaces are driven in parallel. Without it, the interfaces that
/// do not report `has_blocking_main()` are run first, in order, followed by the first
/// interface that does report it (if any).
///
/// # Errors
///
/// Returns the error of an interface whose `main_loop_iteration` failed.
pub fn main_loop_iteration(&mut self) -> IFResult<()> {
    #[cfg(std)]
    {
        // The collection target is spelled out as `IFResult<()>` instead of `IFResult<_>`
        // so that it does not depend on type-inference fallback for the discarded value.
        self.interfaces
            .par_iter_mut()
            .map(|interface| interface.main_loop_iteration())
            .collect::<IFResult<()>>()?;
    }
    #[cfg(not(std))]
    {
        // Non-blocking interfaces first, stopping at the first error
        for interface in self.interfaces.iter_mut().filter(|interface| !interface.has_blocking_main()) {
            interface.main_loop_iteration()?;
        }
        // Then the blocking one, so that it cannot starve the others
        if let Some(interface) = self.interfaces.iter_mut().find(|interface| interface.has_blocking_main()) {
            interface.main_loop_iteration()?;
        }
    }
    Ok(())
}
}
#[cfg(test)]
use crate::interface::test_interface::TestInterface;
/// A peer that was never registered is not found; once added it can be looked up
/// both by its parameters and by the id that `find_or_add_peer` returned.
#[test]
fn test_adding_peer_to_transport() {
    let mut transport = Transport::new(vec![Box::new(TestInterface::default())]);
    let iface_id = "test_interface".to_string();
    let target = "hi".to_string();

    // Nothing has been registered yet
    let before = transport.get_peer_by_parameters(iface_id.as_str(), target.as_str());
    assert!(before.is_none());

    let peer_id = transport.find_or_add_peer(iface_id.clone(), target.clone());
    let expected = PeerInfo {
        peer_id,
        interface_id: iface_id.clone(),
        interface_targeting_data: target.clone(),
    };

    let by_parameters = transport.get_peer_by_parameters(iface_id.as_str(), target.as_str());
    assert_eq!(by_parameters, Some(&expected));
    let by_id = transport.get_peer_by_id(peer_id);
    assert_eq!(by_id, Some(expected));
}
/// Messages sent through the transport (broadcast and to a single peer) show up
/// on the underlying test interface.
#[test]
fn test_transport_sending() {
    let mut transport = Transport::new(vec![Box::new(TestInterface::default())]);
    let iface_id = "test_interface".to_string();
    let target = "hi".to_string();
    let peer_id = transport.find_or_add_peer(iface_id, target.clone());

    // Broadcast: the interface is expected to hand the bytes back with empty targeting data
    transport.send_message(vec![239, 123], None).unwrap();
    let broadcast = transport.interfaces[0].receive().unwrap();
    assert_eq!(broadcast, Some((vec![239u8, 123], "".to_string())));
    // ... and only once
    assert_eq!(transport.interfaces[0].receive(), IFResult::Ok(None));

    // Singlecast: the peer's targeting data travels with the message
    transport.send_message(vec![239, 123], Some(peer_id)).unwrap();
    let direct = transport.interfaces[0].receive();
    assert_eq!(direct, IFResult::Ok(Some((vec![239, 123], target))));
}
/// Messages sent through the transport can be read back with `Transport::receive`,
/// which reports the id of the peer they are attributed to.
#[test]
fn test_transport_receiving() {
    let mut transport = Transport::new(vec![Box::new(TestInterface::default())]);
    let iface_id = "test_interface".to_string();
    let target = "hi".to_string();
    let peer_id = transport.find_or_add_peer(iface_id.clone(), target);

    // Broadcast: receive first, because receiving is what registers the peer with
    // empty targeting data that is looked up afterwards
    transport.send_message(vec![239, 123], None).unwrap();
    let received = transport.receive();
    let broadcast_peer_id = transport.get_peer_by_parameters(iface_id.as_str(), "").unwrap().peer_id;
    assert_eq!(received, Some((vec![239u8, 123], broadcast_peer_id)));
    assert!(transport.receive().is_none());

    // Singlecast: the message is attributed to the peer registered above
    transport.send_message(vec![239, 123], Some(peer_id)).unwrap();
    let direct = transport.receive();
    assert_eq!(direct, Some((vec![239, 123], peer_id)));
}