python语言中的AOP利器:装饰器

时间:2023-03-09 00:58:57
python语言中的AOP利器:装饰器

一、前言

面向切面编程(AOP)是一种编程思想,与OOP并不矛盾,只是它们的关注点相同。面向对象的目的在于抽象和管理,而面向切面的目的在于解耦和复用。

举两个大家都接触过的AOP的例子:

1)java中mybatis的@Transactional注解,大家知道被这个注解注释的函数立即就能获得DB的事务能力。

2)python中的with threading.Lock(),大家知道,被这个with代码块包裹的部分立即获得同步的锁机制。

这样我们把事务和加锁这两种与业务无关的逻辑抽象出来,在逻辑上解耦,并且可以轻松的做到代码复用。

二、上下文管理器contextlib

当然你可以使用with上下文管理器实现一些AOP的思想,这里有个模块叫contextlib可以帮助你简易的实现上下文管理器。

上下文管理最常见的例子是with open('file') as fh,回收打开句柄的例子。

这种方式还是比较麻烦的,下面我们看一下python中的装饰器怎么样实现AOP编程。

三、装饰器:AOP的语法糖

python中的装饰器就是设计来实现切面注入功能的。下面给出几个例子,这几个例子都是在生产环境验证过的。

其中的任务管理机是伪代码,需要自己实现写数据库的逻辑。

1、重试逻辑

只要do函数被@retry_exp装饰,便可以获得指数退避的重试能力。

@retry_exp(max_retries=10)
def do():
# do whatever
pass

那retry_exp是如何实现的呢?

def retry_exp(max_retries=3, max_wait_interval=10, period=1, rand=False):

    def _retry(func):

        def __retry(*args, **kwargs):
MAX_RETRIES = max_retries
MAX_WAIT_INTERVAL = max_wait_interval
PERIOD = period
RAND = rand retries = 0
error = None
ok = False
while retries < MAX_RETRIES:
try:
ret = func(*args, **kwargs)
ok = True
return ret
except Exception, ex:
error = ex
finally:
if not ok:
sleep_time = min(2 ** retries * PERIOD if not RAND else randint(0, 2 ** retries) * PERIOD, MAX_WAIT_INTERVAL)
time.sleep(sleep_time)
retries += 1
if retries == MAX_RETRIES:
if error:
raise error
else:
raise Exception("unknown")
return __retry
return _retry

2、降级开关

只要do函数被@degrade装饰,就会安装app名称校验redis里的开关,一旦发现开关关闭,则do函数不被执行,也就是降级。

@degrade
def do(app):
# do whatever
pass

那么degrade是怎样实现的呢?

def degrade(app):

    def _wrapper(function):

        def __wrapper(*args, **kwargs):

            value = None
try:
redis = codis_pool.get_connection()
value = redis.get("dmonitor:degrade:%s" % app)
except Exception, _:
logger.info(traceback.format_exc()) if not value or int(value) != 1:
function()
logger.info("[degrade] is_on: %s" % app)
else:
logger.info("[degrade] is_off: %s" % app)
return __wrapper return _wrapper

3、任务状态机

这个是最常用的,我们需要跟踪落盘DB一个任务的执行状态(等待调度,执行中,执行成功,执行失败)

一旦do方法被@tasks_decorator装饰,就获得了这样的能力。对item_param(是个json)中task_id指明的任务进行状态管理。

@tasks_decorator
def do(item_param):
# do whatever
pass

tasks_decorator是怎样实现的呢?

def tasks_decorator(function):
def _wrap(*args, **kwargs):
param_dict = kwargs.get('item_param')
task_id = param_dict.get('task_id')
try:
param_dict.update({'status': TaskStatus.Waiting, 'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
try:
manager_dao.save_task(param_dict)
except Exception, ex:
pass
_update_task_status(task_id, TaskStatus.Doing)
function(*args, **kwargs)
_update_task_status(task_id, TaskStatus.Done)
except Exception as e:
time.sleep(0.5)
_update_task_status(task_id, TaskStatus.Fail, unicode(e.message))
raise
return _wrap

4、全局唯一性

在分布式+异步环境中,如果想保证exactly once是需要额外的逻辑的,其实主要是实现唯一键,一旦唯一键实现了,就可以使用公共缓存redis进行唯一键判定了。

do函数被unique装饰,那么对于task_id对应的任务,全局只会执行一次。

@unique
def do(task_id):
# do whatever
pass

unique是怎样实现的呢?

def unique(function):
def _wrap(*args, **kwargs):
task_id = kwargs.get('task_id')
try:
redis = codis_pool.get_connection()
key = "unique:%s" % task_id
if not redis.setnx(key):
redis.expire(key, 24*60*60)
function(*args, **kwargs)
except Exception as e:
logger.error(traceback.format_exc())
raise
return _wrap

四、总结

AOP在少量增加代码复杂度的前提下,显著的获得以下优点:

1、使得功能逻辑和业务逻辑解耦,功能和业务的修改完全独立,代码结构清晰,开发方便

2、一键注入,代码复用程度高,扩展方便