Skip to content

Commit

Permalink
Make audio fetch parameters tunable
Browse files Browse the repository at this point in the history
This change collects all those audio fetch parameters that were defined as
static constants into a dedicated struct, AudioFetchParams.
This struct can be read and set statically, allowing a user of the library to
modify those parameters without the need to recompile.
  • Loading branch information
lelloman committed Dec 20, 2023
1 parent a245a3c commit e175a88
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 59 deletions.
114 changes: 76 additions & 38 deletions audio/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
io::{self, Read, Seek, SeekFrom},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Arc, OnceLock,
},
time::Duration,
};
Expand Down Expand Up @@ -55,42 +55,75 @@ impl From<AudioFileError> for Error {
}
}

/// The minimum size of a block that is requested from the Spotify servers in one request.
/// This is the block size that is typically requested while doing a `seek()` on a file.
/// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
/// Note: smaller requests can happen if part of the block is downloaded already.
pub const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024;

/// The minimum network throughput that we expect. Together with the minimum download size,
/// this will determine the time we will wait for a response.
pub const MINIMUM_THROUGHPUT: usize = 8 * 1024;

/// The ping time that is used for calculations before a ping time was actually measured.
pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500);

/// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching.
pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500);
#[derive(Clone)]
pub struct AudioFetchParams {
/// The minimum size of a block that is requested from the Spotify servers in one request.
/// This is the block size that is typically requested while doing a `seek()` on a file.
/// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
/// Note: smaller requests can happen if part of the block is downloaded already.
pub minimum_download_size: usize,

/// The minimum network throughput that we expect. Together with the minimum download size,
/// this will determine the time we will wait for a response.
pub minimum_throughput: usize,

/// The ping time that is used for calculations before a ping time was actually measured.
pub initial_ping_time_estimate: Duration,

/// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching.
pub maximum_assumed_ping_time: Duration,

/// Before playback starts, this many seconds of data must be present.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub read_ahead_before_playback: Duration,

/// While playing back, this many seconds of data ahead of the current read position are
/// requested.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub read_ahead_during_playback: Duration,

/// If the amount of data that is pending (requested but not received) is less than a certain amount,
/// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
/// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
pub prefetch_threshold_factor: f32,

/// The time we will wait to obtain status updates on downloading.
pub download_timeout: Duration,
}

/// Before playback starts, this many seconds of data must be present.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK: Duration = Duration::from_secs(1);
impl Default for AudioFetchParams {
fn default() -> Self {
let minimum_download_size = 64 * 1024;
let minimum_throughput = 8 * 1024;
Self {
minimum_download_size,
minimum_throughput,
initial_ping_time_estimate: Duration::from_millis(500),
maximum_assumed_ping_time: Duration::from_millis(1500),
read_ahead_before_playback: Duration::from_secs(1),
read_ahead_during_playback: Duration::from_secs(5),
prefetch_threshold_factor: 4.0,
download_timeout: Duration::from_secs(
(minimum_download_size / minimum_throughput) as u64,
),
}
}
}

/// While playing back, this many seconds of data ahead of the current read position are
/// requested.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_DURING_PLAYBACK: Duration = Duration::from_secs(5);
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();

/// If the amount of data that is pending (requested but not received) is less than a certain amount,
/// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
/// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0;
impl AudioFetchParams {
pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
AUDIO_FETCH_PARAMS.set(params)
}

/// The time we will wait to obtain status updates on downloading.
pub const DOWNLOAD_TIMEOUT: Duration =
Duration::from_secs((MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64);
pub fn get() -> &'static AudioFetchParams {
AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default)
}
}

