Browse Source

Created IPPackage header. Added receive queue

master
Prokhor 3 years ago committed by ennucore
parent
commit
445b285b79
  1. 121
      src/interfaces/ip.rs

121
src/interfaces/ip.rs

@ -10,6 +10,7 @@ use core::str::from_utf8;
use core::time::Duration; use core::time::Duration;
use crate::interface::{Interface, InterfaceRequirements, TargetingData}; use crate::interface::{Interface, InterfaceRequirements, TargetingData};
use crate::interfaces::ip::MessageType::PeerRequest;
use crate::message::MessageBytes; use crate::message::MessageBytes;
use crate::res::{IFError, IFResult}; use crate::res::{IFError, IFResult};
use crate::res::IFError::General; use crate::res::IFError::General;
@ -24,6 +25,41 @@ pub struct IPInterface {
connections: Vec<net::TcpStream>, connections: Vec<net::TcpStream>,
listener: net::TcpListener, listener: net::TcpListener,
peers: Vec<net::IpAddr>, peers: Vec<net::IpAddr>,
package_queue: Vec<IPPackage>
}
#[derive(Debug)]
struct IPPackage {
version: u8,
package_type: MessageType,
size: u32,
message: MessageBytes,
}
#[derive(Debug)]
enum MessageType {
Service,
PeerRequest,
Common,
}
impl MessageType {
fn from_u8(id: u8) -> IFResult<MessageType> {
match id {
0 => Ok(MessageType::Service),
1 => Ok(MessageType::PeerRequest),
2 => Ok(MessageType::Common),
_ => Err(IFError::General("Incorrect message type".to_string()))
}
}
fn to_u8(&self) -> u8 {
match self {
MessageType::Service => 0,
MessageType::PeerRequest => 1,
MessageType::Common => 2
}
}
} }
impl InterfaceRequirements for IPInterface {} impl InterfaceRequirements for IPInterface {}
@ -43,16 +79,18 @@ impl Interface for IPInterface {
println!("Hello from mainloop"); println!("Hello from mainloop");
for mut connection in &self.connections { for mut connection in &self.connections {
let mut size_arr: [u8; 4] = [0, 0, 0, 0]; let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0];
connection.read_exact(&mut size_arr)?; connection.read_exact(&mut header)?;
let mut size = bytes_to_size(size_arr) as u32; let version = header[0];
let package_type = MessageType::from_u8(header[1])?;
let size = bytes_to_size([header[2], header[3], header[4], header[5]]);
let mut message_take = connection.take(size as u64); let mut message_take = connection.take(size as u64);
let mut message: Vec<u8> = vec![]; let mut message: Vec<u8> = vec![];
message_take.read_to_end(&mut message); message_take.read_to_end(&mut message);
println!("Size: {:?}", size);
println!("Message: {:?}", from_utf8(&*message)); let package = IPPackage {version, package_type, size, message};
self.package_queue.push(package);
} }
Ok(()) Ok(())
@ -77,7 +115,7 @@ impl Interface for IPInterface {
None => { None => {
let connection = match net::TcpStream::connect_timeout( let connection = match net::TcpStream::connect_timeout(
&addr, &addr,
Duration::new(1, 0) Duration::new(1, 0),
) { ) {
Ok(connection) => connection, Ok(connection) => connection,
Err(_) => return Err(General(String::from("Can't connect to this port"))) Err(_) => return Err(General(String::from("Can't connect to this port")))
@ -91,12 +129,21 @@ impl Interface for IPInterface {
self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?; self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?;
self.connections[index].write_all(&size_to_bytes(message.len()))?; IPInterface::send_package(&mut self.connections[index], IPPackage {
self.connections[index].write_all(message)?; version: 0,
package_type: MessageType::Common,
size: message.len() as u32,
message: Vec::from(message),
});
println!("Sent message"); println!("Sent message");
Ok(()) Ok(())
} }
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> { todo!() } fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> {
match self.package_queue.pop() {
Some(ip_package) => Ok(Some( (ip_package.message, "".to_string()) )),
None => Ok(None)
}
}
} }
impl IPInterface { impl IPInterface {
@ -115,6 +162,7 @@ impl IPInterface {
connections: vec!(), connections: vec!(),
listener, listener,
peers: vec!(), peers: vec!(),
package_queue: vec!()
}) })
} }
pub fn dump(&self) -> IFResult<Vec<u8>> { pub fn dump(&self) -> IFResult<Vec<u8>> {
@ -126,8 +174,22 @@ impl IPInterface {
self.peers = peers; self.peers = peers;
Ok(()) Ok(())
} }
fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> {
let mut header: Vec<u8> = vec![];
header.push(package.version);
header.push(package.package_type.to_u8());
for byte in size_to_bytes(package.size) {
header.push(byte);
}
stream.write_all(&*header)?;
stream.write_all(&*package.message)?;
Ok(())
}
} }
fn create_tcp_listener() -> Option<net::TcpListener> { fn create_tcp_listener() -> Option<net::TcpListener> {
for port in SOCKET_RANGE { for port in SOCKET_RANGE {
match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) { match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) {
@ -138,9 +200,17 @@ fn create_tcp_listener() -> Option<net::TcpListener> {
None None
} }
fn parse_header(data: Vec<u8>) -> IFResult<IPPackage> {
Ok(IPPackage {
version: data[0],
package_type: MessageType::from_u8(data[1])?,
size: bytes_to_size([data[3], data[4], data[5], data[6]]),
message: vec![],
})
}
fn size_to_bytes(mut a: usize) -> [u8; 4] { fn size_to_bytes(mut a: u32) -> [u8; 4] {
let mut arr: [u8; 4] = [0, 0, 0, 0]; let mut arr: [u8; 4] = [0, 0, 0, 0];
for i in [3, 2, 1, 0] { for i in [3, 2, 1, 0] {
arr[i] = (a % 256) as u8; arr[i] = (a % 256) as u8;
@ -149,12 +219,12 @@ fn size_to_bytes(mut a: usize) -> [u8; 4] {
arr arr
} }
fn bytes_to_size(arr: [u8; 4]) -> usize { fn bytes_to_size(arr: [u8; 4]) -> u32 {
let mut size = 0; let mut size = 0;
for size_byte in &arr { for size_byte in &arr {
size = size * 256 + *size_byte as u32; size = size * 256 + *size_byte as u32;
} }
size as usize size
} }
#[test] #[test]
@ -165,26 +235,35 @@ fn test_creating_connection() -> IFResult<()> {
let mut interface2 = IPInterface::new()?; let mut interface2 = IPInterface::new()?;
let t1 = std::thread::spawn(move || { let t1 = std::thread::spawn(move || {
interface1.send(&message, Some(String::from("127.0.0.1:50000"))) interface1.send(&message, Some(String::from("127.0.0.1:50001")));
interface1
}); });
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
let t2 = std::thread::spawn(move || { let t2 = std::thread::spawn(move || {
interface2.main_loop_iteration() interface2.main_loop_iteration();
interface2
}); });
let res1 = t1.join(); let res1 = t1.join();
match res1 { match res1 {
Ok(res) => { Ok(mut res) => {
println!("Thread Ok"); println!("Thread Ok");
println!("interface1: {:?}", res)
},
}
Err(e) => println!("{:?}", e) Err(e) => println!("{:?}", e)
} }
let res2 = t2.join(); let res2 = t2.join();
match res2 { match res2 {
Ok(res) => { Ok(mut res) => {
println!("Thread Ok"); println!("Thread Ok");
println!("interface2: {:?}", res) match res.receive() {
}, Ok(tmp) => match tmp {
Some((message, metadata)) => println!("Received {:?}", message),
None => println!("None")
}
Err(e) => println!("{:?}", e)
}
}
Err(e) => println!("{:?}", e) Err(e) => println!("{:?}", e)
} }
Ok(()) Ok(())

Loading…
Cancel
Save