diff --git a/examples/package.json b/examples/package.json index 5d1428078..be29ae77e 100644 --- a/examples/package.json +++ b/examples/package.json @@ -1,6 +1,6 @@ { "name": "yajsapi_examples", - "version": "0.1.0", + "version": "0.1.1", "description": "NodeJS API Examples for Next Golem", "repository": "https://github.com/golemfactory/yajsapi", "scripts": { @@ -15,7 +15,7 @@ "bluebird": "^3.5.0", "dayjs": "^1.8.31", "ts-node": "^9.0.0", - "yajsapi": "0.1.0-alpha.3" + "yajsapi": "file:../" }, "devDependencies": { "tsconfig-paths": "^3.9.0", diff --git a/package.json b/package.json index e5040d58e..ea7fa77fe 100755 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "yajsapi", - "version": "0.1.0-alpha.3", + "version": "0.1.1-alpha.3", "description": "NodeJS API for Next Golem", "repository": "https://github.com/golemfactory/yajsapi", "main": "dist/index.js", @@ -18,16 +18,23 @@ "axios": "^0.19.2", "bluebird": "^3.5.0", "commander": "^6.2.0", - "dayjs": "^1.9.6", + "dayjs": "~1.9.1", "js-csp": "^1.0.1", + "eccrypto": "^1.1.5", + "minimist": "^1.2.5", "rewire": "^3.0.2", + "secp256k1": "^4.0.2", + "sgx-ias-js": "~0.1.1", "tmp": "^0.2.1", + "utf8": "~3.0.0", "uuid": "^8.3.1", "winston": "^3.3.3", "ya-ts-client": "^0.2.0" }, "devDependencies": { + "@types/eccrypto": "^1.1.2", "@types/node": "14.6.2", + "@types/utf8": "^2.1.6", "ts-node": "^9.0.0", "tsconfig-paths": "^3.9.0", "typescript": "^3.9.7" diff --git a/yajsapi/crypto.ts b/yajsapi/crypto.ts new file mode 100644 index 000000000..8c758605b --- /dev/null +++ b/yajsapi/crypto.ts @@ -0,0 +1,123 @@ +import * as crypto from "crypto"; +import * as eccrypto from "eccrypto"; +import * as secp256k1 from "secp256k1"; + +export function rand_hex(length: number): string { + let byte_sz = Math.floor(length / 2); + return crypto.randomBytes(byte_sz).toString("hex"); +} + +export class PrivateKey { + inner!: Buffer; + + constructor() { + this.inner = eccrypto.generatePrivate(); + } + + static from(buffer: Buffer): PrivateKey { + let key = Object.create(this.prototype); + key.inner = buffer; + return key; + } + + static fromHex(hex: string): PublicKey { + let inner = Buffer.from(hex, "hex"); + return PublicKey.from(inner); + } + + publicKey(compressed: boolean = true): PublicKey { + let buffer = compressed + ? eccrypto.getPublicCompressed(this.inner) + : eccrypto.getPublic(this.inner); + return PublicKey.from(buffer); + } + + async derive(publicKey: PublicKey): Promise { + return await eccrypto.derive(this.inner, publicKey.inner); + } + + async sign(msg: Buffer) { + return await eccrypto.sign(this.inner, msg); + } + + toString(): string { + return this.inner.toString("hex"); + } +} + +export class PublicKey { + inner!: Buffer; + + private constructor() {} + + static from(buffer: Buffer): PublicKey { + let key = Object.create(this.prototype); + key.inner = buffer; + return key; + } + + static fromHex(hex: string): PublicKey { + let inner = Buffer.from(hex, "hex"); + return PublicKey.from(inner); + } + + toString(): string { + return this.inner.toString("hex"); + } +} + +export class CryptoCtx { + priv_key!: PrivateKey; + ephem_key!: Buffer; + + static async from(pub_key: PublicKey, priv_key?: PrivateKey): Promise { + priv_key = priv_key ? priv_key : new PrivateKey(); + let ephem_key = Buffer.from(secp256k1.ecdh(pub_key.inner, priv_key.inner)); + return new CryptoCtx(priv_key, ephem_key); + } + + private constructor(priv_key: PrivateKey, ephem_key: Buffer) { + this.priv_key = priv_key; + this.ephem_key = ephem_key; + } + + encrypt(data: Buffer): Buffer { + let iv = crypto.randomBytes(12); + let cipher = crypto.createCipheriv("aes-256-gcm", this.ephem_key, iv); + + let chunk_1 = cipher.update(data); + let chunk_2 = cipher.final(); + let tag = cipher.getAuthTag(); + + let buffer = Buffer.alloc(1 + iv.length + 1 + tag.length, 0, 'binary'); + let off = 0; + + buffer.writeUInt8(iv.length, off); + off += 1; + iv.copy(buffer, off); + off += iv.length; + buffer.writeUInt8(tag.length, off); + off += 1; + tag.copy(buffer, off); + + return Buffer.concat([buffer, chunk_1, chunk_2]); + } + + decrypt(data: Buffer): Buffer { + let off = 0; + let iv_length = data.readUInt8(off); + off += 1; + let iv = data.slice(off, off + iv_length); + off += iv_length; + let tag_length = data.readUInt8(off); + off += 1; + let tag = data.slice(off, off + tag_length); + off += tag_length; + let enc = data.slice(off); + + var cipher = crypto.createDecipheriv("aes-256-gcm", this.ephem_key, iv); + cipher.setAuthTag(tag); + + return Buffer.concat([cipher.update(enc), cipher.final()]); + } +} diff --git a/yajsapi/index.ts b/yajsapi/index.ts index 83aaccdd6..d24d89f33 100755 --- a/yajsapi/index.ts +++ b/yajsapi/index.ts @@ -1,9 +1,9 @@ import events from "events"; -import { Engine, Task, vm } from "./runner"; +import { Engine, Task, sgx, vm } from "./runner"; import { WorkContext } from "./runner/ctx"; import * as props from "./props"; import * as rest from "./rest"; import * as storage from "./storage"; import * as utils from "./utils"; -export { Engine, Task, vm, WorkContext, props, rest, storage, utils }; +export { Engine, Task, sgx, vm, WorkContext, props, rest, storage, utils }; diff --git a/yajsapi/props/inf.ts b/yajsapi/props/inf.ts index df2429237..3b47a07eb 100755 --- a/yajsapi/props/inf.ts +++ b/yajsapi/props/inf.ts @@ -1,42 +1,34 @@ import { Field, Model } from "./base"; -const INF_MEM: string = "golem.inf.mem.gib"; -const INF_STORAGE: string = "golem.inf.storage.gib"; -const INF_CORES: string = "golem.inf.cpu.cores"; -const INF_RUNTIME: string = "golem.runtime.name"; -const TRANSFER_CAPS: string = "golem.activity.caps.transfer.protocol"; +export const INF_MEM: string = "golem.inf.mem.gib"; +export const INF_STORAGE: string = "golem.inf.storage.gib"; +export const INF_CORES: string = "golem.inf.cpu.cores"; +export const INF_RUNTIME: string = "golem.runtime.name"; +export const TRANSFER_CAPS: string = "golem.activity.caps.transfer.protocol"; export enum RuntimeType { UNKNOWN = "", WASMTIME = "wasmtime", EMSCRIPTEN = "emscripten", VM = "vm", -} - -enum WasmInterface { - WASI_0 = "0", - WASI_0preview1 = "0preview1", + SGX_WASI = "sgx", + SGX_JS_SP = "sgx-js-sp", } export class InfBase { + cores: Field = new Field({ metadata: { key: INF_CORES } }); mem: Field = new Field({ metadata: { key: INF_MEM } }); runtime: Field = new Field({ metadata: { key: INF_RUNTIME } }); storage?: Field = new Field({ metadata: { key: INF_STORAGE } }); transfers: Field = new Field({ metadata: { key: TRANSFER_CAPS } }); -} -export class InfVm extends InfBase { - runtime = new Field({ - value: RuntimeType.VM, - metadata: { key: INF_RUNTIME }, - }); - cores: Field = new Field({ value: 1, metadata: { key: INF_CORES } }); + static fields(inf: InfBase, keys: string[]) { + return getFields(inf, keys); + } } -export const InfVmKeys = getFields(new InfVm(), ["mem", "storage", "runtime"]); - -function getFields(obj, keys) { +function getFields(obj: object, keys: string[]) { let fields = {}; keys.forEach((key) => { fields[key] = obj[key].metadata.key; @@ -49,24 +41,8 @@ export class ExeUnitRequest extends Model { package_url: Field = new Field({ metadata: { key: "golem.srv.comp.task_package" }, }); - constructor(package_url) { + constructor(package_url: any) { super(); this.package_url.value = package_url; } } - -export enum VmPackageFormat { - UNKNOWN = "", - GVMKIT_SQUASH = "gvmkit-squash", -} - -export class VmRequest extends ExeUnitRequest { - package_format: Field = new Field({ - metadata: { key: "golem.srv.comp.vm.package_format" }, - }); - - constructor(package_url, package_format) { - super(package_url); - this.package_format.value = package_format; - } -} diff --git a/yajsapi/rest/activity.ts b/yajsapi/rest/activity.ts index 03108835b..9c718a073 100755 --- a/yajsapi/rest/activity.ts +++ b/yajsapi/rest/activity.ts @@ -3,47 +3,145 @@ import { RequestorStateApi, } from "ya-ts-client/dist/ya-activity/api"; import * as yaa from "ya-ts-client/dist/ya-activity/src/models"; -import { sleep } from "../utils"; +import { attest, types } from "sgx-ias-js"; import { Configuration } from "ya-ts-client/dist/ya-activity"; -import { logger } from "../utils"; +import { Credentials, ExeScriptCommandResult, SgxCredentials } from "ya-ts-client/dist/ya-activity/src/models"; +import { CryptoCtx, PrivateKey, PublicKey, rand_hex } from "../crypto"; +import { sleep, logger } from "../utils"; +import { Agreement } from "./market"; +import { SGX_CONFIG } from "../runner/sgx"; +import * as utf8 from "utf8"; export class ActivityService { - private _api; - private _state; + private _api!: RequestorControlApi; + private _state!: RequestorStateApi; constructor(cfg: Configuration) { this._api = new RequestorControlApi(cfg); this._state = new RequestorStateApi(cfg); } - async new_activity(agreement_id: string): Promise { + async create_activity(agreement: Agreement, secure: boolean = false): Promise { try { - let { data: activity_id } = await this._api.createActivity(agreement_id); - let _activity = new Activity(this._api); - _activity.state = this._state; - _activity.id = activity_id; - return _activity; + if (secure) { + return await this._create_secure_activity(agreement); + } else { + return await this._create_activity(agreement.id()); + } } catch (error) { - logger.error(`Failed to create activity for agreement ${agreement_id}`); + logger.error(`Failed to create activity for agreement ${agreement.id()}`); throw error; } } + + async _create_activity(agreement_id: string): Promise { + let { data: response } = await this._api.createActivity(agreement_id); + let activity_id = typeof response == "string" + ? response + : response.activityId; + return new Activity(activity_id, this._api, this._state); + } + + async _create_secure_activity(agreement: Agreement): Promise { + let priv_key = new PrivateKey(); + let pub_key = priv_key.publicKey(); + let crypto_ctx: CryptoCtx; + + let { + data: response, + } = await this._api.createActivity({ + agreementId: agreement.id(), + requestorPubKey: pub_key.toString(), + }); + + let activity_id = typeof response == "string" + ? response + : response.activityId; + let credentials = typeof response == "string" + ? undefined + : response.credentials; + + try { + if (!credentials) { + throw Error("Missing credentials in CreateActivity response"); + } + if (pub_key.toString() != credentials.sgx.requestorPubKey) { + throw Error("Invalid requestor public key in CreateActivity response"); + } + + let enclave_key = PublicKey.fromHex(credentials.sgx.enclavePubKey); + crypto_ctx = await CryptoCtx.from(enclave_key, priv_key); + + if (SGX_CONFIG.enableAttestation) { + await this._attest(activity_id, agreement, credentials); + } + + } catch (error) { + await this._api.destroyActivity(activity_id); + throw error; + } + + return new SecureActivity( + activity_id, + credentials.sgx, + crypto_ctx, + this._api, + this._state + ); + } + + async _attest(activity_id: string, agreement: Agreement, credentials: Credentials) { + let demand = (await agreement.details()).raw_details.demand; + let pkg = demand.properties["golem.srv.comp.task_package"]; + + if (!pkg) { + throw new Error("Invalid agreement: missing package"); + } + + let evidence: attest.AttestationResponse = { + report: credentials.sgx.iasReport, + signature: types.parseHex(credentials.sgx.iasSig), + }; + let verifier = attest.AttestationVerifier.from(evidence) + .data(types.parseHex(credentials.sgx.requestorPubKey)) + .data(types.parseHex(credentials.sgx.enclavePubKey)) + .data(new TextEncoder().encode(pkg)) // encode as utf-8 bytes + .mr_enclave_list(SGX_CONFIG.exeunitHashes) + .nonce(utf8.encode(activity_id)) // encode as utf-8 string + .max_age(SGX_CONFIG.maxEvidenceAge); + + if (!SGX_CONFIG.allowDebug) { + verifier.not_debug(); + } + if (!SGX_CONFIG.allowOutdatedTcb) { + verifier.not_outdated(); + } + + let result = verifier.verify(); + if (result.verdict != attest.AttestationVerdict.Ok) { + let name = result.verdict.toString(); + throw new Error(`Attestation failed: ${name}: ${result.message}`) + } + } } class ExeScriptRequest implements yaa.ExeScriptRequest { text!: string; - constructor(text) { + constructor(text: string) { this.text = text; } } -export class Activity { - private _api!: RequestorControlApi; - private _state!: RequestorStateApi; - private _id!: string; +class Activity { + protected _api!: RequestorControlApi; + protected _state!: RequestorStateApi; + protected _id!: string; + protected _credentials?: object; - constructor(_api: RequestorControlApi) { + constructor(id: string, _api: RequestorControlApi, _state: RequestorStateApi) { + this._id = id; this._api = _api; + this._state = _state; } set id(x) { @@ -54,19 +152,35 @@ export class Activity { return this._id; } + get credentials(): object | undefined { + return this._credentials; + } + + get exeunitHashes(): string[] | undefined { + return SGX_CONFIG.exeunitHashes.map(value => value.toString()); + } + + async exec(script: object[]) { + let script_txt = JSON.stringify(script); + let req: yaa.ExeScriptRequest = new ExeScriptRequest(script_txt); + let { data: batch_id } = await this._api.exec(this._id, req); + return new Batch(this, batch_id, script.length); + } + async state(): Promise { let { data: result } = await this._state.getActivityState(this._id); let state: yaa.ActivityState = result; return state; } - async send(script: object[]) { - let script_txt = JSON.stringify(script); - let _script_request: yaa.ExeScriptRequest = new ExeScriptRequest( - script_txt + async results(batch_id: string, timeout: number = 30): Promise { + let { data: results } = await this._api.getExecBatchResults( + this._id, + batch_id, + undefined, + timeout, ); - let { data: batch_id } = await this._api.exec(this._id, _script_request); - return new Batch(this._api, this._id, batch_id, script.length); + return results; } async ready(): Promise { @@ -92,8 +206,96 @@ export class Activity { } } +class SecureActivity extends Activity { + _crypto_ctx!: CryptoCtx; + + constructor( + id: string, + credentials: SgxCredentials, + crypto_ctx: CryptoCtx, + _api: RequestorControlApi, + _state: RequestorStateApi + ) { + super(id, _api, _state); + this._credentials = credentials; + this._crypto_ctx = crypto_ctx; + } + + async exec(script: object[]) { + let cmd = { exec: { exe_script: script } }; + let batch_id = await this._send(rand_hex(32), cmd); + return new Batch(this, batch_id, script.length); + } + + async results(batch_id: string, timeout: number = 8): Promise { + let cmd = { getExecBatchResults: { command_index: undefined } }; + let res = await this._send(batch_id, cmd, timeout); + return res; + } + + async _send(batch_id: string, cmd: object, timeout?: number): Promise { + let req = new SecureRequest(this._id, batch_id, cmd, timeout); + let req_buf = Buffer.from(JSON.stringify(req)); + let enc_req = this._crypto_ctx.encrypt(req_buf); + + let { data: enc_res } = await this._api.callEncrypted( + this._id, + // cannot be null / undefined; + // overriden by transformRequest below + '', + { + responseType: 'arraybuffer', + headers: { + 'Content-Type': 'application/octet-stream', + 'Accept': 'application/octet-stream' + }, + transformRequest: [ + // workaround for string conversion; + // we _must_ send a Buffer object + (_headers: any, _data: any) => enc_req, + ], + timeout: 0, + } + ); + + let res_buf = this._crypto_ctx.decrypt(Buffer.from(enc_res)); + let res = SecureResponse.from_buffer(res_buf); + return res.unwrap(); + } +} + +class SecureRequest { + constructor( + private activityId: string, + private batchId: string, + private command: object, + private timeout?: number) {} +} + +class SecureResponse { + command!: string; + Ok?: any; + Err?: any; + + static from_buffer(buffer: Buffer): SecureResponse { + return Object.assign( + new SecureResponse(), + JSON.parse(buffer.toString()) + ); + } + + unwrap(): any { + if (this.command == "error" || !!this.Err) { + throw new Error(this.Err || this.Ok); + } + return this.Ok; + } +} + class Result { idx!: Number; + stdout?: string; + stderr?: string; message?: string; } @@ -105,21 +307,21 @@ class CommandExecutionError extends Error { } class Batch implements AsyncIterable { - private _api!: RequestorControlApi; - private _activity_id!: string; + private _activity!: Activity; private _batch_id!: string; - private _size; + private _size!: number; + public credentials?: SgxCredentials; constructor( - _api: RequestorControlApi, - activity_id: string, + activity: Activity, batch_id: string, - batch_size: number + batch_size: number, + credentials?: SgxCredentials, ) { - this._api = _api; - this._activity_id = activity_id; + this._activity = activity; this._batch_id = batch_id; this._size = batch_size; + this.credentials = credentials; } return(value: any): Promise> { throw new Error("Method not implemented."); @@ -137,12 +339,7 @@ class Batch implements AsyncIterable { let last_idx = 0; while (last_idx < this._size) { let any_new: boolean = false; - let { data: exe_list } = await this._api.getExecBatchResults( - this._activity_id, - this._batch_id, - undefined, - 30 //timeout 30s - ); + let exe_list = await this._activity.results(this._batch_id); let results: yaa.ExeScriptCommandResult[] = exe_list; results = results.slice(last_idx); for (let result of results) { @@ -152,10 +349,12 @@ class Batch implements AsyncIterable { if (result.result.toString() == "Error") throw new CommandExecutionError( last_idx.toString(), - result.message || "" + result.stderr || result.message || "" ); let _result = new Result(); _result.idx = result.index; + _result.stdout = result.stdout; + _result.stderr = result.stderr; _result.message = result.message; yield _result; last_idx = result.index + 1; diff --git a/yajsapi/rest/configuration.ts b/yajsapi/rest/configuration.ts index 4d2aa3879..efd299d6b 100755 --- a/yajsapi/rest/configuration.ts +++ b/yajsapi/rest/configuration.ts @@ -1,4 +1,6 @@ import { yaActivity, yaMarket, yaPayment } from "ya-ts-client"; +import { Agent as HttpAgent } from "http"; +import { Agent as HttpsAgent } from "https"; const DEFAULT_API_URL: string = "http://127.0.0.1:7465"; @@ -21,6 +23,7 @@ export class Configuration { private __market_url!: string; private __payment_url!: string; private __activity_url!: string; + private __axios_opts!: object; constructor( app_key = null, @@ -58,6 +61,15 @@ export class Configuration { "YAGNA_ACTIVITY_URL", "/activity-api/v1" ); + + this.__axios_opts = { + httpAgent: new HttpAgent({ + keepAlive: true, + }), + httpsAgent: new HttpsAgent({ + keepAlive: true, + }), + }; } app_key(): string | null { @@ -81,6 +93,7 @@ export class Configuration { apiKey: this.app_key() as string, basePath: this.__market_url, accessToken: this.app_key() as string, + baseOptions: this.__axios_opts, }); return cfg; } @@ -90,6 +103,7 @@ export class Configuration { apiKey: this.app_key() as string, basePath: this.__payment_url, accessToken: this.app_key() as string, + baseOptions: this.__axios_opts, }); return cfg; } @@ -99,6 +113,7 @@ export class Configuration { apiKey: this.app_key() as string, basePath: this.__activity_url, accessToken: this.app_key() as string, + baseOptions: this.__axios_opts, }); return cfg; } diff --git a/yajsapi/rest/index.ts b/yajsapi/rest/index.ts index b2705677e..5f19b472e 100755 --- a/yajsapi/rest/index.ts +++ b/yajsapi/rest/index.ts @@ -2,6 +2,7 @@ import { Configuration } from "./configuration"; import { Market } from "./market"; import { Invoice, InvoiceStatus, Payment } from "./payment"; import { ActivityService as Activity } from "./activity"; +import * as sgx from "../runner/sgx"; import * as vm from "../runner/vm"; export { @@ -12,4 +13,5 @@ export { Market, Payment, vm, + sgx, }; diff --git a/yajsapi/runner/common.ts b/yajsapi/runner/common.ts new file mode 100644 index 000000000..266e50d64 --- /dev/null +++ b/yajsapi/runner/common.ts @@ -0,0 +1,50 @@ + +import axios from "axios"; +import { DemandBuilder } from "../props/builder"; +import { ExeUnitRequest } from "../props/inf"; + +export const DEFAULT_REPO_URL = "http://3.249.139.167:8000"; + +export async function resolve_url(repo_url: string, image_hash: string): Promise { + let resp = await axios.get( + `${repo_url}/image.${image_hash}.link` + ); + if (resp.status != 200) throw Error(`Error: ${resp.status}`); + + let image_url = await resp.data; + return `hash:sha3:${image_hash}:${image_url}`; +} + +export class Constraints { + inner!: string[]; + + constructor() { + this.inner = []; + } + + extend(items: string[]) { + this.inner.push.apply(this.inner, items); + } + + toString(): string { + return `(&${this.inner.join("\n\t")})`; + } +} + +export class DemandDecor { + constr!: Constraints; + request!: ExeUnitRequest; + public secure!: boolean; + + constructor(constr: Constraints, request: ExeUnitRequest, secure: boolean = false) { + this.constr = constr; + this.request = request; + this.secure = secure; + } + + async decorate_demand(demand: DemandBuilder) { + demand.ensure(this.constr.toString()); + demand.add(this.request); + } +} + diff --git a/yajsapi/runner/ctx.ts b/yajsapi/runner/ctx.ts index dc4511f85..c493d3955 100755 --- a/yajsapi/runner/ctx.ts +++ b/yajsapi/runner/ctx.ts @@ -46,6 +46,9 @@ export class CommandContainer { } export class Work { + public output: object[] = []; + public attestation?: object; + async prepare() { // Executes before commands are send to provider. } @@ -91,6 +94,7 @@ class _SendWork extends Work { this._idx = commands.transfer({ _from: this._src.download_url(), _to: `container:${this._dst_path}`, + _args: {}, }); } } @@ -146,11 +150,30 @@ class _Run extends Work { this._idx = commands.run({ entry_point: this.cmd, args: this.args || [], + capture: { + stdout: { atEnd: {} }, + stderr: { atEnd: {} }, + } }); } } +class _Sign extends Work { + private _idx; + + constructor() { + super(); + this._idx = null; + } + + register(commands: any) { + //CommandContainer + this._idx = commands.sign({}); + } +} + const StorageEvent = events.DownloadStarted || events.DownloadFinished; + class _RecvFile extends Work { private _storage; private _dst_path; @@ -268,6 +291,10 @@ export class WorkContext { new _RecvFile(this._storage, src_path, dst_path, this._emitter) ); } + sign() { + this._prepare(); + this._pending_steps.push(new _Sign()); + } log(args) { logger.info(`${this._id}: ${args}`); } diff --git a/yajsapi/runner/index.ts b/yajsapi/runner/index.ts index d59983787..12af4df20 100755 --- a/yajsapi/runner/index.ts +++ b/yajsapi/runner/index.ts @@ -34,7 +34,12 @@ import { Queue, sleep, } from "../utils"; + +import * as _common from "./common"; import * as _vm from "./vm"; +import * as _sgx from "./sgx"; +export const sgx = _sgx; +export const vm = _vm; import { Task, TaskStatus } from "./task"; import { Consumer, SmartQueue } from "./smartq"; @@ -187,7 +192,7 @@ export class Engine { private _strategy; private _api_config; private _stack; - private _package; + private _demand_decor; private _conf; private _expires; private _get_offers_deadline; @@ -201,7 +206,7 @@ export class Engine { private _wrapped_emitter; constructor( - _package: _vm.Package, + _demand_decor: _common.DemandDecor, max_workers: Number = 5, timeout: any = dayjs.duration({ minutes: 5 }).asMilliseconds(), //timedelta budget: string, //number @@ -213,7 +218,7 @@ export class Engine { this._strategy = strategy; this._api_config = new rest.Configuration(); this._stack = new AsyncExitStack(); - this._package = _package; + this._demand_decor = _demand_decor; this._conf = new _EngineConf(max_workers, timeout); // TODO: setup precision this._budget_amount = parseFloat(budget); @@ -255,7 +260,7 @@ export class Engine { for (let x of multi_payment_decoration.properties) { builder._props[x.key] = x.value; } - await this._package.decorate_demand(builder); + await this._demand_decor.decorate_demand(builder); await this._strategy.decorate_demand(builder); let offer_buffer: { [key: string]: string | _BufferItem } = {}; //Dict[str, _BufferItem] @@ -284,6 +289,7 @@ export class Engine { let agreements_to_pay: Set = new Set(); let invoices: Map = new Map(); let payment_closing: boolean = false; + let secure = this._demand_decor.secure; let offers_collected = 0; let proposals_confirmed = 0; @@ -449,7 +455,7 @@ export class Engine { let _act; try { - _act = await activity_api.new_activity(agreement.id()); + _act = await activity_api.create_activity(agreement, secure) } catch (error) { emit(new events.ActivityCreateFailed({ agr_id: agreement.id() })); throw error; @@ -494,10 +500,15 @@ export class Engine { ); } let task_id = current_worker_task ? current_worker_task.id : null; + batch.attestation = { + credentials: act.credentials, + nonce: act.id, + exeunitHashes: act.exeunitHashes + }; await batch.prepare(); let cc = new CommandContainer(); batch.register(cc); - let remote = await act.send(cc.commands()); + let remote = await act.exec(cc.commands()); emit( new events.ScriptSent({ agr_id: agreement.id(), @@ -507,6 +518,7 @@ export class Engine { ); try { for await (let step of remote) { + batch.output.push(step); emit( new events.CommandExecuted({ success: true, @@ -803,5 +815,3 @@ export class Engine { await this._stack.aclose(); } } - -export const vm = _vm; diff --git a/yajsapi/runner/sgx.ts b/yajsapi/runner/sgx.ts new file mode 100644 index 000000000..9bfde44b9 --- /dev/null +++ b/yajsapi/runner/sgx.ts @@ -0,0 +1,141 @@ +import * as fs from "fs"; +import { Field } from "../props/base"; +import { + DEFAULT_REPO_URL, + Constraints, + DemandDecor, + resolve_url +} from "./common"; +import { + INF_CORES, + INF_RUNTIME, + ExeUnitRequest, + InfBase, + RuntimeType, +} from "../props/inf"; +import { types } from "sgx-ias-js"; + +class _InfSgxWasi extends InfBase { + runtime = new Field({ + value: RuntimeType.SGX_WASI, + metadata: { key: INF_RUNTIME }, + }); + cores: Field = new Field({ value: 1, metadata: { key: INF_CORES } }); +} + +const _InfSgxWasiKeys = InfBase.fields( + new _InfSgxWasi(), + ["cores", "mem", "storage", "runtime"] +); + +class _SgxWasiConstrains extends Constraints { + constructor(min_mem_gib: number, min_storage_gib: number) { + super(); + super.extend([ + `(${_InfSgxWasiKeys["cores"]}>=1)`, + `(${_InfSgxWasiKeys["mem"]}>=${min_mem_gib})`, + `(${_InfSgxWasiKeys["storage"]}>=${min_storage_gib})`, + `(${_InfSgxWasiKeys["runtime"]}=${RuntimeType.SGX_WASI})`, + ]); + } +} + +class _InfSgxJsSp extends InfBase { + runtime = new Field({ + value: RuntimeType.SGX_JS_SP, + metadata: { key: INF_RUNTIME }, + }); + cores: Field = new Field({ value: 1, metadata: { key: INF_CORES } }); +} + +const _InfSgxJsSpKeys = InfBase.fields( + new _InfSgxJsSp(), + ["cores", "mem", "storage", "runtime"]); + +class _SgxJsSpConstrains extends Constraints { + constructor(min_mem_gib: number, min_storage_gib: number) { + super(); + super.extend([ + `(${_InfSgxJsSpKeys["cores"]}>=1)`, + `(${_InfSgxJsSpKeys["mem"]}>=${min_mem_gib})`, + `(${_InfSgxJsSpKeys["storage"]}>=${min_storage_gib})`, + `(${_InfSgxJsSpKeys["runtime"]}=${RuntimeType.SGX_JS_SP})`, + ]); + } +} + +const DEFAULT_SGX_CONFIG = { + "enableAttestation": true, + "exeunitHashes": ["5edbb025714683961d4a2cb51b1d0a4ee8225a6ced167f29eb67f639313d9490"], + "allowDebug": true, + "allowOutdatedTcb": true, + "maxEvidenceAge": 60 +}; + +class SgxConfig { + enableAttestation!: boolean; + exeunitHashes!: types.bytes.Bytes32[]; + allowDebug!: boolean; + allowOutdatedTcb!: boolean; + maxEvidenceAge!: number; // seconds + + static from_env(): SgxConfig { + let env_path = process.env.YAGNA_SGX_CONFIG; + let json = env_path + ? fs.readFileSync(env_path) + : DEFAULT_SGX_CONFIG; + + json["exeunitHashes"].forEach((hex: string, i: number) => { + json["exeunitHashes"][i] = types.bytes.Bytes32.from(types.parseHex(hex)); + }); + + let sgx_config: SgxConfig = Object.create(this.prototype); + return Object.assign(sgx_config, json); + } +} + +export const SGX_CONFIG = SgxConfig.from_env(); + +export enum SgxEngine { + WASI = "sgx", + JS_SP = "sgx-js-sp", +} + +export async function repo( + engine: SgxEngine, + image_hash: string, + min_mem_gib: number = 0.5, + min_storage_gib: number = 2.0, + image_repo: string = DEFAULT_REPO_URL, +): Promise { + /* + Builds reference to a demand decorator. + + - *engine*: SGX runtime engine to use. + - *image_hash*: finds package by its contents hash. + - *image_repo* image repository to query. + - *min_mem_gib*: minimal memory required to execute application code. + - *min_storage_gib* minimal disk storage to execute tasks. + + */ + + let pkg_url = await resolve_url(image_repo, image_hash); + let secure = true; + + switch (engine) { + case SgxEngine.JS_SP: + return new DemandDecor( + new _SgxJsSpConstrains(min_mem_gib, min_storage_gib), + new ExeUnitRequest(pkg_url), + secure, + ); + case SgxEngine.WASI: + return new DemandDecor( + new _SgxWasiConstrains(min_mem_gib, min_storage_gib), + new ExeUnitRequest(pkg_url), + secure, + ); + default: + throw Error(`Invalid SGX runtime engine: ${engine}`); + } +} diff --git a/yajsapi/runner/vm.ts b/yajsapi/runner/vm.ts index f2af699d9..59f58c4f3 100755 --- a/yajsapi/runner/vm.ts +++ b/yajsapi/runner/vm.ts @@ -1,88 +1,80 @@ -import axios from "axios"; -import { DemandBuilder } from "../props/builder"; +import { Field } from "../props/base"; import { - InfVmKeys, + DEFAULT_REPO_URL, + Constraints, + DemandDecor, + resolve_url +} from "./common"; +import { + INF_RUNTIME, RuntimeType, - VmRequest, - VmPackageFormat, + ExeUnitRequest, + InfBase, } from "../props/inf"; -const _DEFAULT_REPO_URL = "http://3.249.139.167:8000"; - -class _VmConstrains { - public min_mem_gib!: number; - public min_storage_gib!: number; - public cores: number = 1; - - constructor(min_mem_gib, min_storage_gib, cores = 1) { - this.min_mem_gib = min_mem_gib; - this.min_storage_gib = min_storage_gib; - this.cores = cores; - } - - toString() { - let rules = [ - `(${InfVmKeys["mem"]}>=${this.min_mem_gib})`, - `(${InfVmKeys["storage"]}>=${this.min_storage_gib})`, - `(${InfVmKeys["runtime"]}=${RuntimeType.VM})`, - ].join("\n\t"); - return `(&${rules})`; - } +export enum VmPackageFormat { + UNKNOWN = "", + GVMKIT_SQUASH = "gvmkit-squash", } -export class Package { - async resolve_url(): Promise { - return ""; - } - async decorate_demand(demand: DemandBuilder) {} +class _InfVm extends InfBase { + runtime = new Field({ + value: RuntimeType.VM, + metadata: { key: INF_RUNTIME }, + }); } +const _InfVmKeys = InfBase.fields( + new _InfVm(), + ["cores", "mem", "storage", "runtime"] +); -class _VmPackage extends Package { - repo_url!: string; - image_hash!: string; - constraints!: _VmConstrains; - - constructor(repo_url, image_hash, constraints) { +class _VmConstrains extends Constraints { + constructor(min_mem_gib: number, min_storage_gib: number, min_cores: number = 1) { super(); - this.repo_url = repo_url; - this.image_hash = image_hash; - this.constraints = constraints; + super.extend([ + `(${_InfVmKeys["cores"]}>=${min_cores})`, + `(${_InfVmKeys["mem"]}>=${min_mem_gib})`, + `(${_InfVmKeys["storage"]}>=${min_storage_gib})`, + `(${_InfVmKeys["runtime"]}=${RuntimeType.VM})`, + ]); } +} - async resolve_url(): Promise { - let resp = await axios.get( - `${this.repo_url}/image.${this.image_hash}.link` - ); - if (resp.status != 200) throw Error(`Error: ${resp.status}`); - - let image_url = await resp.data; - let image_hash = this.image_hash; - return `hash:sha3:${image_hash}:${image_url}`; - } +class _VmRequest extends ExeUnitRequest { + package_format: Field = new Field({ + metadata: { key: "golem.srv.comp.vm.package_format" }, + }); - async decorate_demand(demand: DemandBuilder) { - let image_url = await this.resolve_url(); - demand.ensure(this.constraints.toString()); - demand.add(new VmRequest(image_url, VmPackageFormat.GVMKIT_SQUASH)); + constructor(package_url: string, package_format: VmPackageFormat) { + super(package_url); + this.package_format.value = package_format; } } export async function repo( image_hash: string, min_mem_gib: number = 0.5, - min_storage_gib: number = 2.0 -): Promise<_VmPackage> { + min_storage_gib: number = 2.0, + min_cores: number = 1, + image_format: VmPackageFormat = VmPackageFormat.GVMKIT_SQUASH, + image_repo: string = DEFAULT_REPO_URL, +): Promise { /* - Builds reference to application package. + Builds reference to a demand decorator. - *image_hash*: finds package by its contents hash. + - *image_format* vm image format to use. + - *image_repo* image repository to query. - *min_mem_gib*: minimal memory required to execute application code. - *min_storage_gib* minimal disk storage to execute tasks. + - *min_cores* minimal cpu core count to execute tasks. */ - return new _VmPackage( - _DEFAULT_REPO_URL, - image_hash, - new _VmConstrains(min_mem_gib, min_storage_gib) + + let pkg_url = await resolve_url(image_repo, image_hash); + + return new DemandDecor( + new _VmConstrains(min_mem_gib, min_storage_gib, min_cores), + new _VmRequest(pkg_url, image_format), ); }