public class ReduceWorker extends TypedAtomicActor
This actor has a parameter classNameForReduce which is the qualified name for a Java class that extends ptolemy.actor.ptalon.lib.MapReduceAlgorithm. It must also have a no argument constructor. By extending this abstract class, it will implement a method named reduce with type signature:
public List<String> reduce(String key,
BlockingQueue<String> value)
This method defines the Reduce algorithm for the MapReduce system. At each call, it should return a list of Strings, which is a reduction of the list of input values. At each firing, this actor inputs all available input keys and values. It outputs the value tokens when its doneReading port receives a true value. This should only happen after all inputs have been sent to the system.
When implementing a custom reduce method in a subclass of MapReduceAlgorithm, note to use the take method to get values from the queue. Call the isQueueEmpty of MapReduceAlgorithm to test if this actor has stopped putting values on the queue and that all values have been taken from the queue. The last element of the queue will always be the empty string. Ignore this value.
KeyValuePair
Red (cxh) |
Red (cxh) |
Entity.ContainedObjectsIterator
Modifier and Type | Field and Description |
---|---|
StringParameter |
classNameForReduce
The qualified class name for a Java class containing a method
with signature:
public static List<String[]> map(String key, String value)
Each element of each returned list should be a length two array of
Strings. |
TypedIOPort |
doneReading
A boolean input.
|
TypedIOPort |
inputKey
A String input key.
|
TypedIOPort |
inputValue
A String input value.
|
TypedIOPort |
outputKey
A String output key.
|
TypedIOPort |
outputValue
A String output value.
|
_typesValid
_actorFiringListeners, _initializables, _notifyingActorFiring, _stopRequested
_changeListeners, _changeLock, _changeRequests, _debugging, _debugListeners, _deferChangeRequests, _elementName, _isPersistent, _verbose, _workspace, ATTRIBUTES, CLASSNAME, COMPLETE, CONTENTS, DEEP, FULLNAME, LINKS
COMPLETED, NOT_READY, STOP_ITERATING
Constructor and Description |
---|
ReduceWorker(CompositeEntity container,
java.lang.String name)
Create a new actor in the specified container with the specified
name.
|
Modifier and Type | Method and Description |
---|---|
void |
attributeChanged(Attribute attribute)
React to a change in an attribute.
|
void |
fire()
Read in a token on the inputKey and inputValue
ports and output pairs of tokens on the outputKey, outputValue
ports.
|
void |
initialize()
Extract the map method from the classNameForMap parameter.
|
boolean |
postfire()
Return true, unless stop() has been called, in which case,
return false.
|
boolean |
prefire()
Return true if there is an available key token and value token
on the inputKey and inputValue ports.
|
void |
wrapup()
Clean up memory.
|
_containedTypeConstraints, _customTypeConstraints, _defaultTypeConstraints, _fireAt, _fireAt, attributeTypeChanged, clone, clone, isBackwardTypeInferenceEnabled, newPort, typeConstraintList, typeConstraints
_actorFiring, _actorFiring, _declareDelayDependency, addActorFiringListener, addInitializable, connectionsChanged, createReceivers, declareDelayDependency, getCausalityInterface, getDirector, getExecutiveDirector, getManager, inputPortList, isFireFunctional, isStrict, iterate, newReceiver, outputPortList, preinitialize, pruneDependencies, recordFiring, removeActorFiringListener, removeDependency, removeInitializable, setContainer, stop, stopFire, terminate
_adjustDeferrals, _checkContainer, _getContainedObject, _propagateExistence, getContainer, instantiate, isAtomic, isOpaque, moveDown, moveToFirst, moveToIndex, moveToLast, moveUp, propagateExistence, setName
_addPort, _description, _exportMoMLContents, _removePort, _validateSettables, connectedPortList, connectedPorts, containedObjectsIterator, getAttribute, getPort, getPorts, linkedRelationList, linkedRelations, portList, removeAllPorts, setClassDefinition, uniqueName
_setParent, exportMoML, getChildren, getElementName, getParent, getPrototypeList, isClassDefinition, isWithinClassDefinition
_addAttribute, _adjustOverride, _attachText, _cloneFixAttributeFields, _containedDecorators, _copyChangeRequestList, _debug, _debug, _debug, _debug, _debug, _executeChangeRequests, _getIndentPrefix, _isMoMLSuppressed, _markContentsDerived, _notifyHierarchyListenersAfterChange, _notifyHierarchyListenersBeforeChange, _propagateValue, _removeAttribute, _splitName, _stripNumericSuffix, addChangeListener, addDebugListener, addHierarchyListener, attributeDeleted, attributeList, attributeList, decorators, deepContains, depthInHierarchy, description, description, event, executeChangeRequests, exportMoML, exportMoML, exportMoML, exportMoML, exportMoMLPlain, getAttribute, getAttributes, getChangeListeners, getClassName, getDecoratorAttribute, getDecoratorAttributes, getDerivedLevel, getDerivedList, getDisplayName, getFullName, getModelErrorHandler, getName, getName, getSource, handleModelError, isDeferringChangeRequests, isOverridden, isPersistent, lazyContainedObjectsIterator, message, notifyOfNameChange, propagateValue, propagateValues, removeAttribute, removeChangeListener, removeDebugListener, removeHierarchyListener, requestChange, setClassName, setDeferringChangeRequests, setDerivedLevel, setDisplayName, setModelErrorHandler, setPersistent, setSource, sortContainedObjects, toplevel, toString, validateSettables, workspace
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
createReceivers, getCausalityInterface, getDirector, getExecutiveDirector, getManager, inputPortList, newReceiver, outputPortList
isFireFunctional, isStrict, iterate, stop, stopFire, terminate
addInitializable, preinitialize, removeInitializable
description, getContainer, getDisplayName, getFullName, getName, getName, setName
getDerivedLevel, getDerivedList, propagateValue
public StringParameter classNameForReduce
public static List<String[]> map(String key, String value)
Each element of each returned list should be a length two array of Strings.
public TypedIOPort doneReading
public TypedIOPort inputKey
public TypedIOPort inputValue
public TypedIOPort outputKey
public TypedIOPort outputValue
public ReduceWorker(CompositeEntity container, java.lang.String name) throws IllegalActionException, NameDuplicationException
container
- The container.name
- The name of this actor within the container.IllegalActionException
- If this actor cannot be contained
by the proposed container (see the setContainer() method).NameDuplicationException
- If the name coincides with
an entity already in the container.public void attributeChanged(Attribute attribute) throws IllegalActionException
attributeChanged
in class NamedObj
attribute
- The attribute that changed.IllegalActionException
- If the change is not acceptable
to this container. If the class set in classNameForReduce
does not exist, or if the class exists but does not contain a map
method with an appropriate signature, this exception will be thrown.public void fire() throws IllegalActionException
fire
in interface Executable
fire
in class AtomicActor<TypedIOPort>
IllegalActionException
- If there is any trouble calling
the map method.public boolean prefire() throws IllegalActionException
prefire
in interface Executable
prefire
in class AtomicActor<TypedIOPort>
IllegalActionException
- Not thrown in this class.public boolean postfire() throws IllegalActionException
AtomicActor
postfire
in interface Executable
postfire
in class AtomicActor<TypedIOPort>
IllegalActionException
- If thrown in the base class.public void wrapup() throws IllegalActionException
wrapup
in interface Initializable
wrapup
in class AtomicActor<TypedIOPort>
IllegalActionException
- If thrown in the base class.public void initialize() throws IllegalActionException
initialize
in interface Initializable
initialize
in class AtomicActor<TypedIOPort>
IllegalActionException
- If unable to extract an appropriate
map method.