使用Python的Django框架实现事务交易管理的教程

时间:2022-09-16 12:13:24

 如果你花费了很多的时间去进行Django数据库事务处理的话,你将会了解到这是让人晕头转向的。

在过去,只是提供了简单的基础文档,要想清楚知道它是怎么使用的,还必须要通过创建和执行Django的事务处理。

这里有众多的Django事务处理的名词,例如:commit_on_success , commit_manually , commit_unless_maneged,rollback_unless_managed,enter_transaction_management,leace_transaction_management,这些只是其中的一部分而已。

最让人感到幸运的是,Django 1.6版本发布后。现在你可以紧紧使用几个函数就可以实现事务处理了,在我们进入学习这些函数的几秒之前。首先,我们要明确下面这些问题:

  •     什么是事务处理呢?
  •     Django 1.6中的出现了那些新的事物处理优先权?

在进入“要怎么才是Django 1.6版本中正确的事务处理”之前,请先了解一下下面的详细案例:

    带状案例

    事务

  •             推荐的方式
  •             使用分隔符
  •             每一个HTTP请求事务

    保存点

    嵌套的事务


事务是什么?

根据SQL-92所说,"一个SQL事务(有时简称为事务)是一个可以原子性恢复的SQL语句执行序列"。换句话说,所有的SQL语句一起执行和提交。同样的,当回滚时,所有语句也一并回滚。

例如:
 

?
1
2
3
4
5
6
# START
note = Note(title="my first note", text="Yay!")
note = Note(title="my second note", text="Whee!")
address1.save()
address2.save()
# COMMIT

所以一个事务是 数据库中单个的工作单元。它是由一个start transaction开始和一个commit或者显式的rollback结束。

Django 1.6之前的事务管理有什么问题?

为了完整地回答这个问题,我们必须阐述一下事务在数据库、客户端以及Django中是如何处理的。

数据库

数据库中的每一条语句都运行在一个事务中,这个事务甚至可以只包含一条语句。

几乎所有的数据库都有一个AUTOCOMMIT设置,通常它被默认设置为True。AUTOCOMMIT将所有语句包装到一个事务里,只要语句成功执行,这个事务就立即被提交。当然你也可以手动调用START_TRANSACTION,它会暂时将AUTOCOMMIT挂起,直到你调用COMMIT_TRANSACTION或者ROLLBACK。

然后,这种方式将会使AUTOCOMMIT设置的作用于每条语句后的隐式提交失效。


然而,有诸如像sqlite3和mysqldb的python客户端库,它允许python程序与数据库本身相连接。这些库遵循一套如何访问与查询数据库的标准。该DB API 2.0标准,被描述在PEP 249之中。虽然它可能让人阅读稍干一些。一个重要带走的是,在PEP 249状态之中,默认数据库应该关闭自动提交功能。

这明显与数据库内发生了什么矛盾冲突:

  •     数据库语句总是要在一个事务中运行。此数据库一般会为你打开自动提交功能。
  •     不过,根据PEP 249,这不应该发生。
  •     客户端库必须反映在数据库之中发生了什么?但由于他们不允许默认打开自动提交功能。他们只是简单地在一个事务中包裹sql语句。就像数据库。

好啦,在我身边呆久一些吧。

 

Django

进入Django,Django也有关于事务处理的话要说。在Django 1.5和更早的版本。当你写数据到数据库时,Django基本上是运行一个开放的事务和自动提交该事务功能。所以每次你所称谓的像诸如model.save() 或者model.update()的东西,Django生成相应的sql语句,并提交该事务。

也有在Django 1.5和更早的版本,它是建议你使用TransactionMiddleware绑定http请求事务。每个请求提供了一个事务。如果返回的响应没有异常,Django会提交此事务。但如果你的视图功能抛出一个错误,回滚将被调用。这实际上说明,它关闭了自动提交功能。如果你想要标准化,数据库级别自动提交风格式的事务管理,你必须管理你自己的交易-通常是通过使用事务装饰你的视图功能,例如@transaction.commit_manually,或者@transaction.commit_on_success.

吸一口气,或者两口。


这意味着什么?

是啊,在那儿有许多事情要做,而事实证明,大多数开发者正需要这个标准数据库级的自动提交功能-有意义的事务往往是留在幕后处理的。做你自己的事,直到你需要手动调整他们。

