From 7c7b062be19867304c00e99ff5257649b9235ae2 Mon Sep 17 00:00:00 2001 From: Jeremy Nimmer Date: Tue, 5 Sep 2023 07:25:25 -0700 Subject: [PATCH] [lcm] Eschew LCM C++ API (use C instead) C++ is just pointlessly slower, more complicated, and less ABI-stable. --- lcm/drake_lcm.cc | 84 +++++++++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 37 deletions(-) diff --git a/lcm/drake_lcm.cc b/lcm/drake_lcm.cc index 719e06f0d310..f3672549b3c7 100644 --- a/lcm/drake_lcm.cc +++ b/lcm/drake_lcm.cc @@ -7,7 +7,8 @@ #include #include -#include +#include // N.B. C++ is used only by get_lcm_instance(). +#include #include "drake/common/drake_assert.h" #include "drake/common/drake_copyable.h" @@ -34,12 +35,11 @@ class DrakeLcm::Impl { explicit Impl(const DrakeLcmParams& params) : requested_lcm_url_(params.lcm_url), - lcm_url_(requested_lcm_url_), deferred_initialization_(params.defer_initialization), - lcm_(requested_lcm_url_), channel_suffix_(params.channel_suffix) { // This duplicates logic from external/lcm/lcm.c, but until LCM offers an // API for this it's the best we can do. + lcm_url_ = requested_lcm_url_; if (lcm_url_.empty()) { char* env_url = ::getenv("LCM_DEFAULT_URL"); if (env_url) { @@ -49,6 +49,7 @@ class DrakeLcm::Impl { lcm_url_ = kLcmDefaultUrl; } } + // Check DRAKE_ALLOW_NETWORK. if (lcm_url_.substr(0, 7) != "memq://") { if (!drake::internal::IsNetworkingAllowed("lcm")) { throw std::runtime_error(fmt::format( @@ -57,6 +58,11 @@ class DrakeLcm::Impl { lcm_url_)); } } + // Create the native instance only after all other checks are finished. + lcm_ = ::lcm_create(lcm_url_.c_str()); + if (lcm_ == nullptr) { + throw std::runtime_error("Failure on lcm_create()"); + } } // Housekeeping: scrub any deallocated subscriptions. @@ -72,7 +78,8 @@ class DrakeLcm::Impl { const std::string requested_lcm_url_; std::string lcm_url_; bool deferred_initialization_{}; - ::lcm::LCM lcm_; + lcm_t* lcm_{}; + std::unique_ptr<::lcm::LCM> lcm_cpp_; // Typically nullptr. const std::string channel_suffix_; std::vector> subscriptions_; std::string handle_subscriptions_error_message_; @@ -90,7 +97,7 @@ DrakeLcm::DrakeLcm(const DrakeLcmParams& params) // ctor) and NOT in the first HandleSubscriptions call. Without this, // ThreadSanitizer builds may report false positives related to the // self-test happening concurrently with LCM publishing. - impl_->lcm_.getFileno(); + ::lcm_get_fileno(impl_->lcm_); } } @@ -99,17 +106,21 @@ std::string DrakeLcm::get_lcm_url() const { } ::lcm::LCM* DrakeLcm::get_lcm_instance() { - return &impl_->lcm_; + if (impl_->lcm_cpp_ == nullptr) { + // Create the C++ wrapper only when requested by the user or our unit test. + impl_->lcm_cpp_ = std::make_unique<::lcm::LCM>(impl_->lcm_); + } + return impl_->lcm_cpp_.get(); } void DrakeLcm::Publish(const std::string& channel, const void* data, int data_size, std::optional) { DRAKE_THROW_UNLESS(!channel.empty()); if (impl_->channel_suffix_.empty()) { - impl_->lcm_.publish(channel, data, data_size); + ::lcm_publish(impl_->lcm_, channel.c_str(), data, data_size); } else { const std::string actual_channel = channel + impl_->channel_suffix_; - impl_->lcm_.publish(actual_channel, data, data_size); + ::lcm_publish(impl_->lcm_, actual_channel.c_str(), data, data_size); } } @@ -137,7 +148,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { DrakeLcmInterface::MultichannelHandlerFunction; static std::shared_ptr CreateSingleChannel( - ::lcm::LCM* native_instance, const std::string& channel, + ::lcm_t* native_instance, const std::string& channel, HandlerFunction single_channel_handler) { // N.B. The argument to CreateMultichannel is regex, so we need to escape // the channel name as part delegating to it. @@ -150,7 +161,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { } static std::shared_ptr CreateMultichannel( - ::lcm::LCM* native_instance, std::string_view channel_regex, + ::lcm_t* native_instance, std::string_view channel_regex, MultichannelHandlerFunction handler) { DRAKE_DEMAND(native_instance != nullptr); DRAKE_DEMAND(handler != nullptr); @@ -177,7 +188,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { DRAKE_DEMAND(strong_self_reference_ == nullptr); if (native_subscription_) { DRAKE_DEMAND(native_instance_ != nullptr); - native_instance_->unsubscribe(native_subscription_); + ::lcm_unsubscribe(native_instance_, native_subscription_); } } @@ -197,7 +208,8 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { queue_capacity_ = capacity; if (native_subscription_) { DRAKE_DEMAND(native_instance_ != nullptr); - native_subscription_->setQueueCapacity(capacity); + ::lcm_subscription_set_queue_capacity(native_subscription_, + queue_capacity_); } } @@ -205,9 +217,11 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { if (native_subscription_ != nullptr) { return; } - native_subscription_ = native_instance_->subscribeFunction( - channel_regex_, &DrakeSubscription::NativeCallback, this); - native_subscription_->setQueueCapacity(queue_capacity_); + native_subscription_ = + ::lcm_subscribe(native_instance_, channel_regex_.c_str(), + &DrakeSubscription::NativeCallback, this); + ::lcm_subscription_set_queue_capacity(native_subscription_, + queue_capacity_); } // This is ONLY called from the DrakeLcm dtor. Thus, a HandleSubscriptions @@ -216,7 +230,7 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { DRAKE_DEMAND(!weak_self_reference_.expired()); if (native_subscription_) { DRAKE_DEMAND(native_instance_ != nullptr); - native_instance_->unsubscribe(native_subscription_); + ::lcm_unsubscribe(native_instance_, native_subscription_); } native_instance_ = {}; native_subscription_ = {}; @@ -225,15 +239,6 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { strong_self_reference_ = {}; } - // The native LCM stack calls into here. - static void NativeCallback(const ::lcm::ReceiveBuffer* buffer, - const std::string& channel, - DrakeSubscription* self) { - DRAKE_DEMAND(buffer != nullptr); - DRAKE_DEMAND(self != nullptr); - self->InstanceCallback(channel, buffer); - } - private: struct AsIfPrivateConstructor {}; @@ -243,19 +248,24 @@ class DrakeSubscription final : public DrakeSubscriptionInterface { explicit DrakeSubscription(AsIfPrivateConstructor = {}) {} private: - void InstanceCallback(const std::string& channel, - const ::lcm::ReceiveBuffer* buffer) { - DRAKE_DEMAND(!weak_self_reference_.expired()); - if (user_callback_ != nullptr) { - user_callback_(channel, buffer->data, buffer->data_size); + // The native LCM stack calls into here. + static void NativeCallback(const ::lcm_recv_buf_t* buffer, + const char* channel, void* user_data) { + DRAKE_DEMAND(buffer != nullptr); + DRAKE_DEMAND(channel != nullptr); + DRAKE_DEMAND(user_data != nullptr); + auto* self = static_cast(user_data); + DRAKE_DEMAND(!self->weak_self_reference_.expired()); + if (self->user_callback_ != nullptr) { + self->user_callback_(channel, buffer->data, buffer->data_size); } } std::string channel_regex_; // The native handle we can use to unsubscribe. - ::lcm::LCM* native_instance_{}; - ::lcm::Subscription* native_subscription_{}; + ::lcm_t* native_instance_{}; + ::lcm_subscription_t* native_subscription_{}; int queue_capacity_{1}; DrakeLcmInterface::MultichannelHandlerFunction user_callback_; @@ -276,7 +286,7 @@ std::shared_ptr DrakeLcm::Subscribe( // Add the new subscriber. const std::string actual_channel = channel + impl_->channel_suffix_; auto result = DrakeSubscription::CreateSingleChannel( - &(impl_->lcm_), actual_channel, std::move(handler)); + impl_->lcm_, actual_channel, std::move(handler)); if (!impl_->deferred_initialization_) { result->AttachIfNeeded(); } @@ -306,8 +316,7 @@ std::shared_ptr DrakeLcm::SubscribeMultichannel( // Add the new subscriber. auto result = DrakeSubscription::CreateMultichannel( - &(impl_->lcm_), - std::string(regex) + ConvertLiteralStringToLcmRegex(suffix), + impl_->lcm_, std::string(regex) + ConvertLiteralStringToLcmRegex(suffix), std::move(handler)); if (!impl_->deferred_initialization_) { result->AttachIfNeeded(); @@ -332,8 +341,8 @@ int DrakeLcm::HandleSubscriptions(int timeout_millis) { // Keep pumping handleTimeout until it's empty, but only pause for the // timeout on the first attempt. int total_messages = 0; - int zero_or_one = impl_->lcm_.handleTimeout(timeout_millis); - for (; zero_or_one > 0; zero_or_one = impl_->lcm_.handleTimeout(0)) { + int zero_or_one = ::lcm_handle_timeout(impl_->lcm_, timeout_millis); + for (; zero_or_one > 0; zero_or_one = ::lcm_handle_timeout(impl_->lcm_, 0)) { DRAKE_DEMAND(zero_or_one == 1); ++total_messages; } @@ -362,6 +371,7 @@ DrakeLcm::~DrakeLcm() { subscription->Detach(); } } + ::lcm_destroy(impl_->lcm_); } } // namespace lcm