Understanding Java Streams - Part Two

Hi there, welcome to part two of Understanding Java 8 Streams. Before you move on, you should be familiar with lambda expressions, Optional and method references, and have gone through part one of Understanding Java 8 Streams.

In this article, we'll go through the usage of Java 8 Streams in depth: how to use the different kinds of stream operations available, the processing order, and how the ordering of stream operations affects runtime performance. Most stream operations accept some kind of lambda expression parameter, a functional interface specifying the exact behaviour of the operation. Most of those operations must be both non-interfering and stateless. What does that mean?

A function is non-interfering when it doesn't modify the underlying data source of the stream. A function is stateless when the execution of the operation is deterministic, that is, when it does not depend on any state that might change while the pipeline runs. Consider the example below:

List<String> languageList = Arrays.asList("Java", "Python", "Go", "C", "JavaScript");
languageList
	.stream()
	.filter(language -> language.startsWith("J"))
	.map(String::toUpperCase)
	.sorted()
	.forEach(System.out::println);

// JAVA
// JAVASCRIPT

In the above example, no lambda expression modifies languageList by adding or removing elements, so the pipeline is non-interfering. It is also stateless, because no lambda expression depends on any mutable variables or state from the outer scope that might change during execution.
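
As a small sketch of these two rules, the helper below leaves its source list untouched and uses lambdas that depend only on their own arguments. The class and method names are made up for illustration; the result is collected into a new list instead of changing the source:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class NonInterferingExample {

    // Returns a new, sorted list of upper-cased names that start with the prefix.
    // The lambdas only read their own arguments and the unchanging prefix,
    // and the source list is never modified.
    static List<String> upperCaseStartingWith(List<String> source, String prefix) {
        return source.stream()
                .filter(name -> name.startsWith(prefix))
                .map(String::toUpperCase)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> languageList = Arrays.asList("Java", "Python", "Go", "C", "JavaScript");
        System.out.println(upperCaseStartingWith(languageList, "J")); // [JAVA, JAVASCRIPT]
        System.out.println(languageList.size());                      // 5, the source is unchanged
    }
}
```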

Stream Creation and Different Kinds of Streams:

We can create a Stream instance in many ways from various data sources. A stream does not modify its source, which allows multiple stream instances to be created from a single source. Lists and Sets support the methods stream() and parallelStream() to create either a sequential or a parallel stream. Parallel streams operate on multiple threads. Let's start by creating an empty stream.

Empty Stream:

If you want to create an empty stream, you can use the Stream.empty() method.

Stream<String> emptyStream = Stream.empty();

A null source, or a stream that contains a null element, can result in a NullPointerException at some point. To avoid that, we can check for null and return an empty stream instead.

Stream<String> languageStream = languageList == null || languageList.isEmpty() ? Stream.empty() : languageList.stream();
Stream<Integer> numberStream = someNumber != null ? Stream.of(someNumber) : Stream.empty();
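
The null check can also be wrapped in a small helper so that callers never see a null stream. This is only a sketch; the class and method names are made up for illustration:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Stream;

class NullSafeStreams {

    // Returns the collection's stream, or an empty stream when the collection is null.
    static <T> Stream<T> streamOrEmpty(Collection<T> source) {
        return source == null ? Stream.empty() : source.stream();
    }

    public static void main(String[] args) {
        Collection<String> missing = null;
        System.out.println(streamOrEmpty(missing).count());                     // 0
        System.out.println(streamOrEmpty(Arrays.asList("Java", "Go")).count()); // 2
    }
}
```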

Stream of Collection:

We can create a stream from any type of Collection (Collection, List, Set), as shown in the example below:

Collection<String> fruitCollection = Arrays.asList("Guava", "Apple", "Strawberry", "Mango");
Stream<String> fruitStream = fruitCollection.stream();

Arrays.asList("Guava", "Apple", "Strawberry", "Mango")
	.stream()
	.findFirst()
	.ifPresent(System.out::println); // Guava

Calling the method stream() on a list of objects returns a regular object stream. But we don't have to create collections in order to work with streams, as the following example shows:

Stream.of("Guava", "Apple", "Mango")
	.findFirst()
	.ifPresent(System.out::println); // Guava

Stream of Array:

We can also use an array as the source of a stream.

Stream<String> arrayStream = Stream.of("Azure", "AWS", "Google Cloud");

We use Stream.of() to create a stream from a handful of object references. A stream can also be created from an existing array or from part of an array.

String[] cloudServices = new String[]{"Azure", "AWS", "Google Cloud"};
Stream<String> fullArrayStream = Arrays.stream(cloudServices);
Stream<String> partArrayStream = Arrays.stream(cloudServices, 1, 3);
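
To make the range version concrete, here is a minimal sketch (the class and method names are illustrative) that collects the selected part of the array. The start index is inclusive and the end index is exclusive:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ArrayRangeExample {

    // Collects the elements from startInclusive up to, but not including, endExclusive.
    static List<String> slice(String[] source, int startInclusive, int endExclusive) {
        return Arrays.stream(source, startInclusive, endExclusive)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String[] cloudServices = {"Azure", "AWS", "Google Cloud"};
        System.out.println(slice(cloudServices, 1, 3)); // [AWS, Google Cloud]
    }
}
```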

Stream.builder():

To create a stream, we can also use a builder. The desired type must be specified explicitly on the right-hand side of the statement; otherwise the build() method creates a Stream<Object>.

Stream<String> streamBuilder = Stream.<String>builder().add("x").add("y").add("z").build();
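
A builder is most useful when elements are added one by one, for example inside a loop with a condition. The sketch below assumes a made-up helper, nonEmpty, that skips empty strings. Because the builder is declared as Stream.Builder<String>, the element type is inferred from the declaration:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BuilderExample {

    // Adds only the non-empty arguments to the builder, then builds the stream.
    static Stream<String> nonEmpty(String... values) {
        Stream.Builder<String> builder = Stream.builder();
        for (String value : values) {
            if (!value.isEmpty()) {
                builder.add(value);
            }
        }
        return builder.build();
    }

    public static void main(String[] args) {
        List<String> result = nonEmpty("x", "", "y", "z").collect(Collectors.toList());
        System.out.println(result); // [x, y, z]
    }
}
```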

Stream.generate():

We can use the generate() method to create a stream. It takes a Supplier<T> that generates the elements. Because the resulting stream is infinite, it is the developer's responsibility to limit its size; otherwise the pipeline keeps producing elements and can eventually run into the memory limit.

Stream<String> generatedStream = Stream.generate(() -> "item").limit(5);

The above line of code creates a sequence of five strings with the value "item".

Stream.iterate():

Another way to create an infinite stream is the iterate() method.

Stream<Integer> iteratedStream = Stream.iterate(29, n -> n + 1).limit(5);

The first element of the resulting stream is the first parameter of the iterate() method. Every following element is created by applying the specified function to the previous element. In the above code, the second element would be 30.
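
The following sketch (with illustrative class and method names) collects the first five elements of an iterated stream so that the sequence becomes visible. Without limit(), neither stream would ever finish:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InfiniteStreamExample {

    // Starts at the seed and adds 1 for every following element; limit(5) makes the stream finite.
    static List<Integer> firstFive(int seed) {
        return Stream.iterate(seed, n -> n + 1)
                .limit(5)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFive(29)); // [29, 30, 31, 32, 33]

        List<String> items = Stream.generate(() -> "item")
                .limit(3)
                .collect(Collectors.toList());
        System.out.println(items); // [item, item, item]
    }
}
```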

Stream of Primitives:

Apart from regular object streams, there are special kinds of streams for working with the primitive data types int, long and double: IntStream, LongStream and DoubleStream. These special interfaces were created because Stream<T> is a generic interface and there is no way to use primitives as a type parameter with generics.

We can use IntStream.range() to replace a regular for loop, as the example below shows.

IntStream.range(1, 5)
	.forEach(System.out::println);
// 1
// 2
// 3
// 4
LongStream longStream = LongStream.rangeClosed(1, 3);

  • The range(int startInclusive, int endExclusive) method creates an ordered stream from the first parameter to the second parameter, incrementing each subsequent element by 1. The result doesn't include the second parameter; it is just the upper bound of the sequence.
  • The rangeClosed(int startInclusive, int endInclusive) method does the same with one difference: the second parameter is included. Both methods are available on IntStream and LongStream (with long parameters on the latter), but not on DoubleStream.
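
The difference between the two bounds is easy to see by summing the same range both ways. This is a minimal sketch; the class and method names are only for illustration:

```java
import java.util.stream.IntStream;
import java.util.stream.LongStream;

class RangeExample {

    // The end value is excluded.
    static int sumExclusive(int start, int end) {
        return IntStream.range(start, end).sum();
    }

    // The end value is included.
    static int sumInclusive(int start, int end) {
        return IntStream.rangeClosed(start, end).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumExclusive(1, 5));                  // 10 (1 + 2 + 3 + 4)
        System.out.println(sumInclusive(1, 5));                  // 15 (1 + 2 + 3 + 4 + 5)
        System.out.println(LongStream.rangeClosed(1, 3).count()); // 3
    }
}
```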

We can also use the Random class to generate streams of primitives, as it provides a wide range of methods for this. For example, the code below creates a DoubleStream with three elements.

Random random = new Random();
DoubleStream doubleStream = random.doubles(3);

All primitive streams work like regular object streams, with a few differences: primitive streams use specialized functional interfaces, e.g. IntFunction instead of Function or IntPredicate instead of Predicate. They also support additional terminal operations such as sum(), max() and average():

Arrays.stream(new int[]{1, 2, 3})
	.map(n -> n + 1)
	.average()
	.ifPresent(System.out::println); // 3.0
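
Here is a short sketch of these terminal operations side by side. The helper names and the fallback values for an empty stream are assumptions made for this example. Note that max() and average() return an OptionalInt and an OptionalDouble, because the stream may be empty:

```java
import java.util.stream.IntStream;

class PrimitiveTerminalOps {

    static int total(int... values) {
        return IntStream.of(values).sum();
    }

    // Falls back to Integer.MIN_VALUE for an empty stream (a choice made for this sketch).
    static int largest(int... values) {
        return IntStream.of(values).max().orElse(Integer.MIN_VALUE);
    }

    // Falls back to NaN for an empty stream (a choice made for this sketch).
    static double mean(int... values) {
        return IntStream.of(values).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        System.out.println(total(1, 2, 3));   // 6
        System.out.println(largest(1, 2, 3)); // 3
        System.out.println(mean(1, 2, 3));    // 2.0
    }
}
```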

Sometimes we may want to transform a regular object stream into a primitive stream or vice versa. For these scenarios, object streams have the special mapping operations mapToInt(), mapToLong() and mapToDouble():

Stream.of("x1", "y2", "z3")
	.map(item -> item.substring(1))
	.mapToInt(Integer::parseInt)
	.max()
	.ifPresent(System.out::println); // 3

We can transform a primitive stream into an object stream via mapToObj():

IntStream.range(1, 3)
	.mapToObj(n -> "x" + n)
	.forEach(System.out::println);
// x1
// x2
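
Here is a combined example: a stream of doubles is first mapped to an int stream and then to an object stream of strings:
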
Stream.of(1.0, 2.0, 3.0)
	.mapToInt(Double::intValue)
	.mapToObj(n -> "x" + n)
	.forEach(System.out::println);
// x1
// x2
// x3

Processing Order:

Now that we have seen how to work with different kinds of streams, let's look at how stream operations are processed under the hood. An important characteristic of intermediate operations is laziness. Look at the example below, where no terminal operation is specified.

Stream.of("x1", "y2", "z3")
	.filter(s -> {
		System.out.println("filter: " + s);
		return true;
	});

When you execute the above code, nothing is printed to the console. The reason is that intermediate operations are only executed when a terminal operation is present. Let's add the terminal operation forEach to it:

Stream.of("x1", "y2", "z3")
	.filter(s -> {
		System.out.println("filter: " + s);
		return true;
	})
	.forEach(s -> System.out.println("forEach: " + s));

Executing the above code snippet gives the following output on the console:

// filter: x1
// forEach: x1
// filter: y2
// forEach: y2
// filter: z3
// forEach: z3
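
The laziness described above can also be checked without reading console output. The sketch below uses a counter (an AtomicInteger, chosen here only because a lambda can update it) to record how often the filter lambda runs before and after the terminal operation. The class and method names are illustrative:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazinessExample {

    // Returns the number of filter calls before and after the terminal operation runs.
    static int[] filterCallsBeforeAndAfter() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = Stream.of("x1", "y2", "z3")
                .filter(s -> {
                    calls.incrementAndGet();
                    return true;
                });
        int before = calls.get();   // no terminal operation yet, so nothing has run
        pipeline.forEach(s -> { }); // the terminal operation triggers the pipeline
        int after = calls.get();    // one filter call per element
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = filterCallsBeforeAndAfter();
        System.out.println(calls[0]); // 0
        System.out.println(calls[1]); // 3
    }
}
```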

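Here, you can see that the first string "x1" passes filter and then forEach; only then is the second string "y2" processed. In other words, each element moves through the whole chain before the next one starts, which we can think of as vertical execution. This behaviour can reduce the actual number of operations performed on each element, as shown in the example below.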

Stream.of("x1", "y1", "y2", "z3")
	.map(s -> {
		System.out.println("map: " + s);
		return s.toUpperCase();
	})
	.anyMatch(s -> {
		System.out.println("anyMatch: " + s);
		return s.startsWith("Y");
	});
// map: x1
// anyMatch: X1
// map: y1
// anyMatch: Y1

The terminal operation anyMatch returns true as soon as the predicate applies to the given input element. This is the case for the second element passed, "Y1". Due to the vertical execution of the stream chain, map only has to be executed twice in this case. So instead of mapping all elements of the stream, map is called as few times as possible.
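
The same short-circuiting can be verified by counting the map calls instead of printing them. This is a minimal sketch with illustrative names, run on a sequential stream:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ShortCircuitExample {

    // Counts how often map runs before anyMatch finds its first match.
    static int mapCallsUntilMatch() {
        AtomicInteger mapCalls = new AtomicInteger();
        Stream.of("x1", "y1", "y2", "z3")
                .map(s -> {
                    mapCalls.incrementAndGet();
                    return s.toUpperCase();
                })
                .anyMatch(s -> s.startsWith("Y"));
        return mapCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapCallsUntilMatch()); // 2, "y2" and "z3" are never mapped
    }
}
```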

Why is order important?

The following example contains the two intermediate operations map and filter and the terminal operation forEach. Let's look at how these operations are executed.

Stream.of("x1", "y1", "z1", "z2", "z3")
			.map(s -> {
				System.out.println("map: " + s);
				return s.toUpperCase();
			})
			.filter(s -> {
				System.out.println("filter: " + s);
				return s.startsWith("Y");
			})
			.forEach(s -> System.out.println("forEach: " + s));

// map: x1
// filter: X1
// map: y1
// filter: Y1
// forEach: Y1
// map: z1
// filter: Z1
// map: z2
// filter: Z2
// map: z3
// filter: Z3

As you might have guessed, both map and filter are called five times, once for every string in the underlying collection, whereas forEach is called only once. If we change the order of the operations and move filter to the beginning of the chain, the actual number of executions drops considerably, as shown in the following example:

Stream.of("x1", "y1", "z1", "z2", "z3")
			.filter(s -> {
				System.out.println("filter: " + s);
				return s.startsWith("y");
			})
			.map(s -> {
				System.out.println("map: " + s);
				return s.toUpperCase();
			})
			.forEach(s -> System.out.println("forEach: " + s));

// filter: x1
// filter: y1
// map: y1
// forEach: Y1
// filter: z1
// filter: z2
// filter: z3

Now, map is only called once. So the operation pipeline performs much faster for larger numbers of input elements. Keep that in mind when composing complex method chains.
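
To put numbers on the two orderings, the sketch below counts the map calls for each variant on the same five input strings. The class, constant and method names are made up for this example:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class OperationOrderExample {

    static final List<String> INPUT = Arrays.asList("x1", "y1", "z1", "z2", "z3");

    // map runs first, so every element is mapped before it is filtered.
    static int mapCallsWhenMappingFirst() {
        AtomicInteger mapCalls = new AtomicInteger();
        INPUT.stream()
                .map(s -> {
                    mapCalls.incrementAndGet();
                    return s.toUpperCase();
                })
                .filter(s -> s.startsWith("Y"))
                .forEach(s -> { });
        return mapCalls.get();
    }

    // filter runs first, so only the elements that pass it reach map.
    static int mapCallsWhenFilteringFirst() {
        AtomicInteger mapCalls = new AtomicInteger();
        INPUT.stream()
                .filter(s -> s.startsWith("y"))
                .map(s -> {
                    mapCalls.incrementAndGet();
                    return s.toUpperCase();
                })
                .forEach(s -> { });
        return mapCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapCallsWhenMappingFirst());   // 5
        System.out.println(mapCallsWhenFilteringFirst()); // 1
    }
}
```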

Let's extend the above example with an additional operation, sorted:

Stream.of("x1", "y1", "z1", "z2", "z3")
			.sorted((s1, s2) -> {
				System.out.printf("sort: %s=%s\n", s1, s2);
				return s1.compareTo(s2);
			})
			.filter(s -> {
				System.out.println("filter: " + s);
				return s.startsWith("y");
			}).map(s -> {
				System.out.println("map: " + s);
				return s.toUpperCase();
			}).forEach(s -> System.out.println("forEach: " + s));

// sort: y1=x1
// sort: z1=y1
// sort: z2=z1
// sort: z3=z2
// filter: x1
// filter: y1
// map: y1
// forEach: Y1
// filter: z1
// filter: z2
// filter: z3

Sorting is a special kind of intermediate operation. It is a so-called stateful operation, since sorting a collection of elements requires maintaining state while ordering them. First, the sort operation is performed on the entire input collection; in other words, sorted is executed horizontally. So in this case, the comparator passed to sorted is called four times, each time on a pair of elements from the input collection. Once again, we can optimize runtime performance by reordering the chain as shown below:

Stream.of("x1", "y1", "z1", "z2", "z3")
			.filter(s -> {
				System.out.println("filter: " + s);
				return s.startsWith("y");
			})
			.sorted((s1, s2) -> {
				System.out.printf("sort: %s=%s\n", s1, s2);
				return s1.compareTo(s2);
			})
			.map(s -> {
				System.out.println("map: " + s);
				return s.toUpperCase();
			}).forEach(s -> System.out.println("forEach: " + s));

In the above example, the comparator passed to sorted is never called, because filter reduces the input collection to just one element. So performance improves greatly for larger input collections.
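
The effect on sorted can be measured the same way, by counting comparator calls for both orderings. This is a sketch with illustrative names. The exact number of comparisons when sorting first depends on the sort implementation, so the comments only state what can be relied on:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class SortOrderExample {

    // sorted sees all five elements, so the comparator has to run.
    static int comparisonsWhenSortingFirst() {
        AtomicInteger comparisons = new AtomicInteger();
        Stream.of("x1", "y1", "z1", "z2", "z3")
                .sorted((s1, s2) -> {
                    comparisons.incrementAndGet();
                    return s1.compareTo(s2);
                })
                .filter(s -> s.startsWith("y"))
                .forEach(s -> { });
        return comparisons.get();
    }

    // sorted sees only the single element that passed filter, so there is nothing to compare.
    static int comparisonsWhenFilteringFirst() {
        AtomicInteger comparisons = new AtomicInteger();
        Stream.of("x1", "y1", "z1", "z2", "z3")
                .filter(s -> s.startsWith("y"))
                .sorted((s1, s2) -> {
                    comparisons.incrementAndGet();
                    return s1.compareTo(s2);
                })
                .forEach(s -> { });
        return comparisons.get();
    }

    public static void main(String[] args) {
        System.out.println(comparisonsWhenSortingFirst());   // greater than 0
        System.out.println(comparisonsWhenFilteringFirst()); // 0
    }
}
```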

Reusing streams:

Java 8 streams cannot be reused. The stream is closed as soon as you call any terminal operation.

Stream<String> stream = Stream.of("x1", "y1", "z1", "z2", "z3")
                .filter(s -> s.startsWith("y"));
stream.anyMatch(s -> true);
stream.noneMatch(s -> true);   // exception

Trying to call noneMatch after anyMatch on the same stream results in the following exception.

java.lang.IllegalStateException: stream has already been operated upon or closed
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
        at java.base/java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:538)
        at Main.main(Main.java:19)

To overcome this limitation, we have to create a new stream chain for every terminal operation we want to execute. For example, we could create a stream supplier to construct a new stream with all intermediate operations already set up.

Supplier<Stream<String>> streamSupplier = () -> Stream.of("x1", "y1", "z1", "z2", "z3")
			.filter(s -> s.startsWith("y"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok

Each call to get() constructs a new stream on which we are safe to call the desired terminal operation.
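
Both behaviours can be put side by side in one small sketch. The class and method names are illustrative. The first method catches the IllegalStateException shown earlier, and the second uses a supplier to get a fresh stream for each terminal operation:

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamReuseExample {

    // Returns true when a second terminal operation on the same stream is rejected.
    static boolean secondTerminalOperationFails() {
        Stream<String> stream = Stream.of("x1", "y1", "z1", "z2", "z3")
                .filter(s -> s.startsWith("y"));
        stream.anyMatch(s -> true);
        try {
            stream.noneMatch(s -> true);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Each get() builds a new pipeline, so both terminal operations succeed.
    static boolean supplierAllowsRepeatedUse() {
        Supplier<Stream<String>> streamSupplier = () -> Stream.of("x1", "y1", "z1", "z2", "z3")
                .filter(s -> s.startsWith("y"));
        boolean any = streamSupplier.get().anyMatch(s -> true);   // true, "y1" passes the filter
        boolean none = streamSupplier.get().noneMatch(s -> true); // false, for the same reason
        return any && !none;
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOperationFails()); // true
        System.out.println(supplierAllowsRepeatedUse());    // true
    }
}
```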

Let's stop it here. In part three we will see some advanced operations. Cheers!