/* This director implements the Ptides programming model. @Copyright (c) 2008-2014 The Regents of the University of California. All rights reserved. Permission is hereby granted, without written agreement and without license or royalty fees, to use, copy, modify, and distribute this software and its documentation for any purpose, provided that the above copyright notice and the following two paragraphs appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 
 PT_COPYRIGHT_VERSION_2
 COPYRIGHTENDKEY
 */
package ptolemy.domains.ptides.kernel;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;

import ptolemy.actor.Actor;
import ptolemy.actor.ActorExecutionAspect;
import ptolemy.actor.CompositeActor;
import ptolemy.actor.IOPort;
import ptolemy.actor.Receiver;
import ptolemy.actor.TypedCompositeActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.lib.DiscreteClock;
import ptolemy.actor.lib.PoissonClock;
import ptolemy.actor.lib.TimeDelay;
import ptolemy.actor.parameters.ParameterPort;
import ptolemy.actor.parameters.SharedParameter;
import ptolemy.actor.util.CausalityInterface;
import ptolemy.actor.util.Dependency;
import ptolemy.actor.util.PeriodicDirector;
import ptolemy.actor.util.SuperdenseDependency;
import ptolemy.actor.util.Time;
import ptolemy.data.ArrayToken;
import ptolemy.data.BooleanToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.IntToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.domains.de.kernel.DEDirector;
import ptolemy.domains.de.kernel.DEEventQueue;
import ptolemy.domains.modal.modal.ModalModel;
import ptolemy.domains.ptides.lib.PtidesPort;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.Decorator;
import ptolemy.kernel.util.DecoratorAttributes;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.KernelException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.kernel.util.Settable;
import ptolemy.kernel.util.Workspace;

/** This director implements the Ptides programming model,
 *  which is used for the design of distributed real-time systems.
 *
 *

 *  This director can only be used inside the PtidesPlatform.
 *  If the PtidesPlatform is dragged and dropped from the library,
 *  it already contains a PtidesDirector. A PtidesPlatform must be
 *  embedded in a timed director such as DE or Continuous. The time of
 *  this enclosing director simulates physical time.
 *  The localClock of this PtidesDirector simulates platformTime.
 *
 *

 *  The Ptides director is based on the DE director. Like the DE
 *  director, this director maintains a totally ordered set of events.
 *  Event timestamps are given in logical time. The logical time is decoupled
 *  from the platformTime. The model time of the director is the
 *  platformTime unless an actor is fired; then the model time is
 *  the timestamp of the event that caused the actor firing.
 *
 *  Unlike the DE Director, this director can process events out of timestamp order
 *  if they are not causally related. Whether events can be processed
 *  is checked in a safe-to-process analysis.
 *  This analysis returns a boolean to indicate whether an event
 *  can be processed without violating Ptides
 *  semantics, based on information such as events currently in the
 *  event queue, their model time relationship with each other, as
 *  well as the current platform physical time. In this particular version of
 *  the Ptides scheduler, the director takes all events
 *  from the event queue and compares their timestamps
 *  with the current platform time + a pre-computed offset (call this
 *  the delayOffset). If the platform time is larger, then this event
 *  is safe to process. Otherwise, we wait for platform time to pass
 *  until this event becomes safe, at which point it is processed.
 *  Other, smarter kinds of safe-to-process analysis can be
 *  implemented in future versions.

 *
 *  Currently, only
 *  the DE director can be used as the enclosing director. One reason
 *  for using the DE director is that time cannot go backwards in DE,
 *  which is an important physical time property. More importantly,
 *  the fire method of this director changes the persistent state of
 *  actors, which means this director cannot be used inside of an
 *  actor that performs fixed-point iteration, which currently includes
 *  Continuous, CT, and SR. For more details, please refer
 *  to Edward A. Lee, Haiyang Zheng. Leveraging
 *  Synchronous Language Principles for Heterogeneous Modeling
 *  and Design of Embedded Systems, Proceedings of the
 *  7th ACM &amp; IEEE international conference on Embedded
 *  software, ACM, 114-123, 2007.

* *

 *  This director provides a set of features to address both
 *  the distributed and the real-time aspects of system design.
 *  To address the distributed aspect, each PtidesPlatform simulates
 *  a computation platform
 *  (e.g., a microprocessor), while the enclosing director simulates
 *  the physical world. Actors under the Ptides director then communicate
 *  with the outside via SensorPorts, ActuatorPorts, or network ports
 *  (NetworkReceivers, NetworkTransmitters).
 *

* *

 *  This director allows for simulation of execution time. If the PtidesPlatform
 *  contains ResourceSchedulers, these perform the scheduling of actors.
 *  Actors must specify in parameters which ResourceSchedulers they are assigned
 *  to and their executionTime. The passage of execution time equals the passage
 *  of platformTime. Execution time has no influence on the event timestamps.
 *
 *

 *  In a Ptides environment, all platforms are assumed to be synchronized
 *  within a bounded error.
 *
 *
 *

 *  The platform time is used in the following
 *  situations: generating timestamps for sensor events, enforcing deadlines
 *  for actuation events, and setting up the wake-up time for timed interrupts.
 *  Also, the Ptides operational semantics assumes
 *  a bound on the time synchronization error. This error is captured in the
 *  parameter {@link #clockSynchronizationErrorBound}. If
 *  the actual error exceeds this bound, the safe-to-process analysis could
 *  produce an incorrect result. The demo PtidesNetworkLatencyTest illustrates
 *  this error.

* * *

 *  The implementation is based on the operational semantics
 *  of Ptides, as described in: Jia Zou, Slobodan Matic, Edward
 *  A. Lee, Thomas Huining Feng, Patricia Derler. Execution
 *  Strategies for Ptides, a Programming Model for Distributed
 *  Embedded Systems, 15th IEEE Real-Time and Embedded Technology
 *  and Applications Symposium, 2009, IEEE Computer Society, 77-86,
 *  April, 2009.

 *
 *
 *  @author Patricia Derler, Edward A. Lee, Slobodan Matic, Mike Zimmer, Jia Zou
 *  @version $Id: PtidesDirector.java 70402 2014-10-23 00:52:20Z cxh $
 *  @since Ptolemy II 10.0
 *
 *  @Pt.ProposedRating Red (derler)
 *  @Pt.AcceptedRating Red (derler)
 */
