Skip to content

Commit

Permalink
Event manager (#2)
Browse files Browse the repository at this point in the history
* moved trackmetrics playStart back to TrackPlayer because needed

* changes in track queueing
  • Loading branch information
tobiasguyer authored Aug 1, 2024
1 parent 050cb32 commit b489cc9
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 47 deletions.
7 changes: 4 additions & 3 deletions cspot/include/TrackQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class TrackQueue : public bell::Task {
std::shared_ptr<bell::WrappedSemaphore> playableSemaphore;
std::shared_ptr<PlaybackState> playbackState;
std::atomic<bool> notifyPending = false;
std::function<void()> notifyCallback;

void runTask() override;
void stopTask();
Expand All @@ -123,6 +124,7 @@ class TrackQueue : public bell::Task {
TrackInfo getTrackInfo(std::string_view identifier);
std::shared_ptr<QueuedTrack> consumeTrack(
std::shared_ptr<QueuedTrack> prevSong, int& offset);
std::deque<std::shared_ptr<QueuedTrack>> preloadedTracks;

private:
static const int MAX_TRACKS_PRELOAD = 3;
Expand All @@ -132,9 +134,8 @@ class TrackQueue : public bell::Task {
std::shared_ptr<bell::WrappedSemaphore> processSemaphore;
std::unique_ptr<cspot::PlayerContext> playerContext;

std::deque<std::shared_ptr<QueuedTrack>> preloadedTracks;
std::deque<std::pair<int64_t, TrackReference>> queuedTracks = {};
std::vector<TrackReference> currentTracks;
std::vector<TrackReference> currentTracks = {};
std::vector<TrackReference> ghostTracks;
std::mutex tracksMutex, runningMutex;

Expand All @@ -144,7 +145,7 @@ class TrackQueue : public bell::Task {
uint32_t currentTracksIndex = -1;

bool isRunning = false;
bool context_resolved = false;
bool contextResolved = false;
bool continue_with_radio = true;

std::random_device rd;
Expand Down
2 changes: 2 additions & 0 deletions cspot/protobuf/spirc.options
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ DeviceState.sw_version type:FT_POINTER
DeviceState.name type:FT_POINTER
DeviceState.capabilities max_count:17, fixed_count:false
State.context_uri type:FT_POINTER
State.last_command_ident type:FT_POINTER
State.context_description type:FT_POINTER
TrackRef.queued type:FT_CALLBACK
2 changes: 2 additions & 0 deletions cspot/protobuf/spirc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ message State {
optional string context_description = 0x8;
optional bool shuffle = 0xd;
optional bool repeat = 0xe;
optional string last_command_ident = 0x14;
optional uint32 last_command_msgid = 0x15;
optional uint32 playing_track_index = 0x1a;
repeated TrackRef track = 0x1b;
}
Expand Down
11 changes: 11 additions & 0 deletions cspot/src/PlaybackState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ void PlaybackState::syncWithRemote() {

strcpy(innerFrame.state.context_uri, remoteFrame.state.context_uri);

if (remoteFrame.state.context_description != NULL) {
innerFrame.state.context_description =
(char*)realloc(innerFrame.state.context_description,
strlen(remoteFrame.state.context_description) + 1);
strcpy(innerFrame.state.context_description,
remoteFrame.state.context_description);
} else {
free(innerFrame.state.context_description);
innerFrame.state.context_description = NULL;
}

innerFrame.state.has_playing_track_index = true;
innerFrame.state.playing_track_index = remoteFrame.state.playing_track_index;
innerFrame.state.has_shuffle = remoteFrame.state.has_shuffle;
Expand Down
12 changes: 4 additions & 8 deletions cspot/src/PlayerContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ PlayerContext::PlayerContext(
this->index = index;
rng = std::default_random_engine{rd()};
std::string requestUrl = string_format("hm://context-resolve/v1/%s", uri);
auto responseHandler = [this, shuffle](MercurySession::Response& res) {
auto responseHandler = [this, shuffle, uri](MercurySession::Response& res) {
if (!res.parts.size())
return;
if (!res.parts[0].size())
return;
this->resolveInitial(*this->index, shuffle, res.parts[0]);
if (this->track_list->size() < 30)
resolveContext(*this->index, shuffle, uri);
};

ctx->session->execute(MercurySession::RequestType::GET, requestUrl,
responseHandler);
if (track_list->size() < 30)
resolveContext(*this->index, shuffle, uri);
}

static void randomizeIndex(std::vector<uint32_t>& index, uint16_t offset,
Expand Down Expand Up @@ -111,8 +111,6 @@ bool PlayerContext::resolveTracklist(uint32_t index, bool shuffle,
index = *this->index;
std::scoped_lock lock(*trackMutex);
auto jsonResult = nlohmann::json::parse(data);
if (queued_list->at(0).first == 0 || queued_list->at(0).first == -1)
index--;
for (uint32_t i = 0; i < queued_list->size(); i++)
if (queued_list->at(i).first != -1)
queued_list->at(i).first -= index;
Expand Down Expand Up @@ -199,10 +197,8 @@ bool PlayerContext::resolveInitial(uint32_t index, bool shuffle,
}
randomizeIndex(alternative_index, new_index.size(), rng);
}
track_list->erase(track_list->begin(), track_list->begin() + index);
last_index = shuffle ? index : new_index[index];
last_index = shuffle ? 0 : new_index[0];
last_resolve_shuffled = shuffle;
*this->index = 0;
static_cast<cspot::TrackQueue*>(queue)->reloadTracks();
return true;
}
35 changes: 12 additions & 23 deletions cspot/src/SpircHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ SpircHandler::SpircHandler(std::shared_ptr<cspot::Context> ctx) {
this->playbackState = std::make_shared<PlaybackState>(ctx);
this->trackQueue = std::make_shared<cspot::TrackQueue>(ctx, playbackState);

trackQueue->notifyCallback = [this]() {
this->notify();
};

auto EOFCallback = [this](bool loaded) {
if (trackQueue->isFinished()) {
sendEvent(EventType::DEPLETED);
}
if (!loaded)
trackQueue->skipTrack(TrackQueue::SkipDirection::NEXT, true);
trackQueue->skipTrack(TrackQueue::SkipDirection::NEXT, false);
};

auto trackLoadedCallback = [this](std::shared_ptr<QueuedTrack> track,
Expand All @@ -39,8 +43,6 @@ SpircHandler::SpircHandler(std::shared_ptr<cspot::Context> ctx) {
: PlaybackState::State::Playing);
playbackState->updatePositionMs(track->requestedPosition);

this->notify();

// Send playback start event, pause/unpause per request
sendEvent(EventType::PLAYBACK_START, (int)track->requestedPosition);
sendEvent(EventType::PLAY_PAUSE, paused);
Expand Down Expand Up @@ -95,29 +97,20 @@ void SpircHandler::notifyAudioReachedPlaybackEnd() {
if (!playbackState->innerFrame.state.repeat) {
currentTrack->trackMetrics->endTrack();
ctx->playbackMetrics->sendEvent(currentTrack);
trackQueue->skipTrack(TrackQueue::SkipDirection::NEXT, true);
// we moved to next track, re-acquire currentTrack again
currentTrack = trackQueue->consumeTrack(nullptr, offset);
if (!trackQueue->notifyPending)
trackQueue->skipTrack(TrackQueue::SkipDirection::NEXT, false);
else
trackQueue->notifyPending = false;
}
trackQueue->update_ghost_tracks();
notify();
}

void SpircHandler::notifyAudioReachedPlayback() {
int offset = 0;

// get HEAD track
auto currentTrack = trackQueue->consumeTrack(nullptr, offset);
currentTrack->trackMetrics->startTrackPlaying(
currentTrack->requestedPosition);
if (trackQueue->notifyPending) {
trackQueue->notifyPending = false;

playbackState->updatePositionMs(currentTrack->requestedPosition);

// Reset position in queued track
currentTrack->requestedPosition = 0;
}
this->notify();

sendEvent(EventType::TRACK_INFO, currentTrack->trackInfo);
}

Expand Down Expand Up @@ -150,16 +143,12 @@ void SpircHandler::handleFrame(std::vector<uint8_t>& data) {
ctx->playbackMetrics->end_source = influence;
this->trackPlayer->stop();
sendEvent(EventType::DISC);
trackQueue->preloadedTracks.clear();
}
break;
}
case MessageType_kMessageTypeSeek: {
this->trackPlayer->seekMs(playbackState->remoteFrame.position);

playbackState->updatePositionMs(playbackState->remoteFrame.position);

notify();

sendEvent(EventType::SEEK, (int)playbackState->remoteFrame.position);
break;
}
Expand Down
9 changes: 8 additions & 1 deletion cspot/src/TrackPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ void TrackPlayer::runTask() {
track->trackMetrics->startTrackDecoding();
track->trackMetrics->track_size = currentTrackStream->getSize();

#ifndef CONFIG_BELL_NOCODEC
if (trackOffset == 0 && pendingSeekPositionMs == 0) {
this->trackLoaded(track, startPaused);
startPaused = false;
}

#ifndef CONFIG_BELL_NOCODEC
int32_t r =
ov_open_callbacks(this, &vorbisFile, NULL, 0, vorbisCallbacks);
#else
Expand Down Expand Up @@ -262,6 +262,10 @@ void TrackPlayer::runTask() {
eof = false;
track->loading = true;
//in case of a repeatedtrack, set requested position to 0
track->trackMetrics->startTrackPlaying(track->requestedPosition);
this->trackQueue->playbackState->updatePositionMs(
track->requestedPosition);
this->trackQueue->notifyCallback();
track->requestedPosition = 0;

CSPOT_LOG(info, "Playing");
Expand All @@ -280,6 +284,9 @@ void TrackPlayer::runTask() {
track->trackMetrics->newPosition(pendingSeekPositionMs);
skipped = true;
#endif
this->trackQueue->playbackState->updatePositionMs(
pendingSeekPositionMs);
this->trackQueue->notifyCallback();

// Reset the pending seek position
pendingSeekPositionMs = 0;
Expand Down
32 changes: 24 additions & 8 deletions cspot/src/TrackQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ std::shared_ptr<QueuedTrack> TrackQueue::consumeTrack(
std::shared_ptr<QueuedTrack> prevTrack, int& offset) {
std::scoped_lock lock(tracksMutex);

if (currentTracksIndex == -1 || currentTracksIndex >= currentTracks.size()) {
if ((currentTracksIndex == -1 && !currentTracks.size()) ||
currentTracksIndex >= currentTracks.size()) {
return nullptr;
}

Expand Down Expand Up @@ -495,15 +496,15 @@ void TrackQueue::update_ghost_tracks(int16_t offset) {
index++;
}
if (queuedTracks.size() > 0) {
if (preloadedTracks[0]->ref.gid != queuedTracks[0].second.gid) {
if (preloadedTracks[offset]->ref.gid != queuedTracks[0].second.gid) {
ghostTracks.push_back(currentTracks[index]);
index++;
} else
ghostTracks.push_back(queuedTracks[0].second);
for (auto track : queuedTracks) {
if (track.first != -1)
break;
if (preloadedTracks[0]->ref.gid != track.second.gid) {
if (preloadedTracks[offset]->ref.gid != track.second.gid) {
ghostTracks.push_back(track.second);
}
}
Expand Down Expand Up @@ -538,10 +539,13 @@ void TrackQueue::processTrack(std::shared_ptr<QueuedTrack> track) {

if (track->state == QueuedTrack::State::READY) {
if (preloadedTracks.size() + currentTracksIndex + 1 >=
currentTracks.size())
currentTracks.size() &&
!contextResolved) {
playerContext->resolveContext(
currentTracksIndex, this->playbackState->innerFrame.state.shuffle,
this->playbackState->innerFrame.state.context_uri);
contextResolved = true;
}
if (preloadedTracks.size() < MAX_TRACKS_PRELOAD) {
// Queue a new track to preload
queueNextTrack(preloadedTracks.size());
Expand Down Expand Up @@ -672,13 +676,16 @@ void TrackQueue::reloadTracks(uint8_t offset) {
//preloadedTracks.clear();
queueNextTrack(preloadedTracks.size());
update_ghost_tracks();
//this->ctx->handler->notify();
this->notifyCallback();
contextResolved = false;
}
bool TrackQueue::updateTracks(uint32_t requestedPosition, bool initial) {
std::scoped_lock lock(tracksMutex);
bool cleared = true;

if (initial) {
if (preloadedTracks.size())
notifyPending = true;
// initialize new random_engine
rng = std::default_random_engine{rd()};
// Copy requested track list
Expand All @@ -703,9 +710,18 @@ bool TrackQueue::updateTracks(uint32_t requestedPosition, bool initial) {

playableSemaphore->give();
} else {
queuedTracks.push_back(std::make_pair(
-1, playbackState->remoteTracks[playbackState->innerFrame.state.index +
1 + queuedTracks.size()]));
auto track_it = queuedTracks.begin();
uint32_t offset = 0;
while (track_it->first < 0) {
track_it++;
offset++;
}
queuedTracks.insert(
track_it,
std::make_pair(
-1,
playbackState->remoteTracks[playbackState->innerFrame.state.index +
1 + offset]));
preloadedTracks.erase(preloadedTracks.begin() + 1, preloadedTracks.end());
queueNextTrack(1);
cleared = false;
Expand Down
6 changes: 2 additions & 4 deletions targets/esp32/components/VS1053/src/VS1053.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ esp_err_t VS1053_SINK::init(spi_host_device_t SPI,
//load_user_code(PLUGIN, PLUGIN_SIZE);
#endif
vTaskDelay(100 / portTICK_PERIOD_MS);
xTaskCreate(vs_feed, "track_feed", 4098, (void*)this, 10, &task_handle);
xTaskCreatePinnedToCore(vs_feed, "track_feed", 4098, (void*)this, 10,
&task_handle, 0);
return ESP_OK;
}

Expand Down Expand Up @@ -263,9 +264,6 @@ void VS1053_SINK::run_feed(size_t FILL_BUFFER_BEFORE_PLAYBACK) {
sdi_send_fillers(endFillByte, endFillBytes);
write_register(SCI_DECODE_TIME, 0); // Reset DECODE_TIME
new_state(track->state, VS1053_TRACK::VS_TRACK_STATE::tsPlaybackStart);
if (FILL_BUFFER_BEFORE_PLAYBACK <
xStreamBufferBytesAvailable(track->dataBuffer))
vTaskDelay(10 / portTICK_PERIOD_MS);
while (track->state != VS1053_TRACK::VS_TRACK_STATE::tsStopped) {
if (this->command_callbacks.size()) {
this->command_callbacks[0](track->track_id);
Expand Down

0 comments on commit b489cc9

Please sign in to comment.