fastapi身份认证

时间:2022-12-11 15:56:12

官方文档

FastApi提供了OAuth2PasswordBearer类对OAuth2中的password授权模式提供了支持。

一、实现逻辑

  1. 创建OAuth2PasswordBearer实例并指明token url(认证用户获取token)。

    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    
  2. 将创建的OAuth2PasswordBearer作为依赖添加到需要用户认证才能访问的url。

    @app.get("/items/")
    async def read_items(token: str = Depends(oauth2_scheme)):
        return {"token": token}
    
  3. 实现tokenUrl端点

    • 认证用户

      def authenticate_user(fake_db, username, password):
          user = get_user(fake_db, username) (1)
          if not user:
              return False
          if not verify_password(password, user.hashed_password):  (2)
              return False
          return user
      

      这里使用passlibBcrypt生成和验证hash密码。

      $ pip install passlib[bcrypt]
      

      (1)通过用户名查询系统用户

      (2)验证密码

    • 生成token

      def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
          to_encode = data.copy()
          if expires_delta:
              expire = datetime.utcnow() + expires_delta
          else:
              expire = datetime.utcnow() + timedelta(minutes=15)
          to_encode.update({"exp": expire})
          encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) (1)
          return encoded_jwt
      

      这里使用python-josecryptography生成和校验token,

      $ pip install python-jose[cryptography]
      

      (1)SECRET_KEY为生成的秘钥,ALGORITHM为秘钥算法,这里为HS256。

      $ openssl rand -hex 32
      
    • 定义端点

      @app.post("/token")
      async def login(form_data: OAuth2PasswordRequestForm = Depends()):
          user = authenticate_user(fake_users_db, form_data.username, form_data.password)
          if not user:
              raise HTTPException(
                  status_code=status.HTTP_401_UNAUTHORIZED,
                  detail="Incorrect username or password",
                  headers={"WWW-Authenticate": "Bearer"},
              )
          access_token_expires = timedelta(minutes=15)
          access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires) 
          return {"access_token": access_token, "token_type": "bearer"}
      

二、OAuth2PasswordBearer

class OAuth2PasswordBearer(OAuth2):
    def __init__(
        self,
        tokenUrl: str,
        scheme_name: Optional[str] = None,
        scopes: Optional[Dict[str, str]] = None,
        description: Optional[str] = None,
        auto_error: bool = True,
    ):
        if not scopes:
            scopes = {}
        flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
        super().__init__(
            flows=flows,
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

OAuth2PasswordBearer的tokenUrl属性定义了认证请求处理url,由开发者自行实现token url。

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")1@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):2return {"token": token}

(1)创建OAuth2PasswordBearer实例并声明token url;

(2)通过Depends(oauth2_scheme)可以从请求中获取Bearer token,如果用户没有登录则将会返回401错误;

三、实现token url端点:

@app.post("/token")   (1)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)2if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=15)    (3)
    access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)4return {"access_token": access_token, "token_type": "bearer"}

(1)使用post方法请求/token即可进行用户认证获取token。

(2)通过请求中的用户名进行用户认证,认证通过后将返回用户信息

(3)token有效时间

(4)生成token

四、OAuth2PasswordRequestForm

class OAuth2PasswordRequestForm:
    def __init__(
        self,
        grant_type: str = Form(default=None, regex="password"),
        username: str = Form(),
        password: str = Form(),
        scope: str = Form(default=""),
        client_id: Optional[str] = Form(default=None),
        client_secret: Optional[str] = Form(default=None),
    ):
        self.grant_type = grant_type
        self.username = username
        self.password = password
        self.scopes = scope.split()
        self.client_id = client_id
        self.client_secret = client_secret

OAuth2PasswordRequestForm是fastapi提供的获取请求中的用户名密码的依赖项,声明了如下的请求表单:

  • username
  • password
  • 一个可选的 scope 字段,是一个由空格分隔的字符串组成的大字符串。
  • 一个可选的 grant_type
  • 一个可选的 client_id
  • 一个可选的 client_secret

注:使用form_data需要安装python-multipart

$ pip install python-multipart

五、完整示例代码

import time
from datetime import timedelta, datetime
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, Request

from fastapi.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from starlette import status
from passlib.context import CryptContext
from jose import JWTError, jwt

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$uaiu4T29ukSDdbKIwv51mee661OGiaycuE276syh79I5uAIIVkatC",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "$2b$12$uaiu4T29ukSDdbKIwv51mee661OGiaycuE276syh79I5uAIIVkatC",
        "disabled": True,
    },
}

class UserInDB(User):
    hashed_password: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class Token(BaseModel):
    access_token: str
    token_type: str


pwd_context = CryptContext(schemes=["bcrypt"])


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username, password):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_activate_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=15)
    access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)
    return {"access_token": access_token, "token_type": "bearer"}

  
@app.get("/user/me", response_model=User)
async def read_items(user: User = Depends(get_current_activate_user)):
    return user

六、中间件

fastapi提供了中间件可以在请求执行前和请求执行后做一下其他的拦截。

创建中间件只需要使用@app.middleware(“http”)装饰器,所有的请求执行前和执行后都将经过中间件。

import time

from fastapi import FastAPI, Request

app = FastAPI()


@app.middleware("http") (1)
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()  (2)
    response = await call_next(request)
    process_time = time.time() - start_time  (3)
    response.headers["X-Process-Time"] = str(process_time)
    return response

(1)声明该函数为中间件

(2)请求处理前前执行

(3)请求处理完成后执行