在Django 1.6版本之中,什么是正确关于事务管理呢?

现在,欢迎来到Django 1.6.尽力忘掉一切吧,我们只是谈论而已,只是记得在Django 1.6中,你可以使用数据库,需要时可以手动自动提交和管理事务。从本质上来说,我们有一个更简单的模型,基本上是要把设计什么样的数据库摆在首位。

好啦!大功告成,让我们写代码吧?

 

Stripe案例

下面,我们使用处理一个用户注册的例子,调用了Stripe来处理信用卡进程。
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def register(request):
  user = None
  if request.method == 'POST':
    form = UserForm(request.POST)
    if form.is_valid():
 
      customer = Customer.create("subscription",
       email = form.cleaned_data['email'],
       description = form.cleaned_data['name'],
       card = form.cleaned_data['stripe_token'],
       plan="gold",
      )
 
      cd = form.cleaned_data     
      try:
        user = User.create(cd['name'], cd['email'], cd['password'],
          cd['last_4_digits'])
 
        if customer:
          user.stripe_id = customer.id
          user.save()
        else:
          UnpaidUsers(email=cd['email']).save()
 
      except IntegrityError:
        form.addError(cd['email'] + ' is already a member')
      else:
        request.session['user'] = user.pk
        return HttpResponseRedirect('/')
 
  else:
   form = UserForm()
 
  return render_to_response(
    'register.html',
    {
     'form': form,
     'months': range(1, 12),
     'publishable': settings.STRIPE_PUBLISHABLE,
     'soon': soon(),
     'user': user,
     'years': range(2011, 2036),
    },
    context_instance=RequestContext(request)
  )

例子首先调用了Customer.create,实际上就是调用Stripe来处理信用卡进程,然后我们创建一个新用户。如果我们得到来自Stripe的响应,我们就用stripe_id更新新创建的用户。如果我们没有得到响应(Stripe已关闭),我们将用新创建用户的email向UnpaidUsers表增加一个新条目,这样我们可以让他们稍后重试他们信用卡信息。


思路是这样的:如果Stripe没有响应,用户依然可以注册,然后开始使用我们的网站。我们将在稍后的时候让用户提供信用卡的信息。

“我明白这是一个特殊的例子,并且这也不是我想完成的功能的方式,但是它的目的是展示交易”

考虑交易,牢记住在Django1.6中提供了对于数据库的“AUTOCOMMIT”功能。接下来看一下数据库相关的代码:
 

?
1
2
3
4
5
6
7
8
9
10
11
cd = form.cleaned_data
try:
  user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
 
  if customer:
    user.stripe_id = customer.id
    user.save()
  else:
    UnpaidUsers(email=cd['email']).save()
 
except IntegrityError:

你能发现问题了吗?如果“UnpaidUsers(email=cd['email']).save()” 运行失败,会发生什么?

有一个用户,注册了系统;然后系统认为已经核对过信用卡了。但是事实上,系统并没有核对过。

我们仅仅想得到其中一种结果:

1.在数据库中创建了用户,并有了stripe_id

2.在数据库中创建了用户,但是没有stripe_id。同时在相关的“UnpaidUsers”行,存有相同的邮件地址

这就意味着,我们想要的是分开的数据库语句头完成任务或者回滚。这个例子很好的说明了这个交易。


首先,我们写一些测试用例来验证事情是否按照我们想象的方式运行: 
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@mock.patch('payments.models.UnpaidUsers.save', side_effect = IntegrityError)
def test_registering_user_when_strip_is_down_all_or_nothing(self, save_mock):
 
  #create the request used to test the view
  self.request.session = {}
  self.request.method='POST'
  self.request.POST = {'email' : 'python@rocks.com',
             'name' : 'pyRock',
             'stripe_token' : '...',
             'last_4_digits' : '4242',
             'password' : 'bad_password',
             'ver_password' : 'bad_password',
            }   
 
  #mock out stripe and ask it to throw a connection error
  with mock.patch('stripe.Customer.create', side_effect =
          socket.error("can't connect to stripe")) as stripe_mock:
 
    #run the test
    resp = register(self.request)
 
    #assert there is no record in the database without stripe id.
    users = User.objects.filter(email="python@rocks.com")
    self.assertEquals(len(users), 0)
 
    #check the associated table also didn't get updated
    unpaid = UnpaidUsers.objects.filter(email="python@rocks.com")
    self.assertEquals(len(unpaid), 0)

