diff --git a/yajsapi/executor/index.ts b/yajsapi/executor/index.ts index cf18c44f1..54a24cc49 100755 --- a/yajsapi/executor/index.ts +++ b/yajsapi/executor/index.ts @@ -419,90 +419,105 @@ export class Executor implements ComputationHistory { logger.debug("Stopped processing debit notes."); } - async function find_offers(): Promise { - let _subscription: Subscription; + async function find_offers_for_subscription(subscription: Subscription): Promise { + emit(new events.SubscriptionCreated({ sub_id: subscription.id() })); + let _proposals; try { - _subscription = await builder.subscribe(market_api); + _proposals = subscription.events(self._worker_cancellation_token); } catch (error) { - emit(new events.SubscriptionFailed({ reason: error })); + emit( + new events.CollectFailed({ + sub_id: subscription.id(), + reason: error, + }) + ); throw error; } - await asyncWith(_subscription, async (subscription) => { - emit(new events.SubscriptionCreated({ sub_id: subscription.id() })); - let _proposals; + for await (let proposal of _proposals) { + emit( + new events.ProposalReceived({ + prop_id: proposal.id(), + provider_id: proposal.issuer(), + }) + ); + offers_collected += 1; + let score; try { - _proposals = subscription.events(self._worker_cancellation_token); + score = await strategy.score_offer(proposal, self); + logger.debug(`Scored offer ${proposal.id()}, ` + + `provider: ${proposal.props()["golem.node.id.name"]}, ` + + `strategy: ${strategy.constructor.name}, ` + + `score: ${score}`); } catch (error) { - emit( - new events.CollectFailed({ - sub_id: subscription.id(), - reason: error, - }) - ); + logger.log("debug", `Score offer error: ${error}`); + try { + await proposal.reject(error); + emit( + new events.ProposalRejected({ + prop_id: proposal.id(), + reason: error, + }) + ); + } catch (e) { + emit( + new events.ProposalFailed({ + prop_id: proposal.id(), + reason: e, + }) + ); + } + continue; } - for await (let proposal of _proposals) { - emit( - new events.ProposalReceived({ - prop_id: proposal.id(), - provider_id: proposal.issuer(), - }) - ); - offers_collected += 1; - let score; + if (score < SCORE_NEUTRAL) { + const reason = "Score too low"; try { - score = await strategy.score_offer(proposal, self); - logger.debug(`Scored offer ${proposal.id()}, ` + - `provider: ${proposal.props()["golem.node.id.name"]}, ` + - `strategy: ${strategy.constructor.name}, ` + - `score: ${score}`); + await proposal.reject(reason); + emit(new events.ProposalRejected({ + prop_id: proposal.id(), + reason: reason, + })); } catch (error) { - logger.log("debug", `Score offer error: ${error}`); - try { - await proposal.reject(error); - emit( - new events.ProposalRejected({ - prop_id: proposal.id(), - reason: error, - }) - ); - } catch (e) { - emit( - new events.ProposalFailed({ - prop_id: proposal.id(), - reason: e, - }) - ); - } - continue; - } - if (score < SCORE_NEUTRAL) { - const reason = "Score too low"; - try { - await proposal.reject(reason); - emit(new events.ProposalRejected({ + emit( + new events.ProposalFailed({ prop_id: proposal.id(), - reason: reason, - })); - } catch (error) { - emit( - new events.ProposalFailed({ - prop_id: proposal.id(), - reason: error, - }) - ); - } - continue; + reason: error, + }) + ); } - if (!proposal.is_draft()) { - try { - const common_platforms = self._get_common_payment_platforms( - proposal - ); - if (common_platforms.length) { - builder._properties["golem.com.payment.chosen-platform"] = - common_platforms[0]; - } else { - const reason = "No common payment platforms"; + continue; + } + if (!proposal.is_draft()) { + try { + const common_platforms = self._get_common_payment_platforms( + proposal + ); + if (common_platforms.length) { + builder._properties["golem.com.payment.chosen-platform"] = + common_platforms[0]; + } else { + const reason = "No common payment platforms"; + try { + await proposal.reject(reason); + emit( + new events.ProposalRejected({ + prop_id: proposal.id, + reason: reason, + }) + ); + } catch (error) { + emit( + new events.ProposalFailed({ + prop_id: proposal.id(), + reason: error, + }) + ); + } + continue; + } + let timeout = proposal.props()[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP]; + if (timeout) { + if (timeout < DEBIT_NOTE_MIN_TIMEOUT) { + const reason = "Debit note acceptance timeout too short"; try { await proposal.reject(reason); emit( @@ -511,65 +526,62 @@ export class Executor implements ComputationHistory { reason: reason, }) ); - } catch (error) { + } catch (e) { emit( new events.ProposalFailed({ prop_id: proposal.id(), - reason: error, + reason: e, }) ); } continue; + } else { + builder._properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout; } - let timeout = proposal.props()[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP]; - if (timeout) { - if (timeout < DEBIT_NOTE_MIN_TIMEOUT) { - const reason = "Debit note acceptance timeout too short"; - try { - await proposal.reject(reason); - emit( - new events.ProposalRejected({ - prop_id: proposal.id, - reason: reason, - }) - ); - } catch (e) { - emit( - new events.ProposalFailed({ - prop_id: proposal.id(), - reason: e, - }) - ); - } - continue; - } else { - builder._properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout; - } - } - await proposal.respond( - builder.properties(), - builder.constraints() - ); - emit(new events.ProposalResponded({ prop_id: proposal.id() })); - } catch (error) { - emit( - new events.ProposalFailed({ - prop_id: proposal.id(), - reason: error, - }) - ); } - } else { - emit(new events.ProposalConfirmed({ prop_id: proposal.id() })); - offer_buffer[proposal.issuer()] = new _BufferItem( - Date.now(), - score, - proposal + await proposal.respond( + builder.properties(), + builder.constraints() + ); + emit(new events.ProposalResponded({ prop_id: proposal.id() })); + } catch (error) { + emit( + new events.ProposalFailed({ + prop_id: proposal.id(), + reason: error, + }) ); - proposals_confirmed += 1; } + } else { + emit(new events.ProposalConfirmed({ prop_id: proposal.id() })); + offer_buffer[proposal.issuer()] = new _BufferItem( + Date.now(), + score, + proposal + ); + proposals_confirmed += 1; } - }); + } + } + + async function find_offers(): Promise { + let keepSubscribing = true; + while (keepSubscribing && !self._worker_cancellation_token.cancelled) { + try { + const subscription = await builder.subscribe(market_api); + await asyncWith(subscription, async (subscription) => { + try { + await find_offers_for_subscription(subscription); + } catch (error) { + logger.error(`Error while finding offers for a subscription: ${error}`); + keepSubscribing = false; + } + }); + } catch (error) { + emit(new events.SubscriptionFailed({ reason: error })); + keepSubscribing = false; + } + } logger.debug("Stopped checking and scoring new offers."); } @@ -626,7 +638,7 @@ export class Executor implements ComputationHistory { if (self._worker_cancellation_token.cancelled) { return; } const _batch_timeout = batch.timeout(); const batch_deadline = _batch_timeout - ? dayjs.utc().unix() + _batch_timeout + ? dayjs.utc().unix() + _batch_timeout / 1000 : null; try { let current_worker_task = consumer.last_item(); @@ -691,6 +703,7 @@ export class Executor implements ComputationHistory { agreement_id: agreement.id(), partial: true, }); + emit(new events.CheckingPayments()); } catch (error) { if (self._worker_cancellation_token.cancelled) { return; } try { diff --git a/yajsapi/rest/activity.ts b/yajsapi/rest/activity.ts index d53416db8..7020e8c73 100755 --- a/yajsapi/rest/activity.ts +++ b/yajsapi/rest/activity.ts @@ -60,6 +60,7 @@ export class ActivityService { let { data: response } = await this._api.createActivity({ agreementId: agreement_id }, { timeout: 30000, params: { timeout: 25 } }); let activity_id = typeof response == "string" ? response : response.activityId; + logger.debug(`Created activity ${activity_id} for agreement ${agreement_id}`); return new Activity(activity_id, this._api, this._state); } @@ -198,11 +199,18 @@ class Activity { cancellationToken?: CancellationToken ): Promise { const script_txt = JSON.stringify(script); - const { data: batch_id } = await this._api.exec( - this._id, - new ExeScriptRequest(script_txt), - { timeout: 5000 } - ); + let batch_id; + try { + const { data } = await this._api.exec( + this._id, + new ExeScriptRequest(script_txt), + { timeout: 10000 } + ); + batch_id = data; + } catch (error) { + logger.warn(`Error while sending batch script to provider: ${error}`); + throw error; + } if (stream) { return new StreamingBatch( @@ -355,7 +363,12 @@ export class CommandExecutionError extends Error { } } -class BatchTimeoutError extends Error {} +class BatchTimeoutError extends Error { + constructor(message) { + super(message); + this.name = "BatchTimeoutError"; + } +} class Batch implements AsyncIterable { protected api!: RequestorControlApi; @@ -419,7 +432,7 @@ class PollingBatch extends Batch { throw new CommandExecutionError(last_idx.toString(), "Interrupted."); } if (timeout && timeout <= 0) { - throw new BatchTimeoutError(); + throw new BatchTimeoutError(`Task timeout for activity ${this.activity_id}`); } try { let { data } = await this.api.getExecBatchResults( diff --git a/yajsapi/rest/market.ts b/yajsapi/rest/market.ts index d37b9b095..2388ae991 100755 --- a/yajsapi/rest/market.ts +++ b/yajsapi/rest/market.ts @@ -245,17 +245,24 @@ export class Subscription { { timeout: 5000 } ); for (let _proposal of proposals) { + if (cancellationToken && cancellationToken.cancelled) return; if (_proposal.eventType === "ProposalEvent") { yield new OfferProposal(this, _proposal as models.ProposalEvent); } } - if (cancellationToken && cancellationToken.cancelled) break; if (!proposals || !proposals.length) { await sleep(2); } } catch (error) { - logger.error(error); - throw Error(error); + if (error.response && error.response.status === 404) { + logger.debug(`Offer unsubscribed or its subscription expired, subscription_id: ${this._id}`); + this._open = false; + // Prevent calling `unsubscribe` which would result in API error for expired demand subscriptions + this._deleted = true; + } else { + logger.error(`Error while collecting offers: ${error}`); + throw error; + } } } return; @@ -286,8 +293,8 @@ export class Market { let { data: sub_id } = await self._api.subscribeDemand(request, { timeout: 5000 }); return new Subscription(self._api, sub_id); } catch (error) { - logger.error(error); - throw new Error(error); + logger.error(`Error while subscribing: ${error}`); + throw error; } } diff --git a/yajsapi/utils/log.ts b/yajsapi/utils/log.ts index 902af6c99..27e848708 100644 --- a/yajsapi/utils/log.ts +++ b/yajsapi/utils/log.ts @@ -175,6 +175,8 @@ class SummaryLogger { if (eventName === events.ComputationStarted.name) this._reset(); if (eventName === events.SubscriptionCreated.name) logger.info(event_type_to_string[eventName]); + else if (eventName === events.SubscriptionFailed.name) + logger.error(`Subscription failed: ${event["reason"]}`); else if (eventName === events.ProposalReceived.name) this.received_proposals[event["prop_id"]] = event["provider_id"]; else if (eventName === events.ProposalConfirmed.name) {