In this tutorial, I will show you ways to convert a Flux to a List and a Flux to a Map, with examples that use the collectList(), collectSortedList(), collectMap() and collectMultimap() methods.
Ways to convert Flux into Collection
We will use Flux methods such as:
– collectList(): accumulates the sequence into a Mono<List>.
– collectSortedList(): accumulates the sequence, sorts it, and emits it as a Mono<List>.
– collectMap(): converts the sequence into a Mono<Map>.
– collectMultimap(): converts the sequence into a Mono<Map> in which each key can be paired with multiple values (held in a Collection).
Then the Mono result above will be converted into a real List/Map using the block() method.
Declare & Initialize Flux
There are many ways to initialize a Flux. In this tutorial, we're going to use a simple one: the Flux.just() method.
Flux<String> flux = Flux.just(
"website_0:bezkoder.com",
"meta_0:Java Tutorial",
"meta_1:Project Reactor");
Flux to List Conversion
Flux collectList()
collectList() accumulates the sequence into a Mono<List>. We then call the block() method to subscribe to the Mono and block indefinitely until a next signal is received, which for collectList() happens once the Flux completes.
List<String> list1 = flux.collectList().block();
list1.forEach(System.out::println);
Result:
website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor
Flux collectSortedList()
collectSortedList() accumulates the sequence, sorts it in natural order, and emits it as a Mono<List>. Again, we call the block() method to subscribe to the Mono and wait for the result.
List<String> list2 = flux.collectSortedList().block();
list2.forEach(System.out::println);
Result:
meta_0:Java Tutorial
meta_1:Project Reactor
website_0:bezkoder.com
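collectSortedList() also has an overload that accepts a Comparator, which helps when natural order is not what you want. The following is a minimal sketch; the class name SortedListExample and the helper sortedDescending() are made up for illustration, and reverse order is just one possible choice.

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

public class SortedListExample {

    // Hypothetical helper: collects the items in reverse natural order
    // instead of the default ascending order.
    static List<String> sortedDescending(Flux<String> source) {
        return source
                .collectSortedList(Comparator.<String>reverseOrder())
                .block();
    }

    public static void main(String[] args) {
        Flux<String> flux = Flux.just(
                "website_0:bezkoder.com",
                "meta_0:Java Tutorial",
                "meta_1:Project Reactor");

        sortedDescending(flux).forEach(System.out::println);
    }
}
```

With the sample Flux, this prints the website_0 item first, then meta_1, then meta_0.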
Flux to List without block()
If you don't want to use block(), you can pass a consumer to the Mono's subscribe() method (which returns a Disposable).
List<String> list3 = new ArrayList<>();
flux.collectList().subscribe(list3::addAll);
list3.forEach(System.out::println);
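You should know that converting a Flux to a List/Stream makes the whole thing NOT reactive. The subscribe() call itself returns right away, but the list is only filled once the Flux completes; the code above prints the items only because Flux.just() emits synchronously on the calling thread. With an asynchronous source you would still have to wait for completion before reading the list, so subscribe() doesn't save you from waiting. You may or may not want this depending on the use case.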
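To see how the subscribe() approach behaves with an asynchronous source, here is a small sketch. It assumes a Flux whose elements are delayed with delayElements(), so they arrive on another thread; the class name, the helper sizesBeforeAndAfter() and the CountDownLatch are illustration only, not part of the tutorial's code.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;

public class SubscribeTimingExample {

    // Hypothetical helper: returns the list size seen right after subscribe()
    // and the size seen after the Mono has delivered its list.
    static int[] sizesBeforeAndAfter() {
        List<String> target = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flux.just("a", "b", "c")
                .delayElements(Duration.ofMillis(50)) // elements are emitted later, on another thread
                .collectList()
                .subscribe(items -> {
                    target.addAll(items);
                    done.countDown();
                });

        int before = target.size(); // subscribe() has returned, nothing collected yet
        try {
            done.await(5, TimeUnit.SECONDS); // wait for the callback (demonstration only)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int after = target.size();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] sizes = sizesBeforeAndAfter();
        System.out.println("right after subscribe(): " + sizes[0]);
        System.out.println("after completion: " + sizes[1]);
    }
}
```

The list is still empty immediately after subscribe() and only holds the three items once the delayed Flux has completed, so code that needs the List has to wait one way or another.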
If the Flux is infinite, collectList() never emits a list: block() never returns, and subscribe() returns immediately but its consumer is never called. For example:
// never returns
Flux.interval(Duration.ofMillis(1000)).collectList().block();
// returns a Disposable immediately, but no list is ever emitted
Flux.interval(Duration.ofMillis(1000)).collectList().subscribe();
To prevent blocking indefinitely or for too long, pass a Duration to block(), like this: block(Duration.ofMillis(1000)).
If the Mono does not emit within that time, block() throws a RuntimeException (in Reactor Core 3.4, an IllegalStateException reporting a timeout on the blocking read).
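The timeout behaviour can be tried out with a short sketch. It assumes Flux.interval() as the endless source; the class and method names are hypothetical, and bounding the sequence with take(3) is one option I picked for illustration, not something the tutorial prescribes.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class BlockTimeoutExample {

    // Bounded: take(3) completes the sequence, so collectList() can emit its list.
    static List<Long> firstThree() {
        return Flux.interval(Duration.ofMillis(100))
                .take(3)
                .collectList()
                .block(Duration.ofSeconds(2));
    }

    // Unbounded: collectList() never emits, so block(Duration) gives up with an exception.
    static boolean timesOut() {
        try {
            Flux.interval(Duration.ofMillis(100))
                    .collectList()
                    .block(Duration.ofMillis(300));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("first three: " + firstThree());
        System.out.println("unbounded collect timed out: " + timesOut());
    }
}
```

Catching RuntimeException keeps the sketch independent of the exact exception type that a given Reactor version throws.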
Flux to Map Conversion
Flux collectMap()
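Function prototype:
Mono<Map<K, V>> collectMap(keyExtractor, valueExtractor)
– First, the function converts the sequence into a Mono<Map>, using keyExtractor and valueExtractor to derive the key and value of each entry from an item.
– Finally, the Mono becomes a real Map via the block() method.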
Map<String, String> map1 = flux
    .collectMap(
        item -> item.split(":")[0],
        item -> item.split(":")[1])
    .block();
map1.forEach((key, value) -> System.out.println(key + " -> " + value));
Result:
meta_1 -> Project Reactor
meta_0 -> Java Tutorial
website_0 -> bezkoder.com
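Two details of collectMap() are worth seeing in code: when several items produce the same key, the value of the most recently emitted item wins, and the order of the printed entries depends on the Map implementation. The sketch below assumes the three-argument overload that takes a Map supplier and uses a TreeMap to get sorted keys; the class name, the helper toSortedMap() and the sample data with a repeated meta key are invented for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import reactor.core.publisher.Flux;

public class CollectMapExample {

    // Hypothetical helper: collects "key:value" strings into a TreeMap so keys come out sorted.
    static Map<String, String> toSortedMap(Flux<String> source) {
        return source
                .collectMap(
                        item -> item.split(":")[0],
                        item -> item.split(":")[1],
                        () -> new TreeMap<String, String>())
                .block();
    }

    public static void main(String[] args) {
        // "meta" appears twice: the later value replaces the earlier one.
        Flux<String> flux = Flux.just(
                "website:bezkoder.com",
                "meta:Java Tutorial",
                "meta:Project Reactor");

        toSortedMap(flux).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Here the map ends up with two entries, meta -> Project Reactor followed by website -> bezkoder.com. If every value for a key has to be kept, collectMultimap() is the better fit.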
Flux collectMultimap()
Function prototype:
Mono<Map<K, Collection<V>>> collectMultimap(keyExtractor, valueExtractor)
collectMultimap() converts the sequence into a Mono<Map> in which each key can be paired with multiple values (held in a Collection).
For example, we're going to get a Map with website and meta as keys:
Map<String, Collection<String>> map2 = flux
    .collectMultimap(
        item -> item.split("_[0-9]+:")[0],
        item -> item.split(":")[1])
    .block();
map2.forEach((key, value) -> System.out.println(key + " -> " + value));
Check the result:
website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]
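collectMultimap() also comes in a one-argument form that takes only a key extractor and groups the original items themselves. A minimal sketch, with a hypothetical class name and helper groupByPrefix(), reusing the same key pattern as above:

```java
import java.util.Collection;
import java.util.Map;
import reactor.core.publisher.Flux;

public class GroupWholeItemsExample {

    // Hypothetical helper: groups the unmodified items by the part before "_<number>:".
    static Map<String, Collection<String>> groupByPrefix(Flux<String> source) {
        return source
                .collectMultimap(item -> item.split("_[0-9]+:")[0])
                .block();
    }

    public static void main(String[] args) {
        Flux<String> flux = Flux.just(
                "website_0:bezkoder.com",
                "meta_0:Java Tutorial",
                "meta_1:Project Reactor");

        groupByPrefix(flux).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Each key now maps to the full strings, for example meta is paired with both meta_0:Java Tutorial and meta_1:Project Reactor.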
Implementation
Technology
– Java 8
– Maven 3.6.1
– Reactor Core 3.4.0 with the 2020.0.1 release train.
Source Code
package com.bezkoder.reactor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

public class FluxCollection {

  public static void main(String[] args) {
    Flux<String> flux = Flux.just(
        "website_0:bezkoder.com",
        "meta_0:Java Tutorial",
        "meta_1:Project Reactor");

    System.out.println(">>> flux.collectList() >>>");
    List<String> list1 = flux.collectList().block();
    list1.forEach(System.out::println);

    System.out.println("\n>>> flux.collectSortedList() >>>");
    List<String> list2 = flux.collectSortedList().block();
    list2.forEach(System.out::println);

    // Works here because Flux.just() emits synchronously on the calling thread.
    System.out.println("\n>>> flux to list without block() >>>");
    List<String> list3 = new ArrayList<>();
    flux.collectList().subscribe(list3::addAll);
    list3.forEach(System.out::println);

    // Key: text before ':', value: text after ':'.
    System.out.println("\n>>> flux.collectMap() >>>");
    Map<String, String> map1 = flux
        .collectMap(
            item -> item.split(":")[0],
            item -> item.split(":")[1])
        .block();
    map1.forEach((key, value) -> System.out.println(key + " -> " + value));

    // Key: prefix without the "_<number>" suffix, so several items share a key.
    System.out.println("\n>>> flux.collectMultimap() >>>");
    Map<String, Collection<String>> map2 = flux
        .collectMultimap(
            item -> item.split("_[0-9]+:")[0],
            item -> item.split(":")[1])
        .block();
    map2.forEach((key, value) -> System.out.println(key + " -> " + value));

    System.out.println("\n>>> flux to map without block() >>>");
    Map<String, Collection<String>> map3 = new HashMap<>();
    flux.collectMultimap(
            item -> item.split("_[0-9]+:")[0],
            item -> item.split(":")[1])
        .subscribe(map3::putAll);
    map3.forEach((key, value) -> System.out.println(key + " -> " + value));
  }
}
The Result
>>> flux.collectList() >>>
website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor
>>> flux.collectSortedList() >>>
meta_0:Java Tutorial
meta_1:Project Reactor
website_0:bezkoder.com
>>> flux to list without block() >>>
website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor
>>> flux.collectMap() >>>
meta_1 -> Project Reactor
meta_0 -> Java Tutorial
website_0 -> bezkoder.com
>>> flux.collectMultimap() >>>
website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]
>>> flux to map without block() >>>
website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]
Appendix: Getting Reactor
Reactor installation in Maven
– First, import the BOM by adding the following to pom.xml:
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-bom</artifactId>
      <version>2020.0.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
– Next, add the dependency:
<dependencies>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
  </dependency>
</dependencies>
Reactor installation in Gradle
– First, apply the plugin from the Gradle Plugin Portal:
plugins {
  id "io.spring.dependency-management" version "1.0.7.RELEASE"
}
– Next, use dependency-management to import the BOM:
dependencyManagement {
  imports {
    mavenBom "io.projectreactor:reactor-bom:2020.0.1"
  }
}
– Finally, add the dependency:
dependencies {
  implementation 'io.projectreactor:reactor-core'
}
Further Reading
More Practice: Spring Boot WebFlux example: Building Rest API