Figure
10-3 shows the class derivation hierarchy for the classes that implement the dynamic scheduling of Kahn process networks. The class ThreadList
provides mechanisms for terminating groups of threads. This class is used by PNScheduler
to create threads for each node in the program graph. The class SyncDataFlowProcess
implements the threads for the nodes.
The class ThreadList
implements a container class for manipulating groups of threads. It has two public methods.
virtual void add(PtThread*);
This method adds a PtThread
object to the list.
virtual ~ThreadScheduler();
This method terminates and deletes all threads in the list.
The class PNScheduler
controls the execution of a process network. Three data members support synchronization between the scheduler and the processes.
ThreadList* threads;
A container for the threads managed by the scheduler.
PNMonitor* monitor;
A monitor to guard the scheduler's condition variable.
PNCondition* start;
A condition variable for synchronizing with threads.
int iteration;
A counter for regulating the execution of the processes.
The createThreads
method, shown below, creates one process for each node in the program graph. A SyncDataFlowProcess
is created for each DataFlowStar
and added to the ThreadList
container.
// Create threads (dataflow processes).
void PNScheduler::createThreads()
{
if (! galaxy()) return;
GalStarIter nextStar(*galaxy());
DataFlowStar* star;
LOG_NEW; threads = new ThreadList;
// Create Threads for all the Stars.
while((star = (DataFlowStar*)nextStar++) != NULL)
{
LOG_NEW; SyncDataFlowProcess* p
= new SyncDataFlowProcess(*star,*start,iteration);
threads->add(p);
p->initialize();
}
}
It is often desirable to have a partial execution of a process network. The class SyncDataFlowProcess
, which is derived from DataFlowProcess
, supports this by synchronizing the execution of a thread with the iteration
counter that belongs to the PNScheduler
. The run
methods of PNScheduler
and SyncDataFlowProcess
implement this synchronization. The PNScheduler
run
method, shown below, increments the iteration
count to give every process an opportunity to run. The SyncDataFlowProcess
run
method, shown below, ensures that the number of invocations of the star's run
method does not exceed the iteration
count.
// Run (or continue) the simulation.
int PNScheduler::run()
{
if (SimControl::haltRequested() || ! galaxy())
{
Error::abortRun("cannot continue");
return FALSE;
}
while((currentTime < stopTime) && !SimControl::haltRequested())
{
// Notify all threads to continue.
{
CriticalSection region(start->monitor());
iteration++;
start->notifyAll();
}
PNThread::runAll();
while (PNGeodesic::blockedOnFull() > 0
&& !SimControl::haltRequested())
{
increaseBuffers();
PNThread::runAll();
}
currentTime += schedulePeriod;
}
return !SimControl::haltRequested();
}
void SyncDataFlowProcess::run()
{
int i = 0;
// Configure the star for dynamic execution.
star.setDynamicExecution(TRUE);
// Fire the star ad infinitum.
do
{
// Wait for notification to start.
{
CriticalSection region(start.monitor());
while (iteration <= i) start.wait();
i = iteration;
}
if (star.waitPort()) star.waitPort()->receiveData();
} while (star.run());
}
The increaseBuffers
method is used during the course of execution to adjust the channel capacities according to the theory presented in [Par95, ch. 4]. Each time execution stops, the program graph is examined for full channels. If there are any full channels, then the capacity of the smallest one is increased.
// Increase buffer capacities.
// Return number of full buffers encountered.
int PNScheduler::increaseBuffers()
{
int fullBuffers = 0;
PNGeodesic* smallest = NULL;
// Increase the capacity of the smallest full geodesic.
GalStarIter nextStar(*galaxy());
Star* star;
while ((star = nextStar++) != NULL)
{
BlockPortIter nextPort(*star);
PortHole* port;
while ((port = nextPort++) != NULL)
{
PNGeodesic* geo = NULL;
if (port->isItOutput() &&
(geo = (PNGeodesic*)port->geo()) != NULL)
{
if (geo->size() >= geo->capacity())
{
fullBuffers++;
if (smallest == NULL ||
geo->capacity() <
smallest->capacity())
smallest = geo;
}
}
}
}
if (smallest != NULL)
smallest->setCapacity(smallest->capacity() + 1);
return fullBuffers;
}
Copyright © 1990-1997, University of California. All rights
reserved.