page contents

Java Socket实用样例教程

本文讲述了Java Socket实用样例教程!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

attachments-2023-07-VtDltAUn64bf23923ac2f.png本文讲述了Java Socket实用样例教程!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

最近用到了Java的Socket通信。
在搜索用法的过程中,发现好多都是用System.in举例的;但是实际工作中,并不是用System.in从控制台输入消息的,不太好用。
以下是个人从工作中总结的Socket实用代码样例。
二、服务器端(ServerSocket)
服务器端等待Socket连接的建立,接收Socket请求报文,处理后给Socket客户端返回响应报文。
1.SocketThread类,主要处理类,用来接收Socket请求报文、处理、返回响应报文。

@Slf4j
public class SocketThread(){
  private Socket socket;
  private InputStream inputStream;
  private OutputStream outputStream;
  
  public SocketThread(Socket socket){
    this.socket = socket;
  }
  //第一个方法,用来循环监听inputStream
  public void preStart() throws IOException, ClassNotFoundException, InterruptedExceptedException {
    while(true){
      //处理
      start();
      
      //服务器不关闭socket,客户端关闭即可
      //socket.close();
      
      if(socket == null || socket.isClosed()){
        log.info("socket为null或被关闭,停止循环");
        break;
      }
    }
  }
  
  //第二个方法,如果有消息就处理;如果没有消息,就sleep,然后返回
  private void start() throws IOException, ClassNotFoundException, InterruptedException{
    if( null == inputStream || socket.isInputShutdown() ){
      inputStream = socket.getInputStream();
    }
    int available = inputStream.available();
    //如果输入流不可用,说明本次循环没有收到消息,返回
    if(0 == available){
      Thread.sleep(500);
      return;
    }
    if(null == outputStream || socket.isOutputShutdown()){
      outputStream = socket.getOutputStream();
    }
    //得到消息
    JSONObject receive = receive(inputStream);
    //进行处理
    boolean deal = deal(receive);
    //返回响应报文
    backMsg(outputStream, deal, receive);
  }
  
  //从流中获取消息,封装成JSONObject对象
  private JSONObject receive(InputStream input){
    //消息格式为字节数组,type(1位)+datalength(4位)+data(datalength中指明的位数)
    byte t[] = new byte[1];
    byte dl[] = new byte[4];
    //从流中读取1位,保存到t中
    input.read(t, 0, 1);
    char type = (char)t[0];
    
    //从流中读取4位
    input.read(dl, 0, 4);
    ByteBuffer dlB = ByteBuffer.wrap(dl);
    int datalength = dlb.getInt();
    
    //准备获得data
    byte db = new byte[datalength];
    input.read(db,0,datalength);
    String dataStr = new String(db, "utf-8");
    JSONObject data = JSONObject.parseObject(dataStr);
    
    //转为JSONObject对象,并返回
    JSONObject json = new JSONObject();
    json.put("type",type);
    json.put("datalength",datalength);
    json.put("data",data);
    return json;
  }
  
  //处理逻辑,主要内容省略
  private boolean deal(JSONObject json){
     //标志位,如果处理成功,再改为true
     boolean bool = false;
     //省略处理逻辑......
     return bool;
  }
  
  //返回响应报文
  private void backMsg(OutputStream outputStream, boolean bool, JSONObject receive){
    JSONObject backJson = new JSONObject();
    if(bool){
      backJson.put("status","1");
    }else{
      backJson.put("status","-1");
    }
    backJson.put("dataId",receive.get("data").get("id"));
    
    byte[] backJsonByte = backJson.toString().getBytes();
    //规定,返回的byte数组格式为,json长度(int格式,占4位)+json的byte数组
    int totalLength = 4 + backJsonByte.length;
    BtyeBuffer buffer = ByteBuffer.allocate(totalLength);
    buffer.putInt(backJsonByte.length);
    buffer.put(backJsonByte);
    outputStream.write(buffer.array());
    outputStream.flush();
    
  }
  
}
2.SocketStart类,用于等待Socket连接建立,用到了线程池

