Skip to content

Commit

Permalink
Switching to paho.mqtt.cpp library for recieving binary audio chunks …
Browse files Browse the repository at this point in the history
…over MQTT (#27)

Co-authored-by: Vincent Raymond <[email protected]>
  • Loading branch information
vincentraymond-ua and Vincent Raymond authored Apr 29, 2022
1 parent be28d05 commit 606ca0d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 32 deletions.
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ubuntu:focal
FROM ubuntu:impish

#ubuntu setup
ENV DEBIAN_FRONTEND "noninteractive"
Expand All @@ -16,6 +16,9 @@ RUN apt-get update -y && apt-get upgrade -y && \

# Mosquitto
mosquitto mosquitto-clients libmosquitto-dev \
libssl-dev \
libpaho-mqttpp-dev \
libpaho-mqtt-dev \

# PostgreSQL
libpq-dev postgresql-server-dev-all
Expand Down
1 change: 1 addition & 0 deletions src/Mosquitto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void Mosquitto::mosquitto_callback_on_message(
mosquitto->on_message(topic, ss.str());
}


//----------------------------------------------------------------------
// Virtual functions
//----------------------------------------------------------------------
Expand Down
55 changes: 29 additions & 26 deletions src/OpensmileSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@
#include <string.h>
#include <thread>
#include <vector>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <boost/log/trivial.hpp>

#include <smileapi/SMILEapi.h>
#include <mqtt/client.h>

#include "GlobalMosquittoListener.h"
#include "JsonBuilder.h"
#include "base64.h"
#include "util.h"
#include <smileapi/SMILEapi.h>


#include "OpensmileSession.h"


OpensmileSession::OpensmileSession(string participant_id,
string mqtt_host_internal,
int mqtt_port_internal) {
Expand Down Expand Up @@ -67,21 +70,28 @@ void OpensmileSession::Initialize() {
smile_set_log_callback(this->handle, &log_callback, this->builder);
this->opensmile_thread = std::thread(smile_run, this->handle);

// Connect to broker
// Initialize paho mqtt Mosquitto client
BOOST_LOG_TRIVIAL(info)
<< "Initializing Mosquitto connection for: " << this->participant_id;
this->connect(
this->mqtt_host_internal, this->mqtt_port_internal, 1000, 1000, 1000);
this->subscribe(this->participant_id);
this->set_max_seconds_without_messages(10000);
this->listener_thread = thread([this] { this->loop(); });
string client_id = this->participant_id + "_SPEECH_ANALYZER";
string server_address = "tcp://" + this->mqtt_host_internal + ":" + to_string(this->mqtt_port_internal);
this->mqtt_client = new mqtt::client(server_address, client_id);

mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);

this->mqtt_client->connect(connOpts);
this->mqtt_client->subscribe(this->participant_id, 2);

}

void OpensmileSession::Shutdown() {
this->running = false;

// Close listening session
this->close();
this->mqtt_client->stop_consuming();
this->mqtt_client->disconnect();

// Close Opensmile Session
smile_extaudiosource_set_external_eoi(this->handle, "externalAudioSource");
Expand All @@ -93,10 +103,17 @@ void OpensmileSession::Shutdown() {
}

void OpensmileSession::Loop() {
chrono::milliseconds duration(1);
BOOST_LOG_TRIVIAL(info)
<< "Begin processing audio chunks for: " << this->participant_id;
while (true) {
this_thread::yield();
this_thread::sleep_for(duration);
// Recieve message from broker
auto msg = this->mqtt_client->consume_message();
auto payload = msg->get_payload();

// Payload is always string, so needs to be copied to float vector
vector<float> float_chunk(payload.size());
memcpy(&float_chunk[0], payload.data(), payload.size()/sizeof(float));
this->PublishChunk(float_chunk);
}
}

Expand All @@ -113,17 +130,3 @@ void OpensmileSession::PublishChunk(vector<float> float_chunk) {
}
}

void OpensmileSession::on_message(const std::string& topic,
const std::string& message) {
nlohmann::json m = nlohmann::json::parse(message);

// Decode base64 chunk
string coded_src = m["chunk"];
int encoded_data_length = Base64decode_len(coded_src.c_str());
vector<char> decoded(encoded_data_length);
Base64decode(&decoded[0], coded_src.c_str());
vector<float> float_chunk(decoded.size() / sizeof(float));
memcpy(&float_chunk[0], &decoded[0], decoded.size());

this->PublishChunk(float_chunk);
}
10 changes: 5 additions & 5 deletions src/OpensmileSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
#include <queue>
#include <thread>
#include <vector>

#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "JsonBuilder.h"
#include <mqtt/client.h>
#include <smileapi/SMILEapi.h>

class OpensmileSession : Mosquitto {
#include "JsonBuilder.h"

class OpensmileSession{
public:
OpensmileSession(std::string participant_id, std::string mqtt_host_internal,
int mqtt_port_internal);
Expand All @@ -24,14 +25,13 @@ class OpensmileSession : Mosquitto {
void Shutdown();
void Loop();
void PublishChunk(std::vector<float> float_chunk);
void on_message(const std::string& topic,
const std::string& message) override;

bool running = false;
int pid;

std::string mqtt_host_internal;
int mqtt_port_internal;
mqtt::client *mqtt_client;
std::thread listener_thread;

std::string participant_id;
Expand Down

0 comments on commit 606ca0d

Please sign in to comment.