C#多线程处理数据收发

处理两类消息:

  1. 数据交互消息,比如保存数据、查询、删除等操作;
  2. 透传操作命令,比如把接收到的命令发送出去,然后等待命令应答并返回。

三个线程:

  • 线程1:接收消息或者命令,组帧后放入消息队列,如果碰到的是心跳消息,则直接调用接口记录即可;
  • 线程2:从队列里面取出帧,如果是数据交互消息则处理并返回,如果是透传操作命令,则直接调用接口发送命令;
  • 线程3:使用wait…等待透传应答。

一、互斥消息队列

参考《C#互斥消息队列》

二、DLL调用

2.1 操作接口

透传命令发送及等待,参考:《Win32 多线程程序设计-串口》

消息处理涉及TCP/IP数据收发,参考《Win32 程序设计-TCP/IP》

    /// <summary>
    /// P/Invoke entry points exported by the native PDAComm.dll.
    /// The managed wrapper class Communication forwards to the COM_* and
    /// TCPServer_* functions; every function here is a direct native call.
    /// </summary>
    public class PDAComm
    {
        /// <summary>Path of the native library that every import below binds to.</summary>
        public const string strDllPath = @"\system\PDAComm.dll";

        // ---- Serial (COM) link -------------------------------------------------

        /// <summary>Opens the serial link. Communication.Open returns this value as its success flag.</summary>
        [DllImport(strDllPath)] public static extern bool COM_Open();

        /// <summary>Closes the serial link. Communication.Close returns this value as its success flag.</summary>
        [DllImport(strDllPath)] public static extern bool COM_Close();

        /// <summary>Sends bytes from <paramref name="buf"/> over the serial link.</summary>
        /// <param name="buf">Source buffer.</param>
        /// <param name="buflen">Byte count to send (presumably counted from index 0 - TODO confirm against the DLL).</param>
        [DllImport(strDllPath)] public static extern bool COM_Send(byte[] buf, int buflen);

        /// <summary>Receives bytes from the serial link into <paramref name="buf"/>.</summary>
        /// <param name="buf">Destination buffer.</param>
        /// <param name="buflen">Presumably the capacity of <paramref name="buf"/> - TODO confirm against the DLL.</param>
        /// <param name="rcvlen">Set by the native side; Communication.Recv returns it as the received byte count when the call reports true.</param>
        [DllImport(strDllPath)] public static extern bool COM_Recv(byte[] buf, int buflen, ref int rcvlen);

        // ---- TCP server link ---------------------------------------------------

        /// <summary>Starts the TCP server on <paramref name="port"/>.</summary>
        [DllImport(strDllPath)] public static extern bool TCPServer_Start(ushort port);

        /// <summary>Stops the TCP server.</summary>
        [DllImport(strDllPath)] public static extern bool TCPServer_Stop();

        /// <summary>TCP counterpart of <see cref="COM_Send"/>; same parameter layout.</summary>
        [DllImport(strDllPath)] public static extern bool TCPServer_Send(byte[] buf, int buflen);

        /// <summary>TCP counterpart of <see cref="COM_Recv"/>; same parameter layout.</summary>
        [DllImport(strDllPath)] public static extern bool TCPServer_Recv(byte[] buf, int buflen, ref int rcvlen);

        /// <summary>
        /// Invoked by CMOIMFrame.ProcessMsg when a decoded 6-byte frame carries 0x22 at index 3.
        /// NOTE(review): presumably records that a heartbeat/handshake was received - confirm against the DLL.
        /// </summary>
        [DllImport(strDllPath)] public static extern void SetRcvShaveHand();

        // ---- Device identity and diagnostics -----------------------------------

        /// <summary>NOTE(review): presumably fills <paramref name="pdaID"/> with the device ID; meaning of the int result is not visible here.</summary>
        [DllImport(strDllPath)] public static extern int QueryPDAID(byte[] pdaID);

        /// <summary>NOTE(review): presumably stores <paramref name="pdaID"/> as the device ID; meaning of the int result is not visible here.</summary>
        [DllImport(strDllPath)] public static extern int UpdatePDAID(byte[] pdaID);

        /// <summary>NOTE(review): the two flags presumably toggle native logging of link status and link data - confirm.</summary>
        [DllImport(strDllPath)] public static extern void SetPrintParam(bool bPDACommStatus, bool bPDACommData);
    }


        // NOTE(review): in this listing the two imports below appear after the closing brace
        // of PDAComm, so their enclosing class is not shown. ThoroughlyFunc calls
        // RSUComm.Delivery_Recv_MOIM, so they presumably belong to an RSUComm class - confirm.

        // Native import for sending a pass-through command.
        // Parameters (meanings inferred from names only - TODO confirm against the DLL):
        //   fd       - device handle
        //   iLength  - number of bytes in pCommand
        //   pCommand - command bytes
        //   iTimeOut - timeout value
        [DllImport(strDllPath)]
        public static extern Int32 Delivery_Send_MOIM(Int32 fd,
                                                      Int32 iLength,
                                                      Byte[] pCommand,
                                                      Int32 iTimeOut);
        // Native import for receiving a pass-through reply into pCommand; iLength is passed
        // by reference. The only caller visible here (ThoroughlyFunc) passes iLength = 0 and
        // iTimeOut = 0, treats a return value of 0 as "reply available" and then forwards
        // iLength bytes of pCommand.
        [DllImport(strDllPath)]
        public static extern Int32 Delivery_Recv_MOIM(Int32 fd,
                                                      ref Int32 iLength,
                                                      Byte[] pCommand,
                                                      Int32 iTimeOut);

