Netty Socket.IO 实时通讯

前言

简单介绍一下 WebSocket & Socket.IO

客户端服务器之间的实时数据传输是一个很重要的需求,最早只能通过 HTTP 轮询方式实现。在 WebSocket 标准没有推出之前,HTTP 轮询是一种可行的方案。

Http 轮询

轮询:在特定的时间间隔(如每秒),由客户端对服务器发出 HTTP 请求,然后由服务器返回最新的数据给客户端。缺点:客户端需要不断的向服务器发出请求,然而 HTTP 请求的 Header 非常长会占用很多带宽。

长轮询:客户端发送请求之后,服务器端会阻塞请求直到有数据传递或超时才返回。缺点:阻塞请求会造成服务器资源浪费,连接数有限。

WebSocket

WebScoket 是一种让客户端和服务器之间能进行双向实时通信的技术。它是 HTML 最新标准 HTML5 的一个协议规范。

WebSocket与HTTP、TCP的关系

WebSocket 与 HTTP 协议一样都是基于 TCP 的,所以它们都是可靠的协议。WebSocket 和 HTTP 协议样都属于应用层协议,WebSocket 在建立握手连接时,数据是通过HTTP协议传输的。但在建立连接之后,真正的数据传输阶段是不需要 HTTP 参与。

avatar

Socket.IO

Socket.IO 实际上是 WebSocket 的父集,Socket.IO 封装了 WebSocket轮询等方法,会根据情况选择方法来进行通讯。

Springboot 集成 Netty Socket.IO

引入 Jar 包

1
2
3
4
5
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.17</version>
</dependency>

编写代码

简单介绍一下 namespace & room & event 三者关系:一个 namespace 中可以有多个 room,一个room 中可以有多个 event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package cn.sonicshield.socketio.config;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class NettySocketConfig {
private static final String TOKEN = "TOKEN";
// 监听端口
private static final Integer SOCKET_PORT = 9099;
// Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
private static final Integer PING_INTERVAL = 60000;
// Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
private static final Integer PING_TIMEOUT = 180000;
// 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间
private static final Integer UPGRADE_TIMEOUT = 10000;
@Bean
public SocketIOServer socketIOServer() {
/*
* 创建Socket,并设置监听端口
*/
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
// 设置主机名,默认是0.0.0.0
// config.setHostname("localhost");
config.setPort(SOCKET_PORT);
config.setUpgradeTimeout(UPGRADE_TIMEOUT);
config.setPingInterval(PING_INTERVAL);
config.setPingTimeout(PING_TIMEOUT);
// 握手协议参数使用JWT的Token认证方案
config.setAuthorizationListener(data -> {
String token = data.getSingleUrlParam(TOKEN);
return !Strings.isNullOrEmpty(token);
});
return new SocketIOServer(config);
}
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package cn.sonicshield.socketio.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {
private String message;
private String userName;
private Long ts;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.sonicshield.socketio.handler;
import cn.sonicshield.socketio.model.Message;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MessageHandler {
private final SocketIOServer server;
public MessageHandler(SocketIOServer server) {
this.server = server;
}
//添加 connect 事件,当客户端发起连接时调用
@OnConnect
public void onConnect(SocketIOClient client) {
log.info("onConnect > namespace={}, sessionId={}", client.getNamespace().getName(), client.getSessionId().toString());
}
//添加 disconnect 事件,客户端断开连接时调用
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
client.disconnect();
log.info("onDisconnect > namespace={}, sessionId={}", client.getNamespace().getName(), client.getSessionId().toString());
}
// 自定义 message 事件
@OnEvent(value = "message")
public void onEvent(SocketIOClient client, AckRequest ackRequest, Message message) {
log.info("接收到客户端 namespace message = {}", client.getNamespace().getName(), message);
if (ackRequest.isAckRequested()) {
ackRequest.sendAckData(Message.builder().message("Hello World").ts(System.currentTimeMillis()).build());
}
client.sendEvent("message", message);
}
/**
* How to use the protobuf protocol:
* https://github.com/mrniko/netty-socketio/issues/497
*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package cn.sonicshield.socketio.common;
import cn.sonicshield.socketio.handler.MessageHandler;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import com.google.common.collect.Lists;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Order(1)
@Slf4j
public class ServerRunner implements CommandLineRunner {
private List<String> namespaceList = Lists.newArrayList("/message", "/test");
private final SocketIOServer server;
private final SocketIONamespace messageSocketNameSpace;
private final SocketIONamespace testSocketNameSpace;
@Autowired
private ServerRunner(SocketIOServer server) {
this.server = server;
messageSocketNameSpace = server.addNamespace(namespaceList.get(0));
testSocketNameSpace = server.addNamespace(namespaceList.get(1));
}
@Bean(name = "messageNamespace")
public SocketIONamespace messageNameSpace() {
messageSocketNameSpace.addListeners(new MessageHandler(server));
return messageSocketNameSpace;
}
@Bean(name = "testNamespace")
public SocketIONamespace testSpace() {
testSocketNameSpace.addListeners(new MessageHandler(server));
return testSocketNameSpace;
}
@Override
public void run(String... args) {
log.info("#############################");
log.info("# #");
log.info("# ServerRunner 开始启动... #");
log.info("# #");
log.info("#############################");
server.start();
}
}

📁 项目源代码

----本文结束 感谢您的阅读----