使用yield编写并行程序

时间:2022-11-13 23:30:39
我们知道yield return可以用来生成迭代器,它的原理是将函数的调用堆栈保存起来,并在下一次调用迭代器的MoveNext()方法时恢复堆栈以继续运行.
那么我们在调用一个对象上的阻塞io操作时,可以通过yield return来保存调用堆栈,当阻塞io操作可以继续时,就恢复堆栈继续运行.下面用一个例子来说明假设写一个回显服务器用多线程实现如下
使用yield编写并行程序使用yield编写并行程序     /// <summary>
使用yield编写并行程序    
/// 作者:runner.mei@gmail.com
使用yield编写并行程序    
/// 日期:2008-07-04
使用yield编写并行程序    
/// </summary>

使用yield编写并行程序     class  EchoServerByThread
使用yield编写并行程序使用yield编写并行程序    
{
使用yield编写并行程序        IPAddress _ip;
使用yield编写并行程序        
int _port;
使用yield编写并行程序        
public EchoServerByThread(IPAddress ip, int port)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            _ip 
= ip;
使用yield编写并行程序            _port 
= port;
使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序        
public void RunForever()
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
using (Socket server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                server.Bind(
new IPEndPoint(_ip, _port));
使用yield编写并行程序                server.Listen(
9);
使用yield编写并行程序                
whiletrue )
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
// 从线程池中取出一个线程来处理接受的连接
使用yield编写并行程序
                    ThreadPool.QueueUserWorkItem(this.OnClient, server.Accept());
使用yield编写并行程序                }

使用yield编写并行程序            }

使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序
使用yield编写并行程序        
void OnClient(object obj)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
try
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                
using (Socket socket = (Socket)obj)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    Console.WriteLine(
"接受来自{0}的连接", socket.RemoteEndPoint);
使用yield编写并行程序                    
byte[] buffer = new byte[1024];
使用yield编写并行程序                    
while (true)
使用yield编写并行程序使用yield编写并行程序                    
{
使用yield编写并行程序                        
int i = socket.Receive(buffer);
使用yield编写并行程序                        
if (0 == i)
使用yield编写并行程序                            
break;
使用yield编写并行程序
使用yield编写并行程序                        i 
= socket.Send(buffer, i, SocketFlags.None);
使用yield编写并行程序                        
if (0 == i)
使用yield编写并行程序                            
break;
使用yield编写并行程序                    }

使用yield编写并行程序                    Console.WriteLine(
"来自{0}的连接断开", socket.RemoteEndPoint);
使用yield编写并行程序                    socket.Close();
使用yield编写并行程序                }

使用yield编写并行程序            }

使用yield编写并行程序            
catch
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                
//TODO: what?
使用yield编写并行程序
            }

使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序        
static void Main(string[] args)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
new EchoServerByThread(IPAddress.Any, 30013).RunForever();
使用yield编写并行程序        }

使用yield编写并行程序    }


在这里我们使了一个独立的线程来处理来自客户端的连接.每一个客户端连接就需要一个线程.
下面我们来用yield来实现它,客户端连接处理函数改成下面的形式
使用yield编写并行程序
使用yield编写并行程序        IEnumerator
< Request >  OnClient(Socket socket)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            Console.WriteLine(
"接受来自{0}的连接", socket.RemoteEndPoint );
使用yield编写并行程序            
byte[] buffer = new byte[1024];
使用yield编写并行程序            
bool state = true;
使用yield编写并行程序            
while (state)
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序使用yield编写并行程序                
///在这里我们创建了一个IO读请求,并保存堆栈后返回
使用yield编写并行程序                yield return CreateRequest(socket, SelectMode.SelectRead);
使用yield编写并行程序                
int i = socket.Receive(buffer);
使用yield编写并行程序                
if (0 == i)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    state 
= false;
使用yield编写并行程序                    
continue;
使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序                
///在这里我们创建了一个IO写请求,并保存堆栈后返回
使用yield编写并行程序                yield return CreateRequest(socket, SelectMode.SelectWrite);
使用yield编写并行程序                state 
=0 != socket.Send(buffer, i, SocketFlags.None));
使用yield编写并行程序            }

