//------------------------------------------------------------------------------
//  Copyright of this code (except where specifically stated, or for code in
//  namespaces ending in XREF) belongs to the author, 若汝棋茗.
//  Use of the source code follows this repository's open-source license and any
//  additional agreement; if the repository sets none, the MIT license applies.
//  CSDN blog: https://blog.csdn.net/qq_40374647
//  Bilibili videos: https://space.bilibili.com/94253567
//  Gitee source repository: https://gitee.com/RRQM_Home
//  Github source repository: https://github.com/RRQM
//  API home page: https://www.yuque.com/rrqm/touchsocket/index
//  QQ discussion group: 234762506
//  Thank you for downloading and using this code.
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Resources;

namespace TouchSocket.Rpc.TouchRpc
{
    /// <summary>
    /// Operation endpoint for a remote key/value store.
    /// Keys are strings and stored values are raw byte arrays; typed values are
    /// converted to and from bytes through <see cref="Converter"/>.
    /// </summary>
    public abstract class RedisClient : ICache<string, byte[]>
    {
        /// <summary>
        /// Serialization converter used by <c>Add</c>, <c>Set</c> and <c>TryGet</c>
        /// to turn typed values into bytes and back.
        /// </summary>
        public BytesConverter Converter { get; set; }

        /// <summary>
        /// Timeout setting, in milliseconds. Defaults to 30000.
        /// </summary>
        public int Timeout { get; set; } = 30 * 1000;

        /// <summary>
        /// Adds a typed value under <paramref name="key"/> only if the key is not already present.
        /// </summary>
        /// <typeparam name="TValue">Type of the value being stored.</typeparam>
        /// <param name="key">Cache key.</param>
        /// <param name="value">
        /// Value to store. A <c>byte[]</c> is stored as-is; anything else is converted with <see cref="Converter"/>.
        /// </param>
        /// <param name="duration">
        /// Entry lifetime; it is passed to <see cref="TimeSpan.FromSeconds(double)"/>, so it is interpreted as seconds.
        /// </param>
        /// <returns>The result of <see cref="AddCache"/>: <c>false</c> when the key already exists.</returns>
        public bool Add<TValue>(string key, TValue value, int duration = 60000)
        {
            // NOTE(review): the default of 60000 looks like it was meant as milliseconds, but the
            // value is fed to FromSeconds. Left unchanged because callers depend on it - confirm.
            var cache = new CacheEntry<string, byte[]>(key)
            {
                Duration = TimeSpan.FromSeconds(duration)
            };

            // Same rule as Set: raw bytes bypass the converter, everything else is serialized.
            // Without the byte[] branch the entry would be sent with no value at all.
            if (value is byte[] bytes)
            {
                cache.Value = bytes;
            }
            else
            {
                cache.Value = Converter.ConvertTo(value);
            }

            return AddCache(cache);
        }

        /// <summary>
        /// Stores <paramref name="entity"/> only if its key is not already present.
        /// </summary>
        /// <param name="entity">Entry to add.</param>
        /// <returns><c>false</c> when the key already exists; otherwise the result of <see cref="SetCache"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="entity"/> is null.</exception>
        public bool AddCache(ICacheEntry<string, byte[]> entity)
        {
            if (entity is null)
            {
                throw new ArgumentNullException(nameof(entity));
            }

            // The existence check and the write are two separate calls, so this is not atomic.
            if (ContainsCache(entity.Key))
            {
                return false;
            }

            return SetCache(entity);
        }

        /// <summary>
        /// Runs <see cref="AddCache"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="entity">Entry to add.</param>
        /// <returns>A task producing the result of <see cref="AddCache"/>.</returns>
        public Task<bool> AddCacheAsync(ICacheEntry<string, byte[]> entity)
        {
            return EasyTask.Run(() =>
            {
                return AddCache(entity);
            });
        }

        /// <summary>
        /// Clears the cache.
        /// </summary>
        public abstract void ClearCache();

        /// <summary>
        /// Runs <see cref="ClearCache"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <returns>A task that completes when <see cref="ClearCache"/> has run.</returns>
        public Task ClearCacheAsync()
        {
            return EasyTask.Run(() =>
            {
                ClearCache();
            });
        }

        /// <summary>
        /// Checks whether <paramref name="key"/> is present in the cache.
        /// </summary>
        /// <param name="key">Cache key.</param>
        /// <returns><c>true</c> if the key is present.</returns>
        public abstract bool ContainsCache(string key);

