前言
C#提供的多进程同步对象有互斥锁和信号量,但是并没有条件变量。虽然信号量条件变量一定程度可以等效,但是具体的使用还是会有区别。比如说消息队列用条件变量就比信号量方便,用信号量要么缺乏灵活性,要么辅助代码已经和实现一个条件变量没区别了。本文提供了一种条件变量的实现方法,可以用于进程间的同步控制。
一、关键实现
1、用到的主要对象
下列对象都是跨进程的
//互斥变量, Mutex _mtx; //等待发送信号量 Semaphore _waitSem; //等待完成信号量 Semaphore _waitDone; //共享内存,用于存储计数变量 MemoryMappedFile _mmf; //共享内存的读写对象 MemoryMappedViewAccessor _mmva;
2、初始化区分创建和打开
利用Mutex判断是创建还是打开
bool isCreateNew; _mtx = new Mutex(false, name, out isCreateNew); if(isCreateNew){ //只能在创建时,初始化共享变量 }
3、变量放到共享内存
条件变量需要的计算对象就两个Waiting、Signals表示等待数和释放数。
//放到共享内存的数据 struct SharedData { public int Waiting; public int Signals; } SharedData Data { set { _mmva.Write(0, ref value); } get { SharedData ret; _mmva.Read(0, out ret); return ret; } }
4、等待和释放逻辑
参考了SDL2的条件变量实现,具体略。有兴趣的朋友可以自行查找源码查看。
二、完整代码
using System.IO.MemoryMappedFiles; using System.Runtime.InteropServices; namespace AC { /************************************************************************ * @Project: AC::ConditionVariable * @Decription: 条件变量 * 支持跨进程 * @Verision: v1.0.0 * 更新日志 * v1.0.0:实现基本功能 * @Author: Xin * @Create: 2024/07/18 15:25:00 * @LastUpdate: 2024/07/21 20:53:00 ************************************************************************ * Copyright @ 2025. All rights reserved. ************************************************************************/ class ConditionVariable : IDisposable { /// <summary> /// 构造方法 /// </summary> public ConditionVariable() { bool isCreateNew; Initialize(null, out isCreateNew); } /// <summary> /// 构造方法 /// </summary> /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个条件变量。</param> public ConditionVariable(string? name) { bool isCreateNew; Initialize(name, out isCreateNew); } /// <summary> /// 构造方法 /// </summary> /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个条件变量。</param> /// <param name="isCreateNew">表示是否新创建,是则是创建,否则是打开已存在的。</param> public ConditionVariable(string? name, out bool isCreateNew) { Initialize(name, out isCreateNew); } /// <summary> /// 等待 /// </summary> /// <param name="outerMtx">外部锁</param> public void WaitOne(Mutex outerMtx) { WaitOne(Timeout.InfiniteTimeSpan, outerMtx); } /// <summary> /// 等待超时 /// </summary> /// <param name="timeout">超时时间</param> /// <param name="outerMtx">外部锁</param> /// <returns>是则成功,否则超时</returns> public bool WaitOne(TimeSpan timeout, Mutex outerMtx) { bool isNotTimeout; //记录等待数量 _mtx.WaitOne(); var ws = Data; ws.Waiting++; Data = ws; _mtx.ReleaseMutex(); //解除外部的互斥锁,让其他线程可以进入条件等待。 outerMtx.ReleaseMutex(); //等待信号 isNotTimeout = _waitSem.WaitOne(timeout); _mtx.WaitOne(); ws = Data; if (isNotTimeout && ws.Signals > 0) { //通知发送信号的线程,等待完成。 _waitDone.Release(); ws.Signals--; } ws.Waiting--; Data = ws; _mtx.ReleaseMutex(); //加上外部互斥锁,还原外部的锁状态。 outerMtx.WaitOne(); return !isNotTimeout; } /// <summary> /// 释放,通知 /// </summary> public void Release() { _mtx.WaitOne(); var ws = Data; if (ws.Waiting > ws.Signals) { ws.Signals++; Data = ws; _waitSem.Release(); _mtx.ReleaseMutex(); _waitDone.WaitOne(); } else { _mtx.ReleaseMutex(); } } /// <summary> /// 释放全部,广播 /// </summary> public void ReleaseAll() { _mtx.WaitOne(); var ws = Data; if (ws.Waiting > ws.Signals) { int waiting = ws.Waiting - ws.Signals; ws.Signals = ws.Waiting; Data = ws; _waitSem.Release(waiting); _mtx.ReleaseMutex(); _waitDone.WaitOne(waiting); } else { _mtx.ReleaseMutex(); } } /// <summary> /// 销毁对象,只会销毁当前实例,如果多个打开同个名称,其他对象不受影响 /// </summary> public void Dispose() { _mtx.Dispose(); _waitSem.Dispose(); _waitDone.Dispose(); _mmva.Dispose(); _mmf.Dispose(); } void Initialize(string? name, out bool isCreateNew) { Mutex? mtx = null; Semaphore? waitSem = null; Semaphore? waitDone = null; MemoryMappedFile? mmf = null; MemoryMappedViewAccessor? mmva = null; try { mtx = _mtx = new Mutex(false, name, out isCreateNew); _mtx.WaitOne(); try { waitSem = _waitSem = new Semaphore(0, int.MaxValue, name + ".cv.ws"); waitDone = _waitDone = new Semaphore(0, int.MaxValue, name + ".cv.wd"); var _shmPath = Path.Combine(_TempDirectory, name + ".cv"); mmf = _mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, Marshal.SizeOf<SharedData>(), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false); mmva = _mmva = _mmf.CreateViewAccessor(); if (isCreateNew) Data = new SharedData() { Signals = 0, Waiting = 0 }; } finally { _mtx.ReleaseMutex(); } } catch { mtx?.Dispose(); waitSem?.Dispose(); waitDone?.Dispose(); mmf?.Dispose(); mmva?.Dispose(); isCreateNew = false; throw; } } Mutex _mtx; Semaphore _waitSem; Semaphore _waitDone; MemoryMappedFile _mmf; MemoryMappedViewAccessor _mmva; struct SharedData { public int Waiting; public int Signals; } SharedData Data { set { _mmva.Write(0, ref value); } get { SharedData ret; _mmva.Read(0, out ret); return ret; } } static string _TempDirectory = Path.GetTempPath() + "EE3E9111-8F65-4D68-AB2B-A018DD9ECF3C"; } }
三、使用示例
1、同步控制
using AC; ConditionVariable cv = new ConditionVariable(); Mutex mutex = new Mutex(); string text = ""; //子线程发送消息 new Thread(() => { int n = 0; while (true) { mutex.WaitOne(); text = (n++).ToString(); //通知主线程 cv.Release(); mutex.ReleaseMutex(); } }).Start(); //主线程接收消息 while (true) { mutex.WaitOne(); //等待子消息 cv.WaitOne(mutex); Console.WriteLine(text); mutex.ReleaseMutex(); }
2、跨进程控制
进程A
//不同进程名称相同就是同一个对象 ConditionVariable cv = new ConditionVariable("cv1"); Mutex mutex = new Mutex(false,"mx1"); //进程A发送消息 while (true) { mutex.WaitOne(); //共享进程读写略 //通知进程B cv.Release(); mutex.ReleaseMutex(); }
进程B
//不同进程名称相同就是同一个对象 ConditionVariable cv = new ConditionVariable("cv1"); Mutex mutex = new Mutex(false,"mx1"); //进程B接收消息 while (true) { mutex.WaitOne(); //等待进A程消息 cv.WaitOne(mutex); //共享进程读写略 Console.WriteLine("收到进程A消息"); mutex.ReleaseMutex(); }
总结
以上就是今天要讲的内容,之所以实现这样一个对象是因为,笔者在写跨进程队列通信,用信号量实现发现有所局限,想要完善与重写一个条件变量差异不大,索性直接实现一个条件变量,提供给队列使用,同时还具体通用性,在其他地方也能使用。总的来说,条件变量还是有用的,虽然需要遇到相应的使用场景才能意识到它的作用。