/*
 * A DE domain "map" actor which applies the map procedure
 * specified in a subclass of MapReduceAlgorithm to input
 * key and value tokens and outputs the results to the reduce
 * actors. The subclass to use is named by the parameter
 * className. The number of reduce actors to output to is
 * specified by the parameter numberOfReduceOutputs. This
 * actor receives true/false values on the doneReceiving port
 * to let it know when it can stop applying the map function.
 * It emits true on the doneEmitting port when it is done
 * sending key/value tokens to the reduce outputs.
 *
 * @author Adam Cataldo
 */
danglingPortsOkay;

Map is {
    /* Actor classes instantiated by this composite. */
    actor worker = ptolemy.actor.ptalon.lib.MapWorker;
    actor storage = ptolemy.actor.ptalon.lib.MapFileStorage;

    /* Inputs: the key/value stream and the end-of-input flag. */
    inport inputKey;
    inport inputValue;
    inport doneReceiving;

    /* Outputs: multiports feeding the reduce actors, plus the completion flag. */
    outport[] outputKeys;
    outport[] outputValues;
    outport doneEmitting;

    /* Configuration supplied by the enclosing model. */
    parameter className;
    parameter numberOfReduceOutputs;

    /* Stage 1: the worker reads the raw key/value inputs and writes its
     * results onto the two internal relations below. */
    relation unprocessedKey;
    relation unprocessedValue;
    worker(
        inputKey := inputKey,
        inputValue := inputValue,
        outputKey := unprocessedKey,
        outputValue := unprocessedValue,
        classNameForMap := [[className]]
    );

    /* Stage 2: the storage actor consumes the worker's results together
     * with doneReceiving, and drives doneEmitting and the two storage
     * output relations. */
    transparent relation storageOutputKey;
    transparent relation storageOutputValue;
    storage(
        inputKey := unprocessedKey,
        inputValue := unprocessedValue,
        doneReceiving := doneReceiving,
        outputKey := storageOutputKey,
        outputValue := storageOutputValue,
        doneEmitting := doneEmitting,
        numberOfOutputs := [[numberOfReduceOutputs]]
    );

    /* Stage 3: attach this actor's output multiports to the storage
     * output relations once per reduce output, i.e. for
     * reduceIndex = 0 .. numberOfReduceOutputs - 1.
     * NOTE(review): presumably each pass adds one channel to
     * outputKeys/outputValues through the transparent relations;
     * confirm against the Ptalon semantics for transparent relations. */
    for reduceIndex initially [[0]] [[reduceIndex < numberOfReduceOutputs]] {
        this(
            outputKeys := storageOutputKey,
            outputValues := storageOutputValue
        );
    } next [[reduceIndex + 1]]
}