Skip to content

Commit

Permalink
transport(ptp): move from protobuf to fixed-size c-struct
Browse files Browse the repository at this point in the history
This commit moves the abstraction of a PTP message from a protobuf
object to a fixed-size C-struct with a heap pointer. The rationale
is that these PTP messages move through the system, and even when
careful it is challenging to keep track of the copies allocations
that protobuf is doing under the hood. In exchange, we are very
explicity about the copies and allocations we do.
  • Loading branch information
csegarragonz committed Feb 29, 2024
1 parent 7483943 commit e0099ec
Show file tree
Hide file tree
Showing 17 changed files with 599 additions and 240 deletions.
21 changes: 6 additions & 15 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/transport/PointToPointClient.h>
#include <faabric/transport/PointToPointMessage.h>
#include <faabric/util/config.h>
#include <faabric/util/locks.h>

Expand Down Expand Up @@ -120,27 +121,16 @@ class PointToPointBroker

void updateHostForIdx(int groupId, int groupIdx, std::string newHost);

void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize,
void sendMessage(const PointToPointMessage& msg,
std::string hostHint,
bool mustOrderMsg = false);

void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize,
void sendMessage(const PointToPointMessage& msg,
bool mustOrderMsg = false,
int sequenceNum = NO_SEQUENCE_NUM,
std::string hostHint = "");

std::vector<uint8_t> recvMessage(int groupId,
int sendIdx,
int recvIdx,
bool mustOrderMsg = false);
void recvMessage(PointToPointMessage& msg, bool mustOrderMsg = false);

void clearGroup(int groupId);

Expand All @@ -163,7 +153,8 @@ class PointToPointBroker

std::shared_ptr<faabric::util::FlagWaiter> getGroupFlag(int groupId);

Message doRecvMessage(int groupId, int sendIdx, int recvIdx);
// Returns the message response code and the sequence number
std::pair<MessageResponseCode, int> doRecvMessage(PointToPointMessage& msg);

void initSequenceCounters(int groupId);

Expand Down
7 changes: 4 additions & 3 deletions include/faabric/transport/PointToPointClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/transport/PointToPointCall.h>
#include <faabric/transport/PointToPointMessage.h>

namespace faabric::transport {

std::vector<std::pair<std::string, faabric::PointToPointMappings>>
getSentMappings();

std::vector<std::pair<std::string, faabric::PointToPointMessage>>
std::vector<std::pair<std::string, PointToPointMessage>>
getSentPointToPointMessages();

std::vector<std::tuple<std::string,
faabric::transport::PointToPointCall,
faabric::PointToPointMessage>>
PointToPointMessage>>
getSentLockMessages();

void clearSentMessages();
Expand All @@ -26,7 +27,7 @@ class PointToPointClient : public faabric::transport::MessageEndpointClient

void sendMappings(faabric::PointToPointMappings& mappings);

void sendMessage(faabric::PointToPointMessage& msg,
void sendMessage(const PointToPointMessage& msg,
int sequenceNum = NO_SEQUENCE_NUM);

void groupLock(int appId,
Expand Down
45 changes: 45 additions & 0 deletions include/faabric/transport/PointToPointMessage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <cstdint>
#include <span>

namespace faabric::transport {

/* Simple fixed-size C-struct to capture the state of a PTP message moving
* through Faabric.
*
* We require fixed-size, and no unique pointers to be able to use
* high-throughput ring-buffers to send the messages around. This also means
* that we manually malloc/free the data pointer. The message size is:
* 4 * int32_t = 4 * 4 bytes = 16 bytes
* 1 * size_t = 1 * 8 bytes = 8 bytes
* 1 * void* = 1 * 8 bytes = 8 bytes
* total = 32 bytes = 4 * 8 so the struct is naturally 8 byte-aligned
*/
struct PointToPointMessage
{
int32_t appId;
int32_t groupId;
int32_t sendIdx;
int32_t recvIdx;
size_t dataSize;
void* dataPtr;
};
static_assert((sizeof(PointToPointMessage) % 8) == 0,
"PTP message mus be 8-aligned!");

// The wire format for a PTP message is very simple: the fixed-size struct,
// followed by dataSize bytes containing the payload.
void serializePtpMsg(std::span<uint8_t> buffer, const PointToPointMessage& msg);

// This parsing function mallocs space for the message payload. This is to
// keep the PTP message at fixed-size, and be able to efficiently move it
// around in-memory queues
void parsePtpMsg(std::span<const uint8_t> bytes, PointToPointMessage* msg);

// Alternative signature for parsing PTP messages for when the caller can
// provide an already-allocated buffer to write into
void parsePtpMsg(std::span<const uint8_t> bytes,
PointToPointMessage* msg,
std::span<uint8_t> preAllocBuffer);
}
39 changes: 26 additions & 13 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <faabric/mpi/MpiWorld.h>
#include <faabric/mpi/mpi.pb.h>
#include <faabric/planner/PlannerClient.h>
#include <faabric/transport/PointToPointMessage.h>
#include <faabric/transport/macros.h>
#include <faabric/util/ExecGraph.h>
#include <faabric/util/batch.h>
Expand Down Expand Up @@ -60,14 +61,16 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
throw std::runtime_error("Error serialising message");
}
try {
broker.sendMessage(
thisRankMsg->groupid(),
sendRank,
recvRank,
reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
serialisedBuffer.size(),
dstHost,
true);
// It is safe to send a pointer to a stack-allocated object
// because the broker will make an additional copy (and so will NNG!)
faabric::transport::PointToPointMessage msg(
{ .groupId = thisRankMsg->groupid(),
.sendIdx = sendRank,
.recvIdx = recvRank,
.dataSize = serialisedBuffer.size(),
.dataPtr = (void*)serialisedBuffer.data() });

Check warning on line 71 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L66-L71

Added lines #L66 - L71 were not covered by tests

broker.sendMessage(msg, dstHost, true);

Check warning on line 73 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L73

Added line #L73 was not covered by tests
} catch (std::runtime_error& e) {
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}",
thisRankMsg->appid(),
Expand All @@ -82,10 +85,12 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
int recvRank)
{
std::vector<uint8_t> msg;
faabric::transport::PointToPointMessage msg(
{ .groupId = thisRankMsg->groupid(),
.sendIdx = sendRank,
.recvIdx = recvRank });

Check warning on line 91 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L88-L91

Added lines #L88 - L91 were not covered by tests
try {
msg =
broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true);
broker.recvMessage(msg, true);

Check warning on line 93 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L93

Added line #L93 was not covered by tests
} catch (std::runtime_error& e) {
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv (remote) {} -> {}",
thisRankMsg->appid(),
Expand All @@ -95,7 +100,12 @@ std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
recvRank);
throw e;
}
PARSE_MSG(MPIMessage, msg.data(), msg.size());

