
Parallel Streams


parallel() and parallelStream()

Streams can be executed serially or in parallel. Parallel streams, added in Java 8, make use of multiple processor cores: the source data is split into several parts that are processed independently, and the final result combines the individual outcomes.

A newly created stream is serial unless specified otherwise, which means a single thread processes its whole pipeline. Still, when people from Oracle talk about the design choices behind Java 8, they often name parallelism as the main motivation and the driving force behind lambdas, the Stream API, and related features.

There are two ways to create parallel streams in Java:

1. Using parallel() method on a stream

2. Using parallelStream() on a Collection
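Both calls produce a stream whose isParallel() method returns true. A minimal sketch, assuming a small List<Integer> as the source (the class and method names are hypothetical, chosen only for illustration), also shows that a plain stream() is serial and that sequential() switches a pipeline back:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class showing both ways to get a parallel stream
class CreateParallelStreams {
    static final List<Integer> LIST = Arrays.asList(1, 2, 3, 4, 5);

    // 1. parallel() turns an existing stream into a parallel one
    static IntStream viaParallel() {
        return IntStream.rangeClosed(1, 5).parallel();
    }

    // 2. parallelStream() on a Collection; the Javadoc allows a sequential
    //    result, but the JDK's default implementation returns a parallel stream
    static Stream<Integer> viaParallelStream() {
        return LIST.parallelStream();
    }

    public static void main(String[] args) {
        System.out.println(viaParallel().isParallel());                      // true
        System.out.println(viaParallelStream().isParallel());                // true
        System.out.println(LIST.stream().isParallel());                      // false
        System.out.println(LIST.parallelStream().sequential().isParallel()); // false
    }
}
```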

Let us examine each of them.

Here is an example of the parallel() method:

import java.util.stream.LongStream;

public class ParallelExample {

    public static void main(String[] args) {
        LongStream range1 = LongStream.rangeClosed(1L, 15L);
        range1.forEach(System.out::println); // serial: 1 to 15 in order

        System.out.println("---------");

        // parallel
        LongStream range2 = LongStream.rangeClosed(1L, 15L);
        range2.parallel().forEach(System.out::println); // order varies between runs
    }
}

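The first stream prints the numbers 1 to 15 in order. The second stream prints the same numbers in an unpredictable order that can change from run to run, because the elements are processed concurrently by several threads and forEach does not preserve the encounter order of a parallel stream.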
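When the order matters, forEachOrdered() processes the elements in encounter order even on a parallel stream, at the cost of some of the parallel speedup. A minimal sketch (the class name is hypothetical) that records the elements in a list instead of printing them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

// Hypothetical demo class: keeping encounter order on a parallel stream
class OrderedParallelExample {
    static List<Long> collectInOrder() {
        List<Long> seen = new ArrayList<>();
        // forEachOrdered runs the action for one element at a time, in
        // encounter order, so a plain ArrayList is safe here (forEach is not)
        LongStream.rangeClosed(1L, 15L)
                .parallel()
                .forEachOrdered(n -> seen.add(n));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectInOrder()); // the numbers 1 to 15 in order
    }
}
```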

Another example counts the odd numbers from 1 to 49 in parallel:

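import java.util.stream.IntStream;

public class OddCount {

    public static void count() {
        final long count = IntStream.range(1, 50) // 1 to 49: the upper bound is exclusive
                .parallel()
                .filter(number -> isOdd(number))
                .count();
        System.out.println("Count - " + count);
    }

    public static boolean isOdd(int number) {
        return number % 2 != 0; // != 0 also handles negative odd numbers
    }

    public static void main(String[] args) {
        count();
    }
}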

The result will be:

Count - 25
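Because filter() is stateless and count() and sum() are reductions, splitting the work across threads does not change the result. A minimal sketch (the class and method names are hypothetical) runs the same pipeline serially and in parallel; the sum of the 25 odd numbers from 1 to 49 is 25 × 25 = 625:

```java
import java.util.stream.IntStream;

// Hypothetical demo class: the parallel result matches the serial one
class OddCountComparison {
    static boolean isOdd(int number) {
        return number % 2 != 0;
    }

    // Counts the odd numbers in [1, 50), either serially or in parallel
    static long countOdd(boolean parallel) {
        IntStream numbers = IntStream.range(1, 50);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.filter(OddCountComparison::isOdd).count();
    }

    // Sums the odd numbers in [1, 50) in parallel
    static int sumOddParallel() {
        return IntStream.range(1, 50).parallel().filter(OddCountComparison::isOdd).sum();
    }

    public static void main(String[] args) {
        System.out.println("Serial count - " + countOdd(false));  // 25
        System.out.println("Parallel count - " + countOdd(true)); // 25
        System.out.println("Parallel sum - " + sumOddParallel()); // 625
    }
}
```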

Now let's look at the parallelStream() method.

Here is a simple example:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
list.stream().forEach(System.out::println);         // serial: 1 to 9 in order
System.out.println();
list.parallelStream().forEach(System.out::println); // parallel: order varies

The second forEach prints the numbers 1 to 9 in no particular order, because parallelStream() processes the elements on several threads.

One possible output is shown below; the order of the second half will usually differ on your machine and between runs:

1
2
3
4
5
6
7
8
9

6
4
8
2
3
9
7
1
5
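Only the side effects of forEach come out unordered. A terminal operation such as collect() keeps the encounter order of an ordered source like a List, even on a parallel stream. A small sketch (the class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class: collect() keeps encounter order even in parallel
class ParallelCollectOrder {
    static List<Integer> doubled(List<Integer> source) {
        // map() runs on several threads, but Collectors.toList() assembles
        // the partial results in the encounter order of the source list
        return source.parallelStream()
                .map(n -> n * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println(doubled(list)); // [2, 4, 6, 8, 10, 12, 14, 16, 18]
    }
}
```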

Here is another example. This is a simple Employee class:

class Employee {

    private String name;
    private String department;
    private int salary;

    Employee(String name, String department, int salary) {
        this.name = name;
        this.department = department;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    public String getDepartment() {
        return department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }
}

The following class creates several employees and uses parallelStream() to compute the average salary in the IT department:

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalDouble;

public class ParallelStreamExample {

    public static void main(String[] args) {
        List<Employee> employeeList = new ArrayList<>();
        employeeList.add(new Employee("John", "IT", 5800));
        employeeList.add(new Employee("Nick", "IT", 4300));
        employeeList.add(new Employee("Alice", "IT", 9000));
        employeeList.add(new Employee("Ann", "Science", 3500));
        employeeList.add(new Employee("Tom", "Science", 8000));

        OptionalDouble averageSalary = employeeList.parallelStream()
                .filter(e -> e.getDepartment().equals("IT"))
                .mapToInt(Employee::getSalary)
                .average();

        // average() is empty when no employee matches the filter
        double avgSalary = averageSalary.orElse(0.0);

        System.out.println("Average salary in IT department - " + avgSalary);
    }
}

The output will be:

Average salary in IT department - 6366.666666666667
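To get the average for every department in one pass, the same parallel stream can feed a grouping collector. A minimal sketch, assuming a stripped-down hypothetical Emp class in place of the Employee class above, uses Collectors.groupingByConcurrent with Collectors.averagingInt:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

// Hypothetical demo class: average salary per department in one pass
class DepartmentAverages {
    // Hypothetical minimal stand-in for the Employee class above
    static final class Emp {
        private final String department;
        private final int salary;

        Emp(String department, int salary) {
            this.department = department;
            this.salary = salary;
        }

        String getDepartment() { return department; }
        int getSalary() { return salary; }
    }

    static ConcurrentMap<String, Double> averageByDepartment(List<Emp> employees) {
        // groupingByConcurrent is a concurrent, unordered collector, so the
        // worker threads insert into one shared map instead of merging maps
        return employees.parallelStream()
                .collect(Collectors.groupingByConcurrent(
                        Emp::getDepartment,
                        Collectors.averagingInt(Emp::getSalary)));
    }

    public static void main(String[] args) {
        List<Emp> employees = Arrays.asList(
                new Emp("IT", 5800), new Emp("IT", 4300), new Emp("IT", 9000),
                new Emp("Science", 3500), new Emp("Science", 8000));
        System.out.println(averageByDepartment(employees));
    }
}
```

The printed map order is unspecified; the IT entry matches the single-department result above, and Science averages to 5750.0.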

While a program that uses a parallel stream is running, you can watch the processor load in your system's Resource Monitor. Here is a picture of it from a previous run of the program.

As you can see, the processors worked simultaneously, which is why this is called parallel execution. A parallel stream is processed by the thread that started the operation and, in addition, by the worker threads of the JVM's default fork/join pool, ForkJoinPool.commonPool(). This common pool is shared by all parallel streams in the application. If you want a separate pool, for example one with a parallelism level of 4, you can create it with the ForkJoinPool constructor:

ForkJoinPool customThreadPool = new ForkJoinPool(4);

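Some experimentation is required to find the optimal value for a given environment, but a good rule of thumb is to base the number on how many cores your CPU has (Runtime.getRuntime().availableProcessors() reports it). By default, the common pool uses one thread fewer than the number of available processors.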
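Creating the pool alone does not make a stream use it. A common approach is to start the stream from a task submitted to the pool; this relies on a JDK implementation detail (a parallel stream started inside a ForkJoinPool task runs its subtasks in that pool) that the Stream API documentation does not guarantee. A minimal sketch, with hypothetical class and method names:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

// Hypothetical demo class: run a parallel stream in a dedicated pool
class CustomPoolExample {
    static long sumInPool(int parallelism, long upTo) {
        ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
        try {
            // The stream starts inside a task of customThreadPool, so in the
            // JDK implementation its subtasks are forked into that pool
            // instead of ForkJoinPool.commonPool()
            return customThreadPool.submit(
                    () -> LongStream.rangeClosed(1, upTo).parallel().sum()
            ).join();
        } finally {
            customThreadPool.shutdown(); // release the pool's worker threads
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInPool(4, 1_000_000L)); // 500000500000
    }
}
```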

Parallel streams are a good fit when:

The work is an aggregate operation, such as sum, count, average, or another reduction

The collection is large, so splitting the iteration across cores pays off

The result does not depend on the order of the elements in the source collection

Be careful in environments that already run many threads, such as a web server: all parallel streams share the common ForkJoinPool, so one heavy parallel stream can occupy threads and slow down new requests coming in.

To compare serial and parallel execution on heavier work, let's look at a sorting example. It does not use parallel streams directly; it uses the fork/join framework that parallel streams are built on. Merge sort is an efficient sorting algorithm with O(n log n) time complexity. It is a divide-and-conquer algorithm: it recursively splits the array into two halves, sorts each half, and merges the sorted halves.

Here is a serial merge sort implementation:

public class MergeSort {

    // Merges the sorted halves arr[left..mid] and arr[mid+1..right]
    void merge(int[] arr, int left, int mid, int right) {
        int n1 = mid - left + 1;
        int n2 = right - mid;

        // Copy both halves into temporary arrays
        int[] L = new int[n1];
        int[] R = new int[n2];
        System.arraycopy(arr, left, L, 0, n1);
        System.arraycopy(arr, mid + 1, R, 0, n2);

        // Repeatedly take the smaller head element of the two halves
        int i = 0, j = 0;
        int k = left;
        while (i < n1 && j < n2) {
            if (L[i] <= R[j]) {
                arr[k] = L[i];
                i++;
            } else {
                arr[k] = R[j];
                j++;
            }
            k++;
        }

        // Copy whatever is left in either half
        while (i < n1) {
            arr[k] = L[i];
            i++;
            k++;
        }
        while (j < n2) {
            arr[k] = R[j];
            j++;
            k++;
        }
    }

    // Sorts arr[left..right], both bounds inclusive
    void sort(int[] arr, int left, int right) {
        if (left < right) {
            int m = left + (right - left) / 2;
            sort(arr, left, m);
            sort(arr, m + 1, right);
            merge(arr, left, m, right);
        }
    }
}

Nothing here is parallelized, so a single thread does all of the sorting. To build a parallel version, we first need to limit the number of threads; for CPU-bound work like sorting, a number close to the number of CPU cores is usually optimal.

To parallelize the algorithm, we can use the fork/join pattern: fork a big task into smaller subtasks, run them in parallel, and then join (aggregate) their results.
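The fork and join steps are easiest to see on a task that returns a value. A minimal sketch, using RecursiveTask (the result-returning sibling of RecursiveAction) to sum an array; the class name and the threshold of 10,000 elements are assumptions for illustration:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class: fork/join sum of an int array
class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // assumed cut-off, tune per machine
    private final int[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    ForkJoinSum(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0; // small enough: solve directly
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        ForkJoinSum left = new ForkJoinSum(values, from, mid);
        ForkJoinSum right = new ForkJoinSum(values, mid, to);
        left.fork();                     // fork: schedule the left half asynchronously
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // join: aggregate both results
    }

    static long sum(int[] values) {
        // One worker thread per core
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            return pool.invoke(new ForkJoinSum(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] values = new int[1_000_000];
        Arrays.setAll(values, i -> i + 1); // 1, 2, ..., 1_000_000
        System.out.println(sum(values));   // 500000500000
    }
}
```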

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelMergeSort extends RecursiveAction {

    private static final int SORT_THRESHOLD = 128;

    private final int[] values;
    private final int from; // inclusive
    private final int to;   // inclusive

    public ParallelMergeSort(int[] values) {
        this(values, 0, values.length - 1);
    }

    public ParallelMergeSort(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    // Sorts the whole range in a pool with one thread fewer than the number
    // of available processors (but at least one thread)
    public void sort() {
        int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
        try {
            forkJoinPool.invoke(this);
        } finally {
            forkJoinPool.shutdown();
        }
    }

    @Override
    protected void compute() {
        if (from < to) {
            int size = to - from;
            if (size < SORT_THRESHOLD) {
                insertionSort(); // small ranges are cheaper to sort directly
            } else {
                int mid = from + Math.floorDiv(size, 2);
                // Sort both halves in parallel, then merge them
                invokeAll(
                        new ParallelMergeSort(values, from, mid),
                        new ParallelMergeSort(values, mid + 1, to));
                merge(mid);
            }
        }
    }

    private void insertionSort() {
        for (int i = from + 1; i <= to; ++i) {
            int current = values[i];
            int j = i - 1;
            while (from <= j && current < values[j]) {
                values[j + 1] = values[j--];
            }
            values[j + 1] = current;
        }
    }

    // Merges the sorted ranges values[from..mid] and values[mid+1..to]
    private void merge(int mid) {
        int[] left = Arrays.copyOfRange(values, from, mid + 1);
        int[] right = Arrays.copyOfRange(values, mid + 1, to + 1);
        int f = from;
        int li = 0, ri = 0;
        while (li < left.length && ri < right.length) {
            if (left[li] <= right[ri]) {
                values[f++] = left[li++];
            } else {
                values[f++] = right[ri++];
            }
        }
        while (li < left.length) {
            values[f++] = left[li++];
        }
        while (ri < right.length) {
            values[f++] = right[ri++];
        }
    }
}

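The class extends RecursiveAction and implements the core logic in compute(): ranges shorter than SORT_THRESHOLD are sorted with insertion sort, while larger ranges are split in half, both halves are sorted in parallel with invokeAll(), and the results are merged. The following class times both implementations on the same random data: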

import java.util.Arrays;
import java.util.Random;

public class TestSort {

    public static void test() {
        // Two identical arrays of 100 million random ints (about 400 MB each),
        // so the JVM needs a large heap, for example -Xmx2g or more
        int[] serial = new Random().ints(100_000_000).toArray();
        int[] parallel = Arrays.copyOf(serial, serial.length);
        long start;

        MergeSort mergeSort = new MergeSort();
        start = System.currentTimeMillis();
        mergeSort.sort(serial, 0, serial.length - 1); // sort the whole array
        System.out.println("Merge Sort done in: "
                + (System.currentTimeMillis() - start));

        ParallelMergeSort sorter = new ParallelMergeSort(parallel);
        start = System.currentTimeMillis();
        sorter.sort();
        System.out.println("Parallel Merge Sort done in: "
                + (System.currentTimeMillis() - start));
    }

    public static void main(String[] args) {
        test();
    }
}

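On the author's machine, the result (in milliseconds) was:

Merge Sort done in: 13476

Parallel Merge Sort done in: 3775,617

In that run, the parallel merge sort was about 3.5 times faster than the serial one; the actual speedup depends on the hardware, above all on the number of CPU cores.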
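In practice, the JDK already ships a parallel sort: Arrays.parallelSort (since Java 8) breaks the array into sub-arrays, sorts them, and merges them, running the parallel tasks in the common ForkJoinPool. A minimal sketch comparing it with the serial Arrays.sort, assuming 10 million elements so both copies fit comfortably in a typical default heap (the class name and seed are hypothetical); the timings depend on the machine, so no expected values are shown:

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical demo class: JDK serial sort vs. JDK parallel sort
class JdkSortComparison {
    // Returns {serialMillis, parallelMillis}; throws if the results differ
    static long[] compare(int size, long seed) {
        int[] serial = new Random(seed).ints(size).toArray();
        int[] parallel = Arrays.copyOf(serial, serial.length);

        long start = System.nanoTime();
        Arrays.sort(serial);
        long serialMillis = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        Arrays.parallelSort(parallel); // runs its tasks in ForkJoinPool.commonPool()
        long parallelMillis = (System.nanoTime() - start) / 1_000_000;

        // A timing comparison only means something if both results agree
        if (!Arrays.equals(serial, parallel)) {
            throw new IllegalStateException("sorted arrays differ");
        }
        return new long[] {serialMillis, parallelMillis};
    }

    public static void main(String[] args) {
        long[] times = compare(10_000_000, 42L);
        System.out.println("Arrays.sort done in: " + times[0] + " ms");
        System.out.println("Arrays.parallelSort done in: " + times[1] + " ms");
    }
}
```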

To conclude, parallel streams are convenient on machines with multiple cores, especially for aggregate operations and for iterating over large collections. The two ways of creating them behave almost the same; the difference lies in the Javadoc wording: Collection.parallelStream() returns a possibly parallel stream (an implementation may return a sequential one), whereas BaseStream.parallel() returns an equivalent stream that is parallel.
