响应式编程WebFlux-SpringBoot(5)

1.响应式编程

  WebFlux是从Spring Framework 5.0开始引入响应式Web框架的。与Spring MVC不同, WebFlux不需要Servlet API,在完全异步且无阻塞,并通过Reactor项目实现Reactive Streams 规范。

  WebFlux可以在资源有限的情况下提高系统的吞吐量和伸缩性(不是提高性能)。这意味着, 在资源相同的情况下,WebFlux可以处理更多的请求(不是业务)。

  WebFlux除支持RESTful Web服务外,还可以用于提供动态HTML内容。

   MVC:

  MVC的工作流程是:主线程接收到请求(request)-准备数据-返回数据。整个过程是单线程咀塞的,用户会感觉等待时间长是因为,在结果处理好之后才返回数据给浏 览器。因此,如果请求很多,则吞吐量就上不去。

  WebFlux:

  WebFlux的工作流程是:主线程接收到请求—立刻返回数据与函数的组合(Mon或Flux, 不是结果),开启一个新Work线程去做实际的数据准备工作,进行真正的业务操作,Work 线程完成工作,返给用户真实数据(结果)。

  这种方式给人的感売是响应时间很短,因为返回的是不变的常数,它不随用户数量的增加而变化。

两者对比:

2.Mono和Flux

Mono和Flux是Reactor中的两个基本概念。

  • Mono和Flux属于事件发布者,为消费者提供订阅接口。当有事件发生时,Mono或Flux 会回调消费者的相应方法,然后通知消费者相应的事件。这也是晌应式编程模型。
  • Mono和Flux用于处理异步数据流,它不像MVC中那样直接返回String/List,而是将异步数据流包装成Mono或Flux对象

Mono和Flux的区别

  1. Flux可以发送很多item,并且这些item可以经过若干算子(operators )后才被订阅。 Mono只能发送一个itemo
  2. Mono主要用于返回单个数据。Flux用于返回多个数据。如果要根据id查询某个User对象,则返回的肯定是单个User,那么需要将其包装成Mono<User>,若需要获取所有User (这是一个集合),则需要将这个集合包装成Flux<User> 这里的单个数据并不是指一个数据,而是指封装好的一个对象。多个数据就是多个对象。
  3. Mono表示包含0或1个元素的异步序列。在该序列中可以包含3种不同类型的消息通知: 正常的包含元素的消息、序列结束的消息、序列岀错的消息。当消息通知(正常的包含元素的消息、 序列结束的消息、序列出错的消息)产生时,订阅者中有对应的方法onNext( )、onComplete( ). onError( )被调用
  4. Flux表示的是包含0到N个元素的异步序列,在该序列中可以包含与Mono相同的3种类型的消息通知。

  5. Flux和Mono之间可以进行转换。对一个Flux序列进行计数操作时,得到的结果是一个 Mono<Long>对象。把多个Mon。序列合并在一起,得到的是一个Flux对象

开发WebFlux的流程

  1. 注解式开发流程

  WebFlux是响应式框架,其中使用的注解式开发方式只是Spring团队为了更好地迁移而提供的。和MVC开发模式一样,地址映射也是通过@RequestMapping提供的,用@Controller或 @RestController 来代替 Handler 类。

  2.响应式开发流程  

    (1)创建 Handler 类

    这里的Handler类相当于Spring MVC的Controller层中的方法体。在响应式编程中,请求和响应不再是 HttpServletRequest 和 HttpServletResponse,而是变成了 ServerRequest 和 ServerResponse

    (2)配置 RouterFunction

    RouterFunction 和注解@RequestMapping 相似,都用于提供 URL 路径。RouterFunction 的格式也是固定的,第1个参数代表路径,第2个参数代表方法,合起来代表将URL映射到方法。

实例:Hello world

     配置WebFlux依赖:

 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

      编写控制器:

 @RestController
