Skip to content

Commit

Permalink
[lcm] Eschew LCM C++ API (use C instead) (#20116)
Browse files Browse the repository at this point in the history
C++ is just pointlessly slower, more complicated, and less ABI-stable.
  • Loading branch information
jwnimmer-tri authored Sep 5, 2023
1 parent a5dfe42 commit d34b2dc
Showing 1 changed file with 47 additions and 37 deletions.
84 changes: 47 additions & 37 deletions lcm/drake_lcm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
#include <vector>

#include <glib.h>
#include <lcm/lcm-cpp.hpp>
#include <lcm/lcm-cpp.hpp> // N.B. C++ is used only by get_lcm_instance().
#include <lcm/lcm.h>

#include "drake/common/drake_assert.h"
#include "drake/common/drake_copyable.h"
Expand All @@ -34,12 +35,11 @@ class DrakeLcm::Impl {

explicit Impl(const DrakeLcmParams& params)
: requested_lcm_url_(params.lcm_url),
lcm_url_(requested_lcm_url_),
deferred_initialization_(params.defer_initialization),
lcm_(requested_lcm_url_),
channel_suffix_(params.channel_suffix) {
// This duplicates logic from external/lcm/lcm.c, but until LCM offers an
// API for this it's the best we can do.
lcm_url_ = requested_lcm_url_;
if (lcm_url_.empty()) {
char* env_url = ::getenv("LCM_DEFAULT_URL");
if (env_url) {
Expand All @@ -49,6 +49,7 @@ class DrakeLcm::Impl {
lcm_url_ = kLcmDefaultUrl;
}
}
// Check DRAKE_ALLOW_NETWORK.
if (lcm_url_.substr(0, 7) != "memq://") {
if (!drake::internal::IsNetworkingAllowed("lcm")) {
throw std::runtime_error(fmt::format(
Expand All @@ -57,6 +58,11 @@ class DrakeLcm::Impl {
lcm_url_));
}
}
// Create the native instance only after all other checks are finished.
lcm_ = ::lcm_create(lcm_url_.c_str());
if (lcm_ == nullptr) {
throw std::runtime_error("Failure on lcm_create()");
}
}

// Housekeeping: scrub any deallocated subscriptions.
Expand All @@ -72,7 +78,8 @@ class DrakeLcm::Impl {
const std::string requested_lcm_url_;
std::string lcm_url_;
bool deferred_initialization_{};
::lcm::LCM lcm_;
lcm_t* lcm_{};
std::unique_ptr<::lcm::LCM> lcm_cpp_; // Typically nullptr.
const std::string channel_suffix_;
std::vector<std::weak_ptr<DrakeSubscription>> subscriptions_;
std::string handle_subscriptions_error_message_;
Expand All @@ -90,7 +97,7 @@ DrakeLcm::DrakeLcm(const DrakeLcmParams& params)
// ctor) and NOT in the first HandleSubscriptions call. Without this,
// ThreadSanitizer builds may report false positives related to the
// self-test happening concurrently with LCM publishing.
impl_->lcm_.getFileno();
::lcm_get_fileno(impl_->lcm_);
}
}

Expand All @@ -99,17 +106,21 @@ std::string DrakeLcm::get_lcm_url() const {
}

::lcm::LCM* DrakeLcm::get_lcm_instance() {
return &impl_->lcm_;
if (impl_->lcm_cpp_ == nullptr) {
// Create the C++ wrapper only when requested by the user or our unit test.
impl_->lcm_cpp_ = std::make_unique<::lcm::LCM>(impl_->lcm_);
}
return impl_->lcm_cpp_.get();
}

void DrakeLcm::Publish(const std::string& channel, const void* data,
int data_size, std::optional<double>) {
DRAKE_THROW_UNLESS(!channel.empty());
if (impl_->channel_suffix_.empty()) {
impl_->lcm_.publish(channel, data, data_size);
::lcm_publish(impl_->lcm_, channel.c_str(), data, data_size);
} else {
const std::string actual_channel = channel + impl_->channel_suffix_;
impl_->lcm_.publish(actual_channel, data, data_size);
::lcm_publish(impl_->lcm_, actual_channel.c_str(), data, data_size);
}
}

Expand Down Expand Up @@ -137,7 +148,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface {
DrakeLcmInterface::MultichannelHandlerFunction;

static std::shared_ptr<DrakeSubscription> CreateSingleChannel(
::lcm::LCM* native_instance, const std::string& channel,
::lcm_t* native_instance, const std::string& channel,
HandlerFunction single_channel_handler) {
// N.B. The argument to CreateMultichannel is regex, so we need to escape
// the channel name as part delegating to it.
Expand All @@ -150,7 +161,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface {
}

static std::shared_ptr<DrakeSubscription> CreateMultichannel(
::lcm::LCM* native_instance, std::string_view channel_regex,
::lcm_t* native_instance, std::string_view channel_regex,
MultichannelHandlerFunction handler) {
DRAKE_DEMAND(native_instance != nullptr);
DRAKE_DEMAND(handler != nullptr);
Expand All @@ -177,7 +188,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface {
DRAKE_DEMAND(strong_self_reference_ == nullptr);
if (native_subscription_) {
DRAKE_DEMAND(native_instance_ != nullptr);
native_instance_->unsubscribe(native_subscription_);
::lcm_unsubscribe(native_instance_, native_subscription_);
}
}

Expand All @@ -197,17 +208,20 @@ class DrakeSubscription final : public DrakeSubscriptionInterface {
queue_capacity_ = capacity;
if (native_subscription_) {
DRAKE_DEMAND(native_instance_ != nullptr);
native_subscription_->setQueueCapacity(capacity);
::lcm_subscription_set_queue_capacity(native_subscription_,
queue_capacity_);
}
}

void AttachIfNeeded() {
if (native_subscription_ != nullptr) {
return;
}
native_subscription_ = native_instance_->subscribeFunction(
channel_regex_, &DrakeSubscription::NativeCallback, this);
native_subscription_->setQueueCapacity(queue_capacity_);
native_subscription_ =
::lcm_subscribe(native_instance_, channel_regex_.c_str(),
&DrakeSubscription::NativeCallback, this);
::lcm_subscription_set_queue_capacity(native_subscription_,
queue_capacity_);
}

// This is ONLY called from the DrakeLcm dtor. Thus, a HandleSubscriptions
Expand All @@ -216,7 +230,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface {
DRAKE_DEMAND(!weak_self_reference_.expired());
if (native_subscription_) {
DRAKE_DEMAND(native_instance_ != nullptr);
native_instance_->unsubscribe(native_subscription_);
::lcm_unsubscribe(native_instance_, native_subscription_);
}
native_instance_ = {};
native_subscription_ = {};
Expand All @@ -225,15 +239,6 @@ class DrakeSubscription final : public DrakeSubscriptionInterface {
strong_self_reference_ = {};
}

// The native LCM stack calls into here.
static void NativeCallback(const ::lcm::ReceiveBuffer* buffer,
const std::string& channel,
DrakeSubscription* self) {
DRAKE_DEMAND(buffer != nullptr);
DRAKE_DEMAND(self != nullptr);
self->InstanceCallback(channel, buffer);
}

private:
struct AsIfPrivateConstructor {};

Expand All @@ -243,19 +248,24 @@ class DrakeSubscription final : public DrakeSubscriptionInterface {
explicit DrakeSubscription(AsIfPrivateConstructor = {}) {}

private:
void InstanceCallback(const std::string& channel,
const ::lcm::ReceiveBuffer* buffer) {
DRAKE_DEMAND(!weak_self_reference_.expired());
if (user_callback_ != nullptr) {
user_callback_(channel, buffer->data, buffer->data_size);
// The native LCM stack calls into here.
static void NativeCallback(const ::lcm_recv_buf_t* buffer,
const char* channel, void* user_data) {
DRAKE_DEMAND(buffer != nullptr);
DRAKE_DEMAND(channel != nullptr);
DRAKE_DEMAND(user_data != nullptr);
auto* self = static_cast<DrakeSubscription*>(user_data);
DRAKE_DEMAND(!self->weak_self_reference_.expired());
if (self->user_callback_ != nullptr) {
self->user_callback_(channel, buffer->data, buffer->data_size);
}
}

std::string channel_regex_;

// The native handle we can use to unsubscribe.
::lcm::LCM* native_instance_{};
::lcm::Subscription* native_subscription_{};
::lcm_t* native_instance_{};
::lcm_subscription_t* native_subscription_{};
int queue_capacity_{1};

DrakeLcmInterface::MultichannelHandlerFunction user_callback_;
Expand All @@ -276,7 +286,7 @@ std::shared_ptr<DrakeSubscriptionInterface> DrakeLcm::Subscribe(
// Add the new subscriber.
const std::string actual_channel = channel + impl_->channel_suffix_;
auto result = DrakeSubscription::CreateSingleChannel(
&(impl_->lcm_), actual_channel, std::move(handler));
impl_->lcm_, actual_channel, std::move(handler));
if (!impl_->deferred_initialization_) {
result->AttachIfNeeded();
}
Expand Down Expand Up @@ -306,8 +316,7 @@ std::shared_ptr<DrakeSubscriptionInterface> DrakeLcm::SubscribeMultichannel(

// Add the new subscriber.
auto result = DrakeSubscription::CreateMultichannel(
&(impl_->lcm_),
std::string(regex) + ConvertLiteralStringToLcmRegex(suffix),
impl_->lcm_, std::string(regex) + ConvertLiteralStringToLcmRegex(suffix),
std::move(handler));
if (!impl_->deferred_initialization_) {
result->AttachIfNeeded();
Expand All @@ -332,8 +341,8 @@ int DrakeLcm::HandleSubscriptions(int timeout_millis) {
// Keep pumping handleTimeout until it's empty, but only pause for the
// timeout on the first attempt.
int total_messages = 0;
int zero_or_one = impl_->lcm_.handleTimeout(timeout_millis);
for (; zero_or_one > 0; zero_or_one = impl_->lcm_.handleTimeout(0)) {
int zero_or_one = ::lcm_handle_timeout(impl_->lcm_, timeout_millis);
for (; zero_or_one > 0; zero_or_one = ::lcm_handle_timeout(impl_->lcm_, 0)) {
DRAKE_DEMAND(zero_or_one == 1);
++total_messages;
}
Expand Down Expand Up @@ -362,6 +371,7 @@ DrakeLcm::~DrakeLcm() {
subscription->Detach();
}
}
::lcm_destroy(impl_->lcm_);
}

} // namespace lcm
Expand Down

0 comments on commit d34b2dc

Please sign in to comment.