Skip to content

Commit

Permalink
refactor: use iterative render retry
Browse files Browse the repository at this point in the history
  • Loading branch information
ginkoid committed May 7, 2024
1 parent b96df35 commit 30fad33
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 65 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ strip = true

[dependencies]
anyhow = "1.0.82"
async-recursion = "1.1.0"
axum = "0.7.5"
thiserror = "1.0.59"
tokio = { version = "1.37.0", features = ["full"] }
106 changes: 56 additions & 50 deletions src/bin/web.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{bail, Error, Result};
use async_recursion::async_recursion;
use axum::{
body,
extract::{Path, State},
Expand All @@ -13,28 +12,29 @@ use tex::proto;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
task, time,
sync::Mutex,
task::{self, JoinHandle},
time,
};

#[tokio::main]
async fn main() {
let render_endpoint = env::var("RENDER_ENDPOINT").unwrap_or("localhost:5000".to_string());
let pool_size = env::var("POOL_SIZE")
.unwrap_or("2".to_string())
.parse::<usize>()
.unwrap();
async fn main() -> Result<()> {
let render_endpoint =
env::var("RENDER_ENDPOINT").unwrap_or_else(|_| "localhost:5000".to_string());
let pool_size = env::var("POOL_SIZE").map_or(2, |p| {
p.parse::<usize>().expect("POOL_SIZE should be a number")
});
let app = sync::Arc::new(App {
pool: RenderPool::new(pool_size, render_endpoint.as_str())
.await
.unwrap(),
pool: RenderPool::new(pool_size, render_endpoint.as_str()).await?,
});
let router = Router::new()
.route("/health", get(|| async { "ok" }))
.route("/render", post(render_post))
.route("/render/:tex", get(render_get))
.with_state(app);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, router).await.unwrap();
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, router).await?;
Ok(())
}

struct App {
Expand Down Expand Up @@ -84,25 +84,25 @@ enum RenderError {
}

struct RenderPool {
streams: sync::Mutex<VecDeque<task::JoinHandle<Result<TcpStream>>>>,
streams: Mutex<VecDeque<task::JoinHandle<Result<TcpStream>>>>,
size: usize,
addr: String,
}

impl RenderPool {
async fn new(size: usize, addr: &str) -> Result<Self> {
let pool = Self {
streams: sync::Mutex::new(VecDeque::with_capacity(size)),
streams: Mutex::new(VecDeque::with_capacity(size)),
size,
addr: addr.to_string(),
};
for _ in 0..size {
pool.connect();
pool.connect().await;
}
Ok(pool)
}

fn connect(&self) {
async fn connect(&self) {
let addr = self.addr.clone();
let handle = task::spawn(async move {
for _ in 0..3 {
Expand All @@ -115,44 +115,50 @@ impl RenderPool {
}
bail!("failed to connect");
});
self.streams.lock().unwrap().push_back(handle);
self.streams.lock().await.push_back(handle);
}

#[async_recursion]
async fn do_render(&self, content: body::Bytes, tries: usize) -> Result<body::Bytes> {
if tries > self.size {
bail!("too many tries");
}
async fn get_response(
handle: JoinHandle<Result<TcpStream, Error>>,
content: &mut body::Bytes,
) -> Result<proto::Response, Error> {
let mut stream = handle.await??;
stream.write_all_buf(&mut content.clone()).await?;
stream.write_all(b"\n\\end{document}\n").await?;

self.connect();
let handle = self.streams.lock().unwrap().pop_front().unwrap();
let response = match time::timeout(time::Duration::from_secs(5), async {
let mut stream = handle.await??;
stream.write_all_buf(&mut content.clone()).await?;
stream.write_all(b"\n\\end{document}\n").await?;

let code = stream.read_u32().await?;
let mut data = vec![0; stream.read_u32().await? as usize];
stream.read_exact(&mut data[..]).await?;
Ok::<proto::Response, Error>(proto::Response {
code: code.try_into()?,
data,
})
let code = stream.read_u32().await?;
let mut data = vec![0; stream.read_u32().await? as usize];
stream.read_exact(&mut data[..]).await?;
Ok::<proto::Response, Error>(proto::Response {
code: code.try_into()?,
data,
})
.await
{
Ok(Ok(response)) => response,
Ok(Err(_)) => return self.do_render(content, tries + 1).await,
Err(_) => return Err(RenderError::Timeout.into()),
};
match response.code {
proto::Code::Ok => Ok(response.data.into()),
proto::Code::ErrTex => Err(RenderError::Tex(String::from_utf8(response.data)?).into()),
_ => bail!("internal error: {:?}", response.code),
}
}

async fn render(&self, content: body::Bytes) -> Result<body::Bytes> {
self.do_render(content, 0).await
async fn render(&self, mut content: body::Bytes) -> Result<body::Bytes> {
for _ in 0..self.size + 1 {
self.connect().await;
let handle = self.streams.lock().await.pop_front().unwrap();

let response = match time::timeout(
time::Duration::from_secs(5),
RenderPool::get_response(handle, &mut content),
)
.await
{
Ok(Ok(response)) => response,
Ok(Err(_)) => continue,
Err(_) => return Err(RenderError::Timeout.into()),
};

return match response.code {
proto::Code::Ok => Ok(response.data.into()),
proto::Code::ErrTex => {
Err(RenderError::Tex(String::from_utf8(response.data)?).into())
}
_ => bail!("internal error: {:?}", response.code),
};
}
bail!("too many tries")
}
}
4 changes: 2 additions & 2 deletions src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ impl From<Code> for u32 {

impl TryFrom<u32> for Code {
type Error = Error;
fn try_from(n: u32) -> Result<Self> {
fn try_from(n: u32) -> Result<Self, Self::Error> {
Ok(match n {
0 => Self::Ok,
1 => Self::ErrTex,
2 => Self::ErrMupdf,
3 => Self::ErrInternal,
_ => bail!("invalid code"),
_ => bail!("invalid code: {}", n),
})
}
}
Expand Down

0 comments on commit 30fad33

Please sign in to comment.