2.2 收发消息封装

using System;

using System.Collections.Generic;
using System.Text;

namespace ****************
{
    /// <summary>
    /// Transport facade. Routes open/close/send/receive to the serial (PDAComm.COM_*) or
    /// TCP-server (PDAComm.TCPServer_*) native functions, selected by
    /// CReaderConfig.iCommunication. No method lets an exception escape.
    /// </summary>
    class Communication
    {
        /// <summary>Shared instance.</summary>
        public static readonly Communication instance = new Communication();

        // True when the configured transport is the RS232 serial link.
        private static Boolean UseSerial()
        {
            return CReaderConfig.iCommunication == (int)CommonEnum.COMMUNICATION_TYPE.RS232;
        }

        // True when the configured transport is the TCP server.
        private static Boolean UseTcpServer()
        {
            return CReaderConfig.iCommunication == (int)CommonEnum.COMMUNICATION_TYPE.TCP_SERVER;
        }

        /// <summary>Opens the configured transport.</summary>
        /// <returns>
        /// true on success; false when the native call fails, the transport type is
        /// unsupported, or an exception occurred (in which case Close is attempted first).
        /// </returns>
        public Boolean Open()
        {
            try
            {
                if (UseSerial())
                {
                    return PDAComm.COM_Open();
                }
                if (UseTcpServer())
                {
                    return PDAComm.TCPServer_Start(CReaderConfig.iServerPort);
                }
                return false;
            }
            catch (System.Exception)
            {
                // Release whatever may have been half-opened before reporting failure.
                Close();
                return false;
            }
        }

        /// <summary>Closes the configured transport.</summary>
        /// <returns>true on success; false on native failure, unsupported transport or exception.</returns>
        public Boolean Close()
        {
            try
            {
                if (UseSerial())
                {
                    return PDAComm.COM_Close();
                }
                if (UseTcpServer())
                {
                    return PDAComm.TCPServer_Stop();
                }
                return false;
            }
            catch (System.Exception)
            {
                return false;
            }
        }

        /// <summary>Sends the first <paramref name="sndlen"/> bytes of <paramref name="buf"/>.</summary>
        /// <param name="buf">Source buffer.</param>
        /// <param name="sndlen">Number of bytes to send.</param>
        /// <returns>
        /// true on success; false on native failure, unsupported transport, exception, or
        /// when the arguments are unusable (null buffer, negative length, or a length larger
        /// than the buffer).
        /// </returns>
        public Boolean Send(Byte[] buf, int sndlen)
        {
            // The length goes straight to native code; a value larger than the managed
            // array would make the DLL read past the end of the buffer.
            if (buf == null || sndlen < 0 || sndlen > buf.Length)
            {
                return false;
            }

            try
            {
                if (UseSerial())
                {
                    return PDAComm.COM_Send(buf, sndlen);
                }
                if (UseTcpServer())
                {
                    return PDAComm.TCPServer_Send(buf, sndlen);
                }
                return false;
            }
            catch (System.Exception)
            {
                return false;
            }
        }

