//------------------------------------------------------------------------------ // 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有 // 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权 // CSDN博客:https://blog.csdn.net/qq_40374647 // 哔哩哔哩视频:https://space.bilibili.com/94253567 // Gitee源代码仓库:https://gitee.com/RRQM_Home // Github源代码仓库:https://github.com/RRQM // API首页:https://www.yuque.com/rrqm/touchsocket/index // 交流QQ群:234762506 // 感谢您的下载和使用 //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ using System; using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; using TouchSocket.Core; using TouchSocket.Sockets; namespace TouchSocket.Rpc.TouchRpc { /// /// WSTouchRpcClient /// public partial class WSTouchRpcClient : DisposableObject, IWSTouchRpcClient { private readonly ActionMap m_actionMap; private readonly ArraySegment m_buffer; private readonly RpcActor m_rpcActor; private ClientWebSocket m_client; private TouchSocketConfig m_config; private IPHost m_remoteIPHost; private RpcStore m_rpcStore; /// /// 创建一个WSTouchRpcClient实例。 /// public WSTouchRpcClient() { m_buffer = new ArraySegment(new byte[1024 * 64]); m_actionMap = new ActionMap(); m_rpcActor = new RpcActor(false) { OutputSend = RpcActorSend, OnRouting = OnRpcActorRouting, OnHandshaked = OnRpcActorHandshaked, OnReceived = OnRpcActorReceived, GetInvokeMethod = GetInvokeMethod, OnClose = OnRpcServiceClose, OnStreamTransfering = OnRpcActorStreamTransfering, OnStreamTransfered = OnRpcActorStreamTransfered, OnFileTransfering = OnRpcActorFileTransfering, OnFileTransfered = OnRpcActorFileTransfered, Caller = this }; } /// /// 方法映射表 /// public ActionMap ActionMap { get => m_actionMap; } /// public bool CanSend => m_client.State == WebSocketState.Open; /// /// 客户端配置 /// public TouchSocketConfig Config => m_config; /// public IContainer Container { get; private set; } /// /// 断开连接 /// public DisconnectEventHandler Disconnected { get; set; } /// public string ID => m_rpcActor.ID; /// public bool IsHandshaked => m_rpcActor.IsHandshaked; /// /// 最后活动时间 /// public DateTime LastActiveTime { get; private set; } /// public ILog Logger => m_rpcActor.Logger; /// public IPluginsManager PluginsManager { get; private set; } /// public IPHost RemoteIPHost => m_remoteIPHost; /// public string RootPath { get => m_rpcActor.RootPath; set => m_rpcActor.RootPath = value; } /// public RpcActor RpcActor => m_rpcActor; /// public RpcStore RpcStore { get => m_rpcStore; } /// public SerializationSelector SerializationSelector => m_rpcActor.SerializationSelector; /// public Func TryCanInvoke { get => m_rpcActor.TryCanInvoke; set => m_rpcActor.TryCanInvoke = value; } /// public bool UsePlugin { get; private set; } /// public bool ChannelExisted(int id) { return m_rpcActor.ChannelExisted(id); } /// /// 关闭 /// /// public void Close(string msg) { if (m_client?.State != WebSocketState.Open) { return; } m_client.SafeDispose(); m_rpcActor.SafeDispose(); BreakOut($"调用{nameof(Close)}", true); } /// public async Task ConnectAsync(int timeout = 5000) { if (!RemoteIPHost.IsUri) { throw new Exception("RemoteIPHost必须为Uri格式。"); } if (m_client == null || m_client.State != WebSocketState.Open) { m_client.SafeDispose(); m_client = new ClientWebSocket(); await m_client.ConnectAsync(RemoteIPHost.Uri, default).ConfigureAwait(false); BeginReceive(); } if (IsHandshaked) { return; } m_rpcActor.Handshake(Config.GetValue(TouchRpcConfigExtensions.VerifyTokenProperty), Config.GetValue(TouchRpcConfigExtensions.DefaultIdProperty), default, timeout, Config.GetValue(TouchRpcConfigExtensions.MetadataProperty)); } /// public Channel CreateChannel() { return m_rpcActor.CreateChannel(); } /// public Channel CreateChannel(int id) { return m_rpcActor.CreateChannel(id); } /// public Channel CreateChannel(string targetId) { return m_rpcActor.CreateChannel(targetId); } /// public Channel CreateChannel(string targetId, int id) { return m_rpcActor.CreateChannel(targetId, id); } /// public bool Ping(string targetId, int timeout = 5000) { return m_rpcActor.Ping(targetId, timeout); } /// public bool Ping(int timeout = 5000) { return m_rpcActor.Ping(timeout); } /// public void ResetID(string newID) { m_rpcActor.ResetID(newID); } /// public void Send(short protocol, byte[] buffer, int offset, int length) { m_rpcActor.Send(protocol, buffer, offset, length); } /// public Task SendAsync(short protocol, byte[] buffer, int offset, int length) { return m_rpcActor.SendAsync(protocol, buffer, offset, length); } /// public Result SendStream(Stream stream, StreamOperator streamOperator, Metadata metadata = null) { return m_rpcActor.SendStream(stream, streamOperator, metadata); } /// public Task SendStreamAsync(Stream stream, StreamOperator streamOperator, Metadata metadata = null) { return m_rpcActor.SendStreamAsync(stream, streamOperator, metadata); } /// /// 配置 /// /// /// public IWSTouchRpcClient Setup(string ipHost) { TouchSocketConfig config = new TouchSocketConfig(); config.SetRemoteIPHost(new IPHost(ipHost)); return Setup(config); } /// /// 配置 /// /// /// public IWSTouchRpcClient Setup(TouchSocketConfig clientConfig) { m_config = clientConfig; LoadConfig(m_config); return this; } /// public bool TrySubscribeChannel(int id, out Channel channel) { return m_rpcActor.TrySubscribeChannel(id, out channel); } /// protected override void Dispose(bool disposing) { m_client.SafeDispose(); m_rpcActor.SafeDispose(); base.Dispose(disposing); } /// /// 加载配置 /// /// protected virtual void LoadConfig(TouchSocketConfig config) { if (config == null) { throw new Exception("配置文件为空"); } m_remoteIPHost = config.GetValue(Sockets.TouchSocketConfigExtension.RemoteIPHostProperty); Container = config.Container; UsePlugin = config.IsUsePlugin; PluginsManager = config.PluginsManager; m_rpcActor.Logger = Container.Resolve(); m_rpcActor.FileController = Container.GetFileResourceController(); RootPath = Config.GetValue(TouchRpcConfigExtensions.RootPathProperty); m_rpcActor.SerializationSelector = Config.GetValue(TouchRpcConfigExtensions.SerializationSelectorProperty); if (config.GetValue(RpcConfigExtensions.RpcStoreProperty) is RpcStore rpcStore) { rpcStore.AddRpcParser(GetType().Name, this); } else { new RpcStore(config.Container).AddRpcParser(GetType().Name, this); } } /// /// 已断开连接。 /// /// protected virtual void OnDisconnected(DisconnectEventArgs e) { if (UsePlugin && PluginsManager.Raise(nameof(IDisconnectedPlguin.OnDisconnected), this, e)) { return; } Disconnected?.Invoke(this, e); } private void BeginReceive() { Task.Factory.StartNew(async() => { try { ByteBlock byteBlock = null; int bufferLength = this.Config.GetValue(TouchSocketConfigExtension.BufferLengthProperty); while (true) { byteBlock ??= new ByteBlock(bufferLength); var result = await m_client.ReceiveAsync(m_buffer, default); if (result.Count == 0) { BreakOut("远程终端主动关闭", false); return; } LastActiveTime = DateTime.Now; byteBlock.Write(m_buffer.Array, 0, result.Count); if (result.EndOfMessage) { try { m_rpcActor.InputReceivedData(byteBlock); } catch { } finally { byteBlock.SafeDispose(); byteBlock = default; } } } } catch (Exception ex) { BreakOut(ex.Message, false); } },TaskCreationOptions.LongRunning); } private void BreakOut(string msg, bool manual) { m_client.SafeDispose(); m_rpcActor.SafeDispose(); OnDisconnected(new DisconnectEventArgs(manual, msg)); } #region 小文件 /// public PullSmallFileResult PullSmallFile(string targetId, string path, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PullSmallFile(targetId, path, metadata, timeout, token); } /// public PullSmallFileResult PullSmallFile(string path, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PullSmallFile(path, metadata, timeout, token); } /// public Task PullSmallFileAsync(string targetId, string path, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PullSmallFileAsync(targetId, path, metadata, timeout, token); } /// public Task PullSmallFileAsync(string path, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PullSmallFileAsync(path, metadata, timeout, token); } /// public Result PushSmallFile(string targetId, string savePath, FileInfo fileInfo, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PushSmallFile(targetId, savePath, fileInfo, metadata, timeout, token); } /// public Result PushSmallFile(string savePath, FileInfo fileInfo, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PushSmallFile(savePath, fileInfo, metadata, timeout, token); } /// public Task PushSmallFileAsync(string targetId, string savePath, FileInfo fileInfo, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PushSmallFileAsync(targetId, savePath, fileInfo, metadata, timeout, token); } /// public Task PushSmallFileAsync(string savePath, FileInfo fileInfo, Metadata metadata = null, int timeout = 5000, CancellationToken token = default) { return m_rpcActor.PushSmallFileAsync(savePath, fileInfo, metadata, timeout, token); } #endregion 小文件 #region FileTransfer /// public Result PullFile(FileOperator fileOperator) { return m_rpcActor.PullFile(fileOperator); } /// public Result PullFile(string targetId, FileOperator fileOperator) { return m_rpcActor.PullFile(targetId, fileOperator); } /// public Task PullFileAsync(FileOperator fileOperator) { return m_rpcActor.PullFileAsync(fileOperator); } /// public Task PullFileAsync(string targetId, FileOperator fileOperator) { return m_rpcActor.PullFileAsync(targetId, fileOperator); } /// public Result PushFile(FileOperator fileOperator) { return m_rpcActor.PushFile(fileOperator); } /// public Result PushFile(string targetId, FileOperator fileOperator) { return m_rpcActor.PushFile(targetId, fileOperator); } /// public Task PushFileAsync(FileOperator fileOperator) { return m_rpcActor.PushFileAsync(fileOperator); } /// public Task PushFileAsync(string targetId, FileOperator fileOperator) { return m_rpcActor.PushFileAsync(targetId, fileOperator); } #endregion FileTransfer #region RPC /// public void Invoke(string method, IInvokeOption invokeOption, params object[] parameters) { m_rpcActor.Invoke(method, invokeOption, parameters); } /// public T Invoke(string method, IInvokeOption invokeOption, params object[] parameters) { return m_rpcActor.Invoke(method, invokeOption, parameters); } /// public T Invoke(string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types) { return m_rpcActor.Invoke(method, invokeOption, ref parameters, types); } /// public void Invoke(string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types) { m_rpcActor.Invoke(method, invokeOption, ref parameters, types); } /// public void Invoke(string targetId, string method, IInvokeOption invokeOption, params object[] parameters) { m_rpcActor.Invoke(targetId, method, invokeOption, parameters); } /// public T Invoke(string targetId, string method, IInvokeOption invokeOption, params object[] parameters) { return m_rpcActor.Invoke(targetId, method, invokeOption, parameters); } /// public void Invoke(string targetId, string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types) { m_rpcActor.Invoke(targetId, method, invokeOption, ref parameters, types); } /// public T Invoke(string targetId, string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types) { return m_rpcActor.Invoke(targetId, method, invokeOption, ref parameters, types); } /// public Task InvokeAsync(string method, IInvokeOption invokeOption, params object[] parameters) { return m_rpcActor.InvokeAsync(method, invokeOption, parameters); } /// public Task InvokeAsync(string method, IInvokeOption invokeOption, params object[] parameters) { return m_rpcActor.InvokeAsync(method, invokeOption, parameters); } /// public Task InvokeAsync(string targetId, string method, IInvokeOption invokeOption, params object[] parameters) { return m_rpcActor.InvokeAsync(targetId, method, invokeOption, parameters); } /// public Task InvokeAsync(string targetId, string method, IInvokeOption invokeOption, params object[] parameters) { return m_rpcActor.InvokeAsync(targetId, method, invokeOption, parameters); } #endregion RPC #region 内部委托绑定 private MethodInstance GetInvokeMethod(string arg) { return m_actionMap.GetMethodInstance(arg); } private void OnRpcActorFileTransfered(RpcActor actor, FileTransferStatusEventArgs e) { if (UsePlugin && PluginsManager.Raise(nameof(ITouchRpcPlugin.OnFileTransfered), this, e)) { return; } OnFileTransfered(e); } private void OnRpcActorFileTransfering(RpcActor actor, FileOperationEventArgs e) { if (UsePlugin && PluginsManager.Raise(nameof(ITouchRpcPlugin.OnFileTransfering), this, e)) { return; } OnFileTransfering(e); } private void OnRpcActorHandshaked(RpcActor actor, VerifyOptionEventArgs e) { if (UsePlugin && PluginsManager.Raise(nameof(ITouchRpcPlugin.OnHandshaked), this, e)) { return; } OnHandshaked(e); } private void OnRpcActorReceived(RpcActor actor, short protocol, ByteBlock byteBlock) { if (UsePlugin && PluginsManager.Raise(nameof(ITouchRpcPlugin.OnReceivedProtocolData), this, new ProtocolDataEventArgs(protocol, byteBlock))) { return; } OnReceived(protocol, byteBlock); } private void OnRpcActorRouting(RpcActor actor, PackageRouterEventArgs e) { if (UsePlugin && PluginsManager.Raise(nameof(ITouchRpcPlugin.OnRouting), this, e)) { return; } OnRouting(e); } private void OnRpcActorStreamTransfered(RpcActor actor, StreamStatusEventArgs e) { if (UsePlugin && PluginsManager.Raise(nameof(ITouchRpcPlugin.OnStreamTransfered), this, e)) { return; } OnStreamTransfered(e); } private void OnRpcActorStreamTransfering(RpcActor actor, StreamOperationEventArgs e) { if (UsePlugin && PluginsManager.Raise(nameof(ITouchRpcPlugin.OnStreamTransfering), this, e)) { return; } OnStreamTransfering(e); } private void OnRpcServiceClose(RpcActor actor, string arg2) { Close(arg2); } private async void RpcActorSend(RpcActor actor, ArraySegment[] transferBytes) { LastActiveTime = DateTime.Now; for (int i = 0; i < transferBytes.Length; i++) { if (i == transferBytes.Length - 1) { await m_client.SendAsync(transferBytes[i], WebSocketMessageType.Binary, true, CancellationToken.None); } else { await m_client.SendAsync(transferBytes[i], WebSocketMessageType.Binary, false, CancellationToken.None); } } } #endregion 内部委托绑定 #region 事件触发 /// /// 当文件传输结束之后。并不意味着完成传输,请通过属性值进行判断。 /// /// protected virtual void OnFileTransfered(FileTransferStatusEventArgs e) { } /// /// 在文件传输即将进行时触发。 /// /// protected virtual void OnFileTransfering(FileOperationEventArgs e) { } /// /// 在完成握手连接时 /// /// protected virtual void OnHandshaked(VerifyOptionEventArgs e) { } /// /// 收到数据。 /// /// /// protected virtual void OnReceived(short protocol, ByteBlock byteBlock) { } /// /// 当需要转发路由包时 /// /// protected virtual void OnRouting(PackageRouterEventArgs e) { } /// /// 流数据处理,用户需要在此事件中对e.Bucket手动释放。 /// /// protected virtual void OnStreamTransfered(StreamStatusEventArgs e) { } /// /// 即将接收流数据,用户需要在此事件中对e.Bucket初始化。 /// /// protected virtual void OnStreamTransfering(StreamOperationEventArgs e) { } #endregion 事件触发 #region RPC解析器 void IRpcParser.OnRegisterServer(MethodInstance[] methodInstances) { foreach (var methodInstance in methodInstances) { if (methodInstance.GetAttribute() is TouchRpcAttribute attribute) { m_actionMap.Add(attribute.GetInvokenKey(methodInstance), methodInstance); } } } void IRpcParser.OnUnregisterServer(MethodInstance[] methodInstances) { foreach (var methodInstance in methodInstances) { if (methodInstance.GetAttribute() is TouchRpcAttribute attribute) { m_actionMap.Remove(attribute.GetInvokenKey(methodInstance)); } } } void IRpcParser.SetRpcStore(RpcStore rpcStore) { m_rpcActor.RpcStore = rpcStore; m_rpcStore = rpcStore; } #endregion RPC解析器 } }