//------------------------------------------------------------------------------ // 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有 // 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权 // CSDN博客:https://blog.csdn.net/qq_40374647 // 哔哩哔哩视频:https://space.bilibili.com/94253567 // Gitee源代码仓库:https://gitee.com/RRQM_Home // Github源代码仓库:https://github.com/RRQM // API首页:https://www.yuque.com/rrqm/touchsocket/index // 交流QQ群:234762506 // 感谢您的下载和使用 //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ using System; using System.Collections.Concurrent; using System.Threading; namespace TouchSocket.Core { /// /// 队列数据 /// public interface IQueueData { /// /// 数据长度 /// int Size { get; } } /// /// 传输字节 /// public class QueueDataBytes : IQueueData { /// /// 构造函数 /// /// /// /// public QueueDataBytes(byte[] buffer, int offset, int length) { Offset = offset; Length = length; Buffer = buffer; Size = length; } /// /// 从指定内存创建一个新对象,且内存也为新创建。 /// /// /// /// /// public static QueueDataBytes CreateNew(byte[] buffer, int offset, int length) { byte[] buf = new byte[length]; Array.Copy(buffer, offset, buf, 0, length); return new QueueDataBytes(buf); } /// /// 构造函数 /// /// public QueueDataBytes(byte[] buffer) : this(buffer, 0, buffer.Length) { } /// /// 数据内存 /// public byte[] Buffer { get; } /// /// 长度 /// public int Length { get; } /// /// 偏移 /// public int Offset { get; } /// /// 尺寸 /// public int Size { get; } } /// /// 智能数据安全队列 /// /// public class IntelligentDataQueue : ConcurrentQueue where T : IQueueData { private long m_actualSize; private bool m_free; private long m_maxSize; private Action m_onQueueChanged; private bool m_overflowWait; /// /// 构造函数 /// /// public IntelligentDataQueue(long maxSize) { m_free = true; m_overflowWait = true; MaxSize = maxSize; } /// /// 构造函数 /// public IntelligentDataQueue() : this(1024 * 1024 * 10) { } /// /// 实际尺寸 /// public long ActualSize => m_actualSize; /// /// 是否有空位允许入队 /// public bool Free => m_free; /// /// 允许的最大长度 /// public long MaxSize { get => m_maxSize; set { if (value < 1) { value = 1; } m_maxSize = value; } } /// /// 在队列修改时 /// public Action OnQueueChanged { get => m_onQueueChanged; set => m_onQueueChanged = value; } /// /// 溢出等待 /// public bool OverflowWait { get => m_overflowWait; set => m_overflowWait = value; } /// /// 超时时间。默认1000*30ms; /// public int Timeout { get; set; } = 1000 * 30; /// /// 清空队列 /// public void Clear(Action onClear) { while (base.TryDequeue(out T t)) { onClear?.Invoke(t); } } /// /// 入队 /// /// public new void Enqueue(T item) { lock (this) { bool free = m_actualSize < m_maxSize; if (m_free != free) { m_free = free; m_onQueueChanged?.Invoke(m_free); } if (m_overflowWait) { SpinWait.SpinUntil(Check, Timeout); } Interlocked.Add(ref m_actualSize, item.Size); base.Enqueue(item); } } /// /// 出队 /// /// /// public new bool TryDequeue(out T result) { if (base.TryDequeue(out result)) { Interlocked.Add(ref m_actualSize, -result.Size); bool free = m_actualSize < m_maxSize; if (m_free != free) { m_free = free; m_onQueueChanged?.Invoke(m_free); } return true; } return false; } private bool Check() { return m_actualSize < m_maxSize; } } }