From 3e61c2731726fc1ccd359b287df44053e9b736b2 Mon Sep 17 00:00:00 2001 From: ennucore Date: Mon, 6 Dec 2021 16:25:17 +0300 Subject: [PATCH] Peer sharing in IP --- src/interfaces/ip.rs | 146 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 126 insertions(+), 20 deletions(-) diff --git a/src/interfaces/ip.rs b/src/interfaces/ip.rs index 990c310..b5532aa 100644 --- a/src/interfaces/ip.rs +++ b/src/interfaces/ip.rs @@ -4,7 +4,9 @@ use alloc::vec; use alloc::vec::Vec; use core::ops::RangeInclusive; use core::time::Duration; +use rayon::prelude::*; use std::net; +use std::net::TcpStream; #[cfg(test)] use std::thread; @@ -15,7 +17,10 @@ use crate::res::{IFError, IFResult}; use crate::std::io::{Read, Write}; use crate::std::println; -const SOCKET_RANGE: RangeInclusive = 50000..=50010; +const SOCKET_RANGE: RangeInclusive = 50000..=50010; + +/// The threshold for the number of peers below which we are desperate +const PEER_THRESHOLD: usize = 70; /// Interface for interactions using tcp sockets pub struct IPInterface { @@ -36,26 +41,26 @@ struct IPPackage { #[derive(Debug)] enum MessageType { - Service, - PeerRequest, Common, + PeerRequest, + PeersShared, } impl MessageType { fn from_u8(id: u8) -> IFResult { match id { - 0 => Ok(MessageType::Service), + 0 => Ok(MessageType::Common), 1 => Ok(MessageType::PeerRequest), - 2 => Ok(MessageType::Common), + 2 => Ok(MessageType::PeersShared), _ => Err(IFError::General("Incorrect message type".to_string())), } } fn to_u8(&self) -> u8 { match self { - MessageType::Service => 0, + MessageType::Common => 0, MessageType::PeerRequest => 1, - MessageType::Common => 2, + MessageType::PeersShared => 2, } } } @@ -70,13 +75,17 @@ impl Interface for IPInterface { match self.listener.accept() { Ok((stream, addr)) => { println!("New client: {:?}", addr); + if !self.peers.contains(&addr.ip()) { + self.peers.push(addr.ip()); + } self.connections.push(stream) } Err(_e) => return Err(IFError::General(String::from("No incoming connection"))), } println!("Hello from mainloop"); - for mut connection in &self.connections { + let mut new_connections: Vec = vec![]; + for connection in &mut self.connections { let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0]; connection.read_exact(&mut header)?; let version = header[0]; @@ -87,14 +96,50 @@ impl Interface for IPInterface { let mut message: Vec = vec![]; message_take.read_to_end(&mut message)?; - let package = IPPackage { - version, - package_type, - size, - message, - }; - self.package_queue.push(package); + match package_type { + MessageType::PeerRequest => { + let peers_to_share = if self.peers.len() < PEER_THRESHOLD { + self.peers.clone() + } else { + self.peers.iter().skip(7).step_by(2).cloned().collect() + }; + let message = serde_cbor::to_vec(&peers_to_share)?; + IPInterface::send_package( + connection, + IPPackage { + version, + package_type, + size: message.len() as u32, + message, + }, + )?; + } + MessageType::Common => { + let package = IPPackage { + version, + package_type, + size, + message, + }; + self.package_queue.push(package); + } + MessageType::PeersShared => { + let peers: Vec = serde_cbor::from_slice(message.as_slice())?; + for peer in peers { + if !self.peers.contains(&peer) { + if let Some(conn) = IPInterface::new_connection(&peer)? { + new_connections.push(conn) + } + self.peers.push(peer); + } + } + } + } + } + for conn in new_connections.iter_mut() { + self.initialize_connection(conn)?; } + self.connections.extend(new_connections); Ok(()) } @@ -160,7 +205,7 @@ impl Interface for IPInterface { } impl IPInterface { - pub fn new() -> IFResult { + pub fn new(peers: Vec) -> IFResult { let listener = match create_tcp_listener() { Some(listener) => listener, None => { @@ -172,11 +217,23 @@ impl IPInterface { listener.set_nonblocking(true)?; + let connections = peers + .par_iter() + .map(Self::new_connection) + .filter_map(|r| r.ok()) + .filter_map(|r| r) + .map(|mut c| -> IFResult { + Self::request_peers(&mut c)?; + Ok(c) + }) + .filter_map(|r| r.ok()) + .collect::>(); + Ok(IPInterface { id: String::from("IP Interface"), - connections: vec![], + connections, listener, - peers: vec![], + peers, package_queue: vec![], }) } @@ -200,6 +257,55 @@ impl IPInterface { Ok(()) } + + fn initialize_connection(&self, conn: &mut TcpStream) -> IFResult<()> { + if self.peers.len() < PEER_THRESHOLD { + Self::request_peers(conn)?; + } + Ok(()) + } + + fn request_peers(conn: &mut TcpStream) -> IFResult<()> { + IPInterface::send_package( + conn, + IPPackage { + version: 0, + package_type: MessageType::PeerRequest, + size: 0, + message: vec![], + }, + )?; + Ok(()) + } + + fn obtain_connection(&mut self, addr: &net::IpAddr) -> IFResult<()> { + if self + .connections + .iter() + .any(|con| con.peer_addr().is_ok() && &con.peer_addr().unwrap().ip() == addr) + { + return Ok(()); + } + if let Some(conn) = Self::new_connection(addr)? { + self.connections.push(conn) + } + Ok(()) + } + + fn new_connection(addr: &net::IpAddr) -> IFResult> { + for port in SOCKET_RANGE { + match net::TcpStream::connect_timeout( + &net::SocketAddr::new(*addr, port as u16), + Duration::from_millis(500), + ) { + Ok(connection) => { + return Ok(Some(connection)); + } + Err(_) => continue, + }; + } + Ok(None) + } } fn create_tcp_listener() -> Option { @@ -242,8 +348,8 @@ fn bytes_to_size(arr: [u8; 4]) -> u32 { fn test_creating_connection() -> IFResult<()> { let message = *b"Hello world from iron forest"; - let mut interface1 = IPInterface::new()?; - let mut interface2 = IPInterface::new()?; + let mut interface1 = IPInterface::new(vec![])?; + let mut interface2 = IPInterface::new(vec![])?; let t1 = std::thread::spawn(move || { interface1