目录
01_wait_group.cc
02_http_wait.cc
03_httptask_callback.cc
04_http_task_request.cc
05_redis_task_callback.cc
06_redistask_read.cc
07_series.cc
08_series_dynamic.cc
09_context.cc
10_parallel_work.cc
作业:
01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"? 映射关系: "x1" --> "x2", "x2" --> "x3", "x3" --> "x4", "x4" --> "100"
02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)
03 阅读下面的代码并尝试添加注释
01_wait_group.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <signal.h>
#include <iostream>
using std::cout;
using std::cerr;
// WaitGroup constructed with a count of 1; main() blocks on it until the
// SIGINT handler below calls done().
static WFFacilities::WaitGroup WaitGroup(1);

// SIGINT handler: prints "done", then signals the WaitGroup so that the
// wait() call in main() can return.
void handler(int signum)
{
    cout << "done\n";
    WaitGroup.done();
}
int main(void)
{
signal(SIGINT,handler);
//创建任务
WFHttpTask * httpTask=WFTaskFactory::create_http_task(
/* "http://www.baidu.com", */
"http://localhost/en/index.html",
10,
10,
nullptr
);
//交给框架执行
httpTask->start();
WaitGroup.wait();
cout<<"finish\n";
return 0;
}
/* static WFHttpTask *create_http_task(const std::string& url, */
/*                                     int redirect_max,          // maximum number of redirects */
/*                                     int retry_max,             // maximum number of retries */
/*                                     http_callback_t callback); // completion callback */
02_http_wait.cc
// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: prints "done" and signals the WaitGroup so that main()
// stops waiting.
void handler(int signum)
{
    cout << "done\n";
    waitGroup.done();
}
int main(){
signal(SIGINT,handler);
// 用户代码 1 创建任务
WFHttpTask * httpTask = WFTaskFactory::create_http_task(
//"http://www.baidu.com",
"http://localhost/en/index.html",
10,
10,
nullptr
);
// 用户代码 2 把任务交给框架
httpTask->start();
waitGroup.wait();
cout << "finish!\n";
}
03_httptask_callback.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpUtil.h>
#include <signal.h>
#include <iostream>
using std::cout;
using std::cerr;
//------------------------------------//
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: signals the WaitGroup first (the original author's note
// describes done() as decrementing the internal counter), then prints "done".
void sighandler(int signum)
{
    waitGroup.done();
    cout << "done\n";
}
//------------------------------------//
// httpCallback()
//------------------------------------//
// Completion callback of the HTTP task.
//
// On failure, prints a diagnostic for the task state and returns.
// On success, dumps to stderr: the request line and request headers, then the
// response status line, response headers and response body.
//
// httpTask: the finished task; request/response are read through it.
void httpCallback(WFHttpTask* httpTask){
    cout << "callback is called\n";

    protocol::HttpRequest *req = httpTask->get_req();
    protocol::HttpResponse *resq = httpTask->get_resp();
    int state = httpTask->get_state();
    int error = httpTask->get_error();   // error detail for the state

    // Report the failure category, if any.
    switch (state) {
    case WFT_STATE_SYS_ERROR:    // system error, `error` is an errno value
        cerr << "system error: " << strerror(error) << "\n";
        break;
    case WFT_STATE_DNS_ERROR:    // DNS error, `error` is a getaddrinfo code
        cerr << "DNS error: " << gai_strerror(error) << "\n";
        break;
    case WFT_STATE_SSL_ERROR:    // SSL error
        cerr << "SSL error: " << error << "\n";
        break;
    case WFT_STATE_TASK_ERROR:   // task error
        cerr << "Task error: " << error << "\n";
        break;
    case WFT_STATE_SUCCESS:      // request succeeded
        break;
    }

    // Any state other than success ends the callback here.
    if (state != WFT_STATE_SUCCESS) {
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }

    // Request: start line, then header fields.
    cerr << "method=" << req->get_method() << "\n";
    cerr << "version=" << req->get_http_version() << "\n";
    cerr << "path& query=" << req->get_request_uri() << "\n";

    std::string name;
    std::string value;
    protocol::HttpHeaderCursor req_cursor(req);   // iterator-like header walker
    while (req_cursor.next(name, value)) {
        cerr << "name =" << name << " value =" << value << "\n";
    }
    cerr << "\n";

    // Response: status line, header fields, then body.
    cerr << "version=" << resq->get_http_version() << "\n";
    cerr << "state code=" << resq->get_status_code() << "\n";
    cerr << "reason phrea=" << resq->get_reason_phrase() << "\n";

    protocol::HttpHeaderCursor resp_cursor(resq);
    while (resp_cursor.next(name, value)) {
        cerr << "name =" << name << " value =" << value << "\n";
    }

    // The body is a (pointer, length) pair, not a NUL-terminated string, so it
    // must be written with its explicit length. Streaming it as `const char*`
    // could read past the buffer or stop early at an embedded NUL byte.
    const void *body = nullptr;
    size_t body_len = 0;
    if (resq->get_parsed_body(&body, &body_len) && body != nullptr && body_len > 0) {
        cerr.write(static_cast<const char*>(body),
                   static_cast<std::streamsize>(body_len));
    }
    cerr << "\n";
}
//------------------------------------//
// main()
//------------------------------------//
int main(void)
{
signal(SIGINT,sighandler);
WFHttpTask* httpTask=WFTaskFactory::create_http_task(
"http://www.taobao.com",10,10,httpCallback);
httpTask->start();
waitGroup.wait();
cerr<<"\nfinish\n";
return 0;
}
04_http_task_request.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/HttpMessage.h>
#include <workflow/HttpUtil.h>
#include <signal.h>
#include <iostream>
using std::cout;
using std::cerr;
//------------------------------------//
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: signals the WaitGroup, then prints "done".
void sighandler(int signum)
{
    waitGroup.done();
    cout << "done\n";
}
//------------------------------------//
// httpCallback()
//------------------------------------//
// Completion callback of the HTTP task.
//
// Prints a diagnostic and returns when the task did not succeed; otherwise
// dumps the request line/headers and the response status line/headers to
// stderr. The response body is intentionally not printed here.
void httpCallback(WFHttpTask* httpTask){
    cout << "callback is called\n";

    protocol::HttpRequest *request = httpTask->get_req();
    protocol::HttpResponse *response = httpTask->get_resp();
    const int state = httpTask->get_state();
    const int error = httpTask->get_error();   // error detail for the state

    // Failure categories; each prints its own diagnostic.
    if (state == WFT_STATE_SYS_ERROR) {
        cerr << "system error: " << strerror(error) << "\n";
    } else if (state == WFT_STATE_DNS_ERROR) {
        cerr << "DNS error: " << gai_strerror(error) << "\n";
    } else if (state == WFT_STATE_SSL_ERROR) {
        cerr << "SSL error: " << error << "\n";
    } else if (state == WFT_STATE_TASK_ERROR) {
        cerr << "Task error: " << error << "\n";
    }

    if (state != WFT_STATE_SUCCESS) {
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }

    // Request: start line followed by the header fields.
    cerr << "method=" << request->get_method() << "\n";
    cerr << "version=" << request->get_http_version() << "\n";
    cerr << "path& query=" << request->get_request_uri() << "\n";

    std::string headerName;
    std::string headerValue;
    protocol::HttpHeaderCursor requestHeaders(request);   // iterator-like walker
    while (requestHeaders.next(headerName, headerValue)) {
        cerr << "name =" << headerName << " value =" << headerValue << "\n";
    }
    cerr << "\n";

    // Response: status line followed by the header fields.
    cerr << "version=" << response->get_http_version() << "\n";
    cerr << "state code=" << response->get_status_code() << "\n";
    cerr << "reason phrea=" << response->get_reason_phrase() << "\n";

    protocol::HttpHeaderCursor responseHeaders(response);
    while (responseHeaders.next(headerName, headerValue)) {
        cerr << "name =" << headerName << " value =" << headerValue << "\n";
    }

    // Body dump disabled in this demo:
    //   const void *body; size_t body_len;
    //   response->get_parsed_body(&body, &body_len);
    //
    // Author's notes on methods:
    //   GET  - normally used to request data without side effects on the server.
    //   POST - used to submit data; usually changes server state or creates a
    //          resource.
    // NOTE(review): the original note also claims that Baidu's search endpoint
    // handles a POST as if it were a GET; that is unverified here.
}
//------------------------------------//
// main()
//------------------------------------//
int main(void)
{
signal(SIGINT,sighandler);
WFHttpTask* httpTask=WFTaskFactory::create_http_task(
"http://www.taobao.com",10,10,httpCallback);
//找到请求并设置残数
protocol::HttpRequest* req=httpTask->get_req();
req->set_method("POST");
req->set_request_uri("/s?wd=123");//百度查询接口
req->add_header_pair("myname","workflow");
cerr<<"method="<<req->get_method()<<"\n";
cerr<<"version="<<req->get_http_version()<<"\n";
cerr<<"path& query="<<req->get_request_uri()<<"\n";
cerr<<"\n";
cerr<<"\n";
httpTask->start();
waitGroup.wait();
cerr<<"\nfinish\n";
return 0;
}
05_redis_task_callback.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <signal.h>
#include <iostream>
using std::cout;
using std::cerr;
//------------------------------------//
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: signals the WaitGroup, then prints "done".
void sighandler(int signum)
{
    waitGroup.done();
    cout << "done\n";
}
//------------------------------------//
// redisCallback()
//------------------------------------//
// Completion callback of the Redis task.
//
// Prints a diagnostic and returns early when the task failed or when the
// server answered with an error reply; otherwise prints "callback is end".
void redisCallback(WFRedisTask* redistask){
    cout << "callback is called\n";

    protocol::RedisResponse *resp = redistask->get_resp();
    int state = redistask->get_state();
    int error = redistask->get_error();
    protocol::RedisValue val;   // holds the server reply on success

    switch (state) {
    case WFT_STATE_SYS_ERROR:    // system error, `error` is an errno value
        cerr << "system error: " << strerror(error) << "\n";
        break;
    case WFT_STATE_DNS_ERROR:    // DNS error, `error` is a getaddrinfo code
        cerr << "DNS error: " << gai_strerror(error) << "\n";
        break;
    case WFT_STATE_SSL_ERROR:    // SSL error
        cerr << "SSL error: " << error << "\n";
        break;
    case WFT_STATE_TASK_ERROR:   // task error
        cerr << "Task error: " << error << "\n";
        break;
    case WFT_STATE_SUCCESS:      // transport succeeded
        // A successful round trip can still carry an error reply from the
        // server; treat it as a failure, as the other Redis demos in this
        // file do.
        resp->get_result(val);
        if (val.is_error()) {
            cerr << "error reply,need a password? \n";
            state = WFT_STATE_TASK_ERROR;
        }
        break;
    }

    if (state != WFT_STATE_SUCCESS) {
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    cout << "callback is end\n";
}
//------------------------------------//
// main()
//------------------------------------//
int main(void)
{
signal(SIGINT,sighandler);
WFRedisTask* redistask=WFTaskFactory::create_redis_task(
"redis://127.0.0.1:6379",10,redisCallback);
cerr<<"\n";
//找到修改请求
protocol::RedisRequest *req=redistask->get_req();
req->set_request("SET",{"huasheng","lovexixi"});
redistask->start();
waitGroup.wait();
cerr<<"\nfinish\n";
return 0;
}
06_redistask_read.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <signal.h>
#include <iostream>
using std::cout;
using std::cerr;
//------------------------------------//
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: signals the WaitGroup, then prints "done".
void sighandler(int signum)
{
    waitGroup.done();
    cout << "done\n";
}
//------------------------------------//
// redisCallback()
//------------------------------------//
// Completion callback of the Redis task.
//
// Reports failures (including an error reply from the server) and returns;
// on success prints the reply when it is a string or an array.
void redisCallback(WFRedisTask* redistask){
    cout << "callback is called\n";

    protocol::RedisRequest *request = redistask->get_req();
    (void)request;   // fetched by the original demo but not used
    protocol::RedisResponse *response = redistask->get_resp();
    int state = redistask->get_state();
    const int error = redistask->get_error();
    protocol::RedisValue reply;   // server reply, filled on success

    if (state == WFT_STATE_SUCCESS) {
        // The round trip worked; the reply itself may still be an error.
        response->get_result(reply);
        if (reply.is_error()) {
            cerr << "error reply,need a password? \n";
            state = WFT_STATE_TASK_ERROR;
        }
    } else if (state == WFT_STATE_SYS_ERROR) {
        cerr << "system error: " << strerror(error) << "\n";
    } else if (state == WFT_STATE_DNS_ERROR) {
        cerr << "DNS error: " << gai_strerror(error) << "\n";
    } else if (state == WFT_STATE_SSL_ERROR) {
        cerr << "SSL error: " << error << "\n";
    } else if (state == WFT_STATE_TASK_ERROR) {
        cerr << "Task error: " << error << "\n";
    }

    if (state != WFT_STATE_SUCCESS) {
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }

    // Show the result of the executed command.
    if (reply.is_string()) {
        cerr << "value is string: " << reply.string_value() << "\n";
    }
    if (reply.is_array()) {
        cerr << "value is array: \n ";
        const size_t count = reply.arr_size();
        for (size_t idx = 0; idx < count; ++idx) {
            cerr << idx << " value:" << reply.arr_at(idx).string_value() << "\n";
        }
    }
    cout << "callback is end\n";
}
//------------------------------------//
// main()
//------------------------------------//
int main(void)
{
signal(SIGINT,sighandler);
WFRedisTask* redistask=WFTaskFactory::create_redis_task(
"redis://127.0.0.1:6379",10,redisCallback);
cerr<<"\n";
//找到修改请求
protocol::RedisRequest *req=redistask->get_req();
/* req->set_request("SET",{"huasheng","lovexixi"}); */
req->set_request("HGETALL",{"aa"});
redistask->start();
waitGroup.wait();
cerr<<"\nfinish\n";
return 0;
}
07_series.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <signal.h>
#include <unistd.h>
#include <iostream>
using std::cout;
using std::cerr;
//------------------------------------//
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: signals the WaitGroup, then prints "done".
void sighandler(int signum)
{
    waitGroup.done();
    cout << "done\n";
}
//------------------------------------//
// redisCallback()
//------------------------------------//
// Completion callback shared by both Redis tasks of the series.
//
// Reports failures (including an error reply) and returns; on success prints
// the reply when it is a string or an array.
void redisCallback(WFRedisTask* redistask){
    cout << "callback is called\n";

    protocol::RedisRequest *request = redistask->get_req();
    (void)request;   // fetched by the original demo but not used
    protocol::RedisResponse *response = redistask->get_resp();
    int state = redistask->get_state();
    const int error = redistask->get_error();
    protocol::RedisValue reply;   // server reply, filled on success

    if (state == WFT_STATE_SUCCESS) {
        // Transport succeeded; the reply may still be an error reply.
        response->get_result(reply);
        if (reply.is_error()) {
            cerr << "error reply,need a password? \n";
            state = WFT_STATE_TASK_ERROR;
        }
    } else if (state == WFT_STATE_SYS_ERROR) {
        cerr << "system error: " << strerror(error) << "\n";
    } else if (state == WFT_STATE_DNS_ERROR) {
        cerr << "DNS error: " << gai_strerror(error) << "\n";
    } else if (state == WFT_STATE_SSL_ERROR) {
        cerr << "SSL error: " << error << "\n";
    } else if (state == WFT_STATE_TASK_ERROR) {
        cerr << "Task error: " << error << "\n";
    }

    if (state != WFT_STATE_SUCCESS) {
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }

    // Show the result of the executed command.
    if (reply.is_string()) {
        cerr << "value is string: " << reply.string_value() << "\n";
    }
    if (reply.is_array()) {
        cerr << "value is array: \n ";
        const size_t count = reply.arr_size();
        for (size_t idx = 0; idx < count; ++idx) {
            cerr << idx << " value:" << reply.arr_at(idx).string_value() << "\n";
        }
    }
    // A sleep(2) was used here during experiments and is disabled.
    cout << "callback is end\n";
}
//------------------------------------//
// main()
//------------------------------------//
int main(void)
{
signal(SIGINT,sighandler);
WFRedisTask* redistask=WFTaskFactory::create_redis_task(
"redis://127.0.0.1:6379",10,redisCallback);
protocol::RedisRequest *req=redistask->get_req();
req->set_request("SET",{"07dada","07lovexixi"});
WFRedisTask* redistask1=WFTaskFactory::create_redis_task(
"redis://127.0.0.1:6379",10,redisCallback);
protocol::RedisRequest *req1=redistask1->get_req();
req1->set_request("GET",{"07dada"});
/* redistask->start(); */
/* redistask1->start(); */
//没有固定先后顺序
SeriesWork* series=Workflow::create_series_work(redistask,nullptr);
series->push_back(redistask1);
series->start();
waitGroup.wait();
cerr<<"\nfinish\n";
return 0;
}
08_series_dynamic.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <signal.h>
#include <unistd.h>
#include <iostream>
using std::cout;
using std::cerr;
//------------------------------------//
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: signals the WaitGroup, then prints "done".
void sighandler(int signum)
{
    waitGroup.done();
    cout << "done\n";
}
//------------------------------------//
// redisCallback()
//------------------------------------//
// Callback of the follow-up task that redisCallback appends to the series;
// it only prints a marker line and does not inspect the task.
void redisCallback1(WFRedisTask* redistask)
{
    cout << "callback111\n";
}
// Completion callback of the initial GET task.
//
// Reports failures (including an error reply) and returns. When the reply is
// a string it prints it and appends a new SET task to the series this task
// belongs to; when the reply is an array it prints every element.
void redisCallback(WFRedisTask* redistask){
    cout << "callback is called\n";

    protocol::RedisRequest *request = redistask->get_req();
    (void)request;   // fetched by the original demo but not used
    protocol::RedisResponse *response = redistask->get_resp();
    int state = redistask->get_state();
    const int error = redistask->get_error();
    protocol::RedisValue reply;   // server reply, filled on success

    if (state == WFT_STATE_SUCCESS) {
        // Transport succeeded; the reply may still be an error reply.
        response->get_result(reply);
        if (reply.is_error()) {
            cerr << "error reply,need a password? \n";
            state = WFT_STATE_TASK_ERROR;
        }
    } else if (state == WFT_STATE_SYS_ERROR) {
        cerr << "system error: " << strerror(error) << "\n";
    } else if (state == WFT_STATE_DNS_ERROR) {
        cerr << "DNS error: " << gai_strerror(error) << "\n";
    } else if (state == WFT_STATE_SSL_ERROR) {
        cerr << "SSL error: " << error << "\n";
    } else if (state == WFT_STATE_TASK_ERROR) {
        cerr << "Task error: " << error << "\n";
    }

    if (state != WFT_STATE_SUCCESS) {
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }

    if (reply.is_string()) {
        cerr << "value is string: " << reply.string_value() << "\n";

        // Add a task to the series that is currently executing. The follow-up
        // uses redisCallback1; passing redisCallback instead (the disabled
        // variant in the original) would make this callback run again for the
        // new task.
        WFRedisTask *followUp = WFTaskFactory::create_redis_task(
            "redis://127.0.0.1:6379", 10, redisCallback1);
        followUp->get_req()->set_request("SET", {"07dada", "07lovexixi"});
        series_of(redistask)->push_back(followUp);
    }
    if (reply.is_array()) {
        cerr << "value is array: \n ";
        const size_t count = reply.arr_size();
        for (size_t idx = 0; idx < count; ++idx) {
            cerr << idx << " value:" << reply.arr_at(idx).string_value() << "\n";
        }
    }
    // A sleep(2) was used here during experiments and is disabled.
    cout << "callback is end\n";
}
//------------------------------------//
// main()
//------------------------------------//
int main(void)
{
signal(SIGINT,sighandler);
WFRedisTask* redistask1=WFTaskFactory::create_redis_task(
"redis://127.0.0.1:6379",10,redisCallback);
protocol::RedisRequest *req1=redistask1->get_req();
req1->set_request("GET",{"07dada"});
/* redistask->start(); */
redistask1->start();
//没有固定先后顺序
/* SeriesWork* series=Workflow::create_series_work(redistask,nullptr); */
/* series->push_back(redistask1); */
/* series->start(); */
waitGroup.wait();
cerr<<"\nfinish\n";
return 0;
}
09_context.cc
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <signal.h>
#include <unistd.h>
#include <iostream>
using std::cout;
using std::cerr;
using std::string;
using std::vector;
//------------------------------------//
// Data shared by all tasks of one series; attached to the series with
// set_context() and read back with get_context().
// Member order matters: main() initialises it positionally as {id, name}.
struct SeriesContext
{
    int id;             // numeric tag, overwritten by the task callbacks
    std::string name;   // textual tag, overwritten by the task callbacks
};
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: signals the WaitGroup, then prints "done".
void sighandler(int signum)
{
    waitGroup.done();
    cout << "done\n";
}
//------------------------------------//
// redisCallback()
//------------------------------------//
// Callback of the first task: prints the series context as it found it,
// overwrites it with {1001, "task1"} and prints it again.
void redisCallback1(WFRedisTask *redisTask)
{
    cerr << "xixi 1 begin\n";

    // The context pointer is stored on the series the task runs in.
    SeriesWork *owner = series_of(redisTask);
    SeriesContext *ctx = static_cast<SeriesContext *>(owner->get_context());

    cerr << "before id = " << ctx->id << " name = " << ctx->name << "\n";
    ctx->id = 1001;
    ctx->name = "task1";
    cerr << "after id = " << ctx->id << " name = " << ctx->name << "\n";

    cerr << "xixi 1 end!\n";
}
// Callback of the second task: prints the series context as left by the
// previous task, overwrites it with {1002, "task2"} and prints it again.
void redisCallback2(WFRedisTask *redisTask)
{
    cerr << "xixi 2 begin\n";

    // The context pointer is stored on the series the task runs in.
    SeriesWork *owner = series_of(redisTask);
    SeriesContext *ctx = static_cast<SeriesContext *>(owner->get_context());

    cerr << "before id = " << ctx->id << " name = " << ctx->name << "\n";
    ctx->id = 1002;
    ctx->name = "task2";
    cerr << "after id = " << ctx->id << " name = " << ctx->name << "\n";

    cerr << "xixi 2 end!\n";
}
// Callback of the third task: only prints the series context; it does not
// modify it.
void redisCallback3(WFRedisTask *redisTask)
{
    cerr << "xixi 3 begin\n";

    // The context pointer is stored on the series the task runs in.
    SeriesWork *owner = series_of(redisTask);
    SeriesContext *ctx = static_cast<SeriesContext *>(owner->get_context());
    cerr << "before id = " << ctx->id << " name = " << ctx->name << "\n";

    cerr << "xixi 3 end!\n";
}
void Callback(const SeriesWork* series){
SeriesContext* context=static_cast<SeriesContext*>(series->get_context());
cerr<<"callback id ="<<context->id<<" name="<<context->name<<"\n";
delete context;
}
//------------------------------------//
// main()
//------------------------------------//
// Entry point: runs two Redis tasks in one series that share a heap-allocated
// SeriesContext; the series callback prints the final context and frees it.
// Then waits for SIGINT.
int main(void)
{
    signal(SIGINT, sighandler);

    WFRedisTask *redisTask1 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback1);
    redisTask1->get_req()->set_request("SET", {"key", "123"});
    WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback2);
    redisTask2->get_req()->set_request("SET", {"key", "123"});
    WFRedisTask *redisTask3 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback3);
    redisTask3->get_req()->set_request("SET", {"key", "123"});

    // The series is created from the first task; the second is appended.
    SeriesWork *series = Workflow::create_series_work(redisTask1, nullptr);
    series->push_back(redisTask2);

    // The third task is deliberately left out of the series in this demo.
    // A task that is never started or pushed into a series must be dismissed,
    // otherwise it is leaked. To run it instead, replace the dismiss() call
    // with: series->push_back(redisTask3);
    redisTask3->dismiss();

    // Shared state for the whole series; freed by the series callback below.
    SeriesContext *context = new SeriesContext({1000, "mian"});
    series->set_context(context);
    series->set_callback([context](const SeriesWork *series){
        cerr << "callback id =" << context->id << " name=" << context->name << "\n";
        delete context;
    });

    series->start();
    waitGroup.wait();
    cerr << "\nfinish\n";
    return 0;
}
10_parallel_work.cc
// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
// Per-series data for the parallel download demo: one instance is attached
// to every sub-series and read back in the parallel callback.
struct SeriesContext{
    string url;            // URL fetched by the series
    // Body size recorded by the HTTP callback. Defaults to 0 so that a
    // context whose download failed (the callback returns before storing a
    // length) never exposes an indeterminate value to the parallel callback.
    size_t body_len = 0;
};
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: prints "done" and signals the WaitGroup so that main()
// stops waiting.
void handler(int signum)
{
    cout << "done\n";
    waitGroup.done();
}
// Completion callback of each HTTP task inside the parallel work.
//
// On failure prints a diagnostic and returns without touching the context.
// On success stores the response body length in the context of the series
// the task belongs to and prints it together with the URL.
void httpCallback(WFHttpTask *httpTask){
    protocol::HttpResponse *response = httpTask->get_resp();
    const int state = httpTask->get_state();
    const int error = httpTask->get_error();   // error detail for the state

    if (state == WFT_STATE_SYS_ERROR) {
        cerr << "system error: " << strerror(error) << "\n";
    } else if (state == WFT_STATE_DNS_ERROR) {
        cerr << "DNS error: " << gai_strerror(error) << "\n";
    } else if (state == WFT_STATE_SSL_ERROR) {
        cerr << "SSL error\n";
    } else if (state == WFT_STATE_TASK_ERROR) {
        cerr << "Task error\n";
    }

    if (state != WFT_STATE_SUCCESS) {
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }

    // Locate the body of the response message; only its length is needed.
    const void *bodyData;
    size_t bodySize;
    response->get_parsed_body(&bodyData, &bodySize);

    // Record the length on this series' context.
    SeriesWork *owner = series_of(httpTask);
    SeriesContext *ctx = static_cast<SeriesContext *>(owner->get_context());
    ctx->body_len = bodySize;
    cerr << "url = " << ctx->url << ", len = " << ctx->body_len << "\n";
}
void parallelCallback(const ParallelWork * parallelWork){
cerr << "parallel callback\n";
string name;
size_t body_len = 0;
for(int i = 0; i < 3; ++i){
// 找到内部的(已经执行完成不可修改的)序列
const SeriesWork * series = parallelWork->series_at(i);
SeriesContext * context = static_cast<SeriesContext *>(series->get_context());
cerr << "i = " << i << "url = " << context->url << ", len = " << context->body_len << "\n";
if(body_len < context->body_len){
body_len = context->body_len;
name = context->url;
}
delete context;
}
cerr << "longest body_len url = " << name << " body_len = " << body_len <<"\n";
WFRedisTask * redisTask = WFTaskFactory::create_redis_task(
"redis://127.0.0.1:6379",10,nullptr
);
redisTask->get_req()->set_request("SET",{name,std::to_string(body_len)});
series_of(parallelWork)->push_back(redisTask);
}
int main(){
signal(SIGINT,handler);
// 创建一个空的并行任务
ParallelWork * parallelWork = Workflow::create_parallel_work(parallelCallback);
// 创建多个小序列
vector<string> urls = {
"http://www.taobao.com",
"http://www.jd.com",
"http://www.baidu.com"
};
for(int i = 0; i < 3; ++i){
// 创建一个http任务
WFHttpTask * httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback);
// 根据http任务创建小序列
SeriesWork * series = Workflow::create_series_work(httpTask,nullptr);
// 往序列中加一个context
SeriesContext *context = new SeriesContext;
context->url = urls[i];
series->set_context(context);
// 把小序列加入并行任务
parallelWork->add_series(series);
}
parallelWork->start();
waitGroup.wait();
cout << "finish!\n";
}
作业:
01 存在下列的redis键值映射关系,使用workflow的redis任务和序列,假如只知道"x1",如何找到最终的"100"? 映射关系: "x1" --> "x2", "x2" --> "x3", "x3" --> "x4", "x4" --> "100"
// wait_group 实现有条件的等待 #include <workflow/WFFacilities.h> #include <workflow/WFTaskFactory.h> #include <iostream> #include <signal.h> using std::cout; using std::cerr; static WFFacilities::WaitGroup waitGroup(1); void handler(int signum){ cout << "done\n"; waitGroup.done(); } void redisCallback(WFRedisTask *redisTask) { protocol::RedisRequest *req = redisTask->get_req(); protocol::RedisResponse *resp = redisTask->get_resp(); int state = redisTask->get_state(); int error = redisTask->get_error(); // val用来保存redis执行的结果 protocol::RedisValue val; switch (state){ case WFT_STATE_SYS_ERROR: cerr <<"system error: " << strerror(error) << "\n"; break; case WFT_STATE_DNS_ERROR: cerr <<"DNS error: " << gai_strerror(error) << "\n"; break; case WFT_STATE_SSL_ERROR: cerr <<"SSL error\n"; break; case WFT_STATE_TASK_ERROR: cerr <<"Task error\n"; break; case WFT_STATE_SUCCESS: resp->get_result(val);// 将redis的执行结果保存起来 if (val.is_error()){ cerr << "Error reply. Need a password?\n"; state = WFT_STATE_TASK_ERROR; } break; } if (state != WFT_STATE_SUCCESS){ cerr << "Failed. Press Ctrl-C to exit.\n"; return; } if(val.is_string()&& val.string_value()!="100"){ cerr<<"100 is not found"<<val.string_value()<<"\n"; WFRedisTask* newtask=WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback); newtask->get_req()->set_request("GET",{val.string_value()}); series_of(redisTask)->push_back(newtask); }else{ cerr<<"100 is found\n"; } } int main(){ signal(SIGINT,handler); // 创建任务 WFRedisTask * redisTask = WFTaskFactory::create_redis_task( "redis://127.0.0.1:6379", 10, redisCallback ); // 找到请求 protocol::RedisRequest * req = redisTask->get_req(); req->set_request("GET", {"x1"}); // 将任务交给框架 redisTask->start(); waitGroup.wait(); cout << "finish!\n"; }
02 读取某个网站的内容,并且存入redis服务端当中(比如先访问淘宝,再set www.taobao.com 淘宝的html内容)
// wait_group 实现有条件的等待
#include <workflow/WFFacilities.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <iostream>
#include <signal.h>
using std::cout;
using std::cerr;
using std::vector;
using std::string;
// Series context type carried over from the parallel-work demo.
// NOTE(review): nothing in the visible code of this program uses it.
struct SeriesContext
{
    string url;        // URL associated with the series
    size_t body_len;   // length of the downloaded body
};
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: prints "done" and signals the WaitGroup so that main()
// stops waiting.
void handler(int signum)
{
    cout << "done\n";
    waitGroup.done();
}
// Completion callback of the HTTP task (homework 02).
//
// On failure prints a diagnostic and returns. On success copies the response
// body into a Redis SET task and appends that task to the series of the HTTP
// task, so the page content is stored after the download.
void httpCallback(WFHttpTask *httpTask){
    protocol::HttpResponse *resp = httpTask->get_resp();   // response
    int state = httpTask->get_state();                     // task state
    int error = httpTask->get_error();                     // error detail

    switch (state){
    case WFT_STATE_SYS_ERROR:
        cerr <<"system error: " << strerror(error) << "\n";
        break;
    case WFT_STATE_DNS_ERROR:
        cerr <<"DNS error: " << gai_strerror(error) << "\n";
        break;
    case WFT_STATE_SSL_ERROR:
        cerr <<"SSL error\n";
        break;
    case WFT_STATE_TASK_ERROR:
        cerr <<"Task error\n";
        break;
    case WFT_STATE_SUCCESS:
        break;
    }
    if (state != WFT_STATE_SUCCESS){
        cerr <<"Failed. Press Ctrl-C to exit.\n";
        return;
    }

    // Locate the body of the response message.
    const void *body = nullptr;
    size_t body_len = 0;
    if (!resp->get_parsed_body(&body, &body_len)){
        cerr << "Failed to parse the response body.\n";
        return;
    }

    // The body is a (pointer, length) pair and is not NUL-terminated. Build
    // the value with its explicit length; converting the bare pointer to a
    // string would over-read the buffer or truncate at an embedded NUL byte.
    std::string html;
    if (body != nullptr && body_len > 0){
        html.assign(static_cast<const char*>(body), body_len);
    }

    // Create the Redis task and queue it behind the HTTP task.
    // NOTE(review): the key is hard-coded and must match the URL in main().
    WFRedisTask* redistask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr);
    redistask->get_req()->set_request("SET",{"http://www.baidu.com", html});
    series_of(httpTask)->push_back(redistask);
}
int main(){
signal(SIGINT,handler);
WFHttpTask * httpTask = WFTaskFactory::create_http_task("http://www.baidu.com",10,10,httpCallback);
/* SeriesWork * series = Workflow::create_series_work(httpTask,nullptr); */
httpTask->start();
waitGroup.wait();
cout << "finish!\n";
}
03 阅读下面的代码并尝试添加注释
#include <workflow/WFFacilities.h>
#include <workflow/MySQLUtil.h>
#include <workflow/MySQLResult.h>
#include <iostream>
#include <signal.h>
using std::string;
using std::cerr;
// WaitGroup constructed with a count of 1; main() waits on it.
static WFFacilities::WaitGroup waitGroup(1);

