Browse Source

Wrote some logic in TestInterface, fixed some warnings

master
Lev 3 years ago
parent
commit
fa49945652
  1. 7
      src/interface.rs
  2. 94
      src/interfaces/ip.rs
  3. 59
      src/ironforce.rs
  4. 4
      src/transport.rs
  5. 29
      src/tunnel.rs

7
src/interface.rs

@ -96,7 +96,7 @@ pub mod test_interface {
let mut storage_locked = self.storage.lock(); let mut storage_locked = self.storage.lock();
while let Some(i) = storage_locked while let Some(i) = storage_locked
.iter() .iter()
.position(|msg| msg.1 == self.this_peer_id) .position(|msg| msg.1 == self.this_peer_id || msg.1.is_empty())
{ {
self.messages.push(storage_locked.remove(i)); self.messages.push(storage_locked.remove(i));
} }
@ -108,10 +108,11 @@ pub mod test_interface {
} }
fn send(&mut self, message: &[u8], target: Option<TargetingData>) -> IFResult<()> { fn send(&mut self, message: &[u8], target: Option<TargetingData>) -> IFResult<()> {
Ok(self self
.storage .storage
.lock() .lock()
.push((Vec::from(message), target.unwrap_or_default()))) .push((Vec::from(message), target.unwrap_or_default()));
Ok(())
} }
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> { fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> {

94
src/interfaces/ip.rs

@ -1,16 +1,17 @@
use alloc::borrow::ToOwned; use alloc::borrow::ToOwned;
use std::net; use alloc::string::{String, ToString};
use std::thread;
use alloc::vec; use alloc::vec;
use alloc::vec::Vec; use alloc::vec::Vec;
use alloc::string::{String, ToString};
use core::ops::RangeInclusive; use core::ops::RangeInclusive;
use core::time::Duration; use core::time::Duration;
use std::net;
#[cfg(test)]
use std::thread;
use crate::interface::{Interface, InterfaceRequirements, TargetingData}; use crate::interface::{Interface, InterfaceRequirements, TargetingData};
use crate::message::MessageBytes; use crate::message::MessageBytes;
use crate::res::{IFError, IFResult};
use crate::res::IFError::General; use crate::res::IFError::General;
use crate::res::{IFError, IFResult};
use crate::std::io::{Read, Write}; use crate::std::io::{Read, Write};
use crate::std::println; use crate::std::println;
@ -22,7 +23,7 @@ pub struct IPInterface {
connections: Vec<net::TcpStream>, connections: Vec<net::TcpStream>,
listener: net::TcpListener, listener: net::TcpListener,
peers: Vec<net::IpAddr>, peers: Vec<net::IpAddr>,
package_queue: Vec<IPPackage> package_queue: Vec<IPPackage>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -46,7 +47,7 @@ impl MessageType {
0 => Ok(MessageType::Service), 0 => Ok(MessageType::Service),
1 => Ok(MessageType::PeerRequest), 1 => Ok(MessageType::PeerRequest),
2 => Ok(MessageType::Common), 2 => Ok(MessageType::Common),
_ => Err(IFError::General("Incorrect message type".to_string())) _ => Err(IFError::General("Incorrect message type".to_string())),
} }
} }
@ -54,7 +55,7 @@ impl MessageType {
match self { match self {
MessageType::Service => 0, MessageType::Service => 0,
MessageType::PeerRequest => 1, MessageType::PeerRequest => 1,
MessageType::Common => 2 MessageType::Common => 2,
} }
} }
} }
@ -86,7 +87,12 @@ impl Interface for IPInterface {
let mut message: Vec<u8> = vec![]; let mut message: Vec<u8> = vec![];
message_take.read_to_end(&mut message)?; message_take.read_to_end(&mut message)?;
let package = IPPackage {version, package_type, size, message}; let package = IPPackage {
version,
package_type,
size,
message,
};
self.package_queue.push(package); self.package_queue.push(package);
} }
@ -102,43 +108,53 @@ impl Interface for IPInterface {
fn send(&mut self, message: &[u8], interface_data: Option<TargetingData>) -> IFResult<()> { fn send(&mut self, message: &[u8], interface_data: Option<TargetingData>) -> IFResult<()> {
let addr: net::SocketAddr = match interface_data { let addr: net::SocketAddr = match interface_data {
Some(ip_string) => ip_string.parse().expect("Unable to parse address"), Some(ip_string) => ip_string.parse().expect("Unable to parse address"),
None => return Err(IFError::General(String::from("Not enough info to create connections"))) None => {
return Err(IFError::General(String::from(
"Not enough info to create connections",
)))
}
}; };
println!("Connecting to {:?}", addr); println!("Connecting to {:?}", addr);
let index = match self
let index = match self.connections.iter().position(|connection| connection.peer_addr().ok() == Some(addr)) { .connections
.iter()
.position(|connection| connection.peer_addr().ok() == Some(addr))
{
None => { None => {
let connection = match net::TcpStream::connect_timeout( let connection = match net::TcpStream::connect_timeout(&addr, Duration::new(1, 0)) {
&addr,
Duration::new(1, 0),
) {
Ok(connection) => connection, Ok(connection) => connection,
Err(_) => return Err(General(String::from("Can't connect to this port"))) Err(_) => return Err(General(String::from("Can't connect to this port"))),
}; };
self.connections.push(connection); self.connections.push(connection);
self.connections.len() - 1 self.connections.len() - 1
} }
Some(i) => i Some(i) => i,
}; };
println!("Sending message to {:?}", self.connections[index].peer_addr().unwrap()); println!(
"Sending message to {:?}",
self.connections[index].peer_addr().unwrap()
);
self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?; self.connections[index].set_write_timeout(Some(Duration::new(1, 0)))?;
IPInterface::send_package(&mut self.connections[index], IPPackage { IPInterface::send_package(
&mut self.connections[index],
IPPackage {
version: 0, version: 0,
package_type: MessageType::Common, package_type: MessageType::Common,
size: message.len() as u32, size: message.len() as u32,
message: Vec::from(message), message: Vec::from(message),
})?; },
)?;
println!("Sent message"); println!("Sent message");
Ok(()) Ok(())
} }
fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> { fn receive(&mut self) -> IFResult<Option<(MessageBytes, TargetingData)>> {
match self.package_queue.pop() { match self.package_queue.pop() {
Some(ip_package) => Ok(Some((ip_package.message, "".to_string()))), Some(ip_package) => Ok(Some((ip_package.message, "".to_string()))),
None => Ok(None) None => Ok(None),
} }
} }
} }
@ -148,7 +164,9 @@ impl IPInterface {
let listener = match create_tcp_listener() { let listener = match create_tcp_listener() {
Some(listener) => listener, Some(listener) => listener,
None => { None => {
return Err(IFError::General(String::from("Unable to open TCP listener"))); return Err(IFError::General(String::from(
"Unable to open TCP listener",
)));
} }
}; };
@ -156,10 +174,10 @@ impl IPInterface {
Ok(IPInterface { Ok(IPInterface {
id: String::from("IP Interface"), id: String::from("IP Interface"),
connections: vec!(), connections: vec![],
listener, listener,
peers: vec!(), peers: vec![],
package_queue: vec!() package_queue: vec![],
}) })
} }
pub fn dump(&self) -> IFResult<Vec<u8>> { pub fn dump(&self) -> IFResult<Vec<u8>> {
@ -173,9 +191,7 @@ impl IPInterface {
} }
fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> { fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> {
let mut header: Vec<u8> = vec![]; let mut header: Vec<u8> = vec![package.version, package.package_type.to_u8()];
header.push(package.version);
header.push(package.package_type.to_u8());
for byte in size_to_bytes(package.size) { for byte in size_to_bytes(package.size) {
header.push(byte); header.push(byte);
} }
@ -186,14 +202,13 @@ impl IPInterface {
} }
} }
fn create_tcp_listener() -> Option<net::TcpListener> { fn create_tcp_listener() -> Option<net::TcpListener> {
for port in SOCKET_RANGE { for port in SOCKET_RANGE {
match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) { match net::TcpListener::bind("127.0.0.1:".to_owned() + &port.to_string()) {
Ok(listener) => return Some(listener), Ok(listener) => return Some(listener),
Err(_e) => {} Err(_e) => {}
} }
}; }
None None
} }
@ -206,7 +221,6 @@ fn parse_header(data: Vec<u8>) -> IFResult<IPPackage> {
}) })
} }
fn size_to_bytes(mut a: u32) -> [u8; 4] { fn size_to_bytes(mut a: u32) -> [u8; 4] {
let mut arr: [u8; 4] = [0, 0, 0, 0]; let mut arr: [u8; 4] = [0, 0, 0, 0];
for i in [3, 2, 1, 0] { for i in [3, 2, 1, 0] {
@ -232,7 +246,9 @@ fn test_creating_connection() -> IFResult<()> {
let mut interface2 = IPInterface::new()?; let mut interface2 = IPInterface::new()?;
let t1 = std::thread::spawn(move || { let t1 = std::thread::spawn(move || {
interface1.send(&message, Some(String::from("127.0.0.1:50001"))).unwrap(); interface1
.send(&message, Some(String::from("127.0.0.1:50001")))
.unwrap();
interface1 interface1
}); });
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
@ -244,9 +260,8 @@ fn test_creating_connection() -> IFResult<()> {
match res1 { match res1 {
Ok(_res) => { Ok(_res) => {
println!("Thread Ok"); println!("Thread Ok");
} }
Err(e) => println!("{:?}", e) Err(e) => println!("{:?}", e),
} }
let res2 = t2.join(); let res2 = t2.join();
match res2 { match res2 {
@ -255,18 +270,15 @@ fn test_creating_connection() -> IFResult<()> {
match res.receive() { match res.receive() {
Ok(tmp) => match tmp { Ok(tmp) => match tmp {
Some((message, _metadata)) => println!("Received {:?}", message), Some((message, _metadata)) => println!("Received {:?}", message),
None => println!("None") None => println!("None"),
} },
Err(e) => println!("{:?}", e) Err(e) => println!("{:?}", e),
} }
} }
Err(e) => println!("{:?}", e) Err(e) => println!("{:?}", e),
} }
Ok(()) Ok(())
} }
#[cfg(test)] #[cfg(test)]
pub mod test_ip_interface {} pub mod test_ip_interface {}

59
src/ironforce.rs

@ -2,7 +2,7 @@ use crate::crypto::PublicKey;
use crate::message::{Message, MessageType, ServiceMessageType}; use crate::message::{Message, MessageType, ServiceMessageType};
use crate::res::{IFError, IFResult}; use crate::res::{IFError, IFResult};
use crate::transport::Transport; use crate::transport::Transport;
use crate::tunnel::Tunnel; use crate::tunnel::{Tunnel, TunnelPublic};
use alloc::vec; use alloc::vec;
use alloc::vec::Vec; use alloc::vec::Vec;
@ -18,6 +18,12 @@ pub struct IronForce {
additional_modules: Vec<()>, additional_modules: Vec<()>,
/// Non-service messages to give outside /// Non-service messages to give outside
messages: Vec<Message>, messages: Vec<Message>,
/// Tunnels that has not been confirmed yet (no backward spread)
///
/// `[(Tunnel, Optional target node)]`
tunnels_pending: Vec<(TunnelPublic, Option<PublicKey> /* target node */)>,
/// True if this instance has background thread
has_background_worker: bool,
} }
impl IronForce { impl IronForce {
@ -28,17 +34,26 @@ impl IronForce {
tunnels: vec![], tunnels: vec![],
additional_modules: vec![], additional_modules: vec![],
messages: vec![], messages: vec![],
tunnels_pending: vec![],
has_background_worker: false,
} }
} }
/// Create a new tunnel to another node /// Create a new tunnel to another node
fn create_new_tunnel(&mut self, _destination: PublicKey) -> IFResult<Tunnel> { fn initialize_tunnel_creation(&mut self, destination: PublicKey) -> IFResult<()> {
todo!() let tunnel = TunnelPublic::new_singlecast();
self.tunnels_pending.push((tunnel.clone(), Some(destination)));
let message = Message::build()
.message_type(MessageType::Service(ServiceMessageType::TunnelBuilding(
tunnel
))).build()?;
self.send_to_all(message)?;
Ok(())
} }
/// Send a multicast or broadcast message /// Send a multicast or broadcast message
fn send_to_all(&mut self, _message: Message) -> IFResult<()> { fn send_to_all(&mut self, message: Message) -> IFResult<()> {
todo!() self.transport.send_message(serde_cbor::to_vec(&message)?, None)
} }
/// Send a message through tunnel /// Send a message through tunnel
@ -74,7 +89,7 @@ impl IronForce {
/// Process a message: if it's a service message, act accordingly. /// Process a message: if it's a service message, act accordingly.
/// Otherwise, add to `self.messages` /// Otherwise, add to `self.messages`
fn process_message(&mut self, message: Message) { fn process_message(&mut self, message: Message, _inc_peer: u64) -> IFResult<()> {
match message.message_type { match message.message_type {
MessageType::Service(msg_type) => match msg_type { MessageType::Service(msg_type) => match msg_type {
ServiceMessageType::TunnelBuilding(_tunnel) => { ServiceMessageType::TunnelBuilding(_tunnel) => {
@ -83,10 +98,42 @@ impl IronForce {
}, },
MessageType::SingleCast | MessageType::Broadcast => self.messages.push(message), MessageType::SingleCast | MessageType::Broadcast => self.messages.push(message),
} }
Ok(())
} }
/// Get a message from `self.messages` /// Get a message from `self.messages`
pub fn read_message(&mut self) -> Option<Message> { pub fn read_message(&mut self) -> Option<Message> {
self.messages.pop() self.messages.pop()
} }
pub fn main_loop_iteration(&mut self) -> IFResult<()> {
self.transport.main_loop_iteration()?;
while let Some((msg, inc_peer)) = self.transport.receive() {
self.process_message(serde_cbor::from_slice(msg.as_slice())?, inc_peer)?
}
Ok(())
}
}
#[cfg(test)]
mod if_testing {
use crate::ironforce::IronForce;
use alloc::vec::Vec;
use alloc::vec;
use alloc::boxed::Box;
use crate::interface::test_interface::create_test_interfaces;
use crate::transport::Transport;
fn create_test_network() -> Vec<IronForce> {
let interfaces = create_test_interfaces(5);
let transports = interfaces.into_iter().map(|interface| Transport::new(vec![Box::new(interface)]));
transports.map(|tr| IronForce {
transport: tr,
tunnels: vec![],
additional_modules: vec![],
messages: vec![],
tunnels_pending: vec![],
has_background_worker: false
}).collect()
}
} }

4
src/transport.rs

@ -168,7 +168,7 @@ fn test_adding_peer_to_transport() {
fn test_transport_sending() { fn test_transport_sending() {
let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]);
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string());
let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); let peer_id = transport.find_or_add_peer(interface_id, interface_targeting_data.clone());
transport.send_message(vec![239, 123], None).unwrap(); transport.send_message(vec![239, 123], None).unwrap();
assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string()))); assert_eq!(transport.interfaces[0].receive().unwrap(), Some((vec![239u8, 123], "".to_string())));
assert!(transport.interfaces[0].receive() == IFResult::Ok(None)); assert!(transport.interfaces[0].receive() == IFResult::Ok(None));
@ -180,7 +180,7 @@ fn test_transport_sending() {
fn test_transport_receiving() { fn test_transport_receiving() {
let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]); let mut transport = Transport::new(vec![Box::new(SimpleTestInterface::default())]);
let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string()); let (interface_id, interface_targeting_data) = ("test_interface".to_string(), "hi".to_string());
let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data.clone()); let peer_id = transport.find_or_add_peer(interface_id.clone(), interface_targeting_data);
transport.send_message(vec![239, 123], None).unwrap(); transport.send_message(vec![239, 123], None).unwrap();
assert_eq!(transport.receive(), Some((vec![239u8, 123], transport.get_peer_by_parameters(interface_id.as_str(), "").unwrap().peer_id))); assert_eq!(transport.receive(), Some((vec![239u8, 123], transport.get_peer_by_parameters(interface_id.as_str(), "").unwrap().peer_id)));
assert!(transport.receive().is_none()); assert!(transport.receive().is_none());

