Fork me on GitHub
余鸢

Going reactive Spring Data

Reactive Repositories

存储库编程模型是Spring Data用户通常处理的最高级别的抽象。 它们通常由Spring Data提供的接口中定义的一组CRUD方法和特定于域的查询方法组成。 这里是一个reactive Spring Data存储库的定义:

1
2
3
4
5
6
7
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {
Flux<Person> findByLastname(Mono<String> lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}

正如你可以看到,与你习惯的差别不大。 然而,与传统的存储库接口相反,反应式存储库使用 reactive 类型作为返回类型,也可以对参数类型使用。 在新引入的ReactiveCrudRepository中的CRUD方法,当然也使用这些类型。

默认情况下,reactive repositories使用 Project Reactor 类型,但也可以使用其他reactive repositories。 我们为这些提供自定义的库基础接口(例如RxJava1CrudRepository),并且还根据查询方法的需要自动调整类型,例如RxJava的ObservableSingle。 其余基本保持不变。 但是,请注意,当前的里程碑不支持分页,当然你必须在类路径上有必要的reactive存储库来激活对特定库的支持。

激活reactive Spring Data

类似于我们在blocking world中,通过@ Enable ...注解和一些基础结构设置激活对reactive Spring Data的支持:

1
2
3
4
5
6
7
8
9
10
11
12
13
@EnableReactiveMongoRepositories
public class AppConfig extends AbstractReactiveMongoConfiguration {
@Bean
public MongoClient mongoClient() {
return MongoClients.create();
}
@Override
protected String getDatabaseName() {
return "reactive";
}
}

看看我们如何使用不同的基类用于基础架构配置,因为我们需要使用MongoDB异步驱动程序。

使用reactive repositories

存储库现在可以像blocking repository一样使用,只不过现在可以以reactive方式处理结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
class PersonController {
private final PersonRepository people;
public PersonController(PersonRepository people) {
this.people = people;
}
@GetMapping("/people")
Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {
Flux<Person> result = repository.findByLastname(lastname);
return result.map(it -> it.getFullName());
}
}

看看我们如何转发由Spring Web Reactive提供的reactive参数,将它们管理到库中,依次获取Flux,然后以reactive方式处理执行结果。 一般来说,reactive查询方法遵循与已知存储库相同的查询创建思想。 传递给查询方法的参数可以是plain(例如String)wrapped(Optional<String>, Stream<String>)或reactive包装参数(Mono<String>, Flux<String>)。 如果你使用reactive包装器一个参数类型,实现将推迟实际的查询创建和执行,直到实际订阅。

Reactive模板

正如传统的存储库基于传统的模板实现一样,被动的存储库建立在reactive模板之上。 blocking template API中可用的大多数操作在reactive模板中具有对应部分。 我们要将blocking world的更多特性传递到reactive template APIs中,但是有些操作根本不可能通过reactive式驱动程序(或者)在reactive world中没有意义。

下面是Spring Data MongoDB中ReactiveMongoOperations的摘录。 它由ReactiveMongoTemplate实现,并使用 Project Reactor的reactive类型,如MonoFlux包装响应。 一些方法还接受将数据流式传输到数据存储中的reactive类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public interface ReactiveMongoOperations {
// …
/**
* Map the results of an ad-hoc query on the specified collection to a
* single instance of an object of the specified type.
*/
<T> Mono<T> findOne(Query query, Class<T> entityClass);
/**
* Map the results of an ad-hoc query on the collection for the entity
* class to a List of the specified type.
*/
<T> Flux<T> find(Query query, Class<T> entityClass);
/**
* Insert the object into the specified collection.
*/
<T> Mono<T> insert(T objectToSave, String collectionName);
/**
* Insert the object into the collection for the entity type of the object
* to save.
*/
<T> Mono<T> insert(Mono<? extends T> objectToSave);
// …
}

注意,所有方法都遵循reactive执行模型,在调用时不执行任何包含任何I / O的操作,而只是在对返回值进行订阅时。

让我们通过模板插入一些数据:

1
2
3
4
5
6
Flux<Person> flux = Flux.just(new Person("Walter", "White"),
new Person("Skyler", "White"),
new Person("Saul", "Goodman"),
new Person("Jesse", "Pinkman"));
template.insertAll(flux).subscribe();

一些方法,如insertAll(…) - 接收reactive类型,以异步方式将传入数据流入你的MongoDB数据库,例如。 来自你在Spring Web Reactive控制器中收到的Flux,它将通过Jackson异步映射JSON数组:

1
2
3
4
5
@PostMapping("/people")
Flux<People> namesByLastname(@RequestBody Flux<Person> people) {
return template.insertAll(people);
}

正如你可以看到的,存储库和模板API允许你以reactive, non-blocking的方式描述请求处理。 也就是说,让我们深入了解Redis对reactive式数据访问的支持。

与Spring Data Redis的Reactive Connections

Spring Data Redis在连接级别上提供了initial reactive式支持,目前仅在Lettuce上,因为它是唯一支持reactive 数据访问的Redis驱动程序。 由于Redis通常用于低得多的抽象级别,Kay M1版本从低级别上的reactive式抽象开始。 LettuceConnectionFactory允许访问ReactiveRedisConnection,而ReactiveRedisConnection又提供对Redis命令的反应版本的访问

功能链接与运算符创建链以reactive方式访问Redis数据。 同样,所有I / O都是异步的。

1
2
3
4
5
ReactiveKeyCommands keyCommands = connection.keyCommands();
keyCommands.randomKey()
.flatMap(keyCommands::type)
.flatMap(System.out::println)
.subscribe();

此代码获取随机key并打印其数据类型。 不存在的随机key完成为空Mono

Reactive Redis命令有两种类型:接收纯参数和接收命令发布者。 命令发布器发出特定的Redis命令以将数据直接传输到Redis中。 每次发出的命令在命令执行后发出命令响应。

1
2
3
4
5
6
7
8
9
10
public interface ReactiveStringCommands {
// …
Mono<Boolean> set(ByteBuffer key, ByteBuffer value);
Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands);
// …
}

传统Spring Data Redis在其blocking API上使用byte[]来交换数据。 如果数据在缓冲区(如ByteBuffer或Netty的ByteBuf)中可用,则byte[]强制数据复制。 Reactive 支持了很多关于高效的资源使用情况,所以我们决定公开方法接收并返回ByteBuffer的方法。

Summary

我希望这篇博文给你介绍了Kay在各种抽象级别提供的reactive 特性。 你可以在我们的示例存储库中找到所有可执行示例。

翻译自:

https://spring.io/blog/2016/11/28/going-reactive-with-spring-data