类似生产者和消费者模型:使用信号量控制队列里面的数据量,使用lock保证同一时刻只有一个线程操作队列
1、信号量,用于同步消息队列
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace ZXRIS_RRD_9800
{
/// <summary>
/// Counting semaphore used to synchronise the message queue.
/// </summary>
/// <remarks>
/// The implementation is built on <see cref="Monitor"/> rather than on an
/// interlocked counter paired with an auto-reset event. An auto-reset event
/// remembers at most one signal, so two releases in quick succession could
/// wake only one of two blocked waiters, and a timed wait that expired while
/// a release was in flight could leave a stray signal behind. Keeping the
/// permit count and the waiter count under a single lock removes both races
/// and needs no operating-system handle that would have to be disposed.
/// </remarks>
class Semaphore
{
    // Guards permits_ and waiters_; also the object waiters block on.
    private readonly object gate_ = new object();

    // Number of permits currently available to be taken.
    private int permits_;

    // Number of threads currently blocked inside WaitOne.
    private int waiters_;

    /// <summary>Creates a semaphore holding <paramref name="init"/> permits.</summary>
    /// <param name="init">Initial number of available permits.</param>
    public Semaphore(int init)
    {
        permits_ = init;
    }

    /// <summary>Adds one permit and wakes one blocked waiter, if any.</summary>
    /// <returns>The value <see cref="Count"/> would have returned just before this call.</returns>
    public int Release()
    {
        lock (gate_)
        {
            int previous = permits_ - waiters_;
            permits_++;
            Monitor.Pulse(gate_);
            return previous;
        }
    }

    /// <summary>Blocks until a permit is available and takes it.</summary>
    /// <returns>Always <c>true</c>.</returns>
    public bool WaitOne()
    {
        return Acquire(Timeout.Infinite);
    }

    /// <summary>Waits up to <paramref name="ms"/> milliseconds for a permit.</summary>
    /// <param name="ms">
    /// Timeout in milliseconds; 0 polls without blocking and
    /// <see cref="Timeout.Infinite"/> (-1) waits indefinitely.
    /// </param>
    /// <returns><c>true</c> if a permit was taken; <c>false</c> on timeout.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The call has to block and <paramref name="ms"/> is negative but not -1.
    /// </exception>
    public bool WaitOne(int ms)
    {
        return Acquire(ms);
    }

    /// <summary>
    /// Returns the signed count: available permits minus blocked waiters.
    /// A negative value therefore means that many threads are waiting.
    /// </summary>
    public int Count()
    {
        lock (gate_)
        {
            return permits_ - waiters_;
        }
    }

    /// <summary>
    /// Discards all available permits. Threads that are already blocked stay
    /// blocked and are still woken by later <see cref="Release"/> calls.
    /// </summary>
    public void Clear()
    {
        lock (gate_)
        {
            permits_ = 0;
        }
    }

    // Shared implementation of both WaitOne overloads.
    private bool Acquire(int ms)
    {
        lock (gate_)
        {
            if (permits_ <= 0)
            {
                if (ms < Timeout.Infinite)
                {
                    throw new ArgumentOutOfRangeException("ms");
                }

                waiters_++;
                try
                {
                    int started = Environment.TickCount;
                    int remaining = ms;
                    while (permits_ <= 0)
                    {
                        if (ms == Timeout.Infinite)
                        {
                            Monitor.Wait(gate_);
                            continue;
                        }
                        if (remaining <= 0)
                        {
                            return false;
                        }
                        // The return value is deliberately ignored: the loop
                        // condition re-checks the permit count, so a permit
                        // released right at the deadline is still honoured.
                        Monitor.Wait(gate_, remaining);
                        // Unchecked subtraction stays correct when TickCount wraps.
                        remaining = ms - unchecked(Environment.TickCount - started);
                    }
                }
                finally
                {
                    waiters_--;
                }
            }

            permits_--;
            if (permits_ > 0 && waiters_ > 0)
            {
                // Pass the wake-up on in case an earlier pulse reached a
                // waiter that had already given up.
                Monitor.Pulse(gate_);
            }
            return true;
        }
    }
}
}
2、消息队列,用信号量控制
using System;
using System.Collections.Generic;
using System.Text;
namespace **************
{
/// <summary>
/// Blocking FIFO queue for producer/consumer use. A semaphore counts the
/// queued items so consumers can block until one arrives, and a lock ensures
/// only one thread touches the underlying queue at a time.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class AutoQueue<T>
{
    private readonly Queue<T> queue_ = new Queue<T>();
    private readonly Object lockObj_ = new Object();
    private readonly Semaphore sem_ = new Semaphore(0);

    /// <summary>Blocks until an item is available, then removes and returns it.</summary>
    public T Dequeue()
    {
        T item;
        Take(System.Threading.Timeout.Infinite, true, out item);
        return item;
    }

    /// <summary>Waits up to <paramref name="ms"/> milliseconds for an item and removes it.</summary>
    /// <param name="ms">Timeout in milliseconds (-1 waits indefinitely).</param>
    /// <param name="t">The removed item, or <c>default(T)</c> on timeout.</param>
    /// <returns><c>true</c> if an item was removed; <c>false</c> on timeout.</returns>
    public bool Dequeue(int ms, out T t)
    {
        return Take(ms, true, out t);
    }

    /// <summary>
    /// Waits up to <paramref name="ms"/> milliseconds for an item and returns
    /// it without removing it. Returns as soon as an item is available rather
    /// than always sleeping for the whole timeout.
    /// </summary>
    /// <param name="ms">Timeout in milliseconds (-1 waits indefinitely).</param>
    /// <param name="t">The item at the head of the queue, or <c>default(T)</c> on timeout.</param>
    /// <returns><c>true</c> if an item was observed; <c>false</c> on timeout.</returns>
    public bool Peek(int ms, out T t)
    {
        return Take(ms, false, out t);
    }

    /// <summary>Appends an item and signals one waiting consumer.</summary>
    public void Enqueue(T t)
    {
        lock (lockObj_)
        {
            queue_.Enqueue(t);
            // Released under the lock so Clear() cannot run between the
            // enqueue and the permit becoming visible.
            sem_.Release();
        }
    }

    /// <summary>Number of items currently queued.</summary>
    public int Count
    {
        get
        {
            lock (lockObj_)
            {
                return queue_.Count;
            }
        }
    }

    /// <summary><c>true</c> when no items are queued.</summary>
    public bool Empty
    {
        get
        {
            lock (lockObj_)
            {
                return (queue_.Count == 0);
            }
        }
    }

    /// <summary>Removes all queued items and resets the item counter.</summary>
    public void Clear()
    {
        lock (lockObj_)
        {
            queue_.Clear();
            sem_.Clear();
        }
    }

    // Waits for a permit, then reads (and optionally removes) the head item
    // under the lock. A permit can be stale: Clear() may empty the queue after
    // a consumer has taken its permit but before it has taken the lock. In that
    // case the permit is dropped and the wait resumes with the remaining time,
    // instead of calling Dequeue()/Peek() on an empty queue and throwing.
    private bool Take(int ms, bool remove, out T t)
    {
        int started = Environment.TickCount;
        int remaining = ms;
        for (;;)
        {
            if (!sem_.WaitOne(remaining))
            {
                t = default(T);
                return false;
            }

            lock (lockObj_)
            {
                if (queue_.Count > 0)
                {
                    if (remove)
                    {
                        t = queue_.Dequeue();
                    }
                    else
                    {
                        t = queue_.Peek();
                        // The item stays queued, so hand the permit back.
                        sem_.Release();
                    }
                    return true;
                }
            }

            if (ms != System.Threading.Timeout.Infinite)
            {
                // Unchecked subtraction stays correct when TickCount wraps.
                remaining = ms - unchecked(Environment.TickCount - started);
                if (remaining < 0)
                {
                    remaining = 0;
                }
            }
        }
    }
}
}
3、封装消息队列,存放数据帧
using System;
using System.Collections.Generic;
using System.Text;
namespace **************
{
/// <summary>
/// One data frame: a byte buffer together with the number of valid bytes in it.
/// </summary>
public class msgFrame
{
    /// <summary>Buffer holding the frame bytes.</summary>
    public Byte[] frame;

