Skip to content

Commit

Permalink
Merge pull request #55 from permaweb/feat/dev_json_iface
Browse files Browse the repository at this point in the history
feat: implements the JSON interface device
  • Loading branch information
samcamwilliams authored Dec 25, 2024
2 parents daeb741 + 405f7a4 commit 39e0300
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 186 deletions.
196 changes: 129 additions & 67 deletions src/dev_json_iface.erl
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
%%% @doc A device that provides a way for WASM execution to interact with
%%% the HyperBEAM (and AO) systems, using JSON as a shared data representation.
%%%
%%% The interface is extremely basic. It works as follows:
%%% The interface is easy to use. It works as follows:
%%%
%%% 1. The device is given a message that contains a process definition, WASM
%%% environment, and a message that contains the data to be processed,
%%% including the image to be used in part of `execute{pass=1}'.
%%% 2. The device is called with `execute{pass=2}', which reads the result of
%%% the process execution from the WASM environment and returns it as a
%%% the process execution from the WASM environment and adds it to the
%%% message.
%%%
%%% The device has the following requirements and interface:
%%% ```
%%% M1/Computed when M2/Pass == 1 ->
%%% M1/Computed when /Pass == 1 ->
%%% Assumes:
%%% M1/priv/WASM/Port
%%% M1/Process
%%% M2/Message
%%% M2/Message/Image
%%% M2/Image
%%% M2/Block-Height
%%% M2/Assignment/Block-Height
%%% Generates:
%%% /WASM/Handler
%%% /WASM/Params
Expand All @@ -37,13 +35,17 @@
%%% /Results/Data'''

-module(dev_json_iface).
-export([computed/3]).
-export([init/3, compute/3]).
-include("include/hb.hrl").
-include_lib("eunit/include/eunit.hrl").

%% @doc Initialize the device.
init(M1, _M2, _Opts) ->
{ok, hb_converge:set(M1, #{<<"WASM-Function">> => <<"handle">>})}.

%% @doc On first pass prepare the call, on second pass get the results.
computed(M1, M2, Opts) ->
case hb_converge:get(pass, M1, M2) of
compute(M1, M2, Opts) ->
case hb_converge:get(<<"Pass">>, M1, Opts) of
1 -> prep_call(M1, M2, Opts);
2 -> results(M1, M2, Opts);
_ -> {ok, M1}
Expand All @@ -52,15 +54,17 @@ computed(M1, M2, Opts) ->
%% @doc Prepare the WASM environment for execution by writing the process string and
%% the message as JSON representations into the WASM environment.
prep_call(M1, M2, Opts) ->
Port = hb_converge:get(<<"priv/WASM/Port">>, M1, Opts),
Process = hb_converge:get(<<"Process">>, M1, Opts),
Assignment = hb_converge:get(<<"Assignment">>, M2, Opts),
Message = hb_converge:get(<<"Message">>, M2, Opts),
Image = hb_converge:get(<<"Image">>, Message, Opts),
Port = hb_private:get(<<"priv/WASM/Port">>, M1, Opts),
Process = hb_converge:get(<<"Process">>, M1, Opts, #{ hashpath => ignore }),
Assignment = hb_converge:get(<<"Assignment">>, M2, Opts#{ hashpath => ignore }),
Message = hb_converge:get(<<"Message">>, M2, Opts#{ hashpath => ignore }),
Image = hb_converge:get(<<"Process/WASM-Image">>, M1, Opts),
BlockHeight = hb_converge:get(<<"Block-Height">>, Assignment, Opts),
RawMsgJson =
ar_bundles:item_to_json_struct(hb_message:message_to_tx(Message)),
{Props} = jiffy:decode(RawMsgJson),
ar_bundles:item_to_json_struct(
hb_message:message_to_tx(Message)
),
{Props} = RawMsgJson,
MsgJson = jiffy:encode({
Props ++
[
Expand All @@ -86,8 +90,8 @@ prep_call(M1, M2, Opts) ->
hb_converge:set(
M1,
#{
<<"WASM/Handler">> => <<"handle">>,
<<"WASM/Params">> => [MsgJsonPtr, ProcessJsonPtr]
<<"WASM-Function">> => <<"handle">>,
<<"WASM-Params">> => [MsgJsonPtr, ProcessJsonPtr]
},
Opts
)
Expand All @@ -96,10 +100,10 @@ prep_call(M1, M2, Opts) ->
%% @doc Read the computed results out of the WASM environment, assuming that
%% the environment has been set up by `prep_call/3' and that the WASM executor
%% has been called with `computed{pass=1}'.
results(M1, M2, Opts) ->
Port = hb_converge:get(<<"priv/WASM/Port">>, M1, Opts),
Type = hb_converge:get(<<"Results/WASM/Type">>, M2, Opts),
Proc = hb_converge:get(<<"Process">>, M2, Opts),
results(M1, _M2, Opts) ->
Port = hb_private:get(<<"priv/WASM/Port">>, M1, Opts),
Type = hb_converge:get(<<"Results/WASM/Type">>, M1, Opts),
Proc = hb_converge:get(<<"Process">>, M1, Opts),
case hb_converge:to_key(Type) of
error ->
{error,
Expand All @@ -116,67 +120,125 @@ results(M1, M2, Opts) ->
)
};
ok ->
[Ptr] = hb_converge:get(<<"WASM/Results/Body">>, M1, Opts),
[Ptr] = hb_converge:get(<<"Results/WASM/Output">>, M1, Opts),
{ok, Str} = hb_beamr_io:read_string(Port, Ptr),
Wallet = hb_opts:get(wallet, no_viable_wallet, Opts),
try jiffy:decode(Str, [return_maps]) of
#{<<"ok">> := true, <<"response">> := Resp} ->
% TODO: Handle all JSON interface output types.
#{
<<"Output">> := #{<<"data">> := Data},
<<"Messages">> := Messages
} = Resp,
hb_converge:set(
M1,
#{
<<"Results/Outbox">> =>
[
{
list_to_binary(integer_to_list(MessageNum)),
ar_bundles:sign_item(
ar_bundles:json_struct_to_item(
preprocess_results(Msg, Proc, Opts)
),
Wallet
)
}
||
{MessageNum, Msg} <-
lists:zip(lists:seq(1, length(Messages)), Messages)
],
<<"Results/Data">> => Data
},
Opts
)
Res =
hb_converge:set(
M1,
#{
<<"Results/Outbox">> =>
maps:from_list([
{MessageNum, preprocess_results(Msg, Proc, Opts)}
||
{MessageNum, Msg} <-
lists:zip(lists:seq(1, length(Messages)), Messages)
]),
<<"Results/Data">> => Data
},
Opts
),
{ok, Res}
catch
_:_ ->
hb_converge:set(
M1,
#{
<<"Results/Outbox">> => undefined,
<<"Results/Body">> => <<"JSON error parsing WASM result output.">>
},
Opts
)
{error,
hb_converge:set(
M1,
#{
<<"Results/Outbox">> => undefined,
<<"Results/Body">> => <<"JSON error parsing WASM result output.">>
},
Opts
)
}
end
end.

%% NOTE: After the process returns messages from an evaluation, the signing unit needs to add
%% some tags to each message, and spawn to help the target process know these messages are created
%% by a process.
%% @doc After the process returns messages from an evaluation, the
%% signing node needs to add some tags to each message and spawn such that
%% the target process knows these messages are created by a process.
preprocess_results(Msg, Proc, Opts) ->
{ok, FilteredMsg} =
hb_converge:resolve(Msg,
#{
path => remove,
items => [<<"From-Process">>, <<"From-Image">>, <<"Anchor">>]
},
Opts
RawTags = maps:get(<<"Tags">>, Msg, []),
TagList =
[
{maps:get(<<"name">>, Tag), maps:get(<<"value">>, Tag)}
||
Tag <- RawTags ],
Tags = maps:from_list(TagList),
FilteredMsg =
maps:without(
[<<"From-Process">>, <<"From-Image">>, <<"Anchor">>, <<"Tags">>],
Msg
),
hb_converge:set(
maps:merge(
FilteredMsg,
#{
Tags#{
<<"From-Process">> => hb_util:id(Proc, signed),
<<"From-Image">> => element(2, lists:keyfind(<<"Image">>, 1, Proc#tx.tags))
<<"From-Image">> => hb_converge:get(<<"WASM-Image">>, Proc, Opts)
}
).

%%% Tests

test_init() ->
application:ensure_all_started(hb).

generate_stack(File) ->
test_init(),
Wallet = hb:wallet(),
Msg0 = dev_wasm:store_wasm_image(File),
Image = maps:get(<<"WASM-Image">>, Msg0),
Msg1 = Msg0#{
device => <<"Stack/1.0">>,
<<"Device-Stack">> =>
[
<<"WASI/1.0">>,
<<"JSON-Iface/1.0">>,
<<"WASM-64/1.0">>,
<<"Multipass/1.0">>
],
<<"Passes">> => 2,
<<"Stack-Keys">> => [<<"Init">>, <<"Compute">>],
<<"Process">> =>
hb_message:sign(#{
<<"Type">> => <<"Process">>,
<<"WASM-Image">> => Image,
<<"Scheduler">> => hb:address(),
<<"Authority">> => hb:address()
}, Wallet)
},
{ok, Msg2} = hb_converge:resolve(Msg1, <<"Init">>, #{}),
Msg2.

generate_aos_msg(ProcID, Code) ->
Wallet = hb:wallet(),
#{
path => <<"Compute">>,
<<"Message">> =>
hb_message:sign(#{
<<"Action">> => <<"Eval">>,
data => Code,
target => ProcID
}, Wallet),
<<"Assignment">> =>
hb_message:sign(#{ <<"Block-Height">> => 1 }, Wallet)
}.

basic_aos_call_test() ->
Msg = generate_stack("test/aos-2-pure-xs.wasm"),
Proc = hb_converge:get(<<"Process">>, Msg, #{ hashpath => ignore }),
ProcID = hb_converge:get(id, Proc, #{}),
{ok, Msg3} =
hb_converge:resolve(
Msg,
generate_aos_msg(ProcID, <<"return 1+1">>),
#{}
),
Data = hb_converge:get(<<"Results/Data">>, Msg3, #{}),
?assertEqual(<<"2">>, Data).
28 changes: 21 additions & 7 deletions src/dev_message.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@
%%% in the message's underlying Erlang map. Private keys (`priv[.*]') are
%%% not included.
-module(dev_message).
-export([info/0, keys/1, id/1, unsigned_id/1, signers/1]).
-export([info/0, keys/1, id/1, unsigned_id/1, signed_id/1, signers/1]).
-export([set/3, remove/2, get/2, get/3]).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").



