From 9da108486ce01ffe2709f7b6dd74c115e7308769 Mon Sep 17 00:00:00 2001 From: Alexey Date: Tue, 30 Nov 2021 19:37:28 +0300 Subject: [PATCH] Created IPInterface constructor. Made nonblocking mainloop in IP --- examples/test_ip_connection.rs | 47 ++++------ src/interfaces/ip.rs | 165 +++++++++++++++++++++++---------- src/lib.rs | 2 +- 3 files changed, 134 insertions(+), 80 deletions(-) diff --git a/examples/test_ip_connection.rs b/examples/test_ip_connection.rs index 5620d74..9a31824 100644 --- a/examples/test_ip_connection.rs +++ b/examples/test_ip_connection.rs @@ -4,53 +4,44 @@ use ironforce::interface::Interface; use ironforce::interfaces::ip::IPInterface; #[cfg(feature = "std")] use std::net; +use std::thread; +use std::time::Duration; +#[cfg(feature = "std")] +use ironforce::res::IFResult; #[cfg(feature = "std")] fn main() { - let mut listener = match net::TcpListener::bind("0.0.0.0:60000") { - Ok(tmp) => tmp, - Err(_) => { - return; - } - }; - let mut interface1 = IPInterface { - id: String::from("IP interface"), - connections: vec![], - listener, - }; + println!("hello"); + test(); +} - let option_listener = net::TcpListener::bind("0.0.0.0:50000"); - let mut listener = match option_listener { - Ok(tmp) => tmp, - Err(_) => { - return; - } - }; - let mut interface2 = IPInterface { - id: String::from("IP interface"), - connections: vec![], - listener, - }; +fn test() -> IFResult<()> { + let message = *b"Hello world from iron forest"; + + let mut interface1 = IPInterface::new()?; + let mut interface2 = IPInterface::new()?; let t1 = std::thread::spawn(move || { - interface1.send(&[0, 0, 1, 1], Some(String::from("0.0.0.0:50000"))); + interface1.send(&message, Some(String::from("127.0.0.1:50001"))).unwrap(); interface1 }); + thread::sleep(Duration::from_millis(10)); let t2 = std::thread::spawn(move || { - interface2.main_loop_iteration(); + + interface2.main_loop_iteration().unwrap(); interface2 }); let res1 = t1.join(); match res1 { Ok(_) => println!("Ok"), - Err(e) => println!("{:?}", e), + Err(e) => println!("{:?}", e) } let res2 = t2.join(); match res2 { Ok(_) => println!("Ok"), - Err(e) => println!("{:?}", e), + Err(e) => println!("{:?}", e) } - // res1.unwrap(); + Ok(()) } #[cfg(not(feature = "std"))] diff --git a/src/interfaces/ip.rs b/src/interfaces/ip.rs index eb969ce..e4e35e8 100644 --- a/src/interfaces/ip.rs +++ b/src/interfaces/ip.rs @@ -1,19 +1,29 @@ +use alloc::borrow::ToOwned; use std::net; +use std::thread; +use alloc::vec; use alloc::vec::Vec; -use alloc::string::String; +use alloc::string::{String, ToString}; +use core::cmp::min; +use core::ops::RangeInclusive; +use core::str::from_utf8; +use core::time::Duration; use crate::interface::{Interface, InterfaceRequirements, TargetingData}; use crate::message::MessageBytes; use crate::res::{IFError, IFResult}; +use crate::res::IFError::General; use crate::std::io::{Read, Write}; use crate::std::println; +const SOCKET_RANGE: RangeInclusive = 50000..=50010; /// Interface for interactions using tcp sockets pub struct IPInterface { - pub id: String, - pub connections: Vec, - pub listener: net::TcpListener, + id: String, + connections: Vec, + listener: net::TcpListener, + peers: Vec, } impl InterfaceRequirements for IPInterface {} @@ -21,41 +31,35 @@ impl InterfaceRequirements for IPInterface {} impl Interface for IPInterface { fn main_loop_iteration(&mut self) -> IFResult<()> { println!("Mainloop {:?}", self.listener.local_addr()); + println!("Incoming connections: {:?}", self.listener.incoming()); + match self.listener.accept() { Ok((stream, addr)) => { println!("New client: {:?}", addr); self.connections.push(stream) } - Err(e) => println!("couldn't get client: {:?}", e), + Err(e) => return Err(IFError::General(String::from("No incoming connection"))), } - // for stream in self.listener.incoming() { - // println!("Accepting stream..."); - // - // match stream { - // Ok(stream) => { - // self.connections.push(stream); - // println!("Stream accepted"); - // } - // Err(e) => { println!("Accepting failed"); return Err(IFError::General(String::from("Stream error"))); } - // } - // println!("End of cycle") - // } + println!("Hello from mainloop"); for mut connection in &self.connections { let mut size_arr: [u8; 4] = [0, 0, 0, 0]; connection.read_exact(&mut size_arr)?; - let mut size: u32 = 0; - for size_byte in &size_arr { - size = size * 256 + *size_byte as u32; - } + let mut size = bytes_to_size(size_arr) as u32; + + + let mut message_take = connection.take(size as u64); + let mut message: Vec = vec![]; + message_take.read_to_end(&mut message); println!("Size: {:?}", size); + println!("Message: {:?}", from_utf8(&*message)); } Ok(()) } fn has_blocking_main(&self) -> bool { - true + false } fn id(&self) -> &str { &*self.id @@ -63,18 +67,31 @@ impl Interface for IPInterface { fn send(&mut self, message: &[u8], interface_data: Option) -> IFResult<()> { let addr: net::SocketAddr = match interface_data { Some(ip_string) => ip_string.parse().expect("Unable to parse address"), - None => return Err(IFError::General(String::from("Not enough info to create connection"))) + None => return Err(IFError::General(String::from("Not enough info to create connections"))) }; println!("Connecting to {:?}", addr); + + let index = match self.connections.iter().position(|connection| connection.peer_addr().ok() == Some(addr)) { None => { - self.connections.push(net::TcpStream::connect(addr)?); + let connection = match net::TcpStream::connect_timeout( + &addr, + Duration::new(1, 0) + ) { + Ok(connection) => connection, + Err(_) => return Err(General(String::from("Can't connect to this port"))) + }; + self.connections.push(connection); self.connections.len() - 1 } Some(i) => i }; println!("Sending message to {:?}", self.connections[index].peer_addr().unwrap()); + + self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?; + + self.connections[index].write_all(&size_to_bytes(message.len()))?; self.connections[index].write_all(message)?; println!("Sent message"); Ok(()) @@ -82,49 +99,95 @@ impl Interface for IPInterface { fn receive(&mut self) -> IFResult> { todo!() } } -#[cfg(test)] -use alloc::vec; +impl IPInterface { + pub fn new() -> IFResult { + let mut listener = match create_tcp_listener() { + Some(listener) => listener, + None => { + return Err(IFError::General(String::from("Unable to open TCP listener"))); + } + }; -#[test] -fn test_creating_connection() { - let listener = match net::TcpListener::bind("0.0.0.0:60000") { - Ok(tmp) => tmp, - Err(_) => { return; } - }; - let mut interface1 = IPInterface { - id: String::from("IP interface"), - connections: vec![], - listener, - }; + listener.set_nonblocking(true)?; - let listener = match net::TcpListener::bind("0.0.0.0:50000") { - Ok(tmp) => tmp, - Err(_) => { return; } - }; - let mut interface2 = IPInterface { - id: String::from("IP interface"), - connections: vec![], - listener, + Ok(IPInterface { + id: String::from("IP Interface"), + connections: vec!(), + listener, + peers: vec!(), + }) + } + pub fn dump(&self) -> IFResult> { + Ok(serde_cbor::to_vec(&self.peers)?) + } + + pub fn load(&mut self, data: Vec) -> IFResult<()> { + let peers: Vec = serde_cbor::from_slice(&data)?; + self.peers = peers; + Ok(()) + } +} + +fn create_tcp_listener() -> Option { + for port in SOCKET_RANGE { + match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) { + Ok(listener) => return Some(listener), + Err(e) => {} + } }; + None +} + + + +fn size_to_bytes(mut a: usize) -> [u8; 4] { + let mut arr: [u8; 4] = [0, 0, 0, 0]; + for i in [3, 2, 1, 0] { + arr[i] = (a % 256) as u8; + a /= 256; + } + arr +} + +fn bytes_to_size(arr: [u8; 4]) -> usize { + let mut size = 0; + for size_byte in &arr { + size = size * 256 + *size_byte as u32; + } + size as usize +} + +#[test] +fn test_creating_connection() -> IFResult<()> { + let message = *b"Hello world from iron forest"; + + let mut interface1 = IPInterface::new()?; + let mut interface2 = IPInterface::new()?; let t1 = std::thread::spawn(move || { - interface1.send(&[0, 0, 1, 1], Some(String::from("0.0.0.0:50000"))).unwrap(); - interface1 + interface1.send(&message, Some(String::from("127.0.0.1:50000"))) }); + thread::sleep(Duration::from_millis(10)); let t2 = std::thread::spawn(move || { - interface2.main_loop_iteration().unwrap(); - interface2 + interface2.main_loop_iteration() }); let res1 = t1.join(); match res1 { - Ok(_) => println!("Ok"), + Ok(res) => { + println!("Thread Ok"); + println!("interface1: {:?}", res) + }, Err(e) => println!("{:?}", e) } let res2 = t2.join(); match res2 { - Ok(_) => println!("Ok"), + Ok(res) => { + println!("Thread Ok"); + println!("interface2: {:?}", res) + }, Err(e) => println!("{:?}", e) } + Ok(()) } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index 7fe605d..5131dd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ mod message; mod transport; pub mod interface; pub mod interfaces; -mod res; +pub mod res; mod tunnel; #[cfg(std)]