使用yield编写并行程序
使用yield编写并行程序            Console.WriteLine(
"来自{0}的连接断开", socket.RemoteEndPoint);
使用yield编写并行程序            socket.Close();
使用yield编写并行程序        }


IO请求对象的定义如下
使用yield编写并行程序         class  Request
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
public Socket _socket;                 // io对象
使用yield编写并行程序
            public SelectMode _mode;               // io请求类型
使用yield编写并行程序
            public IEnumerator<Request> _callback; // 包含了堆栈的迭代器
使用yield编写并行程序
    
使用yield编写并行程序            
public Request(Socket socket, SelectMode mode, IEnumerator<Request> handler)
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                _socket 
= socket;
使用yield编写并行程序                _mode 
= mode;
使用yield编写并行程序                _callback 
= handler;
使用yield编写并行程序            }

使用yield编写并行程序        }

上面用到的创建函数定义如下
使用yield编写并行程序        Request CreateRequest(Socket socket, SelectMode mode)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            Request request 
= new Request(socket, mode, null); // 注意在创建时并没有放入包含了堆栈的迭代器
使用yield编写并行程序
            _requests.Add(request);// 将 IO请求对象放在全局的IO请求对象队列
使用yield编写并行程序
            return request;
使用yield编写并行程序        }


和上面的多线程版本对比一下,可以发现在socket.Receive()和socket.Send()前面多了一个yield return其它没有区别.下面我们来实现io请求的检测和运行调度,添加一个全局的运行堆栈队列和一个全局的io请求队列

使用yield编写并行程序使用yield编写并行程序 /// IO请求队列
使用yield编写并行程序List < Request >  _requests  =   new  List < Request > ();
使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序
/// IO请求可以执行的运行堆栈队列,存放IO请求可以继续执行不会产生
使用yield编写并行程序
/// 阻塞的包含运行堆栈的迭代器

使用yield编写并行程序LinkedList < IEnumerator < Request >>  _currents  =   new  LinkedList < IEnumerator < Request >> ();



调度函数实现如下
使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序        
/// <summary>
使用yield编写并行程序        
/// 作者:runner.mei@gmail.com
使用yield编写并行程序        
/// 日期:2008-07-04
使用yield编写并行程序        
/// </summary>

使用yield编写并行程序         public   void  RunForever()
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
while (true)
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                
while (0 != _currents.Count)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
// 处理IO请求可以继续执行不会产生阻塞的IO请求对象
使用yield编写并行程序
                    IEnumerator<Request> enumerator = _currents.First.Value;
使用yield编写并行程序                    _currents.RemoveFirst();
使用yield编写并行程序
使用yield编写并行程序                    
//将运行堆栈恢复,并继续运行
使用yield编写并行程序
                    if (enumerator.MoveNext())
使用yield编写并行程序使用yield编写并行程序                    
{
使用yield编写并行程序                        
// 取本次运行后返回的IO请求对象,将包含运行堆栈的迭代器
使用yield编写并行程序                        
// 保存在它的_callback中请见CreateRequest函数
使用yield编写并行程序
                        enumerator.Current._callback = enumerator;
使用yield编写并行程序                    }

使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序                List
<Socket> readset = new List<Socket>();
使用yield编写并行程序                List
<Socket> writeset = new List<Socket>();
使用yield编写并行程序
使用yield编写并行程序                
foreach (Request r in _requests)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
if (r._mode == SelectMode.SelectWrite)
使用yield编写并行程序                        writeset.Add(r._socket);
使用yield编写并行程序                    
else
使用yield编写并行程序                        readset.Add(r._socket);
使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序                
/// 检测是不是有IO对象可以继续
使用yield编写并行程序                Socket.Select(readset, writeset, null10000);
使用yield编写并行程序                
foreach (Socket socket in readset)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
//该IO对象可以继续io操作
使用yield编写并行程序
                    processRequest(socket);
使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序                
foreach (Socket socket in writeset)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
//该IO对象可以继续io操作
使用yield编写并行程序
                    processRequest(socket);
使用yield编写并行程序                }

使用yield编写并行程序            }