        /// <summary>
        /// Runs <see cref="ContainsCache"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="key">Cache key.</param>
        /// <returns>A task producing the result of <see cref="ContainsCache"/>.</returns>
        public Task<bool> ContainsCacheAsync(string key)
        {
            return EasyTask.Run(() =>
            {
                return ContainsCache(key);
            });
        }

        /// <summary>
        /// Gets the cached value for <paramref name="key"/>.
        /// </summary>
        /// <typeparam name="TValue">Type to return the value as.</typeparam>
        /// <param name="key">Cache key.</param>
        /// <returns>The value found by <c>TryGet</c>, or <c>default</c> when <c>TryGet</c> returns <c>false</c>.</returns>
        public TValue Get<TValue>(string key)
        {
            if (TryGet<TValue>(key, out var cached))
            {
                return cached;
            }

            return default;
        }

        /// <summary>
        /// Gets the raw cache entry for <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Cache key.</param>
        /// <returns>The entry; <c>TryGet</c> treats a null result as "not found".</returns>
        public abstract ICacheEntry<string, byte[]> GetCache(string key);

        /// <summary>
        /// Runs <see cref="GetCache"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="key">Cache key.</param>
        /// <returns>A task producing the result of <see cref="GetCache"/>.</returns>
        public Task<ICacheEntry<string, byte[]>> GetCacheAsync(string key)
        {
            return EasyTask.Run(() =>
            {
                return GetCache(key);
            });
        }

        /// <summary>
        /// Removes the entry stored under <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Cache key.</param>
        /// <returns><c>true</c> if the removal succeeded.</returns>
        public abstract bool RemoveCache(string key);

        /// <summary>
        /// Runs <see cref="RemoveCache"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="key">Cache key.</param>
        /// <returns>A task producing the result of <see cref="RemoveCache"/>.</returns>
        public Task<bool> RemoveCacheAsync(string key)
        {
            return EasyTask.Run(() =>
            {
                return RemoveCache(key);
            });
        }

        /// <summary>
        /// Stores a typed value under <paramref name="key"/> by calling <see cref="SetCache"/>.
        /// </summary>
        /// <typeparam name="TValue">Type of the value being stored.</typeparam>
        /// <param name="key">Cache key.</param>
        /// <param name="value">
        /// Value to store. A <c>byte[]</c> is stored as-is; anything else is converted with <see cref="Converter"/>.
        /// </param>
        /// <param name="duration">
        /// Entry lifetime; it is passed to <see cref="TimeSpan.FromSeconds(double)"/>, so it is interpreted as seconds.
        /// </param>
        /// <returns>The result of <see cref="SetCache"/>.</returns>
        public bool Set<TValue>(string key, TValue value, int duration = 60000)
        {
            var cache = new CacheEntry<string, byte[]>(key)
            {
                Duration = TimeSpan.FromSeconds(duration)
            };

            if (value is byte[] bytes)
            {
                cache.Value = bytes;
            }
            else
            {
                cache.Value = Converter.ConvertTo(value);
            }

            return SetCache(cache);
        }

        /// <summary>
        /// Stores <paramref name="entity"/>.
        /// </summary>
        /// <param name="entity">Entry to store.</param>
        /// <returns><c>true</c> if the entry was stored.</returns>
        public abstract bool SetCache(ICacheEntry<string, byte[]> entity);

        /// <summary>
        /// Runs <see cref="SetCache"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="entity">Entry to store.</param>
        /// <returns>A task producing the result of <see cref="SetCache"/>.</returns>
        public Task<bool> SetCacheAsync(ICacheEntry<string, byte[]> entity)
        {
            return EasyTask.Run(() =>
            {
                return SetCache(entity);
            });
        }