    /// <summary>Number of valid bytes in <see cref="frame"/>.</summary>
    public int framelen;

    /// <summary>
    /// Creates a frame holding a private copy of the first <paramref name="len"/>
    /// bytes of <paramref name="frame"/>.
    /// </summary>
    /// <param name="frame">Source buffer to copy from.</param>
    /// <param name="len">Number of bytes to copy; also the size of the new buffer.</param>
    public msgFrame(Byte[] frame, int len)
    {
        Byte[] copy = new Byte[len];
        Array.Copy(frame, 0, copy, 0, len);
        this.frame = copy;
        this.framelen = len;
    }

    /// <summary>
    /// Creates an empty frame with a zero-filled buffer of <paramref name="len"/>
    /// bytes and a valid length of 0.
    /// </summary>
    /// <param name="len">Capacity of the buffer in bytes.</param>
    public msgFrame(int len)
    {
        this.framelen = 0;
        this.frame = new Byte[len];
    }
}
/// <summary>
/// Process-wide queue of data frames, exposed as a singleton.
/// </summary>
public class MsgQueue
{
    // How long popFrame waits for a frame before reporting "nothing available".
    private const int PopTimeoutMs = 100;

    private static readonly MsgQueue instance = new MsgQueue();

    private readonly AutoQueue<msgFrame> msgqueue;