        /// <summary>Receives available bytes into <paramref name="buf"/>.</summary>
        /// <param name="buf">Destination buffer (passed by ref for compatibility; never reassigned).</param>
        /// <param name="buflen">
        /// Capacity offered to the native side; clamped to the actual array length so the
        /// DLL is never told the buffer is larger than it is.
        /// </param>
        /// <returns>
        /// The byte count reported by the native call when it succeeds;
        /// 0 when the native call reports failure or the transport type is unsupported;
        /// -1 when an exception occurred or the buffer is null/empty.
        /// </returns>
        public int Recv(ref Byte[] buf, int buflen)
        {
            if (buf == null)
            {
                return -1;
            }

            int capacity = Math.Min(buflen, buf.Length);
            if (capacity <= 0)
            {
                return -1;
            }

            try
            {
                int rcvlen = 0;
                Boolean bsuccess;
                if (UseSerial())
                {
                    bsuccess = PDAComm.COM_Recv(buf, capacity, ref rcvlen);
                }
                else if (UseTcpServer())
                {
                    bsuccess = PDAComm.TCPServer_Recv(buf, capacity, ref rcvlen);
                }
                else
                {
                    return 0;
                }

                return bsuccess ? rcvlen : 0;
            }
            catch (System.Exception)
            {
                return -1;
            }
        }
    }
}

三、组帧、解帧

收到消息后组帧,并将帧放入消息队列

using System;
using System.Collections.Generic;
using System.Text;

namespace **************8
{
    /// <summary>
    /// Frame codec for the 0xFF-delimited link protocol.
    /// Wire layout as produced by <see cref="BuildFrame"/>:
    ///   FF FF | id | escaped payload | escaped BCC | FF
    /// Escaping: FF is sent as FE 01, FE is sent as FE 00.
    /// The BCC is the XOR of every wire byte from index 2 up to (not including) the BCC itself.
    /// </summary>
    class CMOIMFrame
    {
        public static CMOIMFrame instance = new CMOIMFrame();

        private const Byte FrameMark = 0xFF;   // header (twice) and tail byte
        private const Byte EscapeMark = 0xFE;  // first byte of an escape pair
        private const Byte EscapedFE = 0x00;   // FE 00 decodes to FE
        private const Byte EscapedFF = 0x01;   // FE 01 decodes to FF

        /// <summary>Raw (still escaped) frame being assembled by <see cref="MakeFrame"/>.</summary>
        public static Byte[] g_pFrame = new Byte[CommonUse.MAX_FRAME_SIZE];
        /// <summary>Number of valid bytes in <see cref="g_pFrame"/>.</summary>
        public static Int32 g_iFrameLen = 0;

        /// <summary>Assembler state for <see cref="MakeFrame"/>: 1 = idle, 2 = half header, 3 = in frame.</summary>
        public static Int32 g_iFrameState = 1;

        /// <summary>Decoded frame produced by <see cref="ParseFrame"/>.</summary>
        public static Byte[] pDstFrame = new Byte[CommonUse.MAX_FRAME_SIZE];
        /// <summary>Number of valid bytes in <see cref="pDstFrame"/>.</summary>
        public static Int32 iDstLen = 0;

        /// <summary>
        /// Feeds received bytes into the frame assembler. Each completed frame is decoded;
        /// a 6-byte frame with 0x22 at index 3 triggers PDAComm.SetRcvShaveHand(), any other
        /// valid frame is pushed to the message queue.
        /// </summary>
        /// <param name="pBuf">Received bytes; null is ignored.</param>
        /// <param name="iLength">Number of bytes to consume (clamped to the array length).</param>
        public void ProcessMsg(Byte[] pBuf, int iLength)
        {
            if (pBuf == null)
            {
                return;
            }
            if (iLength > pBuf.Length)
            {
                iLength = pBuf.Length;
            }

            for (Int32 i = 0; i < iLength; i++)
            {
                if (0 != MakeFrame(pBuf[i], ref g_pFrame, ref g_iFrameLen, ref g_iFrameState))
                {
                    continue; // frame not complete yet (or byte discarded)
                }

                try
                {
                    if (ParseFrame(pDstFrame, ref iDstLen, g_pFrame, g_iFrameLen))
                    {
                        if (iDstLen == 6 && pDstFrame[3] == 0x22)
                        {
                            PDAComm.SetRcvShaveHand();
                        }
                        else if (iDstLen >= 5)
                        {
                            // NOTE(review): this overwrites the decoded payload (from index 3)
                            // with the first iDstLen - 5 bytes of the RAW frame. It looks like a
                            // stand-in for a decode/decrypt step (BuildFrame encrypts on the way
                            // out) - kept as-is, confirm the intent.
                            Array.Copy(g_pFrame, 0, pDstFrame, 3, iDstLen - 5);

                            MsgQueue.getInstance().pushFrame(pDstFrame, iDstLen);
                        }
                        // Frames decoding to fewer than 5 bytes (e.g. FF FF x FF) are dropped:
                        // the copy above would be asked for a negative length and throw.
                    }
                }
                finally
                {
                    // Always restart with an empty frame, even if a handler threw; otherwise
                    // the next frame would be appended behind the stale one.
                    iDstLen = 0;
                    g_iFrameLen = 0;
                }
            }
        }

