那就开始建立对应关系。因为“反应”是一系列行为动作,所以应该和“执行逻辑”对应。那“变化”只能和“数据”对应,其实这是对的,“数据”由不可用到可用,本身就是发生了一个“变化”。
这个对应关系建立的很完美,但是逻辑顺序却完全冲突。响应式是由变化主导反应,这很好理解,我都没有变化,你无须做出反应。同步阻塞是由执行逻辑主导数据,这也很好理解,我代码都没执行呢,根本不需要数据。
可见,它们的对应关系非常完美,但主导顺序完全相反,这就是一个非常非常有价值的发现。
因为我们只需把同步阻塞倒过来,就是实现响应式的大致方向。这样的推理貌似是对的,但实际当中是这样的吗?嗯,是这样的。
现在请大家和我一起扭转思维。原来以逻辑代码执行作为主线,数据作为参与者。现在以数据作为主线,逻辑代码执行作为参与者。说的再白一些,原来是数据传递到逻辑代码里,现在是逻辑代码传递到数据里。
有人也许会问,逻辑代码怎么传递?哈哈,Lambda表达式呀,函数式编程呀。
想象一下,有一个长长的管子,里面的水一直在流。
如果你想让水变成橙色的,只需在管子上开个口,加装一个可以持续投放橙色染料的装置,结果流经它的水都变成橙色的了。
如果你想让橙色的水变甜的话,只需在后面的管子上开个口,加装一个可以持续投放白糖的装置,结果流经它的水都变成甜的了。
同理,可以在后面继续加装投放柠檬酸的装置,让水变酸,在后面继续加装压入二氧化碳的装置,让水带气泡。
最后发现,自来水经过多道工序处理后变成了芬达。
如果把水流看作是数据流,把投放装置看作是逻辑代码,就变成了,数据先流入第一个逻辑代码,处理后再流入第二个逻辑代码,依次流下去直至结束。
这就是以数据作为主线,逻辑代码只是参与者,同时它也是Reactor实现响应式编程的原理,Spring官方使用的响应式类库就是Reactor。
其中,“以数据为主线”和“在变化时通知处理者”这两个功能Reactor库都已经实现了,我们需要做的就是“对变化做出反应”,即插入逻辑代码。
Reactor入门
在Reactor中,有两个非常重要的类,就是Mono和Flux,它们都是数据源,在它们内部都已经实现了“以数据为主线”和“在变化时通知处理者”这两个功能,而且还提供了方法让我们来插入逻辑代码用于“对变化做出反应”。
Mono表示0个或1个数据,Flux表示0到多个数据。先从简单的Mono开始。
设计一个简单的示例,首先创建一个数据源,只包含一个数据10,第一个处理就是加1,第二个处理就是奇偶性过滤,第三个处理就是把这个数据消费掉,然后就结束了。
为了清楚地看出来主线程执行的是哪些代码,工作线程执行的是哪些代码,特意打印了很多信息。
- public static void main(String[] args) {
- displayCurrTime(1);
- displayCurrThreadId(1);
- //创建一个数据源
- Mono.just(10)
- //延迟5秒再发射数据
- .delayElement(Duration.ofSeconds(5))
- //在数据上执行一个转换
- .map(n -> {
- displayCurrTime(2);
- displayCurrThreadId(2);
- displayValue(n);
- delaySeconds(2);
- return n + 1;
- })
- //在数据上执行一个过滤
- .filter(n -> {
- displayCurrTime(3);
- displayCurrThreadId(3);
- displayValue(n);
- delaySeconds(3);
- return n % 2 == 0;
- })
- //如果数据没了就用默认值
- .defaultIfEmpty(9)
- //订阅一个消费者把数据消费了
- .subscribe(n -> {
- displayCurrTime(4);
- displayCurrThreadId(4);
- displayValue(n);
- delaySeconds(2);
- System.out.println(n + " consumed, worker Thread over, exit.");
- });
- displayCurrTime(5);
- displayCurrThreadId(5);
- pause();
- }
- //显示当前时间
- static void displayCurrTime(int point) {
- System.out.println(point + " : " + LocalTime.now());
- }
- //显示当前线程Id
- static void displayCurrThreadId(int point) {
- System.out.println(point + " : " + Thread.currentThread().getId());
- }
- //显示当前的数值
- static void displayValue(int n) {
- System.out.println("input : " + n);
- }
- //延迟若干秒
- static void delaySeconds(int seconds) {
- try {
- TimeUnit.SECONDS.sleep(seconds);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //主线程暂停
- static void pause() {
- try {
- System.out.println("main Thread over, paused.");
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
(编辑:ASP站长网)
|