@Slf4j
public class SocketStart {
  private ExecutorService executorService;
  @PreDestroy
  public void destroy() throws InterruptedException{
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.DAYS);
  }
  
  public SocketStart() throws IOException{
    executorService = new ThreadPoolExecutor(100,100,1,TimeUnit.MILLISECONDS,
    new SynchronousQueue<>(),
    new ThreadFactoryBuilder().setNameFormat("SocketStart-pool-%s-thread").build(),
    new ThreadPoolExecutor.CallerRunsPolicy());
    
    ServerSocket ss = new ServerSocket(9999);
    while(true){
      Socket accept = ss.accept();
      log.info("ServerSocket收到请求");
      CompletableFuture.runAsync(() -> {
        try{
          SocketThread socketThread = new SocketThread(accept);
          socketThread.preStart();
        }catch(Exception e1){
          log.error("ServerSocket报错",e1);
          try{
            //如果报错,说明这个socket连接无法继续处理了,关闭(后续客户端可以重新建立一个socket连接)
            accept.close();
          }catch(Exception e2){
            log.error("报错后,关闭socket失败");
          }
        }
      }, executorService);
    }
  }
}
3.Application.java,springboot项目的启动类,也启动ServerSocket监听

@SpringBootApplication
@CompomentScan(basePackages = "com")
public class Application {
  public static void main(final String[] args){
    SpringApplication s = new SpringApplication(Application.class);
    s.run(args);
    
    //启动ServerSocket
    new Thread(()->{
      try{
        //这个new方法执行不完,因为其中一直在等待Socket连接
        //如果感觉不合适,就自己拆成2个方法
        new SocketStart();
      }catch(Exception e){
        e.printStackTrace();
      }
    }).start();
  }
}
三、客户端(Socket)
1.SocketSend类,主要类,用来发送Socket请求,接收socket响应

