Skip to content

Commit

Permalink
Merge branch 'main' into ir_fix_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
longbinlai authored Jun 3, 2024
2 parents 563ce02 + a93146b commit 8ec7125
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 28 deletions.
2 changes: 1 addition & 1 deletion flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ bool hqps_http_handler::is_running() const { return running_.load(); }
bool hqps_http_handler::is_actors_running() const {
return !ic_handler_->is_current_scope_cancelled() &&
!adhoc_query_handler_->is_current_scope_cancelled() &&
proc_handler_->is_current_scope_cancelled();
!proc_handler_->is_current_scope_cancelled();
}

void hqps_http_handler::start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ Result<String> updateProcedure(

Result<IrResult.CollectiveResults> callProcedure(String graphId, QueryRequest request);

Result<String> callProcedureRaw(String graphId, String request);
Result<byte[]> callProcedureRaw(String graphId, byte[] request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.protobuf.InvalidProtocolBufferException;

import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.List;

/***
Expand Down Expand Up @@ -317,14 +316,11 @@ public Result<String> updateProcedure(
}
}

private String encodeString(String jsonStr, int lastByte) {
private byte[] encodeString(String jsonStr, int lastByte) {
byte[] bytes = new byte[jsonStr.length() + 1];
// copy string to byte array
for (int i = 0; i < jsonStr.length(); i++) {
bytes[i] = (byte) jsonStr.charAt(i);
}
System.arraycopy(jsonStr.getBytes(), 0, bytes, 0, jsonStr.length());
bytes[jsonStr.length()] = (byte) lastByte;
return new String(bytes, StandardCharsets.UTF_8);
return bytes;
}

@Override
Expand All @@ -334,15 +330,14 @@ public Result<IrResult.CollectiveResults> callProcedure(
// Interactive currently support four type of inputformat, see
// flex/engines/graph_db/graph_db_session.h
// Here we add byte of value 1 to denote the input format is in JSON format.
String encodedStr = encodeString(request.toJson(), 1);
ApiResponse<String> response = queryApi.procCallWithHttpInfo(graphName, encodedStr);
byte[] encodedStr = encodeString(request.toJson(), 1);
ApiResponse<byte[]> response = queryApi.procCallWithHttpInfo(graphName, encodedStr);
if (response.getStatusCode() != 200) {
return Result.fromException(
new ApiException(response.getStatusCode(), response.getData()));
new ApiException(response.getStatusCode(), "Failed to call procedure"));
}
IrResult.CollectiveResults results =
IrResult.CollectiveResults.parseFrom(
response.getData().getBytes(StandardCharsets.UTF_8));
IrResult.CollectiveResults.parseFrom(response.getData());
return new Result<>(results);
} catch (ApiException e) {
e.printStackTrace();
Expand All @@ -354,19 +349,21 @@ public Result<IrResult.CollectiveResults> callProcedure(
}

@Override
public Result<String> callProcedureRaw(String graphName, String request) {
public Result<byte[]> callProcedureRaw(String graphName, byte[] request) {
try {
// Interactive currently support four type of inputformat, see
// flex/engines/graph_db/graph_db_session.h
// Here we add byte of value 0 to denote the input format is in raw encoder/decoder
// format.
String encodedStr = encodeString(request, 0);
ApiResponse<String> response = queryApi.procCallWithHttpInfo(graphName, encodedStr);
byte[] encodedStr = new byte[request.length + 1];
System.arraycopy(request, 0, encodedStr, 0, request.length);
encodedStr[request.length] = 0;
ApiResponse<byte[]> response = queryApi.procCallWithHttpInfo(graphName, encodedStr);
if (response.getStatusCode() != 200) {
return Result.fromException(
new ApiException(response.getStatusCode(), response.getData()));
new ApiException(response.getStatusCode(), "Failed to call procedure"));
}
return new Result<String>(response.getData());
return new Result<byte[]>(response.getData());
} catch (ApiException e) {
e.printStackTrace();
return Result.fromException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,7 @@ public void test9CallCppProcedure2() {
Encoder encoder = new Encoder(bytes);
encoder.put_int(1);
encoder.put_byte((byte) 1); // Assume the procedure index is 1
String encoded = new String(bytes);
Result<String> resp = session.callProcedureRaw(graphId, encoded);
Result<byte[]> resp = session.callProcedureRaw(graphId, bytes);
assertOk(resp);
}

Expand Down
8 changes: 4 additions & 4 deletions flex/interactive/sdk/python/interactive_sdk/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from pydantic import Field, StrictStr

from interactive_sdk.client.status import Status
from interactive_sdk.openapi.api.admin_service_graph_management_api import (
AdminServiceGraphManagementApi,
)
Expand Down Expand Up @@ -523,11 +524,10 @@ def call_procedure(
)
result = CollectiveResults()
if response.status_code == 200:
byte_data = response.data.encode('utf-8')
result.ParseFromString(byte_data)
return Result.from_response(response)
result.ParseFromString(response.data)
return Result.ok(result)
else:
return Result.from_response(result)
return Result(Status.from_response(response), result)
except Exception as e:
return Result.from_exception(e)

Expand Down
25 changes: 22 additions & 3 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -909,13 +909,15 @@ paths:
text/plain:
schema:
type: string
format: byte
responses:
'200':
description: Successfully runned. Empty if failed?
content:
application/json:
text/plain:
schema:
type: string
format: byte
'500':
description: Server internal error
components:
Expand Down Expand Up @@ -998,11 +1000,24 @@ components:
properties:
timestamp:
type: string
DateType:
x-body-name: date_type
type: object
required:
- date32
properties:
date32:
type: string
TemporalType:
x-body-name: temporal_type
type: object
oneOf:
- $ref: '#/components/schemas/TimeStampType'
required:
- temporal
properties:
temporal:
oneOf:
- $ref: '#/components/schemas/TimeStampType'
- $ref: '#/components/schemas/DateType'
GSDataType:
x-body-name: gs_data_type
oneOf:
Expand Down Expand Up @@ -1364,6 +1379,10 @@ components:
- BOTH_OUT_IN
sort_on_compaction:
type: boolean
oe_mutability:
type: string
ie_mutability:
type: string
CreateEdgeType:
x-body-name: create_edge_type
allOf:
Expand Down
2 changes: 2 additions & 0 deletions flex/tests/interactive/test_call_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def callProcedureWithJsonFormat(self, graph_id : str):
if not resp.is_ok():
print("call sample_app failed: ", resp.get_status_message())
exit(1)
res = resp.get_value()
print("call sample_app result: ", res)
self.call_cypher_queries()

def callProcedureWithEncoder(self, graph_id : str):
Expand Down

0 comments on commit 8ec7125

Please sign in to comment.