public class HelloWorldController {
@GetMapping("/helloworld")
public Mono<String> helloworld(){
return Mono.just("这是响应式编程");
}
}

   实例: 用注解式开发实现数据的增加、删除、修改和查询

     创建实体类:

 import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private long id;
private String name;
private int age;
}

     编写控制器:

 package com.itheima.controller;
import com.itheima.domain.User;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

@RestController
@RequestMapping("/user")
public class UserController {
Map<Long, User> users = new HashMap<Long, User>();
@PostConstruct
public void init() throws Exception {
users.put(Long.valueOf(1),new User(1,"John",28));
users.put(Long.valueOf(2),new User(2,"ane",29));
}
@GetMapping("/list")
public Flux<User> getAll() throws Exception {
return Flux.fromIterable(users.entrySet().stream()
.map(entry -> entry.getValue())
.collect(Collectors.toList()));
}
@GetMapping("/{id}")
public Mono<User> getUserById(@PathVariable Long id) throws Exception {
return Mono.justOrEmpty(users.get(id));
}
@PostMapping("")
public Mono<ResponseEntity<String>> addUser(User user) throws Exception {
users.put(user.getId(), user);
return Mono.just(new ResponseEntity<>("添加成功", HttpStatus.CREATED));
}
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable Long id,User user) throws Exception {
user.setId(id);
users.put(id, user);
return Mono.just(new ResponseEntity<>(user,HttpStatus.CREATED));
}
@DeleteMapping("/{id}")
public Mono<ResponseEntity<String>> deleteUser(@PathVariable Long id){
users.remove(id);
return Mono.just(new ResponseEntity<>("删除成功",HttpStatus.ACCEPTED));
}
}

看下数据转换过程:

 Map<Long, User> users:{1=User(id=1, name=John, age=28), 2=User(id=2, name=ane, age=29)}
users.entrySet():[1=User(id=1, name=John, age=28), 2=User(id=2, name=ane, age=29)]
users.entrySet().stream().toString():java.util.stream.ReferencePipeline$Head@785a5991
users.entrySet().stream()
.map(entry -> entry.getValue()).toString():java.util.stream.ReferencePipeline$3@748631e4
users.entrySet().stream()
.map(entry -> entry.getValue())
.collect(Collectors.toList()):[User(id=1, name=John, age=28), User(id=2, name=ane, age=29)]
可以看出entrySet()将map转为了set集合,entrySet() 方法返回映射中包含的映射的 Set 视图。

对应数据类型:

 Map<Long, User> users:type:class java.util.HashMap
users.entrySet():type:class java.util.HashMap$EntrySet
users.entrySet().stream():type:class java.util.stream.ReferencePipeline$Head
users.entrySet().stream()
.map(entry -> entry.getValue()):type:class java.util.stream.ReferencePipeline$3
users.entrySet().stream()
.map(entry -> entry.getValue())
.collect(Collectors.toList()):type:class java.util.ArrayList

users.entrySet().stream()
    返回支持顺序和并行聚合操作的元素序列。以下示例说明了使用Stream和IntStream的聚合操作:
     int sum = widgets.stream()
                      .filter(w -> w.getColor() == RED)
                      .mapToInt(w -> w.getWeight())
                      .sum();
 
  在此示例中, widgets是Collection<Widget> 。我们通过Collection.stream()创建一个Widget对象流,对其进行过滤以生成仅包含红色小部件的流,然后将其转换为表示每个红色小部件重量的int值流。然后将该流相加以产生总重量。 除了作为对象引用流的Stream之外,还有IntStream 、 LongStream和DoubleStream的原始特化,所有这些都被称为“流”,并符合此处描述的特征和限制。 为了执行计算,流操作被组合成一个流管道。流管道由源(可能是数组、集合、生成器函数、I/O 通道等)、零个或多个中间操作(将流转换为另一个流,例如filter(Predicate) ) 和终端操作(产生结果或副作用,例如count()或forEach(Consumer) )。流是懒惰的;仅在发起终端操作时才对源数据进行计算,并且仅在需要时消耗源元素。
  集合和流虽然有一些表面上的相似之处,但有不同的目标。馆藏主要关注其元素的有效管理和访问。相比之下,流不提供直接访问或操作其元素的方法,而是关注以声明方式描述其源以及将在该源上聚合执行的计算操作。但是,如果提供的流操作不提供所需的功能,则可以使用iterator()和spliterator()操作来执行受控遍历。
