spring-webflux

Author Avatar
kevin
发表:2025-01-22 02:44:00
修改:2024-10-09 00:44:25

spring-webflux是spring在5.0版本后提供的一套响应式编程风格的web开发框架

响应式

当调用一个api获取数据时,无需阻塞等待数据返回,而是当有数据返回时会进行告知。可见响应式是非阻塞的,意味着调用方法后,CPU可以去做别的事情,当接收到数据响应时CPU再回来处理,这种方式提高了系统的吞吐量

Reactor

Spring-webflux框架是基于Reactor这个开源项目开发的。Reactor框架是跟Spring紧密配合的。

提供了两种API类型,分别是Mono和Flux;

 Mono一般作用于单个对象
 		Mono<UserEntity> user = userService.getUserById(userId);
 Flux一般作用于多个对象
 		Flux<UserEntity> user  = userService.getAllUser();

并发模型

webmvc和webflux都支持使用注解来定义一个Controller,但是其实现方式完全不同

webmvc

是一个Servlet应用,实现是阻塞式IO,其维护一个线程池来处理每一个用户请求,也就是当Servlet容器启动时,就会创建比如10个线程出来,因此系统吞吐量的瓶颈在于有限的连接数和阻塞的请求处理过程

webflux

可以基于netty这样的NIO网络框架,它只需要很少的几个工作线程(Event loop worker)就能够处理并响应请求。由于无需阻塞等待方法返回,CPU资源就得到了更好的利用。

注意

webflux并不能让程序运行地更快;而是提高了并发处理请求的能力,即提高了系统吞吐量

代码示例

Spring-webflux是默认使用Netty提供HTTP服务

Netty started on port(s): 8099

首先创建一个 SpringBoot 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
         <version>2.3.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
     </parent>
    <groupId>com.yee.kevin</groupId>
    <artifactId>yee.kevin</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
   <dependency>
          <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-webflux</artifactId>
   </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
        <!--   连接池 -->
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-pool</artifactId>
        </dependency>
        <!--mysql 驱动 确保 Spring Boot 的版本大于等于 2.3.0,在此版本之后才开始支持 MYSQL 的响应式驱动-->
        <dependency>
            <groupId>dev.miku</groupId>
            <artifactId>r2dbc-mysql</artifactId>
        </dependency>
        <dependency>
           <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
         <exclusions>
             <exclusion>
                 <groupId>org.junit.vintage</groupId>
             <artifactId>junit-vintage-engine</artifactId>
          </exclusion>
      </exclusions>
       </dependency>
          <dependency>
             <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
           </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
            <scope>provided</scope>
        </dependency>
     </dependencies>

   <build>
       <plugins>
          <plugin>
               <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
    </plugins>
 </build>
</project>

定义一个对象

@Data
@Table("user")
public class UserEntity {
    @Id
    private Long id;
    private String name;
    private String phone;
    private String mail;
}

定义处理层

mvc一般是controller层,webflux是handler层

@RestController
public class UserController {

    @Autowired
    private UserService userService;


    public Mono<ServerResponse> getAllUser(ServerRequest request){
        Flux<UserEntity> user  = userService.getAllUser();
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(user,UserEntity.class);
    }

    public Mono<ServerResponse> getUser(ServerRequest request){
        Long userId = Long.parseLong(request.pathVariable("id"));
        Mono<UserEntity> user = userService.getUserById(userId);
        return user.flatMap(person ->
                ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(user)
                        .switchIfEmpty(ServerResponse.notFound().build()));
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<UserEntity> user = request.bodyToMono(UserEntity.class);
        Mono<Void>  voidMono = userService.saveUserInfo(user);
        return ServerResponse.ok()
                .build(voidMono);
    }
}

route路由映射

响应式风格中不再使用@RequestMapping声明地址映射了

而是通过RouterFunctions.route().xxx()方法 xxx表示get或者post等方法

@Configuration
public class UserRoute {



    @Resource
    private PersonController personController;
    @Resource
    private UserController userController;

    @Bean
    public RouterFunction<ServerResponse> personRoute(){
        return  RouterFunctions.route()
                .GET("/user/{id}", RequestPredicates.accept(MediaType.APPLICATION_JSON),userController::getUser)          .GET("/user",RequestPredicates.accept(MediaType.APPLICATION_JSON),userController::getAllUser)
                .POST("/user",userController::createUser)
                .build();
    }
}

Service层

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserDao userDao;


    @Override
    public Flux<UserEntity> getAllUser() {
        return userDao.findAll();
    }

    @Override
    public Mono<UserEntity> getUserById(Long UserId) {
        return userDao.findById(UserId);
    }

    @Override
    public Mono<Void> saveUserInfo(Mono<UserEntity> userEntityMono) {
        return userDao.saveAll(userEntityMono).thenEmpty(Mono.empty());
    }
}

不支持mysql

因为WebFlux是响应式的,要想发挥出WebFlux的性能就得将代码全改成响应式的,而JDBC目前是没支持的(至少MySQL还没支持)

响应式数据库

Spring Data R2DBC运用熟悉的Spring抽象和repository 支持R2DBC

目前支持: Postgres、H2、Microsoft SQL Server 、MySQL、MariaDB

确保 Spring Boot 的版本大于等于 2.3.0,在此版本之后才开始支持 MYSQL 的响应式驱动

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

<!--   连接池 -->
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
</dependency>

<!--mysql 驱动-->
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
</dependency>

配置文件

application.yml配置文件添加mysql的连接

server:
  port: 8099
spring:
  r2dbc:
    name: r2dbc
    url: r2dbcs:mysql://127.0.0.1:3306/student_user
    username: root
    password: 123456
    pool:
      enabled: true
      validation-query: SELECT 1

替代mybatis

由于不支持mysql所以使用不了mybatis

单表增删改查使用ReactiveCrudRepository

新建Dao层

可以在其中 使用@Query执行特定的SQL

@Repository
public interface UserDao extends ReactiveCrudRepository<UserEntity,Long> {

    @Query("SELECT * FROM user WHERE id = :id")
    Flux<UserEntity> findDateById(Long id);

    @Query("UPDATE user SET name = :name WHERE id = :id")
    Mono<Integer> updateNameById(String name, Long id);
}

启动类

@SpringBootApplication
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class);
    }
}
评论