原文网址: http://www.cnblogs.com/csdev
Networkcomms 是一款C# 语言编写的TCP/UDP通信框架 作者是英国人 以前是收费的 目前作者已经开源 许可是:Apache License v2
开源地址是:https://github.com/MarcFletcher/NetworkComms.Net
线程安全的数据流
/// <summary> /// A wrapper around a stream to ensure it can be accessed in a thread safe way. The .net implementation of Stream.Synchronized is not suitable on its own. /// 线程安全的数据流 .net实现的Stream.Synchronized 不适合其本身 /// </summary> public class ThreadSafeStream : Stream, IDisposable { private Stream _innerStream; private object streamLocker = new object(); /// <summary> /// If true the internal stream will be disposed once the data has been written to the network /// 如果设置为True 当数据写入到网络后 内部的数据流将会释放 /// </summary> public bool DiposeInnerStreamOnDispose { get; set; } /// <summary> /// Create a thread safe stream. Once any actions are complete the stream must be correctly disposed by the user. /// 创建一个新的线程安全数据流 一旦操作完成数据流必须正确的释放 /// </summary> /// <param name="stream">The stream to make thread safe</param> public ThreadSafeStream(Stream stream) { this.DiposeInnerStreamOnDispose = false; this._innerStream = stream; } /// <summary> /// Create a thread safe stream. /// 创建一个新的线程安全数据流 /// </summary> /// <param name="stream">数据流 The stream to make thread safe.</param> /// <param name="closeStreamAfterSend">发送完成后内部数据流是否释放 If true the provided stream will be disposed once data has been written to the network. If false the stream must be disposed of correctly by the user</param> public ThreadSafeStream(Stream stream, bool closeStreamAfterSend) { this.DiposeInnerStreamOnDispose = closeStreamAfterSend; this._innerStream = stream; } /// <summary> /// The total length of the internal stream /// 内部数据流的长度 /// </summary> public override long Length { get { lock (streamLocker) return _innerStream.Length; } } /// <inheritdoc /> public override void SetLength(long value) { lock (streamLocker) _innerStream.SetLength(value); } /// <summary> /// The current position of the internal stream /// 内部数据流的当前位置 /// </summary> public override long Position { get { lock (streamLocker) return _innerStream.Position; } set { lock (streamLocker) _innerStream.Position = value; } } /// <inheritdoc /> public override long Seek(long offset, SeekOrigin origin) { lock (streamLocker) return _innerStream.Seek(offset, origin); } /// <summary> /// Returns data from entire Stream /// 从数据流中返回所有数据 /// </summary> /// <param name="numberZeroBytesPrefex">是否存在0前缀 If non zero will append N 0 value bytes to the start of the returned array</param> /// <returns></returns> ) { lock (streamLocker) { _innerStream.Seek(, SeekOrigin.Begin); byte[] returnData = new byte[_innerStream.Length + numberZeroBytesPrefex]; _innerStream.Read(returnData, numberZeroBytesPrefex, returnData.Length - numberZeroBytesPrefex); return returnData; } } /// <summary> /// Returns data from the specified portion of Stream /// 从数据流的指定位置返回数据 /// </summary> /// <param name="start">开始位置 The start position of the desired bytes</param> /// <param name="length">长度 The total number of desired bytes, not including the zero byte prefix and append parameters</param> /// <param name="numberZeroBytesPrefix">0前缀 If non zero will append N 0 value bytes to the start of the returned array</param> /// <param name="numberZeroBytesAppend">0后缀 If non zero will append N 0 value bytes to the end of the returned array</param> /// <returns></returns> , ) { if (length > int.MaxValue) throw new ArgumentOutOfRangeException("length", "Unable to return array whose size is larger than int.MaxValue. Consider requesting multiple smaller arrays."); lock (streamLocker) { if (start + length > _innerStream.Length) throw new ArgumentOutOfRangeException("length", "Provided start and length parameters reference past the end of the available stream."); _innerStream.Seek(start, SeekOrigin.Begin); byte[] returnData = new byte[length + numberZeroBytesPrefix + numberZeroBytesAppend]; _innerStream.Read(returnData, numberZeroBytesPrefix, (int)length); return returnData; } } /// <summary> /// Return the MD5 hash of the current <see cref="ThreadSafeStream"/> as a string /// 返回当前“线程安全数据流”的MD5哈希值 /// </summary> /// <returns></returns> public string MD5() { lock (streamLocker) return StreamTools.MD5(_innerStream); } /// <summary> /// Return the MD5 hash of part of the current <see cref="ThreadSafeStream"/> as a string /// 返回当前“线程安全数据流”指定部分的MD5哈希值 /// </summary> /// <param name="start">开始位置 The start position in the stream</param> /// <param name="length">长度 The length of stream to MD5</param> /// <returns></returns> public string MD5(long start, int length) { using (MemoryStream partialStream = new MemoryStream(length)) { lock (streamLocker) { StreamTools.Write(_innerStream, start, length, partialStream, , , ); return StreamTools.MD5(partialStream); } } } /// <summary> /// Writes the provided buffer to the internal stream starting at the provided position within the internal stream /// 把 字节数组 从内部数据流指定的位置开始写入 /// </summary> /// <param name="buffer"></param> /// <param name="startPosition"></param> public void Write(byte[] buffer, long startPosition) { if (buffer == null) throw new ArgumentNullException("data"); lock (streamLocker) { _innerStream.Seek(startPosition, SeekOrigin.Begin); _innerStream.Write(buffer, , buffer.Length); _innerStream.Flush(); } } /// <inheritdoc /> public override void Write(byte[] buffer, int offset, int count) { if (buffer == null) throw new ArgumentNullException("buffer"); lock (streamLocker) _innerStream.Write(buffer, offset, count); } /// <summary> /// Copies data specified by start and length properties from internal stream to the provided stream. /// 根据参数 从指定的开始位置复制指定长度的内部数据流到目标数据流中 /// </summary> /// <param name="destinationStream">目标数据流 The destination stream to write to</param> /// <param name="startPosition"></param> /// <param name="length"></param> /// <param name="writeBufferSize">写入的缓冲区大小 The buffer size to use for copying stream contents</param> /// <param name="minTimeoutMS">超时最小时间 The minimum time allowed for any sized copy</param> /// <param name="timeoutMSPerKBWrite">每KB数据发送的超时时间 The timouts in milliseconds per KB to write</param> /// <returns>每KB数据发送的平均时间 The average time in milliseconds per byte written</returns> , ) { lock (streamLocker) return StreamTools.Write(_innerStream, startPosition, length, destinationStream, writeBufferSize, timeoutMSPerKBWrite, minTimeoutMS); } /// <summary> /// Attempts to return the buffer associated with the internal stream. In certain circumstances this is more efficient /// than copying the stream contents into a new buffer using ToArray. If the internal stream is not a memory stream /// will throw InvalidCastException. If access to the buffer is not allowed will throw an UnauthorizedAccessException. /// 尝试返回内部流相关的缓冲区。 /// 在某些情况下,这比使用ToArray复制内容到一个新的缓冲区中更高效 /// 如果内部数据流不是内存流 将会抛出异常 /// 如果缓冲区不可访问 ,也会抛出异常 /// </summary> /// <returns></returns> public byte[] GetBuffer() { #if NETFX_CORE throw new NotImplementedException("This method has not been implemented for Win RT"); #else MemoryStream _innerMemoryStream = _innerStream as MemoryStream; if (_innerMemoryStream != null) return _innerMemoryStream.GetBuffer(); else throw new InvalidCastException("Unable to return stream buffer as inner stream is not a MemoryStream."); #endif } /// <inheritdoc /> public override int Read(byte[] buffer, int offset, int count) { lock (streamLocker) return _innerStream.Read(buffer, offset, count); } /// <summary> /// Disposes the internal stream if <see cref="DiposeInnerStreamOnDispose"/> is true. /// Use Close() to close the inner stream regardless of <see cref="DiposeInnerStreamOnDispose"/>. /// 如果 DiposeInnerStreamOnDispose属性设置为TRUE,释放内部数据流 /// </summary> public new void Dispose() { if (DiposeInnerStreamOnDispose) { lock (streamLocker) _innerStream.Dispose(); } } /// <summary> /// Disposes the internal stream. If <see cref="DiposeInnerStreamOnDispose"/> is false, forceDispose /// must be true to dispose of the internal stream. /// 如果 DiposeInnerStreamOnDispose属性设置为False ,强制释放设定为True,来释放内部数据流 /// </summary> /// <param name="forceDispose">If true the internal stream will be disposed regardless of <see cref="DiposeInnerStreamOnDispose"/> value.</param> public new void Dispose(bool forceDispose) { if (DiposeInnerStreamOnDispose || forceDispose) { lock (streamLocker) _innerStream.Dispose(); } } /// <inheritdoc /> public override bool CanRead { get { lock (streamLocker) return _innerStream.CanRead; } } /// <inheritdoc /> public override bool CanSeek { get { lock (streamLocker) return _innerStream.CanSeek; } } /// <inheritdoc /> public override bool CanWrite { get { lock (streamLocker) return _innerStream.CanWrite; } } /// <inheritdoc /> public override void Flush() { lock (streamLocker) _innerStream.Flush(); } }