Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[communication-ms] Communication with simulator #117

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions common/cpp/robocin/network/udp_multicast_socket_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <netinet/in.h>
#include <stdexcept>
#include <string>
#include <cstring>
#include <iostream>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
Expand Down Expand Up @@ -48,17 +50,35 @@ void UdpMulticastSocketSender::connect(std::string_view ip_address, int port) co
}

void UdpMulticastSocketSender::send(const send_type& message) const {
if (::send(fd_, message.data(), message.size(), 0) == -1)
throw std::runtime_error("failed to send message.");
else if (static_cast<size_t>(::send(fd_, message.data(), message.size(), 0)) != message.size())
throw std::runtime_error("failed to send all message.");
// Simulator uses this one
uint32_t message_size = message.size();
ssize_t bytes_sent = ::send(fd_, message.data(), message_size, 0);

if (bytes_sent == message_size) {
// Sent successfully
return;
}

// Error occured
int err = errno;
const char* error_message = strerror(err);
std::cout << "[" << bytes_sent << "]: " << error_message << "\n";
}

void UdpMulticastSocketSender::send(std::string_view message) const {
if (::send(fd_, message.data(), message.size(), 0) == -1)
throw std::runtime_error("failed to send message.");
else if (static_cast<size_t>(::send(fd_, message.data(), message.size(), 0)) != message.size())
throw std::runtime_error("failed to send all message.");
// Simulator uses this one
uint32_t message_size = message.size();
ssize_t bytes_sent = ::send(fd_, message.data(), message_size, 0);

if (bytes_sent == message_size) {
// Sent successfully
return;
}

// Error occured
int err = errno;
const char* error_message = strerror(err);
std::cout << "[" << bytes_sent << "]: " << error_message << "\n";
}

int UdpMulticastSocketSender::fd() const { return fd_; }
Expand Down
3 changes: 3 additions & 0 deletions common/cpp/robocin/wip/service_discovery/addresses.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ inline constexpr std::string_view kNavigationOutputTopic = "navigation-output";
inline constexpr std::string_view kRobotAddress = "224.0.0.1";
inline constexpr int kRobotPort = 19901;

inline constexpr std::string_view kSimmAddress = "127.0.0.1";
inline constexpr int kSimmPort = 10301;

inline constexpr std::string_view kDecisionAddress = "ipc:///tmp/.ssl-core/decision.ipc";
inline constexpr std::string_view kDecisionTopic = "decision";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <print>
#include <robocin/detection_util/clock.h>
#include <robocin/memory/object_ptr.h>
#include <robocin/network/udp_multicast_socket_sender.h>
#include <robocin/network/zmq_poller.h>
#include <robocin/network/zmq_subscriber_socket.h>
#include <robocin/parameters/parameters.h>
Expand Down Expand Up @@ -43,21 +44,27 @@ using ::robocin::ZmqSubscriberSocket;
std::unique_ptr<IMessageReceiver> makeMessageReceiver() {
static constexpr std::array kNavigationTopics = {service_discovery::kNavigationOutputTopic};
static constexpr std::array kGatewayTopics = {service_discovery::kGameControllerRefereeTopic};
static constexpr std::array kPerceptionTopics = {service_discovery::kPerceptionDetectionTopic};

std::unique_ptr<IZmqSubscriberSocket> gateway_socket = std::make_unique<ZmqSubscriberSocket>();
gateway_socket->connect(service_discovery::kGatewayAddress, kGatewayTopics);

std::unique_ptr<IZmqSubscriberSocket> navigation_socket = std::make_unique<ZmqSubscriberSocket>();
navigation_socket->connect(service_discovery::kNavigationAddress, kNavigationTopics);

std::unique_ptr<IZmqSubscriberSocket> perception_socket = std::make_unique<ZmqSubscriberSocket>();
perception_socket->connect(service_discovery::kPerceptionAddress, kPerceptionTopics);

std::unique_ptr<IZmqPoller> zmq_poller = std::make_unique<ZmqPoller>();
zmq_poller->push(*gateway_socket);
zmq_poller->push(*navigation_socket);
zmq_poller->push(*perception_socket);

std::unique_ptr<IPayloadMapper> payload_mapper = std::make_unique<PayloadMapper>();

return std::make_unique<MessageReceiver>(std::move(navigation_socket),
std::move(gateway_socket),
std::move(perception_socket),
std::move(zmq_poller),
std::move(payload_mapper));
}
Expand All @@ -81,7 +88,13 @@ std::unique_ptr<IMessageSender> makeMessageSender() {
= std::make_unique<UdpMulticastSocketSender>();
robot_socket->connect(service_discovery::kRobotAddress, service_discovery::kRobotPort);

return std::make_unique<MessageSender>(std::move(communication_socket), std::move(robot_socket));
std::unique_ptr<IUdpMulticastSocketSender> simm_socket
= std::make_unique<UdpMulticastSocketSender>();
simm_socket->connect(service_discovery::kSimmAddress, service_discovery::kSimmPort);

return std::make_unique<MessageSender>(std::move(communication_socket),
std::move(robot_socket),
std::move(simm_socket));
}

