在 Java 中,线程的实现是比较重量级的,所以线程的启动或者销毁是很消耗服务器的资源的,即使使用线程池来实现,使用上述传统的 Socket 方式,当连接数极具上升也会带来性能瓶颈,原因是线程的上线文切换开销会在高并发的时候体现的很明显,并且以上操作方式还是同步阻塞式的编程,性能问题在高并发的时候就会体现的尤为明显。
以上的流程,如下图:
4.2 NIO 多路复用
介于以上高并发的问题,NIO 的多路复用功能就显得意义非凡了。
NIO 是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高。
- // NIO 多路复用
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 4,
- 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- threadPool.execute(new Runnable() {
- @Override
- public void run() {
- try (Selector selector = Selector.open();
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();) {
- serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- while (true) {
- selector.select(); // 阻塞等待就绪的Channel
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- try (SocketChannel channel = ((ServerSocketChannel) key.channel()).accept()) {
- channel.write(Charset.defaultCharset().encode("你好,世界"));
- }
- iterator.remove();
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
- // Socket 客户端(接收信息并打印)
- try (Socket cSocket = new Socket(InetAddress.getLocalHost(), port)) {
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(cSocket.getInputStream()));
- bufferedReader.lines().forEach(s -> System.out.println("NIO 客户端:" + s));
- } catch (IOException e) {
- e.printStackTrace();
- }
- 首先,通过 Selector.open() 创建一个 Selector,作为类似调度员的角色;
- 然后,创建一个 ServerSocketChannel,并且向 Selector 注册,通过指定 SelectionKey.OP_ACCEPT,告诉调度员,它关注的是新的连接请求;
- 为什么我们要明确配置非阻塞模式呢?这是因为阻塞模式下,注册操作是不允许的,会抛出 IllegalBlockingModeException 异常;
- Selector 阻塞在 select 操作,当有 Channel 发生接入请求,就会被唤醒;
下面的图,可以有效的说明 NIO 复用的流程:
就这样 NIO 的多路复用就大大提升了服务器端响应高并发的能力。
4.3 AIO 版 Socket 实现
Java 1.7 提供了 AIO 实现的 Socket 是这样的,如下代码:
- // AIO线程复用版
- Thread sThread = new Thread(new Runnable() {
- @Override
- public void run() {
- AsynchronousChannelGroup group = null;
- try {
- group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
- AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
- server.accept(null, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
- @Override
- public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
- server.accept(null, this); // 接收下一个请求
- try {
- Future<Integer> f = result.write(Charset.defaultCharset().encode("你好,世界"));
- f.get();
- System.out.println("服务端发送时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
- result.close();
- } catch (InterruptedException | ExecutionException | IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
- }
- });
- group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- sThread.start();
- // Socket 客户端
- AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
- Future<Void> future = client.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
- future.get();
- ByteBuffer buffer = ByteBuffer.allocate(100);
- client.read(buffer, null, new CompletionHandler<Integer, Void>() {
- @Override
- public void completed(Integer result, Void attachment) {
- System.out.println("客户端打印:" + new String(buffer.array()));
- }
- @Override
- public void failed(Throwable exc, Void attachment) {
- exc.printStackTrace();
- try {
- client.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
- Thread.sleep(10 * 1000);
五、总结
(编辑:ASP站长网)
|