1.1 什麼是粘包拆包
例如:發送 ABC, DEF兩個報文
- 收到ABCDEF一個報文,發生了粘包
- 收到AB,C,DEF三個報文,ABC發生了拆包
- 收到AB,CD,EF三個報文,即發生了拆包又發生了粘包
1.2 看一個粘包半包樣例
- 客户端每次把消息“ABC,DEF,GHI,JKL,MNO\n" 發生一百次給服務端
- 服務端將每次收到的消息輸出,並記錄收到的次數,然後將消息返回客户端
我們看下面服務端輸出的結果:
- 服務端一共接收到兩次消息,説明消息被合併了,發生了粘包
- 第一次輸出的消息最後一行只有"ABC,",這説明發生了拆包,一個完整的消息被拆分了才會出現這種情況
輸出結果:
Server Accept[ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,] and the counter is:1
Server Accept[DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
ABC,DEF,GHI,JKL,MNO
] and the counter is:2
代碼:
EchoClient.java
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
try {
final Bootstrap b = new Bootstrap();;/*客户端啓動必須*/
b.group(group)/*將線程組傳入*/
.channel(NioSocketChannel.class)/*指定使用NIO進行網絡傳輸*/
.remoteAddress(new InetSocketAddress(host,port))/*配置要連接服務器的ip地址和端口*/
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient(9999,"127.0.0.1").start();
}
}
EchoClientHandler.java
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private AtomicInteger counter = new AtomicInteger(0);
/*** 客户端讀取到網絡數據後的處理*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
+"] and the counter is:"+counter.incrementAndGet());
}
/*** 客户端被通知channel活躍後,做事*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
String request = "ABC,DEF,GHI,JKL,MNO"
+ System.getProperty("line.separator");
//發送100次
for(int i=0;i<100;i++){
msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
}
}
/*** 發生異常後的處理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoServer.java
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
EchoServer echoServer = new EchoServer(9999);
System.out.println("服務器即將啓動");
echoServer.start();
System.out.println("服務器關閉");
}
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
try {
ServerBootstrap b = new ServerBootstrap();/*服務端啓動必須*/
b.group(group)/*將線程組傳入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO進行網絡傳輸*/
.localAddress(new InetSocketAddress(port))/*指定服務器監聽端口*/
/*服務端每接收到一個連接請求,就會新啓一個socket通信,也就是channel,
所以下面這段代碼的作用就是為這個子channel增加handle*/
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);/*添加到該子channel的pipeline的尾部*/
}
});
ChannelFuture f = b.bind().sync();/*異步綁定到服務器,sync()會阻塞直到完成*/
System.out.println("服務器啓動完成,等待客户端的連接和數據.....");
f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/
} finally {
group.shutdownGracefully().sync();/*優雅關閉線程組*/
}
}
}
EchoServerHandler.java
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
/*** 服務端讀取到網絡數據後的處理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept["+request
+"] and the counter is:"+counter.incrementAndGet());
String resp = "Hello,"+request+". Welcome to Netty World!"
+ System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 發生異常後的處理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
1.3 解決方法
TCP粘包/半包發生的原因:TCP是流式協議,消息無邊界,消息發送的節點為了加快轉發速度,會等到緩衝區滿再發送,那麼他可能會對消息拆包或者合併包滿足緩衝區再發送
解決問題的方法也很簡單:找出消息邊界,確定了消息的邊界,我們就能找到完整的消息
| 方式 | 確定消息邊界 | 優點 | 缺點 | 推薦 |
|---|---|---|---|---|
| TCP連接改成短連接 | 每個鏈接發送一個消息 | 簡單 | 效率低下 | 不推薦 |
| 固定長度 | 按長度取消息 | 簡單 | 空間浪費(消息太短需要填充) | 不推薦 |
| 分隔符 | 分隔符之間就是一個消息 | 空間不浪費 | 分隔符需要轉義 | 推薦 |
| 消息頭和消息體 | 根據消息頭確定一個消息 | 精確且不用轉義 | 實現相對複雜 | 推薦 |
下面我們依次看一下上面幾種方法如何實現:
1.4 短連接(不推薦)
每一次發送都創建一個新的連接,就可以避免粘包和拆包問題,但是這種方式效率低下,TCP三次握手四次揮手非常消耗性能,不推薦。
1.5 固定長度方式實現
客户端(服務端同樣處理)
實現ChannelInitializer類,添加下面兩行代碼new FixedLengthFrameDecoder(FixedLengthEchoServer.RESPONSE.length())表示固定長度消息
將其作為handler交給Bootstrap來處理消息
1.6 分割符
1.6.1 換行符作分隔符(客户端(服務端同樣處理)):
實現ChannelInitializer類,添加下面兩行代碼new LineBasedFrameDecoder(1024)表示固定長度消息,
同樣需要將ChannelInitializerImp交給Bootstrap來處理消息
1.6.2 自定義分隔符:
實現ChannelInitializer類,添加下面兩行代碼new DelimiterBasedFrameDecoder(1024, delimiter)表示固定長度消息,
同樣需要將ChannelInitializerImp交給Bootstrap來處理消息
1.7 消息頭和消息體
自己重新定義一種消息頭和消息體,根據消息頭來確定消息邊界,就可以知道是否發生粘包和拆包
1.8 總結:
Netty對粘包和拆包的處理進行了封裝,開發者使用很方便,比較推薦的實現方式是1.6 和1.7。
- 如果消息中沒有什麼特殊字符,可以採用1.6的方式,特殊字符當成分割符,實現簡單方便。
- 如果消息中不確定有哪些特殊字符,可以採用1.7的方式,但是實現複雜一點,相當於自己定義一個應用層協議。
備註:各種解決方案的代碼後續分享會出來