流管道,如上面的“小部件”示例,可以被视为对流源的查询。除非源明确设计用于并发修改(例如ConcurrentHashMap ),否则在查询流源时修改流源可能会导致不可预测或错误的行为。
大多数流操作接受描述用户指定行为的参数,例如上面示例中传递给mapToInt的 lambda 表达式w -> w.getWeight() 。

  为了保持正确的行为,这些行为参数: 必须是无干扰的(它们不会修改流源);和 在大多数情况下必须是无状态的(它们的结果不应依赖于在流管道执行期间可能更改的任何状态)。 此类参数始终是函数接口的实例,例如Function ,并且通常是 lambda 表达式或方法引用。除非另有说明,否则这些参数必须为非 null 。 一个流应该只被操作一次(调用一个中间或终端流操作)。例如,这排除了“分叉”流,其中相同的源提供两个或多个管道,或同一流的多次遍历。如果流实现检测到流正在被重用,它可能会抛出IllegalStateException 。但是,由于某些流操作可能返回其接收者而不是新的流对象,因此可能无法在所有情况下检测重用。 流有一个close()方法并实现了AutoCloseable ,但几乎所有的流实例在使用后实际上并不需要关闭。通常,只有源为 IO 通道的流(例如由Files.lines(Path, Charset)返回的流)才需要关闭。大多数流由集合、数组或生成函数支持,不需要特殊的资源管理。 (如果流确实需要关闭,可以在try -with-resources 语句中将其声明为资源。) 流管道可以按顺序或并行执行。这种执行模式是流的属性。流是通过初始选择顺序或并行执行来创建的。 (例如, Collection.stream()创建一个顺序流, Collection.parallelStream()创建一个并行流。)这种执行模式的选择可以通过sequential()或parallel()方法进行修改,并且可以通过以下方式查询isParallel()方法。

     users.entrySet().stream()
                .map(entry -> entry.getValue())
                .collect(Collectors.toList()):

  例如上方代码将返回map中包含的映射的set视图(即set集合),然后创建一个set引用流,将引用流中的数据取出,然后使用Collector对此流的元素执行可变归约操作,最后效果为将数据取出,并去掉了键,保留了值,转为了List数组。

   Collector封装了用作collect(Supplier, BiConsumer, BiConsumer)参数的函数,允许重用收集策略和组合收集操作,例如多级分组或分区。
  如果流是并行的,并且Collector是concurrent的,并且流是无序的或收集器是unordered的,则将执行并发归约(有关并发归约的详细信息,请参阅Collector 。)
这是终端操作。
  当并行执行时,可以实例化、填充和合并多个中间结果,以保持可变数据结构的隔离。因此,即使与非线程安全的数据结构(例如ArrayList )并行执行,也不需要额外的同步来进行并行归约。

  最后返回collect

     测试API功能

(1)获取数据

  启动项目后访问URL:http://localhost:8080/user/list ,会得到两个初始化的数据,输岀结果:[{"id":1,"name":"John","age":28},{"id":2,"name":"ane","age":29}]

  访问URL:http://localhost:8080/user/1,得到的是id为1的单个对象,见输出结果:{"id":1,"name":"John","age":28}

(2)修改数据

  修改数据的方法是通过PUT方式访问"http://localhost:8080/user/1",提交相应的name和age字段来修改内容(使用的是测试工具Postman ),如下所示。稍后会提示修改成功。

  

