/* An actor that executes a contained actor in separate thread. Copyright (c) 2007-2014 The Regents of the University of California. All rights reserved. Permission is hereby granted, without written agreement and without license or royalty fees, to use, copy, modify, and distribute this software and its documentation for any purpose, provided that the above copyright notice and the following two paragraphs appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 
PT_COPYRIGHT_VERSION_2 COPYRIGHTENDKEY */
package ptolemy.actor.lib.hoc;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import ptolemy.actor.Actor;
import ptolemy.actor.Director;
import ptolemy.actor.Executable;
import ptolemy.actor.IOPort;
import ptolemy.actor.NoTokenException;
import ptolemy.actor.QueueReceiver;
import ptolemy.actor.Receiver;
import ptolemy.actor.util.BreakCausalityInterface;
import ptolemy.actor.util.CausalityInterface;
import ptolemy.actor.util.Time;
import ptolemy.data.BooleanToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.ComponentEntity;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Settable;
import ptolemy.kernel.util.Workspace;

///////////////////////////////////////////////////////////////////
//// ThreadedComposite

/**
A container for another actor that executes that other actor in a separate thread called the inside thread. This actor starts that thread in its initialize() method, which is invoked by its executive director (the director in charge of firing this actor). The thread that invokes the action methods of this actor (initialize(), prefire(), fire(), postfire(), and wrapup()) is called the director thread.

A paper describing the use of this actor is found at http://www.eecs.berkeley.edu/Pubs/TechRpts/2008/EECS-2008-151.html.

This actor automatically creates input and output ports to match those of the inside actor. Input events provided at those input ports are provided as input events to the contained actor. Outputs provided by the contained actor become output events of this actor. The time stamp of the input events is provided by the container of this actor. The time stamp of the output events depends on the delay parameter, as explained below.

The inside thread blocks waiting for inputs or pure events. Inputs are provided to that thread when the fire() method of this actor is invoked by the director thread. Pure events are provided after fireAt(), fireAtCurrentTime(), or fireAtFirstValidTimeAfter() are called by either the inside thread or the director thread. When the time of those firing requests becomes current time, the container will (presumably) fire this actor, and this actor will provide a pure event to the inside thread, causing it to fire the contained actor.

If the synchronizeToRealTime parameter is true, then when the inside thread encounters an input or pure event with time stamp t, it stalls until real time matches or exceeds t (measured in seconds since the start of execution of the inside thread). In contrast to, for example, the synchronizeToRealTime parameter of the DEDirector, this enables construction of models where only a portion of the model synchronizes to real time.
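
The stall described above can be sketched as a small computation. This is only an illustration, not the code of this actor: the class RealTimeStallSketch and the method remainingStallMillis() are hypothetical names, and the sketch assumes that time stamps are in seconds and that real time is measured in milliseconds since the inside thread started.

```java
public class RealTimeStallSketch {

    // Hypothetical helper: how many milliseconds the inside thread would
    // still have to wait before handling an event with the given time
    // stamp (in seconds), given the real time elapsed so far (in ms).
    static long remainingStallMillis(double timeStampSeconds,
            long elapsedMillis) {
        long targetMillis = Math.round(timeStampSeconds * 1000.0);
        // Never wait a negative amount: if real time has already passed
        // the time stamp, the event is handled immediately.
        return Math.max(0L, targetMillis - elapsedMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        double timeStamp = 0.05; // An event at model time 0.05 seconds.
        long wait = remainingStallMillis(timeStamp,
                System.currentTimeMillis() - start);
        if (wait > 0) {
            Thread.sleep(wait); // Stall until real time catches up.
        }
        System.out.println("Handling event with time stamp " + timeStamp);
    }
}
```

Because only the inside thread sleeps, the rest of the model is free to advance without regard to real time.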

When the wrapup() method of this actor is called, the inside thread is provided with a signal to terminate rather than to process additional inputs. The inside thread will also exit if stop() is called on this actor; however, in this case, which iterations are completed is nondeterminate (there may be inputs left unprocessed).
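
The blocking loop and the termination signal can be sketched with a plain blocking queue. This is a simplified stand-in, assuming a java.util.concurrent.BlockingQueue rather than the queue used by this actor; InsideThreadLoopSketch, Frame, and runInsideThread() are hypothetical names, and only two frame kinds (EVENT and STOP) are modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InsideThreadLoopSketch {

    enum FrameType { EVENT, STOP }

    // Hypothetical frame: a type plus a time stamp (no tokens here).
    static final class Frame {
        final FrameType type;
        final double time;

        Frame(FrameType type, double time) {
            this.type = type;
            this.time = time;
        }
    }

    // Start an inside thread, feed it one EVENT frame per time stamp,
    // then a STOP frame, and return the time stamps it handled.
    static List<Double> runInsideThread(double... eventTimes) {
        BlockingQueue<Frame> inbox = new LinkedBlockingQueue<>();
        List<Double> handled = new ArrayList<>();
        Thread inside = new Thread(() -> {
            try {
                while (true) {
                    Frame frame = inbox.take(); // Blocks until a frame arrives.
                    if (frame.type == FrameType.STOP) {
                        return; // Terminate without reading further frames.
                    }
                    handled.add(frame.time);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        inside.start();
        for (double time : eventTimes) {
            inbox.add(new Frame(FrameType.EVENT, time));
        }
        inbox.add(new Frame(FrameType.STOP, 0.0));
        // A frame posted after STOP is never processed.
        inbox.add(new Frame(FrameType.EVENT, 99.0));
        try {
            inside.join(); // Wait for the inside thread to exit.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runInsideThread(1.0, 2.0));
    }
}
```

The join() call mirrors the way the wrapup phase waits for the inside thread to finish before returning.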

The parameters of this actor include all the parameters of the contained actor, and setting those parameters automatically sets the parameters of the contained actor.

In addition to the parameters of the contained actor, this actor has a delay parameter. This parameter is a double that can be any nonnegative value or the special value UNDEFINED. If it is given a nonnegative value, then the value specifies the model-time delay between input events and the output events that result from reacting to those input events. That is, if this actor is given an input event with time stamp t, then if the contained actor produces any output events in reaction to that event, those output events will be produced by this actor with time stamp t + delay.

If delay has value UNDEFINED, then outputs are produced at the current model time of the executive director when the inside thread happens to produce those events, or if synchronizeToRealTime, at the greater of current model time and current real time (measured in seconds since the start of execution). This is accomplished by the inside thread calling fireAtFirstValidTimeAfter() of the enclosing director, and then producing the outputs when the requested firing occurs in the director thread. Note that with this value of the delay, it is possible for the inside thread to continue to execute and respond to input events after the wrapup phase of the director thread has been entered. The wrapup phase will stall until the inside thread has completed its processing of its inputs, but any outputs it produces after the wrapup phase has started will be discarded.
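
The two cases for the output time stamp can be summarized in a small function. This is a sketch under stated assumptions, not the implementation: OutputTimeStampSketch and outputTimeStamp() are hypothetical names, times are plain doubles in seconds, any negative delay is treated as UNDEFINED, and the superdense-time microstep is ignored.

```java
public class OutputTimeStampSketch {

    // The hidden UNDEFINED parameter of this actor evaluates to -1.0.
    static final double UNDEFINED = -1.0;

    // Hypothetical helper returning the model time at which outputs
    // produced in reaction to an input at inputTime would appear.
    static double outputTimeStamp(double inputTime, double delay,
            double currentModelTime, double elapsedRealSeconds,
            boolean synchronizeToRealTime) {
        if (delay >= 0.0) {
            // Determinate case: outputs appear at t + delay.
            return inputTime + delay;
        }
        // UNDEFINED case: the time stamp depends on when the inside
        // thread happens to produce the outputs.
        if (synchronizeToRealTime) {
            return Math.max(currentModelTime, elapsedRealSeconds);
        }
        return currentModelTime;
    }

    public static void main(String[] args) {
        System.out.println(outputTimeStamp(2.0, 0.5, 9.0, 9.0, false));
        System.out.println(outputTimeStamp(2.0, UNDEFINED, 3.0, 4.0, true));
    }
}
```

Only the first branch is repeatable from run to run, which is why a nonnegative delay is needed when determinate behavior matters.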

The most common use of this actor is in the DE domain, although it can also be used in CT, SR, SDF, and other domains, with some care. See the above referenced memo. Regardless of the value of delay, this actor is treated by DE as introducing a delay, much like the TimedDelay actor. In fact, if delay is 0.0, there will be a one tick delay in superdense time, just as with the TimedDelay actor. If the inside model also has a time delay (e.g. if you put a TimedDelay actor inside a ThreadedComposite), then the total delay is the sum of the two delays.

Discussion:

There are several useful things you can do with this model. We describe some use cases here:

Background execution. When delay is greater than or equal to 0.0, then when this actor is fired in response to input events with time stamp t, the actual processing of those events occurs later in a separate thread. The director thread is not blocked, and can continue to process events with time stamps less than or equal to t + delay. The director thread is blocked from processing events with larger time stamps than that because this is necessary to preserve DE semantics. To implement this, this actor uses fireAt() to request a firing at time t + delay, and when that firing occurs, it blocks the director thread until the reaction is complete.
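
The hand-off pattern behind background execution can be sketched with the standard java.util.concurrent classes. This is an analogy only: this actor uses its own inside thread and fireAt() requests, whereas the sketch below assumes an ExecutorService and a Future, and BackgroundReactionSketch and reactInBackground() are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BackgroundReactionSketch {

    // Hand an input to a background thread, continue with other work,
    // and block only when the result is needed.
    static int reactInBackground(int input) {
        ExecutorService insideThread = Executors.newSingleThreadExecutor();
        try {
            // Firing at time t: post the input and return immediately.
            Future<Integer> reaction = insideThread.submit(() -> input * 2);

            // The calling thread is free here to process other events
            // with time stamps up to t + delay.

            // Refiring at time t + delay: block until the reaction is
            // complete, then produce its result as the output.
            return reaction.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            insideThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(reactInBackground(21));
    }
}
```

The blocking call to get() plays the role of the firing at t + delay: the caller cannot move past that point until the background reaction has finished.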

Parallel firing. Note that if delay is set to 0.0, it may seem that there is no point in using this actor, since model time will not be allowed to increase past t until the contained actor has reacted to events with time stamp t. However, there is actually exploitable concurrency if there are other actors in the model that also have pending input events with time stamp t. Those events can be processed concurrently with this actor reacting to its input event. A typical use case will broadcast an event to several instances of ThreadedComposite, in which case each of those several inside threads can execute concurrently in reaction to those input events.
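
The broadcast use case can be sketched as follows. Again this is an analogy built on java.util.concurrent, not the mechanism of this actor; ParallelFiringSketch and broadcast() are hypothetical names, and each task stands in for one ThreadedComposite instance reacting to the same event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFiringSketch {

    // Broadcast one event to several reactions, each of which may run
    // in its own thread, and collect the outputs in instance order.
    // The instances argument must be positive.
    static List<Integer> broadcast(int event, int instances) {
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        try {
            List<Callable<Integer>> reactions = new ArrayList<>();
            for (int i = 0; i < instances; i++) {
                final int id = i;
                // Each reaction sees the same event (same time stamp).
                reactions.add(() -> event + id);
            }
            List<Integer> outputs = new ArrayList<>();
            // invokeAll() returns only after every reaction completes,
            // much as model time cannot pass t until all are done.
            for (Future<Integer> future : pool.invokeAll(reactions)) {
                outputs.add(future.get());
            }
            return outputs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(broadcast(10, 3));
    }
}
```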

Real-time source. If the contained actor (and hence this actor) has no inputs and synchronizeToRealTime is true, then the contained actor must call fireAt() or one of its variants so that the inside thread will be provided with pure events. The behavior depends on which variant of the fireAt() methods is used by the inside actor. There are three cases: FIXME: Describe these. In particular, delay needs to specify the minimum increment between these or fireAt() could result in an exception. Do we want a parameter to relax that?

One subtlety of this actor is that it cannot expose instances of ParameterPort without introducing nondeterminacy in the execution. A ParameterPort is an input port that sets the value of a parameter with the same name. Upon receiving a token at such a port, if this actor were to set a parameter visible by the inside thread, there is no assurance that the inside thread is not still executing an earlier iteration. Thus, it could appear to be sending a message backward in time, which would be truly bizarre. To prevent this error, this actor does not mirror such ports, and hence they appear on the outside only as parameters.

@author Edward A. Lee
@version $Id: ThreadedComposite.java 70402 2014-10-23 00:52:20Z cxh $
@since Ptolemy II 8.0
@Pt.ProposedRating Yellow (eal)
@Pt.AcceptedRating Red (eal)
*/
public class ThreadedComposite extends MirrorComposite {

    /** Create an actor with a name and a container.
     *  The container argument must not be null, or a
     *  NullPointerException will be thrown. This actor will use the
     *  workspace of the container for synchronization and version counts.
     *  If the name argument is null, then the name is set to the empty string.
     *  Increment the version of the workspace.
     *  @param container The container actor.
     *  @param name The name of this actor.
     *  @exception IllegalActionException If the container is incompatible
     *   with this actor.
     *  @exception NameDuplicationException If the name coincides with
     *   an actor already in the container.
     */
    public ThreadedComposite(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        // The false argument specifies that instances of ParameterPort
        // should not be mirrored. Mirroring them would make the behavior
        // nondeterminate, so we expose these only as parameters.
        super(container, name, false);
        setClassName("ptolemy.actor.lib.hoc.ThreadedComposite");

        // Create the ThreadedDirector in the proper workspace.
        ThreadedDirector threadedDirector = new ThreadedDirector(workspace());
        threadedDirector.setContainer(this);
        threadedDirector.setName(uniqueName("ThreadedDirector"));

        // Hidden parameter defining "UNDEFINED".
        Parameter UNDEFINED = new Parameter(this, "UNDEFINED");
        UNDEFINED.setVisibility(Settable.EXPERT);
        UNDEFINED.setPersistent(false);
        UNDEFINED.setExpression("-1.0");

        delay = new Parameter(this, "delay");
        delay.setTypeEquals(BaseType.DOUBLE);
        delay.setExpression("0.0");

        synchronizeToRealTime = new Parameter(this, "synchronizeToRealTime");
        synchronizeToRealTime.setTypeEquals(BaseType.BOOLEAN);
        synchronizeToRealTime.setExpression("false");
    }

    ///////////////////////////////////////////////////////////////////
    ////                         parameters                        ////

    /** The model-time delay between the input events and the
     *  output events. This is a double that defaults to 0.0,
     *  indicating that outputs should have the same time stamps
     *  as the inputs that trigger them. If it has a value greater
     *  than zero, then the outputs will have time stamps larger
     *  by that amount. If it has the value UNDEFINED
     *  (or any negative number), then the output time stamp
     *  will be nondeterminate, and will depend on the current
     *  model time of the outside director when the output is
     *  produced or on current real time.
     */
    public Parameter delay;

    /** If set to true, the inside thread stalls until real time matches
     *  the time stamps of input events or pure events for each firing.
     *  In addition, if delay is set to UNDEFINED and this is set
     *  to true, then output events are assigned a time stamp that is the
     *  greater of current model time and real time.
     *  Time is measured since the start of the execution of the inside
     *  thread. This is a boolean that defaults to false. Changing
     *  the value of this parameter has no effect until the next
     *  execution of the model.
     */
    public Parameter synchronizeToRealTime;

    ///////////////////////////////////////////////////////////////////
    ////                         public methods                    ////

    /** React to a change in an attribute. This method is called by
     *  a contained attribute when its value changes. If the changed
     *  attribute is delay, then cache its value; otherwise, defer to
     *  the superclass. It is up to the caller to restore the attribute
     *  to a valid value if an exception is thrown.
     *  @param attribute The attribute that changed.
     *  @exception IllegalActionException If the value of delay cannot
     *   be evaluated, or if the superclass throws it.
     */
    @Override
    public void attributeChanged(Attribute attribute)
            throws IllegalActionException {
        if (attribute == delay) {
            _delayValue = ((DoubleToken) delay.getToken()).doubleValue();
        } else {
            super.attributeChanged(attribute);
        }
    }

    /** Clone the actor into the specified workspace.
     *  @param workspace The workspace for the new object.
     *  @return A new actor.
     *  @exception CloneNotSupportedException If a derived class
     *   has an attribute that cannot be cloned.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        ThreadedComposite newObject = (ThreadedComposite) super
                .clone(workspace);
        try {
            // Remove the old inner ThreadedDirector that is in the wrong workspace.
            List iterateDirectors = newObject
                    .attributeList(ThreadedDirector.class);
            ThreadedDirector oldThreadedDirector = (ThreadedDirector) iterateDirectors
                    .get(0);
            String threadedDirectorName = oldThreadedDirector.getName();
            oldThreadedDirector.setContainer(null);

            // Create a new ThreadedDirector that is in the right workspace.
            ThreadedDirector iterateDirector = newObject.new ThreadedDirector(
                    workspace);
            iterateDirector.setContainer(newObject);
            iterateDirector.setName(threadedDirectorName);
        } catch (Throwable throwable) {
            throw new CloneNotSupportedException("Could not clone: "
                    + throwable);
        }
        newObject._causalityInterface = null;
        newObject._realStartTime = 0L;
        return newObject;
    }

    /** Override the base class to return a causality interface that
     *  indicates that the output does not depend (immediately) on
     *  the input. This method assumes that the director deals with
     *  BooleanDependencies and returns an instance of
     *  BreakCausalityInterface.
     *  @return A representation of the dependencies between input ports
     *   and output ports.
     */
    @Override
    public CausalityInterface getCausalityInterface() {
        // FIXME: This will not work properly with Ptides because it will effectively
        // declare that the delay from input to output is infinite, which it is not.
        // What we want is for the delay from input to output to be a superdense time
        // delay of (0.0, 1). This could be implemented by a class similar to
        // BreakCausalityInterface that does the right thing when the director
        // provides a Dependency that is a SuperdenseTimeIdentity.
        if (_causalityInterface == null) {
            _causalityInterface = new BreakCausalityInterface(this,
                    getExecutiveDirector().defaultDependency());
        }
        return _causalityInterface;
    }

    /** Iterate the contained actors of the
     *  container of this director.
     *  @return False if any contained actor returns false in postfire.
     *  @exception IllegalActionException If any called method
     *   of the contained actor throws it, or if the contained
     *   actor is not opaque.
     */
    public boolean iterateContainedActors() throws IllegalActionException {
        // Don't call "super.fire();" here, this actor contains its
        // own director.
        boolean result = true;
        List<Actor> actors = entityList();

        for (Actor actor : actors) {
            if (_stopRequested) {
                break;
            }

            if (!((ComponentEntity) actor).isOpaque()) {
                throw new IllegalActionException(this,
                        "Inside actor is not opaque "
                                + "(perhaps it needs a director).");
            }

            if (_debugging) {
                _debug("---- Iterating actor in inside thread: "
                        + actor.getFullName());
            }

            // iterate() returns STOP_ITERATING when postfire() returns false.
            if (actor.iterate(1) == Executable.STOP_ITERATING) {
                result = false;
                if (_debugging) {
                    _debug("---- Postfire returned false: "
                            + actor.getFullName());
                }
            }
        }
        return result;
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private variables                 ////

    /** The cached value of the delay parameter. */
    private double _delayValue = 0.0;

    /** The real time at which the model begins executing, in milliseconds. */
    private long _realStartTime = 0;

    ///////////////////////////////////////////////////////////////////
    ////                         inner classes                     ////

    ///////////////////////////////////////////////////////////////////
    //// TokenFrame

    /** Bundle data associated with ports and a time stamp.
     *  There are three types of frames:
     *  EVENT is a (possibly empty) bundle of data and a time
     *  stamp that is either provided to the inside thread from
     *  the inputs of a ThreadedComposite or provided by the
     *  inside thread to form the outputs of a ThreadedComposite.
     *  POSTFIRE is a frame indicating that the inside actor
     *  can be postfired. No tokens are provided (they are assumed
     *  to have been consumed in the firing). STOP is a frame
     *  provided to the inside thread to indicate that it should
     *  stop executing.
     */
    protected static class TokenFrame {
        // FindBugs suggests making this class static so as to decrease
        // the size of instances and avoid dangling references.

        /** Construct a TokenFrame.
         *  @param theTime The time of this token frame.
         *  @param theTokens A list of QueuedTokens.
         *  @param theType The FrameType.
         */
        public TokenFrame(Time theTime, List<QueuedToken> theTokens,
                FrameType theType) {
            tokens = theTokens;
            time = theTime;
            type = theType;
        }

        /** The time.
         */
        public final Time time;

        /** A list of tokens. */
        public final List<QueuedToken> tokens;

        /** The type of the frame. */
        public final FrameType type;

        // Final fields (FindBugs suggestion)

        /** A (possibly empty) bundle of data and a time
         *  stamp that is either provided to the inside thread from
         *  the inputs of a ThreadedComposite or provided by the
         *  inside thread to form the outputs of a ThreadedComposite.
         */
        public final static FrameType EVENT = new FrameType();

        /** POSTFIRE is a frame indicating that the inside actor
         *  can be postfired. No tokens are provided (they are assumed
         *  to have been consumed in the firing).
         */
        public final static FrameType POSTFIRE = new FrameType();

        /** STOP is a frame provided to the inside thread to indicate
         *  that it should stop executing.
         */
        public final static FrameType STOP = new FrameType();

        private static class FrameType {
            private FrameType() {
            }
        }
    }

    ///////////////////////////////////////////////////////////////////
    //// QueuedToken

    /** Bundle of a token and the input port and channel
     *  at which it arrived.
     */
    private static class QueuedToken {
        // FindBugs suggests making this class static so as to decrease
        // the size of instances and avoid dangling references.

        public QueuedToken(IOPort thePort, int theChannel, Token theToken) {
            token = theToken;
            channel = theChannel;
            port = thePort;
        }

        public final int channel;

        public final Token token;

        public final IOPort port;

        @Override
        public String toString() {
            return "token " + token + " for port " + port.getFullName() + "("
                    + channel + ")";
        }
    }

    ///////////////////////////////////////////////////////////////////
    //// ThreadedDirector

    /** A specialized director that fires a contained actor
     *  in a separate thread. The prefire() method returns true
     *  if the inside thread is alive. The fire() method posts
     *  input events, if any, for the current firing on a queue for
     *  the inside thread to consume.
     *  If the firing is in response
     *  to a prior refiring request by this director, then the fire()
     *  method will also wait for the inside thread to complete
     *  its firing, and will then produce outputs from that firing.
     *  The postfire() method posts
     *  a request to postfire the contained actor and also requests
     *  a refiring of this director at the current time plus the delay
     *  value (unless the delay value is UNDEFINED). The wrapup() method
     *  requests termination of the inside thread. If postfire()
     *  of the contained actor returns false, then postfire() of this director
     *  will return false, requesting a halt to execution of the model.
     */
    private class ThreadedDirector extends Director {

        /** Construct a new instance of the director for ThreadedComposite.
         *  The director is created in the specified workspace with
         *  no container and an empty string as a name. You can then change
         *  the name with setName(). If the workspace argument is null, then
         *  use the default workspace. You should set the local director or
         *  executive director before attempting to send data to the actor
         *  or to execute it. Add the actor to the workspace directory.
         *  Increment the version number of the workspace.
         *  @param workspace The workspace that will list the actor.
         *  @exception IllegalActionException If the container is incompatible
         *   with this actor.
         *  @exception NameDuplicationException If the name coincides with
         *   an actor already in the container.
         */
        public ThreadedDirector(Workspace workspace)
                throws IllegalActionException, NameDuplicationException {
            super(workspace);
            setPersistent(false);
        }

        /** Clone the director into the specified workspace.
         *  @param workspace The workspace for the new object.
         *  @return A new director.
         *  @exception CloneNotSupportedException If a derived class
         *   has an attribute that cannot be cloned.
         */
        @Override
        public Object clone(Workspace workspace)
                throws CloneNotSupportedException {
            ThreadedDirector newObject = (ThreadedDirector) super
                    .clone(workspace);
            newObject._exception = null;
            newObject._inputTokens = null;
            newObject._thread = null;
            newObject._outputTimes = new LinkedList