|
|
|
@ -99,116 +99,149 @@ impl Interface for IPInterface {
|
|
|
|
|
self.connections.push(stream) |
|
|
|
|
} |
|
|
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {} |
|
|
|
|
Err(e) => return Err(IFError::from(e)), |
|
|
|
|
Err(e) => println!("An error happened with an incoming connection: {:?}", e), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
let mut new_connections: Vec<TcpStream> = vec![]; |
|
|
|
|
for connection in &mut self.connections { |
|
|
|
|
connection.set_nonblocking(true)?; |
|
|
|
|
let mut buf = [0u8; 6]; |
|
|
|
|
let peek_res = connection.peek(&mut buf); |
|
|
|
|
if peek_res.is_err() || peek_res.unwrap() < 6 { |
|
|
|
|
continue
|
|
|
|
|
} |
|
|
|
|
let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0]; |
|
|
|
|
match connection.read_exact(&mut header) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(ref e) |
|
|
|
|
if e.kind() == std::io::ErrorKind::WouldBlock |
|
|
|
|
|| e.kind() == std::io::ErrorKind::UnexpectedEof => |
|
|
|
|
{ |
|
|
|
|
continue
|
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
println!("Error: {:?}", e); |
|
|
|
|
let mut connections_to_delete = vec![]; |
|
|
|
|
for (i, connection) in self.connections.iter_mut().enumerate() { |
|
|
|
|
let res: std::io::Result<()> = { |
|
|
|
|
connection.set_nonblocking(true)?; |
|
|
|
|
let mut buf = [0u8; 6]; |
|
|
|
|
let peek_res = connection.peek(&mut buf); |
|
|
|
|
if peek_res.is_err() || peek_res.unwrap() < 6 { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let version = header[0]; |
|
|
|
|
let package_type = MessageType::from_u8(header[1])?; |
|
|
|
|
let size = bytes_to_size([header[2], header[3], header[4], header[5]]); |
|
|
|
|
connection.set_nonblocking(false)?; |
|
|
|
|
connection.set_read_timeout(Some(std::time::Duration::from_millis(500)))?; |
|
|
|
|
|
|
|
|
|
let mut message_take = connection.take(size as u64); |
|
|
|
|
let mut message: Vec<u8> = vec![]; |
|
|
|
|
message_take.read_to_end(&mut message)?; |
|
|
|
|
|
|
|
|
|
match package_type { |
|
|
|
|
MessageType::PeerRequest => { |
|
|
|
|
let peers_to_share = if self.peers.len() < PEER_THRESHOLD { |
|
|
|
|
self.peers.clone() |
|
|
|
|
} else { |
|
|
|
|
self.peers.iter().skip(7).step_by(2).cloned().collect() |
|
|
|
|
}; |
|
|
|
|
let message = serde_cbor::to_vec(&peers_to_share)?; |
|
|
|
|
IPInterface::send_package( |
|
|
|
|
connection, |
|
|
|
|
IPPackage { |
|
|
|
|
let mut header: [u8; 6] = [0, 0, 0, 0, 0, 0]; |
|
|
|
|
match connection.read_exact(&mut header) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(ref e) |
|
|
|
|
if e.kind() == std::io::ErrorKind::WouldBlock |
|
|
|
|
|| e.kind() == std::io::ErrorKind::UnexpectedEof => |
|
|
|
|
{ |
|
|
|
|
continue
|
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
println!("Error: {:?}", e); |
|
|
|
|
connections_to_delete.push(i); |
|
|
|
|
let connection_addr = if let Ok(r) = connection.peer_addr() { |
|
|
|
|
r |
|
|
|
|
} else { |
|
|
|
|
continue; |
|
|
|
|
}; |
|
|
|
|
if let Some(peer) = self |
|
|
|
|
.peers |
|
|
|
|
.iter() |
|
|
|
|
.find(|p| compare_addrs(p, connection_addr)) |
|
|
|
|
{ |
|
|
|
|
if let Some(Some(conn)) = IPInterface::new_connection(peer).ok() { |
|
|
|
|
new_connections.push(conn) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let version = header[0]; |
|
|
|
|
let package_type = MessageType::from_u8(header[1])?; |
|
|
|
|
let size = bytes_to_size([header[2], header[3], header[4], header[5]]); |
|
|
|
|
connection.set_nonblocking(false)?; |
|
|
|
|
connection.set_read_timeout(Some(std::time::Duration::from_millis(500)))?; |
|
|
|
|
|
|
|
|
|
let mut message_take = connection.take(size as u64); |
|
|
|
|
let mut message: Vec<u8> = vec![]; |
|
|
|
|
message_take.read_to_end(&mut message)?; |
|
|
|
|
|
|
|
|
|
match package_type { |
|
|
|
|
MessageType::PeerRequest => { |
|
|
|
|
let peers_to_share = if self.peers.len() < PEER_THRESHOLD { |
|
|
|
|
self.peers.clone() |
|
|
|
|
} else { |
|
|
|
|
self.peers.iter().skip(7).step_by(2).cloned().collect() |
|
|
|
|
}; |
|
|
|
|
let message = serde_cbor::to_vec(&peers_to_share)?; |
|
|
|
|
IPInterface::send_package( |
|
|
|
|
connection, |
|
|
|
|
IPPackage { |
|
|
|
|
version, |
|
|
|
|
package_type: MessageType::PeersShared, |
|
|
|
|
size: message.len() as u32, |
|
|
|
|
message, |
|
|
|
|
}, |
|
|
|
|
)?; |
|
|
|
|
} |
|
|
|
|
MessageType::Common => { |
|
|
|
|
let package = IPPackage { |
|
|
|
|
version, |
|
|
|
|
package_type: MessageType::PeersShared, |
|
|
|
|
size: message.len() as u32, |
|
|
|
|
package_type, |
|
|
|
|
size, |
|
|
|
|
message, |
|
|
|
|
}, |
|
|
|
|
)?; |
|
|
|
|
} |
|
|
|
|
MessageType::Common => { |
|
|
|
|
let package = IPPackage { |
|
|
|
|
version, |
|
|
|
|
package_type, |
|
|
|
|
size, |
|
|
|
|
message, |
|
|
|
|
}; |
|
|
|
|
self.package_queue |
|
|
|
|
.push((package, format!("{:?}", connection.peer_addr()?))); |
|
|
|
|
} |
|
|
|
|
MessageType::PeersShared => { |
|
|
|
|
let peers: Vec<Peer> = serde_cbor::from_slice(message.as_slice())?; |
|
|
|
|
for peer in peers { |
|
|
|
|
if !self.peers.contains(&peer) { |
|
|
|
|
if let Some(conn) = IPInterface::new_connection(&peer)? { |
|
|
|
|
new_connections.push(conn) |
|
|
|
|
}; |
|
|
|
|
self.package_queue |
|
|
|
|
.push((package, format!("{:?}", connection.peer_addr()?))); |
|
|
|
|
} |
|
|
|
|
MessageType::PeersShared => { |
|
|
|
|
let peers: Vec<Peer> = serde_cbor::from_slice(message.as_slice())?; |
|
|
|
|
for peer in peers { |
|
|
|
|
if !self.peers.contains(&peer) { |
|
|
|
|
if let Some(conn) = IPInterface::new_connection(&peer)? { |
|
|
|
|
new_connections.push(conn) |
|
|
|
|
} |
|
|
|
|
self.peers.push(peer); |
|
|
|
|
} |
|
|
|
|
self.peers.push(peer); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
}; |
|
|
|
|
if res.is_err() && res.unwrap_err().kind() == std::io::ErrorKind::BrokenPipe { |
|
|
|
|
connections_to_delete.push(i) |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
for (j, index) in connections_to_delete.iter().enumerate() { |
|
|
|
|
self.connections.remove(index - j); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for conn in new_connections.iter_mut() { |
|
|
|
|
self.initialize_connection(conn)?; |
|
|
|
|
self.initialize_connection(conn) |
|
|
|
|
.unwrap_or_else(|e| println!("Couldn't initialize connection: {:?}", e)); |
|
|
|
|
} |
|
|
|
|
self.connections.extend(new_connections); |
|
|
|
|
|
|
|
|
|
self.main_loop_iterations += 1; |
|
|
|
|
// Every 50 iterations we connect to everyone we know
|
|
|
|
|
if self.main_loop_iterations % 50 == 0 { |
|
|
|
|
let connected_addresses = self |
|
|
|
|
.connections |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|conn| conn.peer_addr().ok()) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
let peers_we_do_not_have_connections_with = self |
|
|
|
|
.peers |
|
|
|
|
.iter() |
|
|
|
|
.filter(|p| { |
|
|
|
|
!connected_addresses |
|
|
|
|
.iter() |
|
|
|
|
.any(|addr| compare_addrs(p, *addr)) |
|
|
|
|
}) |
|
|
|
|
.copied() |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
let peers_we_do_not_have_connections_with = self.disconnected_peers(); |
|
|
|
|
self.connections |
|
|
|
|
.extend(IPInterface::get_connections_to_peers( |
|
|
|
|
&peers_we_do_not_have_connections_with, |
|
|
|
|
self.peers.len() < PEER_THRESHOLD * 2, |
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
if self.connections.is_empty() { |
|
|
|
|
for peer in self.peers.clone() { |
|
|
|
|
self.obtain_connection(&peer) |
|
|
|
|
.map(|_| ()) |
|
|
|
|
.unwrap_or_else(|e| println!("Error in obtaining connection: {:?}", e)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// We do a peer exchange every 30 iterations
|
|
|
|
|
if self.main_loop_iterations % 30 == 0 && !self.connections.is_empty() { |
|
|
|
|
let connection_index = |
|
|
|
|
(self.main_loop_iterations / 30) as usize % self.connections.len(); |
|
|
|
|
IPInterface::request_peers(&mut self.connections[connection_index])?; |
|
|
|
|
match IPInterface::request_peers(&mut self.connections[connection_index]) { |
|
|
|
|
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => { |
|
|
|
|
let peer = ( |
|
|
|
|
self.connections[connection_index].peer_addr()?.ip(), |
|
|
|
|
self.connections[connection_index].peer_addr()?.port(), |
|
|
|
|
); |
|
|
|
|
self.connections.remove(connection_index); |
|
|
|
|
let connection_index = self.obtain_connection(&peer)?; |
|
|
|
|
IPInterface::request_peers(&mut self.connections[connection_index])?; |
|
|
|
|
} |
|
|
|
|
Err(e) => println!("An error in peer sharing: {:?}", e), |
|
|
|
|
_ => {} |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
@ -219,8 +252,8 @@ impl Interface for IPInterface {
|
|
|
|
|
fn id(&self) -> &str { |
|
|
|
|
&*self.id |
|
|
|
|
} |
|
|
|
|
fn send(&mut self, message: &[u8], interface_data: Option<TargetingData>) -> IFResult<()> { |
|
|
|
|
|
|
|
|
|
fn send(&mut self, message: &[u8], interface_data: Option<TargetingData>) -> IFResult<()> { |
|
|
|
|
let package = IPPackage { |
|
|
|
|
version: 0, |
|
|
|
|
package_type: MessageType::Common, |
|
|
|
@ -231,13 +264,50 @@ impl Interface for IPInterface {
|
|
|
|
|
match interface_data { |
|
|
|
|
Some(ip_string) => { |
|
|
|
|
let addr: net::SocketAddr = ip_string.parse().expect("Unable to parse address"); |
|
|
|
|
let index = self.obtain_connection(&(addr.ip(), addr.port()))?; |
|
|
|
|
IPInterface::send_package(&mut self.connections[index], package)?; |
|
|
|
|
let peer = (addr.ip(), addr.port()); |
|
|
|
|
let index = self.obtain_connection(&peer)?; |
|
|
|
|
match IPInterface::send_package(&mut self.connections[index], package.clone()) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(_) => { |
|
|
|
|
self.remove_all_connections_to_peer(&peer); |
|
|
|
|
let index = self.obtain_connection(&(addr.ip(), addr.port()))?; |
|
|
|
|
IPInterface::send_package(&mut self.connections[index], package).map_err( |
|
|
|
|
|e| { |
|
|
|
|
println!("Error while sending: {:?}", e); |
|
|
|
|
e |
|
|
|
|
}, |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
None => { |
|
|
|
|
for conn in &mut self.connections { |
|
|
|
|
IPInterface::send_package(conn, package.clone())?; |
|
|
|
|
if self.connections.len() < PEER_THRESHOLD |
|
|
|
|
&& self.connections.len() < self.peers.len() |
|
|
|
|
{ |
|
|
|
|
let new_connections = IPInterface::get_connections_to_peers( |
|
|
|
|
&self.disconnected_peers(), |
|
|
|
|
self.peers.len() < PEER_THRESHOLD, |
|
|
|
|
); |
|
|
|
|
self.connections.extend(new_connections); |
|
|
|
|
} |
|
|
|
|
let connections_to_delete = self |
|
|
|
|
.connections |
|
|
|
|
.iter_mut() |
|
|
|
|
.enumerate() |
|
|
|
|
.filter_map(|(i, conn)| { |
|
|
|
|
IPInterface::send_package(conn, package.clone()) |
|
|
|
|
.err() |
|
|
|
|
.map(|_| i) |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
for (j, index) in connections_to_delete.iter().enumerate() { |
|
|
|
|
self.connections.remove(index - j); |
|
|
|
|
} |
|
|
|
|
self.connections |
|
|
|
|
.extend(IPInterface::get_connections_to_peers( |
|
|
|
|
&self.disconnected_peers(), |
|
|
|
|
self.peers.len() < PEER_THRESHOLD, |
|
|
|
|
)) |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
Ok(()) |
|
|
|
@ -296,7 +366,7 @@ impl IPInterface {
|
|
|
|
|
.filter_map(|r| r.ok()) |
|
|
|
|
.filter_map(|r| r) |
|
|
|
|
.map(|mut c| -> IFResult<TcpStream> { |
|
|
|
|
println!("Requesting peers from {:?}", c.peer_addr().unwrap()); |
|
|
|
|
println!("Requesting peers from {:?}", c.peer_addr().ok()); |
|
|
|
|
if do_peer_request { |
|
|
|
|
Self::request_peers(&mut c)?; |
|
|
|
|
} |
|
|
|
@ -306,6 +376,37 @@ impl IPInterface {
|
|
|
|
|
.collect::<Vec<_>>() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn connected_addresses(&self) -> Vec<net::SocketAddr> { |
|
|
|
|
self.connections |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|conn| conn.peer_addr().ok()) |
|
|
|
|
.collect::<Vec<_>>() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn disconnected_peers(&self) -> Vec<Peer> { |
|
|
|
|
let connected_addresses = self.connected_addresses(); |
|
|
|
|
self.peers |
|
|
|
|
.iter() |
|
|
|
|
.filter(|p| { |
|
|
|
|
!connected_addresses |
|
|
|
|
.iter() |
|
|
|
|
.any(|addr| compare_addrs(p, *addr)) |
|
|
|
|
}) |
|
|
|
|
.copied() |
|
|
|
|
.collect::<Vec<_>>() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn remove_all_connections_to_peer(&mut self, peer: &Peer) { |
|
|
|
|
while let Some(ind) = self |
|
|
|
|
.connections |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|conn| conn.peer_addr().ok()) |
|
|
|
|
.position(|addr| compare_addrs(&peer, addr)) |
|
|
|
|
{ |
|
|
|
|
self.connections.remove(ind); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn new(port: u16, peers: Vec<Peer>) -> IFResult<Self> { |
|
|
|
|
let listener = match create_tcp_listener(port) { |
|
|
|
|
Some(listener) => listener, |
|
|
|
@ -339,9 +440,8 @@ impl IPInterface {
|
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> IFResult<()> { |
|
|
|
|
fn send_package(stream: &mut net::TcpStream, package: IPPackage) -> std::io::Result<()> { |
|
|
|
|
stream.set_write_timeout(Some(std::time::Duration::from_millis(700)))?; |
|
|
|
|
#[cfg(test)] |
|
|
|
|
stream.set_nonblocking(false)?; |
|
|
|
|
let mut header: Vec<u8> = vec![package.version, package.package_type.as_u8()]; |
|
|
|
|
for byte in size_to_bytes(package.size) { |
|
|
|
@ -360,7 +460,7 @@ impl IPInterface {
|
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn request_peers(conn: &mut TcpStream) -> IFResult<()> { |
|
|
|
|
fn request_peers(conn: &mut TcpStream) -> std::io::Result<()> { |
|
|
|
|
IPInterface::send_package( |
|
|
|
|
conn, |
|
|
|
|
IPPackage { |
|
|
|
@ -374,9 +474,12 @@ impl IPInterface {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn obtain_connection(&mut self, addr: &Peer) -> IFResult<usize> { |
|
|
|
|
if let Some(pos) = self.connections.iter().position(|con| { |
|
|
|
|
con.peer_addr().is_ok() && compare_addrs(addr, con.peer_addr().unwrap()) |
|
|
|
|
}) { |
|
|
|
|
if let Some(pos) = self |
|
|
|
|
.connections |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|conn| conn.peer_addr().ok()) |
|
|
|
|
.position(|pa| compare_addrs(addr, pa)) |
|
|
|
|
{ |
|
|
|
|
return Ok(pos); |
|
|
|
|
} |
|
|
|
|
if let Some(conn) = Self::new_connection(addr)? { |
|
|
|
@ -387,7 +490,7 @@ impl IPInterface {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn new_connection(addr: &Peer) -> IFResult<Option<TcpStream>> { |
|
|
|
|
fn new_connection(addr: &Peer) -> std::io::Result<Option<TcpStream>> { |
|
|
|
|
for port in addr.1..addr.1 + 3 { |
|
|
|
|
match net::TcpStream::connect_timeout( |
|
|
|
|
&net::SocketAddr::new(addr.0, port as u16), |
|
|
|
|