std::unique_ptr<IController> makeConsumer(object_ptr<IConcurrentQueue<Payload>> messages) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "communication/controller/consumer_controller.h"

#include "communication/messaging/receiver/payload.h"

#include "communication/parameters/parameters.h"
#include <cstddef>
#include <robocin/concurrency/concurrent_queue.h>
#include <robocin/memory/object_ptr.h>
#include <robocin/output/log.h>
Expand All @@ -19,6 +20,12 @@ using ::protocols::communication::Communication;

} // namespace rc

namespace tp {

using ::protocols::third_party::simulation::RobotControl;

} // namespace tp

} // namespace

ConsumerController::ConsumerController(
Expand Down Expand Up @@ -46,11 +53,20 @@ void ConsumerController::exec(std::span<const Payload> payloads) {
// return;
// }

if (std::optional<rc::Communication> robot_command = communication_processor_->process(payloads);
robot_command != std::nullopt) {
// ilog("command: {} sent.", robot_command->command().DebugString());
message_sender_->send(*robot_command);
}
if (pUseSimulator()) {
if (std::optional<tp::RobotControl> robot_control
= communication_processor_->processSimulator(payloads);
robot_control != std::nullopt) {
ilog("control: {} sent.", robot_control->DebugString());
message_sender_->send(*robot_control);
}
} else {
if (std::optional<rc::Communication> robot_command
= communication_processor_->processReal(payloads);
robot_command != std::nullopt) {
ilog("command: {} sent.", robot_command->DebugString());
// message_sender_->send(*robot_command);
}}
}

} // namespace communication
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ using ::robocin::ZmqDatagram;

MessageReceiver::MessageReceiver(std::unique_ptr<IZmqSubscriberSocket> navigation_socket,
std::unique_ptr<IZmqSubscriberSocket> gateway_socket,
std::unique_ptr<IZmqSubscriberSocket> perception_socket,
std::unique_ptr<IZmqPoller> zmq_poller,
std::unique_ptr<IPayloadMapper> payload_mapper) :
navigation_socket_{std::move(navigation_socket)},
gateway_socket_{std::move(gateway_socket)},
perception_socket_{std::move(perception_socket)},
zmq_poller_{std::move(zmq_poller)},
payload_mapper_{std::move(payload_mapper)} {}