    private MsgQueue()
    {
        msgqueue = new AutoQueue<msgFrame>();
    }

    /// <summary>Returns the single shared instance.</summary>
    public static MsgQueue getInstance()
    {
        return instance;
    }

    /// <summary>
    /// Queues a copy of the first <paramref name="framelen"/> bytes of
    /// <paramref name="pframe"/>. This is best-effort: a frame that cannot be
    /// queued (for example a null buffer or an invalid length) is dropped, and
    /// the reason is written to the trace output instead of being thrown.
    /// </summary>
    /// <param name="pframe">Buffer containing the frame.</param>
    /// <param name="framelen">Number of bytes of <paramref name="pframe"/> to queue.</param>
    public void pushFrame(Byte[] pframe, int framelen)
    {
        try
        {
            msgqueue.Enqueue(new msgFrame(pframe, framelen));
        }
        catch (System.Exception e)
        {
            // Keep the original no-throw contract, but leave a diagnostic
            // so dropped frames are not completely invisible.
            System.Diagnostics.Trace.WriteLine("MsgQueue.pushFrame: frame dropped: " + e);
        }
    }

    /// <summary>
    /// Waits up to 100 ms for a frame and copies it into <paramref name="pframe"/>.
    /// </summary>
    /// <param name="pframe">
    /// Destination buffer; it must be large enough for the frame, otherwise the
    /// copy throws after the frame has already been removed from the queue.
    /// </param>
    /// <param name="framelen">Set to the number of bytes copied when a frame is returned.</param>
    /// <returns><c>true</c> if a frame was copied; <c>false</c> if none arrived in time.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="pframe"/> is null.</exception>
    public Boolean popFrame(Byte[] pframe, ref int framelen)
    {
        // Reject a null destination before dequeuing, so the frame is not
        // removed and then lost when the copy fails.
        if (pframe == null)
        {
            throw new ArgumentNullException("pframe");
        }

        // A local is used rather than a shared field so that concurrent
        // callers cannot overwrite each other's dequeued frame.
        msgFrame popped;
        if (!msgqueue.Dequeue(PopTimeoutMs, out popped))
        {
            return false;
        }

        Array.Copy(popped.frame, pframe, popped.framelen);
        framelen = popped.framelen;
        return true;
    }
}
}