博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty入门教程:Netty拆包粘包技术讲解
阅读量:7042 次
发布时间:2019-06-28

本文共 13702 字,大约阅读时间需要 45 分钟。

Netty编解码技术是什么意思呢?所谓的编解码技术,说白了就是java序列化技术。序列化有两个目的:

1、进行网络传输

2、对象持久化

虽然我们可以使用java进行序列化,Netty去传输。但是java序列化的硬伤太多,比如java的序列化无法跨平台、序列化后码流太大、序列化性能非常低等等...

码流太大是什么意思呢?比如说原先的我一篇文档,比如说大小是1M,序列化完了之后可能0.5M,序列化减少二分之一的码,比较大。然后0.5M去网络传输这个不太好。你比如说用其它的一些主流序列化的话可能就0.01M,非常小。性能非常好。

性能太低就是说,我用java序列化的过程可能需要10s,而用其它的高性能序列化可能0.1s。差距就是这么的大。

序列化的目的无非就是网络传输。而目前主流的序列化框架有以下几种:

1、JBoss的Marshalling

2、Google的Protobuf
3、基于Protobuf的Kyro
4、MessagePack框架

其实我们主要是讲Marshalling和Google的Protobuf。这两个是业界非常好用的框架。其中JBoss的Marshalling速度还要比Google的Protobuf要快,原因是因为Marshalling不是跨语言,两端都是java与java之间相互传输的。因此,在这种情况下我们就用它就行了。但如果你想实现跨语言,比如这边是c#,另一边是java。这种跨语言进行通信传输的话,那你就需要用到Google的Protobuf来进行跨语言的传输。性能也非常高。而且它自己有一些大端小端的优化机制。

下面开始Marshalling编码实现。

首先新建一个java工程,导入netty和jboss-marshalling的jar包,导入几张图片到sources文件夹以便测试。

 

新建一个Req类,并编写相关代码

1 package com.it448.serial; 2  3 import java.io.Serializable; 4  5 public class Req implements Serializable{ 6     private static final long serialVersionUID = 1L; 7      8     private String id ; 9     private String name ;10     private String requestMessage ;11     private byte[] attachment;12     13     public String getId() {14         return id;15     }16     public void setId(String id) {17         this.id = id;18     }19     public String getName() {20         return name;21     }22     public void setName(String name) {23         this.name = name;24     }25     public String getRequestMessage() {26         return requestMessage;27     }28     public void setRequestMessage(String requestMessage) {29         this.requestMessage = requestMessage;30     }31     public byte[] getAttachment() {32         return attachment;33     }34     public void setAttachment(byte[] attachment) {35         this.attachment = attachment;36     }37 }

 

新建一个Resp类,并编写相关代码

1 package com.it448.serial; 2  3 import java.io.Serializable; 4  5 public class Resp implements Serializable{ 6      7     private static final long serialVersionUID = 1L; 8      9     private String id;10     private String name;11     private String responseMessage;12     13     public String getId() {14         return id;15     }16     public void setId(String id) {17         this.id = id;18     }19     public String getName() {20         return name;21     }22     public void setName(String name) {23         this.name = name;24     }25     public String getResponseMessage() {26         return responseMessage;27     }28     public void setResponseMessage(String responseMessage) {29         this.responseMessage = responseMessage;30     }31 }

 

新建一个工具类GzipUtils,方便调用

