Java 微信公众号迁移

时间:2023-03-09 21:29:59
Java 微信公众号迁移

背景:公众号换主体,要迁移,粉丝(openId)的业务数据要做处理.

第一步:参照我的另一篇文章,Java 导出微信公众号粉丝。

第二步:数据处理(master-worker模式)

程序主入口:ChangeMain

我导出来的粉丝文件格式是:

{
"info":[
{"openId":"ogVous494ltuNmO4zHb1seHeGLSk"}
      ..... 1万条
]
}
package changeOpenId;

import java.util.List;
import java.util.Map;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.odao.weixin.api.support.AccessTokenKit;
import com.odao.weixin.site.cases2017.change.service.ChangeService;
import com.odao.weixin.site.cases2017.push.entity.JsonDataReadUtil; /**
* 多线程转换openId
* @author wangfj
*/
public class ChangeMain {

    /** Number of openIds handed to a worker per task; the conversion interface is said to accept at most 100 per call. */
    private static final int BATCH_SIZE = 100;

    /** Number of worker threads created by the master; tune this to your own needs. */
    private static final int WORKER_COUNT = 100;

    /** Pause between completion checks so the main thread does not spin on a CPU core while waiting. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /**
     * Program entry point: loads the exported follower file, splits the openIds into
     * batches, lets the master/worker threads convert them and prints the summed
     * processing time reported by the master.
     *
     * @param args unused
     * @throws Exception if the token response has no access_token, the follower file
     *                   cannot be read, or the waiting main thread is interrupted
     */
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext appContext =
                new ClassPathXmlApplicationContext(new String[] {"odao-weixin-site-servlet.xml"});
        try {
            String token = AccessTokenKit.getTokenNew("APPID", "APP秘钥");
            JSONObject tokenJson = JSON.parseObject(token);
            String accesstoken = tokenJson == null ? null : tokenJson.getString("access_token");
            if (accesstoken == null || accesstoken.length() == 0) {
                // Fail fast: without a token every conversion request would be rejected.
                throw new IllegalStateException("No access_token in token response: " + token);
            }

            // Read the openIds from the follower file exported in step one.
            JSONObject openIdJson = JsonDataReadUtil.getReadJsonByPath("第一步导出来的文件");
            String info = openIdJson.get("info").toString();
            JSONArray jsonArr = JSONObject.parseArray(info);
            List<Map<String,Object>> list = JSON.toJavaObject(jsonArr, List.class);
            // If you work directly against the database instead, write a service that
            // returns the rows to convert, for example:
            //   ChangeService changeService = (ChangeService) appContext.getBean("changeService");
            //   List<Map<String,Object>> list = changeService.queryRecord(40000, 50000);

            // Build the master and queue one task per batch.
            ChangeMaster master = new ChangeMaster(new ChangeWorker(), WORKER_COUNT, accesstoken, appContext);
            for (int from = 0; from < list.size(); from += BATCH_SIZE) {
                int to = Math.min(from + BATCH_SIZE, list.size());
                master.submit(list.subList(from, to));
            }

            // Start the worker threads and wait until all of them have terminated.
            master.execute();
            while (!master.isComplate()) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
            long allTime = master.getResult();
            System.out.println("主线程执行完毕,总耗时:"+allTime+"毫秒");
        } finally {
            // Release the resources held by the Spring context.
            appContext.close();
        }
    }
}

Master:

package changeOpenId;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.springframework.context.ApplicationContext; /**
* Master任务指派者,分发者
* @author wangfj
*/
public class ChangeMaster {

    /** Queue of pending tasks; each element is one batch of rows for a worker to process. */
    private ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue =
            new ConcurrentLinkedQueue<List<Map<String,Object>>>();

    /** All worker threads, keyed by a generated label. */
    private HashMap<String, Thread> workers = new HashMap<String, Thread>();

    /** Results reported by the workers; the values are summed as Long in {@link #getResult()}. */
    private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String, Object>();

    /**
     * Wires the given worker to this master's queue and result map and creates
     * the worker threads. Every thread runs the same {@code worker} instance.
     *
     * @param worker      the runnable executed by every thread
     * @param workerCount number of threads to create
     * @param accessToken token handed to the worker
     * @param appContext  Spring context handed to the worker
     */
    public ChangeMaster(Worker worker, int workerCount, String accessToken, ApplicationContext appContext) {
        worker.setApplicationContext(appContext);
        worker.setWorkerQueue(this.workerQueue);
        worker.setAccessToken(accessToken);
        worker.setResultMap(resultMap);
        int index = 0;
        while (index < workerCount) {
            Thread thread = new Thread(worker, "子线程" + index);
            this.workers.put("执行任务worker" + index, thread);
            index++;
        }
    }

    /**
     * Queues one batch for processing.
     *
     * @param list the batch to add to the task queue
     */
    public void submit(List<Map<String,Object>> list) {
        this.workerQueue.add(list);
    }

    /** Starts every worker thread. */
    public void execute() {
        for (Thread thread : workers.values()) {
            thread.start();
        }
    }

    /**
     * Sums every value currently stored in the result map.
     *
     * @return the total of all reported values
     */
    public long getResult() {
        long total = 0L;
        for (Object value : resultMap.values()) {
            total += (Long) value;
        }
        return total;
    }

    /**
     * Reports whether all worker threads have finished.
     *
     * @return {@code true} when every thread is in state TERMINATED, otherwise {@code false}
     */
    public boolean isComplate() {
        for (Thread thread : workers.values()) {
            boolean finished = thread.getState() == Thread.State.TERMINATED;
            if (!finished) {
                return false;
            }
        }
        return true;
    }
}

