缓存穿透、缓存击穿、缓存雪崩

时间:2022-10-23 21:51:59

1 缓存穿透

(1)什么是缓存穿透

缓存穿透是指缓存和数据库中的没有的数据,而用户不断发起请求,由于缓存时不命中时被动写入的,处于容错考虑,如果从缓存查不到的数据则不写入缓存,这将会导致不存再的数据每次请求都要到存储层(DAO和SQL)去查询,这样缓存就是去意义了。再流量大的时候,SQL数据库的压力就太大了。如果有人利用不存再的key频繁攻击我们的应用,这就是性能漏洞。

例如发起id为”-1“的数据请求,这些数据时不存在的,用户如果时故意为之,我们的系统就会遭受攻击。

(2)解决方法

接口层增加校验,例如:用户权限校验,id做范围校验,id<=0的请求直接拦截。

从缓存取不到数据时,如果在数据库也没有渠道,这时可以将key-value对写为key-null,缓存有效时间可以设置短一些,如30秒(缓存时间太长会导致正常情况也无法使用),这也是可以方式黑客利用缓存穿透进行攻击的有效手段。

2 缓存击穿

2.1 什么是缓存击穿

一个存在value的key,在缓存创建(或过期)的一刻,同时有大量的请求,这些请求都会击穿到DB,造成瞬时DB请求量大,压力骤增。缓存击穿只会发生在高并发的时候,就是当有10000 个并发进行查询数据的时候,我们一般都会先去redis里面查询进行数据,但是如果redis里面没有这个数据的时候,那么这10000个并发里面就会有很大一部分并发会一下子都去mysql数据库里面进行查询了。这样给数据库带来巨大的压力。

例如以下示例:

(1) 业务类代码

    @Override
    public Category queryById(int id) {
        objRedisTemplate.setKeySerializer(RedisSerializer.string());
        Category category = (Category) objRedisTemplate.opsForValue().get("category-"+id);
        if(category==null) {
            //双重检查没有才去SQL数据库查
            System.out.println("从SQL数据库读取...");
            category = categoryMapper.selectById(id);
            objRedisTemplate.opsForValue().set("category-"+id, category, 5, TimeUnit.MINUTES);
        }else{
            System.out.println("从Redis缓存读取...");
        }
        return category;
    }

(2)控制器代码模拟同时1000个并发请求,导致缓存击穿

    //模拟缓存击穿
    @RequestMapping("/breakdown")
    @ResponseBody
    public String breakdown(Integer id){
        //初始化线程池执行器
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //利用线程池模拟并发读取1000次
        for(int i=0; i<1000; i++){
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    Category category = categoryService.queryById(id);
                }
            });
        }
        executorService.shutdown();
        return "success";
    }

印结果打印了很多查询数据库和查询缓存,此时也就说明10000个并发里面有很多去查询了数据库,这就是高并发引起的问题。也就是缓存击穿。我们怎么解决缓存击穿呢,即使10000个并发过来,然后这10000个并发需要的数据在redis里面都没有,那么我们应该第一个线程查询数据里面的数据,然后把这个数据给放到redis里面,然后剩下的9999个线程都到redis里面查询,这样就解决了缓存击穿。

2.2 使用双重锁检查

使用锁来解决缓存击穿问题,而且叫做双重检测锁,为什么叫做双重检测锁呢,因为有两个if语句,第一个if语句就是为了减少走同步代码块,因为如果换成里面存在想要的数据,就直接获取,所以有两个if语句。第一次线程查询把数据放到redis缓存之后,剩下的线程当走到下面的同步代码块的时候,需要在查询一下缓存里面的数据就会发现刚刚第一个线程放到redis里面的数据了。

    @Override
    public Category queryById(int id) {

        redisTemplate.setKeySerializer(RedisSerializer.string());
        //使用双重检查,防止缓存被击穿
        //第一次检查
        Category category = (Category) redisTemplate.opsForValue().get("category-"+id);
        if(category==null) {
            //加同步锁
            synchronized (this){
                //加锁后再次检查
                category = (Category) redisTemplate.opsForValue().get("category-"+id);
                if(category==null) {
                    //双重检查没有才去SQL数据库查
                    System.out.println("从SQL数据库读取...");
                    category = categoryMapper.selectById(id);
                    redisTemplate.opsForValue().set("category-"+id, category, 5, TimeUnit.MINUTES);
                }
            }
        }else{
            System.out.println("从Redis缓存读取...");
        }
        return category;
    }