1 package com.it448.utils; 2  3 import java.io.ByteArrayInputStream; 4 import java.io.ByteArrayOutputStream; 5 import java.io.File; 6 import java.io.FileInputStream; 7 import java.io.FileOutputStream; 8 import java.util.zip.GZIPInputStream; 9 import java.util.zip.GZIPOutputStream;10 11 public class GzipUtils {12     public static byte[] gzip(byte[] data) throws Exception{13         ByteArrayOutputStream bos = new ByteArrayOutputStream();14         GZIPOutputStream gzip = new GZIPOutputStream(bos);15         gzip.write(data);16         gzip.finish();17         gzip.close();18         byte[] ret = bos.toByteArray();19         bos.close();20         return ret;21     }22     23     public static byte[] ungzip(byte[] data) throws Exception{24         ByteArrayInputStream bis = new ByteArrayInputStream(data);25         GZIPInputStream gzip = new GZIPInputStream(bis);26         byte[] buf = new byte[1024];27         int num = -1;28         ByteArrayOutputStream bos = new ByteArrayOutputStream();29         while((num = gzip.read(buf, 0 , buf.length)) != -1 ){30             bos.write(buf, 0, num);31         }32         gzip.close();33         bis.close();34         byte[] ret = bos.toByteArray();35         bos.flush();36         bos.close();37         return ret;38     }39     40     public static void main(String[] args) throws Exception{41         42         // 读取文件43         String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "006.jpg";44         File file = new File(readPath);  45         FileInputStream in = new FileInputStream(file);  46         byte[] data = new byte[in.available()];  47         in.read(data);  48         in.close();  49         50         System.out.println("文件原始大小:" + data.length);51         // 测试压缩52         53         byte[] ret1 = GzipUtils.gzip(data);54         System.out.println("压缩之后大小:" + ret1.length);55         56         byte[] ret2 = GzipUtils.ungzip(ret1);57         System.out.println("还原之后大小:" + ret2.length);58         59         // 写出文件60         String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "006.jpg";61         FileOutputStream fos = new FileOutputStream(writePath);62         fos.write(ret2);63         fos.close();    64     }   65 }

 

新建一个Marshalling工厂类MarshallingCodeCFactory.java

1 package com.it448.serial; 2  3 import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; 4 import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; 5 import io.netty.handler.codec.marshalling.MarshallerProvider; 6 import io.netty.handler.codec.marshalling.MarshallingDecoder; 7 import io.netty.handler.codec.marshalling.MarshallingEncoder; 8 import io.netty.handler.codec.marshalling.UnmarshallerProvider; 9 10 import org.jboss.marshalling.MarshallerFactory;11 import org.jboss.marshalling.Marshalling;12 import org.jboss.marshalling.MarshallingConfiguration;13 14 /**15  * Marshalling工厂16  * @author(xyh)17  * @since 2019-06-1218  */19 public final class MarshallingCodeCFactory {20 21     /**22           * 创建Jboss Marshalling解码器MarshallingDecoder23      * @return MarshallingDecoder24      */25     public static MarshallingDecoder buildMarshallingDecoder() {26         // 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。27         final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");28         // 创建了MarshallingConfiguration对象,配置了版本号为5 29         final MarshallingConfiguration configuration = new MarshallingConfiguration();30         configuration.setVersion(5);31         // 根据marshallerFactory和configuration创建provider32         UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);33         // 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度34         MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);35         return decoder;36     }37 38     /**39           * 创建Jboss Marshalling编码器MarshallingEncoder40      * @return MarshallingEncoder41      */42     public static MarshallingEncoder buildMarshallingEncoder() {43         final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");44         final MarshallingConfiguration configuration = new MarshallingConfiguration();45         configuration.setVersion(5);46         MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);47         // 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组48         MarshallingEncoder encoder = new MarshallingEncoder(provider);49         return encoder;50     }51 }

 

新建一个服务端的Handler类ServerHandler.java

1 package com.it448.serial; 2  3 import java.io.File; 4 import java.io.FileOutputStream; 5  6 import com.it448.utils.GzipUtils; 7 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelInboundHandlerAdapter; 9 10 public class ServerHandler extends ChannelInboundHandlerAdapter{11     @Override12     public void channelActive(ChannelHandlerContext ctx) throws Exception {13     }14 15     @Override16     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {17         Req req = (Req)msg;18         System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());19         byte[] attachment = GzipUtils.ungzip(req.getAttachment());20         21         String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "001.jpg";22         FileOutputStream fos = new FileOutputStream(path);23         fos.write(attachment);24         fos.close();25         26         Resp resp = new Resp();27         resp.setId(req.getId());28         resp.setName("resp" + req.getId());29         resp.setResponseMessage("响应内容" + req.getId());30         ctx.writeAndFlush(resp);31     }32 33     @Override34     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {35     }36 37     @Override38     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {39         ctx.close();40     }    41 }

 

新建一个服务端类Server.java

1 package com.it448.serial; 2  3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioServerSocketChannel;11 import io.netty.handler.logging.LogLevel;12 import io.netty.handler.logging.LoggingHandler;13 14 public class Server {15     public static void main(String[] args) throws Exception{16         EventLoopGroup pGroup = new NioEventLoopGroup();17         EventLoopGroup cGroup = new NioEventLoopGroup();18         19         ServerBootstrap b = new ServerBootstrap();20         b.group(pGroup, cGroup)21          .channel(NioServerSocketChannel.class)22          .option(ChannelOption.SO_BACKLOG, 1024)23          // 设置日志24          .handler(new LoggingHandler(LogLevel.INFO))25          .childHandler(new ChannelInitializer
() {26 protected void initChannel(SocketChannel sc) throws Exception {27 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());28 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());29 sc.pipeline().addLast(new ServerHandler());30 }31 });32 33 ChannelFuture cf = b.bind(8765).sync();34 35 cf.channel().closeFuture().sync();36 pGroup.shutdownGracefully();37 cGroup.shutdownGracefully(); 38 }39 }

 

