package com.nms.server.thread.netty;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.log4j.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ReferenceCountUtil;

/**
 * Stateful decoder that assembles {@link Message} objects from the inbound byte stream.
 *
 * <p>Wire format:
 * <pre>
 * | 4 bytes     | 1 byte       | 1 byte        | (message type) bytes | (body length) bytes |
 * | body length | message type | business type | file name, optional  | body                |
 * </pre>
 *
 * <p>A message type below 1 means the body is delivered as a {@code byte[]}. A message type
 * above 0 means the body is a file and the value itself is the length in bytes of the file
 * name that follows the header (so at most 127 bytes). File bodies are streamed into a
 * temporary file; byte-array bodies are collected in memory.
 *
 * <p>The decoder keeps per-message state in its fields, so one instance must serve exactly
 * one channel.
 *
 * @author fang
 */
public class MessageDecoder extends ByteToMessageDecoder {

    private static final Logger logger = Logger.getLogger(MessageDecoder.class);

    /** Number of bytes holding the body length. */
    private static final int LENGTH_FIELD_SIZE = 4;
    /** Number of bytes holding the message type (which doubles as the file-name length). */
    private static final int TYPE_FIELD_SIZE = 1;
    /** Number of bytes holding the business type. */
    private static final int BUSINESS_TYPE_FIELD_SIZE = 1;
    /** Total size of the fixed protocol header. */
    private static final int HEADER_LENGTH = LENGTH_FIELD_SIZE + TYPE_FIELD_SIZE + BUSINESS_TYPE_FIELD_SIZE;

    /** {@code true} while waiting for a header, {@code false} while a body is being collected. */
    private boolean flag = true;
    private int bodyLength;
    /** Message type taken from the header. */
    private int messageType;
    /** Business type resolved from the header. */
    private BusinessType businessType;
    /** Number of body bytes that still have to be read. */
    private int remain;
    private File tmpFile;
    private String fileName;
    private FileOutputStream fos;
    private ByteBuf temBuf;

    public MessageDecoder() {
        init();
    }

    /** Resets the per-message state so that the next bytes are parsed as a header. */
    private void init() {
        this.flag = true;
        this.remain = -1;
        this.bodyLength = -1;
        this.messageType = 0;
        this.businessType = null;
        this.tmpFile = null;
        this.fileName = null;
    }

    /**
     * Called once the header has been read: opens the sink the body will be written to.
     *
     * @throws FileNotFoundException if the temporary file cannot be opened for writing
     * @throws IOException           on any other I/O failure
     */
    private void begin() throws FileNotFoundException, IOException {
        this.flag = false;
        if (messageType > 0) {
            fos = new FileOutputStream(tmpFile);
        } else {
            // NOTE(review): bodyLength comes from the wire and has no upper bound here, so a
            // peer can request a very large allocation - consider enforcing a maximum.
            temBuf = Unpooled.buffer(bodyLength);
        }
    }

    /**
     * Moves {@code length} bytes from {@code in} straight into the current sink, without an
     * intermediate array.
     */
    private void write(ByteBuf in, int length) throws IOException {
        if (messageType > 0) {
            in.readBytes(fos, length);
        } else {
            temBuf.writeBytes(in, length);
        }
    }

    /** Closes the file stream and releases the in-memory buffer, if either is open. */
    private void release() {
        if (fos != null) {
            try {
                fos.close();
            } catch (Exception e) {
                // Best effort: a failed close must not prevent the remaining cleanup.
                logger.error("failed to close temp file stream", e);
            }
            fos = null;
        }
        if (temBuf != null) {
            ReferenceCountUtil.release(temBuf);
            temBuf = null;
        }
    }

    /**
     * Abandons the message currently being assembled: frees its resources, removes the
     * half-written temporary file (nobody will ever receive it) and resets the state.
     */
    private void discardPartialMessage() {
        release();
        if (tmpFile != null && tmpFile.exists() && !tmpFile.delete()) {
            logger.warn("failed to delete temp file " + tmpFile);
        }
        init();
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Message m = decode(in);
        if (m != null) {
            // Captured before init() clears it; this is the value handed to the Message constructor.
            BusinessType type = businessType;
            out.add(m);
            init();
            if (logger.isDebugEnabled()) {
                logger.debug("成功组包一条消息:type : " + (type == null ? null : type.getName())
                        + ",length : " + m.getLength());
            }
        }
    }

