From 668e7042646aaa2dc2f7b3f12aa417831a05c7c9 Mon Sep 17 00:00:00 2001 From: alterdekim Date: Mon, 25 Nov 2024 18:15:50 +0300 Subject: [PATCH] Possible fix for Linux clients modified: src/client.rs --- src/client.rs | 62 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/src/client.rs b/src/client.rs index e125827..3fb5c29 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,9 @@ pub mod general { use crate::config::ClientConfiguration; - use tokio_util::sync::CancellationToken; + use crossbeam_channel::{Receiver, Sender}; + use futures::{stream::SplitSink, StreamExt}; + use tokio_util::{codec::Framed, sync::CancellationToken}; use tokio::{net::UdpSocket, sync::{Mutex, mpsc}, io::{AsyncWriteExt, AsyncReadExt}, fs::File}; use log::{error, info, warn}; use aes_gcm::{ @@ -14,21 +16,22 @@ pub mod general { use x25519_dalek::{PublicKey, StaticSecret}; use crate::udp::{UDPVpnPacket, UDPVpnHandshake, UDPSerializable}; - use tun::{ DeviceReader, DeviceWriter }; + use tun::{ AsyncDevice, DeviceReader, DeviceWriter, TunPacketCodec }; pub trait ReadWrapper { - async fn read(&mut self, buf: &mut [u8]) -> Result; + async fn read(&mut self, buf: &mut Vec) -> Result; } pub struct DevReader { - pub dr: DeviceReader + pub dr: Receiver> } // TODO: implement custom Error impl ReadWrapper for DevReader { - async fn read(&mut self, buf: &mut [u8]) -> Result { - if let Ok(a) = self.dr.read(buf).await { - return Ok(a); + async fn read(&mut self, buf: &mut Vec) -> Result { + if let Ok(tb) = self.dr.recv() { + *buf = tb; + return Ok(buf.len()); } Err(()) } @@ -39,7 +42,7 @@ pub mod general { } impl ReadWrapper for FdReader { - async fn read(&mut self, buf: &mut [u8]) -> Result { + async fn read(&mut self, buf: &mut Vec) -> Result { let r = self.br.read(buf).await; if let Ok(a) = r { return Ok(a); @@ -58,8 +61,7 @@ pub mod general { } pub struct DevWriter { - pub dr: DeviceWriter, - //pub dev: AsyncDevice + pub dr: Sender> } // TODO: implement custom Error @@ -67,8 +69,9 @@ pub mod general { async fn write(&mut self, msg: WriterMessage) -> Result { match msg { WriterMessage::Plain(buf) => { - if let Ok(a) = self.dr.write(&buf).await { - return Ok(a); + let l = buf.len(); + if let Ok(()) = self.dr.send(buf) { + return Ok(l); } Err(()) }, @@ -272,8 +275,10 @@ pub mod android { pub mod desktop { use crate::client::general::{CoreVpnClient, DevReader, DevWriter, VpnClient}; use crate::config::ClientConfiguration; + use futures::{SinkExt, StreamExt}; use log::info; use tokio::net::UdpSocket; + use crossbeam_channel::unbounded; #[cfg(target_os = "linux")] use network_interface::{NetworkInterface, NetworkInterfaceConfig}; @@ -362,14 +367,33 @@ pub mod desktop { info!("SSS: {:?}", &self.client_config.server.endpoint); let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap(); sock.connect(&self.client_config.server.endpoint).await.unwrap(); - - info!("AsyncDevice"); + let dev = create_as_async(&config).unwrap(); - info!("Split device"); - let dr = dev.split(); - let (dev_writer, dev_reader) = dr.unwrap(); - info!("CoreVpnClient"); - let mut client = CoreVpnClient{ client_config: self.client_config.clone(), dev_reader: DevReader{ dr: dev_reader }, dev_writer: DevWriter{dr: dev_writer}, close_token: tokio_util::sync::CancellationToken::new()}; + let (mut dev_writer , mut dev_reader) = dev.into_framed().split(); + + // dev_reader + let (tx, rx) = unbounded(); + + // dev_writer + let (wx, wrx) = unbounded(); + + tokio::spawn(async move { + loop { + if let Some(Ok(buf)) = dev_reader.next().await { + tx.send(buf); + } + } + }); + + tokio::spawn(async move { + loop { + if let Ok(buf) = wrx.recv() { + let _ = dev_writer.send(buf).await; + } + } + }); + + let mut client = CoreVpnClient{ client_config: self.client_config.clone(), dev_reader: DevReader{ dr: rx }, dev_writer: DevWriter{dr: wx}, close_token: tokio_util::sync::CancellationToken::new()}; info!("Platform specific code"); #[cfg(target_os = "linux")]