public class PtidesDirector extends DEDirector implements Decorator {

    /** Construct a director in the given container with the given name.
     *  The container argument must not be null, or a
     *  NullPointerException will be thrown.
     *  If the name argument is null, then the name is set to the
     *  empty string. Increment the version number of the workspace.
     *  @param container Container of the director.
     *  @param name Name of this director.
     *  @exception IllegalActionException If the
     *   director is not compatible with the specified container.
     *  @exception NameDuplicationException If the container is not a
     *   CompositeActor and the name collides with an entity in the container.
     */
    public PtidesDirector(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);

        clockSynchronizationErrorBound = new SharedParameter(this,
                "clockSynchronizationErrorBound");
        clockSynchronizationErrorBound.setTypeEquals(BaseType.DOUBLE);
        clockSynchronizationErrorBound.setExpression("0.0");
        _clockSynchronizationErrorBound = new Time(this, 0.0);

        // The parameter name must match the field name.
        autoThrottling = new Parameter(this, "autoThrottling");
        autoThrottling.setTypeEquals(BaseType.BOOLEAN);
        autoThrottling.setExpression("true");
        autoThrottling.setVisibility(Settable.EXPERT);
        _autoThrottling = true;
    }

    ///////////////////////////////////////////////////////////////////
    ////                     public parameters                     ////

    /** Bound on clock synchronization error across all platforms.
     */
    public SharedParameter clockSynchronizationErrorBound;

    /** Auto throttling of local sources. This parameter is only visible
     *  in expert mode and defaults to the boolean value true.
     */
    public Parameter autoThrottling;

    ///////////////////////////////////////////////////////////////////
    ////                       public methods                      ////

    /** Add a new event to the input queue. Compute the time when
     *  this input can be consumed and store it in the queue. The time
     *  depends on the device delay.
     *  @param sourcePort the source port.
     *  @param event New input event.
     *  @param deviceDelay The device delay.
     *  @exception IllegalActionException If device delay parameter cannot be computed.
     */
    public void addInputEvent(PtidesPort sourcePort, PtidesEvent event,
            double deviceDelay) throws IllegalActionException {
        if (sourcePort.isNetworkReceiverPort()) {
            double networkDelayBound = PtidesDirector._getDoubleParameterValue(
                    sourcePort, "networkDelayBound");
            double sourcePlatformDelayBound = PtidesDirector
                    ._getDoubleParameterValue(sourcePort,
                            "sourcePlatformDelayBound");
            // The event is late if its age exceeds the sum of all delay bounds.
            if (localClock.getLocalTime().subtract(event.timeStamp())
                    .getDoubleValue() > sourcePlatformDelayBound
                    + networkDelayBound
                    + _clockSynchronizationErrorBound.getDoubleValue()) {
                event = _handleTimingError(
                        sourcePort,
                        event,
                        "Event on this network receiver came in too late. "
                                + "(Physical time: "
                                + localClock.getLocalTime()
                                + ", Event timestamp: " + event.timeStamp()
                                + ", Source platform delay bound: "
                                + sourcePlatformDelayBound
                                + ", Network delay bound: "
                                + networkDelayBound + ")");
            }
        }

        if (event != null) {
            Time inputReady = getModelTime().add(deviceDelay);
            List<PtidesEvent> list = _inputEventQueue.get(inputReady);
            if (list == null) {
                list = new ArrayList<PtidesEvent>();
            }
            list.add(event);
            _inputEventQueue.put(inputReady, list);
        }
    }

    /** Update the director parameters when attributes are changed.
     *  @param attribute The changed parameter.
     *  @exception IllegalActionException If the parameter set is not valid.
     *  Not thrown in this class.
     */
    @Override
    public void attributeChanged(Attribute attribute)
            throws IllegalActionException {
        if (attribute == clockSynchronizationErrorBound) {
            _clockSynchronizationErrorBound = new Time(this,
                    ((DoubleToken) clockSynchronizationErrorBound.getToken())
                            .doubleValue());
        } else if (attribute == autoThrottling) {
            _autoThrottling = ((BooleanToken) autoThrottling.getToken())
                    .booleanValue();
        } else {
            super.attributeChanged(attribute);
        }
    }

    /** Clone the object into the specified workspace. The new object is
     *  not added to the directory of that workspace (you must do this
     *  yourself if you want it there).
     *  The result is an attribute with no container.
     *  @param workspace The workspace for the cloned object.
     *  @exception CloneNotSupportedException Not thrown in this base class
     *  @return The new Attribute.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        PtidesDirector newObject = (PtidesDirector) super.clone(workspace);
        try {
            newObject._clockSynchronizationErrorBound = new Time(newObject, 0.0);
            newObject._numberOfTokensPerPort = new HashMap();
        } catch (IllegalActionException e) {
            // cannot happen.
        }
        return newObject;
    }

    /** Create and return the decorated attributes for the PtidesDirector.
     *  The director decorates local sources with throttling attributes that
     *  allow for specification of how far ahead in logical time these actors
     *  can execute or how many tokens they can produce at a time.
     *  @param target The NamedObj that will be decorated.
     *  @return The decorated attributes for the target NamedObj, or null if the
     *   specified NamedObj is not decorated by this decorator.
     */
    @Override
    public DecoratorAttributes createDecoratorAttributes(NamedObj target) {
        if (_isLocalSource(target)) {
            try {
                return new ThrottleAttributes(target, this);
            } catch (KernelException ex) {
                // This should not occur.
                throw new InternalErrorException(ex);
            }
        }
        return null;
    }

    /**
     * Return the default dependency between input and output ports,
     * which for the Ptides domain is a {@link SuperdenseDependency}.
     *
     * @return The default dependency that describes a time delay of 0.0,
     *          and an index delay of 0 between ports.
     */
    @Override
    public Dependency defaultDependency() {
        return SuperdenseDependency.OTIMES_IDENTITY;
    }

    /** Return local sources contained by the composite of this director.
     *  @return List of entities.
     */
    @Override
    public List<NamedObj> decoratedObjects() {
        List<NamedObj> list = new ArrayList<NamedObj>();
        CompositeEntity container = (CompositeEntity) getContainer();
        for (Object target : container.entityList()) {
            if (_isLocalSource(target)) {
                list.add((NamedObj) target);
            }
        }
        return list;
    }

    /**
     * Before super.fire() is called, all input events that are ready are
     * transferred. After super.fire() is called, all output events that
     * are ready are transferred.
     */
    @Override
    public void fire() throws IllegalActionException {

        // Transfer all inputs that are ready.
        List<PtidesEvent> list = _inputEventQueue.get(getModelTime());
        if (list != null) {
            for (PtidesEvent event : list) {
                if (event.ioPort() != null) {
                    _currentLogicalTime = event.timeStamp();
                    _currentSourceTimestamp = event.sourceTimestamp();
                    _currentLogicalIndex = event.microstep();
                    event.receiver().put(event.token());
                    _currentLogicalTime = null;
                    if (_debugging) {
                        _debug("iiiiiiii - transfer inputs from "
                                + event.ioPort());
                    }
                }
            }
            _inputEventQueue.remove(getModelTime());
        }

        super.fire();

        // Transfer all outputs to the ports that are ready.
        list = _outputEventDeadlines.get(getModelTime());
        if (list != null) {
            for (PtidesEvent event : list) {
                _currentLogicalTime = event.timeStamp();
                _currentSourceTimestamp = event.sourceTimestamp();
                _currentLogicalIndex = event.microstep();
                if (event.ioPort() instanceof PtidesPort) {
                    double deviceDelay = _getDoubleParameterValue(
                            event.ioPort(), "deviceDelay");

                    Queue<PtidesEvent> ptidesOutputPortList = _ptidesOutputPortEventQueue
                            .get(event.ioPort());
                    if (ptidesOutputPortList == null) {
                        ptidesOutputPortList = new LinkedList<PtidesEvent>();
                    }

                    // Modify the deadline of the event such that it will be
                    // output after deviceDelay.
                    PtidesEvent newEvent = new PtidesEvent(event.ioPort(),
                            event.channel(), event.timeStamp(),
                            event.microstep(), event.depth(), event.token(),
                            event.receiver(), localClock.getLocalTime().add(
                                    deviceDelay), event.sourceTimestamp());

                    ptidesOutputPortList.add(newEvent);

                    _ptidesOutputPortEventQueue.put(
                            (PtidesPort) event.ioPort(), ptidesOutputPortList);
                }
                _currentLogicalTime = null;
            }
            _outputEventDeadlines.remove(getModelTime());
        }

        // Transfer all outputs from ports to the outside.
        for (PtidesPort port : _ptidesOutputPortEventQueue.keySet()) {
            Queue<PtidesEvent> ptidesOutputPortList = _ptidesOutputPortEventQueue
                    .get(port);
            if (ptidesOutputPortList != null && ptidesOutputPortList.size() > 0) {
                PtidesEvent event = ptidesOutputPortList.peek();
                if (event.absoluteDeadline().equals(localClock.getLocalTime())) {
                    _currentLogicalTime = event.timeStamp();
                    _currentSourceTimestamp = event.sourceTimestamp();
                    _currentLogicalIndex = event.microstep();
                    event.ioPort().send(0, event.token());
                    _currentLogicalTime = null;
                    ptidesOutputPortList.poll();
                }
            }
        }
    }

    /** Add a pure event to the queue of pure events.
     *  @param actor Actor to fire.
     *  @param time Time the actor should be fired at.
     *  @param index Microstep the actor should be fired at.
     *  @return The time the actor requested to be refired at.
     *  @exception IllegalActionException If firing of the container doesn't succeed.
     */
    @Override
    public Time fireAt(Actor actor, Time time, int index)
            throws IllegalActionException {
        // Setting a stop time for the director calls this method
        // with the actor equal to the container.
        if (actor == this.getContainer()) {
            fireContainerAt(time);
            return time;
        }
        int newIndex = index;
        if (_currentLogicalTime != null) {
            //newIndex = _currentLogicalIndex;
            if (_currentLogicalTime.compareTo(time) == 0 && index <= getIndex()) {
                if (!(actor instanceof CompositeActor)
                        || ((CompositeActor) actor).getDirector()
                                .scheduleContainedActors()) {
                    newIndex = Math.max(getIndex(), index) + 1;
                }
            }
        }

        if (_isInitializing) {
            _currentSourceTimestamp = time;
        }

        int depth = 1;
        if (!(actor instanceof ActorExecutionAspect)) {
            depth = _getDepthOfActor(actor);
        }
        _pureEvents.put(new PtidesEvent(actor, null, time, newIndex, depth,
                _zeroTime, _currentSourceTimestamp));
        _currentSourceTimestamp = null;

        Time environmentTime = super.getEnvironmentTime();
        if (environmentTime.compareTo(time) <= 0) {
            fireContainerAt(time, newIndex);
        }
        return time;
    }

    /** Return the source timestamp of the event that is currently
     *  being processed. An event is being processed while it is analyzed
     *  for safe to process or while its actor is fired. If no event is
     *  being processed, this method can return null or the timestamp of
     *  the previous event, so it should not be called in that case.
     *  @return The current source timestamp.
     */
    public Time getCurrentSourceTimestamp() {
        return _currentSourceTimestamp;
    }

    /** Compute the deadline for an actor that requests a firing at time
     *  timestamp.
     *  @param actor The actor that requests firing.
     *  @param timestamp The time when the actor wants to be fired.
     *  @return The deadline for the actor.
     *  @exception IllegalActionException If time objects cannot be created.
     */
    @Override
    public Time getDeadline(NamedObj actor, Time timestamp)
            throws IllegalActionException {
        Time relativeDeadline = Time.POSITIVE_INFINITY;
        // Take the smallest relative deadline over all sink ports.
        for (int i = 0; i < ((Actor) actor).outputPortList().size(); i++) {
            for (int j = 0; j < ((IOPort) ((Actor) actor).outputPortList().get(
                    i)).sinkPortList().size(); j++) {
                double newRelativeDeadline = _getRelativeDeadline((TypedIOPort) ((IOPort) ((Actor) actor)
                        .outputPortList().get(i)).sinkPortList().get(j));
                if (newRelativeDeadline < Double.MAX_VALUE
                        && newRelativeDeadline < relativeDeadline
                                .getDoubleValue()) {
                    relativeDeadline = new Time(this, newRelativeDeadline);
                }
            }
        }
        return timestamp.add(relativeDeadline);
    }

    /** Return a superdense time index for the current time,
     *  where the index is equal to the microstep.
     *  @return A superdense time index.
     *  @see #setIndex(int)
     *  @see ptolemy.actor.SuperdenseTimeDirector
     */
    @Override
    public int getIndex() {
        if (_currentLogicalTime != null) {
            return _currentLogicalIndex;
        }
        return getMicrostep();
    }

    /** Return the local time unless (i) an actor is executing or (ii) an
     *  input token is being read. In those cases, return (i) the timestamp
     *  of the event that caused the actor execution or (ii) the timestamp
     *  of the input event.
     *  @return The local time or the timestamp of the event being processed.
     */
    @Override
    public Time getModelTime() {
        if (_currentLogicalTime != null) {
            return _currentLogicalTime;
        }
        return localClock.getLocalTime();
    }

    /** Return the current microstep or the microstep of the event, if
     *  an actor is currently executing.
     *  @return The microstep.
     */
    @Override
    public int getMicrostep() {
        if (_currentLogicalTime != null) {
            return _currentLogicalIndex;
        }
        return super.getMicrostep();
    }

    /** Return the superdense dependency hashmap.
     *  This is used for the code generation in order to fill
     *  the generated director hashmap.
     *  @return The superdense dependency hashmap.
     */
    public Map<TypedIOPort, Map<TypedIOPort, SuperdenseDependency>> getSuperdenseDependencyPair() {
        return _superdenseDependencyPair;
    }

    /** Initialize all the actors and variables.
     *  Perform static analysis on
     *  superdense dependencies between input ports in the topology.
     *  @exception IllegalActionException If any of the methods contained
     *  in initialize() throw it.
     */
    @Override
    public void initialize() throws IllegalActionException {
        if (_numberOfTokensPerPort != null) {
            _numberOfTokensPerPort.clear();
        }
        super.initialize();
    }

    /** Returns false, as this director only decorates local sources
     *  immediately contained by the PtidesDirector, thus it should
     *  not cross opaque hierarchy boundaries.
     *  @return false.
     */
    @Override
    public boolean isGlobalDecorator() {
        return false;
    }

    /** Return a new receiver of the type {@link PtidesReceiver}.
     *  @return A new PtidesReceiver.
     */
    @Override
    public Receiver newReceiver() {
        if (_debugging && _verbose) {
            _debug("Creating a new Ptides receiver.");
        }
        return new PtidesReceiver();
    }

    /** Return false if there are no more actors to be fired or the stop()
     *  method has been called.
     *  @return True if this director will be fired again.
     *  @exception IllegalActionException If the stopWhenQueueIsEmpty parameter
     *   does not contain a valid token, or refiring can not be requested.
     */
    @Override
    public boolean postfire() throws IllegalActionException {
        // Do not call super.postfire() because that requests a
        // refiring at the next event time on the event queue.

        Boolean result = !_stopRequested && !_finishRequested;
        if (getModelTime().compareTo(getModelStopTime()) >= 0) {
            // If there is still an event on the event queue with time stamp
            // equal to the stop time, we want to process that event before
            // we declare that we are done.
            if (_eventQueue.size() == 0
                    || !_eventQueue.get().timeStamp()
                            .equals(getModelStopTime())) {
                result = false;
            }
        }

        // Potentially set next fire time from _outputEventQueue.
        Set