1. 为什么要写这篇文章
几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库。但是,当深入实现细节时,我们想起了一位智者曾经说过的话:“细节决定成败”。最终我们意识到 NoSQL 不是解决所有问题的银弹,而 NoSQL vs RDMS 的答案是:“视情况而定”。类似地,去年RxJava 和 Spring Reactor 这样的并发库加入了让人充满激情的语句,如异步非阻塞方法等。为了避免再犯同样的错误,我们尝试评估诸如 ExecutorService、 RxJava、Disruptor 和 Akka 这些并发框架彼此之间的差异,以及如何确定各自框架的正确用法。
本文中用到的术语在这里有更详细的描述。
2. 分析并发框架的示例用例
3. 快速更新线程配置
在开始比较并发框架的之前,让我们快速复习一下如何配置最佳线程数以提高并行任务的性能。这个理论适用于所有框架,并且在所有框架中使用相同的线程配置来度量性能。
- 对于内存任务,线程的数量大约等于具有最佳性能的内核的数量,尽管它可以根据各自处理器中的超线程特性进行一些更改。
- 例如,在8核机器中,如果对应用程序的每个请求都必须在内存中并行执行4个任务,那么这台机器上的负载应该保持为 @2 req/sec,在 ThreadPool 中保持8个线程。
- 对于 I/O 任务,ExecutorService 中配置的线程数应该取决于外部服务的延迟。
- 与内存中的任务不同,I/O 任务中涉及的线程将被阻塞,并处于等待状态,直到外部服务响应或超时。因此,当涉及 I/O 任务线程被阻塞时,应该增加线程的数量,以处理来自并发请求的额外负载。
- I/O 任务的线程数应该以保守的方式增加,因为处于活动状态的许多线程带来了上下文切换的成本,这将影响应用程序的性能。为了避免这种情况,应该根据 I/O 任务中涉及的线程的等待时间按比例增加此机器的线程的确切数量以及负载。
参考: http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/
4. 性能测试结果
性能测试配置 GCP -> 处理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架构:x86_64;CPU 内核:8个(注意:这些结果仅对该配置有意义,并不表示一个框架比另一个框架更好)。
5. 使用执行器服务并行化 IO 任务
5.1 何时使用?
如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 小于可用的核心数量,那么 ExecutorService 可用于并行化任务,更快地执行代码。
5.2 什么时候适用?
如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 远远高于可用的核心数量,那么使用 ExecutorService 进一步并行化只会使情况变得更糟。
当外部服务延迟增加到 400ms 时,性能测试结果如下(请求速率 @50 req/sec,8核)。
5.3 所有任务按顺序执行示例
- // I/O 任务:调用外部服务
- String posts = JsonService.getPosts();
- String comments = JsonService.getComments();
- String albums = JsonService.getAlbums();
- String photos = JsonService.getPhotos();
-
- // 合并来自外部服务的响应
- // (内存中的任务将作为此操作的一部分执行)
- int userId = new Random().nextInt(10) + 1;
- String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
- String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
-
- // 构建最终响应并将其发送回客户端
- String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
- return response;
5.4 I/O 任务与 ExecutorService 并行执行代码示例
- // 添加 I/O 任务
- List<Callable<String>> ioCallableTasks = new ArrayList<>();
- ioCallableTasks.add(JsonService::getPosts);
- ioCallableTasks.add(JsonService::getComments);
- ioCallableTasks.add(JsonService::getAlbums);
- ioCallableTasks.add(JsonService::getPhotos);
-
- // 调用所有并行任务
- ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
- List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);
-
- // 获取 I/O 操作(阻塞调用)结果
- String posts = futuresOfIOTasks.get(0).get();
- String comments = futuresOfIOTasks.get(1).get();
- String albums = futuresOfIOTasks.get(2).get();
- String photos = futuresOfIOTasks.get(3).get();
-
- // 合并响应(内存中的任务是此操作的一部分)
- String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
- String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
-
- // 构建最终响应并将其发送回客户端
- return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
6. 使用执行器服务并行化 IO 任务(CompletableFuture)
与上述情况类似:处理传入请求的 HTTP 线程被阻塞,而 CompletableFuture 用于处理并行任务
6.1 何时使用?
(编辑:ASP站长网)
|