新建一个客户端Handler类ClientHandler.java

1 package com.it448.serial; 2  3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 import io.netty.util.ReferenceCountUtil; 6  7 public class ClientHandler extends ChannelInboundHandlerAdapter{ 8     @Override 9     public void channelActive(ChannelHandlerContext ctx) throws Exception {10     }11 12     @Override13     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {14         try {15             Resp resp = (Resp)msg;16             System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            17         } finally {18             ReferenceCountUtil.release(msg);19         }20     }21 22     @Override23     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {24     }25 26     @Override27     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {28         ctx.close();29     }    30 }

 

 

新建一个客户端类Client.java

1 package com.it448.serial; 2  3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.SocketChannel; 9 import io.netty.channel.socket.nio.NioSocketChannel;10 11 import java.io.File;12 import java.io.FileInputStream;13 14 import com.it448.utils.GzipUtils;15 16 public class Client {17     public static void main(String[] args) throws Exception{18         EventLoopGroup group = new NioEventLoopGroup();19         Bootstrap b = new Bootstrap();20         b.group(group)21          .channel(NioSocketChannel.class)22          .handler(new ChannelInitializer
() {23 @Override24 protected void initChannel(SocketChannel sc) throws Exception {25 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());26 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());27 sc.pipeline().addLast(new ClientHandler());28 }29 });30 31 ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();32 33 for(int i = 0; i < 1000; i++ ){34 Req req = new Req();35 req.setId("" + i);36 req.setName("pro" + i);37 req.setRequestMessage("数据信息" + i); 38 String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg";39 File file = new File(path);40 FileInputStream in = new FileInputStream(file); 41 byte[] data = new byte[in.available()]; 42 in.read(data); 43 in.close(); 44 req.setAttachment(GzipUtils.gzip(data));45 cf.channel().writeAndFlush(req);46 }47 48 cf.channel().closeFuture().sync();49 group.shutdownGracefully();50 }51 }

 

代码测试

首先启动服务端,也就是运行Server类的main方法。

 

然后启用客户端,也就是运行Client类的main方法。

 

测试结果

 

从图中可以看到,receive文件夹多了一张001.jpg的图片。说明图片已经传输过来了。

好了,这部分内容就讲到这里,送上今天的福利:三套Netty系列教程【价值600】,加wxhaox就可以领取。当然了,对应netty有任何疑问也都可以咨询!!

end -- 1560313059

  -- 学而不思则罔,思而不学则殆

转载于:https://www.cnblogs.com/java13/p/11008878.html

你可能感兴趣的文章
Python Selenium Chrome Headless 爬取企查查数据
查看>>
Sublime Text 2 常用快捷键
查看>>
hadoop day 1
查看>>
剑指offer——面试题26:判断二叉树B是否为二叉树A的子结构
查看>>
HDU2024 C语言合法标识符
查看>>
STS中applicationContext.xml配置文件
查看>>
代码编写规范
查看>>
清除浮动总结
查看>>
【android-cocos2d-X iconv.h】在android下使用iconv
查看>>
转:Java中的StringTokenizer类的使用方法
查看>>
httpd实现基于用户的访问控制
查看>>
Verilog门级建模
查看>>
Java豆瓣电影爬虫——减少与数据库交互实现批量插入
查看>>
信息传递 计蒜客16863 NOIP模拟赛 概率DP
查看>>
C++Primer第5版学习笔记(二)
查看>>
java内存溢出分析
查看>>
c++学习笔记(新手学习笔记,如有错误请与作者联系)
查看>>
Bindservice调用服务流程
查看>>
python 数据类型
查看>>
spring mvc 的各种参数的绑定方式
查看>>