
10.4 Scheduling


Figure 10-3 shows the class derivation hierarchy for the classes that implement the dynamic scheduling of Kahn process networks. The class ThreadList provides mechanisms for terminating groups of threads. This class is used by PNScheduler to create threads for each node in the program graph. The class SyncDataFlowProcess implements the threads for the nodes.

10.4.1 ThreadList

The class ThreadList implements a container class for manipulating groups of threads. It has two public methods.

virtual void add(PtThread*);
This method adds a PtThread object to the list.
virtual ~ThreadList();
The destructor terminates and deletes all threads in the list.
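
The ownership pattern described above can be sketched in standard C++. This is a minimal illustration, not the Ptolemy implementation: SketchThread and SketchThreadList are hypothetical stand-ins for PtThread and ThreadList, and std::thread is assumed in place of the Ptolemy thread package. The container takes ownership of each thread passed to add and terminates and deletes all of them in its destructor.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for PtThread (not the Ptolemy class): a worker
// that spins until it is asked to terminate.
class SketchThread {
public:
    static std::atomic<int> live;   // number of SketchThread objects alive

    SketchThread() : stop_(false), worker_([this] { loop(); }) { ++live; }

    // Ask the worker to stop, then wait for it to finish.
    void terminate() {
        stop_ = true;
        if (worker_.joinable()) worker_.join();
    }

    ~SketchThread() { terminate(); --live; }

private:
    void loop() { while (!stop_) std::this_thread::yield(); }

    std::atomic<bool> stop_;   // declared before worker_ so it is initialized first
    std::thread worker_;
};

std::atomic<int> SketchThread::live{0};

// Hypothetical analogue of ThreadList: it takes ownership of every thread
// added to it and terminates and deletes them all in its destructor.
class SketchThreadList {
public:
    SketchThreadList() = default;
    SketchThreadList(const SketchThreadList&) = delete;
    SketchThreadList& operator=(const SketchThreadList&) = delete;

    void add(SketchThread* t) {
        assert(t != nullptr);
        threads_.push_back(t);
    }

    std::size_t size() const { return threads_.size(); }

    ~SketchThreadList() {
        for (SketchThread* t : threads_) t->terminate();  // stop all first
        for (SketchThread* t : threads_) delete t;        // then reclaim
    }

private:
    std::vector<SketchThread*> threads_;
};
```

In this sketch, letting a SketchThreadList go out of scope is enough to stop and reclaim every thread it holds, which is the behavior the scheduler relies on when it discards its thread list.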

10.4.2 PNScheduler

The class PNScheduler controls the execution of a process network. Four data members support thread management and synchronization between the scheduler and the processes.

ThreadList* threads;
A container for the threads managed by the scheduler.
PNMonitor* monitor;
A monitor to guard the scheduler's condition variable.
PNCondition* start;
A condition variable for synchronizing with threads.
int iteration;
A counter for regulating the execution of the processes.
The createThreads method, shown below, creates one process for each node in the program graph. A SyncDataFlowProcess is created for each DataFlowStar and added to the ThreadList container.


// Create threads (dataflow processes).
void PNScheduler::createThreads()
{
    if (! galaxy()) return;
    GalStarIter nextStar(*galaxy());
    DataFlowStar* star;
    LOG_NEW; threads = new ThreadList;

    // Create Threads for all the Stars.
    while((star = (DataFlowStar*)nextStar++) != NULL)
    {
        LOG_NEW; SyncDataFlowProcess* p
            = new SyncDataFlowProcess(*star,*start,iteration);
        threads->add(p);
        p->initialize();
    }
}
It is often desirable to have a partial execution of a process network. The class SyncDataFlowProcess, which is derived from DataFlowProcess, supports this by synchronizing the execution of a thread with the iteration counter that belongs to the PNScheduler. The run methods of PNScheduler and SyncDataFlowProcess implement this synchronization. The PNScheduler run method, shown below, increments the iteration count to give every process an opportunity to run. The SyncDataFlowProcess run method, shown below, ensures that the number of invocations of the star's run method does not exceed the iteration count.


// Run (or continue) the simulation.
int PNScheduler::run()
{
    if (SimControl::haltRequested() || ! galaxy())
    {
        Error::abortRun("cannot continue");
        return FALSE;
    }

    while((currentTime < stopTime) && !SimControl::haltRequested())
    {
        // Notify all threads to continue.
        {
            CriticalSection region(start->monitor());
            iteration++;
            start->notifyAll();
        }
        PNThread::runAll();
        while (PNGeodesic::blockedOnFull() > 0
               && !SimControl::haltRequested())
        {
            increaseBuffers();
            PNThread::runAll();
        }
        currentTime += schedulePeriod;
    }

    return !SimControl::haltRequested();
}

void SyncDataFlowProcess::run()
{
    int i = 0;
    // Configure the star for dynamic execution.
    star.setDynamicExecution(TRUE);

    // Fire the star ad infinitum.
    do
    {
        // Wait for notification to start.
        {
            CriticalSection region(start.monitor());
            while (iteration <= i) start.wait();
            i = iteration;
        }
        if (star.waitPort()) star.waitPort()->receiveData();
    } while (star.run());
}
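
The handshake between the two run methods can be illustrated with standard C++ synchronization primitives. The following is a minimal sketch under stated assumptions: IterationGate and runGated are hypothetical names, and std::mutex and std::condition_variable stand in for PNMonitor and PNCondition. As in the listing above, the worker records the counter value it last saw and blocks until the scheduler has advanced past it, so the number of firings never exceeds the iteration count. If the scheduler advances several times before the worker wakes, the worker skips ahead, so it may fire fewer times than the count.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical gate combining the roles of the monitor, the condition
// variable and the shared iteration counter (illustrative names only).
class IterationGate {
public:
    // Scheduler side: permit one more iteration and wake every waiter.
    void advance() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++iteration_;
        cond_.notify_all();
    }

    // Process side: block until the counter exceeds the last value seen,
    // then return the new value for the caller to remember.
    int waitPast(int seen) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [&] { return iteration_ > seen; });
        return iteration_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int iteration_ = 0;
};

// Start one gated worker, advance the gate `rounds` times, and return
// how many times the worker fired.
int runGated(int rounds) {
    assert(rounds >= 0);
    IterationGate gate;
    int firings = 0;
    std::thread worker([&] {
        int seen = 0;
        while (seen < rounds) {
            seen = gate.waitPast(seen);  // wait for permission to fire
            ++firings;                   // stands in for star.run()
        }
    });
    for (int r = 0; r < rounds; ++r) gate.advance();
    worker.join();
    return firings;
}
```

Calling runGated(3) returns a value between 1 and 3, depending on how the two threads interleave. The predicate form of wait re-checks the counter after every wakeup, which plays the same role as the explicit while loop around start.wait() in the listing above.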
The increaseBuffers method is used during the course of execution to adjust the channel capacities according to the theory presented in [Par95, ch. 4]. Each time execution stops, the program graph is examined for full channels. If any channels are full, the capacity of the smallest full channel is increased by one. The method returns the number of full channels it found.


// Increase buffer capacities.
// Return number of full buffers encountered.
int PNScheduler::increaseBuffers()
{
    int fullBuffers = 0;
    PNGeodesic* smallest = NULL;

    // Increase the capacity of the smallest full geodesic.
    GalStarIter nextStar(*galaxy());
    Star* star;
    while ((star = nextStar++) != NULL)
    {
        BlockPortIter nextPort(*star);
        PortHole* port;
        while ((port = nextPort++) != NULL)
        {
            PNGeodesic* geo = NULL;
            if (port->isItOutput() &&
                (geo = (PNGeodesic*)port->geo()) != NULL)
            {
                if (geo->size() >= geo->capacity())
                {
                    fullBuffers++;
                    if (smallest == NULL ||
                        geo->capacity() <
                        smallest->capacity())
                        smallest = geo;
                }
            }
        }
    }
    if (smallest != NULL)
        smallest->setCapacity(smallest->capacity() + 1);

    return fullBuffers;
}
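
The selection rule in increaseBuffers can be isolated from the galaxy and port iterators. The sketch below is an illustration only: Channel and growSmallestFull are hypothetical names, and a plain vector is assumed in place of the program graph. It keeps the same logic: count the full channels, remember the one with the smallest capacity, and grow that one by a single slot.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for PNGeodesic: only occupancy and capacity.
struct Channel {
    int size;       // number of tokens currently queued
    int capacity;   // maximum number of tokens allowed
};

// Grow the smallest full channel by one slot.
// Returns the number of full channels found.
int growSmallestFull(std::vector<Channel>& channels) {
    int fullCount = 0;
    Channel* smallest = nullptr;
    for (Channel& c : channels) {
        assert(c.capacity >= 0);
        if (c.size >= c.capacity) {
            ++fullCount;
            // Strict comparison: the first channel seen wins a tie.
            if (smallest == nullptr || c.capacity < smallest->capacity)
                smallest = &c;
        }
    }
    if (smallest != nullptr) ++smallest->capacity;
    return fullCount;
}
```

For channels with (size, capacity) of (2, 2), (1, 4) and (3, 3), the first call finds two full channels and raises the first capacity to 3. A second call finds only the third channel full and raises its capacity to 4. Growing one channel at a time keeps capacities small, at the cost of possibly needing several passes before execution can resume.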



Copyright © 1990-1997, University of California. All rights reserved.