3 缓存雪崩

缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

解决:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

4完整代码

@Override
public  Category queryOneWithString(int id) {
    redisTemplate.setKeySerializer(RedisSerializer.string());
    Category category = (Category) redisTemplate.opsForValue().get("category");
    /*解决缓存穿透问题,存 null 改变 判断方式*/
    if (redisTemplate.hasKey("category")){   //第一次检查 第一步解决缓存穿透
        /*1 判断 redis中 是否存在数据*/
        System.out.println("从 redis中获取数据");
        return category;
    }else {
        /*解决 缓存 击穿*/
        synchronized (this){
            /*再次 查询 是否 存在数据*/
            if (!redisTemplate.hasKey("category")){    第二次检查  第二步 解决缓存击穿   设置双重锁
                System.out.println("从mysql中获取数据");
                category = categoryDao.selectById(id);
                redisTemplate.opsForValue().set("category",category);
                if (category == null){
                    redisTemplate.expire("category",30,TimeUnit.SECONDS);//设置空白缓存过期30秒
                }
                /*解决缓存雪崩: 设置不同的过期时间 随机*/   //第三步 解决 缓存雪崩 防止大量缓存过期
                Random random = new Random();
                int suiji = random.nextInt(10);
                System.out.println("随机数"+suiji  );

                redisTemplate.expire("category",5+suiji,TimeUnit.MINUTES);
            }else {
                /*再从redis中获取*/
                category = (Category) redisTemplate.opsForValue().get("category");
            }
        }
        return  category;
    }
}

5 项目代码位置

E:\09.redis\redis-samplate\001-springboot-redis

6 笔记代码


import com.powernode.dao.CategoryDao;
import com.powernode.model.Category;
import com.powernode.service.CategoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/*
* service:业务逻辑处理,从数据源获取数据,现在有两个数据源
* mysql:持久化,性能差
* redis:不擅长持久化,性能极高
*结合两种数据库的优势
* */
@Service
public class CategoryServiceImpl implements CategoryService {

    @Autowired
    private CategoryDao categoryDao;

    @Autowired
    private RedisTemplate redisTemplate;

    /*
    redisTemplate.opsForValue();//操作字符串
redisTemplate.opsForHash();//操作hash
redisTemplate.opsForList();//操作list
redisTemplate.opsForSet();//操作set
redisTemplate.opsForZSet();//操作有序set

RedisTemplate 和  stringredistemplate
如果你想使用默认的配置来操作redis,则如果操作的数据是字节数组,
就是用redistemplate,如果操作的数据是明文,使用stringredistemplate。

opsForHash() 和 boundHashOps
  获取a,然后获取b,然后删除c,对同一个key有多次操作,按照opsForHash()的写法
    每次都是redisTemplate.opsForHash().xxx("key","value")写法很是啰嗦
    int result = (Integer) redisTemplate.opsForHash().get("hash-key","a");
    result = (Integer)redisTemplate.opsForHash().get("hash-key","b");
    redisTemplate.opsForHash().delete("hash-key","c");

     * boundHashOps()则是直接将key和boundHashOperations对象进行了绑定,
     * 后续直接通过boundHashOperations对象进行相关操作即可,写法简洁,不需要
     * 每次都显式的将key写出来
    BoundHashOperations<String, String, Object> boundHashOperations = redisTemplate.boundHashOps("hash-key");
     */
    @Override
    public List<Category> queryAll() {
        redisTemplate.setKeySerializer(RedisSerializer.string());

        /*1 判断 redis中 是否存在数据*/
        List<Category> categorys = (List<Category>) redisTemplate.opsForValue().get("categorys");
        /*2存在 则 从redis中 获取数据 返回*/
        if (categorys != null){
            System.out.println("从 redis中获取数据");
            return categorys;
        }else {
            System.out.println("从mysql中获取数据");
            /*3 redis中不存在,1 从mysql中获取数据  2 将数据 存入 redis 用于下次查询时 从redis中获取*/
            List<Category> categories = categoryDao.selectAll();
            redisTemplate.opsForValue().set("categorys",categories,5, TimeUnit.MINUTES);
            return  categories;
        }
    }