使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序        
void  processRequest(Socket socket)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
// 根据IO对象找到IO请求对象
使用yield编写并行程序
            Request r = _requests.Find(delegate(Request request)
使用yield编写并行程序使用yield编写并行程序             
return socket == request._socket; });
使用yield编写并行程序            
if (null != r)
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                
// 将它添加到IO请求可以执行的运行堆栈队列,并在稍后执行
使用yield编写并行程序
                _currents.AddLast(r._callback);
使用yield编写并行程序                
// 从IO请求对象队列中删除它
使用yield编写并行程序
                _requests.Remove(r);
使用yield编写并行程序            }

使用yield编写并行程序        }


这样就完整的实现了一个模拟多线程的回显服务了,它有以下几个特点
1.是单线程的,但具有与多线程版本一样的功能,支持同时处理多个客户端连接
2.客户端处理逻辑象多线程版本一样简单,不象异步io那样需要写一个复杂的状态机

下面是完整的源代码
使用yield编写并行程序using  System;
使用yield编写并行程序
using  System.Collections.Generic;
使用yield编写并行程序
using  System.Text;
使用yield编写并行程序
using  System.Net;
使用yield编写并行程序
using  System.Net.Sockets;
使用yield编写并行程序
使用yield编写并行程序
namespace  Networks
使用yield编写并行程序使用yield编写并行程序
{
使用yield编写并行程序    
class Request
使用yield编写并行程序使用yield编写并行程序    
{
使用yield编写并行程序        
public Socket _socket;                 // io对象
使用yield编写并行程序
        public SelectMode _mode;               // io请求类型
使用yield编写并行程序
        public IEnumerator<Request> _callback; // 包含了堆栈的迭代器
使用yield编写并行程序

使用yield编写并行程序        
public Request(Socket socket, SelectMode mode, IEnumerator<Request> handler)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            _socket 
= socket;
使用yield编写并行程序            _mode 
= mode;
使用yield编写并行程序            _callback 
= handler;
使用yield编写并行程序        }

使用yield编写并行程序    }

使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序    
/// <summary>
使用yield编写并行程序    
/// 作者:runner.mei@gmail.com
使用yield编写并行程序    
/// 日期:2008-07-04
使用yield编写并行程序    
/// </summary>

使用yield编写并行程序    class EchoServer
使用yield编写并行程序使用yield编写并行程序    
{
使用yield编写并行程序使用yield编写并行程序        
/// IO请求对象队列
使用yield编写并行程序        List<Request> _requests = new List<Request>();
使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序        
/// IO请求可以执行的运行堆栈队列,存放IO请求可以继续执行不会产生
使用yield编写并行程序        
/// 阻塞的包含运行堆栈的迭代器

使用yield编写并行程序        LinkedList<IEnumerator<Request>> _currents = new LinkedList<IEnumerator<Request>>();
使用yield编写并行程序
使用yield编写并行程序
使用yield编写并行程序        
public EchoServer(IPAddress ip, int port)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            Socket server 
= new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
使用yield编写并行程序            server.Bind(
new IPEndPoint(ip, port));
使用yield编写并行程序            server.Listen(
9);
使用yield编写并行程序            _currents.AddLast(RunServer(server));
使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序        
void processRequest(Socket socket)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
// 根据IO对象找到IO请求对象
使用yield编写并行程序
            Request r = _requests.Find(delegate(Request request)
使用yield编写并行程序使用yield编写并行程序             
return socket == request._socket; });
使用yield编写并行程序            
if (null != r)
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                
// 将它添加到IO请求可以执行的运行堆栈队列,并在稍后执行
使用yield编写并行程序
                _currents.AddLast(r._callback);
使用yield编写并行程序                
// 从IO请求对象队列中删除它
使用yield编写并行程序
                _requests.Remove(r);
使用yield编写并行程序            }

使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序        
/// <summary>
使用yield编写并行程序        
/// 作者:runner.mei@gmail.com
使用yield编写并行程序        
/// 日期:2008-07-04
使用yield编写并行程序        
/// </summary>

使用yield编写并行程序        public void RunForever()
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
while (true)
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序                
while (0 != _currents.Count)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
// 处理IO请求可以继续执行不会产生阻塞的IO请求对象
使用yield编写并行程序
                    IEnumerator<Request> enumerator = _currents.First.Value;
