Understanding Java Streams - Part Three

Hi there, welcome to part three, the final part of this series. In this article we will look at some advanced stream operations. Let's get started.
Advanced Operations:
Streams support plenty of different operations. Let's dive deeper into the more complex operations collect, flatMap and reduce.
For most of the examples, we will use the following class for demonstration purposes.
public class Person {
    String name;
    int age;
    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public int getAge() {
        return age;
    }
    @Override
    public String toString() {
        return this.name;
    }
}
Collect:
Collect is a very useful terminal operation which transforms the elements of the stream into a different kind of result, e.g. a List, Set or Map. Collect accepts a Collector which consists of four different operations: a supplier, an accumulator, a combiner and a finisher. This sounds complicated at first, but the good part is that Java 8 supports various built-in collectors via the Collectors class. So for the most common operations you don't have to implement a collector yourself.
Let's start with a very common use case:
List<Person> people = Arrays.asList(
new Person("Srivastava", 25),
new Person("Vivek", 22),
new Person("Vasu", 26),
new Person("Akruthi", 26)
);
List<Person> filteredPeople = people
.stream()
.filter(person -> person.getName().startsWith("S"))
.collect(Collectors.toList());
System.out.println(filteredPeople); // [Srivastava]
As you can see it's simple to construct a list from the elements of a stream. If you need a set instead of a list, just use Collectors.toSet().
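For example, a minimal sketch of the same pipeline collecting into a Set instead (note that a Set has no guaranteed order and removes duplicates):
Set<Person> filteredSet = people
    .stream()
    .filter(person -> person.getName().startsWith("S"))
    .collect(Collectors.toSet());
System.out.println(filteredSet); // [Srivastava]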
Let's group all persons by age.
Map<Integer, List<Person>> personsByAge = people
.stream()
.collect(Collectors.groupingBy(Person::getAge));
personsByAge
.forEach((age, person) -> System.out.format("age %s: %s\n", age, person));
//age 26: [Vasu, Akruthi]
//age 22: [Vivek]
//age 25: [Srivastava]
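groupingBy also accepts a downstream collector as a second argument; a small sketch counting how many persons share each age (the exact map order may vary):
Map<Integer, Long> countByAge = people
    .stream()
    .collect(Collectors.groupingBy(Person::getAge, Collectors.counting()));
System.out.println(countByAge); // e.g. {26=2, 22=1, 25=1}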
Collectors are versatile. You can also create aggregations on the elements of the stream, for example determining the average age of all persons:
Double averageAge = people
.stream()
.collect(Collectors.averagingInt(Person::getAge));
System.out.println(averageAge); // 24.75
If you are interested in statistics, the summarizing collectors return a special built-in summary statistics object. So we can simply determine the min, max and average age of the persons as well as the sum and count.
IntSummaryStatistics ageSummary = people
.stream()
.collect(Collectors.summarizingInt(Person::getAge));
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=99, min=22, average=24.750000, max=26}
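The individual values can also be read from the statistics object via its getters, for example:
System.out.println(ageSummary.getMin());     // 22
System.out.println(ageSummary.getMax());     // 26
System.out.println(ageSummary.getAverage()); // 24.75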
The example below shows how to join persons into a single string.
String phrase = people
.stream()
.filter(person -> person.getAge() > 25)
.map(Person::getName)
.collect(Collectors.joining(" and ", "Hello ",
" are of same age."));
System.out.println(phrase);
// Hello Vasu and Akruthi are of same age.
The joining collector accepts a delimiter as well as an optional prefix and suffix.
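Both prefix and suffix can be omitted; a minimal sketch joining all names with just a delimiter:
String allNames = people
    .stream()
    .map(Person::getName)
    .collect(Collectors.joining(", "));
System.out.println(allNames); // Srivastava, Vivek, Vasu, Akruthi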
In order to transform the stream elements into a map, we have to specify how both the keys and values should be mapped. The mapped keys must be unique, otherwise an IllegalStateException
is thrown. You can optionally pass a merge function as an additional parameter to bypass the exception:
Map<Integer, String> map = people
.stream()
.collect(Collectors.toMap(
Person::getAge,
Person::getName,
(name1, name2) -> name1 + "; " + name2
));
System.out.println(map); // {22=Vivek, 25=Srivastava, 26=Vasu; Akruthi}
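For illustration, the failing case without a merge function might look like the following sketch; since Vasu and Akruthi share the key 26, the terminal operation throws:
Map<Integer, String> nameByAge = people
    .stream()
    .collect(Collectors.toMap(Person::getAge, Person::getName));
// throws java.lang.IllegalStateException: Duplicate key ...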
Now that we have seen the most powerful built-in collectors, let's try to build our own special collector. We want to transform all persons of the stream into a single string consisting of all names in upper-case letters separated by the | pipe character. In order to achieve this we create a new collector via Collector.of(). As mentioned earlier, we need to pass a supplier, an accumulator, a combiner and a finisher.
Collector<Person, StringJoiner, String> personNameCollector = Collector
.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.getName().toUpperCase()), // accumulator
StringJoiner::merge, // combiner
StringJoiner::toString // finisher
);
String names = people
.stream()
.collect(personNameCollector);
System.out.println(names); // SRIVASTAVA | VIVEK | VASU | AKRUTHI
Since strings in Java are immutable, we need a helper class like StringJoiner to let the collector construct our string. The supplier initially constructs such a StringJoiner with the appropriate delimiter. The accumulator is used to add each person's upper-cased name to the StringJoiner. The combiner knows how to merge two StringJoiners into one. In the last step, the finisher constructs the desired String from the StringJoiner.
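As a side note, for this particular result the built-in joining collector would arguably be simpler; a sketch mapping the names first:
String joined = people
    .stream()
    .map(person -> person.getName().toUpperCase())
    .collect(Collectors.joining(" | "));
System.out.println(joined); // SRIVASTAVA | VIVEK | VASU | AKRUTHI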
FlatMap:
We've already seen how to transform the objects of a stream into another type of objects by utilizing the map operation. Map is kind of limited because every object can only be mapped to exactly one other object. But what if we want to transform one object into multiple others or none at all? This is where flatMap comes to the rescue.
FlatMap transforms each element of the stream into a stream of other objects. So each object will be transformed into zero, one or multiple other objects backed by streams. The contents of those streams will then be placed into the returned stream of the flatMap operation.
Before we see flatMap in action, we need an appropriate type hierarchy:
class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();
    Foo(String name) {
        this.name = name;
    }
    // getters are used by the flatMap examples below
    String getName() {
        return name;
    }
    List<Bar> getBars() {
        return bars;
    }
}
class Bar {
    String name;
    Bar(String name) {
        this.name = name;
    }
    String getName() {
        return name;
    }
}
Next we instantiate a couple of objects.
List<Foo> foos = new ArrayList<>();
IntStream.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
foos.forEach(f ->
IntStream.range(1, 4)
.forEach(i ->
f.getBars().add(new Bar(
"Bar" + i + " <- " + f.getName()
))
)
);
Now we have a list of three foos each consisting of three bars.
FlatMap accepts a function which has to return a stream of objects. So in order to resolve the bar objects of each foo, we just pass the appropriate function:
foos.stream()
.flatMap(f -> f.getBars().stream())
.forEach(b -> System.out.println(b.getName()));
//Bar1 <- Foo1
//Bar2 <- Foo1
//Bar3 <- Foo1
//Bar1 <- Foo2
//Bar2 <- Foo2
//Bar3 <- Foo2
//Bar1 <- Foo3
//Bar2 <- Foo3
//Bar3 <- Foo3
As you can see, we transformed the stream of three foo objects into a stream of nine bar objects. Now the above code can be simplified into a single pipeline of stream operations.
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " + f.getName()))
.forEach(f.getBars()::add)
)
.flatMap(f -> f.getBars().stream())
.forEach(b -> System.out.println(b.getName()));
FlatMap is also available for the Optional class introduced in Java 8. Optional's flatMap operation returns an optional object of another type. So it can be utilized to prevent nasty null checks.
Think of a highly hierarchical structure like this:
class Outer {
    Nested nested;
    public Nested getNested() {
        return nested;
    }
}
class Nested {
    Inner inner;
    public Inner getInner() {
        return inner;
    }
}
class Inner {
    String foo;
    public String getFoo() {
        return foo;
    }
}
In order to resolve the inner string foo of an outer instance you have to add multiple null checks to prevent NullPointerExceptions.
Outer outer = new Outer();
if (outer != null && outer.getNested() != null &&
outer.getNested().getInner() != null) {
System.out.println(outer.getNested().getInner().getFoo());
}
The same behaviour can be obtained by utilizing Optional's flatMap operation.
Optional.of(new Outer())
.flatMap(outer -> Optional.ofNullable(outer.getNested()))
.flatMap(nested -> Optional.ofNullable(nested.getInner()))
.flatMap(inner -> Optional.ofNullable(inner.getFoo()))
.ifPresent(System.out::println);
Each call to flatMap returns an Optional wrapping the desired object if present, or an empty Optional if absent.
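Since Optional.map already wraps a possibly-null result via ofNullable, the same chain can also be written with map instead of flatMap. A sketch:
Optional.of(new Outer())
    .map(Outer::getNested)
    .map(Nested::getInner)
    .map(Inner::getFoo)
    .ifPresent(System.out::println);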
Reduce:
The reduction operation combines all elements of the stream into a single result. Java 8 supports three different kinds of reduce methods. The first one reduces a stream of elements to exactly one element of the stream. Let's see how we can use this method to determine the oldest person:
people.stream()
.reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2)
.ifPresent(System.out::println); // Akruthi
The reduce method accepts a BinaryOperator
accumulator function. That's actually a BiFunction
where both operands share the same type, in this case Person. BiFunctions are like Functions but accept two arguments. The example function compares both persons' ages in order to return the person with the greater age.
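As a side note, the same lookup could also be expressed with Stream.max and a Comparator. A sketch (the tie between the two 26-year-olds may be resolved differently than in the hand-written reduce above):
people.stream()
    .max(Comparator.comparingInt(Person::getAge))
    .ifPresent(System.out::println); // Vasu or Akruthi, depending on tie-breaking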
The second reduce method accepts both an identity value and a BinaryOperator accumulator. This method can be utilized to construct a new Person with the aggregated names and ages from all persons in the stream.
Person result = people.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.getAge();
p1.name += p2.getName();
return p1;
});
System.out.format("name=%s; age=%s", result.getName(), result.getAge());
// name=SrivastavaVivekVasuAkruthi; age=99
The third reduce method accepts three parameters: an identity value, a BiFunction accumulator and a combiner function of type BinaryOperator. Since the identity value's type is not restricted to the Person type, we can utilize this reduction to determine the sum of ages of all persons:
Integer ageSum = people.stream()
.reduce(0, (sum, p) -> sum += p.age, Integer::sum);
System.out.println(ageSum); // 99
As you can see the result is 99, but what's happening exactly under the hood? Let's extend the code by some debug output:
Integer ageSum = people.stream()
.reduce(0, (sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
}, (sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
System.out.println(ageSum);
//accumulator: sum=0; person=Srivastava
//accumulator: sum=25; person=Vivek
//accumulator: sum=47; person=Vasu
//accumulator: sum=73; person=Akruthi
//99
As you can see, the accumulator function does all the work. It first gets called with the initial identity value 0 and the first person, Srivastava. In the next three steps the sum continually increases by the age of the person from the previous step, up to a total age of 99.
But wait, where is the execution of the combiner? Executing the same stream in parallel will reveal the secret:
Integer ageSum = people.parallelStream()
.reduce(0, (sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
}, (sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
System.out.println(ageSum);
//accumulator: sum=0; person=Vasu
//accumulator: sum=0; person=Srivastava
//accumulator: sum=0; person=Vivek
//accumulator: sum=0; person=Akruthi
//combiner: sum1=25; sum2=22
//combiner: sum1=26; sum2=26
//combiner: sum1=47; sum2=52
//99
Executing the stream in parallel results in an entirely different execution behavior. Now the combiner is actually called. Since the accumulator is called in parallel, the combiner is needed to sum up the separate accumulated values.
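For a plain sum like this, mapping to an IntStream and calling sum is usually the simpler choice. A minimal sketch:
int totalAge = people.stream()
    .mapToInt(Person::getAge)
    .sum();
System.out.println(totalAge); // 99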
Parallel Streams:
Streams can be executed in parallel to increase runtime performance on large amounts of input elements. Parallel streams use a common ForkJoinPool available via the static ForkJoinPool.commonPool() method. The size of the underlying thread pool depends on the number of available CPU cores; by default the parallelism is one less than the number of available processors:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
On my machine the common pool is initialized with a parallelism of 3 by default. This value can be increased or decreased by setting the following JVM parameter:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
Collections support the method parallelStream()
to create a parallel stream of elements. Alternatively you can call the intermediate method parallel()
on a given stream to convert a sequential stream into its parallel counterpart.
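A small sketch converting a sequential stream into a parallel one via parallel(), reusing the people list from above:
long count = people
    .stream()
    .parallel()
    .filter(person -> person.getAge() > 23)
    .count();
System.out.println(count); // 3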
In order to understand the parallel execution behaviour of a parallel stream, the next example prints information about the current thread:
Arrays.asList("x1", "x2", "y1", "z1", "z2")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n", s,
Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n", s,
Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s ->
System.out.format("forEach: %s [%s]\n", s,
Thread.currentThread().getName()));
By investigating the debug output we should get a better understanding of which threads are actually used to execute the stream operations:
// Output
filter: y1 [main]
filter: z1 [ForkJoinPool.commonPool-worker-7]
map: z1 [ForkJoinPool.commonPool-worker-7]
filter: x2 [ForkJoinPool.commonPool-worker-5]
filter: z2 [ForkJoinPool.commonPool-worker-3]
map: z2 [ForkJoinPool.commonPool-worker-3]
map: x2 [ForkJoinPool.commonPool-worker-5]
forEach: Z1 [ForkJoinPool.commonPool-worker-7]
map: y1 [main]
filter: x1 [ForkJoinPool.commonPool-worker-7]
map: x1 [ForkJoinPool.commonPool-worker-7]
forEach: X1 [ForkJoinPool.commonPool-worker-7]
forEach: X2 [ForkJoinPool.commonPool-worker-5]
forEach: Z2 [ForkJoinPool.commonPool-worker-3]
forEach: Y1 [main]
As you can see, the parallel stream utilizes all available threads from the common ForkJoinPool for executing the stream operations. The output may differ between consecutive runs because which particular thread is actually used is non-deterministic.
Let's extend the example by an additional stream operation, sorted:
Arrays.asList("x1", "x2", "y1", "z1", "z2")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n", s,
Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map %s [%s]\n", s,
Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n", s1, s2,
Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n", s,
Thread.currentThread().getName()));
The result may look strange at first:
// Output
filter: y1 [main]
filter: x2 [ForkJoinPool.commonPool-worker-5]
map x2 [ForkJoinPool.commonPool-worker-5]
filter: z1 [ForkJoinPool.commonPool-worker-7]
map z1 [ForkJoinPool.commonPool-worker-7]
filter: z2 [ForkJoinPool.commonPool-worker-3]
map z2 [ForkJoinPool.commonPool-worker-3]
filter: x1 [ForkJoinPool.commonPool-worker-5]
map x1 [ForkJoinPool.commonPool-worker-5]
map y1 [main]
sort: X2 <> X1 [main]
sort: Y1 <> X2 [main]
sort: Z1 <> Y1 [main]
sort: Z2 <> Z1 [main]
forEach: Y1 [main]
forEach: Z2 [main]
forEach: Z1 [main]
forEach: X1 [ForkJoinPool.commonPool-worker-5]
forEach: X2 [ForkJoinPool.commonPool-worker-7]
It seems that sort is executed sequentially on the main thread only. Actually, sorted on a parallel stream uses the new Java 8 method Arrays.parallelSort() under the hood. Depending on the length of the array, this method decides whether sorting will be performed sequentially or in parallel.
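For completeness, a tiny sketch of Arrays.parallelSort itself; for small arrays like this one it will most likely fall back to a sequential sort:
int[] ages = {25, 22, 26, 26};
Arrays.parallelSort(ages);
System.out.println(Arrays.toString(ages)); // [22, 25, 26, 26]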
Thank you, cheers!