当前位置:首页 > 联系方式 >

.Net中的并行编程-4.实现高性能异步队列

  • 2017-09-11 14:29:30
  • 浏览:
  • 作者:admin
.Net中的并行编程-4.实现高性能异步队列 该队列主要用于实时数据流的处理并简化多线程编程模型。设计该队列时考虑以下几点需求(需求来自公司的一个实际项目):

1. 支持多线程入队出队,尽量简化多线程编程的复杂度。

2. 支持事件触发机制,数据入队时才进行处理而不是使用定时处理机制, 而且内部能阻塞消费者线程。

3. 出队时数据处理的顺序要保证和入队时是一致的。

4. 容错性强,可以不间断运行。

以上需求点对应的解决方案:

1.ConcurrentQueue支持多线程而且多线程环境下的性能较高,对于多线程编程模型简化可用适配器模式可将消费者线程封装到队列内部,内部采用处理事件方式处理用户的任务。

2.对于事件触发机制首先信号量不适合,因为信号量达到指定数目时会阻塞线程,所以该部分需要自己编程实现(具体参考源码)。

3.队列的特性以及保证入队和出队顺序,采集软件,这里需要保证的是线程处理数据项的顺序。

4.可通过注册异常处理函数的方式解决异常的问题。

所以开发出以下代码:

复制代码

public class AsynQueue<T>

{

//队列是否正在处理数据

private int isProcessing;

//有线程正在处理数据

private const int Processing = 1;

//没有线程处理数据

private const int UnProcessing = 0;

//队列是否可用

private volatile bool enabled = true;

private Task currentTask;

public event Action<T> ProcessItemFunction;

public event EventHandler<EventArgs<Exception>> ProcessException;

private ConcurrentQueue<T> queue;

public AsynQueue()

{

queue = new ConcurrentQueue<T>();

Start();

}

public int Count

{

get

{

return queue.Count;

}

}

private void Start()

{

Thread process_Thread = new Thread(PorcessItem);

process_Thread.IsBackground = true;

process_Thread.Start();

}

public void Enqueue(T items)

{

if (items == null)

{

throw new ArgumentException("items");

}

queue.Enqueue(items);

DataAdded();

}

//数据添加完成后通知消费者线程处理

private void DataAdded()

{

if (enabled)

{

if (!IsProcessingItem())

{

currentTask = Task.Factory.StartNew(ProcessItemLoop);

}

}

}

//判断是否队列有线程正在处理

private bool IsProcessingItem()

{

return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);

}

private void ProcessItemLoop()

{

if (!enabled && queue.IsEmpty)

{

Interlocked.Exchange(ref isProcessing, 0);

return;

}

//处理的线程数 是否小于当前最大任务数

//if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount)

//{

T publishFrame;

if (queue.TryDequeue(out publishFrame))

{

try

{

ProcessItemFunction(publishFrame);

}

catch (Exception ex)

{

OnProcessException(ex);

}

}

if (enabled && !queue.IsEmpty)

{

currentTask = Task.Factory.StartNew(ProcessItemLoop);

}

else

{

Interlocked.Exchange(ref isProcessing, UnProcessing);

}

}

/// <summary>

///定时处理线程调用函数

///主要是监视入队的时候线程 没有来的及处理的情况

/// </summary>

private void PorcessItem(object state)

{

int sleepCount = 0;

int sleepTime = 1000;

while (enabled)

{

//如果队列为空则根据循环的次数确定睡眠的时间

if (queue.IsEmpty)

{

if (sleepCount == 0)

{

sleepTime = 1000;

}

else if (sleepCount <= 3)

{

sleepTime = 1000 * 3;

}

else

{

sleepTime = 1000 * 50;

}

sleepCount++;

Thread.Sleep(sleepTime);

}

else

{

//判断是否队列有线程正在处理

if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)

{

if (!queue.IsEmpty)

{

currentTask = Task.Factory.StartNew(ProcessItemLoop);

}

else

{

Interlocked.Exchange(ref isProcessing, 0);

}

sleepCount = 0;

sleepTime = 1000;

}

}

}

}

public void Flsuh()

{

Stop();

if (currentTask != null)

{

currentTask.Wait();

}

while (!queue.IsEmpty)

{

try

{

T publishFrame;

if (queue.TryDequeue(out publishFrame))

{

ProcessItemFunction(publishFrame);

}

}

catch (Exception ex)

{

OnProcessException(ex);

}

}

currentTask = null;

}

public void Stop()

{

this.enabled = false;

}

private void OnProcessException(System.Exception ex)

{

var tempException = ProcessException;

Interlocked.CompareExchange(ref ProcessException, null, null);

if (tempException != null)

{

ProcessException(ex, new EventArgs<Exception>(ex));

}

}

}

复制代码

企业建站2800元起,携手武汉肥猫科技,做一个有见地的颜值派!更多优惠请戳:襄阳网站建设 http://xiangyang.45qun.com

上一篇:PHP设计模式——抽象工厂

下一篇:最后一页