        /// <summary>
        /// Verifies the block-check byte of a raw frame. The BCC sits just before the tail
        /// byte and may itself be escaped (FE 00 / FE 01).
        /// </summary>
        /// <param name="pSrc">Raw frame including header and tail.</param>
        /// <param name="iSrcLen">Number of valid bytes in <paramref name="pSrc"/>.</param>
        /// <returns>true when the stored BCC matches the computed XOR; false otherwise or on unusable input.</returns>
        public Boolean CheckBCC(Byte[] pSrc, Int32 iSrcLen)
        {
            if (pSrc == null || iSrcLen < 3 || iSrcLen > pSrc.Length)
            {
                return false;
            }

            Byte ucBCC = pSrc[iSrcLen - 2];
            Int32 iCheckedLen = iSrcLen - 2; // exclusive end index of the XOR range

            if (pSrc[iSrcLen - 3] == EscapeMark)
            {
                // Escaped BCC occupies two bytes, so the checked range is one byte shorter.
                iCheckedLen -= 1;
                if (ucBCC == EscapedFE)
                {
                    ucBCC = EscapeMark;
                }
                else if (ucBCC == EscapedFF)
                {
                    ucBCC = FrameMark;
                }
                else
                {
                    return false; // invalid escape pair
                }
            }

            return ucBCC == CalXORCheckSum(pSrc, iCheckedLen);
        }

        /// <summary>
        /// Validates and un-escapes a raw frame.
        /// </summary>
        /// <param name="pDst">Receives the decoded frame: header, decoded bytes (including the decoded BCC) and tail.</param>
        /// <param name="iDstLen">Receives the number of bytes written to <paramref name="pDst"/>.</param>
        /// <param name="pSrc">Raw frame including header and tail.</param>
        /// <param name="iSrcLen">Number of valid bytes in <paramref name="pSrc"/>.</param>
        /// <returns>true when the BCC is correct and every escape pair is valid; false otherwise.</returns>
        public Boolean ParseFrame(Byte[] pDst,  ref Int32 iDstLen,  Byte[] pSrc,  Int32 iSrcLen)
        {
            iDstLen = 0;

            if (pDst == null || pSrc == null || iSrcLen < 3 || iSrcLen > pSrc.Length || pDst.Length < 2)
            {
                return false;
            }

            // Header is copied through unchanged.
            pDst[iDstLen++] = pSrc[0];
            pDst[iDstLen++] = pSrc[1];

            if (!CheckBCC(pSrc, iSrcLen))
            {
                return false;
            }

            // Decode everything between the header and the tail byte.
            for (Int32 index = 2; index < iSrcLen - 1 && pSrc[index] != FrameMark; index++)
            {
                if (iDstLen >= pDst.Length)
                {
                    return false; // destination too small
                }

                Byte current = pSrc[index];
                if (current != EscapeMark)
                {
                    pDst[iDstLen++] = current;
                    continue;
                }

                // index + 1 is at most iSrcLen - 1, so this read stays inside the frame.
                Byte next = pSrc[index + 1];
                if (next == EscapedFE)
                {
                    pDst[iDstLen++] = EscapeMark;
                }
                else if (next == EscapedFF)
                {
                    pDst[iDstLen++] = FrameMark;
                }
                else
                {
                    return false; // invalid escape pair
                }
                index++; // consume the second byte of the pair
            }

            if (iDstLen >= pDst.Length)
            {
                return false;
            }

            // Frame tail.
            pDst[iDstLen++] = FrameMark;

            return true;
        }