当我们尝试去保存“UnpaidUsers”,测试上方的解释器就会跑出异常'IntegrityError' 。

接下来是解释这个问题的答案,“当“UnpaidUsers(email=cd['email']).save()”运行的时候到底发生了什么?” 下面一段代码创建了一段对话,我们需要在注册函数中给出一些合适的信息。然后“with mock.patch” 会强制系统去认为Stripe没响应,最终就跳到我们的测试用例中。

?
1
resp = register(self.request)

上面这段话仅仅是调用我们的注册视图去传递请求。然后我们仅仅需要去核对表是否有更新:
 

?
1
2
3
4
5
6
7
#assert there is no record in the database without stripe_id.
users = User.objects.filter(email="python@rocks.com")
self.assertEquals(len(users), 0)
 
#check the associated table also didn't get updated
unpaid = UnpaidUsers.objects.filter(email="python@rocks.com")
self.assertEquals(len(unpaid), 0)

所以如果我们运行了测试用例,那么它就该运行失败:
 

?
1
2
3
4
5
6
7
8
9
10
11
======================================================================
FAIL: test_registering_user_when_strip_is_down_all_or_nothing (tests.payments.testViews.RegisterPageTests)
----------------------------------------------------------------------
Traceback (most recent call last):
 File "/Users/j1z0/.virtualenvs/django_1.6/lib/python2.7/site-packages/mock.py", line 1201, in patched
  return func(*args, **keywargs)
 File "/Users/j1z0/Code/RealPython/mvp_for_Adv_Python_Web_Book/tests/payments/testViews.py", line 266, in test_registering_user_when_strip_is_down_all_or_nothing
  self.assertEquals(len(users), 0)
AssertionError: 1 != 0
 
----------------------------------------------------------------------

赞。这就是我们最终想要的结果。

记住:我们这里已经练习了“测试驱动开发”的能力。错误信息提示我们:用户信息已经被保存到数据库中,但是这个并不是我们想要的,因为我们并没有付费!

事务交易用于挽救这样问题 ...

事务

对于Django1.6,有很多种方式来创建事务。

这里简单介绍几种。
推荐的方法

依据Django1.6的文档,“Django提供了一种简单的API去控制数据库的事务交易...原子操作用来定义数据库事务的属性。原子操作允许我们在数据库保证的前提下,创建一堆代码。如果这些代码被成功的执行,所对应的改变也会提交到数据库中。如果有异常发生,那么操作就会回滚。”

原子操作可以被用于解释操作或者是内容管理。所以如果我们用作为内容管理的时候,我们的注册函数的代码就会如下:
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from django.db import transaction
 
try:
  with transaction.atomic():
    user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
 
    if customer:
      user.stripe_id = customer.id
      user.save()
    else:
      UnpaidUsers(email=cd['email']).save()
 
except IntegrityError:
  form.addError(cd['email'] + ' is already a member')

注意在“with transaction.atomic()”这一行。这块代码将会在事务内部执行。所以如果我们重新运行了我们的测试,他们都将会通过。

记住:事务是一个工作单元,所以当“UnpaidUsers”调用失败的时候,内容管理的所有操作都会被一起回滚。

使用装饰器

除了上面的做法,我们能使用Python的装饰器特性来使用事务。
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@transaction.atomic():
def register(request):
  ...snip....
 
  try:
    user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
 
    if customer:
      user.stripe_id = customer.id
      user.save()
    else:
        UnpaidUsers(email=cd['email']).save()
 
  except IntegrityError:
    form.addError(cd['email'] + ' is already a member')

如果我们重新跑一次测试,那还是会一样失败。

为啥呢?为啥事务没有正确回滚呢?原因在与transaction.atomic会尝试捕获某种异常,而我们代码里人肉捕抓了(例如 try-except 代码块里的IntegrityError 异常),所以 transaction.atomic 永远看不到这个异常,所以标准的AUTOCOMMIT 流程就此无效掉。

