监听TCP链接
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7887").unwrap(); //监听7887端口,成功后,就创建一个linstener
for stream in listener.incoming() { // listener.incoming 会返回产生一流序列的迭代器;每一个链接,都会产生一个流序列
println!("connectiond!!");
}
}
获取请求数据
use std::{io::Read, net::{TcpListener, TcpStream}};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7887").unwrap(); //监听7887端口,成功后,就创建一个linstener
for stream in listener.incoming() { // listener.incoming 会返回产生一流序列的迭代器;每一个链接,都会产生一个流序列
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream : TcpStream) {
let mut buffer = [0; 512]; //定义一个数组存放数据
stream.read(&mut buffer).unwrap(); //使用read方法读取数据
println!("Request : {}",String::from_utf8_lossy(&buffer[..])) //将数组数据转换为字符串
}
响应Http请求
fn handle_connection(mut stream : TcpStream) {
let mut buffer = [0; 512]; //定义一个数组存放数据
stream.read(&mut buffer).unwrap(); //使用read方法读取数据
//println!("Request : {}",String::from_utf8_lossy(&buffer[..])) //将数组数据转换为字符串
//响应
let response = "HTTP/1.1 200 0k\r\n\r\n"; //协议,状态码 数据
stream.write(response.as_bytes()).unwrap(); // write()接收的是数组,所以需要转换下
stream.flush().unwrap(); //阻塞程序运行,直到数据都写入完成
}
返回一个http数据
fn handle_connection(mut stream : TcpStream) {
let mut buffer = [0; 2048]; //定义一个数组存放数据
stream.read(&mut buffer).unwrap(); //使用read方法读取数据
//println!("Request : {}",String::from_utf8_lossy(&buffer[..])) //将数组数据转换为字符串
let contents = fs::read_to_string("hello.html").unwrap();
//响应
let response = "HTTP/1.1 200 0k\r\n\r\n"; //协议,状态码 数据
let response = format!("{}{}",response,contents);
stream.write(response.as_bytes()).unwrap(); // write()接收的是数组,所以需要转换下
stream.flush().unwrap(); //阻塞程序运行,直到数据都写入完成
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>hello!</title>
</head>
<body>
<h1>hello</h1>
<p>Hi from Rust</p>
</body>
</html>
hello.html如上所示
访问其他路径报故障
fn handle_connection(mut stream : TcpStream) {
let mut buffer = [0; 2048]; //定义一个数组存放数据
stream.read(&mut buffer).unwrap(); //使用read方法读取数据
//println!("Request : {}",String::from_utf8_lossy(&buffer[..])); //将数组数据转换为字符串
let get = b"GET / HTTP/1.1\r\n"; //加个b 转换成字节字符串,可进行比较
if buffer.starts_with(get) { //缓冲区是否以get开头
let contents = fs::read_to_string("hello.html").unwrap();
//响应
let response = "HTTP/1.1 200 0k\r\n\r\n"; //协议,状态码 数据
let response = format!("{}{}",response,contents);
stream.write(response.as_bytes()).unwrap(); // write()接收的是数组,所以需要转换下
stream.flush().unwrap(); //阻塞程序运行,直到数据都写入完成
} else {
let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
let contents = fs::read_to_string("404.html").unwrap();
let response = format!("{}{}",status_line,contents);
stream.write(response.as_bytes()).unwrap(); // write()接收的是数组,所以需要转换下
stream.flush().unwrap(); //阻塞程序运行,直到数据都写入完成
}
404.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry,I don't Know what you're asking for.</p>
</body>
</html>
代码重构
fn handle_connection(mut stream : TcpStream) {
let mut buffer = [0; 2048]; //定义一个数组存放数据
stream.read(&mut buffer).unwrap(); //使用read方法读取数据
//println!("Request : {}",String::from_utf8_lossy(&buffer[..])); //将数组数据转换为字符串
let get = b"GET / HTTP/1.1\r\n"; //加个b 转换成字节字符串,可进行比较
let (status_line , filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 0k\r\n\r\n" , "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n" , "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}",status_line,contents);
stream.write(response.as_bytes()).unwrap(); // write()接收的是数组,所以需要转换下
stream.flush().unwrap(); //阻塞程序运行,直到数据都写入完成
}
多线程
最简单的
fn main() {
let listener = TcpListener::bind("127.0.0.1:7887").unwrap(); //监听7887端口,成功后,就创建一个linstener
for stream in listener.incoming() { // listener.incoming 会返回产生一流序列的迭代器;每一个链接,都会产生一个流序列
let stream = stream.unwrap();
thread::spawn(||{
handle_connection(stream);
});
}
}
这样来一个请求就会创建一个线程
但是如果被攻击,就会一直创建很多线程
加入线程池
main.rs
use std::{fs, io::{Read, Write}, net::{TcpListener, TcpStream}, thread, time::Duration};
use project_web::ThreadPool;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7887").unwrap(); //监听7887端口,成功后,就创建一个linstener
let pool = ThreadPool::new(4);
for stream in listener.incoming() { // listener.incoming 会返回产生一流序列的迭代器;每一个链接,都会产生一个流序列
let stream = stream.unwrap();
pool.execute(||{
handle_connection(stream);
});
}
}
fn handle_connection(mut stream : TcpStream) {
let mut buffer = [0; 2048]; //定义一个数组存放数据
stream.read(&mut buffer).unwrap(); //使用read方法读取数据
//println!("Request : {}",String::from_utf8_lossy(&buffer[..])); //将数组数据转换为字符串
let get = b"GET / HTTP/1.1\r\n"; //加个b 转换成字节字符串,可进行比较
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line , filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 0k\r\n\r\n" , "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 0k\r\n\r\n" , "hello.html")
}
else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n" , "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}",status_line,contents);
stream.write(response.as_bytes()).unwrap(); // write()接收的是数组,所以需要转换下
stream.flush().unwrap(); //阻塞程序运行,直到数据都写入完成
}
lib.rs
use std::sync::mpsc;
use std::thread;
use std::sync::Arc;
use std::sync::Mutex;
type Job = Box<dyn FnBox + Send + 'static>;
pub struct ThreadPool {
workers : Vec<Worker>, //这个类型是模仿 thread::spawn() 的函数返回
sender : mpsc::Sender<Job>,
}
impl ThreadPool {
/// Create a new ThreadPool
///
/// The size is the number of threads in the Pool
///
/// # Panics
///
/// The `new` function will panic if the size is zero
pub fn new(size : usize) -> ThreadPool {
assert!(size > 0);
let (sender,receive) = mpsc::channel();
let receive = Arc::new(Mutex::new(receive));
let mut workers = Vec::with_capacity(size); //创建一个预分配好空间的vec
for id in 0..size {
//创建线程并保存到vec中
//这里只想创建线程,而并不想立即执行函数,需要等下一步传入函数后再进行执行
workers.push(Worker::new(id , Arc::clone(&receive)));
}
ThreadPool { workers , sender }
}
pub fn execute<F>(&self,f : F)
where F : FnBox + Send + 'static //这是线程创建时传入的闭包类型
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id : usize,
thread : thread::JoinHandle<()>,
}
impl Worker {
fn new(id : usize,receive : Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
while let Ok(job) = receive.lock().unwrap().recv() {
println!("worker {} get a job,execute",id);
job.call_box();
}
//let job = receive.lock().unwrap().recv().unwrap(); //这里接收传递过来的闭包,但是这种类型不能直接解引用
//*(job)(); 这样解构有问题
}
} );
Worker{ id , thread }
}
}
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F> FnBox for F
where F : FnOnce()
{
fn call_box(self: Box<F>) {
(*self)()
}
}
优雅地停机
use std::{fs, io::{Read, Write}, net::{TcpListener, TcpStream}, thread, time::Duration};
use project_web::ThreadPool;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7887").unwrap(); //监听7887端口,成功后,就创建一个linstener
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) { // listener.incoming 会返回产生一流序列的迭代器;每一个链接,都会产生一个流序列
let stream = stream.unwrap();
pool.execute(||{
handle_connection(stream);
});
}
thread::sleep(Duration::from_secs(1));
println!("ShutDown!! Bye!!");
}
fn handle_connection(mut stream : TcpStream) {
let mut buffer = [0; 2048]; //定义一个数组存放数据
stream.read(&mut buffer).unwrap(); //使用read方法读取数据
//println!("Request : {}",String::from_utf8_lossy(&buffer[..])); //将数组数据转换为字符串
let get = b"GET / HTTP/1.1\r\n"; //加个b 转换成字节字符串,可进行比较
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line , filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 0k\r\n\r\n" , "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 0k\r\n\r\n" , "hello.html")
}
else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n" , "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}",status_line,contents);
stream.write(response.as_bytes()).unwrap(); // write()接收的是数组,所以需要转换下
stream.flush().unwrap(); //阻塞程序运行,直到数据都写入完成
}
use std::sync::mpsc;
use std::thread;
use std::sync::Arc;
use std::sync::Mutex;
type Job = Box<dyn FnBox + Send + 'static>;
pub struct ThreadPool {
workers : Vec<Worker>, //这个类型是模仿 thread::spawn() 的函数返回
sender : mpsc::Sender<Message>,
}
enum Message {
NewJob(Job),
Terminate,
}
impl ThreadPool {
/// Create a new ThreadPool
///
/// The size is the number of threads in the Pool
///
/// # Panics
///
/// The `new` function will panic if the size is zero
pub fn new(size : usize) -> ThreadPool {
assert!(size > 0);
let (sender,receive) = mpsc::channel();
let receive = Arc::new(Mutex::new(receive));
let mut workers = Vec::with_capacity(size); //创建一个预分配好空间的vec
for id in 0..size {
//创建线程并保存到vec中
//这里只想创建线程,而并不想立即执行函数,需要等下一步传入函数后再进行执行
workers.push(Worker::new(id , Arc::clone(&receive)));
}
ThreadPool { workers , sender }
}
pub fn execute<F>(&self,f : F)
where F : FnBox + Send + 'static //这是线程创建时传入的闭包类型
{
let job = Box::new(f);
let job = Message::NewJob(job);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self){
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
for wroker in &mut self.workers {
println!("shutting down worker {}",wroker.id);
//wroker.thread.join().unwrap();
if let Some(thread) = wroker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id : usize,
thread : Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id : usize,receive : Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receive.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("worker {} get a job,execute",id);
job.call_box();
},
Message::Terminate => {
println!("shutdown!!!");
break;
}
}
//let job = receive.lock().unwrap().recv().unwrap(); //这里接收传递过来的闭包,但是这种类型不能直接解引用
//*(job)(); 这样解构有问题
}
} );
Worker{ id : id , thread: Some(thread) }
}
}
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F> FnBox for F
where F : FnOnce()
{
fn call_box(self: Box<F>) {
(*self)()
}
}
这只是一个简单的小工程,目的是为了复习之前的知识;工程中还是有很多问题的。