Skip to content

Commit

Permalink
Port subscription expiration from yapapi (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipgolem authored May 5, 2021
1 parent 0ceff1a commit fda063f
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 133 deletions.
255 changes: 134 additions & 121 deletions yajsapi/executor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,90 +419,105 @@ export class Executor implements ComputationHistory {
logger.debug("Stopped processing debit notes.");
}

async function find_offers(): Promise<void> {
let _subscription: Subscription;
async function find_offers_for_subscription(subscription: Subscription): Promise<void> {
emit(new events.SubscriptionCreated({ sub_id: subscription.id() }));
let _proposals;
try {
_subscription = await builder.subscribe(market_api);
_proposals = subscription.events(self._worker_cancellation_token);
} catch (error) {
emit(new events.SubscriptionFailed({ reason: error }));
emit(
new events.CollectFailed({
sub_id: subscription.id(),
reason: error,
})
);
throw error;
}
await asyncWith(_subscription, async (subscription) => {
emit(new events.SubscriptionCreated({ sub_id: subscription.id() }));
let _proposals;
for await (let proposal of _proposals) {
emit(
new events.ProposalReceived({
prop_id: proposal.id(),
provider_id: proposal.issuer(),
})
);
offers_collected += 1;
let score;
try {
_proposals = subscription.events(self._worker_cancellation_token);
score = await strategy.score_offer(proposal, self);
logger.debug(`Scored offer ${proposal.id()}, ` +
`provider: ${proposal.props()["golem.node.id.name"]}, ` +
`strategy: ${strategy.constructor.name}, ` +
`score: ${score}`);
} catch (error) {
emit(
new events.CollectFailed({
sub_id: subscription.id(),
reason: error,
})
);
logger.log("debug", `Score offer error: ${error}`);
try {
await proposal.reject(error);
emit(
new events.ProposalRejected({
prop_id: proposal.id(),
reason: error,
})
);
} catch (e) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: e,
})
);
}
continue;
}
for await (let proposal of _proposals) {
emit(
new events.ProposalReceived({
prop_id: proposal.id(),
provider_id: proposal.issuer(),
})
);
offers_collected += 1;
let score;
if (score < SCORE_NEUTRAL) {
const reason = "Score too low";
try {
score = await strategy.score_offer(proposal, self);
logger.debug(`Scored offer ${proposal.id()}, ` +
`provider: ${proposal.props()["golem.node.id.name"]}, ` +
`strategy: ${strategy.constructor.name}, ` +
`score: ${score}`);
await proposal.reject(reason);
emit(new events.ProposalRejected({
prop_id: proposal.id(),
reason: reason,
}));
} catch (error) {
logger.log("debug", `Score offer error: ${error}`);
try {
await proposal.reject(error);
emit(
new events.ProposalRejected({
prop_id: proposal.id(),
reason: error,
})
);
} catch (e) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: e,
})
);
}
continue;
}
if (score < SCORE_NEUTRAL) {
const reason = "Score too low";
try {
await proposal.reject(reason);
emit(new events.ProposalRejected({
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: reason,
}));
} catch (error) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: error,
})
);
}
continue;
reason: error,
})
);
}
if (!proposal.is_draft()) {
try {
const common_platforms = self._get_common_payment_platforms(
proposal
);
if (common_platforms.length) {
builder._properties["golem.com.payment.chosen-platform"] =
common_platforms[0];
} else {
const reason = "No common payment platforms";
continue;
}
if (!proposal.is_draft()) {
try {
const common_platforms = self._get_common_payment_platforms(
proposal
);
if (common_platforms.length) {
builder._properties["golem.com.payment.chosen-platform"] =
common_platforms[0];
} else {
const reason = "No common payment platforms";
try {
await proposal.reject(reason);
emit(
new events.ProposalRejected({
prop_id: proposal.id,
reason: reason,
})
);
} catch (error) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: error,
})
);
}
continue;
}
let timeout = proposal.props()[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP];
if (timeout) {
if (timeout < DEBIT_NOTE_MIN_TIMEOUT) {
const reason = "Debit note acceptance timeout too short";
try {
await proposal.reject(reason);
emit(
Expand All @@ -511,65 +526,62 @@ export class Executor implements ComputationHistory {
reason: reason,
})
);
} catch (error) {
} catch (e) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: error,
reason: e,
})
);
}
continue;
} else {
builder._properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout;
}
let timeout = proposal.props()[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP];
if (timeout) {
if (timeout < DEBIT_NOTE_MIN_TIMEOUT) {
const reason = "Debit note acceptance timeout too short";
try {
await proposal.reject(reason);
emit(
new events.ProposalRejected({
prop_id: proposal.id,
reason: reason,
})
);
} catch (e) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: e,
})
);
}
continue;
} else {
builder._properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout;
}
}
await proposal.respond(
builder.properties(),
builder.constraints()
);
emit(new events.ProposalResponded({ prop_id: proposal.id() }));
} catch (error) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: error,
})
);
}
} else {
emit(new events.ProposalConfirmed({ prop_id: proposal.id() }));
offer_buffer[proposal.issuer()] = new _BufferItem(
Date.now(),
score,
proposal
await proposal.respond(
builder.properties(),
builder.constraints()
);
emit(new events.ProposalResponded({ prop_id: proposal.id() }));
} catch (error) {
emit(
new events.ProposalFailed({
prop_id: proposal.id(),
reason: error,
})
);
proposals_confirmed += 1;
}
} else {
emit(new events.ProposalConfirmed({ prop_id: proposal.id() }));
offer_buffer[proposal.issuer()] = new _BufferItem(
Date.now(),
score,
proposal
);
proposals_confirmed += 1;
}
});
}
}

