8. RxJava
- 这与上面的情况类似,唯一的区别是 RxJava 提供了更好的 DSL 可以进行流式编程,下面的例子中没有体现这一点。
- 性能优于 CompletableFuture 处理并行任务。
8.1 何时使用?
如果编码的场景适合异步非阻塞方式,那么可以首选 RxJava 或任何响应式开发库。还具有诸如 back-pressure 之类的附加功能,可以在生产者和消费者之间平衡负载。
- int userId = new Random().nextInt(10) + 1;
- ExecutorService executor = CustomThreads.getExecutorService(8);
-
- // I/O 任务
- Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
- .subscribeOn(Schedulers.from(executor));
- Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
- .subscribeOn(Schedulers.from(executor));
- Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
- .subscribeOn(Schedulers.from(executor));
- Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
- .subscribeOn(Schedulers.from(executor));
-
- // 合并来自 /posts 和 /comments API 的响应
- // 作为这个操作的一部分,将执行内存中的一些任务
- Observable<String> postsAndCommentsObservable = Observable
- .zip(postsObservable, commentsObservable,
- (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
- .subscribeOn(Schedulers.from(executor));
-
- // 合并来自 /albums 和 /photos API 的响应
- // 作为这个操作的一部分,将执行内存中的一些任务
- Observable<String> albumsAndPhotosObservable = Observable
- .zip(albumsObservable, photosObservable,
- (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
- .subscribeOn(Schedulers.from(executor));
-
- // 构建最终响应
- Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
- .subscribeOn(Schedulers.from(executor))
- .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));
9. Disruptor
[Queue vs RingBuffer]
图片1:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
图片2:https://www.baeldung.com/lmax-disruptor-concurrency
- 在本例中,HTTP 线程将被阻塞,直到 disruptor 完成任务,并且使用 countdowlatch 将 HTTP 线程与 ExecutorService 中的线程同步。
- 这个框架的主要特点是在没有任何锁的情况下处理线程间通信。在 ExecutorService 中,生产者和消费者之间的数据将通过 Queue传递,在生产者和消费者之间的数据传输过程中涉及到一个锁。Disruptor 框架通过一个名为 Ring Buffer 的数据结构(它是循环数组队列的扩展版本)来处理这种生产者-消费者通信,并且不需要任何锁。
- 这个库不适用于我们在这里讨论的这种用例。仅出于好奇而添加。
9.1 何时使用?
(编辑:ASP站长网)
|