但是,删掉try-catch语句会导致异常没有捕获,然后代码流程十有八九会就此乱掉。所以啊,也就是说不能去掉try-catch。
 


所以,技巧是将原子上下文管理器放入我们在第一个解决方案中的 try-catch 代码段里。

再看一下正确的代码:
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from django.db import transaction
 
try:
  with transaction.atomic():
    user = User.create(cd['name'], cd['email'], cd['password'], cd['last_4_digits'])
 
    if customer:
      user.stripe_id = customer.id
      user.save()
    else:
      UnpaidUsers(email=cd['email']).save()
 
except IntegrityError:
  form.addError(cd['email'] + ' is already a member')

当 UnpaidUsers 触发 IntegrityError 时,上下文管理器 transaction.atomic() 会捕获到它,并执行回滚操作。此时我们的代码在异常处理中执行(即行 theform.addErrorline),将会完成回滚操作,并且,如果必要的话,也可以安全的进行数据库调用。也要注意:任何在上下文管理器 thetransaction.atomic() 前后的数据库调用都不会受到它的执行结果的影响。

针对每次HTTP请求的事务交易

Django1.5和1.6版本都允许用户操作请求事务模式。在这种模式下,Django会自动在事务中,处理你的视图函数。如果视图函数抛出异常,Django会自动回滚事务;否则Django会提交事务。

为了实现这个功能,你需要在你想要有此功能的数据库的配置中,设置“ATOMIC_REQUEST”为真。所以在我们的“settings.py”需要有如下设置:
 

?
1
2
3
4
5
6
7
DATABASES = {
  'default': {
    'ENGINE': 'django.db.backends.sqlite3',
    'NAME': os.path.join(SITE_ROOT, 'test.db'),
    'ATOMIC_REQUEST': True,
  }
}

如果我们把解释器放到视图函数中,以上设置就会生效。所以这并没有符合我们的想法。

但是,这里依然值得注意的是解释器:“ATOMIC_REQUESTS”和“@transaction.atomic”仍然会有可能在有异常抛出的时候,处理这些错误。为了去捕捉这些错误,你需要去完成一些常规的中间件,或者需要去覆盖“urls.hadler500”,或者是创建新的“500.html”模板。

保存点

尽管事务是有原子性的,但还是能够打散为多个“保存点”——你可看作是“部分事务”。

例如,你有个事务包含了4条SQL语句,你可以在第二个SQL之后创建一个保存点。一旦保存点创建成功,就算第三条或第四条SQL执行失败,你仍旧能够做一个部分回滚,忽视后面两条SQL,仅保留前面两条。

基本上,这就像是提供了一个切割的能力:一个普通的事务能够被切分为多个更“轻量”的事务,然后能进行部分回滚或部分提交。

    但一定要注意,小心整个事务被无端回滚掉(例如由于抛出了IntegrityError异常但却没有捕抓,那所有的保存点都会因此被回滚掉的)

来看个示例代码,了解怎么玩转保存点。

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@transaction.atomic()
def save_points(self,save=True):
 
  user = User.create('jj','inception','jj','1234')
  sp1 = transaction.savepoint()
 
  user.name = 'zheli hui guadiao, T.T'
  user.stripe_id = 4
  user.save()
 
  if save:
    transaction.savepoint_commit(sp1)
  else:
    transaction.savepoint_rollback(sp1)

示例中,整个函数都是属于一个事务的。新User对象创建后,我们创建并得到了一个保存点。然后后续3行语句——
 

?
1
2
3
user.name = 'zheli hui guadiao, T.T'
user.stripe_id = 4
user.save()

——不属于刚才的保存点,因此他们有可能是属于下面的savepoint_rollback或savepoint_commit的一部分。假设是savepoint_rollback, 那代码行user = User.create('jj','inception','jj','1234')仍旧会成功提交到数据库中 ,而下面三行则不会成功。
 

采用另外一种方法,下面的两种测试用例描述了保存点是如何工作的:

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def test_savepoint_rollbacks(self):
 
  self.save_points(False)
 
  #verify that everything was stored
  users = User.objects.filter(email="inception")
  self.assertEquals(len(users), 1)
 
  #note the values here are from the original create call
  self.assertEquals(users[0].stripe_id, '')
  self.assertEquals(users[0].name, 'jj')
 
 