// SIGINT handler: prints the received signal number and signals the
// WaitGroup so that main() stops waiting.
void sighandler(int signum)
{
    std::cout << "signum = " << signum << "\n";
    waitGroup.done();
}
// Completion callback of the MySQL task.
//
// Reports a task-level failure or an error packet and returns. Otherwise
// walks every result set of the response: for a write statement prints the
// affected row count; for a read statement prints the column metadata and
// then every row.
void mysqlCallback(WFMySQLTask * mysqlTask){
    // Task-level failure (author's note: e.g. permissions or password).
    const int taskState = mysqlTask->get_state();
    if (taskState != WFT_STATE_SUCCESS) {
        cerr << "error_msg = " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";
        return;
    }

    // The server answered, but with an error packet: the SQL itself failed.
    protocol::MySQLResponse *response = mysqlTask->get_resp();
    if (response->get_packet_type() == MYSQL_PACKET_ERROR) {
        cerr << "error_code = " << response->get_error_code() << " error_msg = " << response->get_error_msg() << "\n";
        return;
    }

    protocol::MySQLResultCursor cursor(response);
    do {
        switch (cursor.get_cursor_status()) {
        case MYSQL_STATUS_OK:
            // Write-type statement.
            cerr << "write \n";
            cerr << cursor.get_affected_rows() << " rows affected\n";
            break;

        case MYSQL_STATUS_GET_RESULT: {
            // Read-type statement.
            cerr << "read \n";

            // Table header: one field descriptor per column.
            const protocol::MySQLField * const *fields = cursor.fetch_fields();
            for (int col = 0; col < cursor.get_field_count(); ++col) {
                cerr << "db = " << fields[col]->get_db()
                     << " table = " << fields[col]->get_table()
                     << " name = " << fields[col]->get_name()
                     << " type = " << datatype2str(fields[col]->get_data_type()) << "\n";
            }

            // Table content: every row, every cell.
            std::vector<std::vector<protocol::MySQLCell>> table;
            cursor.fetch_all(table);
            for (auto &record : table) {
                for (auto &cell : record) {
                    if (cell.is_int()) {
                        cerr << cell.as_int();
                    } else if (cell.is_string()) {
                        cerr << cell.as_string();
                    } else if (cell.is_datetime()) {
                        cerr << cell.as_datetime();
                    }
                    cerr << "\t";
                }
                cerr << "\n";
            }
            break;
        }

        default:
            break;
        }
    } while (cursor.next_result_set());   // one task may carry several statements
}
int main(){
signal(SIGINT,sighandler);
WFMySQLTask * mysqlTask = WFTaskFactory::create_mysql_task("mysql://root:123@localhost",1,mysqlCallback);
string sql = "insert into mycloud.tbl_user_token (user_name,user_token) values ('Caixukun','singdancerap');";
//string sql;
sql += "select * from mycloud.tbl_user_token;";
mysqlTask->get_req()->set_query(sql);
mysqlTask->start();
waitGroup.wait();
return 0;
}
#include <workflow/WFFacilities.h> // 包含 WFFacilities 库
#include <workflow/MySQLUtil.h> // 包含 MySQL 相关工具库
#include <workflow/MySQLResult.h> // 包含 MySQL 结果处理库
#include <iostream> // 包含输入输出流库
#include <signal.h> // 包含信号处理库
using std::string; // 使用 string 类型
using std::cerr; // 使用 cerr 输出错误信息
// WaitGroup constructed with a count of 1, used to keep main() alive.
static WFFacilities::WaitGroup waitGroup(1);

