//------------------------------------------------------------------------------
//  此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
//  源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
//  CSDN博客:https://blog.csdn.net/qq_40374647
//  哔哩哔哩视频:https://space.bilibili.com/94253567
//  Gitee源代码仓库:https://gitee.com/RRQM_Home
//  Github源代码仓库:https://github.com/RRQM
//  API首页:https://www.yuque.com/rrqm/touchsocket/index
//  交流QQ群:234762506
//  感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Core;

namespace TouchSocket.Rpc.TouchRpc
{
    /// <summary>
    /// A data channel between two RPC endpoints.
    /// </summary>
    [DebuggerDisplay("ID={ID},Status={Status}")]
    public abstract class Channel : DisposableObject
    {
        /// <summary>
        /// Amount of received data that is currently waiting to be read.
        /// </summary>
        public abstract int Available { get; }

        /// <summary>
        /// Capacity of the receive cache.
        /// </summary>
        public abstract int CacheCapacity { get; set; }

        /// <summary>
        /// Whether <see cref="MoveNext"/> may currently be called on this channel.
        /// </summary>
        public abstract bool CanMoveNext { get; }

        /// <summary>
        /// Whether the channel can currently be written to.
        /// </summary>
        public abstract bool CanWrite { get; }

        /// <summary>
        /// Channel ID.
        /// </summary>
        public abstract int ID { get; }

        /// <summary>
        /// Message that accompanied the last operation.
        /// </summary>
        public abstract string LastOperationMes { get; }

        /// <summary>
        /// Current channel status.
        /// </summary>
        public abstract ChannelStatus Status { get; }

        /// <summary>
        /// ID of the target endpoint. Only meaningful when the channel was opened between two clients.
        /// </summary>
        public abstract string TargetId { get; }

        /// <summary>
        /// Timeout for blocking operations. Defaults to 10 seconds.
        /// </summary>
        public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10);

        /// <summary>
        /// Whether the channel has been marked as in use.
        /// </summary>
        public abstract bool Using { get; }

        /// <summary>
        /// Cancels the channel.
        /// </summary>
        /// <param name="operationMes">Optional message describing the operation.</param>
        public abstract void Cancel(string operationMes = null);

        /// <summary>
        /// Runs <see cref="Cancel(string)"/> on a background task.
        /// </summary>
        /// <param name="operationMes">Optional message describing the operation.</param>
        /// <returns>A task that completes when the cancel call has returned.</returns>
        public Task CancelAsync(string operationMes = null)
        {
            return EasyTask.Run(() =>
            {
                this.Cancel(operationMes);
            });
        }

        /// <summary>
        /// Completes the channel.
        /// </summary>
        /// <param name="operationMes">Optional message describing the operation.</param>
        public abstract void Complete(string operationMes = null);

        /// <summary>
        /// Runs <see cref="Complete(string)"/> on a background task.
        /// </summary>
        /// <param name="operationMes">Optional message describing the operation.</param>
        /// <returns>A task that completes when the complete call has returned.</returns>
        public Task CompleteAsync(string operationMes = null)
        {
            return EasyTask.Run(() =>
            {
                this.Complete(operationMes);
            });
        }

        /// <summary>
        /// Gets the data item selected by the last successful <see cref="MoveNext"/>.
        /// </summary>
        /// <returns>The current data.</returns>
        public abstract byte[] GetCurrent();

        /// <summary>
        /// Hold on.
        /// <para>
        /// When this is called the receiver breaks out of receiving, but the channel stays usable,
        /// so the receiver needs to call <see cref="MoveNext"/> again.
        /// </para>
        /// </summary>
        /// <param name="operationMes">Optional message describing the operation.</param>
        public abstract void HoldOn(string operationMes = null);

        /// <summary>
        /// Runs <see cref="HoldOn(string)"/> on a background task.
        /// </summary>
        /// <param name="operationMes">Optional message describing the operation.</param>
        /// <returns>A task that completes when the hold-on call has returned.</returns>
        public Task HoldOnAsync(string operationMes = null)
        {
            return EasyTask.Run(() =>
            {
                this.HoldOn(operationMes);
            });
        }

