What is the most succinct way of writing the GetMessages function with Rx?
static void Main()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var messages = GetMessages(socket, IPAddress.Loopback, 4000);
    messages.Subscribe(x => Console.WriteLine(x));
    Console.ReadKey();
}

static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
    var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
    // from here on we will receive a stream of messages
    // each message is prefixed with a 4-byte Int32 indicating its length
    // the rest of the message is a string
    // ????????????? Now What ?????????????
}
A simple server as a driver for the above sample: http://gist.github.com/452893#file_program.cs
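For reference, the wire format described in the comments above (a 4-byte Int32 length followed by the message text) can be sketched without any Rx at all. This is only a minimal sketch: the names `Framing`, `Encode` and `TryDecode` are hypothetical, and it assumes UTF-8 text and the byte order that `BitConverter` uses on the local machine, neither of which the question states.

```csharp
using System;
using System.Text;

public static class Framing
{
    // Prefix the UTF-8 bytes of a message with their length as a 4-byte Int32.
    public static byte[] Encode(string message)
    {
        byte[] body = Encoding.UTF8.GetBytes(message);
        byte[] frame = new byte[4 + body.Length];
        BitConverter.GetBytes(body.Length).CopyTo(frame, 0);
        body.CopyTo(frame, 4);
        return frame;
    }

    // Try to decode one frame that starts at 'offset', given that 'available'
    // bytes have been received so far. Returns false when the buffer does not
    // hold a complete frame yet (or the length prefix is negative).
    public static bool TryDecode(byte[] buffer, int offset, int available,
                                 out string message, out int consumed)
    {
        message = null;
        consumed = 0;
        if (available < 4) return false;                    // length prefix incomplete
        int size = BitConverter.ToInt32(buffer, offset);
        if (size < 0 || available - 4 < size) return false; // body incomplete or invalid
        message = Encoding.UTF8.GetString(buffer, offset + 4, size);
        consumed = 4 + size;
        return true;
    }
}
```

The point of `TryDecode` returning false is that a receive can hand back only part of a frame, so the caller has to keep reading and try again.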
On Using Rx For Socket Programming
I've been investigating using Reactive Extensions for some socket programming work I am doing. My motivation is that it would somehow make the code "simpler", whether that means less code, less nesting, or something along those lines.
However, so far that does not seem to be the case:
- I haven't found very many examples of using Rx with sockets.
- The examples I have found don't seem less complicated than my existing BeginXXXX, EndXXXX code.
- Although Observable has extension methods for FromAsyncPattern, this does not cover the SocketEventArgs Async API.
Current Non-Working Solution
Here is what I have so far. It doesn't work: it fails with a stack overflow (heh). I haven't figured out the semantics that would let me create an IObservable that reads a specified number of bytes.
static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
{
    var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
    // keep reading until we get the first 4 bytes
    byte[] buffer = new byte[1024];
    var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
    IObservable<int> readBytes = null;
    var temp = from totalRead in Observable.Defer(() => readBytes)
               where totalRead < 4
               select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
    readBytes = temp.SelectMany(x => x).Sum();
    var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}
2 Answers
#1
8
Something along these lines could work. This is untested and does not handle exceptions or the case where a message arrives only partially. Otherwise, I believe this is the right direction to go.
public static IObservable<T> GetSocketData<T>(this Socket socket,
    int sizeToRead, Func<byte[], T> valueExtractor)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var readSize = Observable
            .FromAsyncPattern<byte[], int, int, SocketFlags, int>(
                socket.BeginReceive,
                socket.EndReceive);
        var buffer = new byte[sizeToRead];
        return readSize(buffer, 0, sizeToRead, SocketFlags.None)
            .Subscribe(
                x => observer.OnNext(valueExtractor(buffer)),
                observer.OnError,
                observer.OnCompleted);
    });
}

public static IObservable<int> GetMessageSize(this Socket socket)
{
    return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0));
}

public static IObservable<string> GetMessageBody(this Socket socket,
    int messageSize)
{
    return socket.GetSocketData(messageSize, buf =>
        Encoding.UTF8.GetString(buf, 0, messageSize));
}

public static IObservable<string> GetMessage(this Socket socket)
{
    return
        from size in socket.GetMessageSize()
        from message in Observable.If(() => size != 0,
            socket.GetMessageBody(size),
            Observable.Return<string>(null))
        select message;
}

public static IObservable<string> GetMessagesFromConnected(
    this Socket socket)
{
    return socket
        .GetMessage()
        .Repeat()
        .TakeWhile(msg => !string.IsNullOrEmpty(msg));
}

public static IObservable<string> GetMessages(this Socket socket,
    IPAddress addr, int port)
{
    return Observable.Defer(() =>
    {
        var whenConnect = Observable
            .FromAsyncPattern<IPAddress, int>(
                socket.BeginConnect, socket.EndConnect);
        return from _ in whenConnect(addr, port)
               from msg in socket.GetMessagesFromConnected()
                   .Finally(socket.Close)
               select msg;
    });
}
Edit: To handle incomplete reads, Observable.While can be used (within GetSocketData), as proposed by Dave Sexton in the same thread on the Rx forum.
Edit: Also, take a look at Jeffrey Van Gogh's article: Asynchronous System.IO.Stream reading
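To make the incomplete-read case concrete: a single read may return fewer bytes than requested, so something has to keep reading and accumulating until the requested count has arrived. The sketch below shows only that accumulation logic as a plain synchronous loop over System.IO.Stream; it is not the Observable.While version from the forum thread, which is not reproduced here. `TryReadExactly` and `TricklingStream` are hypothetical names made up for the illustration.

```csharp
using System;
using System.IO;

public static class StreamReadExtensions
{
    // Keep calling Read until 'count' bytes have arrived. A single Read (like a
    // single Socket.Receive) may return fewer bytes than requested.
    // Returns false if the stream ends before 'count' bytes are available.
    public static bool TryReadExactly(this Stream stream, byte[] buffer, int count)
    {
        int total = 0;
        while (total < count)
        {
            int read = stream.Read(buffer, total, count - total);
            if (read == 0) return false; // end of stream: the message was cut short
            total += read;
        }
        return true;
    }
}

// Hypothetical helper for trying this out: a stream that hands out at most one
// byte per Read call, imitating a socket that delivers a message in pieces.
public sealed class TricklingStream : MemoryStream
{
    public TricklingStream(byte[] data) : base(data) { }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return base.Read(buffer, offset, Math.Min(count, 1));
    }
}
```

Any Rx-based loop around BeginReceive/EndReceive would need the same two pieces of state: the running total, and the offset and remaining count passed to the next read.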
#2
2
OK, so this is perhaps "cheating", but I suppose you could re-purpose my non-Rx answer and wrap it with Observable.Create.
I'm fairly sure that returning the socket as the IDisposable is the wrong semantics, but I'm not sure what the right one would be.
static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
    return Observable.CreateWithDisposable<string>(
        o =>
        {
            byte[] buffer = new byte[1024];
            Action<int, Action<int>> readIntoBuffer = (length, callback) =>
            {
                var totalRead = 0;
                AsyncCallback receiveCallback = null;
                AsyncCallback temp = r =>
                {
                    var read = socket.EndReceive(r);
                    if (read == 0)
                    {
                        socket.Close();
                        o.OnCompleted();
                        return;
                    }
                    totalRead += read;
                    if (totalRead < length)
                    {
                        socket.BeginReceive(buffer, totalRead, length - totalRead, SocketFlags.None, receiveCallback, null);
                    }
                    else
                    {
                        callback(length);
                    }
                };
                receiveCallback = temp;
                socket.BeginReceive(buffer, totalRead, length, SocketFlags.None, receiveCallback, null);
            };
            Action<int> sizeRead = null;
            Action<int> messageRead = x =>
            {
                var message = Encoding.UTF8.GetString(buffer, 0, x);
                o.OnNext(message);
                readIntoBuffer(4, sizeRead);
            };
            Action<int> temp2 = x =>
            {
                var size = BitConverter.ToInt32(buffer, 0);
                readIntoBuffer(size, messageRead);
            };
            sizeRead = temp2;
            AsyncCallback connectCallback = r =>
            {
                socket.EndConnect(r);
                readIntoBuffer(4, sizeRead);
            };
            socket.BeginConnect(addr, port, connectCallback, null);
            return socket;
        });
}
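On the question of what the subscription's IDisposable should mean, one possible reading is "stop reading", leaving ownership of the socket with the caller. Below is a rough sketch of that idea. It assumes things this answer did not have: the current System.Reactive package, async/await, and an already connected Stream (for example a NetworkStream over the socket). `ReadMessages` and `FillAsync` are hypothetical names, and this is a sketch under those assumptions, not a tested replacement for the code above.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class FramedStreamExtensions
{
    // Hypothetical helper: fill 'buffer' with exactly 'count' bytes.
    // Returns false if the stream ends first.
    private static async Task<bool> FillAsync(Stream stream, byte[] buffer,
                                              int count, CancellationToken ct)
    {
        int total = 0;
        while (total < count)
        {
            int read = await stream.ReadAsync(buffer, total, count - total, ct);
            if (read == 0) return false;
            total += read;
        }
        return true;
    }

    // Emits one string per length-prefixed message until the stream ends.
    public static IObservable<string> ReadMessages(this Stream stream)
    {
        // This Observable.Create overload hands the subscriber a CancellationToken
        // that is signalled when the subscription is disposed.
        return Observable.Create<string>(async (observer, ct) =>
        {
            var header = new byte[4];
            while (await FillAsync(stream, header, 4, ct))
            {
                int size = BitConverter.ToInt32(header, 0);
                var body = new byte[size];
                if (!await FillAsync(stream, body, size, ct)) break;
                observer.OnNext(Encoding.UTF8.GetString(body));
            }
            // Returning normally completes the sequence; an exception thrown
            // here is reported to the observer as an error.
        });
    }
}
```

With a connected socket this would be used along the lines of `new NetworkStream(socket).ReadMessages().Subscribe(x => Console.WriteLine(x))`. Disposing the subscription only signals the token, so whether a read that is already pending gets interrupted depends on the stream implementation; closing the socket stays a separate decision for the caller.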