    /**
     * Consumes as much of the current message as {@code in} holds.
     *
     * @param in the cumulated inbound bytes
     * @return the finished message, or {@code null} if more bytes are needed
     * @throws Exception if the header is invalid or the body cannot be stored; the partially
     *                   assembled message is discarded before the exception is rethrown
     */
    private Message decode(ByteBuf in) throws Exception {
        int readableBytes = in.readableBytes();
        if (logger.isDebugEnabled()) {
            logger.debug("decode begin,flag:" + flag + " ,readableBytes:" + readableBytes);
        }
        try {
            if (flag && !readHeader(in, readableBytes)) {
                return null; // header (or file name) not complete yet, nothing was consumed
            }
            // Take whatever part of the body is available, but never more than is still missing.
            int chunk = Math.min(remain, in.readableBytes());
            if (chunk > 0) {
                write(in, chunk);
                remain -= chunk;
            }
            if (remain == 0) {
                return complete();
            }
        } catch (Exception e) {
            // Drop the buffered rest of the broken message. The skip is capped at what is
            // actually readable so that it cannot throw and hide the original failure.
            int skip = Math.min(remain, in.readableBytes());
            if (skip > 0) {
                in.skipBytes(skip);
            }
            discardPartialMessage();
            throw e;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("decode end,type : " + (businessType == null ? null : businessType.getName())
                    + " bodyLength: " + bodyLength + " , remain :" + remain);
        }
        return null;
    }

    /**
     * Parses the protocol header and, for file messages, the file name, then opens the sink.
     *
     * @param in            the cumulated inbound bytes, positioned at the start of a header
     * @param readableBytes number of readable bytes in {@code in}
     * @return {@code false} if the header or file name is not fully available yet (nothing
     *         is consumed in that case), {@code true} once the body can be read
     */
    private boolean readHeader(ByteBuf in, int readableBytes) throws IOException {
        if (readableBytes < HEADER_LENGTH) {
            return false;
        }
        // Peek at the type first: for files it tells how many file-name bytes must also be present.
        int type = in.getByte(in.readerIndex() + LENGTH_FIELD_SIZE);
        int fileNameLength = type > 0 ? type : 0;
        if (readableBytes < HEADER_LENGTH + fileNameLength) {
            return false;
        }

        int length = in.readInt();
        in.skipBytes(TYPE_FIELD_SIZE); // already peeked above
        int businessCode = in.readByte();
        if (length < 0) {
            throw new IllegalArgumentException("negative body length in message header: " + length);
        }

        messageType = type;
        remain = bodyLength = length;
        businessType = BusinessType.getType(businessCode);

        if (fileNameLength > 0) {
            byte[] nameBytes = new byte[fileNameLength];
            in.readBytes(nameBytes);
            // NOTE(review): decoded with the platform default charset - confirm it matches the sender.
            String name = new String(nameBytes);
            // The name comes from the peer; refuse anything that is not a plain file name so
            // that it cannot steer the temporary file to another directory.
            if (!new File(name).getName().equals(name)) {
                throw new IOException("illegal file name in message header: " + name);
            }
            this.fileName = name;
            tmpFile = File.createTempFile("Datacontroller-", "-" + name);
        }
        begin();
        return true;
    }

    /** Builds the {@link Message} for a fully received body and frees the sink. */
    private Message complete() {
        Message msg = new Message(bodyLength, messageType, businessType);
        if (messageType > 0) {
            msg.setFile(tmpFile);
            msg.setFileName(this.fileName);
        } else {
            // The buffer was created with a capacity of bodyLength and exactly bodyLength
            // bytes were written to it, so its backing array is the body.
            msg.setData(temBuf.array());
        }
        release();
        return msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        discardPartialMessage(); // reset, and do not leak the open stream / buffer / temp file
        logger.error("", cause);
        ctx.fireExceptionCaught(cause);
    }

    /** Frees the resources of a message that was still incomplete when the handler went away. */
    @Override
    protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
        discardPartialMessage();
        super.handlerRemoved0(ctx);
    }
}