        /// <summary>
        /// Advances to the next data item.
        /// </summary>
        /// <returns><c>true</c> when a data item is available through <see cref="GetCurrent"/>; otherwise <c>false</c>.</returns>
        public abstract bool MoveNext();

        /// <summary>
        /// Runs <see cref="MoveNext"/> on a background task.
        /// </summary>
        /// <returns>A task yielding the result of <see cref="MoveNext"/>.</returns>
        public Task<bool> MoveNextAsync()
        {
            return EasyTask.Run(() =>
            {
                return this.MoveNext();
            });
        }

        /// <summary>
        /// Reads the next data item on a background task, blocking there until data arrives or the wait times out.
        /// </summary>
        /// <returns>A task yielding the data, or <c>null</c> when <see cref="MoveNext"/> returned <c>false</c>.</returns>
        public Task<byte[]> ReadAsync()
        {
            return EasyTask.Run(() =>
            {
                if (this.MoveNext())
                {
                    return this.GetCurrent();
                }
                return null;
            });
        }

        /// <summary>
        /// Tries to write a slice of <paramref name="data"/> to the channel.
        /// </summary>
        /// <param name="data">Buffer holding the data.</param>
        /// <param name="offset">Start offset within <paramref name="data"/>.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <returns><c>true</c> when the write succeeded; <c>false</c> when the channel is not writable or the write threw.</returns>
        public bool TryWrite(byte[] data, int offset, int length)
        {
            if (this.CanWrite)
            {
                try
                {
                    this.Write(data, offset, length);
                    return true;
                }
                catch
                {
                    // Deliberate: the Try-pattern reports any failure as false.
                }
            }
            return false;
        }

        /// <summary>
        /// Tries to write the whole of <paramref name="data"/> to the channel.
        /// </summary>
        /// <param name="data">Data to write.</param>
        /// <returns><c>true</c> when the write succeeded; <c>false</c> otherwise, including when <paramref name="data"/> is <c>null</c>.</returns>
        public bool TryWrite(byte[] data)
        {
            // A Try method must not throw on bad input; dereferencing data.Length would.
            if (data == null)
            {
                return false;
            }
            return this.TryWrite(data, 0, data.Length);
        }

        /// <summary>
        /// Asynchronously tries to write a slice of <paramref name="data"/> to the channel.
        /// </summary>
        /// <param name="data">Buffer holding the data.</param>
        /// <param name="offset">Start offset within <paramref name="data"/>.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <returns>A task yielding <c>true</c> when the write succeeded; <c>false</c> when the channel is not writable or the write threw.</returns>
        public async Task<bool> TryWriteAsync(byte[] data, int offset, int length)
        {
            if (this.CanWrite)
            {
                try
                {
                    await this.WriteAsync(data, offset, length);
                    return true;
                }
                catch
                {
                    // Deliberate: the Try-pattern reports any failure as false.
                }
            }
            return false;
        }

        /// <summary>
        /// Asynchronously tries to write the whole of <paramref name="data"/> to the channel.
        /// </summary>
        /// <param name="data">Data to write.</param>
        /// <returns>A task yielding <c>true</c> when the write succeeded; <c>false</c> otherwise, including when <paramref name="data"/> is <c>null</c>.</returns>
        public Task<bool> TryWriteAsync(byte[] data)
        {
            // A Try method must not throw on bad input; dereferencing data.Length would.
            if (data == null)
            {
                return Task.FromResult(false);
            }
            return this.TryWriteAsync(data, 0, data.Length);
        }

        /// <summary>
        /// Writes a slice of <paramref name="data"/> to the channel.
        /// </summary>
        /// <param name="data">Buffer holding the data.</param>
        /// <param name="offset">Start offset within <paramref name="data"/>.</param>
        /// <param name="length">Number of bytes to write.</param>
        public abstract void Write(byte[] data, int offset, int length);

        /// <summary>
        /// Writes the whole of <paramref name="data"/> to the channel.
        /// </summary>
        /// <param name="data">Data to write.</param>
        public void Write(byte[] data)
        {
            this.Write(data, 0, data.Length);
        }

