This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
nms-nmsserver/src/com/nms/server/thread/netty/MessageDecoder.java

206 lines
5.3 KiB
Java
Raw Normal View History

2018-09-27 16:17:06 +08:00
package com.nms.server.thread.netty;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.log4j.Logger;
import com.nms.server.common.Constants;
2018-09-27 16:17:06 +08:00
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ReferenceCountUtil;
/**
* 消息解析
* @author fang
*消息格式 | 4byte | 1byte | 1byte | .................... |
* |bodylength| msg Type| businessType | filename | body |
*/
public class MessageDecoder extends ByteToMessageDecoder {
private final static Logger logger = Logger.getLogger(MessageDecoder.class);
private final int fieldLength = 4;//四个字节保存body length
/**
* 一个字节保存 消息类型大于 0 为文件
* 0 byte[] ,value 文件名的长度
* 最大支持 127个字节的文件名
*/
private final int typeLength = 1;//
private final int businessTypeLength = 1;//业务类型
private final int headerLength = fieldLength + typeLength + businessTypeLength;//协议头长度
private boolean flag = true;//是否开始解析
private int bodyLength;
private int messageType;//消息类型
private BusinessType businessType;//业务类型
private int remain;//剩余未读取内容长度
private File tmpFile;
private String fileName;
private FileOutputStream fos;
private ByteBuf temBuf;
public MessageDecoder() {
init();
}
private void init(){
this.flag = true;
this.remain = -1;
this.bodyLength = -1;
this.messageType = 0;
this.businessType = null;
this.tmpFile = null;
this.fileName = null;
}
/**
* 已经成功读取协议头
* @throws IOException
* @throws FileNotFoundException
*/
private void begin() throws FileNotFoundException, IOException{
this.flag = false;
if(messageType > 0){
fos = new FileOutputStream(tmpFile);
}else{
temBuf = Unpooled.buffer(bodyLength);
}
}
private void write(byte[] data) throws FileNotFoundException, IOException{
if(messageType > 0){
fos.write(data);
fos.flush();
}else{
temBuf.writeBytes(data);
}
}
private void release() throws IOException{
if(fos != null){
try {
fos.close();
} catch (Exception e) {
e.printStackTrace();
}
fos = null;
}
if(temBuf != null){
ReferenceCountUtil.release(temBuf);
temBuf = null;
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Message m = decode(in);
if(m != null){
out.add(m);
init();
if(Constants.DEBUG_MESSAGEDECODER_DECODE) {
logger.debug("成功组包一条消息type : " +m.getBusinessType().getName() +",length : " + m.getLength());
}
2018-09-27 16:17:06 +08:00
}
}
private Message decode(ByteBuf in) throws Exception{
int readableBytes = in.readableBytes();
if(Constants.DEBUG_MESSAGEDECODER_DECODE) {
logger.debug("decode begin,flag:"+flag+ " ,readableBytes:" + readableBytes);
}
2018-09-27 16:17:06 +08:00
if(flag && readableBytes < (headerLength)){//初始状态,且不能读取 协议头
return null;
}
try {
byte[] tem = null;
if(flag && readableBytes >= (headerLength) ){
int fileNamelen = 0;
in.markReaderIndex();
int bl = in.readInt();
messageType = in.readByte();
int bt = in.readByte();//业务类型
if(messageType < 1){
remain = bodyLength = bl;
businessType = BusinessType.getType(bt);
}else{//大于100 位 文件
fileNamelen = messageType ;//文件名长度
if(readableBytes >= (headerLength + fileNamelen)){
remain = bodyLength = bl;
businessType = BusinessType.getType(bt);
tem = new byte[fileNamelen];
in.readBytes(tem);//读取文件名
String fileName = new String(tem);
this.fileName = fileName;
tmpFile = File.createTempFile("Datacontroller-","-"+fileName);
}else{//不能获取文件名,暂不读取数据
in.resetReaderIndex();
return null;
}
}
begin();
if(readableBytes < (headerLength + bodyLength + fileNamelen)){
int rl = readableBytes - headerLength - fileNamelen;
tem = new byte[rl];
in.readBytes(tem);
write(tem);
remain = bodyLength - rl;//剩余 n 个字节 没有读取
}else{
tem = new byte[bodyLength];
in.readBytes(tem);
write(tem);
remain = 0;
}
}else{
if(readableBytes < remain){
tem = new byte[readableBytes];
in.readBytes(tem);
write(tem);
remain = remain - readableBytes;
}else{
tem = new byte[remain];
in.readBytes(tem);
write(tem);
remain = 0;
}
}
if(remain == 0){
Message msg = new Message(bodyLength,messageType, businessType);
if(messageType > 0){
msg.setFile(tmpFile);
msg.setFileName(this.fileName);
}else{
msg.setData(temBuf.array());
}
release();
return msg;
}
} catch (Exception e) {
if(remain >0){
in.skipBytes(remain);
}
throw e;
}
if(Constants.DEBUG_MESSAGEDECODER_DECODE) {
logger.debug("decode end,type : "+businessType.getName()+" bodyLength: "+bodyLength+" , remain :" +remain);
}
2018-09-27 16:17:06 +08:00
return null;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
init();//重置
logger.error("",cause);
ctx.fireExceptionCaught(cause);
}
}