博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java网络编程--echo服务器
阅读量:6176 次
发布时间:2019-06-21

本文共 14190 字,大约阅读时间需要 47 分钟。

客户端使用Java的阻塞IO

服务端使用Java的非阻塞NIO

package com.nio.echo;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Scanner;

/**
 * Console client for the echo server, written with blocking java.io sockets.
 * <p>
 * Each line typed on standard input is sent to the server as UTF-8 bytes; a
 * background thread prints whatever the server sends back. Typing
 * {@code quit} (or reaching end of input) closes the connection.
 *
 * @version created 2015-10-29 14:49:47
 */
public class EchoClient
{
    /** Address of the echo server. */
    public static final String REMOT_IP = "127.0.0.1";

    /** TCP port of the echo server. */
    public static final int REMOTE_PORT = 8080;

    /**
     * Connects to the server, starts the reader thread and forwards console
     * lines until the user types {@code quit} or standard input ends.
     *
     * @throws IOException if connecting or writing to the socket fails
     */
    public void connectServer() throws IOException
    {
        Socket socket = new Socket();
        try
        {
            socket.connect(new InetSocketAddress(REMOT_IP, REMOTE_PORT));
            if (socket.isConnected())
            {
                System.out.println("connect remote address success");
            }
            // Start the thread that listens for messages from the server.
            new Thread(new client2server(socket)).start();
            // Deliberately not closed: closing the Scanner would also close System.in.
            Scanner scanner = new Scanner(System.in);
            OutputStream output = socket.getOutputStream();
            // hasNextLine() turns end-of-input into a normal loop exit instead of
            // the NoSuchElementException that an unguarded nextLine() would throw.
            while (scanner.hasNextLine())
            {
                String str = scanner.nextLine();
                if (str.equals("quit"))
                {
                    break;
                }
                output.write(str.getBytes("UTF-8"));
            }
        }
        finally
        {
            // Always release the socket, including when connect() or write() failed.
            // Closing it also unblocks the reader thread so the JVM can exit.
            socket.close();
        }
    }

    public static void main(String[] args) throws IOException
    {
        new EchoClient().connectServer();
    }
}

/**
 * Reader task: prints every chunk of data received from the server until the
 * server closes the connection or the socket is closed locally.
 */
class client2server implements Runnable
{
    private final Socket socket;

    /**
     * @param socket the connected socket whose input stream is read
     */
    public client2server(Socket socket)
    {
        this.socket = socket;
    }

    @Override
    public void run()
    {
        try
        {
            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[1024];
            while (true)
            {
                int num = inputStream.read(bytes);
                if (num == -1)
                {
                    // End of stream: the server closed its side of the connection.
                    System.out.println("server is shutup");
                    break;
                }
                System.out.print(num + " ");
                String str = new String(bytes, 0, num, "UTF-8");
                System.out.println("get data: " + str);
            }
        }
        catch (IOException e)
        {
            // When the main thread closes the socket (user typed "quit"), the
            // blocked read fails by design; only report unexpected failures.
            if (!socket.isClosed())
            {
                e.printStackTrace();
            }
        }
    }
}

 

package com.nio.echo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded echo server built on non-blocking NIO.
 * <p>
 * One {@link Selector} multiplexes the listening channel and all client
 * channels. Every registered key carries a {@link Handle} attachment that
 * is invoked when the key becomes ready.
 *
 * @version created 2015-10-29 14:49:12
 */
public class NIOEchoServer
{
    private static ServerSocketChannel ssc = null;
    private static Selector selector = null;
    private static final int PORT = 8080;