// Parsing into the protobuf makes a copy of the message, so we can
// free the heap pointer after
PARSE_MSG(MPIMessage, msg.dataPtr, msg.dataSize);
faabric::util::free(msg.dataPtr);

Check warning on line 107 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L106-L107

Added lines #L106 - L107 were not covered by tests

return std::make_shared<MPIMessage>(parsedMsg);
}

Expand Down Expand Up @@ -599,7 +609,10 @@ void MpiWorld::doRecv(std::shared_ptr<MPIMessage>& m,
// Assert message integrity
// Note - this checks won't happen in Release builds
if (m->messagetype() != messageType) {
SPDLOG_ERROR("Different message types (got: {}, expected: {})",
SPDLOG_ERROR("{}:{}:{} Different message types (got: {}, expected: {})",
m->worldid(),
m->sender(),
m->destination(),

Check warning on line 615 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L612-L615

Added lines #L612 - L615 were not covered by tests
m->messagetype(),
messageType);
}
Expand Down
8 changes: 0 additions & 8 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,6 @@ message StateAppendedResponse {
// POINT-TO-POINT
// ---------------------------------------------

message PointToPointMessage {
int32 appId = 1;
int32 groupId = 2;
int32 sendIdx = 3;
int32 recvIdx = 4;
bytes data = 5;
}

message PointToPointMappings {
int32 appId = 1;
int32 groupId = 2;
Expand Down
26 changes: 23 additions & 3 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,32 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg,
auto groupIdxs = broker.getIdxsRegisteredForGroup(groupId);
groupIdxs.erase(0);
for (const auto& recvIdx : groupIdxs) {
broker.sendMessage(
groupId, 0, recvIdx, BYTES_CONST(&newGroupId), sizeof(int));
// It is safe to send a pointer to the stack, because the
// transport layer will perform an additional copy of the PTP
// message to put it in the message body
// TODO(no-inproc): this may not be true once we move the inproc
// sockets to in-memory queues
faabric::transport::PointToPointMessage msg(
{ .groupId = groupId,
.sendIdx = 0,
.recvIdx = recvIdx,
.dataSize = sizeof(int),
.dataPtr = &newGroupId });

broker.sendMessage(msg);
}
} else if (overwriteNewGroupId == 0) {
std::vector<uint8_t> bytes = broker.recvMessage(groupId, 0, groupIdx);
faabric::transport::PointToPointMessage msg(
{ .groupId = groupId, .sendIdx = 0, .recvIdx = groupIdx });

Check warning on line 475 in src/scheduler/Scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

src/scheduler/Scheduler.cpp#L474-L475

Added lines #L474 - L475 were not covered by tests
// TODO(no-order): when we remove the need to order ptp messages we
// should be able to call recv giving it a pre-allocated buffer,
// avoiding the hassle of malloc-ing and free-ing
broker.recvMessage(msg);
std::vector<uint8_t> bytes((uint8_t*)msg.dataPtr,
(uint8_t*)msg.dataPtr + msg.dataSize);

Check warning on line 481 in src/scheduler/Scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

src/scheduler/Scheduler.cpp#L479-L481

Added lines #L479 - L481 were not covered by tests
newGroupId = faabric::util::bytesToInt(bytes);
// The previous call makes a copy, so safe to free now
faabric::util::free(msg.dataPtr);

Check warning on line 484 in src/scheduler/Scheduler.cpp

View check run for this annotation

Codecov / codecov/patch

src/scheduler/Scheduler.cpp#L484

Added line #L484 was not covered by tests
} else {
// In some settings, like tests, we already know the new group id, so
// we can set it here (and in fact, we need to do so when faking two
Expand Down
1 change: 1 addition & 0 deletions src/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ faabric_lib(transport
MessageEndpointServer.cpp
PointToPointBroker.cpp
PointToPointClient.cpp
PointToPointMessage.cpp
PointToPointServer.cpp
)

Expand Down
1 change: 1 addition & 0 deletions src/transport/MessageEndpointClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void MessageEndpointClient::asyncSend(int header,
sequenceNum);
}

// TODO: consider making an iovec-style scatter/gather alternative signature
void MessageEndpointClient::asyncSend(int header,
const uint8_t* buffer,
size_t bufferSize,
Expand Down
Loading

0 comments on commit e0099ec

Please sign in to comment.