Fork/Join (Parallel Programming)

  • The Fork/Join framework simplifies creating and using multiple threads and automatically takes advantage of the available processors.
  • The framework is built around the following four classes in java.util.concurrent:
    • ForkJoinTask<V>: an abstract class that defines a task.
    • ForkJoinPool: manages the execution of ForkJoinTasks.
    • RecursiveAction: a subclass of ForkJoinTask<Void> for tasks that do not return a value.
    • RecursiveTask<V>: a subclass of ForkJoinTask<V> for tasks that return a value.

ForkJoinTask<V>

  • It is an abstract class that defines a task that can be managed by a ForkJoinPool.
  • The type parameter V defines the return type of the task.
  • ForkJoinTasks are executed by threads managed by a thread pool of type ForkJoinPool.
  • At its core are the fork() and join() methods:
    final ForkJoinTask<V> fork()
    final V join()
  • fork() submits the invoking task for asynchronous execution and returns immediately, so the caller keeps running.
  • join() waits until the task on which it is called completes and returns its result.
  • A third method, final V invoke(), combines both operations in a single call: it starts the task and then waits for it to finish.
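
As a rough illustration of how the three methods relate, the sketch below forks one subtask, runs a second one with invoke(), and then joins the first. The classes Square, SumOfSquares and ForkJoinBasicsDemo are hypothetical names invented for this sketch; only fork(), join(), invoke() and ForkJoinPool come from java.util.concurrent.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical leaf task: returns the square of a number.
class Square extends RecursiveTask<Integer> {
  private final int n;

  Square(int n) {
    this.n = n;
  }

  @Override
  protected Integer compute() {
    return n * n;
  }
}

// Hypothetical parent task that starts its two subtasks in different ways.
class SumOfSquares extends RecursiveTask<Integer> {
  private final int a;
  private final int b;

  SumOfSquares(int a, int b) {
    this.a = a;
    this.b = b;
  }

  @Override
  protected Integer compute() {
    Square first = new Square(a);
    first.fork();                        // schedule asynchronously; returns at once
    int second = new Square(b).invoke(); // start the task and wait for it in one call
    return first.join() + second;        // wait for the forked task and add its result
  }
}

class ForkJoinBasicsDemo {
  // Runs the parent task in a dedicated pool and returns a*a + b*b.
  static int sumOfSquares(int a, int b) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new SumOfSquares(a, b));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(sumOfSquares(3, 4)); // 9 + 16 = 25
  }
}
```

With these definitions, ForkJoinBasicsDemo.sumOfSquares(3, 4) evaluates to 25.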

RecursiveAction

  • It is a subclass of ForkJoinTask that encapsulates a task that does not return a result.
  • It is used to implement a recursive, divide-and-conquer strategy for tasks that don't return results.
  • It declares a protected abstract void compute() method that every concrete (non-abstract) subclass must implement.

RecursiveTask<V>

  • It is also a subclass of ForkJoinTask; it encapsulates a task that returns a result of type V.
  • Like RecursiveAction, it is an abstract class. Its abstract method is protected abstract V compute().
  • It is also used to implement a recursive, divide-and-conquer strategy, but for tasks that return a result.
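
The difference between the two compute() signatures can be seen side by side in a small sketch. FillAction, CountTask and ActionVersusTaskDemo are hypothetical names, and the threshold of 1,000 is an arbitrary, untuned value chosen only for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// Hypothetical action: writes into the shared array and returns nothing.
class FillAction extends RecursiveAction {
  private static final int THRESHOLD = 1_000; // illustrative cutoff, not tuned
  private final int[] data;
  private final int start, end, value;

  FillAction(int[] data, int start, int end, int value) {
    this.data = data;
    this.start = start;
    this.end = end;
    this.value = value;
  }

  @Override
  protected void compute() { // void: the effect is visible only in the array
    if (end - start <= THRESHOLD) {
      for (int i = start; i < end; i++) {
        data[i] = value;
      }
    } else {
      int middle = start + (end - start) / 2;
      invokeAll(new FillAction(data, start, middle, value),
                new FillAction(data, middle, end, value));
    }
  }
}

// Hypothetical task: returns how many elements equal the target.
class CountTask extends RecursiveTask<Integer> {
  private static final int THRESHOLD = 1_000; // illustrative cutoff, not tuned
  private final int[] data;
  private final int start, end, target;

  CountTask(int[] data, int start, int end, int target) {
    this.data = data;
    this.start = start;
    this.end = end;
    this.target = target;
  }

  @Override
  protected Integer compute() { // returns a value of the type parameter
    if (end - start <= THRESHOLD) {
      int count = 0;
      for (int i = start; i < end; i++) {
        if (data[i] == target) {
          count++;
        }
      }
      return count;
    }
    int middle = start + (end - start) / 2;
    CountTask left = new CountTask(data, start, middle, target);
    left.fork();                                                   // left half runs asynchronously
    int right = new CountTask(data, middle, end, target).compute(); // right half runs here
    return left.join() + right;
  }
}

class ActionVersusTaskDemo {
  // Fills an array of the given size with value, then counts the matches.
  static int fillThenCount(int size, int value) {
    int[] data = new int[size];
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.invoke(new FillAction(data, 0, size, value)); // no result to read
      return pool.invoke(new CountTask(data, 0, size, value));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(fillThenCount(10_000, 7)); // every element matches: 10000
  }
}
```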

ForkJoinPool

  • The execution of ForkJoinTasks takes place within a ForkJoinPool.
  • In order to execute a ForkJoinTask, you must first have a ForkJoinPool.
  • There are two ways to acquire a ForkJoinPool:
    • using a ForkJoinPool constructor.
    • using the common pool, obtained with static ForkJoinPool commonPool().
  • ForkJoinPool manages the execution of its threads using an approach called work-stealing. Each worker thread maintains a queue of tasks. If one worker thread's queue is empty, it takes a task from another worker thread's queue.
  • ForkJoinPool uses daemon threads. A daemon thread is terminated automatically when all user threads have terminated, so a ForkJoinPool does not have to be shut down explicitly for the program to exit.
  • Calling shutdown() on the common pool is allowed, but it has no effect on the common pool.
  • Because the worker threads are daemon threads, the main thread (or another user thread) must stay alive until the tasks have completed; otherwise the JVM may exit before they finish.
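
The two ways of acquiring a pool, and the different reaction to shutdown(), can be sketched roughly as follows. PoolAcquisitionDemo and its method names are hypothetical; the calls to the ForkJoinPool constructor, commonPool(), getParallelism(), shutdown() and isShutdown() are the only library API used.

```java
import java.util.concurrent.ForkJoinPool;

class PoolAcquisitionDemo {
  // Creates a dedicated pool with the requested parallelism and reports it back.
  static int dedicatedParallelism(int parallelism) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.getParallelism();
    } finally {
      pool.shutdown();
    }
  }

  // A pool created with the constructor reacts to shutdown().
  static boolean dedicatedPoolShutsDown() {
    ForkJoinPool pool = new ForkJoinPool();
    pool.shutdown();
    return pool.isShutdown();
  }

  // shutdown() on the common pool is permitted but ignored.
  static boolean commonPoolIgnoresShutdown() {
    ForkJoinPool common = ForkJoinPool.commonPool();
    common.shutdown();
    return !common.isShutdown();
  }

  public static void main(String[] args) {
    System.out.println("dedicated parallelism: " + dedicatedParallelism(4));
    System.out.println("dedicated pool shut down: " + dedicatedPoolShutsDown());
    System.out.println("common pool still running: " + commonPoolIgnoresShutdown());
  }
}
```

A dedicated pool is mainly useful when the parallelism level has to be controlled or the work should be isolated from other users of the common pool; otherwise the common pool avoids creating extra threads.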

When fork() or invoke() is called from a thread that is not already running within a ForkJoinPool, the task is started using the common pool.
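
A minimal sketch of this behavior: the code below never creates a pool, yet both ways of starting the task work from an ordinary thread. WordLength and CommonPoolDemo are hypothetical names used only for this illustration; ForkJoinTask.inForkJoinPool() is the library method that reports whether the current thread is a pool worker.

```java
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: returns the length of a word.
class WordLength extends RecursiveTask<Integer> {
  private final String word;

  WordLength(String word) {
    this.word = word;
  }

  @Override
  protected Integer compute() {
    return word.length();
  }
}

class CommonPoolDemo {
  // Starts the task with invoke(); no pool is created explicitly.
  static int lengthViaInvoke(String word) {
    return new WordLength(word).invoke();
  }

  // Starts the task with fork() and collects the result with join().
  static int lengthViaForkJoin(String word) {
    return new WordLength(word).fork().join();
  }

  public static void main(String[] args) {
    // Reports whether the calling thread is a ForkJoinPool worker thread.
    System.out.println("caller in a pool: " + ForkJoinTask.inForkJoinPool());
    System.out.println(lengthViaInvoke("fork"));   // "fork" has 4 characters
    System.out.println(lengthViaForkJoin("join")); // "join" has 4 characters
  }
}
```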

RecursiveAction Example

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;


class Transformer extends RecursiveAction {
  private static final int THRESHOLD = 1000; // below this size, process sequentially
  private final int[] data;
  private final int start, end;


  Transformer(int[] vals, int s, int e) {
    this.data = vals;
    this.start = s;
    this.end = e;
  }

  /**
   * Doubles every element in data[start..end), splitting the range in two
   * while it is at least THRESHOLD elements long.
   */
  @Override
  protected void compute() {
    if (end - start < THRESHOLD) {
      for (int i = start; i < end; i++) {
        data[i] = data[i] * 2;
      }
    } else {
      int middle = start + (end - start) / 2;
      // invokeAll forks both halves and waits for both to complete
      invokeAll(new Transformer(data, start, middle), new Transformer(data, middle, end));
    }
  }
}


public class Main {

  public static void main(String[] args) {

    int[] arr = new int[100000];
    for(int i=0;i<arr.length;i++){
      arr[i] = i;
    }

    System.out.println("A portion of the original sequence: ");
    for(int i=0;i<10;i++){
      System.out.print(arr[i] + " ");
    }

    System.out.println();

    Transformer trnsfrm = new Transformer(arr,0,arr.length);
    new ForkJoinPool().invoke(trnsfrm); // create a new pool and run the task in it
//        ForkJoinPool.commonPool().invoke(trnsfrm); // alternative: get a reference to the common pool and use it
//        trnsfrm.invoke(); // alternative: uses the common pool implicitly

    System.out.println("A portion of the transformed sequence: ");
    for(int i=0;i<10;i++){
      System.out.print(arr[i] + " ");
    }
    System.out.println();
  }
}

RecursiveTask<V> Example

import java.util.concurrent.RecursiveTask;


class Summation extends RecursiveTask<Double> {
    private static final int THRESHOLD = 500; // below this size, sum sequentially
    private final int[] data;
    private final int start, end;


    Summation(int[] vals, int s, int e) {
        this.data = vals;
        this.start = s;
        this.end = e;
    }

    /**
     * Sums data[start..end), splitting the range in two while it is
     * at least THRESHOLD elements long.
     */
    @Override
    protected Double compute() {
        double sum = 0;
        if (end - start < THRESHOLD) {
            for (int i = start; i < end; i++) {
                sum += data[i];
            }
        } else {
            int middle = start + (end - start) / 2;
            Summation taskA = new Summation(data, start, middle);
            Summation taskB = new Summation(data, middle, end);
            taskA.fork();                   // run the left half asynchronously
            double right = taskB.compute(); // compute the right half in this thread
            sum = taskA.join() + right;     // then wait for the left half
        }
        return sum;
    }
}


public class Main {

    public static void main(String[] args) {

        int[] arr = new int[50000];
        for(int i=0;i<arr.length;i++){
            arr[i] = i%2==0 ? i : -i;
        }

        Summation sum = new Summation(arr,0,arr.length);

        long beginT = System.nanoTime();
        System.out.println("result: " + sum.invoke());
        long endT = System.nanoTime();

        System.out.println("Time taken: " + (endT - beginT) + "ns");

        System.out.println();
    }
}
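
A result like this is easy to cross-check against a plain sequential loop. The sketch below does so for the same input pattern (even indices positive, odd indices negative). ParallelSum and SumCheckDemo are hypothetical names, and the sums are kept in long rather than double, which is a choice made for this sketch and not part of the example above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums data[start..end) using divide and conquer.
class ParallelSum extends RecursiveTask<Long> {
  private static final int THRESHOLD = 500; // same cutoff as the example above
  private final int[] data;
  private final int start, end;

  ParallelSum(int[] data, int start, int end) {
    this.data = data;
    this.start = start;
    this.end = end;
  }

  @Override
  protected Long compute() {
    if (end - start < THRESHOLD) {
      long sum = 0;
      for (int i = start; i < end; i++) {
        sum += data[i];
      }
      return sum;
    }
    int middle = start + (end - start) / 2;
    ParallelSum left = new ParallelSum(data, start, middle);
    left.fork();                                             // left half runs asynchronously
    long right = new ParallelSum(data, middle, end).compute(); // right half runs here
    return left.join() + right;
  }
}

class SumCheckDemo {
  // Builds the input used above: arr[i] = i for even i, -i for odd i.
  static int[] alternatingInput(int size) {
    int[] arr = new int[size];
    for (int i = 0; i < size; i++) {
      arr[i] = (i % 2 == 0) ? i : -i;
    }
    return arr;
  }

  // Reference implementation: a single-threaded loop.
  static long sequentialSum(int[] data) {
    long sum = 0;
    for (int value : data) {
      sum += value;
    }
    return sum;
  }

  static long parallelSum(int[] data) {
    return ForkJoinPool.commonPool().invoke(new ParallelSum(data, 0, data.length));
  }

  public static void main(String[] args) {
    int[] arr = alternatingInput(50_000);
    // Each pair (2k, -(2k+1)) contributes -1, and there are 25,000 pairs.
    System.out.println("sequential: " + sequentialSum(arr));
    System.out.println("parallel:   " + parallelSum(arr));
  }
}
```

For 50,000 elements both methods should report -25000, since every even/odd pair of elements adds up to -1.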