        /// <summary>
        /// Builds a wire frame from <paramref name="pSrc"/>. pSrc[0] is the frame id; the
        /// remaining bytes are the payload, which is padded up to a multiple of 8 bytes and
        /// passed through RSUComm.EncryptionOBUSerialNo before being escaped.
        /// </summary>
        /// <param name="pDst">Receives the finished frame; also used as scratch space.</param>
        /// <param name="iDstlen">Receives the length of the finished frame.</param>
        /// <param name="pSrc">
        /// Id byte followed by payload. NOTE: this array is overwritten (cleared, then used as
        /// the encryption output), and it must be large enough for the padded payload.
        /// </param>
        /// <param name="iSrcLen">Number of valid bytes in <paramref name="pSrc"/>, including the id byte.</param>
        public void BuildFrame(Byte[] pDst,  ref Int32 iDstlen,  Byte[] pSrc, Int32 iSrcLen)
        {
            Byte tmpid = pSrc[0];

            // Payload length without the id byte, rounded up to a multiple of 8.
            iSrcLen = iSrcLen - 1;
            if (iSrcLen % 8 != 0)
            {
                iSrcLen += 8 - iSrcLen % 8;
            }

            // Move the payload into pDst, then encrypt it back into pSrc starting at index 0.
            // NOTE(review): assumes EncryptionOBUSerialNo writes iSrcLen bytes to pSrc - confirm.
            Array.Copy(pSrc, 1, pDst, 0, iSrcLen);
            Array.Clear(pSrc, 0, iSrcLen + 1);
            RSUComm.EncryptionOBUSerialNo(CReaderConfig.iCommunication, pDst, 0, iSrcLen, pSrc, 0);
            Array.Clear(pDst, 0, iSrcLen);

            iDstlen = 0;

            // Header and id.
            pDst[iDstlen++] = FrameMark;
            pDst[iDstlen++] = FrameMark;
            pDst[iDstlen++] = tmpid;

            // Escaped payload.
            for (Int32 index = 0; index < iSrcLen; index++)
            {
                AppendEscaped(pDst, ref iDstlen, pSrc[index]);
            }

            // The checksum covers the id and the escaped payload; it is escaped like data.
            Byte BCC = CalXORCheckSum(pDst, iDstlen);
            AppendEscaped(pDst, ref iDstlen, BCC);

            // Tail.
            pDst[iDstlen++] = FrameMark;
        }

        // Appends one byte to pDst, replacing FF with FE 01 and FE with FE 00.
        private static void AppendEscaped(Byte[] pDst, ref Int32 iDstlen, Byte value)
        {
            if (value == FrameMark)
            {
                pDst[iDstlen++] = EscapeMark;
                pDst[iDstlen++] = EscapedFF;
            }
            else if (value == EscapeMark)
            {
                pDst[iDstlen++] = EscapeMark;
                pDst[iDstlen++] = EscapedFE;
            }
            else
            {
                pDst[iDstlen++] = value;
            }
        }

        /// <summary>
        /// XORs the bytes pFrame[2] .. pFrame[iDataLen - 1]. pFrame[2] is always read, so the
        /// result is pFrame[2] whenever iDataLen is 3 or less.
        /// </summary>
        /// <param name="pFrame">Frame bytes; must hold at least 3 bytes.</param>
        /// <param name="iDataLen">Exclusive end index of the XOR range.</param>
        /// <returns>The XOR checksum.</returns>
        public Byte CalXORCheckSum(Byte[] pFrame, Int32 iDataLen)
        {
            Byte ucXORVal = pFrame[2];

            for (Int32 iLoop = 3; iLoop < iDataLen; iLoop++)
            {
                ucXORVal ^= pFrame[iLoop];
            }
            return ucXORVal;
        }

        /// <summary>
        /// Swaps the high and low nibbles of a byte (0xAB becomes 0xBA).
        /// </summary>
        /// <param name="bRSCTL">Input byte.</param>
        /// <returns>The byte with its two 4-bit halves exchanged.</returns>
        public Byte ReverseByte(Byte bRSCTL)
        {
            return (Byte)(((bRSCTL << 4) | (bRSCTL >> 4)) & 0xFF);
        }