        /// <summary>
        /// Gets the value stored under the specified key.
        /// </summary>
        /// <typeparam name="TValue">Type to return the value as.</typeparam>
        /// <param name="key">Cache key.</param>
        /// <param name="value">
        /// Receives the value: <c>default</c> when no entry is returned or the entry's value is null,
        /// the raw bytes when <typeparamref name="TValue"/> matches them, otherwise the result of
        /// <see cref="Converter"/>'s <c>ConvertFrom</c>.
        /// </param>
        /// <returns><c>true</c> when <see cref="GetCache"/> returned a non-null entry; otherwise <c>false</c>.</returns>
        public bool TryGet<TValue>(string key, out TValue value)
        {
            var cache = GetCache(key);
            if (cache == null)
            {
                value = default;
                return false;
            }

            // An entry without a value still counts as a hit.
            if (cache.Value is null)
            {
                value = default;
                return true;
            }

            // The caller asked for the raw bytes (or a compatible type): no conversion needed.
            if (cache.Value is TValue value1)
            {
                value = value1;
                return true;
            }

            value = (TValue)Converter.ConvertFrom(cache.Value, typeof(TValue));
            return true;
        }
    }

    /// <summary>
    /// <see cref="RedisClient"/> implementation that sends each operation as a
    /// <c>P_600_Redis_Request</c> package through an <c>RpcActor</c> and blocks,
    /// for up to <see cref="RedisClient.Timeout"/>, until the response arrives.
    /// </summary>
    internal class InternalRedisClient : RedisClient
    {
        private readonly RpcActor m_rpcActor;

        /// <summary>
        /// Creates a client bound to the given actor and converter.
        /// </summary>
        /// <param name="rpcActor">Actor used to send requests and to wait for responses.</param>
        /// <param name="converter">Converter assigned to <see cref="RedisClient.Converter"/>.</param>
        public InternalRedisClient(RpcActor rpcActor, BytesConverter converter)
        {
            m_rpcActor = rpcActor;
            Converter = converter;
        }

        /// <summary>
        /// Sends a <c>Clear</c> request and waits for the response.
        /// </summary>
        /// <exception cref="TimeoutException">The wait ended with <c>Overtime</c>.</exception>
        /// <exception cref="Exception">
        /// The response status was not 1 (message taken from the response), or the wait ended in any other state.
        /// </exception>
        public override void ClearCache()
        {
            var package = new RedisRequestWaitPackage
            {
                packageType = RedisPackageType.Clear
            };

            var waitData = m_rpcActor.WaitHandlePool.GetWaitData(package);
            try
            {
                using (ByteBlock byteBlock = new ByteBlock())
                {
                    package.Package(byteBlock);
                    m_rpcActor.Send(TouchRpcUtility.P_600_Redis_Request, byteBlock.Buffer, 0, byteBlock.Len);
                }

                switch (waitData.Wait(Timeout))
                {
                    case WaitDataStatus.SetRunning:
                        {
                            if (waitData.WaitResult.Status == 1)
                            {
                                return;
                            }
                            else
                            {
                                throw new Exception(waitData.WaitResult.Message);
                            }
                        }
                    case WaitDataStatus.Overtime:
                        throw new TimeoutException(TouchSocketStatus.Overtime.GetDescription());
                    case WaitDataStatus.Canceled:
                    case WaitDataStatus.Default:
                    case WaitDataStatus.Disposed:
                    default:
                        throw new Exception(TouchSocketStatus.UnknownError.GetDescription());
                }
            }
            finally
            {
                // Always hand the wait handle back, whatever the outcome.
                m_rpcActor.WaitHandlePool.Destroy(waitData);
            }
        }

        /// <summary>
        /// Sends a <c>Contains</c> request for <paramref name="key"/> and waits for the response.
        /// </summary>
        /// <param name="key">Cache key; must not be null or empty.</param>
        /// <returns><c>true</c> when the response status is 1; <c>false</c> when it is <see cref="byte.MaxValue"/>.</returns>
        /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
        /// <exception cref="TimeoutException">The wait ended with <c>Overtime</c>.</exception>
        /// <exception cref="Exception">
        /// The response carried any other status (message taken from the response), or the wait ended in any other state.
        /// </exception>
        public override bool ContainsCache(string key)
        {
            if (string.IsNullOrEmpty(key))
            {
                throw new ArgumentException($"“{nameof(key)}”不能为 null 或空。", nameof(key));
            }

            var package = new RedisRequestWaitPackage
            {
                key = key,
                packageType = RedisPackageType.Contains
            };

            var waitData = m_rpcActor.WaitHandlePool.GetWaitData(package);
            try
            {
                using (ByteBlock byteBlock = new ByteBlock())
                {
                    package.Package(byteBlock);
                    m_rpcActor.Send(TouchRpcUtility.P_600_Redis_Request, byteBlock.Buffer, 0, byteBlock.Len);
                }

                switch (waitData.Wait(Timeout))
                {
                    case WaitDataStatus.SetRunning:
                        {
                            if (waitData.WaitResult.Status == 1)
                            {
                                return true;
                            }
                            else if (waitData.WaitResult.Status == byte.MaxValue)
                            {
                                return false;
                            }
                            else
                            {
                                throw new Exception(waitData.WaitResult.Message);
                            }
                        }
                    case WaitDataStatus.Overtime:
                        throw new TimeoutException(TouchSocketStatus.Overtime.GetDescription());
                    case WaitDataStatus.Canceled:
                    case WaitDataStatus.Default:
                    case WaitDataStatus.Disposed:
                    default:
                        throw new Exception(TouchSocketStatus.UnknownError.GetDescription());
                }
            }
            finally
            {
                m_rpcActor.WaitHandlePool.Destroy(waitData);
            }
        }