29
src/tunnel.rs

@ -2,6 +2,7 @@ use alloc::vec::Vec;
use crate::crypto::PublicKey; use crate::crypto::PublicKey;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use sha2::Digest; use sha2::Digest;
use alloc::vec;
/// A tunnel that is used for communication /// A tunnel that is used for communication
#[derive(Serialize, Clone, Deserialize)] #[derive(Serialize, Clone, Deserialize)]
@ -27,16 +28,29 @@ pub struct Tunnel {
#[derive(Serialize, Clone, Deserialize)] #[derive(Serialize, Clone, Deserialize)]
pub struct TunnelPublic { pub struct TunnelPublic {
/// Tunnel's id /// Tunnel's id
id: Option<u64>, pub id: Option<u64>,
/// Ids, each of them is just for local storage on each node until a final global id is created /// Ids, each of them is just for local storage on each node until a final global id is created
local_ids: Vec<u64>, pub local_ids: Vec<u64>,
/// Time at which this tunnel should be destroyed (UNIX epoch) /// Time at which this tunnel should be destroyed (UNIX epoch)
ttd: u64, pub ttd: u64,
/// Public keys of nodes in the tunnel /// Public keys of nodes in the tunnel
nodes_in_tunnel: Option<Vec<PublicKey>>, nodes_in_tunnel: Option<Vec<PublicKey>>,
/// Is this tunnel used for multicast?
pub is_multicast: bool,
} }
impl TunnelPublic { impl TunnelPublic {
pub fn new_singlecast() -> Self {
Self {
id: None,
local_ids: vec![rand::random()],
ttd: 0,
nodes_in_tunnel: None,
is_multicast: false,
}
}
/// Get the hash of the tunnel for verification
pub fn hash(&self) -> Vec<u8> { pub fn hash(&self) -> Vec<u8> {
sha2::Sha224::new() sha2::Sha224::new()
.chain(serde_cbor::to_vec(self).unwrap().as_slice()) .chain(serde_cbor::to_vec(self).unwrap().as_slice())
@ -49,14 +63,12 @@ impl TunnelPublic {
id: Some(56), id: Some(56),
local_ids: vec![5, 500, 120], local_ids: vec![5, 500, 120],
ttd: 56, ttd: 56,
nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]) nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]),
is_multicast: true,
} }
} }
} }
#[cfg(test)]
use alloc::vec;
#[test] #[test]
fn test_tunnel_hashing() { fn test_tunnel_hashing() {
let tun = TunnelPublic::new_for_test(); let tun = TunnelPublic::new_for_test();
@ -65,6 +77,7 @@ fn test_tunnel_hashing() {
id: Some(56), id: Some(56),
local_ids: vec![5, 500, 120], local_ids: vec![5, 500, 120],
ttd: 56, ttd: 56,
nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]) nodes_in_tunnel: Some(vec![crate::crypto::Keys::generate().get_public()]),
is_multicast: false,
}.hash()); }.hash());
} }

Loading…
Cancel
Save