Limit number of parallel image conversions
Tokio will happy start hundreds of parallel image conversions, which overwhelms seaf_fuse, so limit the parallel conversion to 4 using a semaphore.
This commit is contained in:
		| @@ -10,7 +10,7 @@ use error::AppError; | |||||||
| use exif::Exif; | use exif::Exif; | ||||||
| use image::{imageops::FilterType, DynamicImage}; | use image::{imageops::FilterType, DynamicImage}; | ||||||
| use serde::{Serialize, Deserialize}; | use serde::{Serialize, Deserialize}; | ||||||
| use tokio::task; | use tokio::{task, sync::Semaphore}; | ||||||
| use tower_http::{trace::{self, TraceLayer}, compression::CompressionLayer}; | use tower_http::{trace::{self, TraceLayer}, compression::CompressionLayer}; | ||||||
| use tracing::Level; | use tracing::Level; | ||||||
| use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; | use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; | ||||||
| @@ -34,6 +34,7 @@ type SecretKey = [u8; 64]; | |||||||
| type ImageListCache = Arc<RwLock<Vec<ImageInfo>>>; | type ImageListCache = Arc<RwLock<Vec<ImageInfo>>>; | ||||||
| type ImageCache = Arc<RwLock<HashMap<OsString, (DateTime<Utc>, Vec<u8>)>>>; | type ImageCache = Arc<RwLock<HashMap<OsString, (DateTime<Utc>, Vec<u8>)>>>; | ||||||
| type Config = Arc<RwLock<config::Config>>; | type Config = Arc<RwLock<config::Config>>; | ||||||
|  | type CpuTaskLimit = Arc<Semaphore>; | ||||||
|  |  | ||||||
| #[derive(Clone, extract::FromRef)] | #[derive(Clone, extract::FromRef)] | ||||||
| struct ApplicationState { | struct ApplicationState { | ||||||
| @@ -43,6 +44,7 @@ struct ApplicationState { | |||||||
|     image_dir: ImageDir, |     image_dir: ImageDir, | ||||||
|     image_list_cache: ImageListCache, |     image_list_cache: ImageListCache, | ||||||
|     secret_key: SecretKey, |     secret_key: SecretKey, | ||||||
|  |     cpu_task_limit: CpuTaskLimit, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| @@ -102,6 +104,7 @@ async fn main() { | |||||||
|             image_dir, |             image_dir, | ||||||
|             image_list_cache, |             image_list_cache, | ||||||
|             secret_key, |             secret_key, | ||||||
|  |             cpu_task_limit: Arc::new(Semaphore::new(4)), | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|     let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); |     let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); | ||||||
| @@ -265,6 +268,7 @@ async fn converted_image( | |||||||
|     State(image_dir): State<ImageDir>, |     State(image_dir): State<ImageDir>, | ||||||
|     State(secret_key): State<SecretKey>, |     State(secret_key): State<SecretKey>, | ||||||
|     session: ReadableSession, |     session: ReadableSession, | ||||||
|  |     State(cpu_task_limit): State<CpuTaskLimit>, | ||||||
| ) -> Result<impl IntoResponse, AppError> { | ) -> Result<impl IntoResponse, AppError> { | ||||||
|     session.get::<()>("logged_in") |     session.get::<()>("logged_in") | ||||||
|         .ok_or(anyhow!("Trying to load image while not logged in!")) |         .ok_or(anyhow!("Trying to load image while not logged in!")) | ||||||
| @@ -308,6 +312,7 @@ async fn converted_image( | |||||||
|             image_buffer |             image_buffer | ||||||
|         } |         } | ||||||
|         None => { |         None => { | ||||||
|  |             let _cpu_task_permit = cpu_task_limit.clone().acquire_owned().await?; | ||||||
|             let image_buffer = task::spawn_blocking(move || { |             let image_buffer = task::spawn_blocking(move || { | ||||||
|                 convert_image(&image_path) |                 convert_image(&image_path) | ||||||
|                     .with_context(|| format!("Could not convert image {:?}", image_path)) |                     .with_context(|| format!("Could not convert image {:?}", image_path)) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user