(3)添加数据

  可以通过POST方式提 User对象来添加数据。

  

(4)删除数据

  

   实例:用晌应式开发方式开发WebFlux

     (1)编写处理器类Handler

Handler相当于MVC中的Controller 用于提供实现功能的方法,代码如下:

 package com.itheima.handler;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class HelloWorldHandler {
public Mono<ServerResponse>sayHello(ServerRequest serverRequest){
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("你好"),String.class);
}
}

     (2)编写路由器类Router

Router的主要功能是提供路由映射,相当于MVC模式中的注解@RequestMapping

 package com.itheima.router;
import com.itheima.handler.HelloWorldHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class Router {
@Autowired
private HelloWorldHandler helloWorldHandler;
@Bean
public RouterFunction<ServerResponse> getString(){
return route(GET("/helloWorld"),req->helloWorldHandler.sayHello(req));
}
}

上述代码中,通过

return route(GET("/helloWorld"),req-&gt;helloWorldHandler.sayHello(req));
  来指定路由,包含HTTP方法和对应的功能方法。

   实例:用WebFlux模式操作MongoDB数据库,实现数据的增加、删除、修改和查询的功能

     (1)添加依赖

    要操作数据库,则需要添加相应的依赖。可以通过Spring Boot集成的MongoDB的Starter 依赖来快速实现配置和操作。具体依赖见以下代码

 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

在配置文件中配置MongoDB的地址信息(MongoDB 2.4以上版本),见以下代码:

spring.data.mongodb.uri=mongodb://127.0.0.1:27017/test

配置Mongo的格式如下:

spring.data.mongodb.uri=mongodb://用户名: 密码@ip 地址:端口号/数据库

     (2)创建实体类

    这里编写实体类并没有特别需要讲解的,只是利用了 Lombok插代码,具体见以下代码:

 package com.itheima.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private long id;
private String name;
private int age;
}

     (3)编写接口

     Spring Boot 的 Starter 提供了 ReactiveMongoRepository 接口,用于操作 Mongo 数据库, 用法见以下代码:  

 package com.itheima.dao;
import com.itheima.domain.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface UserRepository extends ReactiveMongoRepository<User,String> {
}

     (4)编写API

 package com.itheima.controller;
import com.itheima.dao.UserRepository;
import com.itheima.domain.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import javax.validation.Valid;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private UserRepository userRepository;
@GetMapping("/list")
public Flux<User> getAll(){
return userRepository.findAll();
}
@GetMapping(value = "/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> getAllDelay(){
return userRepository.findAll().delayElements(Duration.ofSeconds(1));
}
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id){
return userRepository.findById(id)
.map(getUser->ResponseEntity.ok(getUser))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PostMapping("")
public Mono<User> createUser(@Valid User user){
return userRepository.save(user);
}
@PutMapping("/{id}")
public Mono updateUser(@PathVariable(value = "id")String id,@Valid User user){
return userRepository.findById(id)
.flatMap(existingUser->{
existingUser.setName(user.getName());
return userRepository.save(existingUser);
})
.map(updateUser->new ResponseEntity<>(updateUser,HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}
  • produces = MediaType.APPLICATION_STREAM_JSON_VALUE:这里媒体类型必须是APPLICATION_STREAM_JSON_VALUE,否则调用端无法滚动得到结果,将一直 阻塞直到数据流结束或超时。
  • Duration.ofSeconds(1):代表一秒一秒地返回数据,而不是一下全部返回。
  • ResponseEntity.ok, ResponseEntity继承了 HttpEntity,是 HttpEntity 的子类,且可以添加HttpStatus状态码。
  • flatMap:返回的是迭代器中的元素。
  • HttpStatus: NOT_FOUND:代表HTTP状态是404,表示没有找到。
  • HttpStatus.OK:代表HTTP状态是200,表示处理成功

 

标签: Java

添加新评论