MODULE ReleaseThreadPool;
IMPORT KernelLog;
CONST
Ready = 0;
GotWork = 5;
Working = 8;
Finished = 20;
Error = 70;
Exception = 80;
Terminating = 99;
Terminated = 100;
MaxNofDependencies = 64;
NoMoreDependencies* = -1;
MoreDependencies = -2;
Stats = TRUE;
TYPE
Dependencies* = ARRAY MaxNofDependencies + 1 OF LONGINT;
JobProcedure* = PROCEDURE {DELEGATE} (parameters : ANY; VAR error : BOOLEAN);
Job = OBJECT
VAR
proc : JobProcedure;
parameters : ANY;
uid : LONGINT;
priority : LONGINT;
completed : BOOLEAN;
dependencies : Dependencies;
previous, next : Job;
hashPrevious, hashNext : Job;
PROCEDURE IsCompleted() : BOOLEAN;
BEGIN {EXCLUSIVE}
RETURN completed;
END IsCompleted;
PROCEDURE &Init;
VAR i : LONGINT;
BEGIN
proc := NIL;
parameters := NIL;
priority := 0;
completed := FALSE;
FOR i := 0 TO MaxNofDependencies-1 DO
dependencies[i] := NoMoreDependencies;
END;
previous := NIL; next := NIL;
hashPrevious := NIL; hashNext := NIL;
END Init;
END Job;
TYPE
Worker = OBJECT
VAR
job : Job;
state : LONGINT;
threadPool : ThreadPool;
previous, next : Worker;
PROCEDURE SetJob(job: Job);
BEGIN {EXCLUSIVE}
ASSERT((job # NIL) & (job.proc # NIL));
ASSERT((state = Ready) OR (state = Finished));
SELF.job := job;
state := GotWork;
END SetJob;
PROCEDURE SetState(state : LONGINT);
BEGIN {EXCLUSIVE}
IF (SELF.state < Terminating) OR ((SELF.state = Terminating) & (state = Terminated)) THEN
SELF.state := state;
END;
END SetState;
PROCEDURE DoJob;
VAR trap, error : BOOLEAN;
BEGIN
trap := FALSE; error := FALSE;
job.proc(job.parameters, error);
FINALLY
IF trap THEN SetState(Exception);
ELSIF error THEN SetState(Error);
ELSE
SetState(Finished);
END;
END DoJob;
PROCEDURE Terminate;
BEGIN
SetState(Terminating);
BEGIN {EXCLUSIVE} AWAIT(state = Terminated); END;
END Terminate;
PROCEDURE &Init(threadPool : ThreadPool);
BEGIN
ASSERT(threadPool # NIL);
SELF.threadPool := threadPool;
state := Ready;
previous := NIL; next := NIL;
END Init;
BEGIN {ACTIVE}
LOOP
BEGIN {EXCLUSIVE}
AWAIT((state = GotWork) OR (state = Terminating));
IF (state = GotWork) THEN
state := Working;
ELSE
ASSERT(state = Terminating);
EXIT;
END;
END;
ASSERT(state = Working);
DoJob;
threadPool.JobDone(SELF, SELF.job);
END;
SetState(Terminated);
END Worker;
TYPE
WorkerList = OBJECT
VAR
head : Worker;
nofWorkers : LONGINT;
PROCEDURE GetNofWorkers() : LONGINT;
BEGIN {EXCLUSIVE}
RETURN nofWorkers;
END GetNofWorkers;
PROCEDURE Get() : Worker;
VAR worker : Worker;
BEGIN {EXCLUSIVE}
worker := head;
IF (worker # NIL) THEN
ASSERT(worker.previous = NIL);
head := head.next;
IF (head # NIL) THEN head.previous := NIL; END;
worker.next := NIL;
DEC(nofWorkers);
ASSERT(nofWorkers >= 0);
END;
ASSERT((worker = NIL) OR ((worker.previous = NIL) & (worker.next = NIL)));
RETURN worker;
END Get;
PROCEDURE Add(worker : Worker);
BEGIN {EXCLUSIVE}
ASSERT((worker # NIL) & (worker.previous = NIL) & (worker.next = NIL));
IF (head = NIL) THEN
head := worker;
ELSE
worker.next := head;
IF (worker.next # NIL) THEN worker.next.previous := worker; END;
head := worker;
END;
INC(nofWorkers);
ASSERT((head = worker) & (head.previous = NIL));
END Add;
PROCEDURE &Init;
BEGIN
head := NIL;
nofWorkers := 0;
END Init;
END WorkerList;
TYPE
HashTable = OBJECT
VAR
table : POINTER TO ARRAY OF Job;
size : LONGINT;
PROCEDURE Find(uid : LONGINT) : Job;
VAR hashValue : LONGINT; job : Job;
BEGIN
IF Stats THEN INC(Nlookups); END;
hashValue := uid MOD size;
job := table[hashValue].hashNext;
IF Stats & (job # NIL) & (job.hashNext # NIL) THEN INC(NlookupCollisions); END;
WHILE (job # NIL) & (job.uid # uid) DO job := job.hashNext; END;
IF Stats & (job = NIL) THEN INC(NlookupNotFound); END;
RETURN job;
END Find;
PROCEDURE Add(job : Job);
VAR hashValue : LONGINT;
BEGIN {EXCLUSIVE}
ASSERT((job # NIL) & (job.proc # NIL) & (job.hashPrevious = NIL) & (job.hashNext = NIL));
hashValue := job.uid MOD size;
job.hashPrevious := table[hashValue];
job.hashNext := table[hashValue].hashNext;
table[hashValue].hashNext := job;
IF (job.hashNext # NIL) THEN job.hashNext.hashPrevious := job; END;
ASSERT(job.hashPrevious # NIL);
END Add;
PROCEDURE Remove(job : Job);
BEGIN {EXCLUSIVE}
ASSERT((job # NIL) & (job.hashPrevious # NIL));
job.hashPrevious.hashNext := job.hashNext;
IF (job.hashNext # NIL) THEN job.hashNext.hashPrevious := job.hashPrevious; END;
job.hashPrevious := NIL; job.hashNext := NIL;
ASSERT((job.hashPrevious = NIL) & (job.hashNext = NIL));
END Remove;
PROCEDURE &Init(size : LONGINT);
VAR i : LONGINT;
BEGIN
ASSERT(size > 0);
SELF.size := size;
NEW(table, size);
FOR i := 0 TO size-1 DO
NEW(table[i]);
END;
END Init;
END HashTable;
TYPE
SortedJobList = OBJECT
VAR
head : Job;
nofJobs : LONGINT;
PROCEDURE Get() : Job;
VAR job : Job;
BEGIN {EXCLUSIVE}
job := head.next;
IF (job # NIL) THEN
head.next := job.next;
job.previous := NIL; job.next := NIL;
IF (head.next # NIL) THEN head.next.previous := head; END;
DEC(nofJobs);
END;
ASSERT(job # head);
ASSERT((job = NIL) OR ((job.previous = NIL) & (job.next = NIL)));
RETURN job;
END Get;
PROCEDURE Add(job : Job);
VAR previous : Job;
BEGIN {EXCLUSIVE}
ASSERT((job # NIL) & (job.previous = NIL) & (job.next = NIL));
previous := head;
WHILE (previous.next # NIL) & (previous.next.priority > job.priority) DO previous := previous.next; END;
job.next := previous.next;
previous.next := job;
job.previous := previous;
IF (job.next # NIL) THEN job.next.previous := job; END;
INC(nofJobs);
ASSERT(job.previous # NIL);
END Add;
PROCEDURE Remove(job : Job);
BEGIN {EXCLUSIVE}
ASSERT(job # head);
ASSERT((job # NIL) &(job.previous # NIL));
job.previous.next := job.next;
IF (job.next # NIL) THEN job.next.previous := job.previous; END;
job.previous := NIL; job.next := NIL;
DEC(nofJobs);
ASSERT((job.previous = NIL) & (job.next = NIL));
END Remove;
PROCEDURE &Init;
BEGIN
NEW(head);
nofJobs := 0;
END Init;
END SortedJobList;
TYPE
JobManager = OBJECT
VAR
hashTable : HashTable;
readyList : SortedJobList;
waitingList : SortedJobList;
PROCEDURE &Init(size : LONGINT);
BEGIN
NEW(hashTable, size);
NEW(readyList);
NEW(waitingList);
END Init;
PROCEDURE GetReadyJob() : Job;
BEGIN
RETURN readyList.Get();
END GetReadyJob;
PROCEDURE Add(job : Job);
BEGIN
ASSERT((job # NIL) & (job.proc # NIL));
hashTable.Add(job);
IF IsReady(job) THEN
readyList.Add(job);
ELSE
waitingList.Add(job);
END;
END Add;
PROCEDURE Remove(job : Job);
BEGIN
ASSERT(job # NIL);
hashTable.Remove(job);
ASSERT((job # NIL) & (job.previous = NIL) & (job.next = NIL) & (job.hashPrevious = NIL) & (job.hashNext = NIL));
END Remove;
PROCEDURE ReCheckDependencies;
VAR job, next : Job;
BEGIN {EXCLUSIVE}
IF Stats THEN INC(NdependencyChecks); END;
job := waitingList.head.next;
WHILE (job # NIL) DO
next := job.next;
IF IsReady(job) THEN
IF Stats THEN INC(NdependenciesResolved); END;
waitingList.Remove(job);
readyList.Add(job);
ELSE
IF Stats THEN INC(NdependenciesPersist); END;
END;
job := next;
END;
END ReCheckDependencies;
PROCEDURE IsReady(job : Job) : BOOLEAN;
VAR otherJob : Job; i : LONGINT;
BEGIN
ASSERT(job # NIL);
i := 0;
WHILE (job.dependencies[i] # NoMoreDependencies) DO
IF job.dependencies[i] # MoreDependencies THEN
otherJob := hashTable.Find(job.dependencies[i]);
IF (otherJob # NIL) & ~otherJob.IsCompleted() THEN
RETURN FALSE;
ELSE
job.dependencies[i] := MoreDependencies;
END;
END;
INC(i);
END;
RETURN TRUE;
END IsReady;
END JobManager;
TYPE
JobPool = OBJECT
VAR
head : Job;
nextUid : LONGINT;
PROCEDURE Get() : Job;
VAR job : Job;
BEGIN
BEGIN {EXCLUSIVE}
job := head.next;
IF (job # NIL) THEN
head.next := job.next;
IF Stats THEN INC(NjobPoolReused); END;
END;
END;
IF (job = NIL) THEN
NEW(job);
job.uid := GetUID();
IF Stats THEN INC(NjobPoolCreated); END;
ELSE
job.next := NIL;
END;
ASSERT(job # head);
ASSERT((job # NIL) & (job.previous = NIL) & (job.next = NIL) & (job.hashPrevious = NIL) & (job.hashNext = NIL));
RETURN job;
END Get;
PROCEDURE Recycle(job : Job);
BEGIN
ASSERT((job # NIL) & (job.previous = NIL) & (job.next = NIL) & (job.hashPrevious = NIL) & (job.hashNext = NIL));
job.Init;
job.uid := GetUID();
BEGIN {EXCLUSIVE}
job.next := head.next;
head.next := job;
END;
IF Stats THEN INC(NjobPoolRecycled); END;
END Recycle;
PROCEDURE GetUID() : LONGINT;
BEGIN {EXCLUSIVE}
INC(nextUid);
ASSERT(nextUid > 0);
RETURN nextUid;
END GetUID;
PROCEDURE &Init;
BEGIN
NEW(head);
nextUid := 0;
END Init;
END JobPool;
TYPE
ThreadPool* = OBJECT
VAR
readyList : WorkerList;
jobPool : JobPool;
jobManager : JobManager;
workers : POINTER TO ARRAY OF Worker;
hadError : BOOLEAN;
jobsOnTheFly : LONGINT;
doScheduling : BOOLEAN;
alive, dead : BOOLEAN;
PROCEDURE &Init*(nofWorkers : LONGINT);
VAR i : LONGINT;
BEGIN
ASSERT(nofWorkers > 0);
NEW(readyList);
NEW(jobPool);
NEW(jobManager, 512);
NEW(workers, nofWorkers);
FOR i := 0 TO nofWorkers-1 DO
NEW(workers[i], SELF);
readyList.Add(workers[i]);
END;
hadError := FALSE;
jobsOnTheFly := 0;
doScheduling := FALSE;
alive := TRUE; dead := FALSE;
END Init;
PROCEDURE CreateJob*(proc : JobProcedure; parameters : ANY; priority : LONGINT; CONST dependencies : Dependencies) : LONGINT;
VAR job : Job;
BEGIN
ASSERT((proc # NIL) & (priority >= 0));
job := jobPool.Get();
job.proc := proc;
job.parameters := parameters;
job.priority := priority;
job.dependencies := dependencies;
BEGIN {EXCLUSIVE} INC(jobsOnTheFly); END;
jobManager.Add(job);
BEGIN {EXCLUSIVE} doScheduling := TRUE; END;
IF Stats THEN INC(NjobsCreated); END;
RETURN job.uid;
END CreateJob;
PROCEDURE AwaitAllDone*;
BEGIN {EXCLUSIVE}
AWAIT((jobsOnTheFly = 0) OR (hadError));
END AwaitAllDone;
PROCEDURE JobDone(worker : Worker; job : Job);
VAR newJob : Job;
BEGIN
ASSERT((worker # NIL) & (job # NIL));
IF Stats THEN INC(NjobsDone); END;
jobManager.Remove(job);
jobPool.Recycle(job);
BEGIN {EXCLUSIVE} DEC(jobsOnTheFly); END;
IF (worker.state = Exception) THEN KernelLog.String("EXCEPTION"); KernelLog.Ln; END;
IF ~hadError & (worker.state # Error) & (worker.state # Exception) THEN
IF (worker.state >= Terminating) THEN KernelLog.String("TERMINTATE"); KernelLog.Ln; RETURN; END;
ASSERT(worker.state = Finished);
jobManager.ReCheckDependencies;
newJob := jobManager.GetReadyJob();
IF (newJob # NIL) THEN
worker.SetJob(newJob);
IF Stats THEN INC(NjobHandoverSucceeded); END;
ELSE
readyList.Add(worker);
IF Stats THEN INC(NjobHandoverFailed); END;
END;
IF (readyList.GetNofWorkers() > 0) THEN
BEGIN {EXCLUSIVE} doScheduling := TRUE; END;
END;
ELSE
KernelLog.String("Threadpool had error."); KernelLog.Ln;
BEGIN {EXCLUSIVE} hadError := TRUE; END;
IF Stats THEN INC(NjobErrors); END;
END;
END JobDone;
PROCEDURE DoScheduling;
VAR worker : Worker; job : Job;
BEGIN
REPEAT
worker := NIL;
IF (readyList.GetNofWorkers() > 0) THEN
worker := readyList.Get();
IF (worker # NIL) THEN
job := jobManager.GetReadyJob();
IF (job # NIL) THEN
IF Stats THEN INC(NjobsScheduled); END;
worker.SetJob(job);
ELSE
readyList.Add(worker);
END;
END;
END;
UNTIL (worker = NIL) OR (job = NIL);
END DoScheduling;
PROCEDURE Close*;
VAR i : LONGINT;
BEGIN
BEGIN {EXCLUSIVE} alive := FALSE; END;
FOR i := 0 TO LEN(workers)-1 DO
workers[i].Terminate;
END;
BEGIN {EXCLUSIVE} AWAIT(dead); END;
END Close;
BEGIN {ACTIVE}
WHILE alive DO
BEGIN {EXCLUSIVE} AWAIT(doScheduling OR (alive = FALSE));
doScheduling := FALSE;
END;
IF hadError THEN alive := FALSE; END;
IF alive THEN
IF Stats THEN INC(Nscheduling); END;
DoScheduling;
END;
END;
BEGIN {EXCLUSIVE} dead := TRUE; END;
END ThreadPool;
VAR
NjobsCreated-, NjobsDone-, NjobErrors-,
NjobHandoverSucceeded-, NjobHandoverFailed-,
NjobPoolCreated-, NjobPoolRecycled-, NjobPoolReused- ,
NdependencyChecks-, NdependenciesResolved-, NdependenciesPersist-,
Nscheduling-, NjobsScheduled-,
Nlookups-, NlookupCollisions-, NlookupNotFound- : LONGINT;
PROCEDURE ClearStats*;
BEGIN
NjobsCreated := 0; NjobsDone := 0; NjobErrors := 0;
NjobHandoverSucceeded := 0; NjobHandoverFailed := 0;
NjobPoolCreated := 0; NjobPoolRecycled := 0; NjobPoolReused := 0;
NdependencyChecks := 0; NdependenciesResolved := 0; NdependenciesPersist := 0;
Nscheduling := 0; NjobsScheduled := 0;
Nlookups := 0; NlookupCollisions := 0; NlookupNotFound := 0;
END ClearStats;
BEGIN
ClearStats;
END ReleaseThreadPool.
SystemTools.Free ReleaseThreadPool ~
ReleaseThreadPool.ClearStats ~
SystemTools.DoCommands
WMPerfMonPluginModVars.Install ReleaseThreadPool
ReleaseThreadPool.NjobsCreated ReleaseThreadPool.NjobsDone ReleaseThreadPool.NjobErrors
ReleaseThreadPool.NjobHandoverSucceeded ReleaseThreadPool.NjobHandoverFailed
ReleaseThreadPool.Nscheduling ReleaseThreadPool.NjobsScheduled
ReleaseThreadPool.Nlookups ReleaseThreadPool.NlookupCollisions ReleaseThreadPool.NlookupNotFound
~
WMPerfMonPluginModVars.Install JobPool
ReleaseThreadPool.NjobPoolCreated ReleaseThreadPool.NjobPoolRecycled ReleaseThreadPool.NjobPoolReused
ReleaseThreadPool.NdependencyChecks ReleaseThreadPool.NdependenciesResolved ReleaseThreadPool.NdependenciesPersist
~
~~