/* A ReduceWorker actor, as a subsystem of the MapReduce system. Copyright (c) 2006-2014 The Regents of the University of California. All rights reserved. Permission is hereby granted, without written agreement and without license or royalty fees, to use, copy, modify, and distribute this software and its documentation for any purpose, provided that the above copyright notice and the following two paragraphs appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. PT_COPYRIGHT_VERSION_2 COPYRIGHTENDKEY */ package ptolemy.actor.ptalon.lib; import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import ptolemy.actor.TypedAtomicActor; import ptolemy.actor.TypedIOPort; import ptolemy.data.BooleanToken; import ptolemy.data.StringToken; import ptolemy.data.expr.StringParameter; import ptolemy.data.type.BaseType; import ptolemy.kernel.CompositeEntity; import ptolemy.kernel.util.Attribute; import ptolemy.kernel.util.IllegalActionException; import ptolemy.kernel.util.NameDuplicationException; /////////////////////////////////////////////////////////////////// ////ReduceWorker /** A ReduceWorker actor, as a subsystem of the MapReduce system.
This actor has a parameter classNameForReduce which is the qualified name for a Java class that extends ptolemy.actor.ptalon.lib.MapReduceAlgorithm. It must also have a no argument constructor. By extending this abstract class, it will implement a method named reduce with type signature:
public List<String> reduce(String key,
BlockingQueue<String> value)
This method defines the Reduce algorithm for the MapReduce system. At each call, it should return a list of Strings, which is a reduction of the list of input values. At each firing, this actor inputs all available input keys and values. It outputs the value tokens when its doneReading port receives a true value. This should only happen after all inputs have been sent to the system.
When implementing a custom reduce method in a subclass of MapReduceAlgorithm, note to use the take method to get values from the queue. Call the isQueueEmpty of MapReduceAlgorithm to test if this actor has stopped putting values on the queue and that all values have been taken from the queue. The last element of the queue will always be the empty string. Ignore this value. @author Adam Cataldo @version $Id: ReduceWorker.java 70402 2014-10-23 00:52:20Z cxh $ @since Ptolemy II 6.1 @Pt.ProposedRating Red (cxh) @Pt.AcceptedRating Red (cxh) @see ptolemy.actor.ptalon.lib.KeyValuePair */ public class ReduceWorker extends TypedAtomicActor { /** Create a new actor in the specified container with the specified * name. The name must be unique within the container or an exception * is thrown. The container argument must not be null, or a * NullPointerException will be thrown. * * @param container The container. * @param name The name of this actor within the container. * @exception IllegalActionException If this actor cannot be contained * by the proposed container (see the setContainer() method). * @exception NameDuplicationException If the name coincides with * an entity already in the container. 
*/ public ReduceWorker(CompositeEntity container, String name) throws IllegalActionException, NameDuplicationException { super(container, name); classNameForReduce = new StringParameter(this, "classNameForReduce"); classNameForReduce.setExpression("ptolemy.actor.ptalon.lib.WordCount"); inputKey = new TypedIOPort(this, "inputKey"); inputKey.setInput(true); inputKey.setTypeEquals(BaseType.STRING); inputKey.setMultiport(true); inputValue = new TypedIOPort(this, "inputValue"); inputValue.setInput(true); inputValue.setTypeEquals(BaseType.STRING); inputValue.setMultiport(true); outputKey = new TypedIOPort(this, "outputKey"); outputKey.setOutput(true); outputKey.setTypeEquals(BaseType.STRING); outputValue = new TypedIOPort(this, "outputValue"); outputValue.setOutput(true); outputValue.setTypeEquals(BaseType.STRING); doneReading = new TypedIOPort(this, "doneReading"); doneReading.setInput(true); doneReading.setTypeEquals(BaseType.BOOLEAN); } /////////////////////////////////////////////////////////////////// //// ports and parameters //// /** The qualified class name for a Java class containing a method * with signature: *
* public List<String> reduce(String key, BlockingQueue<String> value)
*
* The class must extend ptolemy.actor.ptalon.lib.MapReduceAlgorithm and
* have a no argument constructor.
*/
public StringParameter classNameForReduce;
/**
 * A boolean input. When this input is true, the
 * actor is done reading values, and it may output
 * tokens for each key it received.
 */
public TypedIOPort doneReading;
/** A String input key. This is an input multiport of type string.
 *  In fire(), a key is read from a channel only when the same
 *  channel of inputValue also has a token, so keys and values are
 *  consumed in pairs.
 */
public TypedIOPort inputKey;
/** A String input value. This is an input multiport of type string.
 *  In fire(), each value is read together with the key on the same
 *  channel of inputKey.
 */
public TypedIOPort inputValue;
/** A String output key. This is an output port of type string.
 */
public TypedIOPort outputKey;
/** A String output value. This is an output port of type string.
 */
public TypedIOPort outputValue;
///////////////////////////////////////////////////////////////////
//// public methods ////
/** Respond to a change in one of this actor's attributes.
 *  If the changed attribute is classNameForReduce, this method
 *  calls _setReduceMethod() so the actor picks up the new setting.
 *  Any other attribute is passed on to the superclass.
 *  It is up to the caller to restore the attribute to a valid value
 *  if an exception is thrown.
 *  @param attribute The attribute that changed.
 *  @exception IllegalActionException If _setReduceMethod() or the
 *   superclass implementation rejects the change.
 */
@Override
public void attributeChanged(Attribute attribute)
        throws IllegalActionException {
    // Anything other than the reduce class name is not handled here.
    if (attribute != classNameForReduce) {
        super.attributeChanged(attribute);
        return;
    }
    _setReduceMethod();
}
/** Read in a token on the inputKey and inputValue
* ports and output pairs of tokens on the outputKey, outputValue
* ports.
*
* @exception IllegalActionException If there is any trouble calling
* the map method.
*/
@Override
public void fire() throws IllegalActionException {
super.fire();
if (_readMode) {
int numberInputs = inputKey.getWidth();
for (int i = 0; i < numberInputs; i++) {
while (inputKey.hasToken(i) && inputValue.hasToken(i)) {
String key = ((StringToken) inputKey.get(i)).stringValue();
String value = ((StringToken) inputValue.get(i))
.stringValue();
if (_runningAlgorithms.containsKey(key)) {
MapReduceAlgorithm algorithm = _runningAlgorithms
.get(key);
try {
algorithm.reduceValues.put(value);
} catch (InterruptedException e) {
throw new IllegalActionException(
"Interrupted while trying to put value for key "
+ key);
}
} else {
MapReduceAlgorithm newAlgorithm = null;
try {
newAlgorithm = (MapReduceAlgorithm) _reduceClass
.newInstance();
} catch (IllegalAccessException e) {
throw new IllegalActionException(
classNameForReduce.stringValue()
+ " does not have a no argument constructor");
} catch (InstantiationException e) {
throw new IllegalActionException(
classNameForReduce.stringValue()
+ " is abstract.");
} catch (ClassCastException e) {
throw new IllegalActionException(
"Unable to cast instance of "
+ classNameForReduce.stringValue()
+ " to ptolemy.actor.ptalon.lib.MapReduceAlgorithm.");
}
newAlgorithm.reduceKey = key;
newAlgorithm.reduceValues = new LinkedBlockingQueue