        // True when one more byte fits into the frame buffer at position iLen.
        private static Boolean HasRoom(Byte[] pFrame, Int32 iLen)
        {
            return iLen >= 0 && iLen < pFrame.Length;
        }

        /// <summary>
        /// Byte-wise frame assembler.
        /// State 1: waiting for the first header byte.
        /// State 2: one header byte seen.
        /// State 3: inside a frame, collecting until the tail byte.
        /// </summary>
        /// <param name="bData">Next received byte.</param>
        /// <param name="pSrcFrame">Buffer the frame is assembled in.</param>
        /// <param name="iSrcFrameLen">Number of bytes currently stored; on return 0 it is the frame length.</param>
        /// <param name="iState">Assembler state, updated in place.</param>
        /// <returns>
        /// 0 when a complete frame is available; 1 when the byte was accepted and more are
        /// needed; -1 when the byte was discarded and the assembler was reset (no header,
        /// broken header, or frame larger than the buffer).
        /// </returns>
        public Int32 MakeFrame(Byte bData,  ref Byte[] pSrcFrame,  ref Int32 iSrcFrameLen,  ref Int32 iState)
        {
            switch (iState)
            {
                case 1:
                    // A frame always starts at index 0, whatever was left from before.
                    iSrcFrameLen = 0;
                    if (bData != FrameMark || !HasRoom(pSrcFrame, iSrcFrameLen))
                    {
                        return -1; // no header found
                    }
                    pSrcFrame[iSrcFrameLen++] = bData;
                    iState = 2;
                    return 1;

                case 2:
                    if (bData == FrameMark && HasRoom(pSrcFrame, iSrcFrameLen))
                    {
                        // Complete header.
                        pSrcFrame[iSrcFrameLen++] = bData;
                        iState = 3;
                        return 1;
                    }
                    // Broken header.
                    iState = 1;
                    iSrcFrameLen = 0;
                    return -1;

                case 3:
                    if (bData == FrameMark && iSrcFrameLen == 2)
                    {
                        // FF FF FF: treat the extra FF as part of the header, store nothing.
                        return 1;
                    }
                    if (!HasRoom(pSrcFrame, iSrcFrameLen))
                    {
                        // No tail within the buffer size: drop the oversized frame and resync
                        // instead of writing past the end of the array.
                        iState = 1;
                        iSrcFrameLen = 0;
                        return -1;
                    }
                    pSrcFrame[iSrcFrameLen++] = bData;
                    if (bData != FrameMark)
                    {
                        return 1; // ordinary frame byte
                    }
                    // Tail byte: frame complete.
                    iState = 1;
                    return 0;

                default:
                    // Unknown state: recover by starting over.
                    iState = 1;
                    iSrcFrameLen = 0;
                    return -1;
            }
        }
    }
}

四、线程操作

4.1 线程基类,管理线程

using System;
using System.Collections.Generic;
using System.Text;
using System.Windows.Forms;
using System.Threading;

namespace ********************
{
    /// <summary>
    /// Small holder for one worker thread plus the two flags the worker loop and its
    /// owner use to coordinate shutdown.
    /// </summary>
    public class CThreadManage
    {
        /// <summary>
        /// The managed worker thread (null when none is attached).
        /// </summary>
        private Thread hThread = null;

        /// <summary>
        /// "Keep running" request. Written by the owner, polled by the worker loop, so it
        /// is volatile to make the write visible across threads.
        /// </summary>
        private volatile Boolean bRuning = false;

        /// <summary>
        /// True while the worker is between loop iterations (false while one is in
        /// progress). Written by the worker, so it is volatile as well.
        /// </summary>
        private volatile Boolean bRuningEnd = true;

        /// <summary>Gets or sets the worker thread.</summary>
        public Thread HThread
        {
            get { return hThread; }
            set { hThread = value; }
        }

        /// <summary>Gets or sets the "keep running" request flag.</summary>
        public Boolean BRuning
        {
            get { return bRuning; }
            set { bRuning = value; }
        }

        /// <summary>Gets or sets the "iteration finished" flag.</summary>
        public Boolean BRuningEnd
        {
            get { return bRuningEnd; }
            set { bRuningEnd = value; }
        }

