Browse Source

Serialization + main loop starter

master
Lev 3 years ago
parent
commit
3a7a320605
  1. 24
      Cargo.lock
  2. 1
      Cargo.toml
  3. 54
      src/interface.rs
  4. 65
      src/interfaces/ip.rs
  5. 28
      src/interfaces/mod.rs
  6. 123
      src/ironforce.rs
  7. 1
      src/lib.rs
  8. 6
      src/res.rs
  9. 157
      src/transport.rs

24
Cargo.lock generated

@ -236,10 +236,17 @@ dependencies = [
"rsa", "rsa",
"serde", "serde",
"serde_cbor", "serde_cbor",
"serde_json",
"sha2", "sha2",
"spin 0.9.2", "spin 0.9.2",
] ]
[[package]]
name = "itoa"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -507,6 +514,12 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "ryu"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "254df5081ce98661a883445175e52efe99d1cb2a5552891d965d2f5d0cad1c16"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
@ -543,6 +556,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "serde_json"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.8.2" version = "0.8.2"

1
Cargo.toml

@ -20,6 +20,7 @@ serde = { version = "1.0", features = ["derive", "alloc"], default-features = fa
rayon = { version = "1.5.1", optional = true } rayon = { version = "1.5.1", optional = true }
core-error = "0.0.1-rc4" core-error = "0.0.1-rc4"
serde_cbor = "0.11.2" serde_cbor = "0.11.2"
serde_json = "1.0.72"
spin = "0.9.2" spin = "0.9.2"
[profile.dev.package.num-bigint-dig] [profile.dev.package.num-bigint-dig]

54
src/interface.rs

@ -24,12 +24,15 @@ pub trait Interface: InterfaceRequirements {
/// For systems that don't support concurrency, there can be only one interface in this function waits for a message (to avoid blocking). /// For systems that don't support concurrency, there can be only one interface in this function waits for a message (to avoid blocking).
/// That's why it's necessary to check if it is the case for this interface, and it's done using function `Interface::has_blocking_main()` /// That's why it's necessary to check if it is the case for this interface, and it's done using function `Interface::has_blocking_main()`
fn main_loop_iteration(&mut self) -> IFResult<()>; fn main_loop_iteration(&mut self) -> IFResult<()>;
/// Check if `main_loop_iteration` stops execution and waits for a message /// Check if `main_loop_iteration` stops execution and waits for a message
fn has_blocking_main(&self) -> bool { fn has_blocking_main(&self) -> bool {
false // hopefully... false // hopefully...
} }
/// Get some way of identification for this interface /// Get some way of identification for this interface
fn id(&self) -> &str; fn id(&self) -> &str;
/// Send a message. If no `interface_data` is provided, we should consider it to be a broadcast. /// Send a message. If no `interface_data` is provided, we should consider it to be a broadcast.
/// If, on the other hand, `interface_data` is not `None`, it should be used to send the message to the target. /// If, on the other hand, `interface_data` is not `None`, it should be used to send the message to the target.
fn send( fn send(
@ -37,10 +40,17 @@ pub trait Interface: InterfaceRequirements {
message: &[u8], /*MessageBytes*/ message: &[u8], /*MessageBytes*/
interface_data: Option<TargetingData>, interface_data: Option<TargetingData>,
) -> IFResult<()>; ) -> IFResult<()>;
/// Receive a message through this interface. Returns a result with an option of (message bytes, target). /// Receive a message through this interface. Returns a result with an option of (message bytes, target).
/// `None` means there is no message available at the time. /// `None` means there is no message available at the time.
/// The implementations of this function shouldn't wait for new messages, but /// The implementations of this function shouldn't wait for new messages, but
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData /*interface data*/)>>; fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData /*interface data*/)>>;
/// Dump the interface to string
fn get_dump_data(&self) -> String;
/// Create the interface from dumped data
fn from_dump(data: String) -> IFResult<Self> where Self: Sized;
} }
#[cfg(test)] #[cfg(test)]
@ -49,10 +59,10 @@ pub mod test_interface {
use crate::message::MessageBytes; use crate::message::MessageBytes;
use crate::res::IFResult; use crate::res::IFResult;
use alloc::string::String; use alloc::string::String;
use alloc::vec::Vec;
use alloc::vec;
use alloc::string::ToString; use alloc::string::ToString;
use alloc::sync::Arc; use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use spin::Mutex; use spin::Mutex;
#[derive(Default)] #[derive(Default)]
@ -80,6 +90,14 @@ pub mod test_interface {
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> { fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> {
Ok(self.messages.pop()) Ok(self.messages.pop())
} }
fn get_dump_data(&self) -> String {
"".to_string()
}
fn from_dump(_data: String) -> IFResult<Self> {
Ok(Default::default())
}
} }
pub type Storage = Vec<(Vec<u8>, TargetingData)>; pub type Storage = Vec<(Vec<u8>, TargetingData)>;
@ -108,8 +126,7 @@ pub mod test_interface {
} }
fn send(&mut self, message: &[u8], target: Option<TargetingData>) -> IFResult<()> { fn send(&mut self, message: &[u8], target: Option<TargetingData>) -> IFResult<()> {
self self.storage
.storage
.lock() .lock()
.push((Vec::from(message), target.unwrap_or_default())); .push((Vec::from(message), target.unwrap_or_default()));
Ok(()) Ok(())
@ -118,17 +135,31 @@ pub mod test_interface {
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> { fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> {
Ok(self.messages.pop()) Ok(self.messages.pop())
} }
fn get_dump_data(&self) -> String {
"".to_string()
}
fn from_dump(_data: String) -> IFResult<Self> {
Ok(TestInterface {
this_peer_id: "".to_string(),
storage: Arc::new(Default::default()),
messages: vec![],
})
}
} }
impl InterfaceRequirements for TestInterface {} impl InterfaceRequirements for TestInterface {}
pub fn create_test_interfaces(n: usize) -> Vec<TestInterface> { pub fn create_test_interfaces(n: usize) -> Vec<TestInterface> {
let storage_mutex = Arc::new(Mutex::new(vec![])); let storage_mutex = Arc::new(Mutex::new(vec![]));
(0..n).map(|i| TestInterface { (0..n)
this_peer_id: i.to_string(), .map(|i| TestInterface {
storage: storage_mutex.clone(), this_peer_id: i.to_string(),
messages: vec![], storage: storage_mutex.clone(),
}).collect() messages: vec![],
})
.collect()
} }
#[test] #[test]
@ -136,6 +167,9 @@ pub mod test_interface {
let mut interfaces = create_test_interfaces(2); let mut interfaces = create_test_interfaces(2);
interfaces[0].send(b"123", Some("1".to_string())).unwrap(); interfaces[0].send(b"123", Some("1".to_string())).unwrap();
interfaces[1].main_loop_iteration().unwrap(); interfaces[1].main_loop_iteration().unwrap();
assert_eq!(interfaces[1].receive().unwrap().unwrap().0.as_slice(), b"123"); assert_eq!(
interfaces[1].receive().unwrap().unwrap().0.as_slice(),
b"123"
);
} }
} }

