我们在添加商品时需要与索引库进行同步,这样每添加一个商品索引库就多一个文档,这样做的好处是不用把数据库中的所有数据进行同步,大大提高了性能节约了时间。
我们要做的是当添加商品的时候发送activemq消息,至于发送什么类型的activemq消息则要根据实际应用场景来定,由于添加商品涉及到同步缓存、同步索引库、添加静态页面等操作,也就是一个消息被多个消费者所消费,显然发送topic消息更为合适。
要发送topic消息,就要在activemq的配置文件中做下配置,我们将原来配置的topic做下简单的修改,根据命名有意义原则,我们给<bean>起名为"itemAddTopic",给消息起名为"item-add-topic",如下图所示。
修改后topic如下:
<!--这个是主题目的地,一对多的 -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-add-topic" />
</bean>
下一步是我们找到添加商品的实现类"ItemServiceImpl",在该类中注入jmsTemplate和Destination,如下图所示。
然后我们找到添加商品的方法,在添加完商品后,发送消息,这里需要考虑一个问题,那就是消息的内容应该是什么?既然是添加商品,消费者肯定是要知道添加的商品是哪个商品,同时本着简单的原则,我们只需要传新增商品的ID即可,如下图所示。
当前ItemServiceImpl类的所有代码如下:
package com.taotao.service.impl;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.taotao.common.pojo.EasyUIDataGridResult;
import com.taotao.common.pojo.TaotaoResult;
import com.taotao.common.utils.IDUtils;
import com.taotao.mapper.TbItemDescMapper;
import com.taotao.mapper.TbItemMapper;
import com.taotao.pojo.TbItem;
import com.taotao.pojo.TbItemDesc;
import com.taotao.pojo.TbItemExample;
import com.taotao.service.ItemService;
@Service
public class ItemServiceImpl implements ItemService {
@Autowired
private TbItemMapper itemMapper;
@Autowired
private TbItemDescMapper itemDescMapper;
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name="itemAddTopic")
private Destination destination;
@Override
public TbItem getItemById(long itemId) {
TbItem tbItem = itemMapper.selectByPrimaryKey(itemId);
return tbItem;
}
@Override
public EasyUIDataGridResult getItemList(int page, int rows) {
//设置分页信息
PageHelper.startPage(page, rows);
//执行查询
TbItemExample example = new TbItemExample();
List<TbItem> list = itemMapper.selectByExample(example);
//获取查询结果
PageInfo<TbItem> pageInfo = new PageInfo<>(list);
EasyUIDataGridResult result = new EasyUIDataGridResult();
result.setRows(list);
result.setTotal(pageInfo.getTotal());
//返回结果
return result;
}
@Override
public TaotaoResult createItem(TbItem tbItem, String desc) throws Exception{
//生成商品ID
long itemId = IDUtils.genItemId();
//补全item的属性
tbItem.setId(itemId);
//商品状态,1-正常,2-下架,3-删除
tbItem.setStatus(((byte) 1));
tbItem.setCreated(new Date());
tbItem.setUpdated(new Date());
itemMapper.insert(tbItem);
//添加商品描述
insertItemDesc(itemId, desc);
//发送activemq消息
jmsTemplate.send(destination,new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(itemId+"");
return textMessage;
}
});
return TaotaoResult.ok();
}
//添加商品描述
private void insertItemDesc(long itemId,String desc){
//创建一个商品描述表对应的pojo
TbItemDesc itemDesc = new TbItemDesc();
//补全pojo的属性
itemDesc.setItemId(itemId);
itemDesc.setItemDesc(desc);
itemDesc.setCreated(new Date());
itemDesc.setUpdated(new Date());
//向商品描述表插入数据
itemDescMapper.insert(itemDesc);
}
}
上面是发送activemq消息,下面我们再来写下接收消息,接收消息是在taotao-search-service工程,我们先要做的事情是修改Mybatis文件,添加一个根据商品ID来查询商品详情的方法,如下图所示。
上图添加的代码如下:
<select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
SELECT
a.id,
a.title,
a.sell_point,
a.price,
a.image,
b. NAME item_category_name,
c.item_desc
FROM
tb_item a
LEFT JOIN tb_item_cat b ON a.cid = b.id
LEFT JOIN tb_item_desc c ON a.id = c.item_id
WHERE
a.`status` = 1
AND a.id = #{itemId}
</select>
接着,在接口类中添加更加商品ID查询商品详情的接口,如下图所示。
现在我启动的是单机版的solr服务器,因此要保证solr配置文件当前切换到单机版,如下图所示。
下面我们需要创建一个监听器类ItemAddMessageListener,在该类中处理同步索引库的逻辑,如下图所示。
ItemAddMessageListener.java类代码如下:
package com.taotao.search.listener;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import com.taotao.common.pojo.SearchItem;
import com.taotao.search.mapper.SearchItemMapper;
public class ItemAddMessageListener implements MessageListener {
@Autowired
private SearchItemMapper searchItemMapper;
@Autowired
private SolrServer solrServer;
@Override
public void onMessage(Message message) {
try {
//从消息中取出商品id
TextMessage textMessage = (TextMessage)message;
String text = textMessage.getText();
long itemId = Long.valueOf(text);
//根据商品id查询商品详情,这里需要注意的是消息发送方法
//有可能还没有提交事务,因此这里是有可能取不到商品信息
//的,为了避免这种情况出现,我们最好等待事务提交,这里
//我采用3次尝试的方法,每尝试一次休眠一秒
SearchItem searchItem = null;
for(int i=0;i<3;i++){
try {
Thread.sleep(1000);//休眠一秒
searchItem = searchItemMapper.getItemById(itemId);
//如果获取到了商品信息,那就退出循环。
if(searchItem != null){
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
//创建文档对象
SolrInputDocument document = new SolrInputDocument();
//向文档对象中添加域
document.setField("id", searchItem.getId());
document.setField("item_title", searchItem.getTitle());
document.setField("item_sell_point", searchItem.getSell_point());
document.setField("item_price", searchItem.getPrice());
document.setField("item_image", searchItem.getImage());
document.setField("item_category_name", searchItem.getItem_category_name());
document.setField("item_desc", searchItem.getItem_desc());
//把文档写入索引库
solrServer.add(document);
//提交
solrServer.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}
正如在代码中提到的那样,在添加商品的时候(如下图所示)是涉及到事务的,事务提交之后才能在数据库中查询到商品信息,假如网络不济,造成事务还没提交,接收消息的一端想去查询商品信息,这时显然是查询不到的,为了等待事务提交,采用三次尝试的机制,如上面代码所示。
写完监听器我们再配置下接收的topic(要与taotao-manager-service当中配置的topic一致),以及设置监听器,如下图所示。
applicationContext-activemq.xml文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>在启动工程前,我们需要把学习JMail发送邮件时人为添加的异常给删掉,如下图所示
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.156.30:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="spring-queue"/>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-add-topic" />
</bean>
<!-- 接收消息 -->
<!-- 配置监听器 -->
<bean id="messageListener" class="com.taotao.search.listener.MyMessageListener"/>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
<!-- 配置监听器 -->
<bean id="itemAddMessageListener" class="com.taotao.search.listener.ItemAddMessageListener"/>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="itemAddTopic"/>
<property name="messageListener" ref="itemAddMessageListener"/>
</bean>
</beans>
好了,写完了代码,现在我们来测试一下,我们依次启动taotao-manager、taotao-content、taotao-search、taotao-manager-web、taotao-port-web、taotao-search-web工程。启动完之后,我们访问http://localhost:8081来访问淘淘商城管理后台,添加一款商品,这里我添加的是华为P10手机,如下图所示。
添加完商品后,我们到淘淘商城首页进行搜索,看能不能搜索到我们刚才添加的手机,如下图所示,发现可以正常搜索到刚才添加的华为手机!!说明我们的消息机制没问题。