        /// <summary>
        /// Asks the worker to stop and waits until the thread has ended, pumping the
        /// caller's message queue while waiting. On success <see cref="HThread"/> is set to
        /// null. Exceptions (for example joining a thread that was never started) are
        /// swallowed and leave <see cref="HThread"/> untouched.
        /// </summary>
        public void CloseThread()
        {
            BRuning = false;

            Thread worker = HThread;
            if (worker == null)
            {
                return;
            }

            try
            {
                // Wait on the thread itself rather than on BRuningEnd: a worker that exits
                // in the middle of an iteration never sets that flag back to true, and
                // waiting on it would spin forever.
                while (!worker.Join(10))
                {
                    Application.DoEvents();
                }
                HThread = null;
            }
            catch (System.Exception)
            {
                // Best effort: shutdown must not throw into the caller.
            }
        }
    }
}

4.2 创建和关闭线程

SyncListeningThread:收消息
SyncProcessThread:处理消息
ThoroughlyThread:透传消息

// Manager of the receive worker, which runs SyncListeningFunc. Null while not started.
private CThreadManage SyncListeningThread = null;
// Manager of the frame-processing worker, which runs SyncProcessFunc. Null while not started.
private CThreadManage SyncProcessThread = null;
// Manager of the pass-through reply worker, which runs ThoroughlyFunc. Null while not started.
private CThreadManage ThoroughlyThread = null;

// Builds a manager in the "running" state around a not-yet-started thread for entry.
private static CThreadManage NewWorker(ThreadStart entry)
{
    CThreadManage worker = new CThreadManage();
    worker.BRuning = true;
    worker.BRuningEnd = true;
    worker.HThread = new Thread(entry);
    return worker;
}

/// <summary>
/// Starts the receive, processing and pass-through workers. A worker whose manager
/// field is already non-null is left alone.
/// </summary>
/// <returns>Always true.</returns>
private Boolean StartUpListeningThread()
{
    // Each field is assigned BEFORE its thread starts, because the thread function
    // reads its own manager through that field.
    if (SyncListeningThread == null)
    {
        SyncListeningThread = NewWorker(new ThreadStart(SyncListeningFunc));
        SyncListeningThread.HThread.Start();
    }

    if (SyncProcessThread == null)
    {
        SyncProcessThread = NewWorker(new ThreadStart(SyncProcessFunc));
        SyncProcessThread.HThread.Start();
    }

    if (ThoroughlyThread == null)
    {
        ThoroughlyThread = NewWorker(new ThreadStart(ThoroughlyFunc));
        ThoroughlyThread.HThread.Start();
    }

    return true;
}

// Stops the worker held in the given field (if any) and clears the field afterwards.
private static void StopWorker(ref CThreadManage worker)
{
    if (worker == null)
    {
        return;
    }
    worker.CloseThread();
    worker = null;
}

/// <summary>
/// Optionally stops the three workers (receive, processing, pass-through, in that
/// order) and then closes the communication link.
/// </summary>
/// <param name="bCloseThread">When true the workers are stopped before the link is closed.</param>
/// <returns>The result of Communication.instance.Close().</returns>
private Boolean CloseListeningThread(Boolean bCloseThread)
{
    if (bCloseThread)
    {
        StopWorker(ref SyncListeningThread);
        StopWorker(ref SyncProcessThread);
        StopWorker(ref ThoroughlyThread);
    }

    Boolean closed = Communication.instance.Close();
    return closed;
}

4.3 接收消息线程

接收消息,组帧,放入消息队列(如果是心跳消息,则设置时间即可)

/// <summary>
/// Body of the receive worker: polls the communication link roughly every 10 ms and
/// hands received bytes to CMOIMFrame.ProcessMsg until BRuning is cleared.
/// An exception in one iteration is swallowed and the loop simply continues (the
/// previous version recovered by calling itself, which grew the stack on every failure
/// and could return with BRuningEnd still false).
/// </summary>
private void SyncListeningFunc()
{
    CThreadManage self = SyncListeningThread;
    if (self == null)
    {
        return;
    }

    Byte[] pRecv_Buffer = new Byte[CommonUse.MAX_FRAME_SIZE];

    try
    {
        while (self.BRuning)
        {
            self.BRuningEnd = false;
            try
            {
                Int32 iRecvdBufferLen = Communication.instance.Recv(ref pRecv_Buffer, CommonUse.MAX_FRAME_SIZE);
                if (iRecvdBufferLen > 0)
                {
                    CMOIMFrame.instance.ProcessMsg(pRecv_Buffer, iRecvdBufferLen);
                }
                Thread.Sleep(10);
            }
            catch (ThreadAbortException)
            {
                throw; // handled below: end the thread
            }
            catch (Exception)
            {
                // Keep the worker alive; the next iteration retries.
            }
            self.BRuningEnd = true;
        }
    }
    catch (ThreadAbortException)
    {
        return;
    }
    finally
    {
        // Whatever way the loop ends, report "not in an iteration" so that code waiting
        // on this flag cannot wait forever.
        self.BRuningEnd = true;
    }
}

