//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.IO;
using System.Threading;

namespace TouchSocket.Core
{
    /// <summary>
    /// A blocking, one-directional read stream. A producer hands a buffer segment
    /// to the stream through <see cref="Input(byte[], int, int)"/> and is blocked until
    /// the consumer has drained that segment through <see cref="Read(byte[], int, int)"/>.
    /// </summary>
    //[IntelligentCoder.AsyncMethodPoster(Flags = IntelligentCoder.MemberFlags.Public)]
    public abstract partial class BlockReadStream : Stream, IWrite
    {
        // Signalled by Reset() once the current segment has been fully consumed.
        private readonly AutoResetEvent m_inputEvent;

        // Signalled by Input() once a new segment has been published.
        private readonly AutoResetEvent m_readEvent;

        // The segment currently being read; the array is borrowed from the Input caller, not copied.
        private byte[] m_buffer;

        // Set once Dispose has run.
        private bool m_dis;

        // Number of bytes of the current segment that have not been read yet.
        private volatile int m_length;

        // Index in m_buffer of the next byte to hand out.
        private volatile int m_offset;

        /// <summary>
        /// Creates the stream with both events unsignalled and a <see cref="ReadTimeout"/> of 5000.
        /// </summary>
        public BlockReadStream()
        {
            m_readEvent = new AutoResetEvent(false);
            m_inputEvent = new AutoResetEvent(false);
            ReadTimeout = 5000;
        }

        /// <summary>
        /// Writes the whole array by forwarding to <c>Write(buffer, 0, buffer.Length)</c>.
        /// </summary>
        /// <param name="buffer">The bytes to write.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public virtual void Write(byte[] buffer)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            Write(buffer, 0, buffer.Length);
        }

        /// <summary>
        /// The number of bytes of the current segment that are still unread;
        /// 0 once the stream has been disposed.
        /// </summary>
        public int CanReadLen
        {
            get
            {
                if (m_dis)
                {
                    return 0;
                }

                // m_length already is the remaining count: Read decrements it while
                // advancing m_offset, so subtracting m_offset would under-report
                // (and go negative for segments that start at a non-zero offset).
                return m_length;
            }
        }

        /// <summary>
        /// Seeking is not available; always false.
        /// </summary>
        public override bool CanSeek => false;

        /// <summary>
        /// Not available; always throws.
        /// </summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public override long Length => throw new NotImplementedException();

        /// <summary>
        /// Not available; both accessors always throw.
        /// </summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

        /// <summary>
        /// Timeout passed to the event waits in both <see cref="Read(byte[], int, int)"/>
        /// and <see cref="Input(byte[], int, int)"/>. Initialised to 5000 by the constructor.
        /// </summary>
        public override int ReadTimeout { get; set; }

        /// <summary>
        /// Blocking read. Copies up to <paramref name="count"/> bytes of the current segment
        /// into <paramref name="buffer"/>; when no segment is pending, waits up to
        /// <see cref="ReadTimeout"/> for <see cref="Input(byte[], int, int)"/> to publish one.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the no-wait path is taken as soon as the pending length is observed to be
        /// positive, which can happen before <see cref="Input(byte[], int, int)"/> has signalled its
        /// event. If the segment is drained in that window, the late signal appears to make the next
        /// call return 0 without a zero-length input. Confirm whether callers can hit this.
        /// </remarks>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
        /// <param name="count">Maximum number of bytes to copy.</param>
        /// <returns>
        /// The number of bytes copied, or 0 when a zero-length segment was received.
        /// </returns>
        /// <exception cref="NotSupportedException"><see cref="Stream.CanRead"/> is false.</exception>
        /// <exception cref="TimeoutException">No segment arrived within <see cref="ReadTimeout"/>.</exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (!CanRead)
            {
                throw new NotSupportedException("该流不允许读取。");
            }

            if (m_length <= 0)
            {
                // Nothing pending: block until Input publishes a segment.
                if (!m_readEvent.WaitOne(ReadTimeout))
                {
                    throw new TimeoutException();
                }

                if (m_length == 0)
                {
                    // Zero-length segment: release the producer and report 0 bytes.
                    Reset();
                    return 0;
                }
            }

            if (m_length > count)
            {
                // Partial read: keep the remainder for the next call. The producer stays blocked.
                Array.Copy(m_buffer, m_offset, buffer, offset, count);
                m_length -= count;
                m_offset += count;
                return count;
            }

            // This call drains the segment: copy what is left, then release the producer.
            int copied = m_length;
            Array.Copy(m_buffer, m_offset, buffer, offset, copied);
            Reset();
            return copied;
        }

        /// <summary>
        /// Not available; always throws.
        /// </summary>
        /// <param name="offset">Ignored.</param>
        /// <param name="origin">Ignored.</param>
        /// <returns>Never returns.</returns>
        /// <exception cref="NotImplementedException">Always.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Not available; always throws.
        /// </summary>
        /// <param name="value">Ignored.</param>
        /// <exception cref="NotImplementedException">Always.</exception>
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Marks the stream disposed, clears the pending segment (which also signals the
        /// producer-side event), disposes both events and then calls the base implementation.
        /// Calls after the first are ignored.
        /// </summary>
        /// <param name="disposing">Forwarded to <see cref="Stream.Dispose(bool)"/>.</param>
        protected override void Dispose(bool disposing)
        {
            if (m_dis)
            {
                return;
            }

            m_dis = true;

            // Reset before disposing the events so a producer waiting in Input is released.
            Reset();
            m_readEvent.SafeDispose();
            m_inputEvent.SafeDispose();
            base.Dispose(disposing);
        }

        /// <summary>
        /// Publishes a segment for the reader and blocks until it has been fully consumed
        /// or <see cref="ReadTimeout"/> elapses. A transfer must be ended with a call whose
        /// <paramref name="length"/> is 0, which makes the pending read return 0; a read that
        /// receives no input at all throws <see cref="TimeoutException"/>.
        /// </summary>
        /// <param name="buffer">Array holding the data; it is referenced, not copied.</param>
        /// <param name="offset">Index of the first byte of the segment.</param>
        /// <param name="length">Number of bytes in the segment; 0 marks the end.</param>
        /// <returns>
        /// true when the reader drained the segment within <see cref="ReadTimeout"/>; otherwise false.
        /// </returns>
        protected bool Input(byte[] buffer, int offset, int length)
        {
            // Clear any stale "consumed" signal before publishing.
            m_inputEvent.Reset();

            // m_length is written last so a reader that sees it also sees buffer and offset.
            m_buffer = buffer;
            m_offset = offset;
            m_length = length;

            m_readEvent.Set();
            return m_inputEvent.WaitOne(ReadTimeout);
        }

        // Drops the current segment, clears the reader signal and wakes the producer.
        private void Reset()
        {
            m_buffer = null;
            m_offset = 0;
            m_length = 0;
            m_readEvent.Reset();
            m_inputEvent.Set();
        }
    }
}