Skip to content

Commit

Permalink
Event manager (#4)
Browse files Browse the repository at this point in the history
* moved trackmetrics playStart back to TrackPlayer because needed

* changes in track queueing

* Changed structural from spirc to connectState

* minor changes

* minor clang-format changes

* Seeking now supported on all devices

* adjusting stack-sizes to new structure, minor adjustments for bell_nocodec

* Update MercurySession.cpp
  • Loading branch information
tobiasguyer authored Sep 29, 2024
1 parent 4f15632 commit 4d06ccb
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 59 deletions.
7 changes: 4 additions & 3 deletions cspot/include/MercurySession.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class MercurySession : public bell::Task, public cspot::Session {
struct Response {
Header mercuryHeader;
DataParts parts;
bool fail;
int64_t sequenceId;
bool fail = true;
};
typedef std::function<void(Response&)> ResponseCallback;
typedef std::function<void(bool, const std::vector<uint8_t>&)>
Expand Down Expand Up @@ -118,7 +119,7 @@ class MercurySession : public bell::Task, public cspot::Session {
void reconnect();

std::unordered_map<int64_t, ResponseCallback> callbacks;
std::deque<std::pair<int64_t, Response>> partials;
std::deque<Response> partials;
std::unordered_map<std::string, ResponseCallback> subscriptions;
std::unordered_map<uint32_t, AudioKeyCallback> audioKeyCallbacks;

Expand All @@ -137,6 +138,6 @@ class MercurySession : public bell::Task, public cspot::Session {

void failAllPending();

std::pair<int, int64_t> decodeResponse(const std::vector<uint8_t>& data);
MercurySession::Response decodeResponse(const std::vector<uint8_t>& data);
};
} // namespace cspot
2 changes: 1 addition & 1 deletion cspot/include/TrackReference.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "pb_decode.h"
#include "protobuf/connect.pb.h"

#define TRACK_SEND_LIMIT 10
#define TRACK_SEND_LIMIT 25

namespace cspot {
struct TrackReference {
Expand Down
26 changes: 25 additions & 1 deletion cspot/src/DeviceStateHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr<cspot::Context> ctx) {

this->ctx->session->addSubscriptionListener("hm://connect-state/",
connectStateSubscription);
CSPOT_LOG(info, "Added connect-state subscrription");
CSPOT_LOG(info, "Added connect-state subscription");

// the device connection status gets reported trough "hm://social-connect",if active
auto socialConnectSubscription = [this](MercurySession::Response& res) {
Expand Down Expand Up @@ -880,6 +880,9 @@ void DeviceStateHandler::parseCommand(std::vector<uint8_t>& data) {
sendCommand(CommandType::SKIP_PREV);

} else if (command->at("endpoint") == "seek_to") {
#ifndef CONFIG_BELL_NOCODEC
needsToBeSkipped = false;
#endif
if (command->at("relative") == "beginning") { //relative
this->device.player_state.has_position_as_of_timestamp = true;
this->device.player_state.position_as_of_timestamp =
Expand Down Expand Up @@ -924,6 +927,13 @@ void DeviceStateHandler::parseCommand(std::vector<uint8_t>& data) {
std::make_shared<cspot::QueuedTrack>(
currentTracks[offset + queuedOffset], this->ctx, 0));
}
#ifndef CONFIG_BELL_NOCODEC
this->trackPlayer->seekMs(
trackQueue->preloadedTracks[0]->trackMetrics->getPosition());
sendCommand(
CommandType::SEEK,
(int32_t)this->device.player_state.position_as_of_timestamp);
#endif
this->putPlayerState();
} else if (command->at("endpoint") == "set_queue") {
std::scoped_lock lock(trackQueue->tracksMutex);
Expand Down Expand Up @@ -986,6 +996,13 @@ void DeviceStateHandler::parseCommand(std::vector<uint8_t>& data) {
1],
this->ctx, 0));
}
#ifndef CONFIG_BELL_NOCODEC
this->trackPlayer->seekMs(
trackQueue->preloadedTracks[0]->trackMetrics->getPosition());
sendCommand(
CommandType::SEEK,
(int32_t)this->device.player_state.position_as_of_timestamp);
#endif
this->putPlayerState();
} else if (command->at("endpoint") == "update_context") {
unreference(this->device.player_state.session_id);
Expand Down Expand Up @@ -1086,6 +1103,13 @@ void DeviceStateHandler::parseCommand(std::vector<uint8_t>& data) {
? 2
: this->device.player_state.options
.shuffling_context));
#ifndef CONFIG_BELL_NOCODEC
this->trackPlayer->seekMs(
trackQueue->preloadedTracks[0]->trackMetrics->getPosition());
sendCommand(
CommandType::SEEK,
(int32_t)this->device.player_state.position_as_of_timestamp);
#endif
} else if (command->at("endpoint") == "set_options") {

if (this->device.player_state.options.repeating_context !=
Expand Down
85 changes: 37 additions & 48 deletions cspot/src/MercurySession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
using namespace cspot;

MercurySession::MercurySession(std::shared_ptr<TimeProvider> timeProvider)
: bell::Task("mercury_dispatcher", 4 * 1024, 3, 1) {
: bell::Task("mercury_dispatcher", 32 * 1024, 3, 1) {
this->timeProvider = timeProvider;
}

Expand Down Expand Up @@ -176,45 +176,29 @@ void MercurySession::handlePacket() {
case RequestType::SUB:
case RequestType::UNSUB: {
CSPOT_LOG(debug, "Received mercury packet");

auto response = this->decodeResponse(packet.data);
if (response.first == static_cast<uint8_t>(ResponseFlag::FINAL)) {

auto partial = partials.begin();
while (partial != partials.end() && partial->first != response.second)
partial++; // if(partial.first == sequenceId)
if (partial != partials.end()) {
//if the event-id is negative, they got sent without any request
if (response.first >= 0) {
if (this->callbacks.count(response.second)) {
this->callbacks[response.second](partial->second);
this->callbacks.erase(this->callbacks.find(response.second));
}
if (!response.fail) {
if (response.sequenceId >= 0) {
if (this->callbacks.count(response.sequenceId)) {
this->callbacks[response.sequenceId](response);
this->callbacks.erase(this->callbacks.find(response.sequenceId));
}
pb_release(Header_fields, &partial->second.mercuryHeader);
this->partials.erase(partial);
}
pb_release(Header_fields, &response.mercuryHeader);
}
break;
}
case RequestType::SUBRES: {
auto response = decodeResponse(packet.data);

if (response.first == static_cast<uint8_t>(ResponseFlag::FINAL)) {
auto partial = partials.begin();
while (partial != partials.end() && partial->first != response.second)
partial++; // if(partial.first == sequenceId)
if (partial != partials.end()) {
auto uri = std::string(partial->second.mercuryHeader.uri);
for (auto& it : this->subscriptions)
if (uri.find(it.first) != std::string::npos) {
it.second(partial->second);
goto found_subscription;
}
found_subscription:;
pb_release(Header_fields, &partial->second.mercuryHeader);
this->partials.erase(partial);
if (!response.fail) {
std::string uri(response.mercuryHeader.uri);
for (auto& it : this->subscriptions) {
if (uri.find(it.first) != std::string::npos) {
it.second(response);
break; // Exit loop once subscription is found
}
}
pb_release(Header_fields, &response.mercuryHeader);
}
break;
}
Expand Down Expand Up @@ -242,63 +226,68 @@ void MercurySession::failAllPending() {
this->callbacks = {};
}

std::pair<int, int64_t> MercurySession::decodeResponse(
MercurySession::Response MercurySession::decodeResponse(
const std::vector<uint8_t>& data) {
auto sequenceLength = ntohs(extract<uint16_t>(data, 0));
int64_t sequenceId;
uint8_t flag;
Response resp;
if (sequenceLength == 2)
sequenceId = ntohs(extract<int16_t>(data, 2));
else if (sequenceLength == 4)
sequenceId = ntohl(extract<int32_t>(data, 2));
else if (sequenceLength == 8)
sequenceId = hton64(extract<int64_t>(data, 2));
else
return std::make_pair(0, 0);
return resp;

size_t pos = 2 + sequenceLength;
flag = (uint8_t)data[pos];
pos++;
uint16_t parts = ntohs(extract<uint16_t>(data, pos));
pos += 2;
auto partial = partials.begin();
while (partial != partials.end() && partial->first != sequenceId)
while (partial != partials.end() && partial->sequenceId != sequenceId)
partial++; // if(partial.first == sequenceId)
if (partial == partials.end()) {
CSPOT_LOG(debug,
"Creating new Mercury Response, seq: %lli, flags: %i, parts: %i",
sequenceId, flag, parts);
this->partials.push_back(std::make_pair(sequenceId, Response()));
this->partials.push_back(Response());
partial = partials.end() - 1;
partial->second.parts = {};
partial->second.fail = false;
partial->parts = {};
partial->sequenceId = sequenceId;
} else
CSPOT_LOG(debug,
"Adding to Mercury Response, seq: %lli, flags: %i, parts: %i",
sequenceId, flag, parts);
uint8_t index = 0;
while (parts) {
if (data.size() <= pos || partial->second.fail)
if (data.size() <= pos)
break;
auto partSize = ntohs(extract<uint16_t>(data, pos));
pos += 2;
if (partial->second.mercuryHeader.uri == NULL) {
partial->second.fail = false;
if (partial->mercuryHeader.uri == NULL) {
auto headerBytes = std::vector<uint8_t>(data.begin() + pos,
data.begin() + pos + partSize);
pbDecode(partial->second.mercuryHeader, Header_fields, headerBytes);
pbDecode(partial->mercuryHeader, Header_fields, headerBytes);
} else {
if (index >= partial->second.parts.size())
partial->second.parts.push_back(std::vector<uint8_t>{});
partial->second.parts[index].insert(partial->second.parts[index].end(),
data.begin() + pos,
data.begin() + pos + partSize);
if (index >= partial->parts.size())
partial->parts.push_back(std::vector<uint8_t>{});
partial->parts[index].insert(partial->parts[index].end(),
data.begin() + pos,
data.begin() + pos + partSize);
index++;
}
pos += partSize;
parts--;
}
return std::make_pair(flag, sequenceId);
if (flag == static_cast<uint8_t>(ResponseFlag::FINAL)) {
resp = *partial;
partials.erase(partial);
resp.fail = false;
}
return resp;
}

void MercurySession::addSubscriptionListener(const std::string& uri,
Expand Down Expand Up @@ -408,4 +397,4 @@ uint32_t MercurySession::requestAudioKey(const std::vector<uint8_t>& trackId,
// @TODO: Handle disconnect
}
return audioKeySequence - 1;
}
}
4 changes: 2 additions & 2 deletions targets/cli/CliPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ void CliPlayer::runTask() {
} else {
if (lastHash != chunk->trackHash) {
if (lastHash) {
this->handler->trackPlayer->eofCallback(true);
tracks.pop_front();
tracks.at(0)->trackMetrics->endTrack();
this->handler->ctx->playbackMetrics->sendEvent(tracks[0]);
tracks.pop_front();
this->handler->trackPlayer->eofCallback(true);
}
lastHash = chunk->trackHash;
tracks.at(0)->trackMetrics->startTrackPlaying(
Expand Down
6 changes: 3 additions & 3 deletions targets/esp32/main/EspPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ void EspPlayer::runTask() {
} else {
if (lastHash != current_hash) {
if (lastHash) {
this->handler->trackPlayer->eofCallback(true);
tracks.pop_front();
tracks.at(0)->trackMetrics->endTrack();
this->handler->ctx->playbackMetrics->sendEvent(tracks[0]);
tracks.pop_front();
this->handler->trackPlayer->eofCallback(true);
}
lastHash = chunk->trackHash;
lastHash = current_hash;
tracks.at(0)->trackMetrics->startTrackPlaying(
tracks.at(0)->requestedPosition);
this->handler->putPlayerState();
Expand Down
2 changes: 1 addition & 1 deletion targets/esp32/main/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class CSpotTask : public bell::Task {
#endif

public:
CSpotTask() : bell::Task("cspot", 8 * 1024, 0, 0) {
CSpotTask() : bell::Task("cspot", 32 * 1024, 0, 0) {
startTask();
}
void runTask() {
Expand Down

0 comments on commit 4d06ccb

Please sign in to comment.