Browse Source

Created IPPackage header. Added receive queue

interface-2
Prokhor 3 years ago
parent
commit
3e25fbb4bb
  1. 121
      src/interfaces/ip.rs

121
src/interfaces/ip.rs

@ -10,6 +10,7 @@ use core::str::from_utf8;
use core::time::Duration;
use crate::interface::{Interface, InterfaceRequirements, TargetingData};
use crate::interfaces::ip::MessageType::PeerRequest;
use crate::message::MessageBytes;
use crate::res::{IFError, IFResult};
use crate::res::IFError::General;
@ -24,6 +25,41 @@ pub struct IPInterface {
connections: Vec<net::TcpStream>,
listener: net::TcpListener,
peers: Vec<net::IpAddr>,
package_queue: Vec<IPPackage>
}
#[derive(Debug)]
struct IPPackage {
version: u8,
package_type: MessageType,
size: u32,
message: MessageBytes,
}
#[derive(Debug)]
enum MessageType {
Service,
PeerRequest,
Common,
}
impl MessageType {
fn from_u8(id: u8) -> IFResult<MessageType> {
match id {
0 => Ok(MessageType::Service),
1 => Ok(MessageType::PeerRequest),
2 => Ok(MessageType::Common),
_ => Err(IFError::General("Incorrect message type".to_string()))
}
}
fn to_u8(&self) -> u8 {
match self {
MessageType::Service => 0,
MessageType::PeerRequest => 1,
MessageType::Common => 2
}
}
}
impl InterfaceRequirements for IPInterface {}
@ -43,16 +79,18 @@ impl Interface for IPInterface {
println!("Hello from mainloop");
for mut connection in &self.connections {
let mut size_arr: [u8; 4] = [0, 0, 0, 0];
connection.read_exact(&mut size_arr)?;
let mut size = bytes_to_size(size_arr) as u32;
let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0];
connection.read_exact(&mut header)?;
let version = header[0];
let package_type = MessageType::from_u8(header[1])?;
let size = bytes_to_size([header[2], header[3], header[4], header[5]]);
let mut message_take = connection.take(size as u64);
let mut message: Vec<u8> = vec![];
message_take.read_to_end(&mut message);
println!("Size: {:?}", size);
println!("Message: {:?}", from_utf8(&*message));
let package = IPPackage {version, package_type, size, message};
self.package_queue.push(package);
}
Ok(())
@ -77,7 +115,7 @@ impl Interface for IPInterface {
None => {
let connection = match net::TcpStream::connect_timeout(
&addr,
Duration::new(1, 0)
Duration::new(1, 0),
) {
Ok(connection) => connection,
Err(_) => return Err(General(String::from("Can't connect to this port")))
@ -91,12 +129,21 @@ impl Interface for IPInterface {
self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?;
self.connections[index].write_all(&size_to_bytes(message.len()))?;
self.connections[index].write_all(message)?;
IPInterface::send_package(&mut self.connections[index], IPPackage {
version: 0,
package_type: MessageType::Common,
size: message.len() as u32,
message: Vec::from(message),
});
println!("Sent message");
Ok(())
}
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> { todo!() }
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> {
match self.package_queue.pop() {
Some(ip_package) => Ok(Some( (ip_package.message, "".to_string()) )),
None => Ok(None)
}
}
}
impl IPInterface {
@ -115,6 +162,7 @@ impl IPInterface {
connections: vec!(),
listener,
peers: vec!(),
package_queue: vec!()
})
}
pub fn dump(&self) -> IFResult<Vec<u8>> {
@ -126,8 +174,22 @@ impl IPInterface {
self.peers = peers;
Ok(())
}
fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> {
let mut header: Vec<u8> = vec![];
header.push(package.version);
header.push(package.package_type.to_u8());
for byte in size_to_bytes(package.size) {
header.push(byte);
}
stream.write_all(&*header)?;
stream.write_all(&*package.message)?;
Ok(())
}
}
fn create_tcp_listener() -> Option<net::TcpListener> {
for port in SOCKET_RANGE {
match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) {
@ -138,9 +200,17 @@ fn create_tcp_listener() -> Option<net::TcpListener> {
None
}
fn parse_header(data: Vec<u8>) -> IFResult<IPPackage> {
Ok(IPPackage {
version: data[0],
package_type: MessageType::from_u8(data[1])?,
size: bytes_to_size([data[3], data[4], data[5], data[6]]),
message: vec![],
})
}
fn size_to_bytes(mut a: usize) -> [u8; 4] {
fn size_to_bytes(mut a: u32) -> [u8; 4] {
let mut arr: [u8; 4] = [0, 0, 0, 0];
for i in [3, 2, 1, 0] {
arr[i] = (a % 256) as u8;
@ -149,12 +219,12 @@ fn size_to_bytes(mut a: usize) -> [u8; 4] {
arr
}
fn bytes_to_size(arr: [u8; 4]) -> usize {
fn bytes_to_size(arr: [u8; 4]) -> u32 {
let mut size = 0;
for size_byte in &arr {
size = size * 256 + *size_byte as u32;
}
size as usize
size
}
#[test]
@ -165,26 +235,35 @@ fn test_creating_connection() -> IFResult<()> {
let mut interface2 = IPInterface::new()?;
let t1 = std::thread::spawn(move || {
interface1.send(&message, Some(String::from("127.0.0.1:50000")))
interface1.send(&message, Some(String::from("127.0.0.1:50001")));
interface1
});
thread::sleep(Duration::from_millis(10));
let t2 = std::thread::spawn(move || {
interface2.main_loop_iteration()
interface2.main_loop_iteration();
interface2
});
let res1 = t1.join();
match res1 {
Ok(res) => {
Ok(mut res) => {
println!("Thread Ok");
println!("interface1: {:?}", res)
},
}
Err(e) => println!("{:?}", e)
}
let res2 = t2.join();
match res2 {
Ok(res) => {
Ok(mut res) => {
println!("Thread Ok");
println!("interface2: {:?}", res)
},
match res.receive() {
Ok(tmp) => match tmp {
Some((message, metadata)) => println!("Received {:?}", message),
None => println!("None")
}
Err(e) => println!("{:?}", e)
}
}
Err(e) => println!("{:?}", e)
}
Ok(())

Loading…
Cancel
Save