背景
公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用activemq等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发
一、本文涉及知识点
- excel文件读写--阿里easyexcel sdk
- 文件上传、下载--腾讯云对象存储
- 远程服务调用--resttemplate
- 生产者、消费者--redistemplate leftpush和rightpop操作
- 异步处理数据--executors线程池
- 读取网络文件流--httpclient
- 自定义注解实现用户身份认证--jwt token认证, 拦截器拦截标注有@loginrequired注解的请求入口
当然, java实现咯
涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习
二、项目目录结构
说明: 数据库dao层放到另一个模块了, 不是本文重点
三、主要maven依赖
1、easyexcel
1
2
3
4
5
6
7
|
<easyexcel-latestversion> 1.1 . 2 -beta4</easyexcel-latestversion>
<dependency>
<groupid>com.alibaba</groupid>
<artifactid>easyexcel</artifactid>
<version>${easyexcel-latestversion}</version>
</dependency>
|
jwt
1
2
3
4
5
|
<dependency>
<groupid>io.jsonwebtoken</groupid>
<artifactid>jjwt</artifactid>
<version> 0.7 . 0 </version>
</dependency>
|
redis
1
2
3
4
5
|
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-redis</artifactid>
<version> 1.3 . 5 .release</version>
</dependency>
|
腾讯cos
1
2
3
4
5
|
<dependency>
<groupid>com.qcloud</groupid>
<artifactid>cos_api</artifactid>
<version> 5.4 . 5 </version>
</dependency>
|
四、流程
- 用户上传文件
- 将文件存储到腾讯cos
- 将上传后的文件id及上传记录保存到数据库
- redis生产一条导入消息, 即保存文件id到redis
- 请求结束, 返回"处理中"状态
- redis消费消息
- 读取cos文件, 异步处理数据
- 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成"
- 客户端轮询查询处理状态, 并可以下载错误文件
- 结束
五、实现效果
上传文件
数据库导入记录
导入的数据
下载错误文件
错误数据提示
查询导入记录
六、代码实现
1、导入excel控制层
1
2
3
4
5
6
|
@loginrequired
@requestmapping (value = "doimport" , method = requestmethod.post)
public jsonresponse doimport( @requestparam ( "file" ) multipartfile file, httpservletrequest request) {
pluser user = getuser(request);
return orderimportservice.doimport(file, user.getid());
}
|
2、service层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
@override
public jsonresponse doimport(multipartfile file, integer userid) {
if ( null == file || file.isempty()) {
throw new serviceexception( "文件不能为空" );
}
string filename = file.getoriginalfilename();
if (!checkfilesuffix(filename)) {
throw new serviceexception( "当前仅支持xlsx格式的excel" );
}
// 存储文件
string fileid = savetooss(file);
if (stringutils.isblank(fileid)) {
throw new serviceexception( "文件上传失败, 请稍后重试" );
}
// 保存记录到数据库
saverecordtodb(userid, fileid, filename);
// 生产一条订单导入消息
redisproducer.produce(rediskey.orderimportkey, fileid);
return jsonresponse.ok( "导入成功, 处理中..." );
}
/**
* 校验文件格式
* @param filename
* @return
*/
private static boolean checkfilesuffix(string filename) {
if (stringutils.isblank(filename) || filename.lastindexof( "." ) <= 0 ) {
return false ;
}
int pointindex = filename.lastindexof( "." );
string suffix = filename.substring(pointindex, filename.length()).tolowercase();
if ( ".xlsx" .equals(suffix)) {
return true ;
}
return false ;
}
/**
* 将文件存储到腾讯oss
* @param file
* @return
*/
private string savetooss(multipartfile file) {
inputstream ins = null ;
try {
ins = file.getinputstream();
} catch (ioexception e) {
e.printstacktrace();
}
string fileid;
try {
string originalfilename = file.getoriginalfilename();
file f = new file(originalfilename);
inputstreamtofile(ins, f);
filesystemresource resource = new filesystemresource(f);
multivaluemap<string, object> param = new linkedmultivaluemap<>();
param.add( "file" , resource);
responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult. class );
fileid = (string) responseresult.getdata();
} catch (exception e) {
fileid = null ;
}
return fileid;
}
|
3、redis生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@service
public class redisproducerimpl implements redisproducer {
@autowired
private redistemplate redistemplate;
@override
public jsonresponse produce(string key, string msg) {
map<string, string> map = maps.newhashmap();
map.put( "fileid" , msg);
redistemplate.opsforlist().leftpush(key, map);
return jsonresponse.ok();
}
}
|
4、redis消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
@service
public class redisconsumer {
@autowired
public redistemplate redistemplate;
@value ( "${txossfileurl}" )
private string txossfileurl;
@value ( "${txossuploadurl}" )
private string txossuploadurl;
@postconstruct
public void init() {
processorderimport();
}
/**
* 处理订单导入
*/
private void processorderimport() {
executorservice executorservice = executors.newcachedthreadpool();
executorservice.execute(() -> {
while ( true ) {
object object = redistemplate.opsforlist().rightpop(rediskey.orderimportkey, 1 , timeunit.seconds);
if ( null == object) {
continue ;
}
string msg = json.tojsonstring(object);
executorservice.execute( new orderimporttask(msg, txossfileurl, txossuploadurl));
}
});
}
}
|
5、处理任务线程类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
public class orderimporttask implements runnable {
public orderimporttask(string msg, string txossfileurl, string txossuploadurl) {
this .msg = msg;
this .txossfileurl = txossfileurl;
this .txossuploadurl = txossuploadurl;
}
}
/**
* 注入bean
*/
private void autowirebean() {
this .resttemplate = beancontext.getapplicationcontext().getbean(resttemplate. class );
this .transactiontemplate = beancontext.getapplicationcontext().getbean(transactiontemplate. class );
this .orderimportservice = beancontext.getapplicationcontext().getbean(orderimportservice. class );
}
@override
public void run() {
// 注入bean
autowirebean();
jsonobject jsonobject = json.parseobject(msg);
string fileid = jsonobject.getstring( "fileid" );
multivaluemap<string, object> param = new linkedmultivaluemap<>();
param.add( "id" , fileid);
responseresult responseresult = resttemplate.postforobject(txossfileurl, param, responseresult. class );
string fileurl = (string) responseresult.getdata();
if (stringutils.isblank(fileurl)) {
return ;
}
inputstream inputstream = httpclientutil.readfilefromurl(fileurl);
list<object> list = excelutil.read(inputstream);
process(list, fileid);
}
/**
* 将文件上传至oss
* @param file
* @return
*/
private string savetooss(file file) {
string fileid;
try {
filesystemresource resource = new filesystemresource(file);
multivaluemap<string, object> param = new linkedmultivaluemap<>();
param.add( "file" , resource);
responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult. class );
fileid = (string) responseresult.getdata();
} catch (exception e) {
fileid = null ;
}
return fileid;
}
|
说明: 处理数据的业务逻辑代码就不用贴了
6、上传文件到cos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
@requestmapping ( "/txossupload" )
@responsebody
public responseresult txossupload( @requestparam ( "file" ) multipartfile file) throws unsupportedencodingexception {
if ( null == file || file.isempty()) {
return responseresult.fail( "文件不能为空" );
}
string originalfilename = file.getoriginalfilename();
originalfilename = mimeutility.decodetext(originalfilename); // 解决中文乱码问题
string contenttype = getcontenttype(originalfilename);
string key;
inputstream ins = null ;
file f = null ;
try {
ins = file.getinputstream();
f = new file(originalfilename);
inputstreamtofile(ins, f);
key = ifilestorageclient.txossupload( new fileinputstream(f), originalfilename, contenttype);
} catch (exception e) {
return responseresult.fail(e.getmessage());
} finally {
if ( null != ins) {
try {
ins.close();
} catch (ioexception e) {
e.printstacktrace();
}
}
if (f.exists()) { // 删除临时文件
f.delete();
}
}
return responseresult.ok(key);
}
public static void inputstreamtofile(inputstream ins,file file) {
try {
outputstream os = new fileoutputstream(file);
int bytesread = 0 ;
byte [] buffer = new byte [ 8192 ];
while ((bytesread = ins.read(buffer, 0 , 8192 )) != - 1 ) {
os.write(buffer, 0 , bytesread);
}
os.close();
ins.close();
} catch (exception e) {
e.printstacktrace();
}
}
public string txossupload(fileinputstream inputstream, string key, string contenttype) {
key = uuid.getuuid() + "-" + key;
ossutil.txossupload(inputstream, key, contenttype);
try {
if ( null != inputstream) {
inputstream.close();
}
} catch (ioexception e) {
e.printstacktrace();
}
return key;
}
public static void txossupload(fileinputstream inputstream, string key, string contenttype) {
objectmetadata objectmetadata = new objectmetadata();
try {
int length = inputstream.available();
objectmetadata.setcontentlength(length);
} catch (exception e){
logger.info(e.getmessage());
}
objectmetadata.setcontenttype(contenttype);
cosclient.putobject(txbucketname, key, inputstream, objectmetadata);
}
|
7、下载文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
/**
* 腾讯云文件下载
* @param response
* @param id
* @return
*/
@requestmapping ( "/txossdownload" )
public object txossdownload(httpservletresponse response, string id) {
cosobjectinputstream cosobjectinputstream = ifilestorageclient.txossdownload(id, response);
string contenttype = getcontenttype(id);
fileutil.txossdownload(response, contenttype, cosobjectinputstream, id);
return null ;
}
public static void txossdownload(httpservletresponse response, string contenttype, inputstream filestream, string filename) {
fileoutputstream fos = null ;
response.reset();
outputstream os = null ;
try {
response.setcontenttype(contenttype + "; charset=utf-8" );
if (!contenttype.equals(plconstans.filecontenttype.image)){
try {
response.setheader( "content-disposition" , "attachment; filename=" + new string(filename.getbytes( "utf-8" ), "iso8859-1" ));
} catch (unsupportedencodingexception e) {
response.setheader( "content-disposition" , "attachment; filename=" + filename);
logger.error( "encoding file name failed" , e);
}
}
os = response.getoutputstream();
byte [] b = new byte [ 1024 * 1024 ];
int len;
while ((len = filestream.read(b)) > 0 ) {
os.write(b, 0 , len);
os.flush();
try {
if (fos != null ) {
fos.write(b, 0 , len);
fos.flush();
}
} catch (exception e) {
logger.error(e.getmessage());
}
}
} catch (ioexception e) {
ioutils.closequietly(fos);
fos = null ;
} finally {
ioutils.closequietly(os);
ioutils.closequietly(filestream);
if (fos != null ) {
ioutils.closequietly(fos);
}
}
}
|
8、读取网络文件流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/**
* 读取网络文件流
* @param url
* @return
*/
public static inputstream readfilefromurl(string url) {
if (stringutils.isblank(url)) {
return null ;
}
httpclient httpclient = new defaulthttpclient();
httpget methodget = new httpget(url);
try {
httpresponse response = httpclient.execute(methodget);
if (response.getstatusline().getstatuscode() == 200 ) {
httpentity entity = response.getentity();
return entity.getcontent();
}
} catch (exception e) {
e.printstacktrace();
}
return null ;
}
|
9、excelutil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/**
* 读excel
* @param inputstream 文件输入流
* @return list集合
*/
public static list<object> read(inputstream inputstream) {
return easyexcelfactory.read(inputstream, new sheet( 1 , 1 ));
}
/**
* 写excel
* @param data list数据
* @param clazz
* @param savefilepath 文件保存路径
* @throws ioexception
*/
public static void write(list<? extends baserowmodel> data, class <? extends baserowmodel> clazz, string savefilepath) throws ioexception {
file tempfile = new file(savefilepath);
outputstream out = new fileoutputstream(tempfile);
excelwriter writer = easyexcelfactory.getwriter(out);
sheet sheet = new sheet( 1 , 3 , clazz, "sheet1" , null );
writer.write(data, sheet);
writer.finish();
out.close();
}
|
说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考
七、其他
1、@loginrequired注解
1
2
3
4
5
6
7
|
/**
* 在需要登录验证的controller的方法上使用此注解
*/
@target ({elementtype.method})
@retention (retentionpolicy.runtime)
public @interface loginrequired {
}
|
2、mycontrolleradvice
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@controlleradvice
public class mycontrolleradvice {
@responsebody
@exceptionhandler (tokenvalidationexception. class )
public jsonresponse tokenvalidationexceptionhandler() {
return jsonresponse.logininvalid();
}
@responsebody
@exceptionhandler (serviceexception. class )
public jsonresponse serviceexceptionhandler(serviceexception se) {
return jsonresponse.fail(se.getmsg());
}
@responsebody
@exceptionhandler (exception. class )
public jsonresponse exceptionhandler(exception e) {
e.printstacktrace();
return jsonresponse.fail(e.getmessage());
}
}
|
3、authenticationinterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public class authenticationinterceptor implements handlerinterceptor {
private static final string current_user = "user" ;
@autowired
private userservice userservice;
@override
public boolean prehandle(httpservletrequest request, httpservletresponse response, object handler) {
// 如果不是映射到方法直接通过
if (!(handler instanceof handlermethod)) {
return true ;
}
handlermethod handlermethod = (handlermethod) handler;
method method = handlermethod.getmethod();
// 判断接口是否有@loginrequired注解, 有则需要登录
loginrequired methodannotation = method.getannotation(loginrequired. class );
if (methodannotation != null ) {
// 验证token
integer userid = jwtutil.verifytoken(request);
pluser pluser = userservice.selectbyprimarykey(userid);
if ( null == pluser) {
throw new runtimeexception( "用户不存在,请重新登录" );
}
request.setattribute(current_user, pluser);
return true ;
}
return true ;
}
@override
public void posthandle(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, modelandview modelandview) throws exception {
}
@override
public void aftercompletion(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, exception e) throws exception {
}
}
|
4、jwtutil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
public static final long expiration_time = 2592_000_000l; // 有效期30天
public static final string secret = "pl_token_secret" ;
public static final string header = "token" ;
public static final string user_id = "userid" ;
/**
* 根据userid生成token
* @param userid
* @return
*/
public static string generatetoken(string userid) {
hashmap<string, object> map = new hashmap<>();
map.put(user_id, userid);
string jwt = jwts.builder()
.setclaims(map)
.setexpiration( new date(system.currenttimemillis() + expiration_time))
.signwith(signaturealgorithm.hs512, secret)
.compact();
return jwt;
}
/**
* 验证token
* @param request
* @return 验证通过返回userid
*/
public static integer verifytoken(httpservletrequest request) {
string token = request.getheader(header);
if (token != null ) {
try {
map<string, object> body = jwts.parser()
.setsigningkey(secret)
.parseclaimsjws(token)
.getbody();
for (map.entry entry : body.entryset()) {
object key = entry.getkey();
object value = entry.getvalue();
if (key.tostring().equals(user_id)) {
return integer.valueof(value.tostring()); // userid
}
}
return null ;
} catch (exception e) {
logger.error(e.getmessage());
throw new tokenvalidationexception( "unauthorized" );
}
} else {
throw new tokenvalidationexception( "missing token" );
}
}
|
结语: ok, 搞定,睡了, 好困
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对服务器之家的支持。
原文链接:https://www.cnblogs.com/wangzaiplus/p/10660520.html