利用Redis实现SQL伸缩的方法简介

时间:2021-09-06 08:20:23

缓解行竞争

我们在Sentry开发的早起采用的是sentry.buffers。 这是一个简单的系统,它允许我们以简单的Last Write Wins策略来实现非常有效的缓冲计数器。 重要的是,我们借助它完全消除了任何形式的耐久性 (这是Sentry工作的一个非常可接受的方式)。

操作非常简单,每当一个更新进来我们就做如下几步:

创建一个绑定到传入实体的哈希键(hash key)

使用HINCRBY使计数器值增加

HSET所有的LWW数据(比如 "最后一次见到的")

用当前时间戳ZADD哈希键(hash key)到一个"挂起" set

现在每一个时间刻度 (在Sentry中为10秒钟) 我们要转储(dump)这些缓冲区并且扇出写道(fanout the writes)。 看起来像下面这样:

使用ZRANGE获取所有的key

为每一个挂起的key发起一个作业到RabbitMQ

现在RabbitMQ作业将能够读取和清除哈希表,和“悬而未决”更新已经弹出了一套。有几件事情需要注意:

在下面我们想要只弹出一个设置的数量的例子中我们将使用一组排序(举例来说我们需要那100个旧集合)。

假使我们为了处理一个键值来结束多道排序的作业,这个人会得到no-oped由于另一个已经存在的处理和清空哈希的过程。

该系统能够在许多Redis节点上不断扩展下去仅仅是通过在每个节点上安置把一个'悬置'主键来实现。

我们有了这个处理问题的模型之后,能够确保“大部分情况下”每次在SQL中只有一行能够被马上更新,而这样的处理方式减轻了我们能够预见到的锁问题。考虑到将会处理一个突然产生且所有最终组合在一起进入同一个计数器的数据的场景,这种策略对Sentry用处很多。

速度限制

出于哨兵的局限性,我们必须终结持续的拒绝服务攻击。我们通过限制连接速度来应对这种问题,其中一项是通过Redis支持的。这无疑是在sentry.quotas范围内更直接的实现。

它的逻辑相当直接,如同下面展示的那般:

 

 
  1. def incr_and_check_limit(user_id, limit): 
  2.   key = '{user_id}:{epoch}'.format(user_id, int(time() / 60)) 
  3.    
  4.   pipe = redis.pipeline() 
  5.   pipe.incr(key) 
  6.   pipe.expire(key, 60) 
  7.   current_rate, _ = pipe.execute() 
  8.    
  9.   return int(current_rate) > limit 

我们所阐明的限制速率的方法是 Redis在高速缓存服务上最基本的功能之一:增加空的键字。在高速缓存服务中实现同样的行为可能最终使用这种方法:

 

 
  1. def incr_and_check_limit_memcache(user_id, limit): 
  2.   key = '{user_id}:{epoch}'.format(user_id, int(time() / 60)) 
  3.    
  4.   if cache.add(key, 0, 60): 
  5.     return False 
  6.    
  7.   current_rate = cache.incr(key) 
  8.    
  9.   return current_rate > limit 

事实上我们最终采取这种策略可以使哨兵追踪不同事件的短期数据。在这种情况下,我们通常对用户数据进行排序以便可以在最短的时间内找到最活跃用户的数据。

基本锁

虽然Redis的是可用性不高,我们的用例锁,使其成为工作的好工具。我们没有使用这些在哨兵的核心了,但一个示例用例是,我们希望尽量减少并发性和简单无操作的操作,如果事情似乎是已经在运行。这对于可能需要执行每隔一段时间类似cron任务非常有用,但不具备较强的协调。

