Java NIO 网络编程模型 Proactor模式讲解和实战demo

在 Java NIO 中,Proactor 模式是一种异步 I/O 模式,与 Reactor 模式相对应,它的核心思想是在数据准备就绪后,系统通知应用程序进行 I/O 操作,这样应用程序就可以处理更多的连接而不必阻塞。相比 Reactor 模式,Proactor 模式更适合处理大量的并发连接和数据的情况。

在 Proactor 模式中,应用程序不再需要像 Reactor 模式那样轮询 I/O 事件,而是把 I/O 操作的具体实现交给系统完成。当数据准备就绪时,系统通知应用程序进行 I/O 操作,应用程序仅需要处理读写完成后的结果。

与 Reactor 模式相比,Proactor 模式的优点在于能够提供更高的并发处理能力,更少的系统开销和更少的 CPU 占用率,因为系统负责 I/O 操作,应用程序只需要处理数据。

下面是一个简单的使用 Proactor 模式的 Java NIO 网络编程示例:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Future;

public class ProactorServer {

    private static final int BUFFER_SIZE = 1024;
    private static final int PORT = 8080;

    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel channel, Void attachment) {
                serverSocketChannel.accept(null, this);
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        if (result < 0) {
                            try {
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            return;
                        }
                        attachment.flip();
                        channel.write(attachment, attachment, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer result, ByteBuffer attachment) {
                                if (attachment.hasRemaining()) {
                                    channel.write(attachment, attachment, this);
                                } else {
                                    attachment.compact();
                                    channel.read(attachment, attachment, this);
                                }
                            }

                            @Override
                            public void failed(Throwable exc, ByteBuffer attachment) {
                                try {
                                    channel.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        try {
                            channel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.out.println("Server Error");
            }
        });
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}