@Slf4j
@Component
public class SocketSend{
    private static Socket socket = null;
    //允许同时开启的循环个数
    private static int maxLoopCount = 3;
    //当前已有的循环个数
    private static int nowLoopCount = 0;
    //等待发送消息的队列
    private static List<JSONObject> waitList = Collections.synchronizedList(new ArrayList<>());
    //消息发送后,保存到oldMap里
    private static ConcurrentHashMap<Long, JSONObject> oldMap = new ConcurrentHashMap<>();
    //如果消息发送失败,保存重试次数的map;如果还有重试次数,就从oldMap里拿到旧消息重发
    private static ConcurrentHashMap<Long, Integer> retryMap = new ConcurrentHashMap<>();
    //发送socket消息的方法
    public void send(JSONObject json) {
        Assert.notNull(json);
        //放入等待队列
        synchronized(waitList){
            waitList.add(json);
        }
        //使用异步方法,进行后续处理
        deal();
    }
    //异步方法,实际处理如何发送消息(异步不阻塞)
    @Async
    private void deal() {
        //判断是否可以再开启一个循环
        synchronized("nowLoopCount"){
            if(nowLoopCount < maxLoopCount){
                nowLoopCount++;
            }else{
                //否则就不开启循环,直接返回(等待已开启的循环去处理)
                return;
            }
        }
        //建立socket连接
        linkSocket();
        OutputStream output = null;
        InputStream input = null;
        try{
            Assert.isTrue(socket != null && socket.isConnected() && !socket.isClosed());
            input = socket.getInputStream();
            output = socket.getOutputStream();
            //无限循环,处理waitList中的待发送消息,并处理接收到的消息
            while (true) {
                //发送了几条消息
                int sendCount = 0;
                //收到了几条消息
                int readCount = 0;
                sendCount = dealSend(output);
                readCount = dealRead(input);
                if(sendCount == 0 && readCount ==0){
                    Thread.sleep(500);
                }
            }
        }catch (Exception e){
            log.error("deal方法报错!",e);
        }finally {
            IOUtils.closeQuietly(input);
            IOUtils.closeQuietly(output);
            IOUtils.closeQuietly(socket);
            synchronized ("nowLoopCount"){
                nowLoopCount = nowLoopCount >=1 ? nowLoopCount - 1 : 0;
            }
            //再尝试创建一个循环;如果超过最大允许个数,会执行return;如果没有超过,就成功创建了一个循环
            deal();
        }
    }
    //建立socket连接
    private void linkSocket() {
        //判断socket对象是否还存在
        if(null == socket || !socket.isConnected() || socket.isClosed()){
            //关闭旧的socket
            if(null != socket){
                IOUtils.closeQuietly(socket);
                socket = null;
            }
            //获取socket连接的重试次数
            int maxSocketRetryCount = 3;
            //当前已重试次数
            int nowSocketRetryCount = 0;
            //是否成功
            boolean isSuccess = false;
            while(!isSuccess){
                if(nowSocketRetryCount > maxSocketRetryCount){
                    break;
                }else{
                    nowSocketRetryCount++;
                    String url = "10.123.123.123";
                    int port = 9999;
                    int timeOut = 600000;
                    try{
                        socket = new Socket(url, port);
                        socket.setSoTimeout(timeOut);
                        isSuccess = true;
                        log.info("建立socket连接成功");
                    }catch(Exception eSocket){
                        log.error("初始化socket对象失败",eSocket);
                        socket = null;
                    }
                }
            }
        }
    }
    private int dealSend(OutputStream output) throws IOException {
        if(output == null || socket.isOutputShutdown()){
            output = socket.getOutputStream();
        }
        int count = 0;
        //将队列中的消息通过socket发给服务器端
        synchronized (waitList){
            count = waitList.size();
            for(int i=0; i<count; i++){
                JSONObject json = waitList.get(i);
                byte[] jsonByte = json.toString().getBytes();
                int totalLength = 1 + 4 + jsonByte.length;
                BtyeBuffer buffer = ByteBuffer.allocate(totalLength);
                //request的r,表示类型为请求(自己规定的)
                char c = 'R';
                //type(1位)
                buffer.put((byte)backJsonByte.length);
                //datalength(4位)
                buffer.putInt(jsonByte.length);
                //data(datalength中指明的位数)
                buffer.put(jsonByte);
                //发送给服务器的消息,格式为字节数组,type(1位)+datalength(4位)+data(datalength中指明的位数)
                output.write(buffer.array());
                log.info("客户端发送给服务器json:"+json.toString());
                //放入旧消息map
                oldMap.put(json.get("data").get("id"), json);
            }
            output.flush();
            waitList.clear();
        }
        return count;
    }
    private int dealRead(InputStream input) throws IOException {
        //如果没有旧消息,说明没有给服务器发送消息,服务器也就不会有返回信息了,直接返回
        if(oldMap.isEmpty()){
            return 0;
        }
        //用来保存服务器返回的消息
        List<JSONObject> list = new ArrayList<>();
        //消息数量
        int count = 0;
        //这个是从服务器输入流中读取信息的循环,当流不可用或读完后就会break
        while (true){
            if(input == null || socket.isInputShutdown()){
                input = socket.getInputStream();
            }
            int available = input.available();
            //如果流中没有数据了,可能是全部读取完了,也可能是本来就没有
            if(available == 0){
                break;
            }else{
                byte[] datalenByte = new byte[4];
                //从流中读取;规定消息格式为:dataLength(int,4位)+data
                input.read(datalenByte, 0, 4);
                Assert.notNull(datalenByte);
                ByteBuffer byteBuffer = ByteBuffer.wrap(datalenByte);
                //获取data总长度
                int datalen = byteBuffer.getInt(0);
                byte[] dataByte = new byte[datalen];
                //读取
                input.read(dataByte,0,datalen);
                //将消息转为json
                JSONObejct backJson = JSONObject.parseObject(new String(dataByte,"utf-8"));
                list.add(backJson);
                count++;
            }
            //如果为空,直接返回
            if(CollectionUtils.isEmpty(list)){
                return count;
            }
            //处理服务器返回的消息
            for(JSONObject json : list){
                //json与上方对应
                String status = json.get("status");
                String dataId = json.get("dataId");
                //如果服务器返回成功
                if("1".equals(status)){
                    //说明不需要重新发送了,就从这2个map中移除
                    oldMap.remove(dataId);
                    retryMap.remove(dataId);
                }else{
                    log.error("服务器返回失败提示,dataId为:"+dataId);
                    JSONObject oldJson = oldMap.get(dataId);
                    if(oldJson == null){
                        log.error("旧消息中没有dataId为【"+dataId+"】的json");
                        continue;
                    }
                    int retryCount = retryMap.contains(dataId) ? retryMap.get(dataId) : 0;
                    //最多重发3次
                    if(retryCount > 3){
                        log.error("重发超过3次,停止重发该消息。dataId【"+dataId+"】,json【"+oldMap.get(dataId)+"】");
                        oldMap.remove(dataId);
                        retryMap.remove(dataId);
                    }else{
                        log.info("重发dataId为【"+dataId+"】的消息");
                        waitList.add(oldMap.get(dataId));
                        retryMap.put(dataId, ++retryCount);
                    }
                }
            }
        }
        return count;
    }
}
2.XXXServiceImpl.java,用来调用send方法,发送socket消息

