Convert Flux into List, Map – Reactor

In this tutorial, I will show you ways to convert a Flux to a List and a Flux to a Map, using the collectList(), collectSortedList(), collectMap() and collectMultimap() methods.

More Practice: Spring Boot WebFlux example: Building Rest API


Ways to convert Flux into Collection

We will use Flux methods such as:

  • collectList(): accumulates the sequence into a Mono<List>.
  • collectSortedList(): accumulates the sequence into a Mono<List> sorted in natural order.
  • collectMap(): converts the sequence into a Mono<Map>.
  • collectMultimap(): converts the sequence into a Mono<Map> in which each key maps to a Collection of values.

The resulting Mono is then turned into an actual List or Map with the block() method.

Declare & Initialize Flux

There are many ways to initialize a Flux. In this tutorial, we use a simple one: the Flux.just() method.

Flux<String> flux = Flux.just(
		"website_0:bezkoder.com",
		"meta_0:Java Tutorial",
		"meta_1:Project Reactor");

Flux to List Conversion

Flux collectList()

collectList() accumulates the sequence into a Mono<List>. We then call block(), which subscribes to the Mono and blocks indefinitely until the List is emitted, that is, until the Flux completes.

List<String> list1 = flux.collectList().block();
list1.forEach(System.out::println);

Result:

website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor

Flux collectSortedList()

collectSortedList() accumulates the sequence into a Mono<List> sorted in natural order. Again, block() subscribes to the Mono and waits for the result.

List<String> list2 = flux.collectSortedList().block();
list2.forEach(System.out::println);

Result:

meta_0:Java Tutorial
meta_1:Project Reactor
website_0:bezkoder.com
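collectSortedList() also has an overload that takes a Comparator, which helps when natural order is not what you need. The following is a minimal sketch, not part of the original source code; the class name ReverseSortedListExample and the method sortDescending are made up for illustration.

```java
import java.util.Comparator;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the tutorial's project.
public class ReverseSortedListExample {

	// Collects the Flux into a List sorted in reverse natural order.
	static List<String> sortDescending(Flux<String> flux) {
		return flux
				.collectSortedList(Comparator.reverseOrder())
				.block();
	}

	public static void main(String[] args) {
		Flux<String> flux = Flux.just(
				"website_0:bezkoder.com",
				"meta_0:Java Tutorial",
				"meta_1:Project Reactor");

		sortDescending(flux).forEach(System.out::println);
	}
}
```

With the sample Flux, the items come out as website_0, meta_1, meta_0, the reverse of the result above.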

Flux to List without block()

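If you don’t want to use block(), you can call subscribe() with a consumer instead. Note that subscribe() returns a Disposable, not the List itself, so the consumer has to store the items somewhere.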

List<String> list3 = new ArrayList<>();
flux.collectList().subscribe(list3::addAll);
list3.forEach(System.out::println);

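You should know that subscribe() returns immediately without waiting for the result. The snippet above prints the items only because Flux.just() emits synchronously on the calling thread; with an asynchronous source, list3 could still be empty when forEach() runs. More generally, pulling a Flux out into a List/Stream leaves the reactive world. You may or may not want this depending on the use case.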
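To see the timing issue, here is a small sketch that is not part of the original source code. It assumes a Flux made asynchronous with delayElements(); the class SubscribeTimingExample, the method observeSizes and the CountDownLatch used to wait for completion are illustrative choices only.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the tutorial's project.
public class SubscribeTimingExample {

	// Returns { size right after subscribe(), size after the Flux completed }.
	static int[] observeSizes() {
		// delayElements() moves emission to a timer thread, so nothing arrives synchronously.
		Flux<String> delayed = Flux.just("a", "b", "c")
				.delayElements(Duration.ofMillis(100));

		List<String> target = new CopyOnWriteArrayList<>();
		CountDownLatch done = new CountDownLatch(1);

		delayed.collectList().subscribe(items -> {
			target.addAll(items);
			done.countDown();
		});

		// subscribe() has already returned, but no item has been delivered yet.
		int sizeAfterSubscribe = target.size();

		try {
			done.await(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return new int[] { sizeAfterSubscribe, target.size() };
	}

	public static void main(String[] args) {
		int[] sizes = observeSizes();
		System.out.println("right after subscribe(): " + sizes[0]);
		System.out.println("after completion: " + sizes[1]);
	}
}
```

The first size is 0 and the second is 3: the List is only filled once the consumer has run on another thread.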

If the Flux is infinite, collectList() never emits a List: block() never returns, and subscribe() returns immediately but its consumer is never called. For example:

// block() never returns
Flux.interval(Duration.ofMillis(1000)).collectList().block();

// subscribe() returns at once, but no List is ever delivered
Flux.interval(Duration.ofMillis(1000)).collectList().subscribe();

To avoid blocking indefinitely or for too long, pass a Duration to block(), for example: block(Duration.ofMillis(1000)).

If the Mono does not emit within that time, block() throws an IllegalStateException.
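The sketch below, which is not part of the original source code, shows both situations side by side: a timeout on an infinite Flux, and an infinite Flux cut down with take() so that collectList() can complete. The class BoundedBlockExample and its method names are made up for illustration, and take() is only one possible way to bound the sequence.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the tutorial's project.
public class BoundedBlockExample {

	// take(5) completes the sequence after five items, so collectList() can emit.
	static List<Long> firstFive() {
		return Flux.interval(Duration.ofMillis(10))
				.take(5)
				.collectList()
				.block(Duration.ofSeconds(5));
	}

	// The Flux never completes, so block(Duration) gives up and throws.
	static boolean timesOut() {
		try {
			Flux.interval(Duration.ofMillis(10))
					.collectList()
					.block(Duration.ofMillis(200));
			return false;
		} catch (IllegalStateException e) {
			return true;
		}
	}

	public static void main(String[] args) {
		System.out.println(firstFive());
		System.out.println("timed out: " + timesOut());
	}
}
```

Flux.interval() counts up from 0, so firstFive() returns [0, 1, 2, 3, 4], while timesOut() returns true.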

Flux to Map Conversion

Flux collectMap()

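Function prototype:

<K, V> Mono<Map<K, V>> collectMap(keyExtractor, valueExtractor)

– First, collectMap() converts the sequence into a Mono<Map>, using keyExtractor and valueExtractor to derive each entry from an item. If several items produce the same key, the value from the most recent item wins.
– Finally, block() turns the Mono into an actual Map.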

Map<String, String> map1 = flux
		.collectMap(
				item -> item.split(":")[0], 
				item -> item.split(":")[1])
		.block();
map1.forEach((key, value) -> System.out.println(key + " -> " + value));

Result (the entries are stored in a HashMap, so their order is not guaranteed):

meta_1 -> Project Reactor
meta_0 -> Java Tutorial
website_0 -> bezkoder.com
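If you need a predictable iteration order, collectMap() has a three-argument overload that also takes a Map supplier. The following sketch is not part of the original source code; the class CollectMapVariantsExample and its method names are hypothetical. It also shows what happens when two items share a key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the tutorial's project.
public class CollectMapVariantsExample {

	// Supplies a LinkedHashMap so that entries keep the order of the Flux.
	static Map<String, String> inEmissionOrder(Flux<String> flux) {
		return flux
				.collectMap(
						item -> item.split(":")[0],
						item -> item.split(":")[1],
						() -> new LinkedHashMap<String, String>())
				.block();
	}

	// Drops the "_0"/"_1" suffix, so both meta items map to the key "meta".
	static Map<String, String> withCollidingKeys(Flux<String> flux) {
		return flux
				.collectMap(
						item -> item.split("_")[0],
						item -> item.split(":")[1])
				.block();
	}

	public static void main(String[] args) {
		Flux<String> flux = Flux.just(
				"website_0:bezkoder.com",
				"meta_0:Java Tutorial",
				"meta_1:Project Reactor");

		System.out.println(inEmissionOrder(flux));
		System.out.println(withCollidingKeys(flux).get("meta"));
	}
}
```

The first Map iterates as website_0, meta_0, meta_1. In the second Map, the key meta holds only Project Reactor because the later item replaces the earlier one; use collectMultimap() when all values must be kept.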

Flux collectMultimap()

Function prototype:

<K, V> Mono<Map<K, Collection<V>>> collectMultimap(keyExtractor, valueExtractor)

collectMultimap() converts the sequence into a Mono<Map> in which each key maps to a Collection of all the values that share it.

For example, we can build a Map with website and meta as keys:

Map<String, Collection<String>> map2 = flux
		.collectMultimap(
				item -> item.split("_[0-9]+:")[0], 
				item -> item.split(":")[1])
		.block();
map2.forEach((key, value) -> System.out.println(key + " -> " + value));

Check the result:

website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]
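Like collectMap(), collectMultimap() has an overload that accepts a Map supplier. Here is a minimal sketch, not part of the original source code, that supplies a TreeMap so the keys are sorted; the class SortedMultimapExample and the method groupSorted are made-up names.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

import reactor.core.publisher.Flux;

// Hypothetical example class, not part of the tutorial's project.
public class SortedMultimapExample {

	// Groups values by the key prefix and keeps the keys in sorted order.
	static Map<String, Collection<String>> groupSorted(Flux<String> flux) {
		return flux
				.collectMultimap(
						item -> item.split("_[0-9]+:")[0],
						item -> item.split(":")[1],
						() -> new TreeMap<String, Collection<String>>())
				.block();
	}

	public static void main(String[] args) {
		Flux<String> flux = Flux.just(
				"website_0:bezkoder.com",
				"meta_0:Java Tutorial",
				"meta_1:Project Reactor");

		groupSorted(flux).forEach((key, value) -> System.out.println(key + " -> " + value));
	}
}
```

With a TreeMap, meta is printed before website regardless of the order in which the items were emitted.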

Implementation

Technology

– Java 8
– Maven 3.6.1
– Reactor Core 3.4.0 with the 2020.0.1 release train.

Source Code

package com.bezkoder.reactor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

public class FluxCollection {
	public static void main(String[] args) {
		Flux<String> flux = Flux.just(
				"website_0:bezkoder.com",
				"meta_0:Java Tutorial",
				"meta_1:Project Reactor");

		System.out.println(">>> flux.collectList() >>>");
		List<String> list1 = flux.collectList().block();
		list1.forEach(System.out::println);

		System.out.println("\n>>> flux.collectSortedList() >>>");
		List<String> list2 = flux.collectSortedList().block();
		list2.forEach(System.out::println);

		System.out.println("\n>>> flux to list without block >>>");
		List<String> list3 = new ArrayList<>();
		flux.collectList().subscribe(list3::addAll);
		list3.forEach(System.out::println);

		System.out.println("\n>>> flux.collectMap() >>>");
		Map<String, String> map1 = flux
				.collectMap(
						item -> item.split(":")[0], 
						item -> item.split(":")[1])
				.block();
		map1.forEach((key, value) -> System.out.println(key + " -> " + value));

		System.out.println("\n>>> flux.collectMultimap() >>>");
		Map<String, Collection<String>> map2 = flux
				.collectMultimap(
						item -> item.split("_[0-9]+:")[0], 
						item -> item.split(":")[1])
				.block();
		map2.forEach((key, value) -> System.out.println(key + " -> " + value));

		System.out.println("\n>>> flux to map without block >>>");
		Map<String, Collection<String>> map3 = new HashMap<>();
		flux.collectMultimap(
				item -> item.split("_[0-9]+:")[0],
				item -> item.split(":")[1])
			.subscribe(map3::putAll);
		map3.forEach((key, value) -> System.out.println(key + " -> " + value));
	}
}

The Result

>>> flux.collectList() >>>
website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor

>>> flux.collectSortedList() >>>
meta_0:Java Tutorial
meta_1:Project Reactor
website_0:bezkoder.com

>>> flux to list without block >>>
website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor

>>> flux.collectMap() >>>
meta_1 -> Project Reactor
meta_0 -> Java Tutorial
website_0 -> bezkoder.com

>>> flux.collectMultimap() >>>
website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]

>>> flux to map without block >>>
website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]

Appendix: Getting Reactor

Reactor installation in Maven

– First, import the BOM by adding the following to pom.xml:

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

– Next, add dependency:

<dependencies>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
  </dependency>
</dependencies>

Reactor installation in Gradle

– First, apply the plugin from the Gradle Plugin Portal:

plugins {
  id "io.spring.dependency-management" version "1.0.7.RELEASE" 
}

– Next use dependency-management to import the BOM:

dependencyManagement {
  imports {
    mavenBom "io.projectreactor:reactor-bom:2020.0.1"
  }
}

– Finally, add dependency:

dependencies {
  implementation 'io.projectreactor:reactor-core' 
}

Further Reading

More Practice: Spring Boot WebFlux example: Building Rest API
