//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Threading;
using TouchSocket.Core;
namespace TouchSocket.Sockets
{
///
/// 数据处理适配器测试
///
public class DataAdapterTester : IDisposable
{
private readonly IntelligentDataQueue asyncBytes;
private readonly Thread sendThread;
private DataHandlingAdapter adapter;
private int bufferLength;
private int count;
private bool dispose;
private int expectedCount;
private Action receivedCallBack;
private Stopwatch stopwatch;
private int timeout;
private DataAdapterTester()
{
this.asyncBytes = new IntelligentDataQueue(1024 * 1024 * 10);
this.sendThread = new Thread(this.BeginSend);
this.sendThread.IsBackground = true;
this.sendThread.Name = "DataAdapterTesterThread";
this.sendThread.Start();
}
///
/// 获取测试器
///
/// 待测试适配器
/// 收到数据回调
/// 缓存数据长度
///
public static DataAdapterTester CreateTester(DataHandlingAdapter adapter, int bufferLength = 1024, Action receivedCallBack = default)
{
var tester = new DataAdapterTester();
tester.adapter = adapter;
tester.bufferLength = bufferLength;
adapter.SendCallBack = tester.SendCallback;
adapter.ReceivedCallBack = tester.OnReceived;
tester.receivedCallBack = receivedCallBack;
return tester;
}
///
/// 释放
///
public void Dispose()
{
this.dispose = true;
}
///
/// 模拟测试运行发送
///
///
///
///
/// 测试次数
/// 期待测试次数
/// 超时
///
public TimeSpan Run(byte[] buffer, int offset, int length, int testCount, int expectedCount, int timeout)
{
this.count = 0;
this.expectedCount = expectedCount;
this.timeout = timeout;
this.stopwatch = new Stopwatch();
this.stopwatch.Start();
EasyTask.Run(() =>
{
for (int i = 0; i < testCount; i++)
{
this.adapter.SendInput(buffer, offset, length);
}
});
if (SpinWait.SpinUntil(() => this.count == this.expectedCount, this.timeout))
{
this.stopwatch.Stop();
return this.stopwatch.Elapsed;
}
throw new TimeoutException();
}
///
/// 模拟发送
///
///
/// 测试次数
/// 期待测试次数
/// 超时
public TimeSpan Run(byte[] buffer, int testCount, int expectedCount, int timeout)
{
return this.Run(buffer, 0, buffer.Length, testCount, expectedCount, timeout);
}
private void BeginSend()
{
while (!this.dispose)
{
if (this.tryGet(out List byteBlocks))
{
foreach (ByteBlock block in byteBlocks)
{
try
{
this.adapter.ReceivedInput(block);
}
finally
{
block.Dispose();
}
}
}
else
{
Thread.Sleep(1);
}
}
}
private void OnReceived(ByteBlock byteBlock, IRequestInfo requestInfo)
{
this.count++;
this.receivedCallBack?.Invoke(byteBlock, requestInfo);
}
private void SendCallback(byte[] buffer, int offset, int length)
{
var asyncByte = new QueueDataBytes(new byte[length], 0, length);
Array.Copy(buffer, offset, asyncByte.Buffer, 0, length);
this.asyncBytes.Enqueue(asyncByte);
}
private bool tryGet(out List byteBlocks)
{
byteBlocks = new List();
ByteBlock block = null;
while (true)
{
if (this.asyncBytes.TryDequeue(out QueueDataBytes asyncByte))
{
if (block == null)
{
block = BytePool.Default.GetByteBlock(this.bufferLength);
byteBlocks.Add(block);
}
int surLen = this.bufferLength - block.Pos;
if (surLen < asyncByte.Length)//不能完成写入
{
block.Write(asyncByte.Buffer, asyncByte.Offset, surLen);
int offset = surLen;
while (offset < asyncByte.Length)
{
block = this.Write(asyncByte, ref offset);
byteBlocks.Add(block);
}
if (byteBlocks.Count > 10)
{
break;
}
}
else//本次能完成写入
{
block.Write(asyncByte.Buffer, asyncByte.Offset, asyncByte.Length);
if (byteBlocks.Count > 10)
{
break;
}
}
}
else
{
if (byteBlocks.Count > 0)
{
break;
}
else
{
return false;
}
}
}
return true;
}
private ByteBlock Write(QueueDataBytes transferByte, ref int offset)
{
ByteBlock block = BytePool.Default.GetByteBlock(this.bufferLength, true);
int len = Math.Min(transferByte.Length - offset, this.bufferLength);
block.Write(transferByte.Buffer, offset, len);
offset += len;
return block;
}
}
///
/// Udp数据处理适配器测试
///
public class UdpDataAdapterTester : IDisposable
{
private readonly IntelligentDataQueue asyncBytes;
private UdpDataHandlingAdapter adapter;
private int count;
private bool dispose;
private int expectedCount;
private Action receivedCallBack;
private Stopwatch stopwatch;
private int timeout;
private UdpDataAdapterTester(int multiThread)
{
this.asyncBytes = new IntelligentDataQueue(1024 * 1024 * 10);
for (int i = 0; i < multiThread; i++)
{
EasyTask.Run(this.BeginSend);
}
}
///
/// 获取测试器
///
/// 待测试适配器
/// 并发多线程数量
/// 收到数据回调
///
public static UdpDataAdapterTester CreateTester(UdpDataHandlingAdapter adapter, int multiThread, Action receivedCallBack = default)
{
var tester = new UdpDataAdapterTester(multiThread);
tester.adapter = adapter;
adapter.SendCallBack = tester.SendCallback;
adapter.ReceivedCallBack = tester.OnReceived;
tester.receivedCallBack = receivedCallBack;
return tester;
}
///
/// 释放
///
public void Dispose()
{
this.dispose = true;
}
///
/// 模拟测试运行发送
///
///
///
///
/// 测试次数
/// 期待测试次数
/// 超时
///
public TimeSpan Run(byte[] buffer, int offset, int length, int testCount, int expectedCount, int timeout)
{
this.count = 0;
this.expectedCount = expectedCount;
this.timeout = timeout;
this.stopwatch = new Stopwatch();
this.stopwatch.Start();
EasyTask.Run(() =>
{
for (int i = 0; i < testCount; i++)
{
this.adapter.SendInput(null, buffer, offset, length);
}
});
if (SpinWait.SpinUntil(() => this.count == this.expectedCount, this.timeout))
{
this.stopwatch.Stop();
return this.stopwatch.Elapsed;
}
throw new TimeoutException();
}
///
/// 模拟发送
///
///
/// 测试次数
/// 期待测试次数
/// 超时
public TimeSpan Run(byte[] buffer, int testCount, int expectedCount, int timeout)
{
return this.Run(buffer, 0, buffer.Length, testCount, expectedCount, timeout);
}
private void BeginSend()
{
while (!this.dispose)
{
if (this.tryGet(out List byteBlocks))
{
foreach (ByteBlock block in byteBlocks)
{
try
{
this.adapter.ReceivedInput(null, block);
}
finally
{
block.Dispose();
}
}
}
else
{
Thread.Sleep(1);
}
}
}
private void OnReceived(EndPoint endPoint, ByteBlock byteBlock, IRequestInfo requestInfo)
{
this.receivedCallBack?.Invoke(byteBlock, requestInfo);
Interlocked.Increment(ref this.count);
}
private void SendCallback(EndPoint endPoint, byte[] buffer, int offset, int length)
{
var asyncByte = new QueueDataBytes(new byte[length], 0, length);
Array.Copy(buffer, offset, asyncByte.Buffer, 0, length);
this.asyncBytes.Enqueue(asyncByte);
}
private bool tryGet(out List byteBlocks)
{
byteBlocks = new List();
while (this.asyncBytes.TryDequeue(out QueueDataBytes asyncByte))
{
var block = new ByteBlock(asyncByte.Length);
block.Write(asyncByte.Buffer, asyncByte.Offset, asyncByte.Length);
byteBlocks.Add(block);
}
if (byteBlocks.Count > 0)
{
return true;
}
return false;
}
}
}