/*
 * A DE domain "reduce" actor. It applies the reduce procedure defined in a
 * subclass of MapReduceAlgorithm to incoming key and value tokens and
 * writes the resulting key/value pairs to a file.
 *
 * Parameters:
 *   className      - name of the class providing the reduce procedure;
 *                    passed to the worker as classNameForReduce.
 *   file           - name of the file the results are written to.
 *   numberOfInputs - how many inputs to connect to the ReduceWorker. It
 *                    should be equal to the number of map components.
 *
 * Ports:
 *   inputKey, inputValue - multiport inputs carrying the key and value
 *                          tokens to reduce.
 *   doneReceiving        - receives true/false values that tell the actor
 *                          when it can stop applying the reduce procedure.
 *
 * This actor declares no output ports and no numberOfReduceOutputs
 * parameter or doneEmitting port: its only output is the file.
 *
 * @author Adam Cataldo
 */
danglingPortsOkay;

Reduce is {
    /* Actor types instantiated below. */
    actor worker = ptolemy.actor.ptalon.lib.ReduceWorker;
    actor writer = ptolemy.actor.lib.io.ExpressionWriter;
    actor converter = ptolemy.actor.lib.ElementsToArray;

    /* External interface of this actor. */
    inport[] inputKey;
    inport[] inputValue;
    inport doneReceiving;
    parameter file;
    parameter className;
    parameter numberOfInputs;

    /* ElementsToArray stage: sits between the worker outputs and the writer. */
    transparent relation converterIn;
    relation converterOut;
    converter(input:= converterIn, output := converterOut);

    /* Relations carrying the worker's key and value outputs. */
    relation outKey;
    relation outValue;

    /* Relations feeding the worker's key and value inputs. */
    transparent relation workerInKey;
    transparent relation workerInValue;

    worker(
        inputKey := workerInKey,
        inputValue := workerInValue,
        outputKey := outKey,
        outputValue := outValue,
        doneReading := doneReceiving,
        classNameForReduce := [[className]]
    );

    /*
     * Attach the worker outputs to the converter input, key first and
     * value second. NOTE(review): the order presumably fixes the element
     * order of the array that is written out - keep it as is.
     */
    this(converterIn := outKey);
    this(converterIn := outValue);

    /* Connect this actor's inputs to the worker, once per expected input. */
    for a initially [[0]] [[a < numberOfInputs]] {
        this(inputKey := workerInKey);
        this(inputValue := workerInValue);
    } next [[a + 1]]

    /*
     * Write the converter output to the configured file.
     * NOTE(review): confirmOverwrite is false, presumably so an existing
     * file is replaced without prompting - confirm against ExpressionWriter.
     */
    writer(
        input := converterOut,
        fileName := [[file]],
        confirmOverwrite := [[false]]
    );
}