C#互斥消息队列

类似生产者和消费者模式:使用信号量控制队列里面的数据量,使用 lock 保证一次只有一个线程操作队列

1、信号量,用于同步消息队列

using System;

using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace ZXRIS_RRD_9800
{
    /// <summary>
    /// Minimal counting semaphore: <see cref="Release"/> adds one permit,
    /// the <c>WaitOne</c> overloads take one permit, blocking while none is available.
    /// </summary>
    /// <remarks>
    /// The counter and the blocking are both guarded by a single monitor. Every
    /// <see cref="Release"/> pulses one waiter and every waiter re-checks the
    /// counter after waking, so back-to-back releases cannot collapse into a
    /// single wake-up and a timed-out wait cannot leave a stale signal behind.
    /// </remarks>
    class Semaphore
    {
        /// <summary>Creates a semaphore holding <paramref name="init"/> permits.</summary>
        /// <param name="init">Initial number of available permits.</param>
        public Semaphore(int init)
        {
            value_ = init;
        }

        /// <summary>Adds one permit and wakes one blocked waiter, if any.</summary>
        /// <returns>The permit count as it was before this call.</returns>
        public int Release()
        {
            lock (gate_)
            {
                int previous = value_;
                value_ = previous + 1;

                // One pulse per permit: each release can satisfy at most one waiter.
                Monitor.Pulse(gate_);
                return previous;
            }
        }

        /// <summary>Blocks until a permit is available, then takes it.</summary>
        /// <returns>Always <c>true</c>.</returns>
        public bool WaitOne()
        {
            return WaitOne(Timeout.Infinite);
        }

        /// <summary>
        /// Takes a permit, waiting at most <paramref name="ms"/> milliseconds for one.
        /// </summary>
        /// <param name="ms">
        /// Maximum wait in milliseconds; <see cref="Timeout.Infinite"/> waits without limit.
        /// </param>
        /// <returns><c>true</c> if a permit was taken; <c>false</c> if the wait timed out.</returns>
        public bool WaitOne(int ms)
        {
            lock (gate_)
            {
                int start = Environment.TickCount;
                int remaining = ms;

                while (value_ <= 0)
                {
                    bool signaled = Monitor.Wait(gate_, remaining);

                    // A permit released right at the deadline still counts.
                    if (value_ > 0)
                    {
                        break;
                    }

                    if (!signaled)
                    {
                        return false;
                    }

                    // Woken, but another thread took the permit first: keep
                    // waiting for whatever is left of the caller's budget.
                    if (ms != Timeout.Infinite)
                    {
                        // unchecked: TickCount wraps around; the difference stays valid.
                        int elapsed = unchecked(Environment.TickCount - start);
                        if (elapsed >= ms)
                        {
                            return false;
                        }
                        remaining = ms - elapsed;
                    }
                }

                --value_;
                return true;
            }
        }

        /// <summary>Returns the number of permits currently available.</summary>
        public int Count()
        {
            lock (gate_)
            {
                return value_;
            }
        }

        /// <summary>
        /// Discards all available permits. Threads already blocked in
        /// <c>WaitOne</c> keep waiting for the next <see cref="Release"/>.
        /// </summary>
        public void Clear()
        {
            lock (gate_)
            {
                value_ = 0;
            }
        }

        // Guards value_ and is the monitor that waiters block on.
        private readonly object gate_ = new object();
        private int value_;

    }
}

2、消息队列,用信号量控制

using System;

using System.Collections.Generic;
using System.Text;

namespace **************
{
    /// <summary>
    /// Blocking FIFO queue for producer/consumer use. A semaphore counts the
    /// queued items so consumers can wait for data, and a lock serialises
    /// every access to the underlying <see cref="Queue{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public class AutoQueue<T>
    {
        /// <summary>Removes and returns the oldest item, blocking until one is available.</summary>
        /// <returns>The dequeued item.</returns>
        public T Dequeue()
        {
            while (true)
            {
                sem_.WaitOne();

                lock (lockObj_)
                {
                    // Clear() may have emptied the queue after the permit was
                    // obtained; in that case go back and wait for the next item.
                    if (queue_.Count > 0)
                    {
                        return queue_.Dequeue();
                    }
                }
            }
        }

        /// <summary>
        /// Removes the oldest item, waiting at most <paramref name="ms"/> milliseconds for one.
        /// </summary>
        /// <param name="ms">Maximum time to wait for an item, in milliseconds.</param>
        /// <param name="t">Receives the dequeued item, or <c>default(T)</c> on failure.</param>
        /// <returns><c>true</c> if an item was dequeued; otherwise <c>false</c>.</returns>
        public bool Dequeue(int ms, out T t)
        {
            if (sem_.WaitOne(ms))
            {
                lock (lockObj_)
                {
                    // Guard against a concurrent Clear() between the wait and the lock.
                    if (queue_.Count > 0)
                    {
                        t = queue_.Dequeue();
                        return true;
                    }
                }
            }

            t = default(T);
            return false;
        }

        /// <summary>
        /// Reads the oldest item without removing it. If the queue is empty the
        /// call sleeps for <paramref name="ms"/> milliseconds and tries once more.
        /// </summary>
        /// <param name="ms">Time to sleep before the second attempt, in milliseconds.</param>
        /// <param name="t">Receives the head item, or <c>default(T)</c> on failure.</param>
        /// <returns><c>true</c> if an item was read; otherwise <c>false</c>.</returns>
        public bool Peek(int ms, out T t)
        {
            if (TryPeek(out t))
            {
                return true;
            }

            System.Threading.Thread.Sleep(ms);

            return TryPeek(out t);
        }

        /// <summary>
        /// Checks for an item and reads it under one lock acquisition, so a
        /// concurrent consumer cannot empty the queue between the two steps.
        /// </summary>
        private bool TryPeek(out T t)
        {
            lock (lockObj_)
            {
                if (queue_.Count > 0)
                {
                    t = queue_.Peek();
                    return true;
                }
            }

            t = default(T);
            return false;
        }

        /// <summary>Appends an item and signals one waiting consumer.</summary>
        /// <param name="t">Item to append.</param>
        public void Enqueue(T t)
        {
            lock (lockObj_)
            {
                queue_.Enqueue(t);

                // Released while still holding the lock so Clear() cannot run
                // between adding the item and publishing its permit.
                sem_.Release();
            }
        }

        /// <summary>Number of items currently queued.</summary>
        public int Count
        {
            get
            {
                lock (lockObj_)
                {
                    return queue_.Count;
                }
            }
        }

        /// <summary><c>true</c> when no items are queued.</summary>
        public bool Empty
        {
            get
            {
                lock (lockObj_)
                {
                    return (queue_.Count == 0);
                }
            }
        }

        /// <summary>Removes all queued items and resets the item counter.</summary>
        public void Clear()
        {
            lock (lockObj_)
            {
                queue_.Clear();
                sem_.Clear();
            }
        }

        private readonly Queue<T> queue_ = new Queue<T>();
        private readonly Object lockObj_ = new Object();
        private readonly Semaphore sem_ = new Semaphore(0);
    }
}

