在C#中实现生产者/消费者模式。
实现生产者/消费者模式的原因是为了实现一个简单的通用生产者/消费者作业队列。通过使用Job类来存储包含委托形式的对象方法调用,当作业被处理时,委托会被调用。任何相关的参数也会被存储在Job类中。
通过这种简单的模式,可以实现在入队和出队过程中的多线程。实际上,这只是最简单的部分:多线程给代码带来了新的挑战,您以后会注意到它们。
下面的代码是一个实现了生产者/消费者模式的示例。
using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; namespace MyNamespace { public class Program { public static void Main(string[] args) { MyApplication app = new MyApplication(); app.Run(); } } public class MyApplication { private BlockingCollectionJobQueue = new BlockingCollection (); private CancellationTokenSource JobCancellationTokenSource = new CancellationTokenSource(); private CancellationToken JobCancellationToken; private Timer Timer; private Thread UserInputThread; public void Run() { Thread.CurrentThread.Name = "Main"; Timer = new Timer(new TimerCallback(TimerCallback), null, 1000, 2000); UserInputThread = new Thread(new ThreadStart(ReadUserInputs)) { Name = "UserInputs", IsBackground = true }; UserInputThread.Start(); JobCancellationToken = JobCancellationTokenSource.Token; ProcessJobs(); JobQueue.Dispose(); Timer.Dispose(); UserInputThread.Abort(); Console.WriteLine("Done."); } private void ProcessJobs() { try { while (!JobQueue.IsCompleted) { JobQueue.Take(JobCancellationToken).Run(); } } catch { } } private void ReadUserInputs() { ConsoleKey key = ConsoleKey.Enter; while ((key = Console.ReadKey(true).Key) != ConsoleKey.Escape) { Job userInputJob = new Job("UserInput", this, new Action (ProcessUserInputs), key); JobQueue.Add(userInputJob); } JobCancellationTokenSource.Cancel(); } private void ProcessUserInputs(ConsoleKey key) { Console.WriteLine($"You just typed '{key}'. (Thread: {Thread.CurrentThread.Name})"); } private void TimerCallback(object param) { Job job = new Job("TimerJob", this, new Action (ProcessTimer), "A job from timer callback was processed."); JobQueue.TryAdd(job); } private void ProcessTimer(string message) { Console.WriteLine($"{message} (Thread: {Thread.CurrentThread.Name})"); } } public class Job { public string Name { get; } private object TargetObject; private Delegate TargetMethod; private object[] Arguments; public Job(string name, object obj, Delegate method, params object[] args) { Name = name; TargetObject = obj; TargetMethod = method; Arguments = args; } public void Run() { try { TargetMethod.Method.Invoke(TargetObject, Arguments); } catch (Exception ex) { Debug.WriteLine($"Unexpected error running job '{Name}': {ex}"); } } } }
以上代码是一个使用C#实现的生产者/消费者模式的示例。