/*
 * --------------------------INTRODUCTION------------------------------
 * Brian Goetz on the need for better Java support for parallelism:
 * "The Java language has had support for threads and concurrency
 * from day 1; the language includes synchronization primitives such as
 * synchronized and volatile, and the class library includes classes such as
 * Thread. However, the concurrency primitives that were sensible in 1995
 * reflected the hardware reality of the time: most commercially available systems
 * provided no parallelism at all, and even the most expensive systems provided
 * only limited parallelism. In those days, threads were used primarily for
 * expressing asynchrony, not concurrency, and as a result, these mechanisms were
 * generally adequate to the demands of the time."
 *
 * "As multiprocessor systems started to become cheaper, more applications needed
 * to exploit the hardware parallelism they provided, and programmers found that
 * writing concurrent programs using the low-level primitives provided by the
 * language and class library was difficult and error-prone."
 *
 * "As we enter the many-core era, we will need to find finer-grained parallelism
 * or risk keeping processors idle even though there is plenty of work to do....
 * Java 7 will include a framework for representing a certain class of
 * finer-grained parallel algorithms: the fork-join framework."
 *
 *
 * --------------------------ABOUT FORK/JOIN---------------------------
 * Fork/Join for Java is a framework developed by Doug Lea, based on his original
 * paper from June 2000: http://gee.cs.oswego.edu/dl/papers/fj.pdf. Fork/Join
 * algorithms are parallel versions of divide-and-conquer algorithms. If a task
 * is small, then it is calculated sequentially on a single thread. If a task is
 * large, then it is divided into several parts and each part is added to a queue
 * for later computation.
 * Queued tasks are free to divide again, be worked on by
 * any worker thread, or queue up waiting if the maximum number of threads has
 * been reached. When all the pieces have been divided up and calculated, then a
 * final result is calculated off the partial results of the distributed pieces.
 * "Fork" refers to the division of tasks, and "Join" refers to the merging of
 * results.
 *
 * Fork/Join is similar to MapReduce in that they are both algorithms for
 * parallelizing tasks. One difference is that Fork/Join tasks know how to
 * subdivide themselves if too large, whereas MapReduce algorithms typically
 * divide up all the work into portions before the algorithm starts.
 *
 * The cleverness of fork/join lies in its worker scheduling method called
 * "work stealing". Each worker thread maintains a double-ended queue of tasks.
 * Any forks of a task get pushed onto the head of that thread's deque, not back
 * onto the general threadpool. Workers execute tasks in youngest-first order.
 * When a worker has no more tasks to run it attempts to steal a task from
 * another worker by grabbing the tail of the other thread's deque. When a worker
 * thread has no work and fails to steal any from others, it backs off.
 *
 * One advantage of work stealing is reduced contention, because stealers take
 * tasks from the opposite end of the deque from the owning worker. Another
 * advantage occurs because recursive divide-and-conquer algorithms generate
 * larger tasks early. Older, stolen tasks provide larger units of work, leading
 * to further recursive division.
 *
 * Fork/Join is being developed by the JSR expert group using the name jsr166y.
 * The main JSR 166 was included in Java 5 as the java.util.concurrent classes.
 *
 * --------------------------USEFUL LINKS------------------------------
 * This file is a guide to using Groovy with Java Fork/Join (JSR 166y).
 * JSR Home: http://jcp.org/en/jsr/detail?id=166
 * Concurrency Interest: http://gee.cs.oswego.edu/dl/concurrency-interest/
 * Wiki: http://artisans-serverintellect-com.si-eioswww6.com/default.asp?W1
 * JSR API: http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166ydocs/
 * Downloadable Jar (Tested with Java 6):
 *     http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166y.jar
 * Original Paper: http://gee.cs.oswego.edu/dl/papers/fj.pdf
 * Brian Goetz on Fork Join:
 *     Part 1: http://www.ibm.com/developerworks/java/library/j-jtp11137.html
 *     Part 2: http://www.ibm.com/developerworks/java/library/j-jtp03048.html
 *
 * -----------------------TABLE OF CONTENTS----------------------------
 */

import jsr166y.forkjoin.ForkJoinPool
import jsr166y.forkjoin.ForkJoinExecutor
import jsr166y.forkjoin.RecursiveAction
import jsr166y.forkjoin.ParallelArray
import jsr166y.forkjoin.Ops.Predicate
import jsr166y.forkjoin.Ops.Procedure
import jsr166y.forkjoin.Ops.ObjectToDouble
import jsr166y.forkjoin.Ops.DoubleReducer
import jsr166y.forkjoin.ParallelDoubleArray.SummaryStatistics

/*
 * A ForkJoinPool is a host for a group of ForkJoinWorkerThreads that perform
 * ForkJoinTasks. ForkJoinPool does not implement the Java ExecutorService
 * interface because it only executes ForkJoinTasks, not arbitrary Runnables.
 *
 * This code produces a Fork/Join pool with 10 workers
 */
ForkJoinExecutor pool = new ForkJoinPool(poolSize: 10);

/*
 * Everyone loves the fibonacci sequence. It will be the basis for a few examples
 * showing how the core of Fork/Join works. Here is a sequential implementation:
 */
def fib
fib = {n ->
    if (n <= 1) return n
    else return fib(n - 1) + fib(n - 2)
}
println "Sequential fib: ${fib(15)}"
// Output: Sequential fib: 610

/*
 * In order to use Fibonacci with Fork/Join, we need to model the algorithm as an
 * object that can both solve the problem _and_ subdivide the problem (that is,
 * both conquer and divide). This class can do just that.
 * The solve() method
 * simply invokes the sequential fibonacci(), while the fork() method returns a
 * two-element list representing parts which can be joined together later.
 */
public class FibonacciProblem {
    int n;

    // Conquer: solve this problem sequentially
    def solve() { fibonacci(n) }

    // Divide: split into the two subproblems fib(n - 1) and fib(n - 2)
    def fork() {
        return [new FibonacciProblem(n: n - 1), new FibonacciProblem(n: n - 2)]
    }

    def fibonacci(n) {
        if (n <= 1) return n
        else return fibonacci(n - 1) + fibonacci(n - 2)
    }
}

/*
 * Once the problem is modeled, we can wrap it in a Fork/Join Task that controls
 * when to fork and when to solve (when to divide and when to conquer). The
 * task also needs to hold the result in mutable state so that it can be
 * queried later.
 */
public class FibonacciTask extends RecursiveAction {
    FibonacciProblem problem
    int result

    @Override
    protected void compute() {
        if (problem.n < 4)
            result = problem.solve()
        else {
            def subproblems = problem.fork()
            def subTasks = subproblems.collect { new FibonacciTask(problem: it) }
            forkJoin(subTasks)
            result = subTasks.collect { it.result }.sum()
        }
    }
}

/*
 * In the above example, notice how any problem less than 4 steps deep is simply
 * solved inline; otherwise the problem is forked into multiple parts and
 * requeued using the forkJoin method. forkJoin is a convenience for separate
 * fork() and join() calls, adding a list of subtasks to the deque.
 *
 * One problem with modeling the fibonacci algorithm this way is that the
 * algorithm is spread out across two classes: the *Problem and the *Task. The
 * FibonacciProblem.fibonacci() method is complete, in that it uses the +
 * operator to join its two results up in its recursive final statement. But if
 * the problem is split then that + sign is represented in the FibonacciTask
 * at the point where sum() is called. This is a duplication of logic! Perhaps
 * it would be better to model the Problem and the Task as one entity... but then
 * that entity might be overloaded because it has both task and problem functions.
 *
 * Regardless, we can now throw a new FibonacciProblem on the framework and watch
 * it compute across threads. Now, much larger results can be calculated without
 * running out of stack space. The result should also be calculated faster.
 * Disclaimer: this is an admittedly naive implementation of fib.
 */
def task = new FibonacciTask(problem: new FibonacciProblem(n: 15))
pool.invoke(task)
println "Forked fib: $task.result"
// Output: Forked fib: 610

/*
 * So far, it's fair to make two observations: modeling a problem as a recursive
 * fork/join task is _hard_, and using Groovy to do it offers little value.
 * Luckily, JSR166y offers some higher level abstractions so that we don't have
 * to deal with the low level tasks and actions directly, and using these
 * higher level constructs is where Groovy closures pay dividends.
 *
 * Many of the divide-and-conquer algorithms that fit a fork/join framework
 * operate on array and list types. Because of this, it is possible to build
 * up a large and useful library of functions and types to hide the messy details
 * of the fork/join tasks.
 *
 * The following examples are all going to operate on a "Student" type. We'll
 * generate some random test data for 1000 students. This usage of adding methods
 * to java.util.Random is probably not the best use case for meta-programming
 * but it sure is cute.
 */
class Student {
    int id;
    int graduationYear;
    double gpa;         // 4 point scale
    double adjustedGpa; // 5 point scale
}

Random.metaClass
Random.metaClass.randomYear = { 2000 + delegate.nextInt(11) }   // 2000..2010
Random.metaClass.randomGPA = { delegate.nextInt(41) / 10 }      // 0.0..4.0

Random rand = new Random();

def allStudents = (1..1000).collect {
    new Student(id: it, graduationYear: rand.randomYear(), gpa: rand.randomGPA())
}

/*
 * One of the core helper classes is the ParallelArray, which is an array that
 * maintains an F/J executor in order to provide parallel aggregate operations.
 * Creating one is pretty easy but requires a reference to the executor pool:
 */
ParallelArray students = new ParallelArray(pool, allStudents as Student[])

/*
 * Filtering selects a subset of the elements from the array. This is like a
 * parallel version of Groovy's Collection#findAll(Closure). Note: The closure
 * is not executed when withFilter() is called. It is lazily executed as needed,
 * which in this case is when size() is called. Each invocation of the closure
 * will potentially occur on a different thread. The return type is itself a
 * functional array, so many of these methods may be chained, as you'll see later.
 */
def graduates = students
    .withFilter({ it.graduationYear <= 2008 } as Predicate)

println "${graduates.size()} of the ${students.size()} students have graduated."
// Output: 831 of the 1000 students have graduated.

/*
 * Application applies a void function to each element in the array, modifying
 * the array in the process. It is a bit like Groovy's List#collect(Closure)
 * except that it operates on the array itself and does not construct a new
 * result. Again, each invocation of the closure potentially occurs on
 * a different thread. However, in this case the closure is executed at the time
 * apply() is invoked, not lazily.
 */
students
    .apply({ it.adjustedGpa = it.gpa * 1.25 } as Procedure)

def s = students.get(0)
println "Student $s.id has GPA $s.gpa of 4.0 and $s.adjustedGpa of 5.0"
// Output: Student 1 has GPA 2.6 of 4.0 and 3.25 of 5.0

/*
 * Mapping converts elements from the original array type to a new type, exactly
 * as you would expect Groovy's List#collect(Closure) to work. Normally, each
 * closure invocation potentially happens on a different thread and occurs
 * lazily as needed. In this particular example, however, each closure executes
 * on the same thread as the script. Why? The sequentially() method returns the
 * ParallelArray as an Iterable without parallel evaluation. Oops!
 * Be careful when mixing Collection and ParallelArray functions.
 */
def allGpas = students
    .withMapping({ it.gpa } as ObjectToDouble)

def perfectScores = allGpas.sequentially().inject(0, {acc, item ->
    if (item == 4.0) return acc + 1
    else return acc
})
println "$perfectScores students have a 4.0 average"
// Output: 24 students have a 4.0 average

/*
 * Reduce returns a reduction of the ParallelArray. The Groovy analog is
 * Collection#inject(Object, Closure). In this example you see how easy it is to
 * chain a bunch of aggregate operators together. Again, each closure is
 * potentially invoked on a different thread as needed.
 */
double bestGpa = students
    .withFilter({ it.graduationYear == 2008 } as Predicate)
    .withMapping({ it.gpa } as ObjectToDouble)
    .reduce({previous, current -> Math.max(previous, current) } as DoubleReducer, 0)

println "The best GPA is $bestGpa"
// Output: The best GPA is 4.0

/*
 * Summarization is a convenience for all the common aggregate operators, like
 * size(), min(), max(), indexOfMin(), indexOfMax(), sum(), and average().
 */
SummaryStatistics summary = students
    .withFilter({ it.graduationYear == 2008 } as Predicate)
    .withMapping({ it.gpa } as ObjectToDouble)
    .summary()

ParallelArray.metaClass // add [] operators for convenience
ParallelArray.metaClass.getAt = { x -> delegate.get(x) }

def worstStudent = students[summary.indexOfMin()]
def bestStudent = students[summary.indexOfMax()]
println "Worst student: $worstStudent.id had GPA $worstStudent.gpa"
println "Best student: $bestStudent.id had GPA $bestStudent.gpa"
println "Average GPA: ${summary.average()}"

/*
 * --------------------------SUMMARY-----------------------------------
 * The Fork/Join ParallelArray type and libraries are very cool, even if they
 * lack some of the conveniences expected from Groovy libraries. The tasks
 * and actions, on the other hand, were much harder to grasp and work with.
 * Defining an algorithm recursively is one thing to learn, but then having
 * to figure out how to fork the work is another, more difficult problem.
 *
 * There's also the issue of the verbose types the framework uses to represent
 * function types. The public class Ops, for instance, is a convenience class
 * that defines 132 different function types. Ouch, this is too much to remember,
 * and it seems like you could use generics to define 3 generic function types
 * rather than the 132 here: http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166ydocs/.
 * I shudder to think how hard this would be to write without an IDE with
 * auto-import and type discovery. Hooray for static typing tools.
 *
 * Working with the ParallelArray class was a breeze from Groovy, given that the
 * IDE helps you cast closures to the correct type. I can't imagine enjoying using
 * the framework from Java with the proliferation of anonymous inner classes that
 * it would require. I hesitate to say that the BGGA closure proposal would be
 * that much better... the example closures fit in one line of code now, but
 * adding generics and type definitions would probably push these to multiple
 * lines, at which point it might become a mess to read. A version of the
 * framework was made compatible with the BGGA proposal, but recent signs point
 * to no closures in Java 7. Hooray for Groovy for supporting this today.
 *
 * The ParallelArray class is wonderfully useful, but where are ParallelList
 * and ParallelMap? Tying the library to an array type limits its usefulness,
 * and pretty much rules infinite lists out (I know, I know, it's an unbearable
 * loss). I'd like to see these two types added to the library before release.
 *
 * Also, why the inconsistent naming convention? The reduce function is called
 * reduce and apply is called apply, but map is called withMapping and filter is
 * called withFilter. This is another area I humbly submit for cleanup before
 * release.
 *
 * None of this solves the issue of mutable state within a multi-threaded
 * environment. I heard second-hand (http://enfranchisedmind.com/) that deadlock
 * issues doomed the Cilk project, which F/J is partly based on. Tasks are mutable
 * by nature because the result is filled in at a later date, and it is left up
 * to the developer to code defensively to avoid deadlocks, which pushes the
 * developer back down into the assembly language of concurrency: synchronized
 * and volatile. Is a parallelization framework _not_ based on immutability
 * doomed to failure?
 *
 * Also, my intent was to use Terracotta to create a distributed Fork/Join
 * executor. But the executors lack the fine-grained task control that distributed
 * systems would need. In the case of a failing fork, there is no way for the
 * executor to recall the forked problem piece and reassign it to a different
 * worker or add it back into the deque. The CommonJ framework seems up to the
 * task, which is what Terracotta recommended using on their forum and docs:
 * http://www.terracotta.org/confluence/display/explore/Master+Worker. Is a
 * parallelization framework _not_ based on distributed tasks doomed to failure?
 *
 * AND ANOTHER THING! I easily downloaded the framework, browsed the docs, and
 * started using it with Java 6. Why does this need to be in the JDK? With
 * BEA's CommonJ http://e-docs.bea.com/wls/docs92/commonj/index.html and
 * Functional Java http://functionaljava.org/ there seems to be movement in this
 * space from many sources. I have no idea why F/J needs to be part of the JDK
 * rather than just another 3rd party library.
 *
 * I didn't mean for the gripes section to get so long, and it might be unfair
 * to criticize a project that is still in the formalization process. It's a
 * cool project and it's great to see some functional programming coming to the
 * JDK.
 * The ExecutorServices were a giant leap forward when included in Java 5,
 * and perhaps the finished version of F/J will be the same thing for Java 7!
 */
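
/*
 * A rough sketch related to the mutable-state gripe above, not part of the
 * jsr166y preview API used in the rest of this file. It ASSUMES a JDK whose
 * java.util.concurrent package provides ForkJoinPool and RecursiveTask
 * (Java 7 and later). The names FibTask, THRESHOLD, sequentialFib and
 * sketchPool are made up for illustration. The idea: compute() returns the
 * value instead of storing it in a mutable "result" field, and the + that
 * merges the two halves sits right next to the fork, so the divide and the
 * conquer logic live in one class. Fully qualified class names are used so
 * that nothing clashes with the jsr166y imports at the top of the file.
 */
class FibTask extends java.util.concurrent.RecursiveTask<Integer> {
    static final int THRESHOLD = 4   // hypothetical cutoff, mirrors "n < 4" above
    int n

    // Plain recursive fibonacci, used once the problem is small enough
    static int sequentialFib(int k) {
        k <= 1 ? k : sequentialFib(k - 1) + sequentialFib(k - 2)
    }

    @Override
    protected Integer compute() {
        if (n < THRESHOLD) return sequentialFib(n)
        def left = new FibTask(n: n - 1)
        def right = new FibTask(n: n - 2)
        left.fork()                           // queue one half for any worker to pick up
        return right.compute() + left.join()  // solve the other half here, then merge
    }
}

def sketchPool = new java.util.concurrent.ForkJoinPool(4)
println "RecursiveTask fib: ${sketchPool.invoke(new FibTask(n: 15))}"
// Output: RecursiveTask fib: 610
sketchPool.shutdown()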