        /// <summary>
        /// Sends a <c>Get</c> request for <paramref name="key"/> and waits for the response.
        /// </summary>
        /// <param name="key">Cache key; must not be null or empty.</param>
        /// <returns>
        /// An entry carrying the response value when the status is 1; an entry with no value assigned
        /// when the status is <see cref="byte.MaxValue"/>; <c>default</c> (null) for any other status.
        /// </returns>
        /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
        /// <exception cref="TimeoutException">
        /// The wait ended with <c>Overtime</c>, or in any state other than <c>SetRunning</c>.
        /// </exception>
        public override ICacheEntry<string, byte[]> GetCache(string key)
        {
            if (string.IsNullOrEmpty(key))
            {
                throw new ArgumentException($"“{nameof(key)}”不能为 null 或空。", nameof(key));
            }

            RedisRequestWaitPackage package = new RedisRequestWaitPackage()
            {
                key = key,
                packageType = RedisPackageType.Get
            };

            var waitData = m_rpcActor.WaitHandlePool.GetWaitData(package);
            try
            {
                using (ByteBlock byteBlock = new ByteBlock((package.value == null ? 0 : package.value.Length) + 1024))
                {
                    package.Package(byteBlock);
                    m_rpcActor.Send(TouchRpcUtility.P_600_Redis_Request, byteBlock.Buffer, 0, byteBlock.Len);
                }

                switch (waitData.Wait(Timeout))
                {
                    case WaitDataStatus.SetRunning:
                        {
                            RedisResponseWaitPackage responsePackage = (RedisResponseWaitPackage)waitData.WaitResult;
                            if (responsePackage.Status == 1)
                            {
                                return new CacheEntry<string, byte[]>(key)
                                {
                                    Value = responsePackage.value
                                };
                            }
                            else if (responsePackage.Status == byte.MaxValue)
                            {
                                return new CacheEntry<string, byte[]>(key);
                            }
                            else
                            {
                                return default;
                            }
                        }
                    case WaitDataStatus.Overtime:
                        throw new TimeoutException(TouchSocketStatus.Overtime.GetDescription());
                    case WaitDataStatus.Canceled:
                    case WaitDataStatus.Default:
                    case WaitDataStatus.Disposed:
                    default:
                        // NOTE(review): ClearCache/ContainsCache throw Exception here; the type is kept
                        // as TimeoutException because callers may already catch it.
                        throw new TimeoutException(TouchSocketStatus.UnknownError.GetDescription());
                }
            }
            finally
            {
                m_rpcActor.WaitHandlePool.Destroy(waitData);
            }
        }

