diff --git a/Quick.Threads.pas b/Quick.Threads.pas index cb4b850..07b6a2c 100644 --- a/Quick.Threads.pas +++ b/Quick.Threads.pas @@ -5,9 +5,9 @@ Unit : Quick.Threads Description : Thread safe collections Author : Kike Pérez - Version : 1.4 + Version : 1.5 Created : 09/03/2018 - Modified : 31/07/2019 + Modified : 22/08/2019 This file is part of QuickLib: https://github.com/exilon/QuickLib @@ -35,13 +35,13 @@ interface uses Classes, - //rtti, Types, SysUtils, DateUtils, Quick.Commons, //Quick.Chrono, Quick.Value, + Quick.FaultControl, {$IFNDEF FPC} System.RTLConsts, System.Generics.Collections, @@ -139,13 +139,20 @@ TThreadObjectList = class(TList) end; {$ENDIF} - {$IFDEF FPC} + {$IFNDEF FPC} + TAnonExceptionProc = reference to procedure(aException : Exception); + TAnonProc = TProc; + {$ELSE} TProc = procedure of object; + TAnonExceptionProc = procedure(aException : Exception) of object; {$ENDIF} + TThreadWorkStatus = (wsRunning, wsDone, wsException); + TAdvThread = class(TThread) private fExecuteProc : TProc; + fExceptionProc : TAnonExceptionProc; fTerminateProc : TProc; fExecuteWithSync : Boolean; fTerminateWithSync : Boolean; @@ -155,12 +162,14 @@ TAdvThread = class(TThread) procedure DoTerminate; override; public constructor Create(aProc : TProc; aSynchronize : Boolean); + procedure OnException(aProc : TAnonExceptionProc); procedure OnTerminate(aProc : TProc; aSynchronize : Boolean); procedure Execute; override; end; IAnonymousThread = interface procedure Start; + function OnException(aProc : TAnonExceptionProc) : IAnonymousThread; function OnTerminate(aProc : TProc) : IAnonymousThread; function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; end; @@ -173,11 +182,27 @@ TAnonymousThread = class(TInterfacedObject,IAnonymousThread) class function Execute(aProc : TProc) : IAnonymousThread; overload; class function Execute_Sync(aProc : TProc) : IAnonymousThread; overload; procedure Start; + function OnException(aProc : TAnonExceptionProc) : IAnonymousThread; function OnTerminate(aProc : TProc) : IAnonymousThread; overload; function OnTerminate_Sync(aProc : TProc) : IAnonymousThread; overload; end; - TParamArray = array of TFlexValue; + TParamValue = class + private + fName : string; + fValue : TFlexValue; + fOwned : Boolean; + public + constructor Create; overload; + constructor Create(const aName : string; aValue : TFlexValue; aOwnedValue : Boolean); overload; + constructor Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); overload; + destructor Destroy; override; + property Name : string read fName write fName; + property Value : TFlexValue read fValue write fValue; + property Owned : Boolean read fOwned write fOwned; + end; + + TParamList = TObjectList; TWorkTaskStatus = (wtsPending, wtsAssigned, wtsRunning, wtsDone, wtsException); @@ -185,45 +210,91 @@ TAnonymousThread = class(TInterfacedObject,IAnonymousThread) TTimeMeasure = (tmDays, tmHours, tmMinutes, tmSeconds, tmMilliseconds); + ETaskInitializationError = class(Exception); + ETaskExecutionError = class(Exception); + ETaskParamError = class(Exception); + ETaskSchedulerError = class(Exception); + ITask = interface ['{0182FD36-5A7C-4C00-BBF8-7CFB1E3F9BB1}'] - function GetParam(aIndex : Integer) : TFlexValue; + function GetParam(aIndex : Integer) : TFlexValue; overload; + function GetParam(const aName : string) : TFlexValue; overload; + function GetParam2(aIndex : Integer) : PFlexValue; + procedure SetParam(aIndex : Integer; Value : TFlexValue); overload; + procedure SetParam(const aName : string; Value : TFlexValue); overload; function TaskStatus : TWorkTaskStatus; function GetNumWorker : Integer; procedure SetNumWorker(Value : Integer); function GetIdTask : Int64; procedure SetIdTask(Value : Int64); + function GetResult : TFlexValue; + procedure SetResult(aValue : TFlexValue); procedure DoExecute; procedure DoException(aException : Exception); procedure DoTerminate; - property Param[index : Integer] : TFlexValue read GetParam; + {$IFNDEF FPC} + property Param[index : Integer] : TFlexValue read GetParam write SetParam; default; + property Param[const Name : string] : TFlexValue read GetParam write SetParam; default; + {$ELSE} + property Param[index : Integer] : TFlexValue read GetParam write SetParam; + property ParamByName[const Name : string] : TFlexValue read GetParam write SetParam; default; + {$ENDIF} property NumWorker : Integer read GetNumWorker write SetNumWorker; + property Result : TFlexValue read GetResult write SetResult; property IdTask : Int64 read GetIdTask; + function Done : Boolean; + function Failed : Boolean; + function NumRetries : Integer; + function MaxRetries : Integer; + function LastException : Exception; + function CircuitBreaked : Boolean; function IsEnabled : Boolean; end; {$IFNDEF FPC} TTaskProc = reference to procedure(task : ITask); TTaskExceptionProc = reference to procedure(task : ITask; aException : Exception); + TTaskRetryProc = reference to procedure(task : ITask; aException : Exception; var aStopRetries : Boolean); {$ELSE} TTaskProc = procedure(task : ITask) of object; TTaskExceptionProc = procedure(task : ITask; aException : Exception) of object; + TTaskRetryProc = procedure(task : ITask; aException : Exception; var aStopRetries : Boolean) of object; {$ENDIF} IWorkTask = interface(ITask) + function OnInitialize(aTaskProc : TTaskProc) : IWorkTask; function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; + function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; + function Retry(aMaxRetries : Integer) : IWorkTask; + function RetryForever : IWorkTask; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload; + function WaitAndRetry(aWaitTimeArray : TArray) : IWorkTask; overload; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload; + function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload; + function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload; + function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload; + function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload; procedure Run; end; IScheduledTask = interface(ITask) ['{AE551638-ECDE-4F64-89BF-F07BFCB9C9F7}'] + function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask; function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; + function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; function OnExpired_Sync(aTaskProc : TTaskProc) : IScheduledTask; + function Retry(aMaxRetries : Integer) : IScheduledTask; + function RetryForever : IScheduledTask; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload; + function WaitAndRetry(aWaitTimeArray : TArray) : IScheduledTask; overload; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload; + function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload; + function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload; function CheckSchedule : Boolean; procedure DoExpire; function GetTaskName : string; @@ -243,13 +314,17 @@ TAnonymousThread = class(TInterfacedObject,IAnonymousThread) function IsFinished : Boolean; procedure Cancel; property Name : string read GetTaskName; + function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload; + function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload; end; TTask = class(TInterfacedObject,ITask) private fIdTask : Int64; fNumWorker : Integer; - fParamArray : TParamArray; + fNumRetries : Integer; + fParamList : TParamList; + fInitializeProc : TTaskProc; fExecuteProc : TTaskProc; fExceptProc : TTaskExceptionProc; fTerminateProc : TTaskProc; @@ -259,8 +334,18 @@ TTask = class(TInterfacedObject,ITask) fEnabled : Boolean; fExecuteWithSync : Boolean; fExceptionWithSync : Boolean; + fRetryProc : TTaskRetryProc; fTerminateWithSync : Boolean; - function GetParam(aIndex : Integer) : TFlexValue; + fFaultControl : TFaultControl; + fCustomFaultPolicy : Boolean; + fResult : TFlexValue; + function GetParam(aIndex : Integer) : TFlexValue; overload; + function GetParam(const aName : string) : TFlexValue; overload; + function GetParam2(aIndex : Integer) : PFlexValue; + procedure SetParam(aIndex : Integer; Value : TFlexValue); overload; + procedure SetParam(const aName : string; Value : TFlexValue); overload; + procedure SetParam(const aName : string; Value : TFlexValue; aOwned : Boolean); overload; + procedure DoInitialize; procedure DoExecute; procedure DoException(aException : Exception); procedure DoTerminate; @@ -268,10 +353,18 @@ TTask = class(TInterfacedObject,ITask) procedure SetNumWorker(Value : Integer); function GetIdTask : Int64; procedure SetIdTask(Value : Int64); + function GetResult : TFlexValue; + procedure SetResult(aValue : TFlexValue); protected + property FaultControl : TFaultControl read fFaultControl write fFaultControl; + property CustomFaultPolicy : Boolean read fCustomFaultPolicy write fCustomFaultPolicy; property ExecuteWithSync : Boolean read fExecuteWithSync write fExecuteWithSync; property TerminateWithSync : Boolean read fTerminateWithSync write fTerminateWithSync; property ExceptionWithSync : Boolean read fExceptionWithSync write fExceptionWithSync; + procedure DoRetry(aRaisedException : Exception; var vStopRetries : Boolean); + procedure SetFaultPolicy(aFaultPolicy : TFaultPolicy); + procedure SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double); overload; + procedure SetRetryPolicy(aWaitTimeMSArray : TArray); overload; public constructor Create(aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc); virtual; destructor Destroy; override; @@ -279,12 +372,29 @@ TTask = class(TInterfacedObject,ITask) property OwnedParams : Boolean read fOwnedParams write fOwnedParams; function IsEnabled : Boolean; function TaskStatus : TWorkTaskStatus; + function Done : Boolean; + function Failed : Boolean; + function NumRetries : Integer; + function MaxRetries : Integer; + function LastException : Exception; + function CircuitBreaked : Boolean; end; TWorkTask = class(TTask,IWorkTask) public + function OnInitialize(aTaskProc : TTaskProc) : IWorkTask; function OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; virtual; function OnTerminated(aTaskProc : TTaskProc) : IWorkTask; virtual; + function OnRetry(aTaskProc : TTaskRetryProc) : IWorkTask; virtual; + function Retry(aMaxRetries : Integer) : IWorkTask; + function RetryForever : IWorkTask; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload; + function WaitAndRetry(aWaitTimeArray : TArray) : IWorkTask; overload; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload; + function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IWorkTask; overload; + function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IWorkTask; overload; + function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IWorkTask; overload; + function SetParameter(const aName : string; aValue : TFlexValue) : IWorkTask; overload; procedure Run; virtual; end; @@ -293,7 +403,6 @@ TWorkTask = class(TTask,IWorkTask) TScheduledTask = class(TTask,IScheduledTask) private fName : string; - fcurrentschedule : TPair; fExecutionTimes : Integer; fScheduleMode : TScheduleMode; fTimeInterval : Integer; @@ -314,9 +423,11 @@ TScheduledTask = class(TTask,IScheduledTask) property ExpireWithSync : Boolean read fExpireWithSync write fExpireWithSync; public property Name : string read fName write fName; + function OnInitialize(aTaskProc : TTaskProc) : IScheduledTask; property CurrentSchedule : TPair read GetCurrentSchedule; function OnException(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual; function OnException_Sync(aTaskProc : TTaskExceptionProc) : IScheduledTask; virtual; + function OnRetry(aTaskProc : TTaskRetryProc) : IScheduledTask; virtual; function OnTerminated(aTaskProc : TTaskProc) : IScheduledTask; virtual; function OnTerminated_Sync(aTaskProc : TTaskProc) : IScheduledTask; virtual; function OnExpired(aTaskProc : TTaskProc) : IScheduledTask; virtual; @@ -334,6 +445,15 @@ TScheduledTask = class(TTask,IScheduledTask) procedure RepeatEvery(aInterval : Integer; aTimeMeasure : TTimeMeasure; aRepeatTimes : Integer); overload; procedure RepeatEveryDay; procedure RepeatEveryWeek; + function Retry(aMaxRetries : Integer) : IScheduledTask; + function RetryForever : IScheduledTask; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload; + function WaitAndRetry(aWaitTimeArray : TArray) : IScheduledTask; overload; + function WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload; + function WaitAndRetryForever(aWaitTimeBetweenRetriesMS : Integer) : IScheduledTask; overload; + function WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor : Double) : IScheduledTask; overload; + function SetParameter(const aName : string; aValue : TFlexValue; aOwned : Boolean) : IScheduledTask; overload; + function SetParameter(const aName : string; aValue : TFlexValue) : IScheduledTask; overload; function IsFinished : Boolean; procedure Cancel; end; @@ -341,19 +461,34 @@ TScheduledTask = class(TTask,IScheduledTask) TWorkerStatus = (wsIdle, wsWorking, wsSuspended); TWorker = class(TThread) - private - fNumWorker : Integer; - fCurrentIdTask : Integer; + protected fStatus : TWorkerStatus; - fTaskQueue : TTaskQueue; + fCurrentTask : ITask; + fDefaultFaultPolicy : TFaultPolicy; procedure ExecuteTask; procedure TerminateTask; - protected - fCurrentTask : ITask; + public + constructor Create; + destructor Destroy; override; + property Status : TWorkerStatus read fStatus; + procedure SetFaultPolicy(aTask : TTask); + procedure Execute; override; + end; + + TSimpleWorker = class(TWorker) + public + constructor Create(aTask : ITask); + procedure Execute; override; + end; + + TQueueWorker = class(TWorker) + private + fCurrentIdTask : Integer; + fNumWorker : Integer; + fTaskQueue : TTaskQueue; public constructor Create(aNumWorker : Integer; aTaskQueue : TTaskQueue); property NumWorker : Integer read fNumWorker; - property Status : TWorkerStatus read fStatus; procedure Execute; override; end; @@ -367,6 +502,14 @@ TScheduledWorker = class(TWorker) TWorkerPool = TObjectList; + TRunTask = class + public + class function Execute(aTaskProc: TTaskProc): IWorkTask; overload; + class function Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload; + class function Execute_Sync(aTaskProc: TTaskProc): IWorkTask; overload; + class function Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; overload; + end; + TBackgroundTasks = class private fMaxQueue : Integer; @@ -419,12 +562,14 @@ TScheduledTasks = class fNumPushedTasks : Int64; fRemoveTaskAfterExpiration : Boolean; fIsStarted : Boolean; + fFaultPolicy : TFaultPolicy; public constructor Create; destructor Destroy; override; property NumPushedTasks : Int64 read fNumPushedTasks; property RemoveTaskAfterExpiration : Boolean read fRemoveTaskAfterExpiration write fRemoveTaskAfterExpiration; property IsStarted : Boolean read fIsStarted; + property FaultPolicy : TFaultPolicy read fFaultPolicy write fFaultPolicy; function AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload; function AddTask_Sync(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; overload; function AddTask(const aTaskName : string; aParamArray : array of const; aOwnedParams : Boolean; aTaskProc : TTaskProc) : IScheduledTask; overload; @@ -449,6 +594,7 @@ procedure TThreadedQueueCS.Clear; begin if TypeInfo(T) = TypeInfo(TObject) then PObject(@obj){$IFNDEF FPC}.DisposeOf;{$ELSE}.Free;{$ENDIF} end; + SetLength(FQueue,0); finally FQueueLock.Leave; @@ -479,6 +625,7 @@ destructor TThreadedQueueCS.Destroy; DoShutDown; FQueueLock.Free; FQueueCondVar.Free; + inherited; end; @@ -912,6 +1059,12 @@ class function TAnonymousThread.Execute_Sync(aProc: TProc): IAnonymousThread; Result := TAnonymousThread.Create(aProc,True); end; +function TAnonymousThread.OnException(aProc: TAnonExceptionProc): IAnonymousThread; +begin + Result := Self; + fThread.OnException(aProc); +end; + function TAnonymousThread.OnTerminate(aProc: TProc): IAnonymousThread; begin Result := Self; @@ -936,16 +1089,20 @@ constructor TTask.Create(aParamArray : array of const; aOwnedParams : Boolean; a i : Integer; begin fTaskStatus := TWorkTaskStatus.wtsPending; + fCustomFaultPolicy := False; + fNumRetries := 0; fExecuteWithSync := False; fTerminateWithSync := False; fExceptionWithSync := False; + fFaultControl := TFaultControl.Create; + fFaultControl.OnRetry := DoRetry; fOwnedParams := aOwnedParams; - SetLength(fParamArray,High(aParamArray)+1); + fParamList := TParamList.Create(True); for i := Low(aParamArray) to High(aParamArray) do begin - fParamArray[i].Create(aParamArray[i]); + fParamList.Add(TParamValue.Create(i.ToString,aParamArray[i],aOwnedParams)); {$IFDEF FPC} - fParamArray[i]._AddRef; + fParamList[i].Value._AddRef; {$ENDIF} end; fExecuteProc := aTaskProc; @@ -956,18 +1113,11 @@ destructor TTask.Destroy; var i : Integer; begin + fFaultControl.Free; //free passed params - if fOwnedParams then - begin - for i := Low(fParamArray) to High(fParamArray) do - begin - {$IFDEF FPC} - fParamArray[i]._Release; - {$ENDIF} - if (fParamArray[i].DataType = dtObject) and (fParamArray[i].AsObject <> nil) then fParamArray[i].AsObject.Free; - end; - end; - fParamArray := nil; + fParamList.Free; + if (not fResult.IsNullOrEmpty) and (fResult.IsObject) then fResult.AsObject.Free; + inherited; end; @@ -981,8 +1131,73 @@ procedure TTask.DoException(aException : Exception); procedure TTask.DoExecute; begin fTaskStatus := TWorkTaskStatus.wtsRunning; - if Assigned(fExecuteProc) then fExecuteProc(Self); - fTaskStatus := TWorkTaskStatus.wtsDone; + DoInitialize; + repeat + try + if Assigned(fExecuteProc) then fExecuteProc(Self); + fTaskStatus := TWorkTaskStatus.wtsDone; + fFaultControl.SuccessExecution; + except + on E : Exception do + begin + fTaskStatus := TWorkTaskStatus.wtsException; + {$IFNDEF FPC} + fFaultControl.FailedExecution(AcquireExceptionObject as Exception); + {$ELSE} + fFaultControl.FailedExecution(Exception(AcquireExceptionObject)); + {$ENDIF} + end; + end; + until not fFaultControl.NeedToRetry; +end; + +procedure TTask.DoInitialize; +begin + try + fFaultControl.Reset; + if Assigned(fInitializeProc) then fInitializeProc(Self); + except + on E : Exception do + begin + raise ETaskInitializationError.CreateFmt('Task initialization failed: %s',[e.Message]); + end; + end; +end; + +function TTask.Done: Boolean; +begin + Result := not fFaultControl.TaskFailed; +end; + +function TTask.Failed: Boolean; +begin + Result := fFaultControl.TaskFailed; +end; + +function TTask.CircuitBreaked: Boolean; +begin + Result := fFaultControl.CircuitBreaked; +end; + +function TTask.LastException: Exception; +begin + Result := fFaultControl.LastException; +end; + +function TTask.MaxRetries: Integer; +begin + Result := fFaultControl.MaxRetries; +end; + +function TTask.NumRetries: Integer; +begin + Result := fFaultControl.NumRetries; +end; + +procedure TTask.DoRetry(aRaisedException: Exception; var vStopRetries: Boolean); +begin + vStopRetries := False; + if Assigned(fRetryProc) then fRetryProc(Self,aRaisedException,vStopRetries); end; procedure TTask.DoTerminate; @@ -995,6 +1210,16 @@ function TTask.GetIdTask: Int64; Result := fIdTask; end; +procedure TTask.SetFaultPolicy(aFaultPolicy: TFaultPolicy); +begin + {$IFDEF FPC} + if not Assigned(fFaultControl) then fFaultControl := TFaultControl.Create; + {$ENDIF} + fFaultControl.MaxRetries := aFaultPolicy.MaxRetries; + fFaultControl.WaitTimeBetweenRetriesMS := aFaultPolicy.WaitTimeBetweenRetries; + fFaultControl.WaitTimeMultiplierFactor := aFaultPolicy.WaitTimeMultiplierFactor; +end; + procedure TTask.SetIdTask(Value : Int64); begin fIdTask := Value; @@ -1007,7 +1232,32 @@ function TTask.GetNumWorker: Integer; function TTask.GetParam(aIndex: Integer): TFlexValue; begin - Result := fParamArray[aIndex]; + Result := fParamList[aIndex].Value; +end; + +function TTask.GetParam(const aName: string): TFlexValue; +var + paramvalue : TParamValue; +begin + for paramvalue in fParamList do + begin + if CompareText(paramvalue.Name,aName) = 0 then + begin + Exit(paramvalue.Value) + end; + end; + //if not exists + raise ETaskParamError.CreateFmt('Task param "%s" not found!',[aName]); +end; + +function TTask.GetParam2(aIndex: Integer): PFlexValue; +begin + Result := @fParamList[aIndex].Value; +end; + +function TTask.GetResult: TFlexValue; +begin + Result := fResult; end; function TTask.IsEnabled: Boolean; @@ -1021,6 +1271,56 @@ procedure TTask.SetNumWorker(Value: Integer); fNumWorker := Value; end; +procedure TTask.SetParam(aIndex: Integer; Value: TFlexValue); +begin + if aIndex > fParamList.Count then raise ETaskParamError.CreateFmt('Task parameter index(%d) not found',[aIndex]); + fParamList[aIndex].Value := Value; +end; + +procedure TTask.SetParam(const aName: string; Value: TFlexValue; aOwned: Boolean); +var + paramvalue : TParamValue; +begin + //check if already exists parameter + for paramvalue in fParamList do + begin + if CompareText(paramvalue.Name,aName) = 0 then + begin + paramvalue.Value := Value; + Exit; + end; + end; + //if not exists, create one + fParamList.Add(TParamValue.Create(aName,Value,aOwned)); +end; + +procedure TTask.SetParam(const aName: string; Value: TFlexValue); +begin + SetParam(aName,Value,False); +end; + +procedure TTask.SetRetryPolicy(aMaxRetries, aWaitTimeBetweenRetriesMS : Integer; aWaitTimeMultiplierFactor: Double); +begin + fFaultControl.MaxRetries := aMaxRetries; + fFaultControl.WaitTimeBetweenRetriesMS := aWaitTimeBetweenRetriesMS; + fFaultControl.WaitTimeMultiplierFactor := aWaitTimeMultiplierFactor; + fCustomFaultPolicy := True; +end; + +procedure TTask.SetResult(aValue: TFlexValue); +begin + fResult := aValue; +end; + +procedure TTask.SetRetryPolicy(aWaitTimeMSArray: TArray); +begin + fFaultControl.MaxRetries := High(aWaitTimeMSArray) + 1; + fFaultControl.WaitTimeBetweenRetriesMS := 0; + fFaultControl.WaitTimeMultiplierFactor := 0; + fFaultControl.WaitTimeMSArray := aWaitTimeMSArray; + fCustomFaultPolicy := True; +end; + function TTask.TaskStatus: TWorkTaskStatus; begin Result := fTaskStatus; @@ -1034,6 +1334,18 @@ function TWorkTask.OnException(aTaskProc : TTaskExceptionProc) : IWorkTask; Result := Self; end; +function TWorkTask.OnInitialize(aTaskProc: TTaskProc): IWorkTask; +begin + fInitializeProc := aTaskProc; + Result := Self; +end; + +function TWorkTask.OnRetry(aTaskProc: TTaskRetryProc): IWorkTask; +begin + fRetryProc := aTaskProc; + Result := Self; +end; + function TWorkTask.OnTerminated(aTaskProc: TTaskProc): IWorkTask; begin fTerminateProc := aTaskProc; @@ -1045,6 +1357,60 @@ procedure TWorkTask.Run; fEnabled := True; end; +function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue): IWorkTask; +begin + Result := Self; + SetParam(aName,aValue); +end; + +function TWorkTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IWorkTask; +begin + Result := Self; + SetParam(aName,aValue,aOwned); +end; + +function TWorkTask.Retry(aMaxRetries: Integer): IWorkTask; +begin + Result := Self; + SetRetryPolicy(aMaxRetries,0,0); +end; + +function TWorkTask.RetryForever: IWorkTask; +begin + Result := Self; + SetRetryPolicy(-1,0,0); +end; + +function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IWorkTask; +begin + Result := Self; + SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,0); +end; + +function TWorkTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IWorkTask; +begin + Result := Self; + SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor); +end; + +function TWorkTask.WaitAndRetry(aWaitTimeArray: TArray): IWorkTask; +begin + Result := Self; + SetRetryPolicy(aWaitTimeArray); +end; + +function TWorkTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IWorkTask; +begin + Result := Self; + SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,0); +end; + +function TWorkTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IWorkTask; +begin + Result := Self; + SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor); +end; + { TBackgroundTasks } procedure TBackgroundTasks.CancelAll; @@ -1064,6 +1430,10 @@ constructor TBackgroundTasks.Create(aConcurrentWorkers : Integer; aMaxQueue : In destructor TBackgroundTasks.Destroy; begin + CancelAll; + fTaskQueue.DoShutDown; + //while fTaskQueue.QueueSize > 0 do Sleep(0); + if Assigned(fWorkerPool) then fWorkerPool.Free; if Assigned(fTaskQueue) then fTaskQueue.Free; inherited; @@ -1113,7 +1483,7 @@ procedure TBackgroundTasks.Start; fWorkerPool := TWorkerPool.Create(True); for i := 1 to fConcurrentWorkers do begin - worker := TWorker.Create(i,fTaskQueue); + worker := TQueueWorker.Create(i,fTaskQueue); fWorkerPool.Add(worker); worker.Start; end; @@ -1121,15 +1491,30 @@ procedure TBackgroundTasks.Start; { TWorker } -constructor TWorker.Create(aNumWorker : Integer; aTaskQueue : TTaskQueue); +constructor TWorker.Create; begin inherited Create(True); - fNumWorker := aNumWorker; + fDefaultFaultPolicy := TFaultPolicy.Create; fStatus := TWorkerStatus.wsSuspended; - fTaskQueue := aTaskQueue; FreeOnTerminate := False; end; +destructor TWorker.Destroy; +begin + if Assigned(fDefaultFaultPolicy) then fDefaultFaultPolicy.Free; + inherited; +end; + +procedure TWorker.SetFaultPolicy(aTask: TTask); +begin + if not aTask.CustomFaultPolicy then aTask.SetFaultPolicy(fDefaultFaultPolicy); +end; + +procedure TWorker.Execute; +begin + +end; + procedure TWorker.ExecuteTask; begin fCurrentTask.DoExecute; @@ -1140,7 +1525,53 @@ procedure TWorker.TerminateTask; fCurrentTask.DoTerminate; end; -procedure TWorker.Execute; +{ TSimpleWorker } + +constructor TSimpleWorker.Create(aTask : ITask); +begin + inherited Create; + fCurrentTask := aTask; + FreeOnTerminate := True; +end; + +procedure TSimpleWorker.Execute; +begin + fStatus := TWorkerStatus.wsIdle; + while not Terminated do + begin + if (fCurrentTask <> nil) and (fCurrentTask.IsEnabled) then + try + fStatus := TWorkerStatus.wsWorking; + try + if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask) + else fCurrentTask.DoExecute; + except + on E : Exception do + begin + if fCurrentTask <> nil then fCurrentTask.DoException(E) + else raise ETaskExecutionError.Create(e.Message); + end; + end; + finally + if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask) + else fCurrentTask.DoTerminate; + fStatus := TWorkerStatus.wsIdle; + Terminate; + end; + end; + fStatus := TWorkerStatus.wsSuspended +end; + +{ TQueueWorker } + +constructor TQueueWorker.Create(aNumWorker: Integer; aTaskQueue: TTaskQueue); +begin + inherited Create; + fNumWorker := aNumWorker; + fTaskQueue := aTaskQueue; +end; + +procedure TQueueWorker.Execute; begin fStatus := TWorkerStatus.wsIdle; while not Terminated do @@ -1151,13 +1582,14 @@ procedure TWorker.Execute; fStatus := TWorkerStatus.wsWorking; try fCurrentIdTask := fCurrentTask.GetIdTask; + SetFaultPolicy(TTask(fCurrentTask)); if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask) else fCurrentTask.DoExecute; except on E : Exception do begin if fCurrentTask <> nil then fCurrentTask.DoException(E) - else raise Exception.Create(e.Message); + else raise ETaskExecutionError.Create(e.Message); end; end; finally @@ -1169,6 +1601,57 @@ procedure TWorker.Execute; fStatus := TWorkerStatus.wsSuspended end; +{ TScheduledWorker } + +constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask); +begin + inherited Create; + {$IFNDEF DELPHILINUX} + NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask); + {$ENDIF} + FreeOnTerminate := True; + fCurrentTask := aScheduledTask; +end; + +procedure TScheduledWorker.Execute; +begin + fStatus := TWorkerStatus.wsIdle; + if Assigned(fCurrentTask) then + begin + try + fStatus := TWorkerStatus.wsWorking; + try + SetFaultPolicy(TTask(fCurrentTask)); + if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask) + else fCurrentTask.DoExecute; + fStatus := TWorkerStatus.wsIdle; + except + on E : Exception do + begin + if fCurrentTask <> nil then fCurrentTask.DoException(E) + else raise ETaskExecutionError.Create(e.Message); + end; + end; + finally + if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask) + else fCurrentTask.DoTerminate; + //check if expired + if (fCurrentTask as IScheduledTask).IsFinished then + begin + if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask) + else (fCurrentTask as IScheduledTask).DoExpire; + end; + end; + end; + fCurrentTask := nil; + fStatus := TWorkerStatus.wsSuspended; +end; + +procedure TScheduledWorker.ExpireTask; +begin + (fCurrentTask as IScheduledTask).DoExpire; +end; + { TScheduledTasks } function TScheduledTasks.AddTask(const aTaskName : string; aTaskProc : TTaskProc) : IScheduledTask; @@ -1203,6 +1686,7 @@ constructor TScheduledTasks.Create; begin fNumPushedTasks := 0; fIsStarted := False; + fFaultPolicy := TFaultPolicy.Create; fTaskList := TScheduledTaskList.Create; end; @@ -1215,6 +1699,7 @@ destructor TScheduledTasks.Destroy; fScheduler.Free; end; if Assigned(fTaskList) then fTaskList.Free; + if Assigned(fFaultPolicy) then fFaultPolicy.Free; inherited; end; @@ -1225,7 +1710,7 @@ function TScheduledTasks.GetTask(aIdTask: Int64): IScheduledTask; function TScheduledTasks.GetTask(const aTaskName: string): IScheduledTask; begin - if not Assigned(fScheduler) then raise Exception.Create('Scheduler must be started to get a task!'); + if not Assigned(fScheduler) then raise ETaskSchedulerError.Create('Scheduler must be started to get a task!'); Result := fScheduler.Get(aTaskName); end; @@ -1249,6 +1734,18 @@ procedure TScheduledTasks.Stop; { TScheduledTask } +function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue): IScheduledTask; +begin + Result := Self; + SetParam(aName,aValue); +end; + +function TScheduledTask.SetParameter(const aName: string; aValue: TFlexValue; aOwned: Boolean): IScheduledTask; +begin + Result := Self; + SetParam(aName,aValue); +end; + function TScheduledTask.StartAt(aStartDate: TDateTime) : IScheduledTask; begin Result := Self; @@ -1312,6 +1809,48 @@ function TScheduledTask.StartTomorrowAt(aHour, aMinute: Word; aSecond : Word = 0 fNextExecution := fStartDate; end; +function TScheduledTask.Retry(aMaxRetries: Integer): IScheduledTask; +begin + Result := Self; + SetRetryPolicy(aMaxRetries,0,0); +end; + +function TScheduledTask.RetryForever: IScheduledTask; +begin + Result := Self; + SetRetryPolicy(-1,0,0); +end; + +function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer): IScheduledTask; +begin + Result := Self; + SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,0); +end; + +function TScheduledTask.WaitAndRetry(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor : Double): IScheduledTask; +begin + Result := Self; + SetRetryPolicy(aMaxRetries,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor); +end; + +function TScheduledTask.WaitAndRetry(aWaitTimeArray: TArray): IScheduledTask; +begin + Result := Self; + SetRetryPolicy(aWaitTimeArray); +end; + +function TScheduledTask.WaitAndRetryForever(aWaitTimeBetweenRetriesMS: Integer): IScheduledTask; +begin + Result := Self; + SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,0); +end; + +function TScheduledTask.WaitAndRetryForever(aMaxRetries, aWaitTimeBetweenRetriesMS: Integer; aWaitTimeMultiplierFactor: Double): IScheduledTask; +begin + Result := Self; + SetRetryPolicy(-1,aWaitTimeBetweenRetriesMS,aWaitTimeMultiplierFactor); +end; + procedure TScheduledTask.RepeatEvery(aInterval: Integer; aTimeMeasure: TTimeMeasure); begin if fStartDate = 0.0 then ClearSchedule; @@ -1464,6 +2003,12 @@ function TScheduledTask.OnException_Sync(aTaskProc: TTaskExceptionProc): ISchedu TTask(Result).ExceptionWithSync := True; end; +function TScheduledTask.OnRetry(aTaskProc: TTaskRetryProc): IScheduledTask; +begin + fRetryProc := aTaskProc; + Result := Self; +end; + function TScheduledTask.OnExpired(aTaskProc: TTaskProc): IScheduledTask; begin fExpiredProc := aTaskProc; @@ -1476,6 +2021,12 @@ function TScheduledTask.OnExpired_Sync(aTaskProc: TTaskProc): IScheduledTask; TScheduledTask(Result).ExpireWithSync := True; end; +function TScheduledTask.OnInitialize(aTaskProc: TTaskProc): IScheduledTask; +begin + fInitializeProc := aTaskProc; + Result := Self; +end; + function TScheduledTask.OnTerminated(aTaskProc: TTaskProc): IScheduledTask; begin fTerminateProc := aTaskProc; @@ -1488,56 +2039,6 @@ function TScheduledTask.OnTerminated_Sync(aTaskProc: TTaskProc): IScheduledTask; TTask(Result).TerminateWithSync := True; end; -{ TScheduledWorker } - -constructor TScheduledWorker.Create(aNumWorker : Integer; aScheduledTask: IScheduledTask); -begin - inherited Create(aNumWorker,nil); - {$IFNDEF DELPHILINUX} - NameThreadForDebugging(aScheduledTask.Name,aScheduledTask.IdTask); - {$ENDIF} - FreeOnTerminate := True; - fCurrentTask := aScheduledTask; -end; - -procedure TScheduledWorker.Execute; -begin - fStatus := TWorkerStatus.wsIdle; - if Assigned(fCurrentTask) then - begin - try - fStatus := TWorkerStatus.wsWorking; - try - if TTask(fCurrentTask).ExecuteWithSync then Synchronize(ExecuteTask) - else fCurrentTask.DoExecute; - fStatus := TWorkerStatus.wsIdle; - except - on E : Exception do - begin - if fCurrentTask <> nil then fCurrentTask.DoException(E) - else raise Exception.Create(e.Message); - end; - end; - finally - if TTask(fCurrentTask).TerminateWithSync then Synchronize(TerminateTask) - else fCurrentTask.DoTerminate; - //check if expired - if (fCurrentTask as IScheduledTask).IsFinished then - begin - if TScheduledTask(fCurrentTask).ExpireWithSync then Synchronize(ExpireTask) - else (fCurrentTask as IScheduledTask).DoExpire; - end; - end; - end; - fCurrentTask := nil; - fStatus := TWorkerStatus.wsSuspended; -end; - -procedure TScheduledWorker.ExpireTask; -begin - (fCurrentTask as IScheduledTask).DoExpire; -end; - { TScheduler } constructor TScheduler.Create(aTaskList : TScheduledTaskList); @@ -1650,7 +2151,19 @@ constructor TAdvThread.Create(aProc : TProc; aSynchronize : Boolean); procedure TAdvThread.DoExecute; begin - if Assigned(fExecuteProc) then fExecuteProc; + try + if Assigned(fExecuteProc) then fExecuteProc; + except + on E : Exception do + begin + {$IFNDEF FPC} + if Assigned(fExceptionProc) then fExceptionProc(AcquireExceptionObject as Exception) + {$ELSE} + if Assigned(fExceptionProc) then fExceptionProc(Exception(AcquireExceptionObject)) + {$ENDIF} + else raise e; + end; + end; end; procedure TAdvThread.CallToTerminate; @@ -1671,10 +2184,84 @@ procedure TAdvThread.Execute; end; +procedure TAdvThread.OnException(aProc: TAnonExceptionProc); +begin + fExceptionProc := aProc; +end; + procedure TAdvThread.OnTerminate(aProc: TProc; aSynchronize: Boolean); begin fTerminateWithSync := aSynchronize; fTerminateProc := aProc; end; +{ TRunTask } + +class function TRunTask.Execute(aTaskProc: TTaskProc): IWorkTask; +begin + Result := Execute([],False,aTaskProc); +end; + +class function TRunTask.Execute_Sync(aTaskProc: TTaskProc): IWorkTask; +begin + Result := Execute_Sync([],False,aTaskProc); +end; + +class function TRunTask.Execute(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; +var + task : TWorkTask; + worker : TSimpleWorker; +begin + task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc); + task.ExecuteWithSync := False; + Result := task; + worker := TSimpleWorker.Create(task); + worker.Start; +end; + +class function TRunTask.Execute_Sync(aParamArray: array of const; aOwnedParams: Boolean; aTaskProc: TTaskProc): IWorkTask; +var + task : TWorkTask; + worker : TSimpleWorker; +begin + task := TWorkTask.Create(aParamArray,aOwnedParams,aTaskProc); + task.ExecuteWithSync := True; + Result := task; + worker := TSimpleWorker.Create(task); + worker.Start; +end; + +{ TParamValue } + +constructor TParamValue.Create(const aName: string; aValue: TFlexValue; aOwnedValue: Boolean); +begin + inherited Create; + fName := aName; + fValue := aValue; + fOwned := aOwnedValue; +end; + +constructor TParamValue.Create(const aName: string; aValue: TVarRec; aOwnedValue: Boolean); +begin + inherited Create; + fName := aName; + fValue := aValue; + fOwned := aOwnedValue; +end; + +constructor TParamValue.Create; +begin + fName := ''; + fOwned := False; +end; + +destructor TParamValue.Destroy; +begin + {$IFDEF FPC} + fValue._Release; + {$ENDIF} + if (fOwned) and (fValue.IsObject) and (fValue.AsObject <> nil) then fValue.AsObject.Free; + inherited; +end; + end.