🧐 개요
이번 포스트에서는 FastAPI 환경에서 OAuth 인증을 구현하는 과정을 설명합니다.
Spotify등의 공공 API 서비스에서는 API 요청을 전송하는 과정에서 Bearer Token 등의 엑세스 토큰의 발급을 요구합니다. 또한 엑세스 토큰을 발급하기 위해서는 계정 생성 시에 발급되는 access token 및 secret token을 요구합니다.
위키백과의 OAuth 설명
OAuth("Open Authorization")는 인터넷 사용자들이 비밀번호를 제공하지 않고 다른 웹사이트 상의 자신들의 정보에 대해 웹사이트나 애플리케이션의 접근 권한을 부여할 수 있는 공통적인 수단으로서 사용되는, 접근 위임을 위한 개방형 표준이다. 이 매커니즘은 여러 기업들에 의해 사용되는데, 이를테면 아마존, 구글, 페이스북, 마이크로소프트, 트위터가 있으며 사용자들이 타사 애플리케이션이나 웹사이트의 계정에 관한 정보를 공유할 수 있게 허용한다.
WEB 프레임워크 기반의 서비스의 경우 로그인을 통한 '회원 인증'을 통해 대상을 분류하여 서비스를 제공할 수 있습니다(LOL등의 게임 클라이언트에 아이디와 비밀번호를 입력해 로그인하면 게임 내에서도 해당 계정으로 접속하는 것이 예시가 될 수 있겠습니다). 하지만 cURL 기반의 request & response로 통신하는 API 서비스의 경우 로그인 기능을 생략한 형태로 동일한 수준의 인증을 진행해야 합니다. 따라서 아래와 같은 과정을 통해 인증 절차를 간략화할 수 있습니다.
'Access Token' decoding 수행
-> 내부 정보 확인 (Username, Scope, Expire Timestamp 등) 및 권한 확인
-> 권한에 따른 후속 작업 수행 (서비스 제공, 안내 사항 제공 등)
[예시] Spotify API의 OAuth 인증 절차
Spotify API는 회원 가입 후 어플리케이션을 생성하면 자동으로 Client ID와 Client secret 코드를 발급합니다. 해당 코드는 OAuth의 코드 생성 알고리즘 중 하나를 통해 어플리케이션 생성 시점에 (유저의 회원 정보를 기반으로)랜덤으로 생성됩니다. 일반적으로 생성된 Client ID 및 Client secret 정보는 추후 수정되지 않는 이상 유통기한(expire date)이 없는 인증 데이터입니다. AWS 서비스에서도 S3 스토리지 등의 서비스에 대한 접근 권한을 부여하기 위해 Client ID 정보와 Client secret 정보를 제공했었죠.
해당 정보를 통해 직접 권한을 확인하기도 하지만, 일반적으로는 해당 정보들을 기반으로 유통기한이 있는 Access Token을 발급하여 이 코드를 통해 인증 및 권한 확인을 수행합니다. 발급한 Token을 포함한 header을 전송하면
Spotify API의 경우 어플리케이션 및 컨텐츠와 관련된 다양한 엔드포인트를 가지고 있습니다. Request의 요청 방식(GET, POST, DELETE) 및 세부 작업 내용(scope)에 따라 발급 받아야 할 키의 형태 및 발급 방식이 상이합니다.
- [POST/DELETE] : Authorized Key 발급 (scope 기반, 3600s) -> Access Token 발급 (Authorized Key 기반, 3600s)
- [GET] : Access Token 발급 (3600s)
일반적인 데이터 요청 작업의 경우에는 GET 메서드를 사용하기 때문에 Auth 키 발급이 필요하지 않지만, 플레이리스트를 생성 또는 조작하는 등의 작업은 '플레이리스트의 공개/비공개 여부' 등의 scope 권한이 추가로 필요하기 때문에 Auth Key 발급이 선행되어야 합니다. 이러한 차이점들이 있지만 중요한 포인트는, 공통적으로 모든 엔드포인트가 Access Token의 발급을 요구한다는 것입니다.
Token을 분석해 사용자 권한을 파악하는 OAuth 인증 방식을 통해 Spotify API 서버는 (로그인/로그아웃 기능 없이) 유저의 request 요청 한줄만으로도 해당 유저가 해당 파트에 대한 권한을 가진 유저인가를 판단하고, 그에 따른 후속 작업을 수행할 수 있게 됩니다.
🛠️ FastAPI에서 OAuth 인증 구현하기
이제 FastAPI 서버에서 간단한 OAuth 인증 방식을 구현해보도록 하겠습니다.
구현할 기능
- Client ID 및 Client Secret 코드 조회(반환) 기능
- Access Token 발급 기능 (Client ID, Client Secret 기반)
- Access Token 확인 기능 == 테스트용
사전 준비
OAuth 인증 기능을 구현하기 위해서는 사용자의 인증 정보를 저장하는 Database 또는 환경설정 파일이 필요합니다. 해당 예제에서는 간단하게 SQLite 데이터베이스를 파일로 만들고 임의의 회원 정보를 만들어 테스트를 수행하도록 하겠습니다.
def execute_query(dir: str, QUERY: str):
import sqlite3
# open conn
conn = sqlite3.connect(dir)
cursor = conn.cursor()
# execute queries
cursor.execute(QUERY)
conn.commit()
# close conn
conn.close()
if __name__ == "__main__":
import secrets, os
# 파일 디렉토리 및 파일명은 환경에 맞게 수정해주세요.
dir = os.path.dirname(os.path.abspath(__file__)) + "/authentication.db"
QUERY_WAL = "PRAGMA journal_mode=WAL;"
QUERY_CREATE = f"""CREATE TABLE IF NOT EXISTS user (
usermane VARCHAR(20),
password VARCHAR(20),
client_id VARCHAR(20),
client_secret VARCHAR(40)
)"""
# 사용자 정보 부분은 사용하실 username 및 password로 수정해주세요.
client_id = secrets.token_urlsafe(20)
client_secret = secrets.token_urlsafe(40)
QUERY_INSERT = f"""
INSERT INTO user
VALUES ('<username>', '<password>', '{client_id}', '{client_secret}')
"""
execute_query(dir=dir, QUERY=QUERY_WAL)
execute_query(dir=dir, QUERY=QUERY_CREATE)
execute_query(dir=dir, QUERY=QUERY_INSERT)
해당 스크립트에서 수행하고 있는 동작은 다음과 같습니다.
- SQLite Database의 동시성을 향상시키기 위해 WAL 모드를 적용합니다. 기본적으로 SQLite와 같은 파일 형식의 데이터베이스는 READ 동작과 WRITE 동작을 동시에 수행할 수 없습니다. 따라서 동시 사용을 필요로 하는 경우 journal mode를 적절히 수정하여 동시성을 향상시키는 작업을 진행해주어야 합니다.
- username, password, client_id, client_secret 등의 사용자 정보를 저장하는 테이블을 생성합니다. 해당 테이블을 통해 OAuth 인증을 수행하게 됩니다.
- 테스트를 위한 임의의 Dataset을 생성합니다. Python의 Secrets 모델을 사용하면 원하는 길이의 token을 랜덤으로 생성할 수 있습니다.
전체 스크립트
from fastapi import APIRouter # set router
from fastapi import Depends # set dependency
from fastapi import HTTPException, status # raise exception errors
from fastapi.security import OAuth2PasswordBearer # create scheme
import jwt # decode token with params
from pydantic import BaseModel # create cliend credential model
from datetime import datetime, timedelta # (set/define) expire date
import sqlite3 # read database
import os # file dir searching
from typing import Optional # set optional variables
# cliend credential model
class ClientCredentials(BaseModel):
client_id: str
client_secret: str
# user model
class User:
def __init__(self, username: str):
self.username = username
# set router
router = APIRouter()
# token
SECRET_KEY = "mysecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# sqlite database dir
DATABASE_FILE = f"{os.path.dirname(os.path.abspath(__file__))}/../sqlite/authentication.db"
# create scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# return client id & client secret to user
@router.post("/key")
async def return_keys(username:str, password:str):
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
query = f"SELECT client_id, client_secret FROM user WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
result = cursor.fetchone()
if result:
return {"client_id": result[0], "client_secret": result[1]}
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# create access token
def create_access_token(data: dict, expires_delta: Optional[int] = None):
# copy data
to_encode = data.copy()
# set expire date(timestamp)
if expires_delta:
expire = datetime.now() + expires_delta
to_encode.update({"exp": expire})
else:
expire = datetime.now() + timedelta(minutes=15)
to_encode.update({"exp": expire})
# encode data == create access token
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# create & return access token
@router.post("/token")
async def login_for_access_token(form_data: ClientCredentials):
# define user information
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
query = f"SELECT username FROM user WHERE client_id = ? AND client_secret = ?"
cursor.execute(query, (form_data.client_id, form_data.client_secret))
username = cursor.fetchone()[0]
conn.close()
# create access token
if username:
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# raise exception error
raise HTTPException(status_code=401, detail="Invalid credentials")
# decode access token
def decode_auth(token: str):
try:
# decode token with secret key and algorithm
decoded_credentials = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = decoded_credentials["sub"]
expire_date = decoded_credentials["exp"]
return {"username": username, "expire_date": expire_date}
# raise exception error
except Exception as E:
raise HTTPException(status_code=401, detail="Invalid authentication header")
def decode_access_token(token: str = Depends(oauth2_scheme)):
# decode token & get infos
user_info = decode_auth(token)
username, expire_date = user_info["username"], user_info["expire_date"]
# read information
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
query = "SELECT * FROM user WHERE username = ?"
cursor.execute(query, (username,))
user = cursor.fetchone()
conn.close()
# raise exception error (if credential invalid)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# raise exception error (if expired)
expire_datetime = datetime.fromtimestamp(expire_date)
if expire_datetime <= datetime.now():
raise HTTPException(status_code=401, detail="Token expired")
return username
# return user infos
@router.get("/users/me")
async def read_users_me(current_user: User = Depends(decode_access_token)):
return current_user
스크립트의 가독성 및 유지 보수를 위해서는 Dependency 부분과 SQLite 부분 등의 라이브러리를 별도의 디렉토리로 옮겨 호출하는 방식으로 사용해야 합니다. 하지만(제가 미처 못 정리하기도 했고..) 해당 스크립트에서는 단일 endpoint 스크립트에 모든 함수와 클래스, 라우터를 할당하는 방식으로 설명하고자 합니다.
이제 스크립트의 내용을 하나씩 확인해보도록 하겠습니다.
기본 설정
# define SECRET KEY, ALGORITHM and ACCESS TOKEN EXPIRE MINUTES
SECRET_KEY = "mysecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# define scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# sqlite database dir
# 파일 디렉토리는 앞서 생성한 SQLite 파일 디렉토리로 수정해주세요.
DATABASE_FILE = f"{os.path.dirname(os.path.abspath(__file__))}/../sqlite/authentication.db"
SECRET KEY 및 ALGORITHM 정보는 추후 Access Token을 생성하고 decoding을 수행할 때 사용할 파라미터입니다. ALGORITHM은 여러 종류가 있지만 해당 스크립트에서는 "HS256" 알고리즘을 사용합니다(SECRET KEY는 원하시는 텍스트를 입력해주시면 됩니다). 일반적으로 토큰의 유통기한은 3600초(60분)으로 설정되기 때문에 해당 예제에서도 60분으로 설정하였습니다.
[엔드포인트] 사용자의 Client ID 및 Client Secret 정보 반환하기
@router.get("/key")
async def return_keys(username:str, password:str):
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
query = f"SELECT client_id, client_secret FROM user WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
result = cursor.fetchone()
if result:
return {"client_id": result[0], "client_secret": result[1]}
raise HTTPException(status_code=401, detail="Invalid credentials")
해당 엔드포인트에서는 사용자의 username 및 passowrd 정보를 입력받아 SQLite 데이터베이스에 있는 Client ID 및 Client Secret 정보를 반환하는 역할을 수행합니다. 만일 데이터베이스 검색 결과에 없을 경우 401 ERROR을 발생시킵니다.
curl \
'http://localhost:8000/key?username=<username>&password=<password>'
로컬 환경에서 해당 엔드포인트를 테스트하실 경우 위와 같이 cURL을 발송하실 수 있습니다.
대시보드를 통해서 테스트한 모습입니다. Client ID 및 Client Secret 정보를 딕셔너리의 형태로 반환하고 있습니다.
[엔드포인트] 사용자에게 Access Token 발급하기
class ClientCredentials(BaseModel):
client_id: str
client_secret: str
def create_access_token(data: dict, expires_delta: Optional[int] = None):
# copy data
to_encode = data.copy()
# set expire date(timestamp)
if expires_delta:
expire = datetime.now() + expires_delta
to_encode.update({"exp": expire})
else:
expire = datetime.now() + timedelta(minutes=15)
to_encode.update({"exp": expire})
# encode data == create access token
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# create & return access token
@router.post("/token")
async def login_for_access_token(form_data: ClientCredentials):
# define user information
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
query = f"SELECT username FROM user WHERE client_id = ? AND client_secret = ?"
cursor.execute(query, (form_data.client_id, form_data.client_secret))
username = cursor.fetchone()[0]
conn.close()
# create access token
if username:
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# raise exception error
raise HTTPException(status_code=401, detail="Invalid credentials")
해당 스크립트에서는 엔드포인트에서 Client ID 및 Client Secret 데이터의 유효성을 검사한 후 Access Token을 생성 및 반환하는 로직을 수행하고 있습니다. Python의 jwt 모듈을 통해 Encoding 알고리즘과 Secret Key를 지정하고, 이를 기반으로 일정한 유효기간을 지니는 Access Key를 생성(Encoding)합니다.
* 해당 로직을 GET 대신 POST 메서드를 사용한 이유는, (해당 예제에서는 구현하지 않았지만) 토큰 발급 과정에서 추가적인 파라미터를 입력할 수도 있기 때문입니다. 엔드포인트 별 세부 권한을 명시하는 'Scope'나 인증 방식'Authentication Type' 등을 추가적으로 설정하는 경우 입력 변수의 확장을 용이하게 하기 위해 POST 메서드를 사용할 수 있습니다.
** 스크립트 상단에 Pydentic 모듈의 BaseModel 메서드를 통해 Client Credentials를 생성하였는데, 이렇게 한 이유는 POST 요청에서 Client ID 및 Client Secret 정보를 입력받아 사용하기 위해서입니다(딕셔너리에서 dict["method"] 방식으로 데이터를 호출하는 것을 대신하는 것입니다). 해당 방식을 통해 데이터를 입력받으면 잘못된 규격으로 데이터가 전송될 경우의 ERROR 호출이 쉽고, 딕셔너리(JSON) 내부의 변수를 Casting(데이터 타입 변환) 없이 바로 사용할 수 있어 코드 작성이 편리해집니다.
curl -X 'POST' \
'http://localhost:8000/token' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"client_id": "<client_id>",
"client_secret": "<client_secret>"
}'
로컬 환경에서 해당 엔드포인트를 테스트하실 경우 위와 같이 cURL을 발송하실 수 있습니다.
대시보드를 통해서 테스트한 모습입니다. 정상적으로 인증이 수행되어 Bearer 타입의 Access Token이 발급되었습니다.
[엔드포인트] 사용자 인증 수행하기 == 사용자가 제출한 header의 토큰 정보 해석하기
# user model
class User:
def __init__(self, username: str):
self.username = username
# decode access token
def decode_auth(token: str):
try:
# decode token with secret key and algorithm
decoded_credentials = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = decoded_credentials["sub"]
expire_date = decoded_credentials["exp"]
return {"username": username, "expire_date": expire_date}
# raise exception error
except Exception as E:
raise HTTPException(status_code=401, detail="Invalid authentication header")
def decode_access_token(token: str = Depends(oauth2_scheme)):
# decode token & get infos
user_info = decode_auth(token)
username, expire_date = user_info["username"], user_info["expire_date"]
# read information
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
query = "SELECT * FROM user WHERE username = ?"
cursor.execute(query, (username,))
user = cursor.fetchone()
conn.close()
# raise exception error (if credential invalid)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# raise exception error (if expired)
expire_datetime = datetime.fromtimestamp(expire_date)
if expire_datetime <= datetime.now():
raise HTTPException(status_code=401, detail="Token expired")
return username
# return user infos
@router.get("/users/me")
async def read_users_me(current_user: User = Depends(decode_access_token)):
return current_user
해당 스크립트에서는 사용자가 header 내부에 전송한 Access Token 정보를 해석하여 토큰의 유효성을 검증한 후 후속 조치를 수행하고 있습니다.
FastAPI 모듈의 Depends 메서드를 통해 객체를 호출하는 과정에서 사용할 의존성 함수를 선정할 수 있습니다(Depends 메서드로 함수를 지정하면, 객체를 반환받는 과정에서 해당 함수를 사용한 최종 반환값을 객체로 받게 됩니다). 앞의 사전 설정에서 정의한 oauth2_scheme 함수를 사용하여 Header 내용에 포함된 Access Token 정보를 추출하고, 해당 토큰을 다시 jwt 모듈을 통해 Decoding하여 사용자 정보 및 토큰의 만료 시점을 확인하고 있습니다.
해당 스크립트에서는 단순 검증만을 수행하고 있으므로 username 정보를 단순 반환하지만, 실제로는 해당 로직을 기반으로 다양한 기능을 엔드포인트에 부여할 수 있을 것입니다.
curl \
--request GET \
--url 'localhost:8000/users/me' \
--header 'Authorization: bearer <access_token>'
로컬 환경에서 해당 엔드포인트를 테스트하실 경우 위와 같이 cURL을 발송하실 수 있습니다.
FastAPI Docs에서는 header 정보를 입력하지 못하는 듯 하여 cURL 명령어를 직접 수행하였습니다. GET 명령어는 일반적으로 'GET'을 명시할 필요가 없지만, header 정보를 추가로 입력하기 위해 표준 방식으로 테스트를 수행하였습니다. 예시에서는 인증에 성공하여 사용자 이름을 반환하고 있습니다.
📝 마치며
API 서버 운용에는 고려해야 할 다양한 요소들이 내재되어 있습니다. 네트워크 트래픽과 서버 상태 정보(리소스, 로그 등), 인증 방식 및 권한 문제 등 깊은 수준으로 고려해야 할 부분이 많더군요. 모든 부분을 한 번에 다룰 수는 없겠지만, 부족했던 요소들을 하나씩 보충하다 보면 어제보다는 더 높은 완성도를 갖춘 API 서비스를 구축할 수 있겠지, 하는 생각이 드는 과정이었습니다.
'백엔드 이모저모 > FastAPI' 카테고리의 다른 글
[FastAPI] 미들웨어(MiddleWare) 구성을 통한 시스템 로그 관리 - 작성중 (1) | 2023.12.29 |
---|
발자취를 로그처럼 남기고자 하는 초보 개발자의 블로그