[Flask] JWT를 이용한 인증기능 구현

Kyunghwan Ko·2022년 5월 25일
0

Web CS 지식

목록 보기
1/2

인트로

로그인을 통해 발급받은 JWT을 활용해서 다른 페이지로 넘어갔을때 인증된 사용자인지 주기적으로 확인하면서 서비스를 제공하는 Flask App을 만들것이다.

먼저 flask에서 이루어지는 jwt를 활용한 인증방식을 이해하기 위해 하나의 python파일로만 구현해 보았습니다.

가상환경을 사용하는 이유

파이썬 package(library)간 의존성 때문에 사용하는 것이 좋고
특정 패키지 버전을 업데이트 할때 다른 것들이 호환되지 않아 문제가 생기는 경우를 방지할 수 있습니다.

그리고 만약 작업환경이 바뀌더라도(ex. PC교체) 필요한 패키지들을 동일한 버전으로 설치해 작업할 수 있기 때문에
가상환경 사용을 추천합니다.

별도의 가상환경을 만들어서 다시 시도해보겠습니다.

(venv2) ~redash/redash/ $ 에서 진행

pip install --upgrade pip
pip install flask
pip install flask_sqlalchemy
pip install setuptools_rust
pip install jwt
sudo apt install sqlite3

$ python
>> from api import db2
>> db2.create_all()
>> exit()

$ sqlite3 test.db
sqlite> .tables
student
sqlite> .exit

$ python api.py
에러 발생

Address already in use Error#)

Error 발생)

Traceback (most recent call last):
File "api.py", line 23, in <module>
app2.run(debug=True)
File "/home/kyunghwan/venv2/lib/python3.6/site-packages/flask/app.py", line 922, in run
run_simple(t.cast(str, host), port, self, **options)
File "/home/kyunghwan/venv2/lib/python3.6/site-packages/werkzeug/serving.py", line 982, in run_simple
s.bind(server_address)
OSError: [Errno 98] Address already in use

Error 원인)

​ Flask 모듈이 비정상적으로 종료되어서 5000 포트를 잡고 있어 발생하는 에러입니다.

Error 해결)

​ flask가 생성한 process를 강제로 kill 합니다.
​ PID를 확인하기 위해서 lsof (list open files) 명령어 열려진 socket 파일들을 보는 명령어입니다.

#flask가 생성한 process를 확인
$ sudo lsof -i :5000
flask            9066 kibua    3u  IPv4  82012      0t0  TCP localhost:5000 (LISTEN)
python3     9069 kibua    3u  IPv4  82012      0t0  TCP localhost:5000 (LISTEN)
python3     9069 kibua    4u  IPv4  82012      0t0  TCP localhost:5000 (LISTEN)

#kill 명령어로 -SIGKILL(-9)를 PID에 전달
$ sudo kill -9 9066 9069

이후 다시

(venv2) ~redash/redash/ $ 에서 진행
$ python api.py

postman으로 요청보낼 때 충돌이 나기때문에

외부에 api.py를 다시 만들어서 db구성에도 email 열을 추가하고 test.db를 구성하겠다.
그리고 앞전에 python 서버를 실행시켰기 때문에 프로세스를 찾아서 kill해야 합니다.

(venv2) ~$ 에서 진행

$ python
>> from api import db2
>> db2.create_all()
>> exit()

$ sqlite3 test.db
sqlite> .tables
student
sqlite> .exit

#flask가 생성한 process를 확인
$ sudo lsof -i :5000
flask            9066 kibua    3u  IPv4  82012      0t0  TCP localhost:5000 (LISTEN)
python3     9069 kibua    3u  IPv4  82012      0t0  TCP localhost:5000 (LISTEN)
python3     9069 kibua    4u  IPv4  82012      0t0  TCP localhost:5000 (LISTEN)

#kill 명령어로 -SIGKILL(-9)를 PID에 전달
$ sudo kill -9 9066 9069

$ python api.py

위 과정을 통해 api.py파일을 실행시킨다.

이후 Postman을 통해 Rest api를 확인해보겠습니다.

# student 리스트에 추가하기
POST   http://127.0.0.1:5000/student 
{"name": "kyunghwan1", "email": "ko1@naver.com", "password": "1234"}

Response-> { "message": "New stduent created!" }↵

# student 리스트 보기 (지금까지 추가한 student가 1명이므로 1개만 나온다.)
GET    http://127.0.0.1:5000/student 

Response ->
{
  "students": [
    {
      "email": "ko1@naver.com", 
      "name": "kyunghwan1", 
      "password": "sha256$EBzbyAgwmvbcQJQK$bf0f879995d3f997964ade5b00fce9d81d68e6589f7c812937b30671a58d767b", 
      "public_id": "b19415c2-4c1f-473f-909e-4ade1725e841", 
      "role": "user"
    }
  ]
}↵

# 특정 public_id를 가진 한 명의 student 정보 조회
GET  http://127.0.0.1:5000/student/b19415c2-4c1f-473f-909e-4ade1725e841
Response -> 위와 동일

<로그인 - jwt토큰 사용한 인증방식>

# login시 기존에 student 리스트에 등록 되어있는 사람의 username, password를 입력하면 token이 발행된다.
GET  http://127.0.0.1:5000/login

Authorization - Type: Basic Auth | Username: kyunghwan1 | Password: 1234