def test_savepoint_commit(self):
  self.save_points(True)
 
  #verify that everything was stored
  users = User.objects.filter(email="inception")
  self.assertEquals(len(users), 1)
 
  #note the values here are from the update calls
  self.assertEquals(users[0].stripe_id, '4')
  self.assertEquals(users[0].name, 'starting down the rabbit hole')

同样,在我们提交或者回滚保存点之后,我们仍然可以继续在同一个事务中工作。同时,这个运行结果不受之前保存点输出结果的影响。

例如,如果我们按照如下例子更新“save_points”函数,

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@transaction.atomic()
def save_points(self,save=True):
 
  user = User.create('jj','inception','jj','1234')
  sp1 = transaction.savepoint()
 
  user.name = 'starting down the rabbit hole'
  user.save()
 
  user.stripe_id = 4
  user.save()
 
  if save:
    transaction.savepoint_commit(sp1)
  else:
    transaction.savepoint_rollback(sp1)
 
  user.create('limbo','illbehere@forever','mind blown',
      '1111')

即使无论是“savepoint_commit”或者“savepoint_rollback”被“limbo”这个用户调用了,这个事务仍然会被成功创建。如果没有创建成功,整个事务将会被回滚。

嵌套事务

采用“savepoint()”,“savepoint_commit”和“savepoint_rollback”去手动指定保存点,将会自动一个嵌套事务,同时这个嵌套事务会自动为我们创建一个保存点。并且,如果我们遇到错误,这个事务将会回滚。

下面用一个扩展的例子来说明:
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@transaction.atomic()
def save_points(self,save=True):
 
  user = User.create('jj','inception','jj','1234')
  sp1 = transaction.savepoint()
 
  user.name = 'starting down the rabbit hole'
  user.save()
 
  user.stripe_id = 4
  user.save()
 
  if save:
    transaction.savepoint_commit(sp1)
  else:
    transaction.savepoint_rollback(sp1)
 
  try:
    with transaction.atomic():
      user.create('limbo','illbehere@forever','mind blown',
          '1111')
      if not save: raise DatabaseError
  except DatabaseError:
    pass

这里我们可以看到:在我们处理保存点之后,我们采用“thetransaction.atomic”的上下文管理区擦出我们创建的"limbo"这个用户。当上下文管理被调用的时候,它会创建一个保存点(因为我们已经在事务里面了),同时这个保存点将会依据已经存在的上下文管理器去被执行或者回滚。

这样下面两个测试用例就描述了这个行文:

 
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def test_savepoint_rollbacks(self):
 
     self.save_points(False)
 
    #verify that everything was stored
    users = User.objects.filter(email="inception")
    self.assertEquals(len(users), 1)
 
    #savepoint was rolled back so we should have original values
    self.assertEquals(users[0].stripe_id, '')
    self.assertEquals(users[0].name, 'jj')
 
    #this save point was rolled back because of DatabaseError
    limbo = User.objects.filter(email="illbehere@forever")
    self.assertEquals(len(limbo),0)
 
  def test_savepoint_commit(self):
    self.save_points(True)
 
    #verify that everything was stored
    users = User.objects.filter(email="inception")
    self.assertEquals(len(users), 1)
 
    #savepoint was committed
    self.assertEquals(users[0].stripe_id, '4')
    self.assertEquals(users[0].name, 'starting down the rabbit hole')
 
    #save point was committed by exiting the context_manager without an exception
    limbo = User.objects.filter(email="illbehere@forever")
    self.assertEquals(len(limbo),1)


因此,在现实之中你可以使用原子或者在事务之中创建保存点的保存点。使用原子,你不必要很仔细地担心提交和会滚,当这种情况发生时,你可以完全控制其中的保存点。
结论

如果你有任何以往使用Django更早版本事务处理的经验,你可以看到很多更简单地事务处理模型。如下,在默认情况下,也有自动提交功能,它是一个很好的例子,即Django与python两者都引以为豪所提供的“理智的”默认值。对于如此多的系统,你将不需要直接地来处理事务。只是让“自动提交功能”来完成其工作,但如果你这样做,我将希望这个帖子能提供你所需要像专家一样在Django之中管理的事务处理。