前言
进程通信一般情况下比较少用,但是也有一些使用场景,有些做视频传输的似乎会用多进程来实现,还有在子进程中调用特定的库来避免内存泄漏,笔者最近也遇到了需要使用多进程的场景。多进程的使用最主要的就是进程间的通信,本文参考了go语言的ipc库,实现了一个基于共享内存的跨进程队列。
一、实现原理
1、用到的主要对象
//共享内存管理对象 MemoryMappedFile _mmf; //跨进程的互斥变量 Mutex _mtx; //入队信号量 Semaphore _semaEq; //出队信号量 Semaphore _semaDq;
2、创建共享内存
创建共享内存需要使用MemoryMappedFile.CreateFromFile
实现跨平台。CreateNew只能创建无法打开第二个,OpenExisting只支持windows。
string name="共享内存标识名称"; _shmPath="共享内存文件路径"+name; //通过文件路径创建共享内存 _mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, (_QueuetHeaderSize + (elementBodyMaxSize + _ElementHeaderSize) * capacity), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false); //创建互斥变量 _mtx = new Mutex(false, Name + ".mx"); //创建入队信号量,capacity为队列元素个数容量 _semaEq = new Semaphore(0, (int)capacity, name+ ".eq"); //创建出队信号量,capacity为队列元素个数容量 _semaDq = new Semaphore((int)capacity, (int)capacity, name + ".dq");
获取读写对象
_mmva = _mmf.CreateViewAccessor();
值类型数组方式写入
T[] obj; _mmva.WriteArray<T>(position , obj, 0, obj.Length);
3、头部信息
采用循环队列方式实现,判断队空队满通过count、capacity的方式(参考了C#的Queue源码),避免占用多一个空间。
struct QueueHeader { //元素大小 public nint ElementSize; //队列容量 public nint Capacity; //当前元素个数 public nint Count; //队列头 public nint Front; //队列尾 public nint Rear; }
队列头信息需要存储在共享内存中。
QueueHeader Header { get { QueueHeader header; _mmva.Read(0, out header); return header; } set { _mmva.Write(0, ref value); } }
4、入队
示例如下
bool Enqueuee<T>(T[] obj) where T : struct { //共享内存中读取header var header = Header; //队列满返回 if (header.Count == header.Capacity) return false; //计算写入的位置,头部长度+队尾*元素大小 nint position = _QueuetHeaderSize + header.Rear * header.ElementSize; //写入共享内存 _mmva.WriteArray<T>(position, obj, 0, obj.Length); //更新队尾 header.Rear = (header.Rear + 1) % header.Capacity; //更新长度 header.Count++; //更新头部信息到共享内存 Header=header; return true; }
同步
//等待出队信号量(如果队列满则会等待) if (!_semaDq.WaitOne(timeout)) return false; //进入互斥锁 if (!_mtx.WaitOne(timeout)) return false; try { //入队 Enqueue(obj); } finally { //通知入队信号量 _semaEq.Release(); //释放互斥锁 _mtx.ReleaseMutex(); } return true;
5、出队
object Dequeue() { //共享内存中读取header var header = Header; //队列空则返回 if (header.Count == 0) return null; //计算读取的位置,头部长度+队头*元素大小 long position = _QueuetHeaderSize + header.Front * header.ElementSize; //创建数据用于装载数据 Array arr = Array.CreateInstance(readType, msg.Header.ArrayLength); //将泛型转type调用。 var readArray = _ReadArrayGeneric.MakeGenericMethod(readType); //读取共享内存的数据 readArray.Invoke(_mmva, [position , arr, 0, arr.Length]); //更新队头 header.Front = (header.Front + 1) % header.Capacity; //更新长度 header.Count--; //更新头部信息到共享内存 Header = header; return msg; }
同步
//等待入队信号量(如果队列空则会等待) if (!_semaEq.WaitOne(timeout)) return null; //进入互斥锁 if (!_mtx.WaitOne(timeout!)) return null; try { //出队 return Dequeue(); } finally { //通知入队信号量 _semaDq.Release(); //释放互斥锁 _mtx.ReleaseMutex(); }
6、释放资源
/// <summary> /// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响 /// </summary> public void Dispose() { _mmf.Dispose(); _mmva.Dispose(); _mtx.Dispose(); _semaEq.Dispose(); _semaDq.Dispose(); }
二、完整代码
类的定义
/// <summary> /// 共享队列 /// 基于共享内存实现 /// </summary> class SharedQueue : IDisposable { /// <summary> /// 名称 /// </summary> public string Name { get; private set; } /// <summary> /// 元素最大大小 /// </summary> public long ElementMaxSize { get; private set; } /// <summary> /// 队列容量 /// </summary> public long Capacity { get; private set; } /// <summary> /// 表示是否新创建,是则是创建,否则是打开已存在的。 /// </summary> public bool IsNewCreate { get; private set; } /// <summary> /// 构造方法 /// </summary> /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个队列,可以进行数据传输。</param> /// <param name="capacity">队列容量,元素个数总量</param> /// <param name="elementBodyMaxSize">队列元素最大大小,此大小需要考虑传输数据Type.FullName长度</param> public SharedQueue(string name, nint capacity = 1, nint elementBodyMaxSize = 3145728); /// <summary> /// 发送数据 /// </summary> /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。 /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。 /// 此方法队列满了会阻塞,直到发送成功才返回。 /// </param> /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param> public void Send(object obj, bool isForceSerialize = false); /// <summary> /// 接收数据 /// 此方法队列空会阻塞,直到有数据才返回。 /// </summary> /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换</returns> public object Receive(); /// <summary> /// 发送数据超时 /// </summary> /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。 /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。</param> /// <param name="timeout">超时时长</param> /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param> /// <returns>true发送成功,false超时</returns> public bool SendTimeout(object obj, TimeSpan timeout, bool isForceSerialize = false); /// <summary> /// 接收超时 /// </summary> /// <param name="timeout">超时时长</param> /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换。 /// 超时返回null。 /// </returns> public object? ReceiveTimeout(TimeSpan timeout); /// <summary> /// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响 /// </summary> public void Dispose(); }
三、使用示例
1、传输byte[]数据
进程a
SharedQueue shq= new SharedQueue("shq1", 10); byte[] a = new byte[5] { 1, 2, 3, 4, 5 }; //发送数据 shq.send(a);
进程b
SharedQueue shq= new SharedQueue("shq1", 10); //接收数据 var a=shq.Receive() as byte[]; Console.Write("receive: "); foreach (var i in a) { Console.Write(i); }
2、传输字符串
进程a
SharedQueue shq= new SharedQueue("shq1", 10,64); shq.send("12345");
进程b
SharedQueue shq= new SharedQueue("shq1", 10,64); var a=shq.Receive() as string; Console.WriteLine("receive: " + a);
3、传输对象
class A { public string Name; public int Number; }
进程a
SharedQueue shq= new SharedQueue("shq1", 10,64); sq.Send(new A() { Name = "Tommy", Number = 102185784 });
进程b
SharedQueue shq= new SharedQueue("shq1", 10,64); var a=shq.Receive() as A; Console.WriteLine("receive: " + a.Name + " " + a.Number);
总结
以上就是今天要讲的内容,实现这样的一个对象,虽然代码量不多,但还是有一点难度的,很多细节需要处理,比如泛型转type以统一接口,信号量实现队列和条件变量是有差异的,用CreateFromFile才能实现跨平台。总的来说,有了这样的一个队列,跨线程通信就变的比较方便且高效了。