Using Reactive Extensions (Rx) for socket programming?

Date: 2021-07-01 20:17:08

What is the most succinct way of writing the GetMessages function with Rx?

static void Main()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    var messages = GetMessages(socket, IPAddress.Loopback, 4000);
    messages.Subscribe(x => Console.WriteLine(x));

    Console.ReadKey();
}

static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
    var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

    // from now on we will receive a stream of messages
    // each message is prefixed with a 4-byte Int32 indicating its length
    // the rest of the message is a string

    // ????????????? Now What ????????????? 
}

A simple server as a driver for the above sample: http://gist.github.com/452893#file_program.cs
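
For reference, the framing described in the question (a 4-byte Int32 length prefix followed by the string) can be sketched without Rx. This is only an illustration under assumptions: the `Framing` class and its method names are hypothetical, the body is assumed to be UTF-8, and the sender is assumed to write the prefix in the same byte order that `BitConverter` uses on the receiving machine. It does no validation of the announced size.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper, not part of the original question.
static class Framing
{
    // Encodes one message as a 4-byte length prefix followed by its UTF-8 bytes.
    public static byte[] Encode(string message)
    {
        byte[] body = Encoding.UTF8.GetBytes(message);
        byte[] frame = new byte[4 + body.Length];
        BitConverter.GetBytes(body.Length).CopyTo(frame, 0);
        body.CopyTo(frame, 4);
        return frame;
    }

    // Reads exactly count bytes, looping over partial reads.
    // Returns null if the stream ends before count bytes arrive.
    public static byte[] ReadExactly(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int total = 0;
        while (total < count)
        {
            int read = stream.Read(buffer, total, count - total);
            if (read == 0) return null;
            total += read;
        }
        return buffer;
    }

    // Reads one length-prefixed message, or returns null at end of stream.
    public static string ReadMessage(Stream stream)
    {
        byte[] prefix = ReadExactly(stream, 4);
        if (prefix == null) return null;
        int size = BitConverter.ToInt32(prefix, 0);
        byte[] body = ReadExactly(stream, size);
        return body == null ? null : Encoding.UTF8.GetString(body);
    }
}
```

With a connected socket, the same methods could be called on `new NetworkStream(socket)`. The Rx question is then how to express the loop in `ReadExactly` and the repeated `ReadMessage` calls as an observable sequence.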

On Using Rx For Socket Programming

I've been investigating using Reactive Extensions for some socket programming work I am doing. My motivation is that it would somehow make the code "simpler", whether that means less code, less nesting, or something along those lines.

However, so far that does not seem to be the case:

  1. I haven't found very many examples of using Rx with sockets.
  2. The examples I have found don't seem any less complicated than my existing BeginXXXX/EndXXXX code.
  3. Although Observable has extension methods for FromAsyncPattern, this does not cover the SocketAsyncEventArgs async API.

Current Non-Working Solution

Here is what I have so far. It doesn't work: it fails with a stack overflow (heh). I haven't figured out the semantics well enough to create an IObservable that reads a specified number of bytes.

    static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
    {
        var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

        // keep reading until we get the first 4 bytes
        byte[] buffer = new byte[1024];
        var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

        IObservable<int> readBytes = null;
        var temp = from totalRead in Observable.Defer(() => readBytes)
                   where totalRead < 4
                   select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
        readBytes = temp.SelectMany(x => x).Sum();

        var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
    }

2 Answers

#1


8  

Something along these lines could work. This is not tested, and it does not handle exceptions or the case where a message arrives only partially. Otherwise, I believe this is the right direction to go.

    public static IObservable<T> GetSocketData<T>(this Socket socket,
        int sizeToRead, Func<byte[], T> valueExtractor)
    {
        return Observable.CreateWithDisposable<T>(observer =>
        {
            var readSize = Observable
                .FromAsyncPattern<byte[], int, int, SocketFlags, int>(
                socket.BeginReceive,
                socket.EndReceive);
            var buffer = new byte[sizeToRead];
            return readSize(buffer, 0, sizeToRead, SocketFlags.None)
                .Subscribe(
                x => observer.OnNext(valueExtractor(buffer)),
                    observer.OnError,
                    observer.OnCompleted);
        });
    }

    public static IObservable<int> GetMessageSize(this Socket socket)
    {
        return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0));
    }

    public static IObservable<string> GetMessageBody(this Socket socket,
        int messageSize)
    {
        return socket.GetSocketData(messageSize, buf =>
            Encoding.UTF8.GetString(buf, 0, messageSize));
    }

    public static IObservable<string> GetMessage(this Socket socket)
    {

        return
            from size in socket.GetMessageSize()
            from message in Observable.If(() => size != 0,
                socket.GetMessageBody(size),
                Observable.Return<string>(null))
            select message;
    }

    public static IObservable<string> GetMessagesFromConnected(
        this Socket socket)
    {
        return socket
            .GetMessage()
            .Repeat()
            .TakeWhile(msg => !string.IsNullOrEmpty(msg));
    }

    public static IObservable<string> GetMessages(this Socket socket,
        IPAddress addr, int port)
    {
        return Observable.Defer(() => 
        {
            var whenConnect = Observable
                .FromAsyncPattern<IPAddress, int>(
                    socket.BeginConnect, socket.EndConnect);
            return from _ in whenConnect(addr, port)
                   from msg in socket.GetMessagesFromConnected()
                       .Finally(socket.Close)
                   select msg;
        });
    }

Edit: To handle incomplete reads, Observable.While can be used (within GetSocketData), as proposed by Dave Sexton in the same thread on the Rx forum.

Edit: Also, take a look at Jeffrey Van Gogh's article: Asynchronous System.IO.Stream reading
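
As a rough illustration of handling incomplete reads, here is a sketch that takes a different route from the Observable.While suggestion: it loops inside an async `Observable.Create`. It assumes a current System.Reactive package rather than the Rx build the code above targets, and it reads from a `Stream` (for a socket, `new NetworkStream(socket)`). The names `StreamMessages`, `FillAsync` and `ReadMessages` are hypothetical, and the announced size is not validated.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; assumes the System.Reactive NuGet package.
static class StreamMessages
{
    // Fills buffer[0..count) completely, looping over partial reads.
    // Returns false if the stream ends before count bytes arrive.
    static async Task<bool> FillAsync(Stream stream, byte[] buffer, int count, CancellationToken ct)
    {
        int total = 0;
        while (total < count)
        {
            int read = await stream.ReadAsync(buffer, total, count - total, ct);
            if (read == 0) return false;
            total += read;
        }
        return true;
    }

    // Emits one string per length-prefixed message until the stream ends.
    public static IObservable<string> ReadMessages(Stream stream)
    {
        return Observable.Create<string>(async (observer, ct) =>
        {
            var prefix = new byte[4];
            while (await FillAsync(stream, prefix, 4, ct))
            {
                int size = BitConverter.ToInt32(prefix, 0);
                var body = new byte[size];
                if (!await FillAsync(stream, body, size, ct)) break;
                observer.OnNext(Encoding.UTF8.GetString(body));
            }
            observer.OnCompleted();
        });
    }
}
```

Disposing the subscription signals the `CancellationToken`, which gives pending reads a chance to stop; whether a particular stream honors the token promptly is not something this sketch guarantees.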

#2


2  

Ok, so this is perhaps "cheating", but I suppose you could re-purpose my non-Rx answer and wrap it with Observable.Create.

I'm fairly sure that returning the socket as the IDisposable is the wrong semantics, but I'm not sure what the right semantics would be.
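
One possible alternative, offered only as a sketch: let each subscription own its socket through `Observable.Using`, so that unsubscribing (or completion) disposes a socket the observable created itself instead of one the caller passed in. This assumes a current System.Reactive package; `OwnedSocket` and the `connectAndRead` parameter are hypothetical names, the latter standing in for any of the GetMessages variants on this page.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;

// Hypothetical wrapper; assumes the System.Reactive NuGet package.
static class OwnedSocket
{
    // Creates a fresh socket per subscription and disposes it when the
    // subscription ends. connectAndRead is a placeholder for a function
    // such as GetMessages(socket, addr, port).
    public static IObservable<string> GetMessages(
        IPAddress addr, int port,
        Func<Socket, IPAddress, int, IObservable<string>> connectAndRead)
    {
        return Observable.Using(
            () => new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp),
            socket => connectAndRead(socket, addr, port));
    }
}
```

A design consequence of this shape is that subscribing twice opens two connections, which may or may not be what the caller wants.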

    static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
    {
        return Observable.CreateWithDisposable<string>(
            o =>
            {
                byte[] buffer = new byte[1024];

                Action<int, Action<int>> readIntoBuffer = (length, callback) =>
                {
                    var totalRead = 0;

                    AsyncCallback receiveCallback = null;
                    AsyncCallback temp = r =>
                    {
                        var read = socket.EndReceive(r);

                        if (read == 0)
                        {
                            socket.Close();
                            o.OnCompleted();
                            return;
                        }

                        totalRead += read;

                        if (totalRead < length)
                        {
                            socket.BeginReceive(buffer, totalRead, length - totalRead, SocketFlags.None, receiveCallback, null);
                        }
                        else
                        {
                            callback(length);
                        }
                    };
                    receiveCallback = temp;

                    socket.BeginReceive(buffer, totalRead, length, SocketFlags.None, receiveCallback, null);
                };

                Action<int> sizeRead = null;

                Action<int> messageRead = x =>
                {
                    var message = Encoding.UTF8.GetString(buffer, 0, x);
                    o.OnNext(message);
                    readIntoBuffer(4, sizeRead);
                };

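                Action<int> temp2 = x =>
                {
                    var size = BitConverter.ToInt32(buffer, 0);
                    // grow the shared buffer if the announced message does not fit
                    if (size > buffer.Length)
                    {
                        buffer = new byte[size];
                    }
                    readIntoBuffer(size, messageRead);
                };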
                sizeRead = temp2;

                AsyncCallback connectCallback = r =>
                {
                    socket.EndConnect(r);
                    readIntoBuffer(4, sizeRead);
                };

                socket.BeginConnect(addr, port, connectCallback, null);

                return socket;
            });
    }
