Skip to content

Commit

Permalink
Updates and a test for client streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
ccifra committed Dec 10, 2021
1 parent b039908 commit e2731cb
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 38 deletions.
Binary file not shown.
Binary file not shown.
Binary file modified labview source/gRPC lv Support/Client API/Close Client.vi
Binary file not shown.
Binary file modified labview source/gRPC lv Support/Client API/Create Client.vi
Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions labview source/gRPC lv Support/gprc-lvsupport.lvlib
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
<Item Name="Client Begin Bidirectional Streaming Call.vi" Type="VI" URL="../Client API/Client Begin Bidirectional Streaming Call.vi"/>
<Item Name="Client Begin Client Streaming Call.vi" Type="VI" URL="../Client API/Client Begin Client Streaming Call.vi"/>
<Item Name="Client Begin Server Streaming Call.vim" Type="VI" URL="../Client API/Client Begin Server Streaming Call.vim"/>
<Item Name="Client Complete Client Streaming Call.vim" Type="VI" URL="../Client API/Client Complete Client Streaming Call.vim"/>
<Item Name="Client Complete Streaming Call.vi" Type="VI" URL="../Client API/Client Complete Streaming Call.vi"/>
<Item Name="Client Read From Stream.vim" Type="VI" URL="../Client API/Client Read From Stream.vim"/>
<Item Name="Client Unary Call.vim" Type="VI" URL="../Client API/Client Unary Call.vim"/>
<Item Name="Client Write To Stream.vim" Type="VI" URL="../Client API/Client Write To Stream.vim"/>
<Item Name="Client Writes Complete.vi" Type="VI" URL="../Client API/Client Writes Complete.vi"/>
<Item Name="Close Client.vi" Type="VI" URL="../Client API/Close Client.vi"/>
<Item Name="Create Client.vi" Type="VI" URL="../Client API/Create Client.vi"/>
</Item>
Expand Down
137 changes: 108 additions & 29 deletions src/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,23 @@ namespace grpc_labview
{
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void ClientCall::Finish()
{
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
ServerStreamingClientCall::~ServerStreamingClientCall()
{
_reader->Finish();
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void ServerStreamingClientCall::Finish()
{
_status = _reader->Finish();
}

//---------------------------------------------------------------------
Expand All @@ -59,23 +71,59 @@ namespace grpc_labview
//---------------------------------------------------------------------
ClientStreamingClientCall::~ClientStreamingClientCall()
{
_writer->Finish();
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void ClientStreamingClientCall::Finish()
{
WritesComplete();
_status = _writer->Finish();
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void ClientStreamingClientCall::WritesComplete()
{
if (!_writesComplete)
{
_writesComplete = true;
_writer->WritesDone();
}
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
bool ClientStreamingClientCall::Write(LVMessage* message)
{
return _writer->Write(*message);
return _writer->Write(*message);
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
BidiStreamingClientCall::~BidiStreamingClientCall()
{
_readerWriter->Finish();
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void BidiStreamingClientCall::Finish()
{
WritesComplete();
_status = _readerWriter->Finish();
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void BidiStreamingClientCall::WritesComplete()
{
if (!_writesComplete)
{
_writesComplete = true;
_readerWriter->WritesDone();
}
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
bool BidiStreamingClientCall::Read(LVMessage* message)
Expand Down Expand Up @@ -138,19 +186,19 @@ LIBRARY_EXPORT int32_t ClientUnaryCall(grpc_labview::gRPCid* clientId, grpc_labv

auto clientCall = new grpc_labview::ClientCall();
*callId = clientCall;
clientCall->occurrence = *occurrence;
clientCall->request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);
clientCall->_occurrence = *occurrence;
clientCall->_request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->_response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);

grpc_labview::ClusterDataCopier::CopyFromCluster(*clientCall->request.get(), requestCluster);
grpc_labview::ClusterDataCopier::CopyFromCluster(*clientCall->_request.get(), requestCluster);

clientCall->_runFuture = std::async(
std::launch::async,
[=]()
{
grpc::internal::RpcMethod method(methodName, grpc::internal::RpcMethod::NORMAL_RPC);
clientCall->status = grpc::internal::BlockingUnaryCall(client->Channel.get(), method, &clientCall->context, *clientCall->request.get(), clientCall->response.get());
grpc_labview::SignalOccurrence(clientCall->occurrence);
clientCall->_status = grpc::internal::BlockingUnaryCall(client->Channel.get(), method, &clientCall->_context, *clientCall->_request.get(), clientCall->_response.get());
grpc_labview::SignalOccurrence(clientCall->_occurrence);
return 0;
});
return 0;
Expand All @@ -167,16 +215,16 @@ LIBRARY_EXPORT int32_t CompleteClientUnaryCall2(grpc_labview::gRPCid* callId, in
}

int32_t result = 0;
if (call->status.ok())
if (call->_status.ok())
{
grpc_labview::ClusterDataCopier::CopyToCluster(*call->response.get(), responseCluster);
grpc_labview::ClusterDataCopier::CopyToCluster(*call->_response.get(), responseCluster);
}
else
{
result = -(1000 + call->status.error_code());
result = -(1000 + call->_status.error_code());
if (errorMessage != nullptr)
{
grpc_labview::SetLVString(errorMessage, call->status.error_message());
grpc_labview::SetLVString(errorMessage, call->_status.error_message());
}
if (errorDetailsCluster != nullptr)
{
Expand Down Expand Up @@ -215,11 +263,11 @@ LIBRARY_EXPORT int32_t ClientBeginClientStreamingCall(grpc_labview::gRPCid* clie

auto clientCall = new grpc_labview::ClientStreamingClientCall();
*callId = clientCall;
clientCall->request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);
clientCall->_request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->_response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);

grpc::internal::RpcMethod method(methodName, grpc::internal::RpcMethod::CLIENT_STREAMING);
auto writer = grpc_impl::internal::ClientWriterFactory<grpc_labview::LVMessage>::Create(client->Channel.get(), method, &clientCall->context, clientCall->response.get());
auto writer = grpc_impl::internal::ClientWriterFactory<grpc_labview::LVMessage>::Create(client->Channel.get(), method, &clientCall->_context, clientCall->_response.get());
clientCall->_writer = std::shared_ptr<grpc_impl::ClientWriterInterface<grpc_labview::LVMessage>>(writer);
return 0;
}
Expand All @@ -246,13 +294,13 @@ LIBRARY_EXPORT int32_t ClientBeginServerStreamingCall(grpc_labview::gRPCid* clie

auto clientCall = new grpc_labview::ServerStreamingClientCall();
*callId = clientCall;
clientCall->request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);
clientCall->_request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->_response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);

grpc_labview::ClusterDataCopier::CopyFromCluster(*clientCall->request.get(), requestCluster);
grpc_labview::ClusterDataCopier::CopyFromCluster(*clientCall->_request.get(), requestCluster);

grpc::internal::RpcMethod method(methodName, grpc::internal::RpcMethod::SERVER_STREAMING);
auto reader = grpc_impl::internal::ClientReaderFactory<grpc_labview::LVMessage>::Create<grpc_labview::LVMessage>(client->Channel.get(), method, &clientCall->context, *clientCall->request.get());
auto reader = grpc_impl::internal::ClientReaderFactory<grpc_labview::LVMessage>::Create<grpc_labview::LVMessage>(client->Channel.get(), method, &clientCall->_context, *clientCall->_request.get());
clientCall->_reader = std::shared_ptr<grpc_impl::ClientReader<grpc_labview::LVMessage>>(reader);
return 0;
}
Expand All @@ -279,11 +327,11 @@ LIBRARY_EXPORT int32_t ClientBeginBidiStreamingCall(grpc_labview::gRPCid* client

auto clientCall = new grpc_labview::BidiStreamingClientCall();
*callId = clientCall;
clientCall->request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);
clientCall->_request = std::make_shared<grpc_labview::LVMessage>(requestMetadata);
clientCall->_response = std::make_shared<grpc_labview::LVMessage>(responseMetadata);

grpc::internal::RpcMethod method(methodName, grpc::internal::RpcMethod::BIDI_STREAMING);
auto readerWriter = grpc_impl::internal::ClientReaderWriterFactory<grpc_labview::LVMessage, grpc_labview::LVMessage>::Create(client->Channel.get(), method, &clientCall->context);
auto readerWriter = grpc_impl::internal::ClientReaderWriterFactory<grpc_labview::LVMessage, grpc_labview::LVMessage>::Create(client->Channel.get(), method, &clientCall->_context);
clientCall->_readerWriter = std::shared_ptr<grpc_impl::ClientReaderWriterInterface<grpc_labview::LVMessage, grpc_labview::LVMessage>>(readerWriter);
return 0;
}
Expand All @@ -300,8 +348,8 @@ LIBRARY_EXPORT int32_t ClientBeginReadFromStream(grpc_labview::gRPCid* callId, g
std::launch::async,
[=]()
{
call->response->Clear();
auto result = reader->Read(call->response.get());
call->_response->Clear();
auto result = reader->Read(call->_response.get());
grpc_labview::SignalOccurrence(occurrence);
return result;
});
Expand All @@ -323,7 +371,7 @@ LIBRARY_EXPORT int32_t ClientCompleteReadFromStream(grpc_labview::gRPCid* callId
*success = reader->_readFuture.get();
if (reader->_readFuture.get())
{
grpc_labview::ClusterDataCopier::CopyToCluster(*call->response.get(), responseCluster);
grpc_labview::ClusterDataCopier::CopyToCluster(*call->_response.get(), responseCluster);
}
return 0;
}
Expand All @@ -342,8 +390,38 @@ LIBRARY_EXPORT int32_t ClientWriteToStream(grpc_labview::gRPCid* callId, int8_t*
{
return -1;
}
grpc_labview::ClusterDataCopier::CopyFromCluster(*clientCall->request.get(), requestCluster);
*success = writer->Write(clientCall->request.get());
grpc_labview::ClusterDataCopier::CopyFromCluster(*clientCall->_request.get(), requestCluster);
*success = writer->Write(clientCall->_request.get());
return 0;
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
LIBRARY_EXPORT int32_t ClientWritesComplete(grpc_labview::gRPCid* callId)
{
auto writer = callId->CastTo<grpc_labview::StreamWriter>();
if (!writer)
{
return -1;
}
writer->WritesComplete();
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
LIBRARY_EXPORT int32_t ClientCompleteClientStreamingCall(grpc_labview::gRPCid* callId, int8_t* responseCluster, grpc_labview::LStrHandle* errorMessage, grpc_labview::AnyCluster* errorDetailsCluster)
{
auto call = callId->CastTo<grpc_labview::ClientCall>();
if (!call)
{
return -1;
}
call->Finish();
if (call->_status.ok())
{
grpc_labview::ClusterDataCopier::CopyToCluster(*call->_response.get(), responseCluster);
}
delete call;
return 0;
}

Expand All @@ -356,6 +434,7 @@ LIBRARY_EXPORT int32_t ClientCompleteStreamingCall(grpc_labview::gRPCid* callId,
{
return -1;
}
call->Finish();
delete call;
return 0;
}
27 changes: 18 additions & 9 deletions src/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ namespace grpc_labview
{
public:
virtual ~ClientCall();
virtual void Finish();

public:
MagicCookie occurrence;
grpc::ClientContext context;
std::shared_ptr<LVMessage> request;
std::shared_ptr<LVMessage> response;
grpc::Status status;
MagicCookie _occurrence;
grpc::ClientContext _context;
std::shared_ptr<LVMessage> _request;
std::shared_ptr<LVMessage> _response;
grpc::Status _status;
std::future<int> _runFuture;
};

Expand All @@ -51,6 +52,7 @@ namespace grpc_labview
{
public:
virtual bool Write(LVMessage* message) = 0;
virtual void WritesComplete() = 0;
};

//---------------------------------------------------------------------
Expand All @@ -71,7 +73,7 @@ namespace grpc_labview
public:
~ServerStreamingClientCall() override;
bool Read(LVMessage* message) override;

void Finish() override;
public:
std::shared_ptr<grpc_impl::ClientReaderInterface<grpc_labview::LVMessage>> _reader;
};
Expand All @@ -82,11 +84,14 @@ namespace grpc_labview
{
public:
~ClientStreamingClientCall();

void Finish() override;
bool Write(LVMessage* message) override;
void WritesComplete() override;

public:
std::shared_ptr<grpc_impl::ClientWriterInterface<grpc_labview::LVMessage>> _writer;

private:
bool _writesComplete;
};

//---------------------------------------------------------------------
Expand All @@ -95,10 +100,14 @@ namespace grpc_labview
{
public:
~BidiStreamingClientCall();

void Finish() override;
void WritesComplete() override;
bool Read(LVMessage* message) override;
bool Write(LVMessage* message) override;

public:
std::shared_ptr<grpc_impl::ClientReaderWriterInterface<grpc_labview::LVMessage, grpc_labview::LVMessage>> _readerWriter;
private:
bool _writesComplete;
};
}
16 changes: 16 additions & 0 deletions src/test_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class DataMarshalTestServer final : public queryserver::QueryServer::Service
grpc::Status Invoke(grpc::ServerContext* context, const queryserver::InvokeRequest* request, queryserver::InvokeResponse* response) override;
grpc::Status Query(grpc::ServerContext* context, const queryserver::QueryRequest* request, queryserver::QueryResponse* response) override;
grpc::Status Register(grpc::ServerContext* context, const queryserver::RegistrationRequest* request, grpc::ServerWriter<queryserver::ServerEvent>* writer) override;
grpc::Status ClientStream(grpc::ServerContext* context, grpc::ServerReader<::queryserver::RegistrationRequest>* reader, queryserver::ServerEvent* response) override;
};

//---------------------------------------------------------------------
Expand Down Expand Up @@ -60,6 +61,21 @@ grpc::Status DataMarshalTestServer::Register(grpc::ServerContext* context, const
return Status::OK;
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
grpc::Status DataMarshalTestServer::ClientStream(grpc::ServerContext* context, grpc::ServerReader<::queryserver::RegistrationRequest>* reader, queryserver::ServerEvent* response)
{
queryserver::RegistrationRequest request;
int count = 0;
while (reader->Read(&request))
{
count += 1;
}
response->set_status(count);
response->set_eventdata("Test Complete");
return Status::OK;
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
std::string GetServerAddress(int argc, char** argv)
Expand Down
Binary file added tests/Client/TestClientStreaming.vi
Binary file not shown.
Binary file modified tests/Client/TestServerStreaming.vi
Binary file not shown.
2 changes: 2 additions & 0 deletions tests/Protos/data_marshal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ service QueryServer {
rpc Invoke (InvokeRequest) returns (InvokeResponse) {}
rpc Query (QueryRequest) returns (QueryResponse) {}
rpc Register (RegistrationRequest) returns (stream ServerEvent) {}
rpc ClientStream (stream RegistrationRequest) returns (ServerEvent) {}
rpc BidiStream (stream RegistrationRequest) returns (ServerEvent) {}

rpc TestDataTypes(TestDataTypesParameters) returns (TestDataTypesParameters) {}
}
Expand Down
1 change: 1 addition & 0 deletions tests/Tests.lvproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<Property Name="server.vi.propertiesEnabled" Type="Bool">true</Property>
<Property Name="specify.custom.address" Type="Bool">false</Property>
<Item Name="Client" Type="Folder">
<Item Name="TestClientStreaming.vi" Type="VI" URL="../Client/TestClientStreaming.vi"/>
<Item Name="TestServerStreaming.vi" Type="VI" URL="../Client/TestServerStreaming.vi"/>
</Item>
<Item Name="Protobuf" Type="Folder">
Expand Down

0 comments on commit e2731cb

Please sign in to comment.