Expand All @@ -33,6 +35,14 @@ Payload MessageReceiver::receive() {
while (datagrams.empty()) {
zmq_poller_->poll(pCommunicationPollerTimeoutMs());

while (true) {
ZmqDatagram perception_zmq_datagram = zmq_poller_->receive(*perception_socket_);
if (perception_zmq_datagram.empty()) {
break;
}
datagrams.emplace_back(std::move(perception_zmq_datagram));
}

while (true) {
ZmqDatagram navigation_zmq_datagram = zmq_poller_->receive(*navigation_socket_);
if (navigation_zmq_datagram.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class MessageReceiver : public IMessageReceiver {
public:
MessageReceiver(std::unique_ptr<::robocin::IZmqSubscriberSocket> navigation_socket,
std::unique_ptr<::robocin::IZmqSubscriberSocket> gateway_socket,
std::unique_ptr<::robocin::IZmqSubscriberSocket> perception_socket,
std::unique_ptr<::robocin::IZmqPoller> zmq_poller,
std::unique_ptr<IPayloadMapper> payload_mapper);

Expand All @@ -36,6 +37,7 @@ class MessageReceiver : public IMessageReceiver {
private:
std::unique_ptr<::robocin::IZmqSubscriberSocket> navigation_socket_;
std::unique_ptr<::robocin::IZmqSubscriberSocket> gateway_socket_;
std::unique_ptr<::robocin::IZmqSubscriberSocket> perception_socket_;
std::unique_ptr<::robocin::IZmqPoller> zmq_poller_;
std::unique_ptr<IPayloadMapper> payload_mapper_;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include "communication/messaging/receiver/payload.h"

#include <protocols/perception/detection.pb.h>

namespace communication {
namespace {

namespace rc {

using ::protocols::navigation::Navigation;
using ::protocols::perception::Detection;

} // namespace rc

Expand All @@ -17,14 +20,21 @@ using ::protocols::third_party::game_controller::Referee;

} // namespace

Payload::Payload(std::vector<tp::Referee> referee, std::vector<rc::Navigation> navigation) :
Payload::Payload(std::vector<tp::Referee> referee,
std::vector<rc::Navigation> navigation,
std::vector<rc::Detection> detection) :
referee_{std::move(referee)},
navigation_{std::move(navigation)} {}
navigation_{std::move(navigation)},
detection_{std::move(detection)} {}

std::span<const rc::Navigation> Payload::getNavigation() const { return navigation_; }

std::span<const tp::Referee> Payload::getReferee() const { return referee_; }

bool Payload::empty() const { return referee_.empty() and navigation_.empty(); }
std::span<const rc::Detection> Payload::getDetections() const { return detection_; }

bool Payload::empty() const {
return referee_.empty() and navigation_.empty() and detection_.empty();
}

} // namespace communication
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define COMMUNICATION_MESSAGING_RECEIVER_PAYLOAD_H

#include <protocols/navigation/navigation.pb.h>
#include <protocols/perception/detection.pb.h>
#include <protocols/third_party/game_controller/referee.pb.h>
#include <robocin/network/zmq_datagram.h>
#include <span>
Expand All @@ -12,15 +13,18 @@ namespace communication {
class Payload {
public:
Payload(std::vector<::protocols::third_party::game_controller::Referee> referee,
std::vector<::protocols::navigation::Navigation> navigation);
std::vector<::protocols::navigation::Navigation> navigation,
std::vector<::protocols::perception::Detection> detection);

[[nodiscard]] std::span<const ::protocols::navigation::Navigation> getNavigation() const;
[[nodiscard]] std::span<const ::protocols::third_party::game_controller::Referee>
getReferee() const;
[[nodiscard]] std::span<const ::protocols::perception::Detection> getDetections() const;

[[nodiscard]] bool empty() const;

private:
std::vector<::protocols::perception::Detection> detection_;
std::vector<::protocols::navigation::Navigation> navigation_;
std::vector<::protocols::third_party::game_controller::Referee> referee_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "communication/messaging/receiver/payload.h"

#include <protocols/perception/detection.pb.h>
#include <robocin/network/zmq_datagram.h>
#include <robocin/output/log.h>
#include <robocin/wip/service_discovery/addresses.h>
Expand All @@ -18,6 +19,7 @@ using ::robocin::ZmqDatagram;
namespace rc {

using ::protocols::navigation::Navigation;
using ::protocols::perception::Detection;

} // namespace rc

Expand All @@ -32,6 +34,7 @@ using ::protocols::third_party::game_controller::Referee;
Payload PayloadMapper::fromZmqDatagrams(std::span<const ZmqDatagram> messages) const {
std::vector<rc::Navigation> navigation;
std::vector<tp::Referee> referee;
std::vector<rc::Detection> detection;

for (const ZmqDatagram& zmq_datagram : messages) {
if (zmq_datagram.topic() == service_discovery::kNavigationOutputTopic) {
Expand All @@ -44,12 +47,17 @@ Payload PayloadMapper::fromZmqDatagrams(std::span<const ZmqDatagram> messages) c
referee_message.ParseFromString(std::string{zmq_datagram.message()});
referee.emplace_back(std::move(referee_message));

} else if (zmq_datagram.topic() == service_discovery::kPerceptionDetectionTopic) {
rc::Detection detection_message;
detection_message.ParseFromString(std::string{zmq_datagram.message()});
// robocin::ilog("Detection message received: {}", detection_message.DebugString());
detection.emplace_back(std::move(detection_message));
} else {
// wlog("zmq_datagram with topic '{}' not processed.", zmq_datagram.topic());
}
}

return Payload{std::move(referee), std::move(navigation)};
return Payload{std::move(referee), std::move(navigation), std::move(detection)};
}

} // namespace communication
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "communication/messaging/sender/message_sender.h"

#include "message_sender.h"

#include <protocols/perception/detection.pb.h>
#include <protocols/third_party/simulation/robot_control.pb.h>
#include <robocin/output/log.h>
#include <robocin/wip/service_discovery/addresses.h>

Expand All @@ -17,15 +20,23 @@ using ::protocols::communication::Communication;

} // namespace rc

namespace tp {

using ::protocols::third_party::simulation::RobotControl;

} // namespace tp

} // namespace

MessageSender::MessageSender(std::unique_ptr<::robocin::IZmqPublisherSocket> communication_socket,
std::unique_ptr<::robocin::IUdpMulticastSocketSender> robot_socket) :
std::unique_ptr<::robocin::IUdpMulticastSocketSender> robot_socket,
std::unique_ptr<::robocin::IUdpMulticastSocketSender> simm_socket) :
communication_socket_{std::move(communication_socket)},
robot_socket_{std::move(robot_socket)} {}
robot_socket_{std::move(robot_socket)},
simm_socket_{std::move(simm_socket)} {}

void MessageSender::send(const rc::Communication& robot_command) {
ilog("sending... {}", robot_command.DebugString());
// ilog("sending...robot_command {}", robot_command.DebugString());

communication_socket_->send({
service_discovery::kCommunicationCommandTopic,
Expand All @@ -36,4 +47,16 @@ void MessageSender::send(const rc::Communication& robot_command) {
// ilog("Message sent");
}

void MessageSender::send(const tp::RobotControl& robot_control) {
ilog("sending...robot_control {}", robot_control.DebugString());

communication_socket_->send({
service_discovery::kCommunicationCommandTopic,
robot_control.SerializeAsString(),
});
simm_socket_->send(robot_control.SerializeAsString());

// ilog("Message sent");
}

} // namespace communication
Loading