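What does "Netty codec technology" mean? Put plainly, encoding and decoding here is serialization: turning Java objects into bytes and back again. Serialization serves two purposes:
1. Network transmission
2. Object persistence
We could serialize with Java's built-in serialization and let Netty handle the transport. But Java serialization has serious drawbacks: it cannot be used across languages, the serialized byte stream is large, and its performance is poor.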
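What does "the byte stream is too large" mean? Take a 1 MB document as an example. After Java serialization it might still be 0.5 MB, which is a lot to push over the network, whereas some of the other mainstream serialization frameworks might get it down to around 0.01 MB. The numbers are only illustrative, but the smaller the stream, the better the transfer performs.
"Poor performance" means that a job that takes 10 s with Java serialization might take 0.1 s with a high-performance serializer. Again the figures are only illustrative, but the gap can be that large.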
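To make the stream-size point concrete, here is a minimal sketch that compares built-in Java serialization with a hand-written compact encoding of the same small object. The `Message` class and the compact layout (4-byte id, 2-byte length, UTF-8 bytes) are my own assumptions for illustration; they are not how Marshalling or Protobuf encode data.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class SerializedSizeDemo {

    // Hypothetical message type, used only for this illustration.
    static final class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final int id;
        final String name;

        Message(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Number of bytes produced by built-in Java serialization.
    static int javaSerializedSize(Message m) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(m);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    // Number of bytes for a hand-written layout:
    // 4-byte id, 2-byte name length, then the UTF-8 bytes of the name.
    static int compactSize(Message m) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            byte[] name = m.name.getBytes(StandardCharsets.UTF_8);
            dos.writeInt(m.id);
            dos.writeShort(name.length);
            dos.write(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    public static void main(String[] args) {
        Message m = new Message(1, "pro1");
        System.out.println("Java serialization: " + javaSerializedSize(m) + " bytes");
        System.out.println("Compact encoding:   " + compactSize(m) + " bytes");
    }
}
```

The Java-serialized form carries a stream header and a class descriptor alongside the field values, which is where most of the extra bytes come from for small objects.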
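In our case the goal of serialization is network transmission. The mainstream serialization frameworks today include:
1. JBoss Marshalling
2. Google Protobuf
3. Kryo
4. MessagePack
Here we mainly cover JBoss Marshalling and Google Protobuf, both of which are widely used in the industry. JBoss Marshalling can even be faster than Protobuf, because it is not cross-language: both ends are Java talking to Java. So when both ends are Java, Marshalling is all you need. If you need to go across languages, say C# on one end and Java on the other, you need Protobuf. Its performance is also very good, and it has its own optimizations for things like big-endian and little-endian byte order.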
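For readers unsure what big-endian and little-endian mean in practice, here is a small sketch that writes the same `int` in both byte orders using the JDK's `ByteBuffer`. It only illustrates byte order in general; it is not Protobuf's wire format, and the class name `ByteOrderDemo` is made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

final class ByteOrderDemo {

    // Encode a 32-bit int into 4 bytes using the given byte order.
    static byte[] encodeInt(int value, ByteOrder order) {
        return ByteBuffer.allocate(Integer.BYTES).order(order).putInt(value).array();
    }

    // Decode 4 bytes back into an int; the reader must use the same order as the writer.
    static int decodeInt(byte[] bytes, ByteOrder order) {
        return ByteBuffer.wrap(bytes).order(order).getInt();
    }

    public static void main(String[] args) {
        int value = 0x01020304;
        // Big-endian puts the most significant byte first.
        System.out.println(Arrays.toString(encodeInt(value, ByteOrder.BIG_ENDIAN)));    // [1, 2, 3, 4]
        // Little-endian puts the least significant byte first.
        System.out.println(Arrays.toString(encodeInt(value, ByteOrder.LITTLE_ENDIAN))); // [4, 3, 2, 1]
    }
}
```

When two programs written in different languages exchange binary data, both sides have to agree on this order, which is one of the details a cross-language format settles for you.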
Now let's implement the Marshalling codec.
First create a new Java project, add the netty and jboss-marshalling jars, and copy a few images into the sources folder for testing.
Create a Req class and write the following code:
package com.it448.serial;

import java.io.Serializable;

/** Request message sent from the client to the server. */
public class Req implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;
    private byte[] attachment; // file content, gzip-compressed by the sender

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getRequestMessage() {
        return requestMessage;
    }

    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    public byte[] getAttachment() {
        return attachment;
    }

    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}
Create a Resp class and write the following code:
package com.it448.serial;

import java.io.Serializable;

/** Response message sent from the server back to the client. */
public class Resp implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getResponseMessage() {
        return responseMessage;
    }

    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
Create a utility class GzipUtils for convenience:
package com.it448.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipUtils {

    /** Compresses the given bytes with gzip. */
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the gzip stream writes the gzip trailer into bos.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    /** Decompresses gzip-compressed bytes. */
    public static byte[] ungzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // Read the file
        Path readPath = Paths.get(System.getProperty("user.dir"), "sources", "006.jpg");
        byte[] data = Files.readAllBytes(readPath);
        System.out.println("Original file size: " + data.length);

        // Test compression
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("Size after compression: " + ret1.length);

        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("Size after decompression: " + ret2.length);

        // Write the file out, creating the receive folder if it does not exist yet
        Path writePath = Paths.get(System.getProperty("user.dir"), "receive", "006.jpg");
        Files.createDirectories(writePath.getParent());
        Files.write(writePath, ret2);
    }
}
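If you don't have an image at hand, the gzip round trip can also be checked entirely in memory. The sketch below is a standalone variant of the same idea: the class name `GzipRoundTripDemo` and the sample text are my own assumptions, and I wrap the checked exceptions so the helpers are easy to call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipRoundTripDemo {

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the gzip stream flushes the remaining data and the trailer.
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static byte[] ungzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = gz.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Highly repetitive sample data (made up for this demo), which compresses well.
    static byte[] sampleText() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            sb.append("Netty Marshalling demo ");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] original = sampleText();
        byte[] compressed = gzip(original);
        byte[] restored = ungzip(compressed);
        System.out.println("original:   " + original.length + " bytes");
        System.out.println("compressed: " + compressed.length + " bytes");
        System.out.println("round trip ok: " + Arrays.equals(original, restored));
    }
}
```

Keep in mind that JPEG images are already compressed, so gzip usually shrinks them far less than it shrinks repetitive text like this sample.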
Create a Marshalling factory class, MarshallingCodeCFactory.java:
package com.it448.serial;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling factory
 * @author(xyh)
 * @since 2019-06-12
 */
public final class MarshallingCodeCFactory {

    /**
     * Creates the JBoss Marshalling decoder, MarshallingDecoder.
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // Get a MarshallerFactory through the static method of the Marshalling utility class.
        // The "serial" argument selects the Java serialization factory.
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // Create a MarshallingConfiguration and set the version to 5.
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // Build the provider from marshallerFactory and configuration.
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingDecoder. The two arguments are the provider and the
        // maximum length of a single serialized message (1 MB here).
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    /**
     * Creates the JBoss Marshalling encoder, MarshallingEncoder.
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingEncoder, which serializes POJOs that implement
        // Serializable into a binary array.
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
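Why does the decoder take a maximum message length? As far as I understand it, Netty's Marshalling codec frames each message with a length field in front of the serialized bytes, and the decoder refuses frames longer than the configured maximum. The sketch below shows that general technique in plain JDK code. It is my own simplified illustration, not Netty's implementation: the class name, the 4-byte field width, and the exception type are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixedFrameDemo {

    static final int LENGTH_FIELD_BYTES = 4; // assumed width of the length field

    // Prepend a 4-byte length to the payload.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(LENGTH_FIELD_BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Try to read one frame from the buffer.
    // Returns the payload, or null if the full frame has not arrived yet.
    // Throws if the announced length is negative or exceeds maxFrameLength.
    static byte[] decode(ByteBuffer in, int maxFrameLength) {
        if (in.remaining() < LENGTH_FIELD_BYTES) {
            return null; // length field incomplete, wait for more bytes
        }
        in.mark();
        int length = in.getInt();
        if (length < 0 || length > maxFrameLength) {
            throw new IllegalArgumentException("frame length " + length
                    + " exceeds limit " + maxFrameLength);
        }
        if (in.remaining() < length) {
            in.reset(); // body incomplete, rewind so the next attempt sees the length again
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] payload = decode(ByteBuffer.wrap(frame), 1024);
        System.out.println(new String(payload, StandardCharsets.UTF_8)); // hello
    }
}
```

The practical consequence for this tutorial: with a 1 MB limit, a request whose serialized form (including the gzipped attachment) is larger than 1 MB will be rejected by the receiving side, so pick test images accordingly or raise the limit.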
Create the server-side handler class, ServerHandler.java:
package com.it448.serial;

import java.io.File;
import java.io.FileOutputStream;

import com.it448.utils.GzipUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The MarshallingDecoder ahead of this handler has already turned the bytes into a Req.
        Req req = (Req) msg;
        System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
        byte[] attachment = GzipUtils.ungzip(req.getAttachment());

        // Write the decompressed attachment to receive/001.jpg,
        // creating the receive folder if it does not exist yet.
        String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg";
        new File(path).getParentFile().mkdirs();
        try (FileOutputStream fos = new FileOutputStream(path)) {
            fos.write(attachment);
        }

        // Reply to the client.
        Resp resp = new Resp();
        resp.setId(req.getId());
        resp.setName("resp" + req.getId());
        resp.setResponseMessage("response content " + req.getId());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause so failures are not silently swallowed, then close the connection.
        cause.printStackTrace();
        ctx.close();
    }
}
Create the server class, Server.java:
package com.it448.serial;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup pGroup = new NioEventLoopGroup(); // accepts incoming connections
        EventLoopGroup cGroup = new NioEventLoopGroup(); // handles I/O on accepted connections
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Set up logging
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });

            ChannelFuture cf = b.bind(8765).sync();

            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
}
Create the client-side handler class, ClientHandler.java:
package com.it448.serial;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // The MarshallingDecoder ahead of this handler has already turned the bytes into a Resp.
            Resp resp = (Resp) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause so failures are not silently swallowed, then close the connection.
        cause.printStackTrace();
        ctx.close();
    }
}
Create the client class, Client.java:
package com.it448.serial;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.file.Files;
import java.nio.file.Paths;

import com.it448.utils.GzipUtils;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });

            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

            // Every request carries the same image, so read and compress it once.
            byte[] data = Files.readAllBytes(Paths.get(System.getProperty("user.dir"), "sources", "001.jpg"));
            byte[] attachment = GzipUtils.gzip(data);

            for (int i = 0; i < 1000; i++) {
                Req req = new Req();
                req.setId("" + i);
                req.setName("pro" + i);
                req.setRequestMessage("data message " + i);
                req.setAttachment(attachment);
                cf.channel().writeAndFlush(req);
            }

            // Block until the channel is closed. Neither side closes it in this demo,
            // so stop the client manually once the responses have been printed.
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
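A side note on loading the attachment bytes: a single `InputStream.read(byte[])` call is not guaranteed to fill the whole array, so code that reads a file in one call can end up with a partially filled buffer. The sketch below shows the effect with a made-up stream that hands out at most 3 bytes per call, plus a read loop that copes with it. The names `ReadFullyDemo`, `trickle` and `singleRead` are hypothetical helpers for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class ReadFullyDemo {

    // Read until end of stream, no matter how few bytes each call returns.
    static byte[] readFully(InputStream in) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        try {
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Perform exactly one read call and report how many bytes it delivered.
    static int singleRead(InputStream in, byte[] target) {
        try {
            return in.read(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Test stream that returns at most 3 bytes per read call, to mimic short reads.
    static InputStream trickle(byte[] data) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, 3));
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println("single read got: " + singleRead(trickle(data), new byte[10]) + " bytes");
        System.out.println("read loop got:   " + readFully(trickle(data)).length + " bytes");
    }
}
```

For files, `java.nio.file.Files.readAllBytes` takes care of this looping for you.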
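Testing the code
First start the server, that is, run the main method of the Server class.
Then start the client, that is, run the main method of the Client class.
Test result
As the result shows, the receive folder now contains a new image, 001.jpg, which means the image was transferred successfully.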