Worker(转换openId核心代码):

package changeOpenId;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.odao.weixin.site.cases2017.change.service.ChangeService; @Service
public class ChangeWorker extends Worker{ public static String url = "http://api.weixin.qq.com/cgi-bin/changeopenid?access_token="; //微信提供中的openId转换接口 public static long handle(List<Map<String,Object>> list,String accessToken,ApplicationContext appContext) throws Exception {
ChangeService changeService = (ChangeService) appContext.getBean("changeService");//我处理数据的业务类
long end = System.currentTimeMillis();
JSONObject params = new JSONObject();
params.put("from_appid", "xxxx");//此处from_appid为原帐号的appid ArrayList<String> openIds = new ArrayList<String>();
for(int i=0;i<list.size();i++){
openIds.add(list.get(i).get("openId").toString());
} params.put("openid_list", openIds);//需要转换的openid,即第1步中拉取的原帐号用户列表,这些必须是旧账号目前关注的才行,否则会出错;一次最多100个,不能多。格式:["openIdA","openIdB"] String reslut = JsonSMS(params.toString(),accessToken);
JSONObject json = (JSONObject) JSONObject.parse(reslut);
if("ok".equals(json.getString("errmsg"))){
String result_list = json.get("result_list").toString();
JSONArray arr= JSONObject.parseArray(result_list);
List<Map<String,Object>> obj = arr.toJavaObject(arr, List.class);
if(!obj.get(0).get("err_msg").equals("ori_openid error")){
List<Map<String,Object>> openIdObj = arr.toJavaObject(arr, List.class);
changeService.batchUpdateAccountsMapping(openIdObj);
}
}else{
System.out.println("请求微信转换接口返回异常");
}
return System.currentTimeMillis()-end;
} public static String JsonSMS(String postData, String token) {
String result = "";
try {
//发送POST请求
URL urls = new URL(url.concat(token));
HttpURLConnection conn = (HttpURLConnection) urls.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setRequestProperty("Connection", "Keep-Alive");
conn.setUseCaches(false);
conn.setDoOutput(true);
conn.setRequestProperty("Content-Length", "" + postData.length());
OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream(), "UTF-8");
out.write(postData);
out.flush();
out.close();
//获取响应状态
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
System.out.println("connect failed!");
return "";
}
//获取响应内容体
String line;
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8"));
while ((line = in.readLine()) != null) {
result += line + "\n";
}
in.close();
} catch (IOException e) {
e.printStackTrace(System.out);
}
return result;
}
}
package changeOpenId;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.springframework.context.ApplicationContext;

/**
 * Runnable that drains the shared task queue and records how long its batches took.
 * The collaborators are supplied through the setters before the thread is started.
 */
public class Worker implements Runnable {

    /** Queue of pending batches shared with the master. */
    private ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue;

    /** Map into which each thread stores its summed processing time as a Long. */
    private ConcurrentHashMap<String,Object> resultMap;

    /** Spring context passed on to {@code ChangeWorker.handle}. */
    private ApplicationContext appContext;

    /** Access token passed on to {@code ChangeWorker.handle}. */
    private String accessToken;

    /** @param workerQueue queue to poll batches from */
    public void setWorkerQueue(ConcurrentLinkedQueue<List<Map<String,Object>>> workerQueue) {
        this.workerQueue = workerQueue;
    }

    /** @param resultMap map that receives the processing times */
    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /** @param accessToken token handed to the batch handler */
    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    /**
     * Polls batches until the queue is empty, handles each one and finally stores
     * the time summed over all batches handled by the current thread.
     *
     * <p>The total is stored once under a key built from the thread's name and id,
     * so results of different threads cannot overwrite each other. A batch that
     * throws is logged and contributes no time.
     */
    @Override
    public void run() {
        long elapsed = 0L;
        List<Map<String,Object>> input;
        while ((input = this.workerQueue.poll()) != null) {
            try {
                elapsed += ChangeWorker.handle(input, accessToken, appContext);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        Thread current = Thread.currentThread();
        resultMap.put(current.getName() + "#" + current.getId(), Long.valueOf(elapsed));
    }

    /** Empty placeholder; not called anywhere in this class. */
    @SuppressWarnings("unused")
    private static void handle(List<String> list) {}

    /** @param appContext Spring context handed to the batch handler */
    public void setApplicationContext(ApplicationContext appContext) {
        this.appContext = appContext;
    }
}

处理数据的业务类:

package com.odao.weixin.site.cases2017.change.service;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service; @Service
public class ChangeService { static final Logger logger = LoggerFactory.getLogger(ChangeService.class); @Autowired
private JdbcTemplate jdbcTemplateWebSiteActivityDB; /**
* 批量更新玩家openId
* @param list
*/
public void batchUpdateAccountsMapping(final List<Map<String,Object>> list) {
String sql = "update 表名 set newOpenId=? ,createTime=getdate() where openId=?";
jdbcTemplateWebSiteActivityDB.batchUpdate(sql, new BatchPreparedStatementSetter() {
public int getBatchSize() {
return list.size();
//这个方法设定更新记录数,通常List里面存放的都是我们要更新的,所以返回list.size();
}
public void setValues(java.sql.PreparedStatement ps, int i) throws SQLException {
try{
ps.setString(1, list.get(i).get("new_openid").toString());
ps.setString(2, list.get(i).get("ori_openid").toString());
}catch(Exception e){ }
}
});
System.out.println(Thread.currentThread().getName()+"成功改变条数:"+list.size());
}
}

业务类操作的表字段最好加上索引,基本上几秒钟几万数据就跑完了。