    @Override
    public List<Category> queryAllByHash() {
        redisTemplate.setKeySerializer(RedisSerializer.string());
        /*1 判断 redis中 是否存在数据*/
        List<Category> categorys = (List<Category>) redisTemplate.opsForHash().values("hash:category");
        System.out.println(categorys.size());
        /*2存在 则 从redis中 获取数据 返回*/
        if (categorys.size()>0){
            System.out.println("从 redis中获取数据");
            return categorys;
        }else {
            System.out.println("从mysql中获取数据");
            categorys = categoryDao.selectAll();
            categorys.forEach(x->{
                redisTemplate.opsForHash().put("hash:category",x.getId(),x);
            });
            redisTemplate.expire("hash:category",5,TimeUnit.MINUTES);
            return  categorys;
        }
    }

    @Override
    public Category queryOne(int id) {
        redisTemplate.setKeySerializer(RedisSerializer.string());
        /*1 判断 redis中 是否存在数据*/
        Category category = (Category) redisTemplate.opsForHash().get("hash:category", id);
        /*2存在 则 从redis中 获取数据 返回*/
        if (category != null){
            System.out.println("从 redis中获取数据");
            return category;
        }else {
            System.out.println("从mysql中获取数据");
            category = categoryDao.selectById(id);
            redisTemplate.opsForHash().put("hash:category",category.getId(),category);
            redisTemplate.expire("hash:category",5,TimeUnit.MINUTES);
            return  category;
        }
    }


    @Override
    public  Category queryOneWithString(int id) {
        redisTemplate.setKeySerializer(RedisSerializer.string());
        Category category = (Category) redisTemplate.opsForValue().get("category");
        /*解决缓存穿透问题,存 null 改变 判断方式*/
        if (redisTemplate.hasKey("category")){   //第一次检查 第一步解决缓存穿透
            /*1 判断 redis中 是否存在数据*/
            System.out.println("从 redis中获取数据");
            return category;
        }else {
            /*解决 缓存 击穿*/
            synchronized (this){
                /*再次 查询 是否 存在数据*/
                if (!redisTemplate.hasKey("category")){    第二次检查  第二步 解决缓存击穿   设置双重锁
                    System.out.println("从mysql中获取数据");
                    category = categoryDao.selectById(id);
                    redisTemplate.opsForValue().set("category",category);
                    if (category == null){
                        redisTemplate.expire("category",30,TimeUnit.SECONDS);//设置空白缓存过期30秒
                    }
                    /*解决缓存雪崩: 设置不同的过期时间 随机*/   //第三步 解决 缓存雪崩 防止大量缓存过期
                    Random random = new Random();
                    int suiji = random.nextInt(10);
                    System.out.println("随机数"+suiji  );

                    redisTemplate.expire("category",5+suiji,TimeUnit.MINUTES);
                }else {
                    /*再从redis中获取*/
                    category = (Category) redisTemplate.opsForValue().get("category");
                }
            }
            return  category;
        }
    }


    /*增删改  删掉redis数据 实现 数据同步-------------高并发redis双删  1先删redis  2再删mysql 3 延迟删除 redis*/
}