        /// <summary>
        /// Runs <see cref="Write(byte[], int, int)"/> on a background task.
        /// </summary>
        /// <param name="data">Buffer holding the data.</param>
        /// <param name="offset">Start offset within <paramref name="data"/>.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <returns>A task that completes when the write has returned, or faults with the exception it threw.</returns>
        public Task WriteAsync(byte[] data, int offset, int length)
        {
            return Task.Run(() =>
            {
                this.Write(data, offset, length);
            });
        }

        /// <summary>
        /// Starts writing the whole of <paramref name="data"/> on a background task without waiting for it.
        /// </summary>
        /// <remarks>
        /// This overload is fire-and-forget: the task is discarded, so the caller is notified neither of
        /// completion nor of failure. Use <see cref="WriteAsync(byte[], int, int)"/> when the result matters.
        /// The <c>void</c> return type is kept for compatibility with existing callers.
        /// </remarks>
        /// <param name="data">Data to write.</param>
        public void WriteAsync(byte[] data)
        {
            this.WriteAsync(data, 0, data.Length);
        }
    }

    /// <summary>
    /// <see cref="Channel"/> implementation that sends its packages through an <see cref="RpcActor"/>.
    /// </summary>
    internal class InternalChannel : Channel
    {
        private readonly RpcActor m_actor;
        private readonly IntelligentDataQueue<ChannelPackage> m_dataQueue;
        private readonly AutoResetEvent m_moveWaitHandle;
        private readonly string m_targetId;
        private readonly Timer m_timer;
        private int m_cacheCapacity;
        private volatile bool m_canFree;
        private byte[] m_currentData;
        private int m_id;
        private string m_lastOperationMes;

        // Kept in UTC: it is only used to measure elapsed time, which local-clock shifts must not affect.
        private DateTime m_lastOperationTime;

        private ChannelStatus m_status;
        private bool m_using;

        /// <summary>
        /// Creates a channel bound to <paramref name="client"/>.
        /// </summary>
        /// <param name="client">Actor used to send packages and to unregister the channel.</param>
        /// <param name="targetId">ID of the target endpoint; packages are flagged for routing when it has a value.</param>
        public InternalChannel(RpcActor client, string targetId)
        {
            this.m_actor = client;
            this.m_lastOperationTime = DateTime.UtcNow;
            this.m_targetId = targetId;
            this.m_status = ChannelStatus.Default;
            this.m_cacheCapacity = 1024 * 1024 * 5;
            this.m_dataQueue = new IntelligentDataQueue<ChannelPackage>(this.m_cacheCapacity)
            {
                OverflowWait = false,
                OnQueueChanged = OnQueueChanged
            };
            this.m_moveWaitHandle = new AutoResetEvent(false);
            this.m_canFree = true;

            // Once per second: dispose the channel when it has been idle for more than three timeouts.
            this.m_timer = new Timer((o) =>
            {
                if (DateTime.UtcNow - this.m_lastOperationTime > TimeSpan.FromTicks(this.Timeout.Ticks * 3))
                {
                    this.SafeDispose();
                }
            }, null, 1000, 1000);
        }

        /// <summary>
        /// Finalizer.
        /// </summary>
        ~InternalChannel()
        {
            this.Dispose(false);
        }

        /// <summary>
        /// Count reported by the internal data queue.
        /// </summary>
        public override int Available => this.m_dataQueue.Count;

        /// <summary>
        /// Capacity of the receive cache. Negative values are replaced by 1024.
        /// </summary>
        public override int CacheCapacity
        {
            get => this.m_cacheCapacity;
            set
            {
                if (value < 0)
                {
                    value = 1024;
                }
                this.m_cacheCapacity = value;
                this.m_dataQueue.MaxSize = value;
            }
        }

        /// <summary>
        /// <c>true</c> while data is still queued or the channel has not reached a finished status.
        /// </summary>
        public override bool CanMoveNext
        {
            get
            {
                if (this.Available > 0)
                {
                    return true;
                }
                if ((byte)this.m_status >= 4)
                {
                    return false;
                }
                return true;
            }
        }

        /// <summary>
        /// <c>true</c> while the channel has not reached a finished status.
        /// </summary>
        public override bool CanWrite => (byte)this.m_status <= 3;

        /// <summary>
        /// Channel ID.
        /// </summary>
        public override int ID => this.m_id;

        /// <summary>
        /// Message that accompanied the last order received from the peer.
        /// </summary>
        public override string LastOperationMes
        {
            get => this.m_lastOperationMes;
        }

        /// <summary>
        /// Current channel status.
        /// </summary>
        public override ChannelStatus Status => this.m_status;

        /// <summary>
        /// ID of the target endpoint.
        /// </summary>
        public override string TargetId => this.m_targetId;

        /// <summary>
        /// Whether <see cref="SetUsing"/> has been called.
        /// </summary>
        public override bool Using => this.m_using;

        // Every guard in this class treats a status whose underlying value is above 3 as finished.
        private bool IsFinished => (byte)this.m_status > 3;

        #region 操作

        /// <summary>
        /// Cancels the channel locally and sends a cancel order to the peer. Does nothing when the channel
        /// is already finished. Failures are swallowed.
        /// </summary>
        /// <param name="operationMes">Optional message sent along with the order.</param>
        public override void Cancel(string operationMes = null)
        {
            if (this.IsFinished)
            {
                return;
            }
            try
            {
                this.RequestCancel(true);
                var channelPackage = this.CreatePackage(ChannelDataType.CancelOrder);
                channelPackage.RunNow = true;
                channelPackage.Message = operationMes;
                this.m_actor.SendChannelPackage(channelPackage);
                this.m_lastOperationTime = DateTime.UtcNow;
            }
            catch
            {
                // Best effort, as in the original implementation.
            }
        }

        /// <summary>
        /// Completes the channel locally and sends a complete order to the peer. Does nothing when the
        /// channel is already finished.
        /// </summary>
        /// <param name="operationMes">Optional message sent along with the order.</param>
        public override void Complete(string operationMes = null)
        {
            if (this.IsFinished)
            {
                return;
            }

            this.RequestComplete(true);
            var channelPackage = this.CreatePackage(ChannelDataType.CompleteOrder);
            channelPackage.RunNow = true;
            channelPackage.Message = operationMes;
            this.m_actor.SendChannelPackage(channelPackage);
            this.m_lastOperationTime = DateTime.UtcNow;
        }

        /// <summary>
        /// Sends a hold-on order to the peer. Does nothing when the channel is already finished.
        /// <para>
        /// When this is called the receiver breaks out of receiving, but the channel stays usable,
        /// so the receiver needs to call <see cref="MoveNext"/> again.
        /// </para>
        /// </summary>
        /// <param name="operationMes">Optional message sent along with the order.</param>
        public override void HoldOn(string operationMes = null)
        {
            if (this.IsFinished)
            {
                return;
            }

            var channelPackage = this.CreatePackage(ChannelDataType.HoldOnOrder);
            channelPackage.RunNow = true;
            channelPackage.Message = operationMes;
            this.m_actor.SendChannelPackage(channelPackage);
            this.m_lastOperationTime = DateTime.UtcNow;
        }

        /// <summary>
        /// Stops the idle timer, releases the channel and, when the channel was still active, tells the
        /// peer that it has been disposed. The base implementation is always invoked.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from Dispose, <c>false</c> when called from the finalizer.</param>
        protected override void Dispose(bool disposing)
        {
            try
            {
                this.m_timer.SafeDispose();

                // Capture the state first: RequestDispose changes the status, so checking afterwards
                // could never see an active channel.
                var wasActive = !this.IsFinished;
                this.RequestDispose(true);

                if (wasActive)
                {
                    var channelPackage = this.CreatePackage(ChannelDataType.DisposeOrder);
                    channelPackage.RunNow = true;
                    this.m_actor.SendChannelPackage(channelPackage);
                    this.m_lastOperationTime = DateTime.UtcNow;
                }
            }
            catch
            {
                // Disposal must not throw.
            }

            // No early return above, so the base class always gets to record the disposal.
            base.Dispose(disposing);
        }

        #endregion 操作

        /// <summary>
        /// Gets the data stored by the last successful <see cref="MoveNext"/>.
        /// </summary>
        /// <returns>The current data.</returns>
        public override byte[] GetCurrent()
        {
            return this.m_currentData;
        }

        /// <summary>
        /// Advances to the next data item, waiting up to <see cref="Channel.Timeout"/> for a package to arrive.
        /// </summary>
        /// <returns>
        /// <c>true</c> when a data package was dequeued; <c>false</c> when an order package was dequeued,
        /// the channel cannot move, or the wait timed out (the status then becomes <see cref="ChannelStatus.Overtime"/>).
        /// </returns>
        public override bool MoveNext()
        {
            while (true)
            {
                if (!this.CanMoveNext)
                {
                    return false;
                }

                if (this.m_dataQueue.TryDequeue(out ChannelPackage channelPackage))
                {
                    switch (channelPackage.DataType)
                    {
                        case ChannelDataType.DataOrder:
                            {
                                // NOTE(review): exposes the whole backing array; assumes the segment covers it entirely.
                                this.m_currentData = channelPackage.Data.Array;
                                return true;
                            }
                        case ChannelDataType.CompleteOrder:
                            this.RequestComplete(true);
                            return false;

                        case ChannelDataType.CancelOrder:
                            this.RequestCancel(true);
                            return false;

                        case ChannelDataType.DisposeOrder:
                            this.RequestDispose(true);
                            return false;

                        case ChannelDataType.HoldOnOrder:
                            this.m_status = ChannelStatus.HoldOn;
                            return false;

                        case ChannelDataType.QueueRun:
                            this.m_canFree = true;
                            return false;

                        case ChannelDataType.QueuePause:
                            this.m_canFree = false;
                            return false;

                        default:
                            return false;
                    }
                }

                // The event is not reset before waiting: a Set() that lands between the failed dequeue
                // and this wait must not be lost. A stale signal only costs one extra pass of the loop.
                if (!this.m_moveWaitHandle.WaitOne(this.Timeout))
                {
                    this.m_status = ChannelStatus.Overtime;
                    return false;
                }
            }
        }

        /// <summary>
        /// Sends a slice of <paramref name="data"/> to the peer as a data package.
        /// </summary>
        /// <param name="data">Buffer holding the data.</param>
        /// <param name="offset">Start offset within <paramref name="data"/>.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <exception cref="InvalidOperationException">The channel has already finished.</exception>
        /// <exception cref="TimeoutException">The peer did not allow sending within <see cref="Channel.Timeout"/>.</exception>
        public override void Write(byte[] data, int offset, int length)
        {
            if (this.IsFinished)
            {
                throw new InvalidOperationException($"通道已{this.m_status}");
            }

            // Wait while the peer has paused the queue (QueuePause clears m_canFree, QueueRun sets it).
            if (!SpinWait.SpinUntil(() => { return this.m_canFree; }, this.Timeout))
            {
                throw new TimeoutException();
            }

            var channelPackage = this.CreatePackage(ChannelDataType.DataOrder);
            channelPackage.Data = new ArraySegment<byte>(data, offset, length);
            this.m_actor.SendChannelPackage(channelPackage);
            this.m_lastOperationTime = DateTime.UtcNow;
        }

        /// <summary>
        /// Handles a package received for this channel.
        /// </summary>
        /// <remarks>
        /// Run-now orders update the state immediately. Queue run/pause and unknown run-now orders stop there;
        /// every other package is also enqueued and a waiting <see cref="MoveNext"/> is signalled.
        /// </remarks>
        /// <param name="channelPackage">The received package.</param>
        internal void ReceivedData(ChannelPackage channelPackage)
        {
            this.m_lastOperationTime = DateTime.UtcNow;
            if (channelPackage.RunNow)
            {
                switch (channelPackage.DataType)
                {
                    case ChannelDataType.CompleteOrder:
                        this.m_lastOperationMes = channelPackage.Message;
                        this.RequestComplete(false);
                        break;

                    case ChannelDataType.CancelOrder:
                        this.m_lastOperationMes = channelPackage.Message;
                        this.RequestCancel(false);
                        break;

                    case ChannelDataType.DisposeOrder:
                        this.RequestDispose(false);
                        break;

                    case ChannelDataType.HoldOnOrder:
                        this.m_lastOperationMes = channelPackage.Message;
                        this.m_status = ChannelStatus.HoldOn;
                        break;

                    case ChannelDataType.QueueRun:
                        this.m_canFree = true;
                        return;

                    case ChannelDataType.QueuePause:
                        this.m_canFree = false;
                        return;

                    default:
                        return;
                }
            }
            this.m_dataQueue.Enqueue(channelPackage);
            this.m_moveWaitHandle.Set();
        }

        /// <summary>
        /// Sets the channel ID.
        /// </summary>
        /// <param name="id">The ID to assign.</param>
        internal void SetID(int id)
        {
            this.m_id = id;
        }

        /// <summary>
        /// Marks the channel as in use.
        /// </summary>
        internal void SetUsing()
        {
            this.m_using = true;
        }

        /// <summary>
        /// Sets the status to <see cref="ChannelStatus.Disposed"/> unless the channel has already finished,
        /// optionally releasing its resources first.
        /// </summary>
        /// <param name="clear">Whether to clear the queue and unregister the channel.</param>
        internal void RequestDispose(bool clear)
        {
            if (clear)
            {
                this.Clear();
            }
            if (this.IsFinished)
            {
                return;
            }
            this.m_status = ChannelStatus.Disposed;
        }

        /// <summary>
        /// Empties the queue and unregisters the channel from the actor; when the actor reports the removal,
        /// wakes a waiting <see cref="MoveNext"/> and disposes the wait handle. Failures are swallowed.
        /// </summary>
        private void Clear()
        {
            try
            {
                // NOTE(review): locks on the instance; kept because other code may rely on the same monitor.
                lock (this)
                {
                    this.m_dataQueue.Clear();
                    if (this.m_actor.RemoveChannel(this.m_id))
                    {
                        this.m_moveWaitHandle.Set();
                        this.m_moveWaitHandle.SafeDispose();
                    }
                }
            }
            catch
            {
                // Best effort, as in the original implementation.
            }
        }

        /// <summary>
        /// Builds a package of the given type addressed from this channel to its target.
        /// Callers set the remaining fields (run-now flag, message, data) themselves.
        /// </summary>
        /// <param name="dataType">Type of the package.</param>
        /// <returns>The new package.</returns>
        private ChannelPackage CreatePackage(ChannelDataType dataType)
        {
            return new ChannelPackage()
            {
                ChannelId = this.m_id,
                DataType = dataType,
                SourceId = this.m_actor.ID,
                TargetId = this.m_targetId,
                Route = this.m_targetId.HasValue()
            };
        }

        /// <summary>
        /// Queue callback: tells the peer to resume or pause sending. Does nothing when the channel has
        /// finished. Failures are swallowed.
        /// </summary>
        /// <param name="free"><c>true</c> sends a queue-run order, <c>false</c> a queue-pause order.</param>
        private void OnQueueChanged(bool free)
        {
            if (this.IsFinished)
            {
                return;
            }

            try
            {
                var channelPackage = this.CreatePackage(free ? ChannelDataType.QueueRun : ChannelDataType.QueuePause);
                channelPackage.RunNow = true;
                this.m_actor.SendChannelPackage(channelPackage);
                this.m_lastOperationTime = DateTime.UtcNow;
            }
            catch
            {
                // Best effort, as in the original implementation.
            }
        }

        /// <summary>
        /// Sets the status to <see cref="ChannelStatus.Cancel"/>, optionally releasing resources.
        /// </summary>
        /// <param name="clear">Whether to clear the queue and unregister the channel.</param>
        private void RequestCancel(bool clear)
        {
            this.m_status = ChannelStatus.Cancel;
            if (clear)
            {
                this.Clear();
            }
        }

        /// <summary>
        /// Sets the status to <see cref="ChannelStatus.Completed"/>, optionally releasing resources.
        /// </summary>
        /// <param name="clear">Whether to clear the queue and unregister the channel.</param>
        private void RequestComplete(bool clear)
        {
            this.m_status = ChannelStatus.Completed;
            if (clear)
            {
                this.Clear();
            }
        }
    }
}