FastAPI - 17 (보안3 - JWT)
출처: https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
아래의 내용은 공식 사이트의 내용을 제 경험과 생각을 추가하여 다시 정리한 것 입니다.
JWT란
먼저 OAuth 2.0에서 발급되는 Bearer 토큰의 형태 중 하나가 JWT입니다.
- JWT 정의: JWT (JSON Web Tokens)는 JSON 객체를 긴 문자열로 인코딩하는 표준입니다. 이 문자열은 공백 없이 밀집되어 있습니다.
-
예시 형태: JWT는 다음과 같은 형태를 가집니다:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 비암호화: JWT는 암호화되지 않았으므로, 누구나 내용을 복구할 수 있습니다.
- 서명 포함: JWT는 서명되어 있어, 발급자는 자신이 발급한 토큰인지를 확인할 수 있습니다.
- 만료 기간 설정 가능: 예를 들어, 1주일 후 만료되는 토큰을 생성할 수 있습니다. 사용자가 다음 날 토큰을 가지고 돌아오면, 사용자가 여전히 시스템에 로그인된 상태임을 알 수 있습니다.
- 만료 및 변조 감지: 토큰이 만료되면 사용자는 재로그인해야 하며, 토큰을 변조하려고 시도할 경우 서명 불일치로 감지할 수 있습니다.
JWT의 구조
-
헤더(Header): 토큰의 타입(주로 JWT)과 사용된 해시 알고리즘(예: HMAC SHA256 또는 RSA)을 지정합니다.
{ "alg": "HS256", "typ": "JWT" }
-
페이로드(Payload): 토큰에 담을 클레임(Claims)을 포함합니다. 클레임은 사용자에 대한 속성이나 데이터를 의미하며, 이는 인증된 사용자의 식별 정보를 포함할 수 있습니다.
{ "sub": "1234567890", "name": "John Doe", "admin": true }
-
서명(Signature): 토큰이 유효하고 무결성이 보장되었음을 증명하기 위해 사용됩니다. 서명은 헤더와 페이로드의 인코딩된 값을 합치고, 비밀 키로 해시하여 생성됩니다.
패키지 설치
JWT를 사용하기 위해 python-jose
를 설치합니다. python-jose는 추가로 암호화 백엔드가 필요해서 아래와 같이 설치합니다.
pip install "python-jose[cryptography]"
비밀번호 해싱을 위해 passlib
를 설치합니다. ‘passlib’은 많은 보안 해시 알고리즘과 유틸리티를 함께 사용할 수 있도록 지원합니다. 권장 알고리즘은 “Bcrypt”입니다.
pip install "passlib[bcrypt]
JWT 적용
이전의 코드(FastAPI - 16 (보안2 - QAuth2, Bearer))를 이용하여 추가 합니다.
JWT 토큰 생성 및 반환
-
login_for_access_token: 로그인을 합니다.
- authenticate_user: 사용자 인증을 합니다.
- create_access_token: JWT 토큰을 생성합니다.
- Bearer 타입의 access_token을 반환합니다.
JWT 토큰을 이용하여 현재 사용자 가져오기
-
read_users_me: 현재 사용자 정보를 반환합니다.
- get_current_active_user: get_current_user를 이용 하여 활성화 된 사용자 정보를 가져옵니다.
- get_current_user: JWT 토큰을 디코딩하고 전달 된 사용자 정보를 가져옥 get_user를 이용 하여 사용자를 인증하고 가져옵니다.
- get_user: 사용자가 존재하는지 인증합니다.
코드
from datetime import datetime, timedelta
from typing import Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# JWT 관련 패키지 import
from jose import JWTError, jwt
# 비밀번호 해싱을 위한 패키지 import
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated
# JWT를 생성하고 검증할 때 사용되는 비밀 키
# openssl rand -hex 32 - 랜던한 값을 얻음
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# JWT를 생성하고 검증할 때 사용되는 암고리즘
ALGORITHM = "HS256"
# JWT 토큰의 만료 시간 - 30분
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
# response_model로 사용할 Token 클래스 생성
class Token(BaseModel):
access_token: str
token_type: str
# 토큰을 디코딩할 때 사용할 TokenData 클래스 생성
class TokenData(BaseModel):
username: Union[str, None] = None
# pydantic의 BaseModel을 사용
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
### passlib 라이브러리를 사용하여 비밀번호 해싱을 위한 설정을 정의
# CryptContext: 암호 관련 작업을 위한 각종 설정을 캡슐화
# schemes=["bcrypt"]: bcrypt를 해싱 알고리즘으로 지정
# deprecated="auto": 더 이상 사용되지 않는 알고리즘이 자동으로 업데이트되도록 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 인증 헤더에서 Bearer 토큰을 추출하는 데 사용 할 OAuth2PasswordBearer 인스턴스 생성
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
# 평문 패스워드와 해싱된 패스워드를 비교하는 함수
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# 패스워드를 해싱하는 함수
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# 사용자 인증 함수: 실 업무에서는 DB에서 가져오는 경우가 많습니다.
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
### data를 JWT로 생성하는 함수
# data: 토큰에 담을 데이터
# expires_delta: 토큰의 만료 시간을 설정
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
# 토큰의 만료 시간이 있다면 토큰의 만료 시간을 설정
if expires_delta:
expire = datetime.utcnow() + expires_delta
# 그렇지 않으면 기본 만료 시간을 15분으로 설정
else:
expire = datetime.utcnow() + timedelta(minutes=15)
# 토큰에 만료 시간을 추가
to_encode.update({"exp": expire})
# jwt.encode 함수를 사용하여 to_encode 딕셔너리를 JWT로 인코딩
# SECRET_KEY: 토큰을 서명의 비밀 키, ALGORITHM: 암호화 알고리즘
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# JWT를 사용하여 현재 사용자를 확인 함수
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
# credentials_exception 변수를 사용하여 예외 처리 설정
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# jwt.decode 함수를 사용하여 토큰을 디코딩하여 페이로드를 가져옴
# SECRET_KEY: 토큰을 서명의 비밀 키, ALGORITHM: 암호화 알고리즘
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 페이로드에서 서브젝트(sub)를 가져와서 username 변수에 할당
# sub는 일반적으로 JWT에서 사용자를 식별하는 주체(subject)를 나타냅니다.
username: str = payload.get("sub")
if username is None:
raise credentials_exception
# username 값을 가지는 TokenData 클래스의 인스턴스를 생성
token_data = TokenData(username=username)
# 예외 발생 시 credentials_exception 예외를 발생시킴
except JWTError:
raise credentials_exception
# 토큰에서 가져온 username을 가진 사용자 정보를 가져옴
user = get_user(fake_users_db, username=token_data.username)
# 사용자 정보가 없다면 credentials_exception 예외를 발생시킴
if user is None:
raise credentials_exception
return user
# 현재 활성화 된 사용자 정보를 가져오는 함수
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# 토큰을 생성하는 함수(토큰을 생성하기 위해 인증 - 로그인 하는 함수)
@app.post("/token", response_model=Token)
async def login_for_access_token(
# 클라이언트가 전달한 username과 password를 OAuth2PasswordRequestForm을 사용하여 가져옴
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
# 사용자 인증
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
# 사용자 정보가 없다면, HTTPException을 발생시킵니다.
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# access_token의 만료 시간을 설정
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 토큰을 생성하여 반환
# 'sub' key에 username을 저장, 만료 시간을 설정
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
# 'Bearer' 타입의 access_token을 반환
return {"access_token": access_token, "token_type": "bearer"}
# 현재 사용자 정보를 반환하는 함수
@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return current_user
# 현재 사용자의 아이템 정보를 반환하는 함수
@app.get("/users/me/items/")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return [{"item_id": "Foo", "owner": current_user.username}]
참고
SECRET_KEY는 랜덤한 값으로 설정해야 합니다.
터미널에서 아래의 명령어를 실행하여 랜덤한 값을 얻을 수 있습니다.
openssl rand -hex 32
윈도우는 openssl이 설치되어 있지 않으므로 아래의 사이트에서 다운로드 받아 설치 해야 합니다. https://code.google.com/archive/p/openssl-for-windows/downloads
테스트
위의 코드를 실행 하고 fastapi의 docs를 확인 합니다.
http://localhost:8000/docs
로그인 하여 토큰 받기(‘/token’)
먼저 로그인(인증 절차)을 하기 위해 /token
를 실행 해 보겠습니다.
username(johndoe)과 password(secret)를 입력하고 Execute
를 클릭 합니다.
Response body를 보겠습니다.
로그인이 완료되어 access_token
과 token_type
(Bearer 타입)이 출력되었습니다.
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzAzMzI2NzQ0fQ.l3JA4p_0u0jrwBQ5EtbYvT438QIWSvloAfNzbfdX994",
"token_type": "bearer"
}
access_token
은 이전과 다르게 JWT 토튼 형태로 해싱되어 출력됩니다.
‘Authorization’를 이용하여 로그인 하기
이전의 코드(FastAPI - 16 (보안2 - QAuth2, Bearer))를 참조하여 Authorization
를 이용하여 로그인을 하겠습니다.
현재 유저(user) 가져오기(‘/users/me/’)
로그인인 된 상태에서 /users/me
를 실행 해 보겠습니다.
별도의 Parameter가 없으므로 Execute를 클릭합니다.
Curl을 보겠습니다.
curl -X 'GET' \
'http://localhost:8000/users/me/' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzAzMzI3MjE0fQ.jSLRpLd7dihlu1cwG4DaZ2ydhVkSUaj_73kUc6rcPGo'
-H 'Authorization: Bearer
뒤의 access_token이 JWT 형태(해싱)로 요청 되었습니다.
Response body을 보겠습니다.
{
"username": "johndoe",
"email": "johndoe@example.com",
"full_name": "John Doe",
"disabled": false
}
로그인을 하거나 서버의 Session에 정보를 저장하지 않고 JWT를 이용하여 인증을 하였습니다.
현재 유저(user)의 items 가져오기(‘/users/me/items/’)
추가로 현재 유저(user)의 items를 가져오는 api를 실행 해 보겠습니다. /users/me/items/
를 실행 합니다.
Curl을 보겠습니다.
curl -X 'GET' \
'http://localhost:8000/users/me/' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzAzMzI3MjE0fQ.jSLRpLd7dihlu1cwG4DaZ2ydhVkSUaj_73kUc6rcPGo'
/user/me/
와 동일한 방법으로 JWT 형태(해싱)로 요청 되었습니다.
Response body을 보겠습니다.
[
{
"item_id": "Foo",
"owner": "johndoe"
}
]
동일하게 JWT 토큰 만으로 인증을 하고 현재 유저의 items를 가져왔습니다.
해시태그: #fastapi #uvicorn #OAuth2 #Bearer #JWT #jwt.decode #jwt.encode
댓글남기기