3、封装消息队列,存放数据帧

using System;

using System.Collections.Generic;
using System.Text;

namespace **************
{
    /// <summary>
    /// One data frame: a byte buffer plus the number of valid bytes in it.
    /// </summary>
    public class msgFrame
    {
        /// <summary>Buffer holding the frame bytes.</summary>
        public Byte[] frame;

        /// <summary>Number of valid bytes in <see cref="frame"/>.</summary>
        public int framelen;

        /// <summary>
        /// Creates a frame holding a private copy of the first
        /// <paramref name="len"/> bytes of <paramref name="frame"/>.
        /// </summary>
        /// <param name="frame">Source buffer to copy from.</param>
        /// <param name="len">Number of bytes to copy; also the size of the new buffer.</param>
        public msgFrame(Byte[] frame, int len)
        {
            Byte[] copy = new Byte[len];
            Array.Copy(frame, 0, copy, 0, len);

            this.frame = copy;
            this.framelen = len;
        }

        /// <summary>
        /// Creates an empty frame with a zero-filled buffer of
        /// <paramref name="len"/> bytes and a valid length of 0.
        /// </summary>
        /// <param name="len">Size of the buffer to allocate.</param>
        public msgFrame(int len)
        {
            this.framelen = 0;
            this.frame = new Byte[len];
        }
    }
    /// <summary>
    /// Process-wide singleton wrapping an <c>AutoQueue&lt;msgFrame&gt;</c>:
    /// producers push raw frames, consumers pop them into their own buffers.
    /// </summary>
    public class MsgQueue
    {
        // How long popFrame waits for a frame before giving up, in milliseconds.
        private const int PopTimeoutMs = 100;

        private static readonly MsgQueue instance = new MsgQueue();

        private readonly AutoQueue<msgFrame> msgqueue;

        private MsgQueue()
        {
            msgqueue = new AutoQueue<msgFrame>();
        }

        /// <summary>Returns the single shared <see cref="MsgQueue"/> instance.</summary>
        public static MsgQueue getInstance()
        {
            return instance;
        }

        /// <summary>
        /// Copies the first <paramref name="framelen"/> bytes of
        /// <paramref name="pframe"/> into a new frame and queues it.
        /// Best effort: a failure is traced and the frame is dropped; no
        /// exception reaches the caller.
        /// </summary>
        /// <param name="pframe">Buffer containing the frame bytes.</param>
        /// <param name="framelen">Number of bytes to take from <paramref name="pframe"/>.</param>
        public void pushFrame(Byte[] pframe, int framelen)
        {
            try
            {
                msgqueue.Enqueue(new msgFrame(pframe, framelen));
            }
            catch (System.Exception e)
            {
                // Keep the original no-throw contract, but leave a trace so
                // dropped frames are not completely invisible.
                System.Diagnostics.Trace.TraceError("MsgQueue.pushFrame dropped a frame: " + e);
            }
        }

        /// <summary>
        /// Waits up to 100 ms for a frame and copies it into <paramref name="pframe"/>.
        /// </summary>
        /// <param name="pframe">
        /// Destination buffer. If it is null or smaller than the frame, the copy
        /// throws after the frame has already been removed from the queue.
        /// </param>
        /// <param name="framelen">
        /// Set to the number of bytes copied on success; left untouched otherwise.
        /// </param>
        /// <returns><c>true</c> if a frame was copied; <c>false</c> if none arrived in time.</returns>
        public Boolean popFrame(Byte[] pframe, ref int framelen)
        {
            // A local rather than a shared field: concurrent consumers must not
            // overwrite each other's frame between the dequeue and the copy.
            msgFrame next;
            if (!msgqueue.Dequeue(PopTimeoutMs, out next))
            {
                return false;
            }

            Array.Copy(next.frame, pframe, next.framelen);
            framelen = next.framelen;
            return true;
        }
    }
}
分类:

发表回复