async function find_offers(): Promise<void> {
let keepSubscribing = true;
while (keepSubscribing && !self._worker_cancellation_token.cancelled) {
try {
const subscription = await builder.subscribe(market_api);
await asyncWith(subscription, async (subscription) => {
try {
await find_offers_for_subscription(subscription);
} catch (error) {
logger.error(`Error while finding offers for a subscription: ${error}`);
keepSubscribing = false;
}
});
} catch (error) {
emit(new events.SubscriptionFailed({ reason: error }));
keepSubscribing = false;
}
}
logger.debug("Stopped checking and scoring new offers.");
}

Expand Down Expand Up @@ -626,7 +638,7 @@ export class Executor implements ComputationHistory {
if (self._worker_cancellation_token.cancelled) { return; }
const _batch_timeout = batch.timeout();
const batch_deadline = _batch_timeout
? dayjs.utc().unix() + _batch_timeout
? dayjs.utc().unix() + _batch_timeout / 1000
: null;
try {
let current_worker_task = consumer.last_item();
Expand Down Expand Up @@ -691,6 +703,7 @@ export class Executor implements ComputationHistory {
agreement_id: agreement.id(),
partial: true,
});
emit(new events.CheckingPayments());
} catch (error) {
if (self._worker_cancellation_token.cancelled) { return; }
try {
Expand Down
27 changes: 20 additions & 7 deletions yajsapi/rest/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export class ActivityService {
let { data: response } = await this._api.createActivity({ agreementId: agreement_id }, { timeout: 30000, params: { timeout: 25 } });
let activity_id =
typeof response == "string" ? response : response.activityId;
logger.debug(`Created activity ${activity_id} for agreement ${agreement_id}`);
return new Activity(activity_id, this._api, this._state);
}

Expand Down Expand Up @@ -198,11 +199,18 @@ class Activity {
cancellationToken?: CancellationToken
): Promise<any> {
const script_txt = JSON.stringify(script);
const { data: batch_id } = await this._api.exec(
this._id,
new ExeScriptRequest(script_txt),
{ timeout: 5000 }
);
let batch_id;
try {
const { data } = await this._api.exec(
this._id,
new ExeScriptRequest(script_txt),
{ timeout: 10000 }
);
batch_id = data;
} catch (error) {
logger.warn(`Error while sending batch script to provider: ${error}`);
throw error;
}

if (stream) {
return new StreamingBatch(
Expand Down Expand Up @@ -355,7 +363,12 @@ export class CommandExecutionError extends Error {
}
}

class BatchTimeoutError extends Error {}
class BatchTimeoutError extends Error {
constructor(message) {
super(message);
this.name = "BatchTimeoutError";
}
}

class Batch implements AsyncIterable<events.CommandEventContext> {
protected api!: RequestorControlApi;
Expand Down Expand Up @@ -419,7 +432,7 @@ class PollingBatch extends Batch {
throw new CommandExecutionError(last_idx.toString(), "Interrupted.");
}
if (timeout && timeout <= 0) {
throw new BatchTimeoutError();
throw new BatchTimeoutError(`Task timeout for activity ${this.activity_id}`);
}
try {
let { data } = await this.api.getExecBatchResults(
Expand Down
17 changes: 12 additions & 5 deletions yajsapi/rest/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,24 @@ export class Subscription {
{ timeout: 5000 }
);
for (let _proposal of proposals) {
if (cancellationToken && cancellationToken.cancelled) return;
if (_proposal.eventType === "ProposalEvent") {
yield new OfferProposal(this, _proposal as models.ProposalEvent);
}
}
if (cancellationToken && cancellationToken.cancelled) break;
if (!proposals || !proposals.length) {
await sleep(2);
}
} catch (error) {
logger.error(error);
throw Error(error);
if (error.response && error.response.status === 404) {
logger.debug(`Offer unsubscribed or its subscription expired, subscription_id: ${this._id}`);
this._open = false;
// Prevent calling `unsubscribe` which would result in API error for expired demand subscriptions
this._deleted = true;
} else {
logger.error(`Error while collecting offers: ${error}`);
throw error;
}
}
}
return;
Expand Down Expand Up @@ -286,8 +293,8 @@ export class Market {
let { data: sub_id } = await self._api.subscribeDemand(request, { timeout: 5000 });
return new Subscription(self._api, sub_id);
} catch (error) {
logger.error(error);
throw new Error(error);
logger.error(`Error while subscribing: ${error}`);
throw error;
}
}

Expand Down
Loading

0 comments on commit fda063f

Please sign in to comment.