Browse Source

Peer sharing in IP

interface-2
Lev 3 years ago
parent
commit
aa15911403
  1. 132
      src/interfaces/ip.rs

132
src/interfaces/ip.rs

@ -4,7 +4,9 @@ use alloc::vec;
use alloc::vec::Vec;
use core::ops::RangeInclusive;
use core::time::Duration;
use rayon::prelude::*;
use std::net;
use std::net::TcpStream;
#[cfg(test)]
use std::thread;
@ -15,7 +17,10 @@ use crate::res::{IFError, IFResult};
use crate::std::io::{Read, Write};
use crate::std::println;
const SOCKET_RANGE: RangeInclusive<i32> = 50000..=50010;
const SOCKET_RANGE: RangeInclusive<u16> = 50000..=50010;
/// The threshold for the number of peers below which we are desperate
const PEER_THRESHOLD: usize = 70;
/// Interface for interactions using tcp sockets
pub struct IPInterface {
@ -36,26 +41,26 @@ struct IPPackage {
#[derive(Debug)]
enum MessageType {
Service,
PeerRequest,
Common,
PeerRequest,
PeersShared,
}
impl MessageType {
fn from_u8(id: u8) -> IFResult<MessageType> {
match id {
0 => Ok(MessageType::Service),
0 => Ok(MessageType::Common),
1 => Ok(MessageType::PeerRequest),
2 => Ok(MessageType::Common),
2 => Ok(MessageType::PeersShared),
_ => Err(IFError::General("Incorrect message type".to_string())),
}
}
fn to_u8(&self) -> u8 {
match self {
MessageType::Service => 0,
MessageType::Common => 0,
MessageType::PeerRequest => 1,
MessageType::Common => 2,
MessageType::PeersShared => 2,
}
}
}
@ -70,13 +75,17 @@ impl Interface for IPInterface {
match self.listener.accept() {
Ok((stream, addr)) => {
println!("New client: {:?}", addr);
if !self.peers.contains(&addr.ip()) {
self.peers.push(addr.ip());
}
self.connections.push(stream)
}
Err(_e) => return Err(IFError::General(String::from("No incoming connection"))),
}
println!("Hello from mainloop");
for mut connection in &self.connections {
let mut new_connections: Vec<TcpStream> = vec![];
for connection in &mut self.connections {
let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0];
connection.read_exact(&mut header)?;
let version = header[0];
@ -87,6 +96,25 @@ impl Interface for IPInterface {
let mut message: Vec<u8> = vec![];
message_take.read_to_end(&mut message)?;
match package_type {
MessageType::PeerRequest => {
let peers_to_share = if self.peers.len() < PEER_THRESHOLD {
self.peers.clone()
} else {
self.peers.iter().skip(7).step_by(2).cloned().collect()
};
let message = serde_cbor::to_vec(&peers_to_share)?;
IPInterface::send_package(
connection,
IPPackage {
version,
package_type,
size: message.len() as u32,
message,
},
)?;
}
MessageType::Common => {
let package = IPPackage {
version,
package_type,
@ -95,6 +123,23 @@ impl Interface for IPInterface {
};
self.package_queue.push(package);
}
MessageType::PeersShared => {
let peers: Vec<net::IpAddr> = serde_cbor::from_slice(message.as_slice())?;
for peer in peers {
if !self.peers.contains(&peer) {
if let Some(conn) = IPInterface::new_connection(&peer)? {
new_connections.push(conn)
}
self.peers.push(peer);
}
}
}
}
}
for conn in new_connections.iter_mut() {
self.initialize_connection(conn)?;
}
self.connections.extend(new_connections);
Ok(())
}
@ -160,7 +205,7 @@ impl Interface for IPInterface {
}
impl IPInterface {
pub fn new() -> IFResult<Self> {
pub fn new(peers: Vec<net::IpAddr>) -> IFResult<Self> {
let listener = match create_tcp_listener() {
Some(listener) => listener,
None => {
@ -172,11 +217,23 @@ impl IPInterface {
listener.set_nonblocking(true)?;
let connections = peers
.par_iter()
.map(Self::new_connection)
.filter_map(|r| r.ok())
.filter_map(|r| r)
.map(|mut c| -> IFResult<TcpStream> {
Self::request_peers(&mut c)?;
Ok(c)
})
.filter_map(|r| r.ok())
.collect::<Vec<_>>();
Ok(IPInterface {
id: String::from("IP Interface"),
connections: vec![],
connections,
listener,
peers: vec![],
peers,
package_queue: vec![],
})
}
@ -200,6 +257,55 @@ impl IPInterface {
Ok(())
}
fn initialize_connection(&self, conn: &mut TcpStream) -> IFResult<()> {
if self.peers.len() < PEER_THRESHOLD {
Self::request_peers(conn)?;
}
Ok(())
}
fn request_peers(conn: &mut TcpStream) -> IFResult<()> {
IPInterface::send_package(
conn,
IPPackage {
version: 0,
package_type: MessageType::PeerRequest,
size: 0,
message: vec![],
},
)?;
Ok(())
}
fn obtain_connection(&mut self, addr: &net::IpAddr) -> IFResult<()> {
if self
.connections
.iter()
.any(|con| con.peer_addr().is_ok() && &con.peer_addr().unwrap().ip() == addr)
{
return Ok(());
}
if let Some(conn) = Self::new_connection(addr)? {
self.connections.push(conn)
}
Ok(())
}
fn new_connection(addr: &net::IpAddr) -> IFResult<Option<TcpStream>> {
for port in SOCKET_RANGE {
match net::TcpStream::connect_timeout(
&net::SocketAddr::new(*addr, port as u16),
Duration::from_millis(500),
) {
Ok(connection) => {
return Ok(Some(connection));
}
Err(_) => continue,
};
}
Ok(None)
}
}
fn create_tcp_listener() -> Option<net::TcpListener> {
@ -242,8 +348,8 @@ fn bytes_to_size(arr: [u8; 4]) -> u32 {
fn test_creating_connection() -> IFResult<()> {
let message = *b"Hello world from iron forest";
let mut interface1 = IPInterface::new()?;
let mut interface2 = IPInterface::new()?;
let mut interface1 = IPInterface::new(vec![])?;
let mut interface2 = IPInterface::new(vec![])?;
let t1 = std::thread::spawn(move || {
interface1

Loading…
Cancel
Save