关于eventmachine,协程在rails里面的使用

时间:2021-04-25 00:18:43

最近在公司实习的时候一直在学习rails和ruby.

在实现一个小需求,从微信的接口拉取粉丝信息然后更新我们的数据库,这是一个不太常用的功能,很久才会跑一次.

但是rails的模式,如果作为普通的客户端来发需求,而且同时发送的请求不可以太多,否则会出错(估计是微信的限制),ruby的httprequest是阻塞操作,一次只会发送一个请求,我查阅资料并且求教一番之后有如下几种可行的思路.

####1.可以用socket来编程,自己模拟部分的http交互,开两个线程,一个负责不停的发送,另外一个不停的接受并且解析.这样速度很快,但是工作量很大.
####2.单线程加callback,这样可以在一次循环里面发出200个报文,然后等待回调.这实质上就是一个IO多路复用,当然我们可以加上多进/程线程来使得更加快速.

EventMachine是一个gem,可以引入callback的方式来处理httprequest.

在考虑方案2的时候,发现如果后续的逻辑比较复杂,那么你的回调就会写的非常复杂,各种嵌套.这很像js里面的callback地狱,面对这样的情况,使用协程可以使得我们的代码按照同步的方式来写异步,这样就非常有利于阅读.具体怎么写协程和Eventmachine结合我也没有写.只是查阅了一些资料.

rails里面要想使用EventMachine还是会有一些问题,其中最需注意的就是一定要写好EventMachine事件循环的结束状态,否则会一直loop导致后续EventMachine循环无法被有效的执行.

给出代码,免得忘:


class WxBindController < ApplicationController

def execute_transcation(user_info_list)

user_info_list.each do |id,attributes|
user_info_list[id] = attributes.select{|k,v| v && v!=""}
end

#user_info_list = user_info_list.compact
#将数据更新到数据库里面
WeiXinBind.Base.connection do transcation
WeiXinBind.update(user_info_list.keys,user_info_list.values)
end
end


def send_request_in_batch(request_url_list,user_info_list)
EM.run do
cnt = 0
request_url_list.each do |url,id|

http = EventMachine::HttpRequest.new(url).get
http.errback {
cnt += 1
if cnt == request_url_list.count - 1
EM.stop
end
}
http.callback {
resp = Oj.load(http.response)
if !resp.blank? && resp["subscribe"] == 1
user_info_list[id] = {nickname:resp["nickname"],headimg_url:resp["headimgurl"],
sex:resp["sex"], province:resp["province"],
city:resp["city"], country:resp["country"]}
end
cnt += 1
if cnt == request_url_list.count - 1
EM.stop
end
}
end
end
end


#用来更新weixin_bind里面的数据
def update

access_token = params[:access_token]
cate_id = params[:cate_id]
issue_id = params[:issue_id]

user_info_list = {}
request_url_list = []

users = WeiXinBind.select("weixin_bind.*").joins("inner join live_users as lu using(user_id)").where("lu.cate_id = ? and lu.issue_id =?",cate_id,issue_id)
users.each do |wx|
openid = wx.service_openid
request_url_list << ["https://api.weixin.qq.com/cgi-bin/user/info?access_token=#{access_token}&openid=#{openid}&lang=zh_CN",wx.id] unless openid.blank?

if request_url_list.count > 200
send_request_in_batch(request_url_list,user_info_list)
request_url_list.clear
end

if user_info_list.count > 10000
execute_transcation(user_info_list)
user_info_list.clear
end
end

send_request_in_batch(request_url_list,user_info_list)
execute_transcation(user_info_list)
render :text => "OK"
end


#用来测试EventMachine的使用
def mysleep(x)
MyLog.debug(Time.now)
end

end