    /**
     * Opens the listening channel on {@code PORT} and runs the select loop
     * forever.
     *
     * @throws IOException if the listening channel or selector cannot be set
     *                     up, or if accepting a connection fails
     */
    public static void startServer() throws IOException
    {
        ssc = ServerSocketChannel.open();
        selector = Selector.open();
        ssc.configureBlocking(false);
        // ssc.socket() exposes the ServerSocket associated with the channel.
        final ServerSocket serverSocket = ssc.socket();
        // SO_REUSEADDR has to be enabled before bind(); its effect when set
        // on an already-bound socket is undefined.
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(PORT));
        final AcceptHandler acceptHandler = new AcceptHandler();
        ssc.register(selector, SelectionKey.OP_ACCEPT, acceptHandler);
        while (true)
        {
            int n = selector.select();
            if (n == 0)
            {
                continue;
            }
            final Set<SelectionKey> readyKeys = selector.selectedKeys();
            final Iterator<SelectionKey> it = readyKeys.iterator();
            while (it.hasNext())
            {
                final SelectionKey key = it.next();
                // Remove first so the key leaves the selected set even when its handler fails.
                it.remove();
                final Handle handler = (Handle) key.attachment();
                try
                {
                    handler.doHandle(key);
                }
                catch (IOException e)
                {
                    if (key.channel() == ssc)
                    {
                        // A failure on the listening channel is fatal, as before.
                        throw e;
                    }
                    // A failure on one client connection (for example a reset by
                    // the peer) must not stop the server: drop that connection only.
                    e.printStackTrace();
                    try
                    {
                        key.channel().close();
                    }
                    catch (IOException closeFailure)
                    {
                        closeFailure.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        NIOEchoServer.startServer();
    }
}

/**
 * Callback attached to a {@link SelectionKey}; invoked by the select loop
 * when the key is ready.
 */
interface Handle
{
    void doHandle(SelectionKey key) throws IOException;
}

/**
 * Accepts new connections on the listening channel and registers an
 * {@link IOHandler} for each of them.
 */
class AcceptHandler implements Handle
{
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        final SocketChannel sc = ssc.accept();
        if (sc == null)
        {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        // The IOHandler constructor registers itself with the selector.
        new IOHandler(key.selector(), sc);
        System.out.println("server: connect success");
    }
}

/**
 * Per-connection handler: reads bytes from the client and echoes them back.
 * <p>
 * Both {@code readBuffer} and the output buffer are kept in "fill" mode
 * between calls (position = number of buffered bytes). They are flipped only
 * for the duration of a single drain operation and compacted afterwards.
 */
class IOHandler implements Handle
{
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private final OutputBuffer outputBuffer = new OutputBuffer();
    private final SocketChannel socketChannel;
    private final SelectionKey key;

    /**
     * Switches the channel to non-blocking mode and registers this handler
     * for read events.
     *
     * @param selector selector driving the server loop
     * @param sc       freshly accepted client channel
     * @throws IOException if the channel cannot be configured or registered
     */
    public IOHandler(Selector selector, SocketChannel sc) throws IOException
    {
        this.socketChannel = sc;
        socketChannel.configureBlocking(false);
        key = socketChannel.register(selector, SelectionKey.OP_READ, this);
    }

    /**
     * Appends data to the output buffer and asks for write events.
     *
     * @param bytebuffer source buffer, in drain mode (position..limit is the data)
     * @param num        maximum number of bytes to take from {@code bytebuffer}
     * @return the number of bytes actually moved; may be smaller than
     *         {@code num} when the output buffer is nearly full
     */
    private int addWriteBuffer(ByteBuffer bytebuffer, int num)
    {
        final ByteBuffer out = outputBuffer.writeBuffer;
        final int available = Math.min(bytebuffer.remaining(), out.remaining());
        final int count = Math.max(0, Math.min(num, available));
        if (count > 0)
        {
            // Narrow the source limit so that put() cannot overflow the output buffer.
            final int savedLimit = bytebuffer.limit();
            bytebuffer.limit(bytebuffer.position() + count);
            out.put(bytebuffer);
            bytebuffer.limit(savedLimit);
            outputBuffer.size += count;
            this.interestOps(0, SelectionKey.OP_WRITE);
        }
        return count;
    }

    /**
     * Removes and adds interest operations on this connection's key.
     *
     * @param remove operations to stop listening for
     * @param add    operations to start listening for
     */
    private void interestOps(int remove, int add)
    {
        int cur = key.interestOps();
        int ops = (cur & ~remove) | add;
        if (cur != ops)
        {
            key.interestOps(ops);
            key.selector().wakeup();
        }
    }

    /**
     * Decodes the remaining bytes of a buffer as UTF-8 without consuming them.
     *
     * @param buffer buffer whose remaining bytes are decoded
     * @return the decoded text, or an empty string when decoding fails
     */
    public static String getString(ByteBuffer buffer)
    {
        try
        {
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            // Decoding the buffer itself would advance its position and leave
            // nothing to echo, so decode an independent read-only view instead.
            CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }

    /**
     * Moves input that did not fit into the output buffer earlier, and
     * resumes reading once the read buffer has room again.
     */
    private void drainPendingInput()
    {
        readBuffer.flip();
        addWriteBuffer(readBuffer, readBuffer.remaining());
        readBuffer.compact();
        if (readBuffer.hasRemaining())
        {
            interestOps(0, SelectionKey.OP_READ);
        }
    }

    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        if (key.isReadable())
        {
            System.out.print("server: meet read event ,before read position = " + readBuffer.position());
            int num = socketChannel.read(readBuffer);
            if (num == -1)
            {
                // The peer closed the connection; closing the channel also cancels the key.
                System.out.println("close the channel ");
                key.channel().close();
                return;
            }
            // flip: limit = buffered bytes, position = 0, ready to be drained.
            readBuffer.flip();
            System.out.print(" reveive data " + getString(readBuffer));
            int dealsize = addWriteBuffer(readBuffer, readBuffer.remaining());
            System.out.println(" write to writeBuffer size = " + dealsize + " nowPostion = " + readBuffer.position());
            // Drop the consumed bytes and keep whatever did not fit.
            readBuffer.compact();
            if (!readBuffer.hasRemaining())
            {
                // Both buffers are full: stop reading until a write frees space,
                // otherwise the selector would report this key as readable forever.
                interestOps(SelectionKey.OP_READ, 0);
            }
        }
        // A separate "if" (not "else if") so that pending output is not starved
        // by a client that keeps the channel readable.
        if (key.isValid() && key.isWritable())
        {
            System.out.print("meet write event");
            final ByteBuffer out = outputBuffer.writeBuffer;
            out.flip();
            long num = socketChannel.write(out);
            // Drop what was written and return to fill mode.
            out.compact();
            outputBuffer.size -= num;
            System.out.print("deal size = " + num + "left buffer size = " + outputBuffer.size);
            drainPendingInput();
            if (outputBuffer.size == 0)
            {
                System.out.println(" deal over,cancel write event");
                interestOps(SelectionKey.OP_WRITE, 0);
            }
        }
    }
}

/**
 * Output buffer plus a counter of the bytes waiting to be written.
 */
class OutputBuffer
{
    /** Number of bytes buffered and not yet written to the channel. */
    public int size;

    /** Pending output; kept in fill mode between operations. */
    public final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
}

 

ByteBuffer没有直接提供获取其中有效数据量的方法,只能自己写一个OutputBuffer来辅助处理

之前的OutputBuffer只是封装了一个ByteBuffer以及一个size变量,用于标示可用的数据量

 

下面对OutputBuffer进行了重构,将size变量的修改以及数据的写入和写出操作都封装到方法中,其中output(SocketChannel socketChannel)

方法利用回调的思想,将socketChannel对象传入,在OutputBuffer当中实现数据的write输出

 

 

 

 

package com.nio.echo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded echo server built on non-blocking NIO.
 * <p>
 * One {@link Selector} multiplexes the listening channel and all client
 * channels. Every registered key carries a {@link Handle} attachment that
 * is invoked when the key becomes ready.
 *
 * @version created 2015-10-29 14:49:12
 */
public class NIOEchoServer
{
    private static ServerSocketChannel ssc = null;
    private static Selector selector = null;
    private static final int PORT = 8080;

    /**
     * Opens the listening channel on {@code PORT} and runs the select loop
     * forever.
     *
     * @throws IOException if the listening channel or selector cannot be set
     *                     up, or if accepting a connection fails
     */
    public static void startServer() throws IOException
    {
        ssc = ServerSocketChannel.open();
        selector = Selector.open();
        ssc.configureBlocking(false);
        // ssc.socket() exposes the ServerSocket associated with the channel.
        final ServerSocket serverSocket = ssc.socket();
        // SO_REUSEADDR has to be enabled before bind(); its effect when set
        // on an already-bound socket is undefined.
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(PORT));
        final AcceptHandler acceptHandler = new AcceptHandler();
        ssc.register(selector, SelectionKey.OP_ACCEPT, acceptHandler);
        while (true)
        {
            int n = selector.select();
            if (n == 0)
            {
                continue;
            }
            final Set<SelectionKey> readyKeys = selector.selectedKeys();
            final Iterator<SelectionKey> it = readyKeys.iterator();
            while (it.hasNext())
            {
                final SelectionKey key = it.next();
                // Remove first so the key leaves the selected set even when its handler fails.
                it.remove();
                final Handle handler = (Handle) key.attachment();
                try
                {
                    handler.doHandle(key);
                }
                catch (IOException e)
                {
                    if (key.channel() == ssc)
                    {
                        // A failure on the listening channel is fatal, as before.
                        throw e;
                    }
                    // A failure on one client connection (for example a reset by
                    // the peer) must not stop the server: drop that connection only.
                    e.printStackTrace();
                    try
                    {
                        key.channel().close();
                    }
                    catch (IOException closeFailure)
                    {
                        closeFailure.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        NIOEchoServer.startServer();
    }
}

/**
 * Callback attached to a {@link SelectionKey}; invoked by the select loop
 * when the key is ready.
 */
interface Handle
{
    void doHandle(SelectionKey key) throws IOException;
}

/**
 * Accepts new connections on the listening channel and registers an
 * {@link IOHandler} for each of them.
 */
class AcceptHandler implements Handle
{
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        final SocketChannel sc = ssc.accept();
        if (sc == null)
        {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        // The IOHandler constructor registers itself with the selector.
        new IOHandler(key.selector(), sc);
        System.out.println("server: connect success");
    }
}

/**
 * Per-connection handler: reads bytes from the client and echoes them back
 * through an {@link OutputBuffer}.
 * <p>
 * {@code readBuffer} is kept in "fill" mode between calls (position = number
 * of buffered bytes); it is flipped only while being drained and compacted
 * afterwards.
 */
class IOHandler implements Handle
{
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private final OutputBuffer outputBuffer = new OutputBuffer();
    private final SocketChannel socketChannel;
    private final SelectionKey key;

    /**
     * Switches the channel to non-blocking mode and registers this handler
     * for read events.
     *
     * @param selector selector driving the server loop
     * @param sc       freshly accepted client channel
     * @throws IOException if the channel cannot be configured or registered
     */
    public IOHandler(Selector selector, SocketChannel sc) throws IOException
    {
        this.socketChannel = sc;
        socketChannel.configureBlocking(false);
        key = socketChannel.register(selector, SelectionKey.OP_READ, this);
    }

    /**
     * Appends data to the output buffer and asks for write events.
     *
     * @param bytebuffer source buffer, in drain mode (position..limit is the data)
     * @param num        maximum number of bytes to take from {@code bytebuffer}
     * @return the number of bytes actually moved; may be smaller than
     *         {@code num} when the output buffer is nearly full
     */
    private int addWriteBuffer(ByteBuffer bytebuffer, int num)
    {
        int prevPositon = bytebuffer.position();
        outputBuffer.put(bytebuffer, num);
        int moved = bytebuffer.position() - prevPositon;
        if (moved > 0)
        {
            this.interestOps(0, SelectionKey.OP_WRITE);
        }
        return moved;
    }

    /**
     * Removes and adds interest operations on this connection's key.
     *
     * @param remove operations to stop listening for
     * @param add    operations to start listening for
     */
    private void interestOps(int remove, int add)
    {
        int cur = key.interestOps();
        int ops = (cur & ~remove) | add;
        if (cur != ops)
        {
            key.interestOps(ops);
            key.selector().wakeup();
        }
    }

    /**
     * Decodes the remaining bytes of a buffer as UTF-8 without consuming them.
     *
     * @param buffer buffer whose remaining bytes are decoded
     * @return the decoded text, or an empty string when decoding fails
     */
    public static String getString(ByteBuffer buffer)
    {
        try
        {
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            // Decoding the buffer itself would advance its position and leave
            // nothing to echo, so decode an independent read-only view instead.
            CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }

    /**
     * Moves input that did not fit into the output buffer earlier, and
     * resumes reading once the read buffer has room again.
     */
    private void drainPendingInput()
    {
        readBuffer.flip();
        addWriteBuffer(readBuffer, readBuffer.remaining());
        readBuffer.compact();
        if (readBuffer.hasRemaining())
        {
            interestOps(0, SelectionKey.OP_READ);
        }
    }

    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        if (key.isReadable())
        {
            System.out.print("server: meet read event ,before read position = " + readBuffer.position());
            int num = socketChannel.read(readBuffer);
            if (num == -1)
            {
                // The peer closed the connection; closing the channel also cancels the key.
                System.out.println("close the channel ");
                key.channel().close();
                return;
            }
            // flip: limit = buffered bytes, position = 0, ready to be drained.
            readBuffer.flip();
            System.out.println(" reveive data " + getString(readBuffer));
            addWriteBuffer(readBuffer, readBuffer.remaining());
            // Drop the consumed bytes and keep whatever did not fit.
            readBuffer.compact();
            if (!readBuffer.hasRemaining())
            {
                // Both buffers are full: stop reading until a write frees space,
                // otherwise the selector would report this key as readable forever.
                interestOps(SelectionKey.OP_READ, 0);
            }
        }
        // A separate "if" (not "else if") so that pending output is not starved
        // by a client that keeps the channel readable.
        if (key.isValid() && key.isWritable())
        {
            System.out.print("meet write event");
            // Write buffered data to the client.
            outputBuffer.output(socketChannel);
            drainPendingInput();
            if (outputBuffer.size() == 0)
            {
                System.out.println(" deal over,cancel write event");
                interestOps(SelectionKey.OP_WRITE, 0);
            }
        }
    }
}

/**
 * Buffer of bytes waiting to be written to a channel, together with a count
 * of how many bytes are pending.
 * <p>
 * The internal buffer stays in fill mode between calls, so consecutive
 * {@link #put} calls append instead of overwriting data that has not been
 * written yet.
 */
class OutputBuffer
{
    private int size;
    private final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

    /**
     * Writes as much pending data as the channel accepts.
     *
     * @param socketChannel channel to write to
     * @throws IOException if the write fails
     */
    public void output(SocketChannel socketChannel) throws IOException
    {
        writeBuffer.flip();
        try
        {
            int num = socketChannel.write(writeBuffer);
            size -= num;
        }
        finally
        {
            // Drop what was written and return to fill mode, even if write() failed.
            writeBuffer.compact();
        }
    }

    /**
     * Appends up to {@code num} bytes from {@code b}, limited by the free
     * space of this buffer. The position of {@code b} advances by the number
     * of bytes actually copied.
     *
     * @param b   source buffer, in drain mode
     * @param num maximum number of bytes to copy
     */
    public void put(ByteBuffer b, int num)
    {
        int available = Math.min(b.remaining(), writeBuffer.remaining());
        int count = Math.max(0, Math.min(num, available));
        if (count == 0)
        {
            return;
        }
        // Narrow the source limit so that put() cannot overflow this buffer.
        int savedLimit = b.limit();
        b.limit(b.position() + count);
        writeBuffer.put(b);
        b.limit(savedLimit);
        this.size += count;
    }

    /**
     * @return the number of bytes buffered and not yet written
     */
    public int size()
    {
        return this.size;
    }
}

  

 

 

 

事实上,在NIO网络编程中,写出数据的操作需要加入缓存才能保证效率,目的是在写操作发生的时候不影响业务继续send消息:首先将send发送过来的数据缓存到A中,在写事件发生的时候再将A中的数据写出(此时仅短暂锁住A,将A中的引用取出,并重新赋一个新引用给A),这样写事件的处理过程和业务消息的send就可以高并发地进行。

 

转载于:https://www.cnblogs.com/wuxinliulei/p/4923148.html

你可能感兴趣的文章
网易2016研发工程师编程题:扫描透镜
查看>>
今天晚上 中国互联网被Struts2漏洞血洗
查看>>
升级linux bash
查看>>
软件架构师的12项修炼
查看>>
[改善Java代码]不要在构造函数中抛出异常
查看>>
PLC M8000 M8001 M8002 M8003
查看>>
Xamarin.Android提示找不到mono.Android.Support.v4
查看>>
在SWING里嵌入SWT的组件
查看>>
Target runtime Apache Tomcat v7.0 is not defined.
查看>>
用RSA加密实现Web登录密码加密传输
查看>>
Java垃圾回收机制
查看>>
CSS-禁用a标签
查看>>
easyui select 下拉框的取值和赋值
查看>>
Android: TextView 及其子类通过代码和 XML 设置字体大小的存在差异的分析
查看>>
ajax与HTML5 history pushState/replaceState实例
查看>>
cpython和lua源码阅读
查看>>
Zabbix如何设置脚本告警
查看>>
Java EE的未来
查看>>
如何开始学习以太坊及区块链,辍学程序员创办以太坊
查看>>
spark读取gz文件
查看>>