使用Python的web.py框架实现类似Django的ORM查询的教程

时间:2021-09-10 20:35:19

Django中的对象查询

Django框架自带了ORM,实现了一些比较强大而且方便的查询功能,这些功能和表无关。比如下面这个例子:

?
1
2
3
4
5
6
7
class Question(models.Model):
  question_text = models.CharField(max_length=200)
  pub_date = models.DateTimeField('date published')
 
 
>>> Question.objects.all()
>>> Question.objects.get(pk=1)

从例子可以看出,objects.all和objects.get这些功能都不是在class Question中定义的,可能在其父类models.Model中定义,也可能不是。那么我们在web.py中如何实现这样的功能呢?(如果你选择使用SQLAlchemy就不需要自己实现了)。
实现
思路

我们注意到Question.objects.all()这样的调用是直接访问了类属性objects,并调用了objects属性的方法all()。这里objects可能是一个实例,也可能是一个类。我个人认为(我没看过Django的实现)这应该是一个实例,因为实例化的过程可以传递一些表的信息,使得类似all()这样的函数可以工作。经过分析之后,我们可以列出我们需要解决的问题:

  •     需要实现一个模型的父类Model,实际的表可以从这个父类继承以获得自己没有定义的功能。
  •     实际的模型类(比如Question类)定义后,不实例话的情况下就要具备objects.all()这样的查询效果。
  • 从上面的需求可以看出,我们需要在类定义的时候就实现这些功能,而不是等到类实例化的时候再实现这些功能。类定义的时候实现功能?这不就是metaclass(元类)做的事情嘛。因此实现过程大概是下面这样的:
  •     实现一个Model类,其绑定方法和表的增、删、改有关。
  •     修改Model类的元类为ModelMetaClass,该元类定义的过程中为类增加一个objects对象,该对象是一个ModelDefaultManager类的实例,实现了表的查询功能。

代码

都说不给代码就是耍流氓,我还是给吧。说明下:使用的数据库操作都是web.py的db库中的接口。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -*- coding: utf-8 -*-
 
import web
 
import config # 自定义的配置类,可以忽略
 
 
def _connect_to_db():
  return web.database(dbn="sqlite", db=config.dbname)
 
 
def init_db():
  db = _connect_to_db()
  for statement in config.sql_statements:
    db.query(statement)
 
 
class ModelError(Exception):
  """Exception raised by all models.
 
  Attributes:
    msg: Error message.
  """
 
  def __init__(self, msg=""):
    self.msg = msg
 
  def __str__(self):
    return "ModelError: %s" % self.msg
 
 
class ModelDefaultManager(object):
  """ModelManager implements query functions against a model.
 
  Attributes:
    cls: The class to be managed.
  """
 
  def __init__(self, cls):
    self.cls = cls
    self._table_name = cls.__name__.lower()
 
  def all(self):
    db = _connect_to_db()
    results = db.select(self._table_name)
    return [self.cls(x) for x in results]
 
  def get(self, query_vars, where):
    results = self.filter(query_vars, where, limit=1)
    if len(results) > 0:
      return results[0]
    else:
      return None
 
  def filter(self, query_vars, where, limit=None):
    db = _connect_to_db()
    try:
      results = db.select(self._table_name, vars=query_vars, where=where,
                limit=limit)
    except (Exception) as e:
      raise ModelError(str(e))
 
    return [self.cls(x) for x in results]
 
 
class ModelMetaClass(type):
 
  def __new__(cls, classname, bases, attrs):
    new_class = super(ModelMetaClass, cls).__new__(cls, classname,
                            bases, attrs)
    objects = ModelDefaultManager(new_class)
    setattr(new_class, "objects", objects)
 
    return new_class
 
 
class Model(object):
  """Parent class of all models.
  """
 
  __metaclass__ = ModelMetaClass
 
  def __init__(self):
    pass
 
  def _table_name(self):
    return self.__class__.__name__.lower()
 
  def insert(self, **kargs):
    db = _connect_to_db()
    try:
      with db.transaction():
        db.insert(self._table_name(), **kargs)
    except (Exception) as e:
      raise ModelError(str(e))
 
  def delete(self, where, using=None, vars=None):
    db = _connect_to_db()
    try:
      with db.transaction():
        db.delete(self._table_name(), where, vars=vars)
    except (Exception) as e:
      raise ModelError(str(e))
 
  def save(self, where, vars=None, **kargs):
    db = _connect_to_db()
    try:
      with db.transaction():
        db.update(self._table_name(), where, vars, **kargs)
    except (Exception) as e:
      raise ModelError(str(e))

使用

首先定义表对应的类:

?
1
2
class Users(Model):
  ...

使用就和Django的方式一样:

?
1
>>> user_list = Users.objects.all()