pub enum AudioFile {
Cached(fs::File),
Expand Down Expand Up @@ -183,6 +216,7 @@ impl StreamLoaderController {

if let Some(ref shared) = self.stream_shared {
let mut download_status = shared.download_status.lock();
let download_timeout = AudioFetchParams::get().download_timeout;

while range.length
> download_status
Expand All @@ -191,7 +225,7 @@ impl StreamLoaderController {
{
if shared
.cond
.wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
.wait_for(&mut download_status, download_timeout)
.timed_out()
{
return Err(AudioFileError::WaitTimeout.into());
Expand Down Expand Up @@ -297,7 +331,7 @@ impl AudioFileShared {
if ping_time_ms > 0 {
Duration::from_millis(ping_time_ms as u64)
} else {
INITIAL_PING_TIME_ESTIMATE
AudioFetchParams::get().initial_ping_time_estimate
}
}

Expand Down Expand Up @@ -395,14 +429,16 @@ impl AudioFileStreaming {
trace!("Streaming from {}", url);
}

let minimum_download_size = AudioFetchParams::get().minimum_download_size;

// When the audio file is really small, this `download_size` may turn out to be
// larger than the audio file we're going to stream later on. This is OK; requesting
// `Content-Range` > `Content-Length` will return the complete file with status code
// 206 Partial Content.
let mut streamer =
session
.spclient()
.stream_from_cdn(&cdn_url, 0, MINIMUM_DOWNLOAD_SIZE)?;
.stream_from_cdn(&cdn_url, 0, minimum_download_size)?;

// Get the first chunk with the headers to get the file size.
// The remainder of that chunk with possibly also a response body is then
Expand Down Expand Up @@ -490,9 +526,10 @@ impl Read for AudioFileStreaming {
return Ok(0);
}

let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
let length_to_request = if self.shared.is_download_streaming() {
let length_to_request = length
+ (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32)
+ (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
as usize;

// Due to the read-ahead stuff, we potentially request more than the actual request demanded.
Expand All @@ -515,11 +552,12 @@ impl Read for AudioFileStreaming {
.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
}

let download_timeout = AudioFetchParams::get().download_timeout;
while !download_status.downloaded.contains(offset) {
if self
.shared
.cond
.wait_for(&mut download_status, DOWNLOAD_TIMEOUT)
.wait_for(&mut download_status, download_timeout)
.timed_out()
{
return Err(io::Error::new(
Expand Down
29 changes: 17 additions & 12 deletions audio/src/fetch/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use librespot_core::{http_client::HttpClient, session::Session, Error};
use crate::range_set::{Range, RangeSet};

use super::{
AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest,
MAXIMUM_ASSUMED_PING_TIME, MINIMUM_DOWNLOAD_SIZE, MINIMUM_THROUGHPUT,
PREFETCH_THRESHOLD_FACTOR,
AudioFetchParams, AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand,
StreamingRequest,
};

struct PartialFileData {
Expand Down Expand Up @@ -151,6 +150,8 @@ struct AudioFileFetch {
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times: Vec<Duration>,

params: AudioFetchParams,
}

// Might be replaced by enum from std once stable
Expand All @@ -166,8 +167,8 @@ impl AudioFileFetch {
}

fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult {
if length < MINIMUM_DOWNLOAD_SIZE {
length = MINIMUM_DOWNLOAD_SIZE;
if length < self.params.minimum_download_size {
length = self.params.minimum_download_size;
}

// If we are in streaming mode (so not seeking) then start downloading as large
Expand Down Expand Up @@ -258,13 +259,13 @@ impl AudioFileFetch {
fn handle_file_data(&mut self, data: ReceivedData) -> Result<ControlFlow, Error> {
match data {
ReceivedData::Throughput(mut throughput) => {
if throughput < MINIMUM_THROUGHPUT {
if throughput < self.params.minimum_throughput {
warn!(
"Throughput {} kbps lower than minimum {}, setting to minimum",
throughput / 1000,
MINIMUM_THROUGHPUT / 1000,
self.params.minimum_throughput / 1000,
);
throughput = MINIMUM_THROUGHPUT;
throughput = self.params.minimum_throughput;
}

let old_throughput = self.shared.throughput();
Expand All @@ -287,13 +288,13 @@ impl AudioFileFetch {
self.shared.set_throughput(avg_throughput);
}
ReceivedData::ResponseTime(mut response_time) => {
if response_time > MAXIMUM_ASSUMED_PING_TIME {
if response_time > self.params.maximum_assumed_ping_time {
warn!(
"Time to first byte {} ms exceeds maximum {}, setting to maximum",
response_time.as_millis(),
MAXIMUM_ASSUMED_PING_TIME.as_millis()
self.params.maximum_assumed_ping_time.as_millis()
);
response_time = MAXIMUM_ASSUMED_PING_TIME;
response_time = self.params.maximum_assumed_ping_time;
}

let old_ping_time_ms = self.shared.ping_time().as_millis();
Expand Down Expand Up @@ -423,6 +424,8 @@ pub(super) async fn audio_file_fetch(
initial_request,
));

let params = AudioFetchParams::get();

let mut fetch = AudioFileFetch {
session: session.clone(),
shared,
Expand All @@ -431,6 +434,8 @@ pub(super) async fn audio_file_fetch(
file_data_tx,
complete_tx: Some(complete_tx),
network_response_times: Vec::with_capacity(3),

params: params.clone(),
};

loop {
Expand Down Expand Up @@ -472,7 +477,7 @@ pub(super) async fn audio_file_fetch(
let throughput = fetch.shared.throughput();

let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR
(params.prefetch_threshold_factor
* ping_time_seconds
* fetch.shared.bytes_per_second as f32) as usize,
(ping_time_seconds * throughput as f32) as usize,
Expand Down
3 changes: 1 addition & 2 deletions audio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ mod fetch;
mod range_set;

pub use decrypt::AudioDecrypt;
pub use fetch::{AudioFile, AudioFileError, StreamLoaderController};
pub use fetch::{MINIMUM_DOWNLOAD_SIZE, READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_DURING_PLAYBACK};
pub use fetch::{AudioFetchParams, AudioFile, AudioFileError, StreamLoaderController};
2 changes: 1 addition & 1 deletion playback/src/decoder/symphonia_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl SymphoniaDecoder {
R: MediaSource + 'static,
{
let mss_opts = MediaSourceStreamOptions {
buffer_len: librespot_audio::MINIMUM_DOWNLOAD_SIZE,
buffer_len: librespot_audio::AudioFetchParams::get().minimum_download_size,
};
let mss = MediaSourceStream::new(Box::new(input), mss_opts);

Expand Down
10 changes: 4 additions & 6 deletions playback/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ use symphonia::core::io::MediaSource;
use tokio::sync::{mpsc, oneshot};

use crate::{
audio::{
AudioDecrypt, AudioFile, StreamLoaderController, READ_AHEAD_BEFORE_PLAYBACK,
READ_AHEAD_DURING_PLAYBACK,
},
audio::{AudioDecrypt, AudioFetchParams, AudioFile, StreamLoaderController},
audio_backend::Sink,
config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig},
convert::Converter,
Expand Down Expand Up @@ -2223,13 +2220,14 @@ impl PlayerInternal {
..
} = self.state
{
let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
// Request our read ahead range
let request_data_length =
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize;
(read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;

// Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete.
let wait_for_data_length =
(READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize;
(read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;

stream_loader_controller
.fetch_next_and_wait(request_data_length, wait_for_data_length)
Expand Down

0 comments on commit e175a88

Please sign in to comment.