@Service
public class XXXServiceImpl{
  @Autowired
  SocketSend socketSend;
 
  public void xxxSend(String id){
    //json样例
    JSONObject json = new JSONObject();
    JSONObject data = new JSONObject();
    //关键是有这个id就行,其它根据需求自己写
    //这个id是json消息的id,找一个生成不重复id的方法即可
    data.put("id",id);
    data.put("msg","aaaaaaaaaaaaaaaaaaaaaaa");
    json.put("data",data);
    socketSend.send(json);
  }
}
四、要点总结
1.ServerSocket(服务器端)
(1)ServerSocket中,有一个while-true循环,监听socket连接的建立;每建立一个socket连接,就交给线程后续处理,然后继续监听下一个socket连接。

(2)后续处理线程中,也有一个while-true循环,核心语句是int available = inputStream.available(),当这个值为0时,说明现在没有客户端发来的消息,然后就sleep之后开始下一次循环;当不为0时,就从流中读取客户端发来的消息并处理,处理完后继续循环监听inputStream流中是否有消息。

2.socket(客户端)
(1)客户端维护了1个等待队列waitList,当有需要发送给服务器端的消息时,就放入该队列。

(2)调用客户端send方法后,会生成多个线程(@Async),执行while-true的循环(有最大个数限制);这些循环中,会判断waitList中是否待发送信息,如果有,就取出来并发送给服务器;同样也有int available = inputStream.available(),判断是否有服务器返回的消息,有则处理。

(3)循环中,首次会建立socket连接;如果不出意外,会一直保持socket连接(因为客户端经常需要给服务器发送消息,所以就不关闭了);如果遇到异常,就尝试关闭连接后重新建立socket连接。

(4)客户端还有2个map,oldMap与retryMap,用来保存发送过的旧信息,当收到服务器返回的处理成功的消息后,就从map中移除旧信息;当收到服务器返回的处理失败的信息后,就尝试重新发送信息,直到服务器返回处理成功,或者超过最大重试次数。

3.Socket流间的数据交互
(1)输入流与输出流之间是使用byte数组传输信息的,因此需要自己规定byte数组的格式(共多少位、哪些位表示什么含义),可以使用ByteBuffer读取与写入字节数组信息。

(2)写入outputStream流时记得使用flush()方法;读取inputStream流时可以使用inputStream.available()判断是否存在信息、是否读取完毕。

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2023-03-2AoKIjPQ64014b4ad30a3.jpg

  • 发表于 2023-07-25 09:21
  • 阅读 ( 180 )
  • 分类:Java开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
小柒
小柒

1474 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1474 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章