diff --git a/cspot/include/MercurySession.h b/cspot/include/MercurySession.h index f01ee02..4359e0a 100644 --- a/cspot/include/MercurySession.h +++ b/cspot/include/MercurySession.h @@ -27,7 +27,8 @@ class MercurySession : public bell::Task, public cspot::Session { struct Response { Header mercuryHeader; DataParts parts; - bool fail; + int64_t sequenceId; + bool fail = true; }; typedef std::function ResponseCallback; typedef std::function&)> @@ -118,7 +119,7 @@ class MercurySession : public bell::Task, public cspot::Session { void reconnect(); std::unordered_map callbacks; - std::deque> partials; + std::deque partials; std::unordered_map subscriptions; std::unordered_map audioKeyCallbacks; @@ -137,6 +138,6 @@ class MercurySession : public bell::Task, public cspot::Session { void failAllPending(); - std::pair decodeResponse(const std::vector& data); + MercurySession::Response decodeResponse(const std::vector& data); }; } // namespace cspot diff --git a/cspot/include/TrackReference.h b/cspot/include/TrackReference.h index a5688d4..70b04e0 100644 --- a/cspot/include/TrackReference.h +++ b/cspot/include/TrackReference.h @@ -9,7 +9,7 @@ #include "pb_decode.h" #include "protobuf/connect.pb.h" -#define TRACK_SEND_LIMIT 10 +#define TRACK_SEND_LIMIT 25 namespace cspot { struct TrackReference { diff --git a/cspot/src/DeviceStateHandler.cpp b/cspot/src/DeviceStateHandler.cpp index f01fb64..df00e01 100644 --- a/cspot/src/DeviceStateHandler.cpp +++ b/cspot/src/DeviceStateHandler.cpp @@ -142,7 +142,7 @@ DeviceStateHandler::DeviceStateHandler(std::shared_ptr ctx) { this->ctx->session->addSubscriptionListener("hm://connect-state/", connectStateSubscription); - CSPOT_LOG(info, "Added connect-state subscrription"); + CSPOT_LOG(info, "Added connect-state subscription"); // the device connection status gets reported trough "hm://social-connect",if active auto socialConnectSubscription = [this](MercurySession::Response& res) { @@ -880,6 +880,9 @@ void DeviceStateHandler::parseCommand(std::vector& data) { sendCommand(CommandType::SKIP_PREV); } else if (command->at("endpoint") == "seek_to") { +#ifndef CONFIG_BELL_NOCODEC + needsToBeSkipped = false; +#endif if (command->at("relative") == "beginning") { //relative this->device.player_state.has_position_as_of_timestamp = true; this->device.player_state.position_as_of_timestamp = @@ -924,6 +927,13 @@ void DeviceStateHandler::parseCommand(std::vector& data) { std::make_shared( currentTracks[offset + queuedOffset], this->ctx, 0)); } +#ifndef CONFIG_BELL_NOCODEC + this->trackPlayer->seekMs( + trackQueue->preloadedTracks[0]->trackMetrics->getPosition()); + sendCommand( + CommandType::SEEK, + (int32_t)this->device.player_state.position_as_of_timestamp); +#endif this->putPlayerState(); } else if (command->at("endpoint") == "set_queue") { std::scoped_lock lock(trackQueue->tracksMutex); @@ -986,6 +996,13 @@ void DeviceStateHandler::parseCommand(std::vector& data) { 1], this->ctx, 0)); } +#ifndef CONFIG_BELL_NOCODEC + this->trackPlayer->seekMs( + trackQueue->preloadedTracks[0]->trackMetrics->getPosition()); + sendCommand( + CommandType::SEEK, + (int32_t)this->device.player_state.position_as_of_timestamp); +#endif this->putPlayerState(); } else if (command->at("endpoint") == "update_context") { unreference(this->device.player_state.session_id); @@ -1086,6 +1103,13 @@ void DeviceStateHandler::parseCommand(std::vector& data) { ? 2 : this->device.player_state.options .shuffling_context)); +#ifndef CONFIG_BELL_NOCODEC + this->trackPlayer->seekMs( + trackQueue->preloadedTracks[0]->trackMetrics->getPosition()); + sendCommand( + CommandType::SEEK, + (int32_t)this->device.player_state.position_as_of_timestamp); +#endif } else if (command->at("endpoint") == "set_options") { if (this->device.player_state.options.repeating_context != diff --git a/cspot/src/MercurySession.cpp b/cspot/src/MercurySession.cpp index 9d33a51..f58be03 100644 --- a/cspot/src/MercurySession.cpp +++ b/cspot/src/MercurySession.cpp @@ -22,7 +22,7 @@ using namespace cspot; MercurySession::MercurySession(std::shared_ptr timeProvider) - : bell::Task("mercury_dispatcher", 4 * 1024, 3, 1) { + : bell::Task("mercury_dispatcher", 32 * 1024, 3, 1) { this->timeProvider = timeProvider; } @@ -176,45 +176,29 @@ void MercurySession::handlePacket() { case RequestType::SUB: case RequestType::UNSUB: { CSPOT_LOG(debug, "Received mercury packet"); - auto response = this->decodeResponse(packet.data); - if (response.first == static_cast(ResponseFlag::FINAL)) { - - auto partial = partials.begin(); - while (partial != partials.end() && partial->first != response.second) - partial++; // if(partial.first == sequenceId) - if (partial != partials.end()) { - //if the event-id is negative, they got sent without any request - if (response.first >= 0) { - if (this->callbacks.count(response.second)) { - this->callbacks[response.second](partial->second); - this->callbacks.erase(this->callbacks.find(response.second)); - } + if (!response.fail) { + if (response.sequenceId >= 0) { + if (this->callbacks.count(response.sequenceId)) { + this->callbacks[response.sequenceId](response); + this->callbacks.erase(this->callbacks.find(response.sequenceId)); } - pb_release(Header_fields, &partial->second.mercuryHeader); - this->partials.erase(partial); } + pb_release(Header_fields, &response.mercuryHeader); } break; } case RequestType::SUBRES: { auto response = decodeResponse(packet.data); - - if (response.first == static_cast(ResponseFlag::FINAL)) { - auto partial = partials.begin(); - while (partial != partials.end() && partial->first != response.second) - partial++; // if(partial.first == sequenceId) - if (partial != partials.end()) { - auto uri = std::string(partial->second.mercuryHeader.uri); - for (auto& it : this->subscriptions) - if (uri.find(it.first) != std::string::npos) { - it.second(partial->second); - goto found_subscription; - } - found_subscription:; - pb_release(Header_fields, &partial->second.mercuryHeader); - this->partials.erase(partial); + if (!response.fail) { + std::string uri(response.mercuryHeader.uri); + for (auto& it : this->subscriptions) { + if (uri.find(it.first) != std::string::npos) { + it.second(response); + break; // Exit loop once subscription is found + } } + pb_release(Header_fields, &response.mercuryHeader); } break; } @@ -242,11 +226,12 @@ void MercurySession::failAllPending() { this->callbacks = {}; } -std::pair MercurySession::decodeResponse( +MercurySession::Response MercurySession::decodeResponse( const std::vector& data) { auto sequenceLength = ntohs(extract(data, 0)); int64_t sequenceId; uint8_t flag; + Response resp; if (sequenceLength == 2) sequenceId = ntohs(extract(data, 2)); else if (sequenceLength == 4) @@ -254,7 +239,7 @@ std::pair MercurySession::decodeResponse( else if (sequenceLength == 8) sequenceId = hton64(extract(data, 2)); else - return std::make_pair(0, 0); + return resp; size_t pos = 2 + sequenceLength; flag = (uint8_t)data[pos]; @@ -262,43 +247,47 @@ std::pair MercurySession::decodeResponse( uint16_t parts = ntohs(extract(data, pos)); pos += 2; auto partial = partials.begin(); - while (partial != partials.end() && partial->first != sequenceId) + while (partial != partials.end() && partial->sequenceId != sequenceId) partial++; // if(partial.first == sequenceId) if (partial == partials.end()) { CSPOT_LOG(debug, "Creating new Mercury Response, seq: %lli, flags: %i, parts: %i", sequenceId, flag, parts); - this->partials.push_back(std::make_pair(sequenceId, Response())); + this->partials.push_back(Response()); partial = partials.end() - 1; - partial->second.parts = {}; - partial->second.fail = false; + partial->parts = {}; + partial->sequenceId = sequenceId; } else CSPOT_LOG(debug, "Adding to Mercury Response, seq: %lli, flags: %i, parts: %i", sequenceId, flag, parts); uint8_t index = 0; while (parts) { - if (data.size() <= pos || partial->second.fail) + if (data.size() <= pos) break; auto partSize = ntohs(extract(data, pos)); pos += 2; - if (partial->second.mercuryHeader.uri == NULL) { - partial->second.fail = false; + if (partial->mercuryHeader.uri == NULL) { auto headerBytes = std::vector(data.begin() + pos, data.begin() + pos + partSize); - pbDecode(partial->second.mercuryHeader, Header_fields, headerBytes); + pbDecode(partial->mercuryHeader, Header_fields, headerBytes); } else { - if (index >= partial->second.parts.size()) - partial->second.parts.push_back(std::vector{}); - partial->second.parts[index].insert(partial->second.parts[index].end(), - data.begin() + pos, - data.begin() + pos + partSize); + if (index >= partial->parts.size()) + partial->parts.push_back(std::vector{}); + partial->parts[index].insert(partial->parts[index].end(), + data.begin() + pos, + data.begin() + pos + partSize); index++; } pos += partSize; parts--; } - return std::make_pair(flag, sequenceId); + if (flag == static_cast(ResponseFlag::FINAL)) { + resp = *partial; + partials.erase(partial); + resp.fail = false; + } + return resp; } void MercurySession::addSubscriptionListener(const std::string& uri, @@ -408,4 +397,4 @@ uint32_t MercurySession::requestAudioKey(const std::vector& trackId, // @TODO: Handle disconnect } return audioKeySequence - 1; -} \ No newline at end of file +} diff --git a/targets/cli/CliPlayer.cpp b/targets/cli/CliPlayer.cpp index a3a56e3..38bd39e 100644 --- a/targets/cli/CliPlayer.cpp +++ b/targets/cli/CliPlayer.cpp @@ -140,10 +140,10 @@ void CliPlayer::runTask() { } else { if (lastHash != chunk->trackHash) { if (lastHash) { - this->handler->trackPlayer->eofCallback(true); - tracks.pop_front(); tracks.at(0)->trackMetrics->endTrack(); this->handler->ctx->playbackMetrics->sendEvent(tracks[0]); + tracks.pop_front(); + this->handler->trackPlayer->eofCallback(true); } lastHash = chunk->trackHash; tracks.at(0)->trackMetrics->startTrackPlaying( diff --git a/targets/esp32/main/EspPlayer.cpp b/targets/esp32/main/EspPlayer.cpp index 68ce80e..7f957e8 100644 --- a/targets/esp32/main/EspPlayer.cpp +++ b/targets/esp32/main/EspPlayer.cpp @@ -147,12 +147,12 @@ void EspPlayer::runTask() { } else { if (lastHash != current_hash) { if (lastHash) { - this->handler->trackPlayer->eofCallback(true); - tracks.pop_front(); tracks.at(0)->trackMetrics->endTrack(); this->handler->ctx->playbackMetrics->sendEvent(tracks[0]); + tracks.pop_front(); + this->handler->trackPlayer->eofCallback(true); } - lastHash = chunk->trackHash; + lastHash = current_hash; tracks.at(0)->trackMetrics->startTrackPlaying( tracks.at(0)->requestedPosition); this->handler->putPlayerState(); diff --git a/targets/esp32/main/main.cpp b/targets/esp32/main/main.cpp index e1ab298..8f37e3a 100644 --- a/targets/esp32/main/main.cpp +++ b/targets/esp32/main/main.cpp @@ -140,7 +140,7 @@ class CSpotTask : public bell::Task { #endif public: - CSpotTask() : bell::Task("cspot", 8 * 1024, 0, 0) { + CSpotTask() : bell::Task("cspot", 32 * 1024, 0, 0) { startTask(); } void runTask() {