在Redis的这样使用SETNX操作是相当简单的:

  1. from contextlib import contextmanagerr = Redis()@contextmanagerdef lock(key, nowait=True): 
  2.   while not r.setnx(key, '1'): 
  3.     if nowait: 
  4.       raise Locked('try again soon!'
  5.     sleep(0.01) 
  6.    
  7.   # limit lock time to 10 seconds 
  8.   r.expire(key, 10) 
  9.    
  10.   # do something crazy 
  11.   yield 
  12.    
  13.   # explicitly unlock 
  14.   r.delete(key) 

而锁()内的哨兵利用的memcached的,但绝对没有理由我们不能在其切换到Redis。

时间序列数据

近来我们创造一个新的机制在Sentry(包含在sentry.tsdb中)存储时间序列数据。这是受RRD模型启发,特别是Graphite。我们期望一个快速简单的方式存储短期(比如一个月)时间序列数,以便于处理高速写入数据,特别是在极端情况下计算潜在的短期速率。尽管这是第一个模型,我们依旧期望在Redis存储数据,它也是使用计数器的简单范例。

在目前的模型中,我们使用单一的hash map来存储全部时间序列数据。例如,这意味所有数据项在都将同一个哈希键拥有一个数据类型和1秒的生命周期。如下所示:

  1.  
  2. "::": { 
  3.  
  4. ""
  5.  
  6. }} 

因此在这种状况,我们需要追踪事件的数目。事件类型映射到枚举类型"1".该判断的时间是1s,因此我们的处理时间需要以秒计。散列最终看起来是这样的:

  1.  
  2. "1:1399958363:0": { 
  3.  
  4. "1": 53, 
  5.  
  6. "2": 72, 
  7.  
  8. }} 

一个可修改模型可能仅使用简单的键并且仅在存储区上增加一些增量寄存器。

  1. "1:1399958363:0:1": 53 

我们选择哈希映射模型基于以下两个原因:

我们可以将所有的键设为一次性的(这也可能产生负面影响,但是目前为止是稳定的)

大幅压缩键值,这是相当重要的处理

此外,离散的数字键允许我们在将虚拟的离散键值映射到固定数目的键值上,并在此分配单一存储区(我们可以使用64,映射到32个物理结点上)

现在通过使用Nydus和它的map()(依赖于一个工作区)(),数据查询已经完成。这次操作的代码是相当健壮的,但幸好它并不庞大。

  1. def get_range(self, model, keys, start, end, rollup=None): 
  2.   """  To get a range of data for group ID=[1, 2, 3]:  Start and end are both inclusive.  >>> now = timezone.now()  >>> get_keys(tsdb.models.group, [1, 2, 3],  >>>     start=now - timedelta(days=1),  >>>     end=now)  """ 
  3.   normalize_to_epoch = self.normalize_to_epoch 
  4.   normalize_to_rollup = self.normalize_to_rollup 
  5.   make_key = self.make_key 
  6.    
  7.   if rollup is None: 
  8.     rollup = self.get_optimal_rollup(start, end) 
  9.    
  10.   results = [] 
  11.   timestamp = end 
  12.   with self.conn.map() as conn: 
  13.     while timestamp >= start: 
  14.       real_epoch = normalize_to_epoch(timestamp, rollup) 
  15.       norm_epoch = normalize_to_rollup(timestamp, rollup) 
  16.    
  17.       for key in keys: 
  18.         model_key = self.get_model_key(key) 
  19.         hash_key = make_key(model, norm_epoch, model_key) 
  20.         results.append((real_epoch, key, conn.hget(hash_key, model_key))) 
  21.    
  22.       timestamp = timestamp - timedelta(seconds=rollup) 
  23.    
  24.   results_by_key = defaultdict(dict) 
  25.   for epoch, key, count in results: 
  26.     results_by_key[key][epoch] = int(count or 0) 
  27.    
  28.   for key, points in results_by_key.iteritems(): 
  29.     results_by_key[key] = sorted(points.items()) 
  30.   return dict(results_by_key) 

归结如下:

生成所必须的键。

使用工作区,提取所有连接操作的最小结果集(Nydus负责这些)。

给出结果,并且基于指定的时间间隔内和给定的键值将它们映射到当前的存储区内。

简单的选择

我是一个喜欢用简单的方案解决问题的人,在这个范畴里使用Redis无疑是很适合的。它的文档是那样让人惊讶,那是因为(阅读)其文档的门槛非常的低。虽然他也有折衷(主要是如果你使用持久化),但是他们工作地很好并且比较直观。

那么Redis为您解决什么问题呢?