4.4 处理帧线程

根据收到的帧进行相应处理,比如如果操作本地数据库,则直接操作并返回;如果是透传命令,则直接透传即可。

/// <summary>
/// Body of the processing worker: pops frames from the message queue, turns each into
/// a command through CRRDCommandFactory and executes it, until BRuning is cleared.
/// An exception in one iteration is swallowed and the loop continues (the previous
/// version recovered by calling itself, which grew the stack on every failure and could
/// return with BRuningEnd still false).
/// </summary>
private void SyncProcessFunc()
{
    CThreadManage self = SyncProcessThread;
    if (self == null)
    {
        return;
    }

    Byte[] pRecv_Buffer = new Byte[CommonUse.MAX_FRAME_SIZE];
    Int32 iRecvdBufferLen = 0;

    try
    {
        // NOTE(review): there is no delay in this loop; if popFrame does not block while
        // the queue is empty this spins at full speed - confirm against MsgQueue.
        while (self.BRuning)
        {
            self.BRuningEnd = false;
            try
            {
                if (MsgQueue.getInstance().popFrame(pRecv_Buffer, ref iRecvdBufferLen))
                {
                    CRRDCommand RRDCmd = CRRDCommandFactory.createRRDCommand(pRecv_Buffer, iRecvdBufferLen);
                    if (RRDCmd != null)
                    {
                        RRDCmd.Execute();
                    }
                }
            }
            catch (ThreadAbortException)
            {
                throw; // handled below: end the thread
            }
            catch (Exception)
            {
                // A failing command must not stop the worker; move on to the next frame.
            }
            self.BRuningEnd = true;
        }
    }
    catch (ThreadAbortException)
    {
        return;
    }
    finally
    {
        // Whatever way the loop ends, report "not in an iteration" so that code waiting
        // on this flag cannot wait forever.
        self.BRuningEnd = true;
    }
}

4.5 透传线程

透传命令发送后,通过Wait…等待透传命令结果,并返回

/// <summary>
/// Body of the pass-through worker: roughly every 10 ms asks
/// RSUComm.Delivery_Recv_MOIM for a reply and, when it returns 0, forwards the received
/// bytes through Communication.Send, until BRuning is cleared.
/// An exception in one iteration is swallowed and the loop continues (the previous
/// version recovered by calling itself, which grew the stack on every failure and could
/// return with BRuningEnd still false).
/// </summary>
private void ThoroughlyFunc()
{
    CThreadManage self = ThoroughlyThread;
    if (self == null)
    {
        return;
    }

    Byte[] pRecv_Buffer = new Byte[CommonUse.MAX_FRAME_SIZE];

    try
    {
        while (self.BRuning)
        {
            self.BRuningEnd = false;
            try
            {
                // The length always starts at 0 for every call, as before.
                Int32 iRecvdBufferLen = 0;
                if (RSUComm.Delivery_Recv_MOIM(CDeviceManage.instance.GetRSUHandle(), ref iRecvdBufferLen, pRecv_Buffer, 0) == 0)
                {
                    Communication.instance.Send(pRecv_Buffer, iRecvdBufferLen);
                }
                Thread.Sleep(10);
            }
            catch (ThreadAbortException)
            {
                throw; // handled below: end the thread
            }
            catch (Exception)
            {
                // Keep the worker alive; the next iteration retries.
            }
            self.BRuningEnd = true;
        }
    }
    catch (ThreadAbortException)
    {
        return;
    }
    finally
    {
        // Whatever way the loop ends, report "not in an iteration" so that code waiting
        // on this flag cannot wait forever.
        self.BRuningEnd = true;
    }
}

分类:

发表回复