使用yield编写并行程序                    _currents.RemoveFirst();
使用yield编写并行程序
使用yield编写并行程序                    
//将运行堆栈恢复,并继续运行
使用yield编写并行程序
                    if (enumerator.MoveNext())
使用yield编写并行程序使用yield编写并行程序                    
{
使用yield编写并行程序                        
// 取本次运行后返回的IO请求对象,将包含运行堆栈的迭代器
使用yield编写并行程序                        
// 保存在它的_callback中请见CreateRequest函数
使用yield编写并行程序
                        enumerator.Current._callback = enumerator;
使用yield编写并行程序                    }

使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序                List
<Socket> readset = new List<Socket>();
使用yield编写并行程序                List
<Socket> writeset = new List<Socket>();
使用yield编写并行程序
使用yield编写并行程序                
foreach (Request r in _requests)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
if (r._mode == SelectMode.SelectWrite)
使用yield编写并行程序                        writeset.Add(r._socket);
使用yield编写并行程序                    
else
使用yield编写并行程序                        readset.Add(r._socket);
使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序                
/// 检测是不是有IO对象可以继续
使用yield编写并行程序                Socket.Select(readset, writeset, null10000);
使用yield编写并行程序                
foreach (Socket socket in readset)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
//该IO对象可以继续io操作
使用yield编写并行程序
                    processRequest(socket);
使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序                
foreach (Socket socket in writeset)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    
//该IO对象可以继续io操作
使用yield编写并行程序
                    processRequest(socket);
使用yield编写并行程序                }

使用yield编写并行程序            }

使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序        Request CreateRequest(Socket socket, SelectMode mode)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            Request request 
= new Request(socket, mode, null); // 注意在创建时并没有放入包含了堆栈的迭代器
使用yield编写并行程序
            _requests.Add(request);// 将 IO请求对象放在全局的IO请求对象队列
使用yield编写并行程序
            return request;
使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序        IEnumerator
<Request> RunServer(Socket server)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
for (; ; )
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序使用yield编写并行程序                
///在这里我们创建了一个IO读请求,并保存堆栈后返回
使用yield编写并行程序                yield return CreateRequest(server, SelectMode.SelectRead);
使用yield编写并行程序                _currents.AddLast(OnClient(server.Accept()));
使用yield编写并行程序            }

使用yield编写并行程序        }

使用yield编写并行程序
使用yield编写并行程序        IEnumerator
<Request> OnClient(Socket socket)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            Console.WriteLine(
"接受来自{0}的连接", socket.RemoteEndPoint);
使用yield编写并行程序            
byte[] buffer = new byte[1024];
使用yield编写并行程序            
bool state = true;
使用yield编写并行程序            
while (state)
使用yield编写并行程序使用yield编写并行程序            
{
使用yield编写并行程序使用yield编写并行程序                
///在这里我们创建了一个IO读请求,并保存堆栈后返回
使用yield编写并行程序                yield return CreateRequest(socket, SelectMode.SelectRead);
使用yield编写并行程序                
int i = socket.Receive(buffer);
使用yield编写并行程序                
if (0 == i)
使用yield编写并行程序使用yield编写并行程序                
{
使用yield编写并行程序                    state 
= false;
使用yield编写并行程序                    
continue;
使用yield编写并行程序                }

使用yield编写并行程序
使用yield编写并行程序使用yield编写并行程序                
///在这里我们创建了一个IO写请求,并保存堆栈后返回
使用yield编写并行程序                yield return CreateRequest(socket, SelectMode.SelectWrite);
使用yield编写并行程序                state 
= (0 != socket.Send(buffer, i, SocketFlags.None));
使用yield编写并行程序            }

使用yield编写并行程序
使用yield编写并行程序            Console.WriteLine(
"来自{0}的连接断开", socket.RemoteEndPoint);
使用yield编写并行程序            socket.Close();
使用yield编写并行程序        }

使用yield编写并行程序        
static void Main(string[] args)
使用yield编写并行程序使用yield编写并行程序        
{
使用yield编写并行程序            
new EchoServer(IPAddress.Any, 30013).RunForever();
使用yield编写并行程序        }

使用yield编写并行程序    }

使用yield编写并行程序}