refactor(async-pop): added comments, refactored some code to be more concise
continuous-integration/drone/push Build is failing Details

main
Guus van Meerveld 2 months ago
parent 2871fe13ce
commit b9053a8703
Signed by: Guusvanmeerveld
GPG Key ID: 2BA7D7912771966E

@ -7,12 +7,16 @@ mod utils;
use async_native_tls::{TlsConnector, TlsStream};
use parse::{parse_capabilities, Parser};
use socket::Socket;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpStream, ToSocketAddrs},
time::{timeout, Duration},
};
use types::{Capabilities, Capability, Stats, StatsResponse, UniqueIDResponse};
use types::{
Capabilities, Capability, Error, ErrorKind, Result, Stats, StatsResponse, UniqueIDResponse,
};
use utils::create_command;
#[derive(Eq, PartialEq, Debug)]
@ -39,9 +43,10 @@ fn get_connection_timeout(timeout: Option<Duration>) -> Duration {
}
}
/// Creates a client from a given socket connection.
async fn create_client_from_socket<S: AsyncRead + AsyncWrite + Unpin>(
socket: Socket<S>,
) -> types::Result<Client<S>> {
) -> Result<Client<S>> {
let mut client = Client {
marked_as_del: Vec::new(),
capabilities: Vec::new(),
@ -73,8 +78,11 @@ async fn create_client_from_socket<S: AsyncRead + AsyncWrite + Unpin>(
/// client.quit().unwrap();
/// }
/// ```
pub async fn new<S: AsyncRead + AsyncWrite + Unpin>(stream: S) -> types::Result<Client<S>> {
let socket = Socket::new(stream);
pub async fn new<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
timeout: Option<Duration>,
) -> Result<Client<S>> {
let socket = Socket::new(stream, timeout);
create_client_from_socket(socket).await
}
@ -85,14 +93,14 @@ pub async fn connect<A: ToSocketAddrs>(
domain: &str,
tls_connector: &TlsConnector,
connection_timeout: Option<Duration>,
) -> types::Result<Client<TlsStream<TcpStream>>> {
) -> Result<Client<TlsStream<TcpStream>>> {
let connection_timeout = get_connection_timeout(connection_timeout);
let tcp_stream = timeout(connection_timeout, TcpStream::connect(addr)).await??;
let tls_stream = tls_connector.connect(domain, tcp_stream).await?;
let socket = Socket::new(tls_stream);
let socket = Socket::new(tls_stream, None);
create_client_from_socket(socket).await
}
@ -103,18 +111,19 @@ pub async fn connect<A: ToSocketAddrs>(
pub async fn connect_plain<A: ToSocketAddrs>(
addr: A,
connection_timeout: Option<Duration>,
) -> types::Result<Client<TcpStream>> {
) -> Result<Client<TcpStream>> {
let connection_timeout = get_connection_timeout(connection_timeout);
let tcp_stream = timeout(connection_timeout, TcpStream::connect(addr)).await??;
let socket = Socket::new(tcp_stream);
let socket = Socket::new(tcp_stream, None);
create_client_from_socket(socket).await
}
impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
fn get_socket_mut(&mut self) -> types::Result<&mut Socket<S>> {
/// Check if the client is in the correct state and return a mutable reference to the tcp connection.
fn get_socket_mut(&mut self) -> Result<&mut Socket<S>> {
match self.socket.as_mut() {
Some(socket) => {
if self.state == ClientState::Transaction
@ -122,23 +131,24 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
{
Ok(socket)
} else {
Err(types::Error::new(
types::ErrorKind::ShouldNotBeConnected,
Err(Error::new(
ErrorKind::ShouldNotBeConnected,
"There is a connection, but our state indicates that we should not be connected",
))
}
}
None => Err(types::Error::new(
types::ErrorKind::NotConnected,
None => Err(Error::new(
ErrorKind::NotConnected,
"Not connected to any server",
)),
}
}
fn is_correct_state(&self, state: ClientState) -> types::Result<()> {
/// Check if the client is in the correct state.
fn check_client_state(&self, state: ClientState) -> Result<()> {
if self.state != state {
Err(types::Error::new(
types::ErrorKind::IncorrectStateForCommand,
Err(Error::new(
ErrorKind::IncorrectStateForCommand,
"The connection is not the right state to use this command",
))
} else {
@ -170,7 +180,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
/// client.noop()?;
/// ```
/// https://www.rfc-editor.org/rfc/rfc1939#page-9
pub async fn noop(&mut self) -> types::Result<()> {
pub async fn noop(&mut self) -> Result<()> {
let socket = self.get_socket_mut()?;
let command = "NOOP";
@ -180,14 +190,15 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
Ok(())
}
pub async fn uidl(&mut self, msg_number: Option<u32>) -> types::Result<UniqueIDResponse> {
self.has_capability_else_err(vec![Capability::Uidl])?;
pub async fn uidl(&mut self, msg_number: Option<u32>) -> Result<UniqueIDResponse> {
self.check_capability(vec![Capability::Uidl])?;
let response_is_multi_line = msg_number.is_none();
match msg_number.as_ref() {
Some(msg_number) => self.check_deleted(msg_number)?,
None => {}
};
if !response_is_multi_line {
self.is_deleted_else_err(msg_number.as_ref().unwrap())?;
}
let response_is_multi_line = msg_number.is_none();
let socket = self.get_socket_mut()?;
@ -210,10 +221,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
}
}
pub async fn top(&mut self, msg_number: u32, lines: u32) -> types::Result<Vec<u8>> {
self.is_deleted_else_err(&msg_number)?;
pub async fn top(&mut self, msg_number: u32, lines: u32) -> Result<Vec<u8>> {
self.check_deleted(&msg_number)?;
self.has_capability_else_err(vec![Capability::Top])?;
self.check_capability(vec![Capability::Top])?;
let socket = self.get_socket_mut()?;
@ -246,7 +257,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
}
}
fn is_deleted_else_err(&mut self, msg_number: &u32) -> types::Result<()> {
fn check_deleted(&mut self, msg_number: &u32) -> Result<()> {
if self.is_deleted(msg_number) {
Err(types::Error::new(
types::ErrorKind::MessageIsDeleted,
@ -273,8 +284,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
///
/// println!("{}", is_deleted);
/// ```
pub async fn dele(&mut self, msg_number: u32) -> types::Result<()> {
self.is_deleted_else_err(&msg_number)?;
pub async fn dele(&mut self, msg_number: u32) -> Result<()> {
self.check_deleted(&msg_number)?;
let socket = self.get_socket_mut()?;
@ -298,7 +309,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
/// client.rset().unwrap();
/// ```
/// https://www.rfc-editor.org/rfc/rfc1939#page-9
pub async fn rset(&mut self) -> types::Result<()> {
pub async fn rset(&mut self) -> Result<()> {
let socket = self.get_socket_mut()?;
let command = b"RSET";
@ -331,8 +342,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
/// println!("{}", subject);
/// ```
/// https://www.rfc-editor.org/rfc/rfc1939#page-8
pub async fn retr(&mut self, msg_number: u32) -> types::Result<Vec<u8>> {
self.is_deleted_else_err(&msg_number)?;
pub async fn retr(&mut self, msg_number: u32) -> Result<Vec<u8>> {
self.check_deleted(&msg_number)?;
let socket = self.get_socket_mut()?;
@ -349,16 +360,19 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
Ok(response)
}
pub async fn list(&mut self, msg_number: Option<u32>) -> types::Result<StatsResponse> {
if msg_number.is_some() {
self.is_deleted_else_err(msg_number.as_ref().unwrap())?;
}
pub async fn list(&mut self, msg_number: Option<u32>) -> Result<StatsResponse> {
match msg_number.as_ref() {
Some(msg_number) => {
self.check_deleted(msg_number)?;
}
None => {}
};
let socket = self.get_socket_mut()?;
let response_is_multi_line = msg_number.is_none();
let arguments = if msg_number.is_some() {
let arguments = if !response_is_multi_line {
vec![msg_number.unwrap().to_string()]
} else {
Vec::new()
@ -379,7 +393,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
}
}
pub async fn stat(&mut self) -> types::Result<Stats> {
pub async fn stat(&mut self) -> Result<Stats> {
let socket = self.get_socket_mut()?;
let command = b"STAT";
@ -393,8 +407,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
Ok(stats)
}
pub async fn apop(&mut self, name: &str, digest: &str) -> types::Result<()> {
self.is_correct_state(ClientState::Authentication)?;
pub async fn apop(&mut self, name: &str, digest: &str) -> Result<()> {
self.check_client_state(ClientState::Authentication)?;
self.has_read_greeting()?;
@ -405,13 +419,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
socket.send_command(command, false).await?;
self.state = ClientState::Transaction;
Ok(())
}
pub async fn auth<U: AsRef<str>>(&mut self, token: U) -> types::Result<()> {
self.is_correct_state(ClientState::Authentication)?;
pub async fn auth<U: AsRef<str>>(&mut self, token: U) -> Result<()> {
self.check_client_state(ClientState::Authentication)?;
self.has_capability_else_err(vec![Capability::Sasl(vec![String::from("XOAUTH2")])])?;
self.check_capability(vec![Capability::Sasl(vec![String::from("XOAUTH2")])])?;
self.has_read_greeting()?;
@ -430,10 +445,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
&mut self,
user: U,
password: P,
) -> types::Result<()> {
self.is_correct_state(ClientState::Authentication)?;
) -> Result<()> {
self.check_client_state(ClientState::Authentication)?;
self.has_capability_else_err(vec![
self.check_capability(vec![
Capability::User,
Capability::Sasl(vec![String::from("PLAIN")]),
])?;
@ -457,7 +472,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
Ok(())
}
pub async fn quit(&mut self) -> types::Result<()> {
pub async fn quit(&mut self) -> Result<()> {
let socket = self.get_socket_mut()?;
let command = b"QUIT";
@ -489,7 +504,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
}
}
fn has_capability_else_err(&mut self, capability: Vec<Capability>) -> types::Result<()> {
/// Make sure the given capabilities are present
fn check_capability(&mut self, capability: Vec<Capability>) -> Result<()> {
if !self.has_capability(capability) {
Err(types::Error::new(
types::ErrorKind::FeatureUnsupported,
@ -506,7 +522,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
}
/// Fetches a list of capabilities for the currently connected server and returns it.
pub async fn capa(&mut self) -> types::Result<Capabilities> {
pub async fn capa(&mut self) -> Result<Capabilities> {
let socket = self.get_socket_mut()?;
let command = b"CAPA";
@ -516,7 +532,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
Ok(parse_capabilities(&response))
}
fn has_read_greeting(&self) -> types::Result<()> {
fn has_read_greeting(&self) -> Result<()> {
if !self.read_greeting {
Err(types::Error::new(
types::ErrorKind::ServerFailedToGreet,
@ -527,7 +543,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
}
}
async fn read_greeting(&mut self) -> types::Result<String> {
async fn read_greeting(&mut self) -> Result<String> {
assert!(!self.read_greeting, "Cannot read greeting twice");
let socket = self.get_socket_mut()?;

@ -112,6 +112,9 @@ pub fn parse_server_response<'a>(full_response: &'a str) -> Result<&'a str> {
}
}
/// Parse the capabilities from a string formatted according to the rfc:
///
/// https://www.rfc-editor.org/rfc/rfc2449#page-4
pub fn parse_capabilities(response: &str) -> Capabilities {
let end_of_line = char::from_u32(LF as u32).unwrap();

@ -8,33 +8,37 @@ use tokio::{
use crate::{
constants::{DOT, END_OF_LINE, EOF, LF},
parse::{parse_server_response, parse_utf8_bytes},
types::{self, Error, ErrorKind},
types::{Error, ErrorKind, Result},
};
pub struct Socket<T: AsyncRead + AsyncWrite + Unpin> {
timeout: Duration,
stream: BufStream<T>,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Socket<T> {
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);
pub fn new(stream: T) -> Socket<T> {
pub fn new(stream: T, timeout: Option<Duration>) -> Socket<T> {
Self {
timeout: timeout.unwrap_or(Self::DEFAULT_RESPONSE_TIMEOUT),
stream: BufStream::new(stream),
}
}
/// Send a command to the server and read the response into a string.
pub async fn send_command<C: AsRef<[u8]>>(
&mut self,
command: C,
multi_line_response: bool,
) -> types::Result<String> {
) -> Result<String> {
self.send_bytes(command.as_ref()).await?;
self.read_response(multi_line_response).await
}
pub async fn read_response(&mut self, multi_line_response: bool) -> types::Result<String> {
/// Reads a response from the server.
pub async fn read_response(&mut self, multi_line_response: bool) -> Result<String> {
let mut response: Vec<u8> = Vec::new();
if multi_line_response {
@ -65,12 +69,17 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Socket<T> {
}
}
pub async fn read_multi_line(&mut self, buf: &mut Vec<u8>) -> types::Result<usize> {
/// Read a multiline response and insert it into a given buffer.
pub async fn read_multi_line(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
let mut total_bytes_read: usize = 0;
loop {
let bytes_read = self.read_line(buf).await?;
if bytes_read < 1 {
return Ok(bytes_read);
}
total_bytes_read += bytes_read;
let start_of_line = buf.len().saturating_sub(bytes_read);
@ -101,36 +110,37 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Socket<T> {
}
}
async fn read_line(&mut self, buf: &mut Vec<u8>) -> types::Result<usize> {
match timeout(Self::RESPONSE_TIMEOUT, self.stream.read_until(LF, buf))
async fn read_line(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
match timeout(self.timeout, self.stream.read_until(LF, buf))
.await
.map_err(|_| {
Error::new(
ErrorKind::InvalidResponse,
format!(
"Server did not respond with a valid response within {} seconds",
Self::RESPONSE_TIMEOUT.as_secs()
self.timeout.as_secs()
),
)
})? {
Ok(bytes_read) => {
if bytes_read == 0 {
return Err(types::Error::new(
types::ErrorKind::NoResponse,
return Err(Error::new(
ErrorKind::NoResponse,
"Server did not send any bytes",
));
}
Ok(bytes_read)
}
Err(err) => Err(types::Error::new(
types::ErrorKind::InvalidResponse,
Err(err) => Err(Error::new(
ErrorKind::InvalidResponse,
format!("Failed to read server response: {}", err.to_string()),
)),
}
}
pub async fn send_bytes(&mut self, buf: &[u8]) -> types::Result<()> {
/// Send some bytes to the server
pub async fn send_bytes(&mut self, buf: &[u8]) -> Result<()> {
self.stream.write_all(buf).await?;
self.stream.write_all(&END_OF_LINE).await?;

@ -1,13 +1,13 @@
use std::{error, fmt};
use async_native_tls::Error as TlsError;
use tokio::{io::Error as IoError, time::error::Elapsed};
use tokio::{io::Error as IoError, time::error::Elapsed as TimeoutError};
#[derive(Debug)]
pub enum ErrorKind {
Tls(TlsError),
Io(IoError),
Timeout(Elapsed),
Timeout(TimeoutError),
Connect,
NotConnected,
ShouldNotBeConnected,
@ -88,8 +88,8 @@ impl From<IoError> for Error {
}
}
impl From<Elapsed> for Error {
fn from(timeout_error: Elapsed) -> Self {
impl From<TimeoutError> for Error {
fn from(timeout_error: TimeoutError) -> Self {
Self::new(
ErrorKind::Timeout(timeout_error),
"Timeout when connecting to server",

Loading…
Cancel
Save