/*
 * A discrete-event Map Reduce model. To use this, provide
 * the name of a Java class that extends
 * ptolemy.actor.ptalon.lib.MapReduceAlgorithm for the className
 * parameter. In this class, you provide the basic map and reduce
 * methods, which you are required to implement to extend
 * MapReduceAlgorithm. If the qualified name for this class is
 * foo.bar.MyClass, set
 *
 * className = "foo.bar.MyClass"
 *
 * Set the number of map machines with the numberOfMaps parameter and
 * the number of reduce machines with the numberOfReduces parameter.
 * The data that will be processed should be specified with the
 * fileName parameter. The input data should be stored in a file with
 * file type ".map", like "C:/ptII/data.map". The reduced data will be
 * stored in "C:/ptII/data0.red", "C:/ptII/data1.red", ..., one file
 * per reduce machine (indices 0 through numberOfReduces - 1).
 * In this case, you would set
 *
 * fileName = "C:/ptII/data"
 *
 * Note that a MapReduce actor could be written in a non-timed domain,
 * like Rendezvous, but the DE times provide a convenient ordering on events.
* * @author Adam Cataldo */
MapReduce is {
    /* Fixed helper actors: the input splitter and the stop actor. */
    actor split = ptalonActor:ptolemy.actor.ptalon.demo.MapReduce.Split;
    actor stop = ptalonActor:ptolemy.actor.ptalon.demo.MapReduce.WaitingStop;

    /* Output port of this actor; wired to the stop actor's status below. */
    outport status;

    /*
     * Configuration. map and reduce are actor parameters: the actors
     * instantiated once per loop iteration below. numberOfMaps and
     * numberOfReduces bound those loops. fileName is used as the prefix
     * of the ".map" input file and of the ".red" output files, and
     * className is passed unchanged to every map and reduce instance.
     */
    actorparameter map;
    parameter numberOfMaps;
    actorparameter reduce;
    parameter numberOfReduces;
    parameter fileName;
    parameter className;

    /*
     * Splitter wiring. splitKeys / splitValues carry the splitter's
     * key and value outputs; splitFinished is attached to its
     * doneReading port and is reused as doneReceiving by every map and
     * reduce instance below.
     */
    port reference splitKeys;
    port reference splitValues;
    relation splitFinished;
    split( keys := splitKeys, values := splitValues, doneReading := splitFinished, file := [[fileName + ".map"]], numberOfOutputs := [[numberOfMaps]]);

    /*
     * Stop actor wiring: it is configured for numberOfMaps inputs on
     * stopInput and its status is exposed as this actor's status port.
     */
    port reference stopInput;
    stop( input := stopInput, numberOfInputs := [[numberOfMaps]], status := status);

    /*
     * One reduce instance per index a = 0 .. numberOfReduces - 1. Each
     * gets its own pair of input port references and an output file
     * name built from fileName, the loop index and ".red".
     */
    for a initially [[0]] [[a < numberOfReduces]] {
        port reference reduceInKey[[a]];
        port reference reduceInValue[[a]];
        reduce( inputKey := reduceInKey[[a]], inputValue := reduceInValue[[a]], doneReceiving := splitFinished, file := [[fileName + a.toString + ".red"]], className := [[className]], numberOfInputs := [[numberOfMaps]]);
    } next [[a + 1]]

    /*
     * One map instance per index a = 0 .. numberOfMaps - 1. Every map
     * reads from the shared splitKeys / splitValues references and has
     * its own output references and its own mapFinished relation.
     */
    for a initially [[0]] [[a < numberOfMaps]] {
        port reference mapOutKeys[[a]];
        port reference mapOutValues[[a]];
        relation mapFinished[[a]];
        map( inputKey := splitKeys, inputValue := splitValues, outputKeys := mapOutKeys[[a]], outputValues := mapOutValues[[a]], doneReceiving := splitFinished, doneEmitting := mapFinished[[a]], className := [[className]], numberOfReduceOutputs := [[numberOfReduces]]);

        /*
         * Tie this map's doneEmitting relation to stopInput.
         * NOTE(review): presumably this is how the stop actor observes
         * each map finishing -- confirm against Ptalon this(...) semantics.
         */
        this(stopInput := mapFinished[[a]]);

        /*
         * Link map a's key and value outputs to the inputs of every
         * reduce instance b = 0 .. numberOfReduces - 1.
         */
        for b initially [[0]] [[b < numberOfReduces]] {
            this(reduceInKey[[b]] := mapOutKeys[[a]]);
            this(reduceInValue[[b]] := mapOutValues[[a]]);
        } next [[b + 1]]
    } next [[a + 1]]
}