Установка необходимых библиотек
Для реализации аутентификации через JWT в FastAPI установите библиотеки fastapi
, uvicorn
, pyjwt
и passlib
.
pip install fastapi uvicorn pyjwt passlib[bcrypt]
Создание и верификация JWT
Используйте библиотеку pyjwt
для создания и верификации JWT. Сгенерируйте секретный ключ и установите срок действия токена.
import jwt
from datetime import datetime, timedelta
SECRET_KEY = "my_secret_key"
ALGORITHM = "HS256"
EXPIRATION_TIME = timedelta(minutes=30)
def create_jwt_token(data: dict):
expiration = datetime.utcnow() + EXPIRATION_TIME
data.update({"exp": expiration})
token = jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)
return token
def verify_jwt_token(token: str):
try:
decoded_data = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return decoded_data
except jwt.PyJWTError:
return None
Регистрация и аутентификация пользователей
Создайте маршруты для регистрации и аутентификации пользователей. Используйте passlib
для хеширования паролей.
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@app.post("/register")
def register_user(username: str, password: str):
hashed_password = pwd_context.hash(password)
# Сохраните пользователя в базе данных
return {"username": username, "hashed_password": hashed_password}
@app.post("/token")
def authenticate_user(username: str, password: str):
user = get_user(username) # Получите пользователя из базы данных
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
is_password_correct = pwd_context.verify(password, user.hashed_password)
if not is_password_correct:
raise HTTPException(status_code=400, detail="Incorrect username or password")
jwt_token = create_jwt_token({"sub": user.username})
return {"access_token": jwt_token, "token_type": "bearer"}
Привязка JWT к маршрутам
Используйте Depends
и oauth2_scheme
для привязки JWT аутентификации к маршрутам. Создайте зависимость для получения текущего пользователя.
def get_current_user(token: str = Depends(oauth2_scheme)):
decoded_data = verify_jwt_token(token)
if not decoded_data:
raise HTTPException(status_code=400, detail="Invalid token")
user = get_user(decoded_data["sub"]) # Получите пользователя из базы данных
if not user:
raise HTTPException(status_code=400, detail="User not found")
return user
@app.get("/users/me")
def get_user_me(current_user: User = Depends(get_current_user)):
return current_user
Тестирование аутентификации JWT
Тестируйте аутентификацию JWT, запуская приложение с помощью uvicorn
и выполняя запросы к маршрутам /register
, /token
и /users/me
.
uvicorn main:app --reload
Авторизация с использованием различных ролей
Реализуйте авторизацию на основе ролей, создавая зависимости для различных ролей и применяя их к маршрутам.
def has_role(role: str):
def role_validator(current_user: User = Depends(get_current_user)):
if role not in current_user.roles:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return current_user
return role_validator
@app.get("/admin")
def get_admin_data(current_user: User = Depends(has_role("admin"))):
return {"message": "Welcome, admin!"}
Использование зависимостей для гибкой авторизации
Для еще большей гибкости можно создавать зависимости, которые учитывают разные аспекты авторизации, например, проверку владения ресурсом.
def is_resource_owner(resource_id: int):
def owner_validator(current_user: User = Depends(get_current_user)):
if not is_user_owner_of_resource(current_user, resource_id): # Функция проверки владения ресурсом
raise HTTPException(status_code=403, detail="Not the resource owner")
return current_user
return owner_validator
@app.get("/resources/{resource_id}")
def get_resource_data(resource_id: int, current_user: User = Depends(is_resource_owner(resource_id))):
return {"message": "Welcome, resource owner!"}
Обработка исключений при аутентификации и авторизации
FastAPI предоставляет встроенные исключения для обработки ошибок аутентификации и авторизации, такие как HTTPException
. Вы можете использовать эти исключения для создания информативных сообщений об ошибках для пользователей.
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
if not is_password_correct:
raise HTTPException(status_code=400, detail="Incorrect username or password")
Тестирование аутентификации и авторизации
Тестирование аутентификации и авторизации в FastAPI можно выполнять с использованием библиотеки fastapi.testclient
. Это позволяет проверить функциональность вашего API без фактического выполнения запросов.
from fastapi.testclient import TestClient
client = TestClient(app)
def test_get_protected_route():
response = client.get("/protected-route", headers={"Authorization": "Bearer " + get_test_jwt()})
assert response.status_code == 200
def test_get_protected_route_without_token():
response = client.get("/protected-route")
assert response.status_code == 401
Расширение аутентификации и авторизации
FastAPI предлагает гибкость для интеграции с другими системами аутентификации и авторизации, такими как OAuth2, LDAP и другими. Вы можете расширять функциональность аутентификации и авторизации, создавая собственные зависимости и интегрируя их в ваше приложение.