|
|
|
@ -3,16 +3,15 @@ use alloc::string::{String, ToString};
|
|
|
|
|
use alloc::vec; |
|
|
|
|
use alloc::vec::Vec; |
|
|
|
|
use core::ops::RangeInclusive; |
|
|
|
|
#[cfg(test)] |
|
|
|
|
use core::str::FromStr; |
|
|
|
|
use core::time::Duration; |
|
|
|
|
use rayon::prelude::*; |
|
|
|
|
use std::net; |
|
|
|
|
use std::net::TcpStream; |
|
|
|
|
#[cfg(test)] |
|
|
|
|
use std::thread; |
|
|
|
|
use std::{format, net}; |
|
|
|
|
|
|
|
|
|
use crate::interface::{Interface, InterfaceRequirements, TargetingData}; |
|
|
|
|
use crate::message::MessageBytes; |
|
|
|
|
use crate::res::IFError::General; |
|
|
|
|
use crate::res::{IFError, IFResult}; |
|
|
|
|
use crate::std::io::{Read, Write}; |
|
|
|
|
use crate::std::println; |
|
|
|
@ -30,10 +29,10 @@ pub struct IPInterface {
|
|
|
|
|
connections: Vec<net::TcpStream>, |
|
|
|
|
listener: net::TcpListener, |
|
|
|
|
peers: Vec<Peer>, |
|
|
|
|
package_queue: Vec<IPPackage>, |
|
|
|
|
package_queue: Vec<(IPPackage, String /* from_peer */)>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
|
|
#[derive(Debug, Clone)] |
|
|
|
|
struct IPPackage { |
|
|
|
|
version: u8, |
|
|
|
|
package_type: MessageType, |
|
|
|
@ -41,7 +40,7 @@ struct IPPackage {
|
|
|
|
|
message: MessageBytes, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
|
|
#[derive(Debug, Copy, Clone)] |
|
|
|
|
enum MessageType { |
|
|
|
|
Common, |
|
|
|
|
PeerRequest, |
|
|
|
@ -58,7 +57,7 @@ impl MessageType {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn to_u8(&self) -> u8 { |
|
|
|
|
fn as_u8(&self) -> u8 { |
|
|
|
|
match self { |
|
|
|
|
MessageType::Common => 0, |
|
|
|
|
MessageType::PeerRequest => 1, |
|
|
|
@ -71,28 +70,47 @@ impl InterfaceRequirements for IPInterface {}
|
|
|
|
|
|
|
|
|
|
impl Interface for IPInterface { |
|
|
|
|
fn main_loop_iteration(&mut self) -> IFResult<()> { |
|
|
|
|
println!("Mainloop {:?}", self.listener.local_addr()); |
|
|
|
|
println!("Incoming connections: {:?}", self.listener.incoming()); |
|
|
|
|
|
|
|
|
|
match self.listener.accept() { |
|
|
|
|
Ok((stream, addr)) => { |
|
|
|
|
println!("New client: {:?}", addr); |
|
|
|
|
if self.peers.iter().all(|(ip, _)| *ip != addr.ip()) { |
|
|
|
|
self.peers.push((addr.ip(), addr.port())); |
|
|
|
|
if let Some(conn) = self.listener.incoming().next() { |
|
|
|
|
match conn { |
|
|
|
|
Ok(stream) => { |
|
|
|
|
stream.set_nonblocking(true)?; |
|
|
|
|
let addr = stream.peer_addr()?; |
|
|
|
|
println!( |
|
|
|
|
"({:?}): New client: {:?}", |
|
|
|
|
addr, |
|
|
|
|
self.listener.local_addr().unwrap() |
|
|
|
|
); |
|
|
|
|
if self.peers.iter().all(|(ip, _)| *ip != addr.ip()) { |
|
|
|
|
self.peers.push((addr.ip(), addr.port())); |
|
|
|
|
} |
|
|
|
|
self.connections.push(stream) |
|
|
|
|
} |
|
|
|
|
self.connections.push(stream) |
|
|
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {} |
|
|
|
|
Err(e) => return Err(IFError::from(e)), |
|
|
|
|
} |
|
|
|
|
Err(_e) => return Err(IFError::General(String::from("No incoming connection"))), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
println!("Hello from mainloop"); |
|
|
|
|
let mut new_connections: Vec<TcpStream> = vec![]; |
|
|
|
|
for connection in &mut self.connections { |
|
|
|
|
connection.set_nonblocking(true)?; |
|
|
|
|
let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0]; |
|
|
|
|
connection.read_exact(&mut header)?; |
|
|
|
|
match connection.read_exact(&mut header) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(ref e) |
|
|
|
|
if e.kind() == std::io::ErrorKind::WouldBlock |
|
|
|
|
|| e.kind() == std::io::ErrorKind::UnexpectedEof => |
|
|
|
|
{ |
|
|
|
|
continue
|
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
println!("Error: {:?}", e); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let version = header[0]; |
|
|
|
|
let package_type = MessageType::from_u8(header[1])?; |
|
|
|
|
let size = bytes_to_size([header[2], header[3], header[4], header[5]]); |
|
|
|
|
connection.set_nonblocking(false)?; |
|
|
|
|
connection.set_read_timeout(Some(std::time::Duration::from_millis(500)))?; |
|
|
|
|
|
|
|
|
|
let mut message_take = connection.take(size as u64); |
|
|
|
|
let mut message: Vec<u8> = vec![]; |
|
|
|
@ -110,7 +128,7 @@ impl Interface for IPInterface {
|
|
|
|
|
connection, |
|
|
|
|
IPPackage { |
|
|
|
|
version, |
|
|
|
|
package_type, |
|
|
|
|
package_type: MessageType::PeersShared, |
|
|
|
|
size: message.len() as u32, |
|
|
|
|
message, |
|
|
|
|
}, |
|
|
|
@ -123,7 +141,8 @@ impl Interface for IPInterface {
|
|
|
|
|
size, |
|
|
|
|
message, |
|
|
|
|
}; |
|
|
|
|
self.package_queue.push(package); |
|
|
|
|
self.package_queue |
|
|
|
|
.push((package, format!("{:?}", connection.peer_addr()?))); |
|
|
|
|
} |
|
|
|
|
MessageType::PeersShared => { |
|
|
|
|
let peers: Vec<Peer> = serde_cbor::from_slice(message.as_slice())?; |
|
|
|
@ -153,62 +172,57 @@ impl Interface for IPInterface {
|
|
|
|
|
&*self.id |
|
|
|
|
} |
|
|
|
|
fn send(&mut self, message: &[u8], interface_data: Option<TargetingData>) -> IFResult<()> { |
|
|
|
|
let addr: net::SocketAddr = match interface_data { |
|
|
|
|
Some(ip_string) => ip_string.parse().expect("Unable to parse address"), |
|
|
|
|
None => { |
|
|
|
|
return Err(IFError::General(String::from( |
|
|
|
|
"Not enough info to create connections", |
|
|
|
|
))) |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
println!( |
|
|
|
|
"({:?}): Connecting to {:?} to send a message", |
|
|
|
|
self.listener.local_addr().unwrap(), |
|
|
|
|
interface_data |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
println!("Connecting to {:?}", addr); |
|
|
|
|
let package = IPPackage { |
|
|
|
|
version: 0, |
|
|
|
|
package_type: MessageType::Common, |
|
|
|
|
size: message.len() as u32, |
|
|
|
|
message: Vec::from(message), |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let index = match self |
|
|
|
|
.connections |
|
|
|
|
.iter() |
|
|
|
|
.position(|connection| connection.peer_addr().ok() == Some(addr)) |
|
|
|
|
{ |
|
|
|
|
match interface_data { |
|
|
|
|
Some(ip_string) => { |
|
|
|
|
let addr: net::SocketAddr = ip_string.parse().expect("Unable to parse address"); |
|
|
|
|
let index = self.obtain_connection(&(addr.ip(), addr.port()))?; |
|
|
|
|
println!( |
|
|
|
|
"({:?}): We have a connection to {:?}", |
|
|
|
|
self.listener.local_addr().unwrap(), |
|
|
|
|
self.connections[index].peer_addr().unwrap() |
|
|
|
|
); |
|
|
|
|
IPInterface::send_package(&mut self.connections[index], package)?; |
|
|
|
|
} |
|
|
|
|
None => { |
|
|
|
|
let connection = match net::TcpStream::connect_timeout(&addr, Duration::new(1, 0)) { |
|
|
|
|
Ok(connection) => connection, |
|
|
|
|
Err(_) => return Err(General(String::from("Can't connect to this port"))), |
|
|
|
|
}; |
|
|
|
|
self.connections.push(connection); |
|
|
|
|
self.connections.len() - 1 |
|
|
|
|
for conn in &mut self.connections { |
|
|
|
|
IPInterface::send_package(conn, package.clone())?; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Some(i) => i, |
|
|
|
|
}; |
|
|
|
|
println!( |
|
|
|
|
"Sending message to {:?}", |
|
|
|
|
self.connections[index].peer_addr().unwrap() |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?; |
|
|
|
|
|
|
|
|
|
IPInterface::send_package( |
|
|
|
|
&mut self.connections[index], |
|
|
|
|
IPPackage { |
|
|
|
|
version: 0, |
|
|
|
|
package_type: MessageType::Common, |
|
|
|
|
size: message.len() as u32, |
|
|
|
|
message: Vec::from(message), |
|
|
|
|
}, |
|
|
|
|
)?; |
|
|
|
|
println!("Sent message"); |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> { |
|
|
|
|
if !self.package_queue.is_empty() { |
|
|
|
|
println!( |
|
|
|
|
"({:?}): New message from {}", |
|
|
|
|
self.listener.local_addr().unwrap(), |
|
|
|
|
self.package_queue.last().unwrap().1 |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
match self.package_queue.pop() { |
|
|
|
|
Some(ip_package) => Ok(Some((ip_package.message, "".to_string()))), |
|
|
|
|
Some((ip_package, data)) => Ok(Some((ip_package.message, data))), |
|
|
|
|
None => Ok(None), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl IPInterface { |
|
|
|
|
pub fn new(peers: Vec<Peer>) -> IFResult<Self> { |
|
|
|
|
let listener = match create_tcp_listener() { |
|
|
|
|
pub fn new(port: u16, peers: Vec<Peer>) -> IFResult<Self> { |
|
|
|
|
let listener = match create_tcp_listener(port) { |
|
|
|
|
Some(listener) => listener, |
|
|
|
|
None => { |
|
|
|
|
return Err(IFError::General(String::from( |
|
|
|
@ -250,7 +264,10 @@ impl IPInterface {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> { |
|
|
|
|
let mut header: Vec<u8> = vec![package.version, package.package_type.to_u8()]; |
|
|
|
|
stream.set_write_timeout(Some(std::time::Duration::from_millis(700)))?; |
|
|
|
|
#[cfg(test)] |
|
|
|
|
stream.set_nonblocking(false)?; |
|
|
|
|
let mut header: Vec<u8> = vec![package.version, package.package_type.as_u8()]; |
|
|
|
|
for byte in size_to_bytes(package.size) { |
|
|
|
|
header.push(byte); |
|
|
|
|
} |
|
|
|
@ -280,22 +297,26 @@ impl IPInterface {
|
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn obtain_connection(&mut self, addr: &Peer) -> IFResult<()> { |
|
|
|
|
if self |
|
|
|
|
.connections |
|
|
|
|
.iter() |
|
|
|
|
.any(|con| con.peer_addr().is_ok() && con.peer_addr().unwrap().ip() == addr.0) |
|
|
|
|
{ |
|
|
|
|
return Ok(()); |
|
|
|
|
fn obtain_connection(&mut self, addr: &Peer) -> IFResult<usize> { |
|
|
|
|
fn compare_addrs(peer: &Peer, addr: net::SocketAddr) -> bool { |
|
|
|
|
addr.ip() == peer.0 && addr.port() == peer.1 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if let Some(pos) = self.connections.iter().position(|con| { |
|
|
|
|
con.peer_addr().is_ok() && compare_addrs(addr, con.peer_addr().unwrap()) |
|
|
|
|
}) { |
|
|
|
|
return Ok(pos); |
|
|
|
|
} |
|
|
|
|
if let Some(conn) = Self::new_connection(addr)? { |
|
|
|
|
self.connections.push(conn) |
|
|
|
|
self.connections.push(conn); |
|
|
|
|
Ok(self.connections.len() - 1) |
|
|
|
|
} else { |
|
|
|
|
Err(IFError::CouldNotConnect) |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn new_connection(addr: &Peer) -> IFResult<Option<TcpStream>> { |
|
|
|
|
for port in addr.1 - 5..addr.1 + 5 { |
|
|
|
|
for port in addr.1..addr.1 + 3 { |
|
|
|
|
match net::TcpStream::connect_timeout( |
|
|
|
|
&net::SocketAddr::new(addr.0, port as u16), |
|
|
|
|
Duration::from_millis(500), |
|
|
|
@ -310,8 +331,8 @@ impl IPInterface {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn create_tcp_listener() -> Option<net::TcpListener> { |
|
|
|
|
for port in SOCKET_RANGE { |
|
|
|
|
fn create_tcp_listener(port: u16) -> Option<net::TcpListener> { |
|
|
|
|
for port in port..port + 5 { |
|
|
|
|
match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) { |
|
|
|
|
Ok(listener) => return Some(listener), |
|
|
|
|
Err(_e) => {} |
|
|
|
@ -348,22 +369,25 @@ fn bytes_to_size(arr: [u8; 4]) -> u32 {
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|
fn test_creating_connection() -> IFResult<()> { |
|
|
|
|
let message = *b"Hello world from iron forest"; |
|
|
|
|
let message = *b"Hello world from ironforest"; |
|
|
|
|
let original_msg_copy = message; |
|
|
|
|
|
|
|
|
|
let mut interface1 = IPInterface::new(vec![])?; |
|
|
|
|
let mut interface2 = IPInterface::new(vec![])?; |
|
|
|
|
let mut interface1 = IPInterface::new(50000, vec![])?; |
|
|
|
|
let mut interface2 = IPInterface::new(50001, vec![])?; |
|
|
|
|
|
|
|
|
|
let t2 = std::thread::spawn(move || { |
|
|
|
|
for _ in 0..30 { |
|
|
|
|
interface2.main_loop_iteration().unwrap(); |
|
|
|
|
std::thread::sleep(std::time::Duration::from_millis(150)); |
|
|
|
|
} |
|
|
|
|
interface2 |
|
|
|
|
}); |
|
|
|
|
let t1 = std::thread::spawn(move || { |
|
|
|
|
interface1 |
|
|
|
|
.send(&message, Some(String::from("127.0.0.1:50001"))) |
|
|
|
|
.unwrap(); |
|
|
|
|
interface1 |
|
|
|
|
}); |
|
|
|
|
thread::sleep(Duration::from_millis(10)); |
|
|
|
|
let t2 = std::thread::spawn(move || { |
|
|
|
|
interface2.main_loop_iteration().unwrap(); |
|
|
|
|
interface2 |
|
|
|
|
}); |
|
|
|
|
let res1 = t1.join(); |
|
|
|
|
match res1 { |
|
|
|
|
Ok(_res) => { |
|
|
|
@ -377,8 +401,14 @@ fn test_creating_connection() -> IFResult<()> {
|
|
|
|
|
println!("Thread Ok"); |
|
|
|
|
match res.receive() { |
|
|
|
|
Ok(tmp) => match tmp { |
|
|
|
|
Some((message, _metadata)) => println!("Received {:?}", message), |
|
|
|
|
None => println!("None"), |
|
|
|
|
Some((message, _metadata)) => { |
|
|
|
|
println!("Received {:?}", message); |
|
|
|
|
assert_eq!(message, original_msg_copy) |
|
|
|
|
} |
|
|
|
|
None => { |
|
|
|
|
println!("None"); |
|
|
|
|
panic!(); |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
Err(e) => println!("{:?}", e), |
|
|
|
|
} |
|
|
|
@ -389,4 +419,17 @@ fn test_creating_connection() -> IFResult<()> {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(test)] |
|
|
|
|
pub mod test_ip_interface {} |
|
|
|
|
fn create_test_interfaces(n: usize) -> impl Iterator<Item = IPInterface> { |
|
|
|
|
let ip_addr = std::net::IpAddr::from_str("127.0.0.1").unwrap(); |
|
|
|
|
(0..n).map(move |i| { |
|
|
|
|
IPInterface::new( |
|
|
|
|
(5000 + 5 * i) as u16, |
|
|
|
|
// (0..n)
|
|
|
|
|
// .filter(|j| *j != i)
|
|
|
|
|
// .map(|j| (ip_addr, (5000 + 5 * j) as u16))
|
|
|
|
|
// .collect(),
|
|
|
|
|
vec![(ip_addr, (5000 + 5 * ((i + 5) % n)) as u16)], |
|
|
|
|
) |
|
|
|
|
.unwrap() |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|