// Handler installed for SIGINT: prints the number of the received signal,
// then signals the WaitGroup so that main() stops waiting and exits.
void sighandler(int signum)
{
    std::cout << "signum = " << signum << "\n";
    waitGroup.done();
}
// Completion callback of the MySQL task.
//
// Reports a task-level failure or an error packet and returns. Otherwise
// walks every result set: for a write statement prints the affected row
// count; for a read statement prints the column metadata and every row.
void mysqlCallback(WFMySQLTask * mysqlTask){
    // Task-level failure (author's note: e.g. permissions or password).
    if(mysqlTask->get_state() != WFT_STATE_SUCCESS){
        cerr << "error_msg = " << WFGlobal::get_error_string(mysqlTask->get_state(), mysqlTask->get_error()) << "\n";
        return;
    }

    // The response object of the task.
    protocol::MySQLResponse * resp = mysqlTask->get_resp();

    // An error packet means the SQL statement itself failed.
    if(resp->get_packet_type() == MYSQL_PACKET_ERROR){
        cerr << "error_code = " << resp->get_error_code() << " error_msg = " << resp->get_error_msg() << "\n";
        return;
    }

    // Cursor over the result sets contained in the response.
    protocol::MySQLResultCursor cursor(resp);
    do{
        if(cursor.get_cursor_status() == MYSQL_STATUS_OK){
            // Write-type statement: report the number of affected rows.
            cerr << "write \n";
            cerr << cursor.get_affected_rows() << " rows affected\n";
        }
        else if(cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT){
            // Read-type statement.
            cerr << "read \n";

            // Column metadata: database, table, column name and data type.
            const protocol::MySQLField * const * fieldArr;
            fieldArr = cursor.fetch_fields();
            for(int i = 0; i < cursor.get_field_count(); ++i){
                cerr << "db = " << fieldArr[i]->get_db()
                     << " table = " << fieldArr[i]->get_table()
                     << " name = " << fieldArr[i]->get_name()
                     << " type = " << datatype2str(fieldArr[i]->get_data_type()) << "\n";
            }

            // Fetch all rows, then print every cell.
            std::vector<std::vector<protocol::MySQLCell>> rows;
            cursor.fetch_all(rows);
            for(auto &row : rows){
                for(auto &cell : row){
                    // Print according to the cell type. All value kinds are
                    // covered so that NULL, floating-point, unsigned, date and
                    // time columns no longer show up as empty output.
                    if(cell.is_null()){
                        cerr << "NULL";
                    }
                    else if(cell.is_int()){
                        cerr << cell.as_int();
                    }
                    else if(cell.is_ulonglong()){
                        cerr << cell.as_ulonglong();
                    }
                    else if(cell.is_float()){
                        cerr << cell.as_float();
                    }
                    else if(cell.is_double()){
                        cerr << cell.as_double();
                    }
                    else if(cell.is_string()){
                        cerr << cell.as_string();
                    }
                    else if(cell.is_date()){
                        cerr << cell.as_date();
                    }
                    else if(cell.is_time()){
                        cerr << cell.as_time();
                    }
                    else if(cell.is_datetime()){
                        cerr << cell.as_datetime();
                    }
                    cerr << "\t";   // column separator
                }
                cerr << "\n";       // end of row
            }
        }
    } while(cursor.next_result_set());   // one task may carry several SQL statements
}
int main(){
signal(SIGINT, sighandler); // 注册 SIGINT 信号处理函数
// 创建 MySQL 任务并指定连接字符串和回调函数
WFMySQLTask * mysqlTask = WFTaskFactory::create_mysql_task("mysql://root:123@localhost", 1, mysqlCallback);
// 构造 SQL 查询语句
string sql = "insert into mycloud.tbl_user_token (user_name, user_token) values ('Caixukun', 'singdancerap');";
sql += "select * from mycloud.tbl_user_token;"; // 在插入后选择用户令牌
// 设置 MySQL 任务的查询
mysqlTask->get_req()->set_query(sql);
// 启动 MySQL 任务
mysqlTask->start();
// 等待 WaitGroup 完成
waitGroup.wait();
return 0; // 返回 0 表示程序正常结束
}