From 445b285b799e152f175e0e372cb835cb2c2c9c17 Mon Sep 17 00:00:00 2001 From: Shlyakhtun Prokhor Date: Fri, 3 Dec 2021 09:28:43 +0300 Subject: [PATCH] Created IPPackage header. Added receive queue --- src/interfaces/ip.rs | 121 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 21 deletions(-) diff --git a/src/interfaces/ip.rs b/src/interfaces/ip.rs index e4e35e8..e4e833c 100644 --- a/src/interfaces/ip.rs +++ b/src/interfaces/ip.rs @@ -10,6 +10,7 @@ use core::str::from_utf8; use core::time::Duration; use crate::interface::{Interface, InterfaceRequirements, TargetingData}; +use crate::interfaces::ip::MessageType::PeerRequest; use crate::message::MessageBytes; use crate::res::{IFError, IFResult}; use crate::res::IFError::General; @@ -24,6 +25,41 @@ pub struct IPInterface { connections: Vec, listener: net::TcpListener, peers: Vec, + package_queue: Vec +} + +#[derive(Debug)] +struct IPPackage { + version: u8, + package_type: MessageType, + size: u32, + message: MessageBytes, +} + +#[derive(Debug)] +enum MessageType { + Service, + PeerRequest, + Common, +} + +impl MessageType { + fn from_u8(id: u8) -> IFResult { + match id { + 0 => Ok(MessageType::Service), + 1 => Ok(MessageType::PeerRequest), + 2 => Ok(MessageType::Common), + _ => Err(IFError::General("Incorrect message type".to_string())) + } + } + + fn to_u8(&self) -> u8 { + match self { + MessageType::Service => 0, + MessageType::PeerRequest => 1, + MessageType::Common => 2 + } + } } impl InterfaceRequirements for IPInterface {} @@ -43,16 +79,18 @@ impl Interface for IPInterface { println!("Hello from mainloop"); for mut connection in &self.connections { - let mut size_arr: [u8; 4] = [0, 0, 0, 0]; - connection.read_exact(&mut size_arr)?; - let mut size = bytes_to_size(size_arr) as u32; - + let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0]; + connection.read_exact(&mut header)?; + let version = header[0]; + let package_type = MessageType::from_u8(header[1])?; + let size = bytes_to_size([header[2], header[3], header[4], header[5]]); let mut message_take = connection.take(size as u64); let mut message: Vec = vec![]; message_take.read_to_end(&mut message); - println!("Size: {:?}", size); - println!("Message: {:?}", from_utf8(&*message)); + + let package = IPPackage {version, package_type, size, message}; + self.package_queue.push(package); } Ok(()) @@ -77,7 +115,7 @@ impl Interface for IPInterface { None => { let connection = match net::TcpStream::connect_timeout( &addr, - Duration::new(1, 0) + Duration::new(1, 0), ) { Ok(connection) => connection, Err(_) => return Err(General(String::from("Can't connect to this port"))) @@ -91,12 +129,21 @@ impl Interface for IPInterface { self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?; - self.connections[index].write_all(&size_to_bytes(message.len()))?; - self.connections[index].write_all(message)?; + IPInterface::send_package(&mut self.connections[index], IPPackage { + version: 0, + package_type: MessageType::Common, + size: message.len() as u32, + message: Vec::from(message), + }); println!("Sent message"); Ok(()) } - fn receive(&mut self) -> IFResult> { todo!() } + fn receive(&mut self) -> IFResult> { + match self.package_queue.pop() { + Some(ip_package) => Ok(Some( (ip_package.message, "".to_string()) )), + None => Ok(None) + } + } } impl IPInterface { @@ -115,6 +162,7 @@ impl IPInterface { connections: vec!(), listener, peers: vec!(), + package_queue: vec!() }) } pub fn dump(&self) -> IFResult> { @@ -126,8 +174,22 @@ impl IPInterface { self.peers = peers; Ok(()) } + + fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> { + let mut header: Vec = vec![]; + header.push(package.version); + header.push(package.package_type.to_u8()); + for byte in size_to_bytes(package.size) { + header.push(byte); + } + stream.write_all(&*header)?; + stream.write_all(&*package.message)?; + + Ok(()) + } } + fn create_tcp_listener() -> Option { for port in SOCKET_RANGE { match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) { @@ -138,9 +200,17 @@ fn create_tcp_listener() -> Option { None } +fn parse_header(data: Vec) -> IFResult { + Ok(IPPackage { + version: data[0], + package_type: MessageType::from_u8(data[1])?, + size: bytes_to_size([data[3], data[4], data[5], data[6]]), + message: vec![], + }) +} -fn size_to_bytes(mut a: usize) -> [u8; 4] { +fn size_to_bytes(mut a: u32) -> [u8; 4] { let mut arr: [u8; 4] = [0, 0, 0, 0]; for i in [3, 2, 1, 0] { arr[i] = (a % 256) as u8; @@ -149,12 +219,12 @@ fn size_to_bytes(mut a: usize) -> [u8; 4] { arr } -fn bytes_to_size(arr: [u8; 4]) -> usize { +fn bytes_to_size(arr: [u8; 4]) -> u32 { let mut size = 0; for size_byte in &arr { size = size * 256 + *size_byte as u32; } - size as usize + size } #[test] @@ -165,26 +235,35 @@ fn test_creating_connection() -> IFResult<()> { let mut interface2 = IPInterface::new()?; let t1 = std::thread::spawn(move || { - interface1.send(&message, Some(String::from("127.0.0.1:50000"))) + interface1.send(&message, Some(String::from("127.0.0.1:50001"))); + interface1 }); thread::sleep(Duration::from_millis(10)); let t2 = std::thread::spawn(move || { - interface2.main_loop_iteration() + interface2.main_loop_iteration(); + interface2 }); let res1 = t1.join(); match res1 { - Ok(res) => { + Ok(mut res) => { println!("Thread Ok"); - println!("interface1: {:?}", res) - }, + + + } Err(e) => println!("{:?}", e) } let res2 = t2.join(); match res2 { - Ok(res) => { + Ok(mut res) => { println!("Thread Ok"); - println!("interface2: {:?}", res) - }, + match res.receive() { + Ok(tmp) => match tmp { + Some((message, metadata)) => println!("Received {:?}", message), + None => println!("None") + } + Err(e) => println!("{:?}", e) + } + } Err(e) => println!("{:?}", e) } Ok(())