网站首页 > java教程 正文
应用响应式Web开发组件
当下,对于具有广大用户群体的新型互联网应用而言,它们基本都需要考虑如何高效应对用户流量、如何确保系统弹性等核心技术主题。
在理论和实践的结合下,响应式编程是一种新型的编程模型,是确保系统弹性的一款强有力的武器。在响应式编程领域,存在一套完整的响应式流规范以及实现这一规范的开发工具。在现实中,开发人员通常不会直接使用这些偏底层的开发工具来开发应用程序,而是借助于特定的开发框架。而我们日常开发中每天都在使用的Spring就是这样一个支持响应式编程的开发框架。
在2017年,Spring发布了新版本Spring 5,这是自Spring 4发布以来将近4年的时间中所发布的第一个全新版本。Spring 5引入了很多核心功能,重要的是它全面拥抱了响应式编程的设计思想和实践。
Spring 5的响应式编程模型以Project Reactor库为基础,而后者则实现了响应式流规范。事实上,Spring Boot从2.x版本开始全面依赖Spring 5。
Spring Boot为我们提供了一系列响应式编程组件,而本章将重点关注如何使用Spring Boot框架来开发响应式Web服务。
响应式编程和Spring Boot
响应式编程是一种新的编程技术,其目的是构建响应式系统。对于响应式系统而言,任何时候都需要确保其具备即时响应性,这是大多数日常业务场景所需要的,但却是一项非常复杂而有挑战性的任务,需要对相关技术有深入的了解。本节将讨论这些技术。
响应式流规范和实现框架
对于响应式编程而言,首先要明确的概念是数据流(Data Stream)。简单来讲,所谓的流就是由生产者生产并由一个或多个消费者消费的元素序列。而一旦有了数据流,那么就势必面临流量控制问题。流量控制是讨论数据流的核心话题。而针对如何控制流量,业界存在一个响应式流规范,以及一批实现了该规范的开发工具。
1. 响应式流规范
Java API版本的响应式流只包含四个接口,即Publisher<T>、
Subscriber<T>、Subscription和Processor<T,R>。
发布者(Publisher)是潜在的包含无限数量的有序元素的生产者,它根据收到的请求向当前订阅者发送元素。Publisher<T>接口定义如代码清单5-1所示。
代码清单5-1 Publisher<T>接口定义代码
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}订阅者(Subscriber)从发布者那里订阅并接收元素。发布者向订阅者发送订阅令牌(Subscription Token)。通过订阅令牌,订阅者就可以向发布者请求多个元素。当元素准备就绪时,发布者就会向订阅者发送合适数量的元素。然后订阅者可以请求更多的元素,发布者也可能有多个来自订阅者的待处理请求。Subscriber <T>接口定义如代码清单5-2所示。
代码清单5-2 Subscriber<T>接口定义代码
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
当执行发布者的subscribe()方法时,发布者会回调订阅者的onSubscribe()方法。在这个方法中,通常订阅者会借助传入的Subscription对象向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext()方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete()方法告知订阅者流已经发完;如果有错误发生,则通过onError()方法发出错误提示消息,这时同样也会终止数据流。
订阅(Subscription)表示订阅者订阅的一个令牌。当订阅请求成功时,发布者将其传递给订阅者。订阅者使用订阅令牌与发布者进行交互,例如请求更多的元素或取消订阅。Subscription接口定义如代码清单5-3所示。
代码清单5-3 Subscription接口定义代码
public interface Subscription {
public void request(long n);
public void cancel();
}
当发布者调用subscribe()方法注册订阅者时,会通过订阅者的回调方法onSubscribe()传入Subscription对象,之后订阅者就可以使用这个Subscription对象的request()方法向发布者请求数据。
处理器(Processor)充当订阅者和发布者之间的转换器(Transformer)。Processor<T,R>订阅类型T的数据元素,接收并转换为类型R的数据,发布该数据。Processor接口同时继承了Publisher和Subscriber接口,其定义如代码清单5-4所示。
代码清单5-4 Processor接口定义代码
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
上述四个接口是各个响应式开发库之间互相实现兼容的桥梁,响应式流规范也仅仅聚焦于此,而对诸如转换、合并、分组等的操作一概未做要求,因此是一个非常抽象且精简的接口规范。
作为总结,我们可以把响应式流规范核心接口的交互方式梳理成图5-1。
可以看到,图5-1中所示的交互方式一共包含如下7个步骤。
1)当发布者使用subscribe()方法实现对该发布者的订阅时,首先会创建一个具有相应逻辑的Subscription对象,这个Subscription对象定义了如何处理请求,以及如何发出数据。
2)然后发布者将这个Subscription通过订阅者的onSubscribe()方法传给订阅者。
3)在订阅者的onSubscribe()方法中,需要通过Subscription的request()方法发起第一次请求。
4)Subscription收到请求,就可以通过回调订阅者的onNext()方法发出元素,有多少发多少,但不能超过请求的个数。
5)订阅者在onNext()方法中通常定义对元素的处理逻辑,处理完成之后,可以继续发起请求。
6)发布者根据需要继续满足订阅者的请求。
7)如果发布者的元素序列正常结束,就通过订阅者的onComplete()方法予以告知。如果序列发送过程中有错误,则通过订阅者的onError()方法予以告知并传递错误提示。这两种情况都会导致序列终止,订阅过程结束。
2. Project Reactor
Spring 5引入了响应式编程机制,并默认集成了Project Reactor(下文简称为Reactor)作为该机制的实现框架。Reactor诞生较晚,可以认为它是第二代响应式开发框架。所以它是一款完全基于响应式流规范设计和实现的工具库,在使用上直观易懂。
在Reactor框架中,数据流的表现形式如图5-2所示。
图5-2中的数据流模型从语义上可以用如下公式表示:
onNext x 0..N [onError | onComplete]
上述公式包含了如下三种不同类型的方法调用,分别处理不同场景下的消息通知。
onNext():正常包含元素的消息通知。
onComplete():序列结束的消息通知。
onError():序列出错的消息通知,可以没有。
按照响应式流规范,当这些消息通知产生时,订阅者中对应的onNext()、onComplete()和onError()这三个方法将被调用。如果序列没有出错,则onError()方法不会被调用;而如果不调用onComplete()方法,我们就会得到一个无限异步序列。通常,无限异步序列应该只用于测试等特殊场景。
针对数据流,Reactor提供了两个核心组件,即Flux和Mono。其中Flux代表包含0到n个元素的异步序列,而Mono则表示包含0个或1个元素的异步序列。
创建Flux的方式非常多,这些方式可以分成两大类,一类是充分利用Flux的静态方法,另一类则是动态创建Flux。这里的静态方法常见的包括just()、fromArray()、fromIterable()、fromStream()、empty()、error()、never()、range()、interval()等,而动态方法则包括generate()和create()。
创建Mono的方式也类似。另外,和其他主流的响应式编程框架一样,Reactor框架的设计目标也是为了简化响应式流的使用方法。为此,Reactor框架为我们提供了大量操作符用于操作Flux和Mono对象。常见的包括用于数据转换的flatMap、用于数据过滤的filter、用于操作组合的zipWith、用于条件控制的defaultIfEmpty,以及subscribe和log等工具操作符。
由于本章的重点是介绍Spring WebFlux,而Project Reactor是WebFlux的底层框架,我们一般不会直接使用该框架开发Web应用程序。
响应式编程的应用场景分析
在介绍完响应式流程规范以及开发工具Project Reactor之后,你可能会问,响应式编程到底有什么用?现实中哪些场景可以用得上响应式编程?这是一个好问题,本小节将基于一些具体的应用场景来探讨这一话题。
1. 响应式编程的应用场景
本质上,我们可以认为响应式编程并不仅仅是一种编程技术,而且是一种架构设计的系统方法,因此适用于诸多场景。它既可以用于简单的Web应用系统,也可以用于大型企业的解决方案。当然,基于响应式数据流,我们也完全可以利用它构建流式系统或大数据系统。
数据流处理是响应式编程的一大应用场景。流式系统的主要特点是低延迟和高吞吐量。对于这类系统,大多数数据是从服务器端传出的,因此客户端扮演消费者的角色。这个时候,通过使用非阻塞式通信可以确保资源得到高效的利用,从而实现低延迟和高吞吐量。流式系统的表现形式也可以有很多,日常的日志埋点和分析、服务运行时的状态采集等都属于这种类型。对高并发流量的处理,通常涉及大量的I/O操作。相较于传统的同步阻塞式I/O模型,响应式编程所具备的异步非阻塞式I/O模型非常适合应对高并发流量的业务场景。这类场景中比较典型的一种表现形式就是微服务架构中的API网关,因为网关的作用就是响应来自前端系统的流量并将其转发到后端服务。
讲到微服务架构,如何构建一个具有异步非阻塞式请求处理流程的Web服务也是开发人员的核心诉求,我们需要高效处理跨服务之间的网络请求。针对这种场景,响应式编程及其相关技术(如本章要介绍的RSocket协议)同样也是一种非常有效的解决方案。
2. 响应式编程在开源框架中的应用
响应式编程在系统开发过程中逐渐得到广泛的应用。结合上文所分析的三种典型应用场景,这里我们以对应的Netflix Hystrix、Spring CloudGateway以及Spring WebFlux这三款主流的开源框架为例,解析这些框架背后的响应式编程技术。
(1)Netflix Hystrix中的滑动窗口和响应式编程
我们已经在第1章介绍Spring家庭生态时提到过Spring Cloud微服务开发框架。
在Spring Cloud微服务开发框架中,存在一个Spring Cloud NetflixHystrix组件,该组件基于Netflix Hystrix实现了服务熔断功能。NetflixHystrix是Netflix开源的一款容错库,使用了HystrixCircuitBreaker类来实现熔断器。该类通过一个circuitOpen状态位控制着整个熔断判断流程,而这个状态位本身的状态值则取决于系统目前的执行数据和健康指标。那么,HystrixCircuitBreaker如何动态获取系统运行时的各项数据呢?这里就使用到了一个HealthCountsStream类,从命名上不难看出,这就是一种数据流。
HealthCountsStream在设计上采用了一种特定的机制,即滑动窗口(RollingWindow)机制,核心代码如代码清单5-5所示。
代码清单5-5 Netflix Hystrix中的滑动窗口核心代码
this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>()
{
@Override
public Observable<Bucket> call() {
return inputEventStream.observe()
// 使用window操作符收集一个Bucket时间内的数据
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
// 将每个window内聚集起来的事件集合汇总成Bucket
.flatMap(reduceBucketToSummary).startWith(emptyEventCountsToStart);
}
});
在技术选型上,Hystrix采用了基于响应式编程思想的RxJava。与其他响应式编程框架一样,RxJava同样实现了前面介绍的响应式流规范。使用RxJava的一大好处是可以通过RxJava的一系列操作符来实现滑动窗口,包括上述代码所展示的window、flatMap和reduce等。其中window操作符把当前流中的元素收集到另外的流序列;flatMap操作符把流中的每个元素转换成一个流,再把转换之后得到的所有流中的元素进行合并;而reduce操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的流。
(2)Spring Cloud Gateway中的过滤器和响应式编程
Spring Cloud Gateway是Spring Cloud微服务开发框架中的另一个核心组件,是Spring官方自己开发的一款API网关。在技术体系上,Spring CloudGateway基于最新的Spring 5和Spring Boot 2以及用于响应式编程的ProjectReactor框架,提供响应式、非阻塞式I/O模型。和其他API网关系统类似,Spring Cloud Gateway中的核心组件也是过滤器。
过滤器用于在响应HTTP请求之前或之后修改请求本身及对应的响应结果。Spring Cloud Gateway提供了一个全局过滤器(GlobalFilter)的概念,对所有路由都生效。我们来演示一下如何使用全局过滤器来对所有HTTP请求进行拦截,具体做法是实现GlobalFilter接口,示例代码如代码清单5-6所示。
代码清单5-6 GlobalFilter接口实现示例代码
@Configuration
public class JWTAuthFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange,
GatewayFilterChain chain) {
ServerHttpRequest.Builder builder =
exchange.getRequest().mutate();
builder.header("Authorization","Token");
chain.filter(exchange.mutate().request(builder.build()).build());
return
chain.filter(exchange.mutate().request(builder.build()).build());
}
}
以上代码展示了如何利用全局过滤器在所有的请求中添加Header。在这个示例中,我们对所有经过API网关的HTTP请求添加了一个消息头,用来设置与访问Token相关的安全认证信息。这里filter()方法的返回值是Mono<Void>,代表它使用了响应式编程技术。
(3)Spring WebFlux中的请求处理流程和响应式编程
Spring WebFlux是Spring 5引入的全新的响应式Web服务开发框架,我们在后面中将对其详细讲解。现在只需要知道,在WebFlux中,和WebMVC类似,对HTTP请求的处理过程涉及HandlerMapping、HandlerAdapter、HandlerResultHandler类之间的交互,其核心的handle()方法的定义如代码清单5-7所示,该方法实现了流式处理请求机制。
代码清单5-7 Spring WebFlux中的handle()方法定义代码
public Mono<Void> handle(ServerWebExchange exchange) {
...
return Flux.fromIterable(this.handlerMappings)
//从handlerMapping这个map中获取HandlerMapping
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
//如果没有找到HandlerMapping,则抛出异常
.switchIfEmpty(createNotFoundError())
//触发HandlerAdapter的handle方法
.flatMap(handler -> invokeHandler(exchange, handler))
//触发HandlerResultHandler的handleResult方法
.flatMap(result -> handleResult(exchange, result));
}
在这个核心方法中,我们也看到了Project Reactor框架所通过的concatMap、switchIfEmpty和flatMap等响应式操作符,其中的flatMap操作符我们在前面已经讨论过。
Spring响应式编程组件
基于Project Reactor框架,Spring为我们提供了全面的响应式编程技术体系。对Web服务,我们可以使用响应式Web框架WebFlux;而对NoSQL响应式数据访问,我们则可以使用Spring Data Reactive框架。本节将对Spring中的这些响应式编程组件简要讲解。
1. Web服务和Spring WebFlux
在Spring WebMVC的基础上,我们将引入全新的Spring WebFlux框架。
WebFlux框架名称中的Flux一词就来源于Project Reactor框架中的Flux组件。WebFlux功能非常强大,不仅仅包含了对创建和访问响应式HTTP端点的支持,还可以实现服务器推送事件以及WebSocket。我们不对该框架的所有功能做全面介绍,对于应用程序而言,开发人员的主要工作是基于HTTP开发响应式服务,这也是本章的一大重点。
要想使用WebFlux,我们需要引入spring-boot-starter-webflux依赖包。图5-3展示了spring-boot-starter-webflux 2.5.3版本的依赖组件,可以看到该版本在spring-boot-starter 2.5.3版本的基础上依赖于springwebflux 5.3.9. RELEASE版本,而后者同样依赖于spring-web 5.3.9.RELEASE版本以及3.4.8. RELEASE版本的reactor-core组件。
Spring WebFlux提供了完整的支持响应式开发的服务端技术栈。和Spring WebMVC相比,Spring WebFlux既支持基于@Controller、@RequestMapping等注解的传统开发模式,又支持基于Router Functions的函数式开发模式。
关于框架背后的实现原理,传统的Spring MVC构建在Java EE的Servlet标准之上,该标准本身就是阻塞式和同步的。最新版本的Servlet虽然也添加了异步支持,但是在等待请求的过程中,仍然在线程池中保持着线程。而Spring WebFlux则是构建在响应式流以及它的实现框架Reactor基础之上的一个开发框架,因此可以基于HTTP实现异步非阻塞的Web服务。
2. 数据访问和Spring Data Reactive
我们知道Spring Data是Spring家族中针对数据访问而开发的一个框架,对各种数据存储媒介抽象了一批Repository接口以简化开发过程。而在Spring Data的基础上,Spring 5也全面提供了一组响应式数据访问模型。在新一代Spring框架中,我们可以把Spring Data划分为两大类型,一类是支持JDBC、JPA和部分NoSQL的传统Spring Data Repository,而另一类则是支持Mongo、Cassandra、Redis、Couchbase、R2DBC等的响应式Spring DataReactive Repository。
本文给大家讲解的内容是springweb服务应用响应式Web开发组件:响应式编程和Spring Boot
- 下文给大家讲解的是springweb服务应用响应式Web开发组件:Spring WebFlux
猜你喜欢
- 2024-12-03 Java,JDK11,发布订阅模式,响应式流(Reactive Streams)及背压
- 2024-12-03 有空就来学Hystrix RPC保护的原理,RPC监控之滑动窗口的实现原理
- 2024-12-03 开发Spring Boot应用并部署到Minikube
- 2024-12-03 Reactor响应式编程 第二篇 Spring Boot 整合 Reactor 简单例子
- 2024-12-03 反应式编程之Spring Web-Flux/Project Reactor
- 2024-12-03 即学即用Kotlin - 协程
- 2024-12-03 并发编程:CompletableFuture异步编程详解
- 2024-12-03 终于有人把安卓程序员必学知识点全整理出来了,有如醍醐灌顶
- 2024-12-03 Kotlin Flow的设计精髓:响应式编程在Android中的实践
- 2024-12-03 Reactive Programming 很简单
你 发表评论:
欢迎- 最近发表
-
- JAVA面试|为什么Spring Boot的jar可以直接运行?
- 什么情况,今年面试都不问八股文了??准备了几个月,结果一个都不问。。
- LangChain系列之如何使用LangChain4j构建RAG应用(1)
- JAVA入门教程-第2章 基本编程概念
- FTPC Pnuts语言(ftp mput put)
- 这9个工具库让我的Java开发效率提升了80%
- VS2022配置x86/x64调用32位和64位汇编语言动态库环境
- 别再裸写 parseFrom() 了!这才是 MQTT + Protobuf 消费的正确姿势
- aardio + Java + JavaScript 混合开发快速入门
- 铁打的程序,流水的语言,2018年JAVA编程还想坚挺500年?
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)