/*
 * A discrete-event Map Reduce model. To use this, provide
 * the name of a Java class that extends
 * ptolemy.actor.ptalon.lib.MapReduceAlgorithm for the className
 * parameter. In this class, you provide the basic map and reduce
 * methods, which you are required to implement to extend
 * MapReduceAlgorithm. If the qualified name for this class is
 * foo.bar.MyClass, set
 *
 * className = "foo.bar.MyClass"
 *
 * Set the total number of map and reduce machines by providing the
 * parameter numberOfMachines. The data that will be processed should
 * be specified with the fileNamePrefix parameter. The input data should
 * be stored in a file with file type ".map", like "C:/ptII/data.map".
 * The reduced data will be stored in "C:/ptII/data0.red",
 * "C:/ptII/data1.red", ..., one file per reduce machine, numbered from 0
 * (there are ceiling(numberOfMachines / 2) reduce machines; the remaining
 * machines are map machines). In this case, you would set
 *
 * fileNamePrefix = "C:/ptII/data"
 *
 * Note that a MapReduce actor could be written in a non-timed domain,
 * like Rendezvous, but the DE time stamps provide a convenient ordering
 * of events.
*
 * @author Adam Cataldo
 */

/*
 * NOTE(review): presumably tells Ptalon that ports left unconnected by this
 * actor are not an error -- confirm against the Ptalon documentation.
 */
danglingPortsOkay;

/*
 * MapReduce wires one split actor, a group of map actors, a group of reduce
 * actors and one stop actor together.
 *
 * Machine counts used throughout (both derived from numberOfMachines):
 *   map machines    = numberOfMachines / 2
 *   reduce machines = numberOfMachines - numberOfMachines / 2
 *
 * Parameters:
 *   className        - forwarded unchanged to every map and reduce actor.
 *   numberOfMachines - total number of map and reduce machines.
 *   fileNamePrefix   - prefix of the ".map" input file name and of the
 *                      ".red" file names handed to the reduce actors.
 *
 * Ports:
 *   status - output port, connected to the status port of the stop actor.
 */
MapReduce is {
    /* Component actors, all referenced through the ptalonActor: prefix. */
    actor split = ptalonActor:ptolemy.actor.ptalon.demo.MapReduce.Split;
    actor map = ptalonActor:ptolemy.actor.ptalon.demo.MapReduce.Map;
    actor reduce = ptalonActor:ptolemy.actor.ptalon.demo.MapReduce.Reduce;
    actor stop = ptalonActor:ptolemy.actor.ptalon.demo.MapReduce.WaitingStop;

    /* External interface of this actor. */
    outport status;
    parameter className;
    parameter numberOfMachines;
    parameter fileNamePrefix;

    /*
     * The split actor gets the ".map" file name and one output per map
     * machine. Its keys/values relations are shared by all map actors, and
     * splitFinished is shared by all map and reduce actors.
     */
    transparent relation splitKeys;
    transparent relation splitValues;
    relation splitFinished;
    split(
        keys := splitKeys,
        values := splitValues,
        doneReading := splitFinished,
        file := [[fileNamePrefix + ".map"]],
        numberOfOutputs := [[numberOfMachines / 2]]);

    /*
     * The stop actor drives the status output port.
     *
     * NOTE(review): numberOfInputs is set to the reduce-machine count, but
     * stopInput is connected below to one mapFinished relation per map
     * machine. The two counts differ when numberOfMachines is odd --
     * confirm that this is intended.
     */
    transparent relation stopInput;
    stop(
        input := stopInput,
        numberOfInputs := [[numberOfMachines - numberOfMachines / 2]],
        status := status);

    /*
     * One reduce actor per reduce machine. Each gets its own pair of input
     * relations and its own ".red" file name, indexed from 0, and expects
     * one input per map machine.
     */
    for reduceIdx initially [[0]] [[reduceIdx < numberOfMachines - numberOfMachines / 2]] {
        transparent relation reduceInKey[[reduceIdx]];
        transparent relation reduceInValue[[reduceIdx]];
        reduce(
            inputKey := reduceInKey[[reduceIdx]],
            inputValue := reduceInValue[[reduceIdx]],
            doneReceiving := splitFinished,
            file := [[fileNamePrefix + reduceIdx.toString + ".red"]],
            className := [[className]],
            numberOfInputs := [[numberOfMachines / 2]]);
    } next [[reduceIdx + 1]]

    /*
     * One map actor per map machine. Each reads from the shared split
     * relations and gets its own output and "finished" relations.
     */
    for mapIdx initially [[0]] [[mapIdx < numberOfMachines / 2]] {
        transparent relation mapOutKeys[[mapIdx]];
        transparent relation mapOutValues[[mapIdx]];
        relation mapFinished[[mapIdx]];
        map(
            inputKey := splitKeys,
            inputValue := splitValues,
            outputKeys := mapOutKeys[[mapIdx]],
            outputValues := mapOutValues[[mapIdx]],
            doneReceiving := splitFinished,
            doneEmitting := mapFinished[[mapIdx]],
            className := [[className]],
            numberOfReduceOutputs := [[numberOfMachines - numberOfMachines / 2]]);

        /* Report this map actor's completion to the stop actor. */
        this(stopInput := mapFinished[[mapIdx]]);

        /* Connect this map actor's outputs to every reduce actor's inputs. */
        for targetIdx initially [[0]] [[targetIdx < numberOfMachines - numberOfMachines / 2]] {
            this(reduceInKey[[targetIdx]] := mapOutKeys[[mapIdx]]);
            this(reduceInValue[[targetIdx]] := mapOutValues[[mapIdx]]);
        } next [[targetIdx + 1]]
    } next [[mapIdx + 1]]
}