Response ->
{
  "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJwdWJsaWNfaWQiOiJiMTk0MTVjMi00YzFmLTQ3M2YtOTA5ZS00YWRlMTcyNWU4NDEiLCJleHAiOjE2MjgxNDM5MjJ9.tciuF_OvxefLzDyz8I-U_KjysFuVSYRadnBteG7sOP4"
}↵
# @token_required 이므로 py파일에서 정의한 형식으로 header를 구성, authorizationed되었다면 student 리스트 보기
GET   http://127.0.0.1:5000/student

Headers - Key: x-access-token  |  Value: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJwdWJsaWNfaWQiOiJiMTk0MTVjMi00YzFmLTQ3M2YtOTA5ZS00YWRlMTcyNWU4NDEiLCJleHAiOjE2MjgxNDM5MjJ9.tciuF_OvxefLzDyz8I-U_KjysFuVSYRadnBteG7sOP4
<만약 token이 유효하다면>
student 리스트를 보여준다.

<만약 token이 유효하지 않다면>
{"message": "Token is invalid!"}↵

완성된 api.py파일

# start add from ko
from flask import Flask, jsonify, request, jsonify, make_response
from flask_sqlalchemy import SQLAlchemy
import uuid
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import datetime
from functools import wraps

app2 = Flask(__name__)
app2.config['SECRET_KEY'] = 'thisissecret'
app2.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////home/kyunghwan//test.db'
db2 = SQLAlchemy(app2)

class Student(db2.Model):
    id = db2.Column(db2.Integer, primary_key=True)
    public_id = db2.Column(db2.String(50), unique=True)
    name = db2.Column(db2.String(50))
    password = db2.Column(db2.String(80))
    email = db2.Column(db2.String(100))
    role = db2.Column(db2.String(30)) #superuser, admin, user
# token 유효성 검사
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None

        if 'x-access-token' in request.headers:
            token = request.headers['x-access-token']

        if not token:
            return jsonify({'message' : 'Token is missing!'}), 401

        try: 
            data = jwt.decode(token, app2.config['SECRET_KEY'], algorithms='HS256')
            current_student = Student.query.filter_by(public_id=data['public_id']).first()
        except:
            return jsonify({'message' : 'Token is invalid!'}), 401

        return f(current_student, *args, **kwargs)

    return decorated

@app2.route('/student', methods=['GET'])
@token_required
def get_all_students(current_student):

    if current_student.role != 'user':
        return jsonify({'message': 'You do not have permission!'})

    students = Student.query.all()

    output = []

    for student in students:
        student_data = {}
        student_data['public_id'] = student.public_id
        student_data['name'] = student.name
        student_data['password'] = student.password
        student_data['email'] = student.email
        student_data['role'] = student.role
        output.append(student_data)

    return jsonify({'students' : output})

@app2.route('/student/<public_id>', methods=['GET'])
@token_required
def get_one_student(current_student, public_id):

    student = Student.query.filter_by(public_id=public_id).first()
    if not student:
        return jsonify({'message': 'No stduent found!'})
    
    student_data = {}
    student_data['public_id'] = student.public_id
    student_data['name'] = student.name
    student_data['password'] = student.password
    student_data['email'] = student.email
    student_data['role'] = student.role

    return jsonify({'student': student_data})

@app2.route('/student', methods=['POST'])
def create_student(current_student):
    data = request.get_json()
    hashed_password = generate_password_hash(data['password'], method='sha256')
    new_student = Student(public_id=str(uuid.uuid4()), name=data['name'], password=hashed_password, email=data['email'], role='user')
    db2.session.add(new_student)
    db2.session.commit()

    return jsonify({'message': 'New stduent created!'})

# role을 바꿔주는 함수?
@app2.route('/student/<public_id>', methods=['PUT'])
@token_required
def promote_student(current_student, public_id):
    #student = Student.query.filter_by(public_id=public_id).first() 
    #db2.session.commit()
    return '<h1>promote_student - superuser, admin, user</h1>'

@app2.route('/student/<public_id>', methods=['DELETE'])
@token_required
def delete_student(current_student, public_id):
    student = Student.query.filter_by(public_id=public_id).first()

    if not student:
        return jsonify({'message': 'No user Found!'})

    db2.session.delete(student)
    db2.session.commit()
    return jsonify({'message': 'The student has been deleted'})

@app2.route('/login')
def login():
    auth = request.authorization
    
    if not auth or not auth.username or not auth.password: #인증되지 않은 사용자일 때
        return make_response('Could not verify', 401, {'WWW-Authenticate' : 'Basic realm="Login required!"'})

    student = Student.query.filter_by(name=auth.username).first()
    
    if not student: #인증은 됬지만 사용자가 존재하지 않을 경우
        return make_response('Could not verify', 401, {'WWW-Authenticate' : 'Basic realm="Login required!"'})
    
    if check_password_hash(student.password, auth.password):
        token = jwt.encode({'public_id' : student.public_id, 'exp' : datetime.datetime.utcnow() + datetime.timedelta(seconds=100)}, app2.config['SECRET_KEY'], algorithm='HS256')
        #seconds ms 100ms -> 1초
        #minutes
        #return jsonify({'token': jwt.decode(token, app2.config['SECRET_KEY'], algorithms='HS256')}) #DB에 token column만들어서 추가하기 -> DB에 저장된 token조회해서 유효성 검사 효율적으로 해보자
        return jsonify({'token': token})

    return make_response('Could not verify', 401, {'WWW-Authenticate' : 'Basic realm="Login required!"'}) #인증됬고, 사용자가 존재하지만, password가 틀렷을 경우

if __name__ == '__main__':
    app2.run(debug=True)

# end add from ko
profile
부족한 부분을 인지하는 것부터가 배움의 시작이다.

0개의 댓글