From 5b18cf7b5cb3f3dca8d1759b81460818f4ee1837 Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Sun, 14 Jul 2024 19:28:14 +0000 Subject: [PATCH] feat: image processor docker --- .dockerignore | 7 +++- Cargo.toml | 2 + docker/ffmpeg.sh | 43 +------------------- docker/platform/image-processor.Dockerfile | 23 ----------- docker/rust.sh | 9 +++++ foundations/Cargo.toml | 1 - image-processor/Dockerfile | 47 ++++++++++++++++++++++ image-processor/src/drive/mod.rs | 1 - image-processor/src/drive/public_http.rs | 2 +- image-processor/src/management/grpc.rs | 9 +++-- image-processor/src/management/http.rs | 9 ++++- image-processor/src/management/mod.rs | 14 ++++++- image-processor/src/management/utils.rs | 8 ++++ image-processor/src/worker/mod.rs | 8 +++- 14 files changed, 107 insertions(+), 76 deletions(-) delete mode 100644 docker/platform/image-processor.Dockerfile create mode 100755 docker/rust.sh create mode 100644 image-processor/Dockerfile create mode 100644 image-processor/src/management/utils.rs diff --git a/.dockerignore b/.dockerignore index 8b5a6f9c9..2c0f9b58a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,11 @@ .vscode/ node_modules/ .env* -Dockerfile +**/Dockerfile +**/.dockerignore +**/.gitignore +.git/ +**/*.dockerfile dev/ .dockerignore +target/ diff --git a/Cargo.toml b/Cargo.toml index 8b40b0762..546abf845 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,8 @@ debug = true [profile.release-fat] inherits = "release" +debug = false +strip = true lto = "fat" [profile.wasm] diff --git a/docker/ffmpeg.sh b/docker/ffmpeg.sh index 892394b4a..f85415984 100755 --- a/docker/ffmpeg.sh +++ b/docker/ffmpeg.sh @@ -26,50 +26,11 @@ apt-get install -y --no-install-recommends \ libpng-dev \ libjpeg-dev \ libtiff-dev \ - libpng-16-16 \ + libpng16-16 \ libjpeg62 \ libtiff6 git clone https://github.com/ScuffleTV/external.git --depth 1 --recurse-submodule --shallow-submodules /tmp/external -/tmp/external/build.sh --prefix /usr/local --build "x264 x265 svt-av1 libvpx opus dav1d ffmpeg opencv" +/tmp/external/build.sh --prefix /usr/local --build "x264 x265 svt-av1 libvpx dav1d ffmpeg" ldconfig -rm -rf /tmp/external - -apt-get remove -y --purge \ - make \ - zip \ - unzip \ - curl \ - wget \ - git \ - ssh \ - ca-certificates \ - pkg-config \ - gnupg2 \ - cmake \ - clang-format \ - ninja-build \ - nasm \ - yasm \ - meson \ - libtool \ - autoconf \ - automake \ - build-essential \ - libpython3.11-stdlib \ - libpython3.11-minimal \ - libpython3.11 \ - python3.11 \ - python3.11-minimal \ - g++ \ - g++-12 \ - gcc \ - gcc-12 \ - "*-dev" \ - "*-dev-*" - -apt-get autoremove -y -apt-get clean -rm -rf /var/lib/apt/lists/* -rm /etc/ssh -rf diff --git a/docker/platform/image-processor.Dockerfile b/docker/platform/image-processor.Dockerfile deleted file mode 100644 index 0a34d7e9c..000000000 --- a/docker/platform/image-processor.Dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -FROM ubuntu:lunar - -LABEL org.opencontainers.image.source=https://github.com/scuffletv/scuffle -LABEL org.opencontainers.image.description="Platform Image Processor Container for ScuffleTV" -LABEL org.opencontainers.image.licenses=BSD-4-Clause - -WORKDIR /app - -RUN --mount=type=bind,src=docker/ffmpeg.sh,dst=/mount/ffmpeg.sh \ - /mount/ffmpeg.sh - -RUN --mount=type=bind,src=docker/cve.sh,dst=/mount/cve.sh \ - /mount/cve.sh - -RUN --mount=type=bind,src=target/release/platform-image-processor,dst=/mount/platform-image-processor \ - cp /mount/platform-image-processor /app/platform-image-processor && \ - chmod +x /app/platform-image-processor - -# STOPSIGNAL SIGTERM - -# USER 1000 - -# ENTRYPOINT ["/app/platform-image-processor"] diff --git a/docker/rust.sh b/docker/rust.sh new file mode 100755 index 000000000..42709394a --- /dev/null +++ b/docker/rust.sh @@ -0,0 +1,9 @@ +set -ex + +apt-get update +apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + ca-certificates + +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile minimal diff --git a/foundations/Cargo.toml b/foundations/Cargo.toml index 7cc98e4f1..48f0ca041 100644 --- a/foundations/Cargo.toml +++ b/foundations/Cargo.toml @@ -251,5 +251,4 @@ default = [ "http", "http-tls", "http2", - # "http3", ] diff --git a/image-processor/Dockerfile b/image-processor/Dockerfile new file mode 100644 index 000000000..fdea6a66c --- /dev/null +++ b/image-processor/Dockerfile @@ -0,0 +1,47 @@ +FROM bitnami/minideb as builder + +WORKDIR /tmp + +ENV CARGO_HOME=/usr/local/cargo \ + PATH=/usr/local/cargo/bin:$PATH + +RUN --mount=type=bind,src=docker/ffmpeg.sh,dst=/mount/ffmpeg.sh \ + /mount/ffmpeg.sh + +RUN --mount=type=bind,src=docker/rust.sh,dst=/mount/rust.sh \ + /mount/rust.sh + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + libclang-dev \ + protobuf-compiler \ + patchelf + +COPY . . + +ARG PROFILE=release-fat + +RUN cargo build --profile ${PROFILE} --bin scuffle-image-processor -p scuffle-image-processor --locked + +RUN mkdir /out && \ + mv target/${PROFILE}/scuffle-image-processor /out/image-processor && \ + ldd /out/image-processor | grep -o '/[^ ]*' | xargs -I '{}' cp {} /out && \ + patchelf --set-rpath '$ORIGIN' /out/image-processor + +FROM gcr.io/distroless/base-nossl-debian12 + +LABEL org.opencontainers.image.source=https://github.com/scuffletv/scuffle +LABEL org.opencontainers.image.description="Scuffle Image Processor" +LABEL org.opencontainers.image.licenses=BSD-4-Clause + +WORKDIR /app + +ENV LD_LIBRARY_PATH=/app:$LD_LIBRARY_PATH + +COPY --from=builder /out /app + +STOPSIGNAL SIGTERM + +USER 1000 + +ENTRYPOINT ["/app/image-processor"] diff --git a/image-processor/src/drive/mod.rs b/image-processor/src/drive/mod.rs index c953b4615..d71a45b68 100644 --- a/image-processor/src/drive/mod.rs +++ b/image-processor/src/drive/mod.rs @@ -99,7 +99,6 @@ impl Drive for AnyDrive { } async fn write(&self, path: &str, data: Bytes, options: Option) -> Result<(), DriveError> { - tracing::info!("writing to drive: {}", path); match self { AnyDrive::Local(drive) => drive.write(path, data, options).await, AnyDrive::S3(drive) => drive.write(path, data, options).await, diff --git a/image-processor/src/drive/public_http.rs b/image-processor/src/drive/public_http.rs index 541429178..728cea84b 100644 --- a/image-processor/src/drive/public_http.rs +++ b/image-processor/src/drive/public_http.rs @@ -29,7 +29,7 @@ impl PublicHttpDrive { pub async fn new(config: &PublicHttpDriveConfig) -> Result { tracing::debug!("setting up public http disk"); if !config.blacklist.is_empty() || !config.whitelist.is_empty() { - tracing::error!("blacklist and whitelist are not supported for public http disk"); + tracing::error!("blacklist and whitelist are not currently implemented for public http disk"); return Err(PublicHttpDriveError::Unsupported("blacklist and whitelist").into()); } diff --git a/image-processor/src/management/grpc.rs b/image-processor/src/management/grpc.rs index 94a11d269..aa3876a68 100644 --- a/image-processor/src/management/grpc.rs +++ b/image-processor/src/management/grpc.rs @@ -4,19 +4,21 @@ use tonic::{Request, Response}; use super::ManagementServer; impl ManagementServer { - pub async fn run_grpc(&self) -> Result<(), tonic::transport::Error> { - let addr = self.global.config().management.grpc.bind; + #[tracing::instrument(skip_all)] + pub async fn run_grpc(&self, addr: std::net::SocketAddr) -> Result<(), tonic::transport::Error> { let server = tonic::transport::Server::builder() .add_service(scuffle_image_processor_proto::image_processor_server::ImageProcessorServer::new(self.clone())) .serve_with_shutdown(addr, scuffle_foundations::context::Context::global().into_done()); - tracing::info!(%addr, "gRPC server listening"); + tracing::info!("gRPC management server listening on {}", addr); + server.await } } #[async_trait::async_trait] impl scuffle_image_processor_proto::image_processor_server::ImageProcessor for ManagementServer { + #[tracing::instrument(skip_all)] async fn process_image(&self, request: Request) -> tonic::Result> { let resp = match self.process_image(request.into_inner()).await { Ok(resp) => resp, @@ -30,6 +32,7 @@ impl scuffle_image_processor_proto::image_processor_server::ImageProcessor for M Ok(Response::new(resp)) } + #[tracing::instrument(skip_all)] async fn cancel_task(&self, request: Request) -> tonic::Result> { let resp = match self.cancel_task(request.into_inner()).await { Ok(resp) => resp, diff --git a/image-processor/src/management/http.rs b/image-processor/src/management/http.rs index 3c6bf111e..2afc8f2db 100644 --- a/image-processor/src/management/http.rs +++ b/image-processor/src/management/http.rs @@ -8,14 +8,16 @@ use scuffle_image_processor_proto::{ use super::ManagementServer; impl ManagementServer { - pub async fn run_http(&self) -> Result<(), scuffle_foundations::http::server::Error> { + #[tracing::instrument(skip_all)] + pub async fn run_http(&self, addr: std::net::SocketAddr) -> Result<(), scuffle_foundations::http::server::Error> { let router = Router::new() .route("/process_image", post(process_image)) .route("/cancel_task", post(cancel_task)) .fallback(not_found) .with_state(self.clone()); - let addr = self.global.config().management.http.bind; + tracing::info!("HTTP management server listening on {}", addr); + scuffle_foundations::http::server::Server::builder() .bind(addr) .build(router)? @@ -24,10 +26,12 @@ impl ManagementServer { } } +#[tracing::instrument(skip_all)] async fn not_found() -> (http::StatusCode, &'static str) { (http::StatusCode::NOT_FOUND, "Not Found") } +#[tracing::instrument(skip_all)] async fn process_image( State(server): State, Json(request): Json, @@ -48,6 +52,7 @@ async fn process_image( (status, Json(resp)) } +#[tracing::instrument(skip_all)] async fn cancel_task( State(server): State, Json(request): Json, diff --git a/image-processor/src/management/mod.rs b/image-processor/src/management/mod.rs index 3e3d3ba3b..4d1ed4ea7 100644 --- a/image-processor/src/management/mod.rs +++ b/image-processor/src/management/mod.rs @@ -17,6 +17,7 @@ use crate::worker::process::DecoderFrontend; pub mod grpc; pub mod http; +mod utils; mod validation; #[derive(Clone)] @@ -25,9 +26,12 @@ struct ManagementServer { } impl ManagementServer { + #[tracing::instrument(skip_all)] async fn process_image(&self, mut request: ProcessImageRequest) -> Result { let mut fragment = FragmentBuf::new(); + tracing::info!("new process image request"); + validate_task( &self.global, fragment.push("task"), @@ -128,7 +132,10 @@ impl ManagementServer { }) } + #[tracing::instrument(skip_all)] async fn cancel_task(&self, request: CancelTaskRequest) -> Result { + tracing::info!("new cancel task request"); + match Job::cancel( &self.global, request.id.parse().map_err(|err| Error { @@ -154,19 +161,22 @@ impl ManagementServer { } } +#[tracing::instrument(skip_all)] pub async fn start(global: Arc) -> anyhow::Result<()> { let server = ManagementServer { global }; let http = async { if server.global.config().management.http.enabled { - server.run_http().await.context("http") + let addr = utils::true_bind(server.global.config().management.http.bind).await?; + server.run_http(addr).await.context("http") } else { Ok(()) } }; let grpc = async { if server.global.config().management.grpc.enabled { - server.run_grpc().await.context("grpc") + let addr = utils::true_bind(server.global.config().management.grpc.bind).await?; + server.run_grpc(addr).await.context("grpc") } else { Ok(()) } diff --git a/image-processor/src/management/utils.rs b/image-processor/src/management/utils.rs new file mode 100644 index 000000000..b024ac8be --- /dev/null +++ b/image-processor/src/management/utils.rs @@ -0,0 +1,8 @@ +pub async fn true_bind(addr: std::net::SocketAddr) -> std::io::Result { + if addr.port() == 0 { + let bind = tokio::net::TcpListener::bind(addr).await?; + bind.local_addr() + } else { + Ok(addr) + } +} diff --git a/image-processor/src/worker/mod.rs b/image-processor/src/worker/mod.rs index f1110b2f1..5313d75df 100644 --- a/image-processor/src/worker/mod.rs +++ b/image-processor/src/worker/mod.rs @@ -21,6 +21,8 @@ pub async fn start(global: Arc) -> anyhow::Result<()> { let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); + tracing::info!("worker started with {} threads", concurrency); + let mut error_count = 0; let (_, handle) = context::Context::new(); @@ -38,8 +40,12 @@ pub async fn start(global: Arc) -> anyhow::Result<()> { }; let job = match Job::fetch(&global).await { - Ok(Some(job)) => job, + Ok(Some(job)) => { + tracing::debug!("fetched job"); + job + } Ok(None) => { + tracing::debug!("no jobs found"); tokio::time::sleep(config.worker.polling_interval).await; continue; }