65
src/interfaces/ip.rs

@ -7,6 +7,7 @@ use core::ops::RangeInclusive;
use core::str::FromStr; use core::str::FromStr;
use core::time::Duration; use core::time::Duration;
use rayon::prelude::*; use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::net::TcpStream; use std::net::TcpStream;
use std::{format, net}; use std::{format, net};
@ -16,6 +17,7 @@ use crate::res::{IFError, IFResult};
use crate::std::io::{Read, Write}; use crate::std::io::{Read, Write};
use crate::std::println; use crate::std::println;
pub const DEFAULT_PORT: u16 = 50000;
const SOCKET_RANGE: RangeInclusive<u16> = 50000..=50010; const SOCKET_RANGE: RangeInclusive<u16> = 50000..=50010;
/// The threshold for the number of peers below which we are desperate /// The threshold for the number of peers below which we are desperate
@ -33,6 +35,13 @@ pub struct IPInterface {
main_loop_iterations: u64, main_loop_iterations: u64,
} }
/// Data for the serialization of `IPInterface`
#[derive(Serialize, Deserialize)]
pub struct SerData {
pub peers: Vec<Peer>,
pub port: u16,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct IPPackage { struct IPPackage {
version: u8, version: u8,
@ -67,7 +76,6 @@ impl MessageType {
} }
} }
fn compare_addrs(peer: &Peer, addr: net::SocketAddr) -> bool { fn compare_addrs(peer: &Peer, addr: net::SocketAddr) -> bool {
addr.ip() == peer.0 && addr.port() == peer.1 addr.ip() == peer.0 && addr.port() == peer.1
} }
@ -171,13 +179,31 @@ impl Interface for IPInterface {
self.main_loop_iterations += 1; self.main_loop_iterations += 1;
// Every 50 iterations we connect to everyone we know // Every 50 iterations we connect to everyone we know
if self.main_loop_iterations % 50 == 0 { if self.main_loop_iterations % 50 == 0 {
let connected_addresses = self.connections.iter().filter_map(|conn| conn.peer_addr().ok()).collect::<Vec<_>>(); let connected_addresses = self
let peers_we_do_not_have_connections_with = self.peers.iter().filter(|p| !connected_addresses.iter().any(|addr| compare_addrs(p, *addr))).copied().collect::<Vec<_>>(); .connections
self.connections.extend(IPInterface::get_connections_to_peers(&peers_we_do_not_have_connections_with, self.peers.len() < PEER_THRESHOLD * 2)); .iter()
.filter_map(|conn| conn.peer_addr().ok())
.collect::<Vec<_>>();
let peers_we_do_not_have_connections_with = self
.peers
.iter()
.filter(|p| {
!connected_addresses
.iter()
.any(|addr| compare_addrs(p, *addr))
})
.copied()
.collect::<Vec<_>>();
self.connections
.extend(IPInterface::get_connections_to_peers(
&peers_we_do_not_have_connections_with,
self.peers.len() < PEER_THRESHOLD * 2,
));
} }
// We do a peer exchange every 30 iterations // We do a peer exchange every 30 iterations
if self.main_loop_iterations % 30 == 0 { if self.main_loop_iterations % 30 == 0 {
let connection_index = (self.main_loop_iterations / 30) as usize % self.connections.len(); let connection_index =
(self.main_loop_iterations / 30) as usize % self.connections.len();
IPInterface::request_peers(&mut self.connections[connection_index])?; IPInterface::request_peers(&mut self.connections[connection_index])?;
} }
Ok(()) Ok(())
@ -238,6 +264,35 @@ impl Interface for IPInterface {
None => Ok(None), None => Ok(None),
} }
} }
fn get_dump_data(&self) -> String {
let data = SerData {
peers: self.peers.clone(),
port: self.listener.local_addr().unwrap().port(),
};
serde_json::to_string(&data).unwrap()
}
fn from_dump(data: String) -> IFResult<Self> {
if !data.is_empty() {
let data: SerData = serde_json::from_str(data.as_str()).unwrap();
IPInterface::new(data.port, data.peers)
} else {
let ip_path = std::path::Path::new(".if_ip_peers");
let peers = if ip_path.exists() {
std::fs::read_to_string(ip_path)
.unwrap()
.split('\n')
.filter_map(|line| net::SocketAddr::from_str(line).ok())
.map(|addr| (addr.ip(), addr.port()))
.collect()
} else {
println!("Warning: there are no peers in IP, which makes it essentially useless");
vec![]
};
IPInterface::new(DEFAULT_PORT, peers)
}
}
} }
impl IPInterface { impl IPInterface {

28
src/interfaces/mod.rs

@ -3,7 +3,33 @@ pub mod ip;
use crate::interface::Interface; use crate::interface::Interface;
use alloc::vec; use alloc::vec;
use alloc::vec::Vec;
use alloc::boxed::Box;
use alloc::string::String;
#[cfg(feature = "std")]
use crate::interfaces::ip::IPInterface;
use crate::res::IFResult;
pub fn get_interfaces() -> alloc::vec::Vec<alloc::boxed::Box<dyn Interface>> { #[cfg(not(feature = "std"))]
pub fn get_interfaces() -> Vec<Box<dyn Interface>> {
vec![] vec![]
} }
#[cfg(feature = "std")]
pub fn get_interfaces() -> Vec<Box<dyn Interface>> {
vec![Box::new(IPInterface::from_dump(Default::default()).unwrap())]
}
#[cfg(not(feature = "std"))]
pub fn restore_interfaces(_data: Vec<String>) -> IFResult<Vec<Box<dyn Interface>>> {
Ok(vec![])
}
#[cfg(feature = "std")]
pub fn restore_interfaces(data: Vec<String>) -> IFResult<Vec<Box<dyn Interface>>> {
if data.is_empty() {
Ok(get_interfaces())
} else {
Ok(vec![Box::new(IPInterface::from_dump(data[0].clone())?)])
}
}

123
src/ironforce.rs

@ -1,15 +1,20 @@
use crate::crypto::{Keys, PublicKey}; use crate::crypto::{Keys, PublicKey};
use crate::message::{Message, MessageType, ServiceMessageType}; use crate::message::{Message, MessageType, ServiceMessageType};
use crate::res::{IFError, IFResult}; use crate::res::{IFError, IFResult};
use crate::transport::Transport; use crate::transport::{PeerInfo, Transport};
use crate::tunnel::{Tunnel, TunnelPublic}; use crate::tunnel::{Tunnel, TunnelPublic};
use alloc::collections::BTreeMap; use alloc::collections::BTreeMap;
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::vec; use alloc::vec;
use alloc::vec::Vec; use alloc::vec::Vec;
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::println; use std::println;
const TUNNEL_MAX_REPEAT_COUNT: u32 = 3; const TUNNEL_MAX_REPEAT_COUNT: u32 = 3;
#[cfg(feature = "std")]
const DEFAULT_FILE: &str = ".if_data.json";
/// Main worker /// Main worker
pub struct IronForce { pub struct IronForce {
@ -41,6 +46,28 @@ pub struct IronForce {
/// ///
/// Maps tunnel's first local_id to the number /// Maps tunnel's first local_id to the number
tunnel_counters: BTreeMap<u64, u32>, tunnel_counters: BTreeMap<u64, u32>,
/// Auto save
auto_save: bool,
}
/// Data for the serialization of IF
#[derive(Serialize, Deserialize)]
pub struct IFSerializationData {
pub keys: Keys,
pub tunnels: Vec<Tunnel>,
pub peers: Vec<PeerInfo>,
pub interfaces_data: Vec<alloc::string::String>,
}
impl IFSerializationData {
pub fn default() -> IFSerializationData {
IFSerializationData {
keys: Keys::generate(),
tunnels: vec![],
peers: vec![],
interfaces_data: vec![],
}
}
} }
impl IronForce { impl IronForce {
@ -56,6 +83,7 @@ impl IronForce {
has_background_worker: false, has_background_worker: false,
processed_messages: vec![], processed_messages: vec![],
tunnel_counters: Default::default(), tunnel_counters: Default::default(),
auto_save: false,
} }
} }
@ -249,17 +277,28 @@ impl IronForce {
} }
} }
} }
MessageType::SingleCast if message.check_recipient(&self.keys) => self.messages.push(message.clone()), MessageType::SingleCast if message.check_recipient(&self.keys) => {
self.messages.push(message.clone())
}
MessageType::SingleCast => { MessageType::SingleCast => {
if let Some(tunnel) = self.tunnels.iter().find(|tun| tun.id == Some(message.tunnel_id.0)) { if let Some(tunnel) = self
let peer_id = if message.tunnel_id.1 { tunnel.peer_ids.0 } else { tunnel.peer_ids.1 }; .tunnels
self.transport.send_message(serde_cbor::to_vec(&message)?, Some(peer_id))?; .iter()
.find(|tun| tun.id == Some(message.tunnel_id.0))
{
let peer_id = if message.tunnel_id.1 {
tunnel.peer_ids.0
} else {
tunnel.peer_ids.1
};
self.transport
.send_message(serde_cbor::to_vec(&message)?, Some(peer_id))?;
} }
} }
MessageType::Broadcast => { MessageType::Broadcast => {
self.messages.push(message.clone()); self.messages.push(message.clone());
self.send_to_all(message)?; self.send_to_all(message)?;
}, }
} }
Ok(()) Ok(())
} }
@ -289,6 +328,67 @@ impl IronForce {
) )
.unwrap() .unwrap()
} }
pub fn get_serialization_data(&self) -> IFSerializationData {
IFSerializationData {
keys: self.keys.clone(),
tunnels: self.tunnels.clone(),
peers: self.transport.peers.clone(),
interfaces_data: self.transport.get_interfaces_data(),
}
}
pub fn from_serialization_data(data: IFSerializationData) -> IFResult<Self> {
Ok(Self {
keys: data.keys,
transport: Transport::restore(data.peers.clone(), data.interfaces_data.clone())?,
tunnels: data.tunnels,
additional_modules: vec![],
messages: vec![],
tunnels_pending: vec![],
has_background_worker: false,
processed_messages: vec![],
tunnel_counters: Default::default(),
auto_save: true,
})
}
#[cfg(feature = "std")]
pub fn save_to_file(&self, filename: Option<alloc::string::String>) -> IFResult<()> {
std::fs::write(
filename.unwrap_or_else(|| DEFAULT_FILE.to_string()),
serde_json::to_string(&self.get_serialization_data())?,
)?;
Ok(())
}
#[cfg(feature = "std")]
pub fn launch_main_loop(
mut self,
sleep_millis: u64,
) -> (
std::thread::JoinHandle<!>,
std::sync::Arc<std::sync::Mutex<Self>>,
) {
self.has_background_worker = true;
let container = std::sync::Arc::new(std::sync::Mutex::new(self));
let container_clone = container.clone();
let thread = std::thread::spawn(move || {
let mut counter: u64 = 0;
loop {
match container_clone.lock().unwrap().main_loop_iteration() {
Ok(_) => {}
Err(e) => println!("An error happened in the main loop: {:?}", e),
}
counter += 1;
std::thread::sleep(std::time::Duration::from_millis(sleep_millis));
if counter % 50 == 0 {
container_clone.lock().unwrap().save_to_file(None).unwrap()
}
}
});
(thread, container)
}
} }
#[cfg(test)] #[cfg(test)]
@ -319,6 +419,7 @@ mod if_testing {
has_background_worker: false, has_background_worker: false,
processed_messages: vec![], processed_messages: vec![],
tunnel_counters: Default::default(), tunnel_counters: Default::default(),
auto_save: false,
}) })
.collect() .collect()
} }
@ -371,15 +472,15 @@ mod if_testing {
#[cfg(feature = "std")] #[cfg(feature = "std")]
mod test_with_ip { mod test_with_ip {
use crate::crypto::Keys; use crate::crypto::Keys;
use crate::interfaces::ip::create_test_interfaces;
use crate::ironforce::IronForce; use crate::ironforce::IronForce;
use crate::message::{Message, MessageType};
use crate::res::IFResult; use crate::res::IFResult;
use crate::transport::Transport; use crate::transport::Transport;
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::vec; use alloc::vec;
use alloc::vec::Vec; use alloc::vec::Vec;
use std::println; use std::println;
use crate::interfaces::ip::create_test_interfaces;
use crate::message::{Message, MessageType};
// fn create_test_interfaces(n: usize) -> impl Iterator<Item = IPInterface> { // fn create_test_interfaces(n: usize) -> impl Iterator<Item = IPInterface> {
// let ip_addr = std::net::IpAddr::from_str("127.0.0.1").unwrap(); // let ip_addr = std::net::IpAddr::from_str("127.0.0.1").unwrap();
@ -410,7 +511,8 @@ mod test_with_ip {
tunnels_pending: vec![], tunnels_pending: vec![],
has_background_worker: false, has_background_worker: false,
processed_messages: vec![], processed_messages: vec![],
tunnel_counters: Default::default() tunnel_counters: Default::default(),
auto_save: false,
}) })
.collect() .collect()
} }
@ -475,7 +577,8 @@ mod test_with_ip {
.recipient(&key_1) .recipient(&key_1)
.sign(&node0_keys) .sign(&node0_keys)
.build()?, .build()?,
&key_1)?; &key_1,
)?;
let t2 = std::thread::spawn(move || { let t2 = std::thread::spawn(move || {
for _ in 0..18 { for _ in 0..18 {
node1.main_loop_iteration().unwrap(); node1.main_loop_iteration().unwrap();

1
src/lib.rs

@ -1,6 +1,7 @@
#![no_std] #![no_std]
#![allow(dead_code)] #![allow(dead_code)]
#![feature(trait_alias)] #![feature(trait_alias)]
#![feature(never_type)]
#[cfg(feature = "std")] #[cfg(feature = "std")]
extern crate std; extern crate std;

6
src/res.rs

@ -40,6 +40,12 @@ impl From<serde_cbor::Error> for IFError {
} }
} }
impl From<serde_json::Error> for IFError {
fn from(e: serde_json::Error) -> Self {
Self::SerializationError(format!("{:?}", e))
}
}
impl From<rsa::errors::Error> for IFError { impl From<rsa::errors::Error> for IFError {
fn from(e: rsa::errors::Error) -> Self { fn from(e: rsa::errors::Error) -> Self {
Self::CryptoError(format!("{:?}", e)) Self::CryptoError(format!("{:?}", e))

157
src/transport.rs

@ -1,19 +1,19 @@
use alloc::boxed::Box;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use alloc::vec;
use crate::interface::{Interface, TargetingData}; use crate::interface::{Interface, TargetingData};
use crate::message::MessageBytes; use crate::message::MessageBytes;
use crate::res::IFResult; use crate::res::IFResult;
use alloc::boxed::Box;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use rayon::prelude::*; use rayon::prelude::*;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::println; use std::println;
/// An identification of a peer - something that we can use to send a message to id /// An identification of a peer - something that we can use to send a message to id
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PeerInfo { pub struct PeerInfo {
/// Something to locally identify this peer /// Something to locally identify this peer
pub peer_id: u64, pub peer_id: u64,
@ -33,31 +33,47 @@ impl PeerInfo {
} }
} }
/// The struct that manages all the communication with peers /// The struct that manages all the communication with peers
pub struct Transport { pub struct Transport {
pub interfaces: Vec<Box<dyn Interface>>, pub interfaces: Vec<Box<dyn Interface>>,
peers: Vec<PeerInfo>, pub(crate) peers: Vec<PeerInfo>,
} }
impl Transport { impl Transport {
/// Create new transport with given interfaces /// Create new transport with given interfaces
pub fn new(interfaces: Vec<Box<dyn Interface>>) -> Self { pub fn new(interfaces: Vec<Box<dyn Interface>>) -> Self {
#[cfg(not(feature = "std"))] #[cfg(not(feature = "std"))]
if interfaces.iter().map(|interface| interface.has_blocking_main() as u8).sum::<u8>() > 1 { if interfaces
.iter()
.map(|interface| interface.has_blocking_main() as u8)
.sum::<u8>()
> 1
{
panic!("There is two interfaces with blocking main loops and we have no threads because this is no_std!"); panic!("There is two interfaces with blocking main loops and we have no threads because this is no_std!");
} }
Self { interfaces, peers: vec![] } Self {
interfaces,
peers: vec![],
}
} }
/// Find a peer in `self.peers` by its id /// Find a peer in `self.peers` by its id
fn get_peer_by_id(&self, peer_id: u64) -> Option<PeerInfo> { fn get_peer_by_id(&self, peer_id: u64) -> Option<PeerInfo> {
self.peers.iter().find(|peer| peer.peer_id == peer_id).cloned() self.peers
.iter()
.find(|peer| peer.peer_id == peer_id)
.cloned()
} }
/// Try to find a peer in `self.peers` by interface_id and targeting data /// Try to find a peer in `self.peers` by interface_id and targeting data
fn get_peer_by_parameters(&self, interface_id: &str, data: &str /*&TargetingData*/) -> Option<&PeerInfo> { fn get_peer_by_parameters(
self.peers.iter().find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data) &self,
interface_id: &str,
data: &str, /*&TargetingData*/
) -> Option<&PeerInfo> {
self.peers
.iter()
.find(|peer| peer.interface_id == interface_id && peer.interface_targeting_data == data)
} }
/// Insert a new peer into out database and return its id or find an existing one with these parameters /// Insert a new peer into out database and return its id or find an existing one with these parameters
@ -69,32 +85,42 @@ impl Transport {
self.peers.push(new_peer); self.peers.push(new_peer);
peer_id peer_id
} }
Some(peer) => peer.peer_id Some(peer) => peer.peer_id,
} }
} }
/// Get interface index by its ID /// Get interface index by its ID
fn interface_index_by_id(&self, interface_id: &str) -> usize { fn interface_index_by_id(&self, interface_id: &str) -> usize {
self.interfaces.iter().position(|interface| interface.id() == interface_id).unwrap_or_else(|| panic!("Invalid interface id")) self.interfaces
.iter()
.position(|interface| interface.id() == interface_id)
.unwrap_or_else(|| panic!("Invalid interface id"))
} }
/// Send message bytes to a peer if data is provided or broadcast the data if `peer == None` /// Send message bytes to a peer if data is provided or broadcast the data if `peer == None`
pub fn send_message(&mut self, message: MessageBytes, peer_id: Option<u64>) -> IFResult<()> { pub fn send_message(&mut self, message: MessageBytes, peer_id: Option<u64>) -> IFResult<()> {
let peer = if let Some(peer_id) = peer_id { self.get_peer_by_id(peer_id) } else { None }; let peer = if let Some(peer_id) = peer_id {
self.get_peer_by_id(peer_id)
} else {
None
};
match peer { match peer {
// Broadcast // Broadcast
None => { None => {
#[cfg(not(feature = "std"))] #[cfg(not(feature = "std"))]
{ {
for interface in &mut self.interfaces { for interface in &mut self.interfaces {
interface.send(&message, None)?; interface.send(&message, None)?;
}
} }
}
// If we have concurrency, we will do it concurrently // If we have concurrency, we will do it concurrently
#[cfg(feature = "std")] #[cfg(feature = "std")]
{ {
self.interfaces.par_iter_mut().map(|interface| interface.send(&message, None)).for_each(drop); self.interfaces
} .par_iter_mut()
.map(|interface| interface.send(&message, None))
.for_each(drop);
}
Ok(()) Ok(())
} }
// Singlecast // Singlecast
@ -109,7 +135,8 @@ impl Transport {
/// ///
/// Returns a result with an option of `(message, peer_id)` /// Returns a result with an option of `(message, peer_id)`
pub fn receive(&mut self) -> Option<(MessageBytes, u64 /* peer id*/)> { pub fn receive(&mut self) -> Option<(MessageBytes, u64 /* peer id*/)> {
if let Some((interface_id, (msg, peer_data))) = self.interfaces if let Some((interface_id, (msg, peer_data))) = self
.interfaces
.iter_mut() .iter_mut()
// For each interface return (interface id, message result) // For each interface return (interface id, message result)
.map(|interface| (interface.id().to_string(), interface.receive())) .map(|interface| (interface.id().to_string(), interface.receive()))
@ -120,30 +147,53 @@ impl Transport {
println!("An error occurred while receiving: {:?}", e); println!("An error occurred while receiving: {:?}", e);
(id, Err(e)) (id, Err(e))
} }
(id, Ok(r)) => (id, Ok(r)) (id, Ok(r)) => (id, Ok(r)),
}) })
// Find a result where there is a message // Find a result where there is a message
.find(|r| matches!(r, (_, Ok(Some(_))))) .find(|r| matches!(r, (_, Ok(Some(_)))))
// Safely unwrap this result (we already matched `Ok(Some(_))`) // Safely unwrap this result (we already matched `Ok(Some(_))`)
.map(|(id, r)| (id, r.unwrap().unwrap())) { .map(|(id, r)| (id, r.unwrap().unwrap()))
{
Some((msg, self.find_or_add_peer(interface_id, peer_data))) Some((msg, self.find_or_add_peer(interface_id, peer_data)))
} else { None } } else {
None
}
} }
/// Run one iteration of the main loop /// Run one iteration of the main loop
pub fn main_loop_iteration(&mut self) -> IFResult<()> { pub fn main_loop_iteration(&mut self) -> IFResult<()> {
#[cfg(feature = "std")] #[cfg(feature = "std")]
self.interfaces.par_iter_mut().map(|interface| interface.main_loop_iteration()).collect::<IFResult<_>>()?; self.interfaces
.par_iter_mut()
.map(|interface| interface.main_loop_iteration())
.collect::<IFResult<_>>()?;
#[cfg(not(feature = "std"))] #[cfg(not(feature = "std"))]
{ {
self.interfaces.iter_mut().try_for_each(|interface| if !interface.has_blocking_main() { interface.main_loop_iteration() } else { Ok(()) })?; self.interfaces.iter_mut().try_for_each(|interface| {
let blocking_interface_index = self.interfaces.iter().position(|interface| interface.has_blocking_main()); if !interface.has_blocking_main() {
if let Some(ind) = blocking_interface_index { interface.main_loop_iteration()
self.interfaces[ind].main_loop_iteration()?; } else {
Ok(())
} }
})?;
let blocking_interface_index = self
.interfaces
.iter()
.position(|interface| interface.has_blocking_main());
if let Some(ind) = blocking_interface_index {
self.interfaces[ind].main_loop_iteration()?;
} }
}
Ok(()) Ok(())
} }
pub fn get_interfaces_data(&self) -> Vec<String> {
self.interfaces.iter().map(|interface| interface.get_dump_data()).collect()
}
pub fn restore(peers: Vec<PeerInfo>, interfaces_data: Vec<String>) -> IFResult<Self> {
Ok(Transport { interfaces: crate::interfaces::restore_interfaces(interfaces_data)?, peers })
}
} }
#[cfg(test)] #[cfg(test)]
@ -153,14 +203,20 @@ use crate::interface::test_interface::SimpleTestInterface;
fn test_adding_peer_to_transport() { fn test_adding_peer_to_transport() {
let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]);
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string());
assert!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()).is_none()); assert!(transport
let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); .get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str())
.is_none());
let peer_id =
transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone());
let peer = PeerInfo { let peer = PeerInfo {
peer_id, peer_id,
interface_id: interface_id.clone(), interface_id: interface_id.clone(),
interface_targeting_data: interface_targeting_data.clone(), interface_targeting_data: interface_targeting_data.clone(),
}; };
assert_eq!(transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()), Some(&peer)); assert_eq!(
transport.get_peer_by_parameters(interface_id.as_str(), interface_targeting_data.as_str()),
Some(&peer)
);
assert_eq!(transport.get_peer_by_id(peer_id), Some(peer)); assert_eq!(transport.get_peer_by_id(peer_id), Some(peer));
} }
@ -170,10 +226,18 @@ fn test_transport_sending() {
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string());
let peer_id = transport.find_or_add_peer(interface_id, interface_targeting_data.clone()); let peer_id = transport.find_or_add_peer(interface_id, interface_targeting_data.clone());
transport.send_message(vec![239, 123], None).unwrap(); transport.send_message(vec![239, 123], None).unwrap();
assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string()))); assert_eq!(
transport.interfaces[0].receive().unwrap(),
Some((vec![239u8, 123], "".to_string()))
);
assert!(transport.interfaces[0].receive() == IFResult::Ok(None)); assert!(transport.interfaces[0].receive() == IFResult::Ok(None));
transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); transport
assert_eq!(transport.interfaces[0].receive(), IFResult::Ok(Some((vec![239, 123], interface_targeting_data)))); .send_message(vec![239, 123], Some(peer_id))
.unwrap();
assert_eq!(
transport.interfaces[0].receive(),
IFResult::Ok(Some((vec![239, 123], interface_targeting_data)))
);
} }
#[test] #[test]
@ -182,8 +246,19 @@ fn test_transport_receiving() {
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string());
let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data); let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data);
transport.send_message(vec![239, 123], None).unwrap(); transport.send_message(vec![239, 123], None).unwrap();
assert_eq!(transport.receive(), Some((vec![239u8, 123], transport.get_peer_by_parameters(interface_id.as_str(), "").unwrap().peer_id))); assert_eq!(
transport.receive(),
Some((
vec![239u8, 123],
transport
.get_peer_by_parameters(interface_id.as_str(), "")
.unwrap()
.peer_id
))
);
assert!(transport.receive().is_none()); assert!(transport.receive().is_none());
transport.send_message(vec![239, 123], Some(peer_id)).unwrap(); transport
.send_message(vec![239, 123], Some(peer_id))
.unwrap();
assert_eq!(transport.receive(), Some((vec![239, 123], peer_id))); assert_eq!(transport.receive(), Some((vec![239, 123], peer_id)));
} }

Loading…
Cancel
Save