原理:
场景:
此项目是处理实时监测数据,一旦tcp socket建立连接,会不间断实时发送数据,峰值输数据量在3M/秒,这样的数据量必然会造成数据粘包。
目的:
TCP连接面向流,读取网络的一包数据不一定正好是协议里定义的完整的一包,有可能是多包,有可能是半包,也有可能是一包半,现在要将每次读取的数据进行分包,也就是粘包处理,提取出完整的一包数据供上层使用,上层需要将完整的一包数据里的数据根据协议定义的格式提取出来。
实现:
将收到的数据copy到缓存区,在缓存区里循环从起始位按照协议找出完整的一包数据提取出来。
关键点在于根据协议找出完整一包数据的长度。
- 从网络读取数据后拷贝到缓存区
- 判断:缓存区里数据占位,长度小于某个值n,return再次读取网络数据。这个值n长度的数据内要能解析出单个完整包的长度,以便后续处理
- 循环:如果缓存区长度大于解析出来的完整一包的长度
- 执行: 取出完整一包数据后,然后剔除这包,将缓存区剩余数据放置起始位
- 循环里再次判断: 长度小于某个值n,return再次读取网络数据。 这个值n长度的数据内要能解析出单个完整包的长度,以便后续处理
如果协议定义了帧头,可以在取包的长度之前校验帧头,确保数据正确。
这里说明定义缓冲区buffer的长度大小:必须要大于可能收到的最大数据包的长度加上read读取一次网络最大数据长度
原因是缓冲区里可能剩下不到一包数据,下一次读取网络数据后要将数据copy至缓冲区,如果超过缓冲区大小就无法进行处理。可在copy时加一层判断,如果超过缓存区,就直接返回,断开连接。代表这种数据包不能进行处理。如果缓存区设计合理,不会出现此种情况。read读取一次网络最大数据长度是在read到的buffer定义的长度。缓冲区的buffer不要设置过大,占用太多内存。
数据源说明:第一位固定#。第二位表示之后有几位代表了之后的数据的长度,比如第一条数据的第二位4,代表之后的四位3350是从0:开始共有3350个字节长度的数据。之后的数据跟业务相关。
主要代码:
Java实现:
不可用于生产环境,理解思想后根据业务数据处理粘包
private static int MAXDATALEN = 500000; //处理数据缓冲池的长度
private static int RECEIVEDATALEN = 200000;//读取网络数据包最大长度
private int SiglePackageLen = 0;//提取出包的长度
private int SequenceLen = 0;//当前缓冲区内数据长度
private byte BuffSequencePackage[] = new byte[MAXDATALEN];//数据缓冲池
public void readData() {
//读取网络数据长度
int RecvLen;
//缓存区
byte ReceiveData[] = new byte[RECEIVEDATALEN];
try {
while (AdapterManager.getInstance().isFlag()) {
if (mSocket.isConnected()) {
if (!mSocket.isInputShutdown()) {
if ((RecvLen = inputstream.read(ReceiveData)) != 0) {
NetNum++;
// Log.i("Read", ">>>>>>>>>>>第" + NetNum + "次读取网络数据 共" + RecvLen + "字节<<<<<<<<<<<<<<<");
if (RecvLen <= 0) {
ToastUtils.showShortSafe(ERROR_Device);
Log.i(TAG, "设备断开连接,RecvLen: " + RecvLen);
return;
}
ReceivedPackage(RecvLen,ReceiveData);
}
}
}
}
} catch (SocketTimeoutException e) {
e.printStackTrace();
Log.i(TAG, "网络异常断开,收取数据超时");
ToastUtils.showShortSafe(ERROR_NetBreak);
} catch (SocketException e) {
e.printStackTrace();
Log.i(TAG, "停止任务,断开连接");
} catch (Exception e) {
e.printStackTrace();
Log.i(TAG, "读取数据异常");
}
}
/** * 粘包处理 * */
public void ReceivedPackage(int RecvLen,byte[] ReceiveData) throws Exception {
pakageNum = 0;
//将收到的数据copy至缓冲区
System.arraycopy(ReceiveData, 0, BuffSequencePackage, SequenceLen, RecvLen);
//记录缓冲区usedlength
SequenceLen += RecvLen;
//这一步保证后续操作能解析出这一包数据的长度
if (SequenceLen <10) {
return;
}
int lentemp = BuffSequencePackage[1] - '0';
SiglePackageLen = Integer.parseInt(new String(Arrays.copyOfRange(BuffSequencePackage, 2, lentemp + 2))) + 2 + lentemp;
//缓存区长度大于等于下一包的长度,说明缓冲区里还有完整的一包数据
while (SiglePackageLen <= SequenceLen) {
//判断任务是否结束。如果网络不佳或程序没有及时处理数据导致网络中的数据缓存过多,一次read读取数据就会将read的ReceiveData沾满,如果read的ReceiveData长度定义很大,将这些数据copy至缓存区,则这个循环会执行一段时间, 如果没有这层判断,即使任务结束,这个循环还在执行。
if (!AdapterManager.getInstance().isFlag()) {
return;
}
//取出第一包数据分析
byte SiglePackage[] = Arrays.copyOfRange(BuffSequencePackage, 2 + lentemp, SiglePackageLen);
AnalyseReceivedPackage(SiglePackage);
//剔除缓冲区第一包数据
byte temp[] = Arrays.copyOfRange(BuffSequencePackage, SiglePackageLen, SequenceLen);
//记录缓冲区usedlength
SequenceLen = SequenceLen - SiglePackageLen;
//清空缓存区
resumeSequence();
//将剩余数据copy至缓冲区,从头部开始
System.arraycopy(temp, 0, BuffSequencePackage, 0, temp.length);
//再次进行验证,剩余的长度是否能提取出下一包数据的长度
if (SequenceLen < 10) {
return;
}
//提取下一包数据长度
lentemp = BuffSequencePackage[1] - '0';
SiglePackageLen = Integer.parseInt(new String(Arrays.copyOfRange(BuffSequencePackage, 2, lentemp + 2))) + 2 + lentemp;
}
}
c++实现:
//s_plocalStationData是从队列里申请的缓存区,length是缓存区有效数据的长度,MessageHead是帧头结构体
LocalStationDataInfo* info = pContext->m_Array->s_plocalStationData;
//将数据copy到缓存区
memcpy(info->pdata+info->length , pOverlapBuff->GetBuffer(),nSize);
info->length =nSize + info->length;
if(info->length<sizeof(MessageHead)){
return;
}
unsigned long singlePakageLength = FindPackagetLength(info->pdata);
while(singlePakageLength <= (info->length)){
//将完整的一包提取出来分析
AnalyseReceivedPackage(info->pdata,singlePakageLength);
info->length =info->length - singlePakageLength;
//剔除缓存区第一包数据
memmove(info->pdata,info->pdata+singlePakageLength,info->length);
//确保缓存区剩余数据长度可以提取出下一包数据的包长
if(info->length < sizeof(MessageHead)){
return;
}
singlePakageLength = FindPackagetLength(info->pdata);
}