本文讲述了Java Socket实用样例教程!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:
最近用到了Java的Socket通信。
在搜索用法的过程中,发现好多都是用System.in举例的;但是实际工作中,并不是用System.in从控制台输入消息的,不太好用。
以下是个人从工作中总结的Socket实用代码样例。
二、服务器端(ServerSocket)
服务器端等待Socket连接的建立,接收Socket请求报文,处理后给Socket客户端返回响应报文。
1.SocketThread类,主要处理类,用来接收Socket请求报文、处理、返回响应报文。
@Slf4j
public class SocketThread(){
private Socket socket;
private InputStream inputStream;
private OutputStream outputStream;
public SocketThread(Socket socket){
this.socket = socket;
}
//第一个方法,用来循环监听inputStream
public void preStart() throws IOException, ClassNotFoundException, InterruptedExceptedException {
while(true){
//处理
start();
//服务器不关闭socket,客户端关闭即可
//socket.close();
if(socket == null || socket.isClosed()){
log.info("socket为null或被关闭,停止循环");
break;
}
}
}
//第二个方法,如果有消息就处理;如果没有消息,就sleep,然后返回
private void start() throws IOException, ClassNotFoundException, InterruptedException{
if( null == inputStream || socket.isInputShutdown() ){
inputStream = socket.getInputStream();
}
int available = inputStream.available();
//如果输入流不可用,说明本次循环没有收到消息,返回
if(0 == available){
Thread.sleep(500);
return;
}
if(null == outputStream || socket.isOutputShutdown()){
outputStream = socket.getOutputStream();
}
//得到消息
JSONObject receive = receive(inputStream);
//进行处理
boolean deal = deal(receive);
//返回响应报文
backMsg(outputStream, deal, receive);
}
//从流中获取消息,封装成JSONObject对象
private JSONObject receive(InputStream input){
//消息格式为字节数组,type(1位)+datalength(4位)+data(datalength中指明的位数)
byte t[] = new byte[1];
byte dl[] = new byte[4];
//从流中读取1位,保存到t中
input.read(t, 0, 1);
char type = (char)t[0];
//从流中读取4位
input.read(dl, 0, 4);
ByteBuffer dlB = ByteBuffer.wrap(dl);
int datalength = dlb.getInt();
//准备获得data
byte db = new byte[datalength];
input.read(db,0,datalength);
String dataStr = new String(db, "utf-8");
JSONObject data = JSONObject.parseObject(dataStr);
//转为JSONObject对象,并返回
JSONObject json = new JSONObject();
json.put("type",type);
json.put("datalength",datalength);
json.put("data",data);
return json;
}
//处理逻辑,主要内容省略
private boolean deal(JSONObject json){
//标志位,如果处理成功,再改为true
boolean bool = false;
//省略处理逻辑......
return bool;
}
//返回响应报文
private void backMsg(OutputStream outputStream, boolean bool, JSONObject receive){
JSONObject backJson = new JSONObject();
if(bool){
backJson.put("status","1");
}else{
backJson.put("status","-1");
}
backJson.put("dataId",receive.get("data").get("id"));
byte[] backJsonByte = backJson.toString().getBytes();
//规定,返回的byte数组格式为,json长度(int格式,占4位)+json的byte数组
int totalLength = 4 + backJsonByte.length;
BtyeBuffer buffer = ByteBuffer.allocate(totalLength);
buffer.putInt(backJsonByte.length);
buffer.put(backJsonByte);
outputStream.write(buffer.array());
outputStream.flush();
}
}
2.SocketStart类,用于等待Socket连接建立,用到了线程池
@Slf4j
public class SocketStart {
private ExecutorService executorService;
@PreDestroy
public void destroy() throws InterruptedException{
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
public SocketStart() throws IOException{
executorService = new ThreadPoolExecutor(100,100,1,TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("SocketStart-pool-%s-thread").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
ServerSocket ss = new ServerSocket(9999);
while(true){
Socket accept = ss.accept();
log.info("ServerSocket收到请求");
CompletableFuture.runAsync(() -> {
try{
SocketThread socketThread = new SocketThread(accept);
socketThread.preStart();
}catch(Exception e1){
log.error("ServerSocket报错",e1);
try{
//如果报错,说明这个socket连接无法继续处理了,关闭(后续客户端可以重新建立一个socket连接)
accept.close();
}catch(Exception e2){
log.error("报错后,关闭socket失败");
}
}
}, executorService);
}
}
}
3.Application.java,springboot项目的启动类,也启动ServerSocket监听
@SpringBootApplication
@CompomentScan(basePackages = "com")
public class Application {
public static void main(final String[] args){
SpringApplication s = new SpringApplication(Application.class);
s.run(args);
//启动ServerSocket
new Thread(()->{
try{
//这个new方法执行不完,因为其中一直在等待Socket连接
//如果感觉不合适,就自己拆成2个方法
new SocketStart();
}catch(Exception e){
e.printStackTrace();
}
}).start();
}
}
三、客户端(Socket)
1.SocketSend类,主要类,用来发送Socket请求,接收socket响应
@Slf4j
@Component
public class SocketSend{
private static Socket socket = null;
//允许同时开启的循环个数
private static int maxLoopCount = 3;
//当前已有的循环个数
private static int nowLoopCount = 0;
//等待发送消息的队列
private static List<JSONObject> waitList = Collections.synchronizedList(new ArrayList<>());
//消息发送后,保存到oldMap里
private static ConcurrentHashMap<Long, JSONObject> oldMap = new ConcurrentHashMap<>();
//如果消息发送失败,保存重试次数的map;如果还有重试次数,就从oldMap里拿到旧消息重发
private static ConcurrentHashMap<Long, Integer> retryMap = new ConcurrentHashMap<>();
//发送socket消息的方法
public void send(JSONObject json) {
Assert.notNull(json);
//放入等待队列
synchronized(waitList){
waitList.add(json);
}
//使用异步方法,进行后续处理
deal();
}
//异步方法,实际处理如何发送消息(异步不阻塞)
@Async
private void deal() {
//判断是否可以再开启一个循环
synchronized("nowLoopCount"){
if(nowLoopCount < maxLoopCount){
nowLoopCount++;
}else{
//否则就不开启循环,直接返回(等待已开启的循环去处理)
return;
}
}
//建立socket连接
linkSocket();
OutputStream output = null;
InputStream input = null;
try{
Assert.isTrue(socket != null && socket.isConnected() && !socket.isClosed());
input = socket.getInputStream();
output = socket.getOutputStream();
//无限循环,处理waitList中的待发送消息,并处理接收到的消息
while (true) {
//发送了几条消息
int sendCount = 0;
//收到了几条消息
int readCount = 0;
sendCount = dealSend(output);
readCount = dealRead(input);
if(sendCount == 0 && readCount ==0){
Thread.sleep(500);
}
}
}catch (Exception e){
log.error("deal方法报错!",e);
}finally {
IOUtils.closeQuietly(input);
IOUtils.closeQuietly(output);
IOUtils.closeQuietly(socket);
synchronized ("nowLoopCount"){
nowLoopCount = nowLoopCount >=1 ? nowLoopCount - 1 : 0;
}
//再尝试创建一个循环;如果超过最大允许个数,会执行return;如果没有超过,就成功创建了一个循环
deal();
}
}
//建立socket连接
private void linkSocket() {
//判断socket对象是否还存在
if(null == socket || !socket.isConnected() || socket.isClosed()){
//关闭旧的socket
if(null != socket){
IOUtils.closeQuietly(socket);
socket = null;
}
//获取socket连接的重试次数
int maxSocketRetryCount = 3;
//当前已重试次数
int nowSocketRetryCount = 0;
//是否成功
boolean isSuccess = false;
while(!isSuccess){
if(nowSocketRetryCount > maxSocketRetryCount){
break;
}else{
nowSocketRetryCount++;
String url = "10.123.123.123";
int port = 9999;
int timeOut = 600000;
try{
socket = new Socket(url, port);
socket.setSoTimeout(timeOut);
isSuccess = true;
log.info("建立socket连接成功");
}catch(Exception eSocket){
log.error("初始化socket对象失败",eSocket);
socket = null;
}
}
}
}
}
private int dealSend(OutputStream output) throws IOException {
if(output == null || socket.isOutputShutdown()){
output = socket.getOutputStream();
}
int count = 0;
//将队列中的消息通过socket发给服务器端
synchronized (waitList){
count = waitList.size();
for(int i=0; i<count; i++){
JSONObject json = waitList.get(i);
byte[] jsonByte = json.toString().getBytes();
int totalLength = 1 + 4 + jsonByte.length;
BtyeBuffer buffer = ByteBuffer.allocate(totalLength);
//request的r,表示类型为请求(自己规定的)
char c = 'R';
//type(1位)
buffer.put((byte)backJsonByte.length);
//datalength(4位)
buffer.putInt(jsonByte.length);
//data(datalength中指明的位数)
buffer.put(jsonByte);
//发送给服务器的消息,格式为字节数组,type(1位)+datalength(4位)+data(datalength中指明的位数)
output.write(buffer.array());
log.info("客户端发送给服务器json:"+json.toString());
//放入旧消息map
oldMap.put(json.get("data").get("id"), json);
}
output.flush();
waitList.clear();
}
return count;
}
private int dealRead(InputStream input) throws IOException {
//如果没有旧消息,说明没有给服务器发送消息,服务器也就不会有返回信息了,直接返回
if(oldMap.isEmpty()){
return 0;
}
//用来保存服务器返回的消息
List<JSONObject> list = new ArrayList<>();
//消息数量
int count = 0;
//这个是从服务器输入流中读取信息的循环,当流不可用或读完后就会break
while (true){
if(input == null || socket.isInputShutdown()){
input = socket.getInputStream();
}
int available = input.available();
//如果流中没有数据了,可能是全部读取完了,也可能是本来就没有
if(available == 0){
break;
}else{
byte[] datalenByte = new byte[4];
//从流中读取;规定消息格式为:dataLength(int,4位)+data
input.read(datalenByte, 0, 4);
Assert.notNull(datalenByte);
ByteBuffer byteBuffer = ByteBuffer.wrap(datalenByte);
//获取data总长度
int datalen = byteBuffer.getInt(0);
byte[] dataByte = new byte[datalen];
//读取
input.read(dataByte,0,datalen);
//将消息转为json
JSONObejct backJson = JSONObject.parseObject(new String(dataByte,"utf-8"));
list.add(backJson);
count++;
}
//如果为空,直接返回
if(CollectionUtils.isEmpty(list)){
return count;
}
//处理服务器返回的消息
for(JSONObject json : list){
//json与上方对应
String status = json.get("status");
String dataId = json.get("dataId");
//如果服务器返回成功
if("1".equals(status)){
//说明不需要重新发送了,就从这2个map中移除
oldMap.remove(dataId);
retryMap.remove(dataId);
}else{
log.error("服务器返回失败提示,dataId为:"+dataId);
JSONObject oldJson = oldMap.get(dataId);
if(oldJson == null){
log.error("旧消息中没有dataId为【"+dataId+"】的json");
continue;
}
int retryCount = retryMap.contains(dataId) ? retryMap.get(dataId) : 0;
//最多重发3次
if(retryCount > 3){
log.error("重发超过3次,停止重发该消息。dataId【"+dataId+"】,json【"+oldMap.get(dataId)+"】");
oldMap.remove(dataId);
retryMap.remove(dataId);
}else{
log.info("重发dataId为【"+dataId+"】的消息");
waitList.add(oldMap.get(dataId));
retryMap.put(dataId, ++retryCount);
}
}
}
}
return count;
}
}
2.XXXServiceImpl.java,用来调用send方法,发送socket消息
@Service
public class XXXServiceImpl{
@Autowired
SocketSend socketSend;
public void xxxSend(String id){
//json样例
JSONObject json = new JSONObject();
JSONObject data = new JSONObject();
//关键是有这个id就行,其它根据需求自己写
//这个id是json消息的id,找一个生成不重复id的方法即可
data.put("id",id);
data.put("msg","aaaaaaaaaaaaaaaaaaaaaaa");
json.put("data",data);
socketSend.send(json);
}
}
四、要点总结
1.ServerSocket(服务器端)
(1)ServerSocket中,有一个while-true循环,监听socket连接的建立;每建立一个socket连接,就交给线程后续处理,然后继续监听下一个socket连接。
(2)后续处理线程中,也有一个while-true循环,核心语句是int available = inputStream.available(),当这个值为0时,说明现在没有客户端发来的消息,然后就sleep之后开始下一次循环;当不为0时,就从流中读取客户端发来的消息并处理,处理完后继续循环监听inputStream流中是否有消息。
2.socket(客户端)
(1)客户端维护了1个等待队列waitList,当有需要发送给服务器端的消息时,就放入该队列。
(2)调用客户端send方法后,会生成多个线程(@Async),执行while-true的循环(有最大个数限制);这些循环中,会判断waitList中是否待发送信息,如果有,就取出来并发送给服务器;同样也有int available = inputStream.available(),判断是否有服务器返回的消息,有则处理。
(3)循环中,首次会建立socket连接;如果不出意外,会一直保持socket连接(因为客户端经常需要给服务器发送消息,所以就不关闭了);如果遇到异常,就尝试关闭连接后重新建立socket连接。
(4)客户端还有2个map,oldMap与retryMap,用来保存发送过的旧信息,当收到服务器返回的处理成功的消息后,就从map中移除旧信息;当收到服务器返回的处理失败的信息后,就尝试重新发送信息,直到服务器返回处理成功,或者超过最大重试次数。
3.Socket流间的数据交互
(1)输入流与输出流之间是使用byte数组传输信息的,因此需要自己规定byte数组的格式(共多少位、哪些位表示什么含义),可以使用ByteBuffer读取与写入字节数组信息。
(2)写入outputStream流时记得使用flush()方法;读取inputStream流时可以使用inputStream.available()判断是否存在信息、是否读取完毕。
更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。
想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!