/* An actor that executes a contained actor in separate thread.
Copyright (c) 2007-2014 The Regents of the University of California.
All rights reserved.
Permission is hereby granted, without written agreement and without
license or royalty fees, to use, copy, modify, and distribute this
software and its documentation for any purpose, provided that the above
copyright notice and the following two paragraphs appear in all copies
of this software.
IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
ENHANCEMENTS, OR MODIFICATIONS.
PT_COPYRIGHT_VERSION_2
COPYRIGHTENDKEY
*/
package ptolemy.actor.lib.hoc;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import ptolemy.actor.Actor;
import ptolemy.actor.Director;
import ptolemy.actor.Executable;
import ptolemy.actor.IOPort;
import ptolemy.actor.NoTokenException;
import ptolemy.actor.QueueReceiver;
import ptolemy.actor.Receiver;
import ptolemy.actor.util.BreakCausalityInterface;
import ptolemy.actor.util.CausalityInterface;
import ptolemy.actor.util.Time;
import ptolemy.data.BooleanToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.ComponentEntity;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Settable;
import ptolemy.kernel.util.Workspace;
///////////////////////////////////////////////////////////////////
//// ThreadedComposite
/**
A container for another actor that executes that other actor
in a separate thread called the inside thread.
This actor starts that thread in its initialize()
method, which is invoked by its executive director (the director
in charge of firing this actor). The thread that invokes the
action methods of this actor
(initialize(), prefire(), fire(), postfire(), and wrapup())
is called the director thread.
This actor automatically creates input and output ports to
match those of the inside actor. Input events provided at those
input ports are provided as input events to the contained actor.
Outputs provided by the contained actor become output events
of this actor. The time stamp of the input events is provided
by the container of this actor. The time stamp of the output events
depends on the delay parameter, as explained below.
The inside thread blocks waiting for inputs or pure events.
Inputs are provided to that thread when the fire() method of
this actor is invoked by the director thread.
Pure events are provided after fireAt(),
fireAtCurrentTime(), or fireAtFirstValidTimeAfter() are called
by either the inside thread or the director thread.
When the time of those firing requests becomes current time,
the container will (presumably) fire this actor, and
this actor will provide a pure event to the inside thread,
causing it to fire the contained actor.
If the synchronizeToRealTime parameter is true, then
when the inside thread encounters an input or pure event
with time stamp t, it stalls until real time matches
or exceeds t (measured in seconds since the start of
execution of the inside thread). In contrast for example
to the synchronizeToRealTime parameter of the DEDirector,
this enables construction of models where only a portion of the
model synchronizes to real time.
When the wrapup() method of this actor is called, the inside thread is
provided with signal to terminate rather than to process additional
inputs. The inside thread will also exit if stop() is called on this
actor; however, in this case, which iterations are completed
is nondeterminate (there may be inputs left unprocessed).
The parameters of this actor include all the parameters of the
contained actor, and setting those parameters automatically
sets the parameters of the contained actor.
In addition to the parameters of the contained actor, this actor
has a delay parameter. This parameter is a double that
be any nonnegative value or the special value UNDEFINED.
If it is given a nonnegative value, then the value specifies
the model-time delay between input events and the output
events that result from reacting to those input events.
That is, if this actor is given an input event with time
stamp t, then if the contained actor produces any output
events in reaction to that event, those output events will be
produced by this actor with time stamp t + delay.
If delay has value UNDEFINED, then
outputs are produced at the current model time of the executive
director when the inside thread happens to produce those events,
or if synchronizeToRealTime, at the greater of current
model time and current real time (measured in seconds since
the start of execution).
This is accomplished by the inside thread calling
fireAtFirstValidTimeAfter() of the enclosing director, and
then producing the outputs when the requested firing occurs
in the director thread. Note that with this value of the
delay, it is possible for the inside thread to
continue to execute and respond to input events after
the wrapup phase of the director thread has been entered.
The wrapup phase will stall until the inside thread has
completed its processing of its inputs, but any outputs
it produces after the wrapup phase has started will be
discarded.
The most common use of this actor is in the DE domain,
although it can also be used in CT, SR, SDF, and other domains,
with some care. See the above referenced memo.
Regardless of the value of delay, this actor is treated
by DE as introducing a delay, much like the TimedDelay actor.
In fact, if delay is 0.0, there will be a one tick delay
in superdense time, just as with the TimedDelay actor.
If the inside model also has a time delay (e.g. if you
put a TimedDelay actor inside a ThreadedComposite), then
the total delay is the sum of the two delays.
Discussion:
There are several useful things you can do with this model.
We describe some use cases here:
Background execution. When delay is greater than
or equal to 0.0,
then when this actor is fired in response to input events
with time stamp t, the actual
processing of those events occurs later in a separate thread. The
director thread is not blocked, and can continue to process events
with time stamps less than or equal to t + delay.
The director thread is blocked from processing events with larger
time stamps than that because this is necessary to preserve DE
semantics. To implement this, this actor uses fireAt() to
request a firing at time t + delay, and when that
firing occurs, it blocks the director thread until the reaction
is complete.
Parallel firing. Note that if delay is set to 0.0,
it may seem that there is no point in using this actor, since
model time will not be allowed to increase past t until
the contained actor has reacted to events with time stamp t.
However, there is actually exploitable concurrency if there
are other actors in the model that also have pending input
events with time stamp t. Those event can be processed
concurrently with this actor reacting to its input event.
A typical use case will broadcast an event to several instances
of ThreadedComposite, in which case each of those several
inside threads can execute concurrently in reaction to those
input events.
Real-time source. If the contained actor (and hence this
actor) has no inputs and synchronizeToRealTime is true, then
the contained actor must call fireAt() or one of its variants so that
the inside thread will be provided with pure events.
The behavior depends on which variant of the fireAt() methods is used
by the inside actor. There are three cases:
FIXME: Described these. In particular, delay needs to specify the
minimum increment between these or fireAt() could result in an
exception. Do we want a parameter to relax that?
On subtlety of this actor is that it cannot expose instances of ParameterPort
without introducing nondeterminacy in the execution. A ParameterPort
is an input port that sets the value of a parameter with the same name. Upon receiving
a token at such a port, if this actor were to set a parameter visible by the
inside thread, there is no assurance that the inside thread is not still
executing an earlier iteration. Thus, it could appear to be sending a message
backward in time, which would be truly bizarre. To prevent this error,
this actor does not mirror such ports, and hence they appear on the outside
only as parameters.
@author Edward A. Lee
@version $Id: ThreadedComposite.java 70402 2014-10-23 00:52:20Z cxh $
@since Ptolemy II 8.0
@Pt.ProposedRating Yellow (eal)
@Pt.AcceptedRating Red (eal)
*/
public class ThreadedComposite extends MirrorComposite {
/** Create an actor with a name and a container.
* The container argument must not be null, or a
* NullPointerException will be thrown. This actor will use the
* workspace of the container for synchronization and version counts.
* If the name argument is null, then the name is set to the empty string.
* Increment the version of the workspace.
* @param container The container actor.
* @param name The name of this actor.
* @exception IllegalActionException If the container is incompatible
* with this actor.
* @exception NameDuplicationException If the name coincides with
* an actor already in the container.
*/
public ThreadedComposite(CompositeEntity container, String name)
throws IllegalActionException, NameDuplicationException {
// The false argument specifies that instances of ParameterPort
// should not be mirrored. This would make the behavior nondeterminate,
// so we expose these only as parameters.
super(container, name, false);
setClassName("ptolemy.actor.lib.hoc.ThreadedComposite");
// Create the ThreadedDirector in the proper workspace.
ThreadedDirector threadedDirector = new ThreadedDirector(workspace());
threadedDirector.setContainer(this);
threadedDirector.setName(uniqueName("ThreadedDirector"));
// Hidden parameter defining "UNDEFINED".
Parameter UNDEFINED = new Parameter(this, "UNDEFINED");
UNDEFINED.setVisibility(Settable.EXPERT);
UNDEFINED.setPersistent(false);
UNDEFINED.setExpression("-1.0");
delay = new Parameter(this, "delay");
delay.setTypeEquals(BaseType.DOUBLE);
delay.setExpression("0.0");
synchronizeToRealTime = new Parameter(this, "synchronizeToRealTime");
synchronizeToRealTime.setTypeEquals(BaseType.BOOLEAN);
synchronizeToRealTime.setExpression("false");
}
///////////////////////////////////////////////////////////////////
//// parameters ////
/** The model-time delay between the input events and the
* output events. This is a double that defaults to 0.0,
* indicating that outputs should have the same time stamps
* as the inputs that trigger them. If it has a value greater
* than zero, then the outputs will have large time stamps
* by that amount. If it has the value UNDEFINED
* (or any negative number), then the output time stamp
* will be nondeterminate, and will depend on the current
* model time of the outside director when the output is
* produced or on current real time.
*/
public Parameter delay;
/** If set to true, the inside thread stalls until real time matches
* the time stamps of input events or pure events for each firing.
* In addition, if delay is set to undefined and this is set
* to true, then output events are assigned a time stamp that is the
* greater of current model time and real time.
* Time is measured since the start of the execution of the inside
* thread. This is a boolean that defaults to false. Changing
* the value of this parameter has no effect until the next
* execution of the model.
*/
public Parameter synchronizeToRealTime;
///////////////////////////////////////////////////////////////////
//// public methods ////
/** React to a change in an attribute. This method is called by
* a contained attribute when its value changes. In this base class,
* the method does nothing. In derived classes, this method may
* throw an exception, indicating that the new attribute value
* is invalid. It is up to the caller to restore the attribute
* to a valid value if an exception is thrown.
* @param attribute The attribute that changed.
* @exception IllegalActionException If the change is not acceptable
* to this container (not thrown in this base class).
*/
@Override
public void attributeChanged(Attribute attribute)
throws IllegalActionException {
if (attribute == delay) {
_delayValue = ((DoubleToken) delay.getToken()).doubleValue();
} else {
super.attributeChanged(attribute);
}
}
/** Clone the actor into the specified workspace.
* @param workspace The workspace for the new object.
* @return A new actor.
* @exception CloneNotSupportedException If a derived class has
* has an attribute that cannot be cloned.
*/
@Override
public Object clone(Workspace workspace) throws CloneNotSupportedException {
ThreadedComposite newObject = (ThreadedComposite) super
.clone(workspace);
try {
// Remove the old inner ThreadedDirector that is in the wrong workspace.
List iterateDirectors = newObject
.attributeList(ThreadedDirector.class);
ThreadedDirector oldThreadedDirector = (ThreadedDirector) iterateDirectors
.get(0);
String threadedDirectorName = oldThreadedDirector.getName();
oldThreadedDirector.setContainer(null);
// Create a new ThreadedDirector that is in the right workspace.
ThreadedDirector iterateDirector = newObject.new ThreadedDirector(
workspace);
iterateDirector.setContainer(newObject);
iterateDirector.setName(threadedDirectorName);
} catch (Throwable throwable) {
throw new CloneNotSupportedException("Could not clone: "
+ throwable);
}
newObject._causalityInterface = null;
newObject._realStartTime = 0L;
return newObject;
}
/** Override the base class to return a causality interface that
* indicates that the output does not depend (immediately) on
* the input. This method assumes that the director deals with BooleanDependencies
* and returns an instance of BreakCausalityInterface.
* @return A representation of the dependencies between input ports
* and output ports.
*/
@Override
public CausalityInterface getCausalityInterface() {
// FIXME: This will not work property with Ptides because it will effectively
// declare that the delay from input to output is infinite, which it is not.
// What we want is for the delay from input to output to be a superdense time
// delay of (0.0, 1). This could be implemented by a class similar to
// BreakCausalityInterface that does the right thing when the director
// provides a Dependency that is a SuperdenseTimeIdentity.
if (_causalityInterface == null) {
_causalityInterface = new BreakCausalityInterface(this,
getExecutiveDirector().defaultDependency());
}
return _causalityInterface;
}
/** Iterate the contained actors of the
* container of this director.
* @return False if any contained actor returns false in postfire.
* @exception IllegalActionException If any called method of
* of the contained actor throws it, or if the contained
* actor is not opaque.
*/
public boolean iterateContainedActors() throws IllegalActionException {
// Don't call "super.fire();" here, this actor contains its
// own director.
boolean result = true;
List actors = entityList();
for (Actor actor : actors) {
if (_stopRequested) {
break;
}
if (!((ComponentEntity) actor).isOpaque()) {
throw new IllegalActionException(this,
"Inside actor is not opaque "
+ "(perhaps it needs a director).");
}
if (_debugging) {
_debug("---- Iterating actor in inside thread: "
+ actor.getFullName());
}
if (actor.iterate(1) == Executable.STOP_ITERATING) {
result = false;
_debug("---- Prefire returned false: " + actor.getFullName());
}
}
return result;
}
///////////////////////////////////////////////////////////////////
//// private variables ////
/** The cached value of the delay parameter. */
private double _delayValue = 0.0;
/** The real time at which the model begins executing, in milliseconds. */
private long _realStartTime = 0;
///////////////////////////////////////////////////////////////////
//// inner classes ////
///////////////////////////////////////////////////////////////////
//// TokenFrame
/** Bundle data associated with ports and a time stamp.
* There are three types of frames:
* EVENT is a (possibly empty) bundle of data and a time
* stamp that is either provided to the inside thread from
* the inputs of a ThreadedComposite or provided by the
* inside thread to form the outputs of a ThreadedComposite.
* POSTFIRE is a frame indicating that the inside actor
* can be postfired. No tokens are provided (they are assumed
* to have been consumed in the firing). STOP is a frame
* provided to the inside thread to indicate that it should
* stop executing.
*/
protected static class TokenFrame {
// FindBugs suggests making this class static so as to decrease
// the size of instances and avoid dangling references.
/** Construct a TokenFrame.
* @param theTime The time of this token frame.
* @param theTokens a list of QueueTokens.
* @param theType The FrameType.
*/
public TokenFrame(Time theTime, List theTokens,
FrameType theType) {
tokens = theTokens;
time = theTime;
type = theType;
}
/** The time. */
public final Time time;
/** A list of tokens. */
public final List tokens;
/** The type of the frame. */
public final FrameType type;
// Final fields (FindBugs suggestion)
/** A (possibly empty) bundle of data and a time
* stamp that is either provided to the inside thread from
* the inputs of a ThreadedComposite or provided by the
* inside thread to form the outputs of a ThreadedComposite.
*/
public final static FrameType EVENT = new FrameType();
/** POSTFIRE is a frame indicating that the inside actor
* can be postfired. No tokens are provided (they are assumed
* to have been consumed in the firing).
*/
public final static FrameType POSTFIRE = new FrameType();
/** STOP is a frame provided to the inside thread to indicate
* that it should stop executing.
*/
public final static FrameType STOP = new FrameType();
private static class FrameType {
private FrameType() {
};
}
}
///////////////////////////////////////////////////////////////////
//// QueuedToken
/** Bundle of a token and the input port and channel
* at which it arrived.
*/
private static class QueuedToken {
// FindBugs suggests making this class static so as to decrease
// the size of instances and avoid dangling references.
public QueuedToken(IOPort thePort, int theChannel, Token theToken) {
token = theToken;
channel = theChannel;
port = thePort;
}
public final int channel;
public final Token token;
public final IOPort port;
@Override
public String toString() {
return "token " + token + " for port " + port.getFullName() + "("
+ channel + ")";
}
}
///////////////////////////////////////////////////////////////////
//// ThreadedDirector
/** A specialized director that fires a contained actor
* in a separate thread. The prefire() method returns true
* if the inside thread is alive. The fire() method posts
* input events, if any, for the current firing on a queue for
* the inside thread to consume. If the firing is in response
* to a prior refiring request by this director, then the fire()
* method will also wait for the inside thread to complete
* its firing, and will then produce outputs from that firing.
* The postfire() method posts
* a request to postfire the contained actor and also requests
* a refiring of this director at the current time plus the delay
* value (unless the delay value is UNDEFINED). The wrapup() method
* requests termination of the inside thread. If postfire()
* of the contained actor returns false, then postfire() of this director
* will return false, requesting a halt to execution of the model.
*/
private class ThreadedDirector extends Director {
/** Construct a new instance of the director for ThreadedComposite.
* The director is created in the specified workspace with
* no container and an empty string as a name. You can then change
* the name with setName(). If the workspace argument is null, then
* use the default workspace. You should set the local director or
* executive director before attempting to send data to the actor
* or to execute it. Add the actor to the workspace directory.
* Increment the version number of the workspace.
* @param workspace The workspace that will list the actor.
* @exception IllegalActionException If the container is incompatible
* with this actor.
* @exception NameDuplicationException If the name coincides with
* an actor already in the container.
*/
public ThreadedDirector(Workspace workspace)
throws IllegalActionException, NameDuplicationException {
super(workspace);
setPersistent(false);
}
/** Clone the director into the specified workspace.
* @param workspace The workspace for the new object.
* @return A new director.
* @exception CloneNotSupportedException If a derived class has
* has an attribute that cannot be cloned.
*/
@Override
public Object clone(Workspace workspace)
throws CloneNotSupportedException {
ThreadedDirector newObject = (ThreadedDirector) super
.clone(workspace);
newObject._exception = null;
newObject._inputTokens = null;
newObject._thread = null;
newObject._outputTimes = new LinkedList