%% The list of keys that are exported by this device.
-define(DEVICE_KEYS, [path, id, unsigned_id, signers, keys, get, set, remove]).
-define(DEVICE_KEYS, [
path,
id,
unsigned_id,
signed_id,
signers,
keys, get,
set,
remove
]).

%% @doc Return the info for the identity device.
info() ->
Expand All @@ -23,18 +30,25 @@ info() ->
%% that. Otherwise, return the signed ID.
id(M) ->
ID =
case maps:get(signature, M, ?DEFAULT_SIG) of
?DEFAULT_SIG -> raw_id(M, unsigned);
case get(signature, M) of
{error, not_found} -> raw_id(M, unsigned);
_ -> raw_id(M, signed)
end,
?event({generated_id, {id, ID}, {msg, M}}),
{ok, ID}.

%% @doc Wrap a call to the `hb_util:id/2' function, which returns the
%% unsigned ID of a message.
unsigned_id(M) ->
{ok, raw_id(M, unsigned)}.

%% @doc Return the signed ID of a message.
signed_id(M) ->
try
{ok, raw_id(M, signed)}
catch
_:_ -> {error, not_signed}
end.

%% @doc Encode an ID in any format to a normalized, b64u 43 character binary.
raw_id(Item) -> raw_id(Item, unsigned).
raw_id(TX, Type) when is_record(TX, tx) ->
Expand Down
39 changes: 27 additions & 12 deletions src/dev_multipass.erl
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
%%% @doc A device that triggers repass events until a certain counter has been
%%% reached. This is useful for certain types of stacks that need various
%%% execution passes to be completed in sequence across devices.
-module(dev_multipass).
-export([init/2, execute/2, uses/0]).
-export([init/3, compute/3]).
-include_lib("eunit/include/eunit.hrl").

%%% A device that triggers repass events until a certain counter has been reached.
%%% This is useful for certain types of stacks that need various execution passes
%%% to be completed in sequence across devices.
init(M1, _M2, _Opts) ->
{ok, M1}.

init(S, Params) ->
{<<"Passes">>, Passes} = lists:keyfind(<<"Passes">>, 1, Params),
{ok, S#{ pass => 1, passes => binary_to_integer(Passes) }}.
compute(Msg1, _Msg2, Opts) ->
Passes = hb_converge:get(<<"Passes">>, Msg1, 1, Opts),
Pass = hb_converge:get(<<"Pass">>, Msg1, 1, Opts),
case Pass < Passes of
true ->
{pass, Msg1};
false ->
{ok, Msg1}
end.

execute(_, S = #{ pass := Pass, passes := Passes }) when Pass < Passes ->
{pass, S};
execute(_, S) ->
{ok, S}.
%%% Tests

uses() -> all.
basic_multipass_test() ->
Msg1 =
#{
<<"device">> => <<"Multipass/1.0">>,
<<"Passes">> => 2,
<<"Pass">> => 1
},
Msg2 = Msg1#{ <<"Pass">> => 2 },
?assertMatch({pass, _}, hb_converge:resolve(Msg1, <<"Compute">>, #{})),
?assertMatch({ok, _}, hb_converge:resolve(Msg2, <<"Compute">>, #{})).
Loading

0 comments on commit 39e0300

Please sign in to comment.