Socket TCP协议 实时通信的粘包处理 Java与C++实现



原理:

(经典)tcp粘包分析

场景:

此项目是处理实时监测数据,一旦tcp socket建立连接,会不间断实时发送数据,峰值输数据量在3M/秒,这样的数据量必然会造成数据粘包。

目的:

TCP连接面向流,读取网络的一包数据不一定正好是协议里定义的完整的一包,有可能是多包,有可能是半包,也有可能是一包半,现在要将每次读取的数据进行分包,也就是粘包处理,提取出完整的一包数据供上层使用,上层需要将完整的一包数据里的数据根据协议定义的格式提取出来。

实现:

将收到的数据copy到缓存区,在缓存区里循环从起始位按照协议找出完整的一包数据提取出来。
关键点在于根据协议找出完整一包数据的长度

  1. 从网络读取数据后拷贝到缓存区
  2. 判断:缓存区里数据占位,长度小于某个值n,return再次读取网络数据。这个值n长度的数据内要能解析出单个完整包的长度,以便后续处理
  3. 循环:如果缓存区长度大于解析出来的完整一包的长度
  4. 执行: 取出完整一包数据后,然后剔除这包,将缓存区剩余数据放置起始位
  5. 循环里再次判断: 长度小于某个值n,return再次读取网络数据。 这个值n长度的数据内要能解析出单个完整包的长度,以便后续处理

如果协议定义了帧头,可以在取包的长度之前校验帧头,确保数据正确。

这里说明定义缓冲区buffer的长度大小:必须要大于可能收到的最大数据包的长度加上read读取一次网络最大数据长度
原因是缓冲区里可能剩下不到一包数据,下一次读取网络数据后要将数据copy至缓冲区,如果超过缓冲区大小就无法进行处理。可在copy时加一层判断,如果超过缓存区,就直接返回,断开连接。代表这种数据包不能进行处理。如果缓存区设计合理,不会出现此种情况。read读取一次网络最大数据长度是在read到的buffer定义的长度。缓冲区的buffer不要设置过大,占用太多内存。
数据源

数据源说明:第一位固定#。第二位表示之后有几位代表了之后的数据的长度,比如第一条数据的第二位4,代表之后的四位3350是从0:开始共有3350个字节长度的数据。之后的数据跟业务相关。

主要代码:

Java实现:
不可用于生产环境,理解思想后根据业务数据处理粘包

private static int MAXDATALEN = 500000; //处理数据缓冲池的长度
private static int RECEIVEDATALEN = 200000;//读取网络数据包最大长度
private int SiglePackageLen = 0;//提取出包的长度
private int SequenceLen = 0;//当前缓冲区内数据长度
private byte BuffSequencePackage[] = new byte[MAXDATALEN];//数据缓冲池
public void readData() {
    //读取网络数据长度
        int RecvLen;
        //缓存区
        byte ReceiveData[] = new byte[RECEIVEDATALEN];
        try {
            while (AdapterManager.getInstance().isFlag()) {
                if (mSocket.isConnected()) {
                    if (!mSocket.isInputShutdown()) {
                        if ((RecvLen = inputstream.read(ReceiveData)) != 0) {
                            NetNum++;
// Log.i("Read", ">>>>>>>>>>>第" + NetNum + "次读取网络数据 共" + RecvLen + "字节<<<<<<<<<<<<<<<");
                            if (RecvLen <= 0) {
                                ToastUtils.showShortSafe(ERROR_Device);
                                Log.i(TAG, "设备断开连接,RecvLen: " + RecvLen);
                                return;
                            }
                            ReceivedPackage(RecvLen,ReceiveData);
                        }
                    }
                }
            }

        } catch (SocketTimeoutException e) {
            e.printStackTrace();
            Log.i(TAG, "网络异常断开,收取数据超时");
            ToastUtils.showShortSafe(ERROR_NetBreak);
        } catch (SocketException e) {
            e.printStackTrace();
            Log.i(TAG, "停止任务,断开连接");
        } catch (Exception e) {
            e.printStackTrace();
            Log.i(TAG, "读取数据异常");
        }

    }
     /** * 粘包处理 * */
    public void ReceivedPackage(int RecvLen,byte[] ReceiveData) throws Exception {
        pakageNum = 0;
        //将收到的数据copy至缓冲区
        System.arraycopy(ReceiveData, 0, BuffSequencePackage, SequenceLen, RecvLen);
        //记录缓冲区usedlength
        SequenceLen += RecvLen;
        //这一步保证后续操作能解析出这一包数据的长度
        if (SequenceLen <10) {
            return;
        }

        int lentemp = BuffSequencePackage[1] - '0';
        SiglePackageLen = Integer.parseInt(new String(Arrays.copyOfRange(BuffSequencePackage, 2, lentemp + 2))) + 2 + lentemp;
        //缓存区长度大于等于下一包的长度,说明缓冲区里还有完整的一包数据
        while (SiglePackageLen <= SequenceLen) {
            //判断任务是否结束。如果网络不佳或程序没有及时处理数据导致网络中的数据缓存过多,一次read读取数据就会将read的ReceiveData沾满,如果read的ReceiveData长度定义很大,将这些数据copy至缓存区,则这个循环会执行一段时间, 如果没有这层判断,即使任务结束,这个循环还在执行。
            if (!AdapterManager.getInstance().isFlag()) {
                return;
            }
            //取出第一包数据分析
            byte SiglePackage[] = Arrays.copyOfRange(BuffSequencePackage, 2 + lentemp, SiglePackageLen);
            AnalyseReceivedPackage(SiglePackage);
            //剔除缓冲区第一包数据
            byte temp[] = Arrays.copyOfRange(BuffSequencePackage, SiglePackageLen, SequenceLen);
            //记录缓冲区usedlength
            SequenceLen = SequenceLen - SiglePackageLen;
            //清空缓存区
            resumeSequence();
            //将剩余数据copy至缓冲区,从头部开始
            System.arraycopy(temp, 0, BuffSequencePackage, 0, temp.length);
            //再次进行验证,剩余的长度是否能提取出下一包数据的长度
            if (SequenceLen < 10) {
                return;
            }
            //提取下一包数据长度
            lentemp = BuffSequencePackage[1] - '0';
            SiglePackageLen = Integer.parseInt(new String(Arrays.copyOfRange(BuffSequencePackage, 2, lentemp + 2))) + 2 + lentemp;

        }

    }

c++实现:

//s_plocalStationData是从队列里申请的缓存区,length是缓存区有效数据的长度,MessageHead是帧头结构体
    LocalStationDataInfo* info = pContext->m_Array->s_plocalStationData;
    //将数据copy到缓存区
    memcpy(info->pdata+info->length , pOverlapBuff->GetBuffer(),nSize);
    info->length =nSize + info->length;
    if(info->length<sizeof(MessageHead)){
        return;
    }
    unsigned long singlePakageLength = FindPackagetLength(info->pdata);
    while(singlePakageLength <= (info->length)){
        //将完整的一包提取出来分析
        AnalyseReceivedPackage(info->pdata,singlePakageLength);
        info->length =info->length - singlePakageLength;
        //剔除缓存区第一包数据
        memmove(info->pdata,info->pdata+singlePakageLength,info->length);
        //确保缓存区剩余数据长度可以提取出下一包数据的包长
        if(info->length < sizeof(MessageHead)){ 
            return;
        }
        singlePakageLength = FindPackagetLength(info->pdata);
    }