同步异步 同步 (Synchronous):同步任务按顺序执行,一个任务完成后才开始下一个任务。调用方在等待任务完成时会被阻塞,直到任务结束并返回结果。
异步 (Asynchronous):异步任务可以同时执行,不必等待前一个任务完成。调用方在任务开始后立即返回,可以继续执行其他任务,不会被阻塞。结果通常通过回调、事件或Promise等方式处理。
阻塞、非阻塞 阻塞 :当一个线程在等待某个条件满足(如I/O操作完成、锁释放等)时,它会被挂起(阻塞)并暂停执行,直到条件满足。系统会将其置于等待队列中,直到阻塞条件解除。
非阻塞 :当一个线程在等待某个条件满足时,它不会被挂起,而是立即返回一个状态(如成功、失败、或需要重试),线程可以继续处理其他任务或尝试再次执行未完成的任务。
同步阻塞、非阻塞 同步阻塞(BIO) 以流的方式处理数据,底层是字节流。
1 2 3 4 5 6 7 8 9 10 11 try (ServerSocket serverSocket = new ServerSocket (port)) { logger.info("服务启动中" ); Socket socket; while ((socket = serverSocket.accept()) != null ) { logger.info("Accepted connection from {}" , socket.getRemoteSocketAddress()); threadPool.execute(new RequestHandlerThread (socket, requestHandler, serviceRegistry)); } } catch (Exception e) { logger.error("连接出错了" ); }
同步非阻塞(NIO) 面向数据块
buffer 一个可以写入数据的内存块,使用缓冲区读写数据遵循以下四个步骤
写数据
调用buffer.flip()方法,从写模式切换到读模式
读数据
调用buffer.clear()或buffer.compat()方法
buffer类的一些操作源码:
初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private int mark = -1 ; private int position = 0 ; private int limit; private int capacity;public static IntBuffer allocate (int capacity) { if (capacity < 0 ) throw createCapacityException(capacity); return new HeapIntBuffer (capacity, capacity, null ); }public static IntBuffer wrap (int [] array, int offset, int length) { try { return new HeapIntBuffer (array, offset, length, null ); } catch (IllegalArgumentException x) { throw new IndexOutOfBoundsException (); } }
put
1 2 3 4 5 6 7 8 9 10 11 12 13 public IntBuffer put (int x) { hb[ix(nextPutIndex())] = x; return this ; }final int nextPutIndex () { int p = position; if (p >= limit) throw new BufferOverflowException (); position = p + 1 ; return p; }
flip
1 2 3 4 5 6 7 public Buffer flip () { limit = position; position = 0 ; mark = -1 ; return this ; }
clear方法清空缓冲区;compact方法只会清空已读取的数据,而还未读取的数据继续保存在Buffer中;
channel
channel可以同时读写,流只能单独读或者写
channel可以实现异步读写数据
channel可以从buffer读取数据,也可以写数据到buffer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 try (RandomAccessFile raf = new RandomAccessFile (new File ("test.txt" ), "rw" ); FileChannel channel = raf.getChannel()) { channel.write(ByteBuffer.wrap("伞兵一号卢本伟准备就绪!" .getBytes())); System.out.println("After write: position : " + channel.position()); channel.position(0 ); ByteBuffer buffer = ByteBuffer.allocate(1024 ); int read = channel.read(buffer); System.out.println(new String (buffer.array(), 0 , read)); } catch (IOException e) { throw new RuntimeException (e);
channel还有个文件锁的功能,尝试获取与 FileChannel
关联的文件区域的独占或共享锁,shared变量声明是独占还是共享。
1 2 3 4 public abstract FileLock lock (long position, long size, boolean shared) public abstract FileLock tryLock (long position, long size, boolean shared)
selector 一个组件,可以检测多个NIO channel,看看读或者写事件是否就绪。
多个Channel以事件的方式可以注册到同一个Selector,从而达到用一个线程处理多个请求成为可能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); Selector selector = Selector.open()) { serverSocketChannel.bind(new InetSocketAddress (9999 )); serverSocketChannel.configureBlocking(false ); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true ) { int count = selector.select(); System.out.println("监听到" + count +"个事件" ); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客户端连接:" + socketChannel.getRemoteAddress()); socketChannel.configureBlocking(false ); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024 ); int read = socketChannel.read(byteBuffer); if (read > 0 ) { byteBuffer.flip(); System.out.println(new String (byteBuffer.array(), 0 , read)); } socketChannel.write(ByteBuffer.wrap("已收到" .getBytes())); } iterator.remove(); } } } catch (IOException e) { throw new RuntimeException (e); }try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress ("127.0.0.1" , 9999 )); Scanner scanner = new Scanner (System.in)) { System.out.println("已连接服务器!" ); while (true ) { System.out.println("请输入发送的内容:" ); String line = scanner.nextLine(); socketChannel.write(ByteBuffer.wrap(line.getBytes())); ByteBuffer buffer = ByteBuffer.allocate(1024 ); socketChannel.read(buffer); buffer.flip(); System.out.println("收到服务器返回的:" + new String (buffer.array(), 0 , buffer.limit())); } } catch (IOException e) { throw new RuntimeException (e); }
reactor模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 public class Reactor implements Runnable , Closeable { private final ServerSocketChannel serverSocketChannel; private final Selector selector; public Reactor () throws IOException { this .serverSocketChannel = ServerSocketChannel.open(); this .selector = Selector.open(); } @Override public void run () { try { serverSocketChannel.bind(new InetSocketAddress (9999 )); serverSocketChannel.configureBlocking(false ); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor (serverSocketChannel, selector)); while (true ) { int count = selector.select(); System.out.println("监听到" + count + "个事件" ); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { this .dispatch(iterator.next()); iterator.remove(); } } } catch (IOException e) { throw new RuntimeException (e); } } private void dispatch (SelectionKey key) { Object attachment = key.attachment(); if (attachment instanceof Runnable) { ((Runnable) attachment).run(); } } @Override public void close () throws IOException { serverSocketChannel.close(); selector.close(); } }public class Acceptor implements Runnable { private final ServerSocketChannel serverSocketChannel; private final Selector selector; public Acceptor (ServerSocketChannel serverSocketChannel, Selector selector) { this .serverSocketChannel = serverSocketChannel; this .selector = selector; } @Override public void run () { try { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客户端连接:" + socketChannel.getRemoteAddress()); socketChannel.configureBlocking(false ); socketChannel.register(selector, SelectionKey.OP_READ, new Handler (socketChannel)); } catch (IOException e) { throw new RuntimeException (e); } } }public class Handler implements Runnable { private final SocketChannel socketChannel; private final ThreadPoolExecutor pool; public Handler (SocketChannel socketChannel) { this .socketChannel = socketChannel; pool = new ThreadPoolExecutor (5 , 20 , 60 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(), new ThreadPoolExecutor .CallerRunsPolicy()); } @Override public void run () { try { ByteBuffer byteBuffer = ByteBuffer.allocate(20 ); int read = socketChannel.read(byteBuffer); if (read < 0 ) { socketChannel.close(); System.out.println("客户端断开连接!" ); return ; } pool.execute(() -> { try { if (byteBuffer.remaining() == 0 ) { byteBuffer.flip(); System.out.println("线程 " + Thread.currentThread().getName() + " " + "处理客户端消息: " + new String (byteBuffer.array(), 0 , byteBuffer.remaining())); byteBuffer.clear(); } socketChannel.write(ByteBuffer.wrap("已收到" .getBytes())); } catch (IOException e) { throw new RuntimeException (e); } }); } catch (IOException e) { throw new RuntimeException (e); } } }
主从reactor模式 NIO存在的问题 客户端关闭导致服务端空轮询
粘包拆包问题
消息定长
每个包末尾使用特殊分隔符
将消息分为头部和本体,头部保存数据包的长度