Skip to content

Commit

Permalink
feat: image processor redesign
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo authored Jun 23, 2024
1 parent d5a6822 commit 042cb07
Show file tree
Hide file tree
Showing 266 changed files with 7,821 additions and 6,507 deletions.
3,376 changes: 934 additions & 2,442 deletions Cargo.lock

Large diffs are not rendered by default.

43 changes: 22 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
[workspace]

members = [
"platform/api",
"platform/image_processor",
"video/edge",
"video/ingest",
"video/transcoder",
"video/lib/*",
"video/api",
"video/player",
"video/player_types",
"video/common",
"video/cli",
"binary-helper",
"utils",
"proto",
"config",
"config/derive",
# "platform/api",
"image-processor",
"image-processor/proto",
# "video/edge",
# "video/ingest",
# "video/transcoder",
# "video/lib/*",
# "video/api",
# "video/player",
# "video/player_types",
# "video/common",
# "video/cli",
# "binary-helper",
# "utils",
# "proto",
# "config",
# "config/derive",
"ffmpeg",
"foundations",
"foundations/macros",
Expand Down Expand Up @@ -52,7 +53,7 @@ h265 = { path = "video/lib/h265" }
mp4 = { path = "video/lib/mp4" }
rtmp = { path = "video/lib/rtmp" }
transmuxer = { path = "video/lib/transmuxer" }
utils = { path = "utils", default-features = false, package = "scuffle-utils" }
scuffle-utils = { path = "utils", default-features = false }
config = { path = "config", package = "scuffle-config" }
pb = { path = "proto" }
video-common = { path = "video/common" }
Expand All @@ -62,12 +63,12 @@ video-edge = { path = "video/edge" }
video-ingest = { path = "video/ingest" }
video-transcoder = { path = "video/transcoder" }
binary-helper = { path = "binary-helper" }
ffmpeg = { path = "ffmpeg" }
scuffle-ffmpeg = { path = "ffmpeg" }

# These patches are pending PRs to the upstream crates
# TODO: Remove these once the PRs are merged
[patch.crates-io]
# https://github.com/remkop22/postgres-from-row/pull/9
postgres-from-row = { git = "https://github.com/ScuffleTV/postgres-from-row.git", branch = "troy/from_fn" }
# https://github.com/madonoharu/tsify/pull/32
tsify = { git = "https://github.com/ScuffleTV/tsify.git", branch = "sisou/comments" }
# postgres-from-row = { git = "https://github.com/ScuffleTV/postgres-from-row.git", branch = "troy/from_fn" }
# # https://github.com/madonoharu/tsify/pull/32
# tsify = { git = "https://github.com/ScuffleTV/tsify.git", branch = "sisou/comments" }
11 changes: 6 additions & 5 deletions binary-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ tracing = "0.1"
thiserror = "1.0"
tokio = { version = "1.36", features = ["full"] }
serde = { version = "1.0.1", features = ["derive"] }
async-nats = "0.33"
async-nats = "0.34"
ulid = "1.1"
async-trait = "0.1"
tonic = { version = "0.11", features = ["tls"] }
anyhow = "1.0"
tower-layer = "0.3"
async-stream = "0.3"
futures-util = "0.3"
rustls = "0.22"
rustls = "0.23"
rustls-pemfile = "2.0"
fred = { version = "8.0.0", features = ["enable-rustls", "sentinel-client", "dns"] }
tokio-postgres-rustls = "0.11"
tokio-postgres-rustls = "0.12"
tracing-subscriber = { features = ["env-filter", "fmt", "json"], version = "0.3" }
once_cell = "1.19"
aws-config = { version = "1.1" }
Expand All @@ -31,13 +31,14 @@ http-body = { version = "1.0.0"}
hyper = "1"
bytes = "1.0"
pin-project = "1"
tokio-rustls = "0.25"

tokio-postgres = { version = "0.7" }
postgres-types = { version = "0.2", features = ["with-serde_json-1", "with-chrono-0_4", "derive"] }
deadpool-postgres = { version = "0.12" }
deadpool-postgres = { version = "0.13" }
postgres-from-row = { version = "0.5" }
prost = { version = "0.12" }

config = { workspace = true }
utils = { workspace = true, features = ["all"] }
scuffle-utils = { workspace = true, features = ["all"] }
pb = { workspace = true }
38 changes: 20 additions & 18 deletions binary-helper/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use fred::interfaces::ClientLike;
use fred::types::ServerConfig;
use hyper::StatusCode;
use rustls::RootCertStore;
use utils::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
use utils::database::tokio_postgres::NoTls;
use utils::database::Pool;
use utils::http::RouteError;
use scuffle_utils::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
use scuffle_utils::database::tokio_postgres::NoTls;
use scuffle_utils::database::Pool;
use scuffle_utils::http::RouteError;

use crate::config::{DatabaseConfig, NatsConfig, RedisConfig};

Expand Down Expand Up @@ -40,7 +40,7 @@ macro_rules! impl_global_traits {

impl binary_helper::global::GlobalDb for $struct {
#[inline(always)]
fn db(&self) -> &Arc<utils::database::Pool> {
fn db(&self) -> &Arc<scuffle_utils::database::Pool> {
&self.db
}
}
Expand All @@ -50,7 +50,7 @@ macro_rules! impl_global_traits {
}

pub trait GlobalCtx {
fn ctx(&self) -> &utils::context::Context;
fn ctx(&self) -> &scuffle_utils::context::Context;
}

pub trait GlobalConfig {
Expand Down Expand Up @@ -124,16 +124,16 @@ pub async fn setup_nats(
Ok((nats, jetstream))
}

pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<utils::database::Pool>> {
pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<scuffle_utils::database::Pool>> {
let mut pg_config = config
.uri
.parse::<utils::database::tokio_postgres::Config>()
.parse::<scuffle_utils::database::tokio_postgres::Config>()
.context("invalid database uri")?;

pg_config.ssl_mode(if config.tls.is_some() {
utils::database::tokio_postgres::config::SslMode::Require
scuffle_utils::database::tokio_postgres::config::SslMode::Require
} else {
utils::database::tokio_postgres::config::SslMode::Disable
scuffle_utils::database::tokio_postgres::config::SslMode::Disable
});

let manager = if let Some(tls) = &config.tls {
Expand Down Expand Up @@ -164,15 +164,15 @@ pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<utils
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?;

utils::database::deadpool_postgres::Manager::from_config(
scuffle_utils::database::deadpool_postgres::Manager::from_config(
pg_config,
tokio_postgres_rustls::MakeRustlsConnect::new(tls),
ManagerConfig {
recycling_method: RecyclingMethod::Fast,
},
)
} else {
utils::database::deadpool_postgres::Manager::from_config(
scuffle_utils::database::deadpool_postgres::Manager::from_config(
pg_config,
NoTls,
ManagerConfig {
Expand Down Expand Up @@ -230,7 +230,7 @@ pub async fn setup_redis(config: &RedisConfig) -> anyhow::Result<Arc<fred::clien

let certs = rustls_pemfile::certs(&mut io::BufReader::new(io::Cursor::new(cert))).collect::<Result<Vec<_>, _>>()?;

let mut cert_store = RootCertStore::empty();
let mut cert_store = tokio_rustls::rustls::RootCertStore::empty();
if let Some(ca_cert) = &tls.ca_cert {
let ca_cert = tokio::fs::read(ca_cert).await.context("failed to read redis ca cert")?;
let ca_certs =
Expand All @@ -240,11 +240,13 @@ pub async fn setup_redis(config: &RedisConfig) -> anyhow::Result<Arc<fred::clien
}
}

Some(fred::types::TlsConfig::from(fred::types::TlsConnector::from(
rustls::ClientConfig::builder()
.with_root_certificates(cert_store)
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?,
Some(fred::types::TlsConfig::from(fred::types::TlsConnector::Rustls(
tokio_rustls::TlsConnector::from(Arc::new(
tokio_rustls::rustls::ClientConfig::builder()
.with_root_certificates(cert_store)
.with_client_auth_cert(certs, key)
.context("failed to create redis tls config")?,
)),
)))
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions binary-helper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Context as _;
use scuffle_utils::context::Context;
use scuffle_utils::signal;
use tokio::signal::unix::SignalKind;
use tokio::{select, time};
use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
pub use traits::{Config, Global};
use utils::context::Context;
use utils::signal;

use self::config::GrpcConfig;

Expand Down
2 changes: 1 addition & 1 deletion binary-helper/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use utils::context::Context;
use scuffle_utils::context::Context;

pub trait Config {
fn parse() -> anyhow::Result<Self>
Expand Down
37 changes: 11 additions & 26 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ version: "3.1"
name: "db-scuffle-dev"

services:
cockroach:
image: ghcr.io/scuffletv/ci/cockroach:latest
mongo:
image: mongo:latest
pull_policy: "always"
command: start-single-node --insecure --advertise-addr=0.0.0.0
volumes:
- cockroach:/cockroach/cockroach-data
ports:
- "127.0.0.1:5432:26257"
- "127.0.0.1:8080:8080"
- "27111:27017"
volumes:
- mongo:/data/db

nats:
image: ghcr.io/scuffletv/ci/nats:latest
Expand All @@ -33,8 +31,8 @@ services:
- "127.0.0.1:9000:9000"
- "127.0.0.1:9001:9001"
environment:
- "MINIO_ACCESS_KEY=root"
- "MINIO_SECRET_KEY=scuffle123"
- "MINIO_ACCESS_KEY=minioadmin"
- "MINIO_SECRET_KEY=minioadmin"
volumes:
- minio:/data
command: server /data --console-address ":9001"
Expand All @@ -47,26 +45,13 @@ services:
entrypoint: >
/bin/sh -c "
set -eux;
/usr/bin/mc config host add myminio http://minio:9000 root scuffle123;
/usr/bin/mc rb --force myminio/scuffle-video || true;
/usr/bin/mc rb --force myminio/scuffle-image-processor || true;
/usr/bin/mc rb --force myminio/scuffle-image-processor-public || true;
/usr/bin/mc mb myminio/scuffle-video;
/usr/bin/mc mb myminio/scuffle-image-processor;
/usr/bin/mc mb myminio/scuffle-image-processor-public;
/usr/bin/mc anonymous set download myminio/scuffle-video;
/usr/bin/mc anonymous set download myminio/scuffle-image-processor-public;
/usr/bin/mc config host add myminio http://minio:9000 minioadmin minioadmin;
/usr/bin/mc mb myminio/image-processor;
/usr/bin/mc anonymous set download myminio/image-processor;
exit 0;
"
redis:
image: ghcr.io/scuffletv/ci/redis:latest
pull_policy: "always"
ports:
- "127.0.0.1:6379:6379"

volumes:
cockroach:
nats:
minio:
redis:
mongo:
6 changes: 2 additions & 4 deletions ffmpeg/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
[package]
name = "ffmpeg"
name = "scuffle-ffmpeg"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"

[dependencies]
ffmpeg-sys-next = "6.1"
ffmpeg-sys-next = "7"
libc = "0.2"
bytes = { optional = true, version = "1" }
tokio = { optional = true, version = "1" }
crossbeam-channel = { optional = true, version = "0.5" }
tracing = { optional = true, version = "0.1" }
utils = { workspace = true, optional = true }

[features]
default = []
task-abort = ["dep:utils"]
channel = ["dep:bytes"]
tokio-channel = ["channel", "dep:tokio"]
crossbeam-channel = ["channel", "dep:crossbeam-channel"]
Expand Down
9 changes: 0 additions & 9 deletions ffmpeg/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ impl GenericDecoder {
}

pub fn send_packet(&mut self, packet: &Packet) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();

// Safety: `packet` is a valid pointer, and `self.decoder` is a valid pointer.
let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), packet.as_ptr()) };

Expand All @@ -165,9 +162,6 @@ impl GenericDecoder {
}

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();

// Safety: `self.decoder` is a valid pointer.
let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), std::ptr::null()) };

Expand All @@ -178,9 +172,6 @@ impl GenericDecoder {
}

pub fn receive_frame(&mut self) -> Result<Option<VideoFrame>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _guard = utils::task::AbortGuard::new();

let mut frame = Frame::new()?;

// Safety: `frame` is a valid pointer, and `self.decoder` is a valid pointer.
Expand Down
15 changes: 0 additions & 15 deletions ffmpeg/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,6 @@ impl Encoder {
outgoing_time_base: AVRational,
settings: impl Into<EncoderSettings>,
) -> Result<Self, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

if codec.as_ptr().is_null() {
return Err(FfmpegError::NoEncoder);
}
Expand Down Expand Up @@ -489,9 +486,6 @@ impl Encoder {
}

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

// Safety: `self.encoder` is a valid pointer.
let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), std::ptr::null()) };
if ret == 0 {
Expand All @@ -502,9 +496,6 @@ impl Encoder {
}

pub fn send_frame(&mut self, frame: &Frame) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

// Safety: `self.encoder` and `frame` are valid pointers.
let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), frame.as_ptr()) };
if ret == 0 {
Expand All @@ -515,9 +506,6 @@ impl Encoder {
}

pub fn receive_packet(&mut self) -> Result<Option<Packet>, FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

let mut packet = Packet::new()?;

const AVERROR_EAGAIN: i32 = AVERROR(EAGAIN);
Expand Down Expand Up @@ -631,9 +619,6 @@ impl<T: Send + Sync> MuxerEncoder<T> {
}

pub fn send_eof(&mut self) -> Result<(), FfmpegError> {
#[cfg(feature = "task-abort")]
let _abort_guard = utils::task::AbortGuard::new();

self.encoder.send_eof()?;
self.handle_packets()?;

Expand Down
Loading

0 comments on commit 042cb07

Please sign in to comment.