        /// <summary>
        /// Sends a <c>Remove</c> request for <paramref name="key"/> and waits for the response.
        /// </summary>
        /// <param name="key">Cache key; must not be null or empty.</param>
        /// <returns>
        /// <c>true</c> when the response status is 1; <c>false</c> when it is <see cref="byte.MaxValue"/>
        /// or when the wait was canceled.
        /// </returns>
        /// <exception cref="ArgumentException"><paramref name="key"/> is null or empty.</exception>
        /// <exception cref="TimeoutException">
        /// The wait ended with <c>Overtime</c>, or in a state other than <c>SetRunning</c>/<c>Canceled</c>.
        /// </exception>
        /// <exception cref="Exception">The response carried any other status (message taken from the response).</exception>
        public override bool RemoveCache(string key)
        {
            if (string.IsNullOrEmpty(key))
            {
                throw new ArgumentException($"“{nameof(key)}”不能为 null 或空。", nameof(key));
            }

            var package = new RedisRequestWaitPackage
            {
                key = key,
                packageType = RedisPackageType.Remove
            };

            var waitData = m_rpcActor.WaitHandlePool.GetWaitData(package);
            try
            {
                using (ByteBlock byteBlock = new ByteBlock((package.value == null ? 0 : package.value.Length) + 1024))
                {
                    package.Package(byteBlock);
                    m_rpcActor.Send(TouchRpcUtility.P_600_Redis_Request, byteBlock.Buffer, 0, byteBlock.Len);
                }

                switch (waitData.Wait(Timeout))
                {
                    case WaitDataStatus.SetRunning:
                        {
                            if (waitData.WaitResult.Status == 1)
                            {
                                return true;
                            }
                            else if (waitData.WaitResult.Status == byte.MaxValue)
                            {
                                return false;
                            }
                            else
                            {
                                throw new Exception(waitData.WaitResult.Message);
                            }
                        }
                    case WaitDataStatus.Overtime:
                        throw new TimeoutException(TouchSocketStatus.Overtime.GetDescription());
                    case WaitDataStatus.Canceled:
                        return false;
                    case WaitDataStatus.Default:
                    case WaitDataStatus.Disposed:
                    default:
                        throw new TimeoutException(TouchSocketStatus.UnknownError.GetDescription());
                }
            }
            finally
            {
                m_rpcActor.WaitHandlePool.Destroy(waitData);
            }
        }

        /// <summary>
        /// Sends a <c>Set</c> request carrying the entry's key, duration and value, and waits for the response.
        /// </summary>
        /// <param name="cache">Entry to store; must not be null and must have a non-empty key.</param>
        /// <returns>
        /// <c>true</c> when the response status is 1; <c>false</c> when it is <see cref="byte.MaxValue"/>
        /// or when the wait was canceled.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="cache"/> is null.</exception>
        /// <exception cref="ArgumentException">The entry's key is null or empty.</exception>
        /// <exception cref="TimeoutException">
        /// The wait ended with <c>Overtime</c>, or in a state other than <c>SetRunning</c>/<c>Canceled</c>.
        /// </exception>
        /// <exception cref="Exception">The response carried any other status (message taken from the response).</exception>
        public override bool SetCache(ICacheEntry<string, byte[]> cache)
        {
            // The null check must come first: reading cache.Key on a null entry would throw
            // NullReferenceException before the argument could be validated.
            if (cache is null)
            {
                throw new ArgumentNullException(nameof(cache));
            }

            if (string.IsNullOrEmpty(cache.Key))
            {
                throw new ArgumentException($"“{nameof(cache.Key)}”不能为 null 或空。", nameof(cache.Key));
            }

            var package = new RedisRequestWaitPackage
            {
                key = cache.Key,
                timeSpan = cache.Duration,
                value = cache.Value,
                packageType = RedisPackageType.Set
            };

            var waitData = m_rpcActor.WaitHandlePool.GetWaitData(package);
            try
            {
                // Size the buffer for the payload plus 1024 bytes of headroom for the other fields.
                using (ByteBlock byteBlock = new ByteBlock((package.value == null ? 0 : package.value.Length) + 1024))
                {
                    package.Package(byteBlock);
                    m_rpcActor.Send(TouchRpcUtility.P_600_Redis_Request, byteBlock.Buffer, 0, byteBlock.Len);
                }

                switch (waitData.Wait(Timeout))
                {
                    case WaitDataStatus.SetRunning:
                        {
                            if (waitData.WaitResult.Status == 1)
                            {
                                return true;
                            }
                            else if (waitData.WaitResult.Status == byte.MaxValue)
                            {
                                return false;
                            }
                            else
                            {
                                throw new Exception(waitData.WaitResult.Message);
                            }
                        }
                    case WaitDataStatus.Overtime:
                        throw new TimeoutException(TouchSocketStatus.Overtime.GetDescription());
                    case WaitDataStatus.Canceled:
                        return false;
                    case WaitDataStatus.Default:
                    case WaitDataStatus.Disposed:
                    default:
                        throw new TimeoutException(TouchSocketStatus.UnknownError.GetDescription());
                }
            }
            finally
            {
                m_rpcActor.WaitHandlePool.Destroy(waitData);
            }
        }
    }
}