discord_intro_quiz_bot/web/auth.py

95 lines
3.9 KiB
Python
Raw Normal View History

2024-08-12 08:11:32 +00:00
from flask import request, session, redirect, url_for, render_template, jsonify
2024-08-03 10:29:46 +00:00
import bcrypt
import secrets
from db import get_db
from models import Account, ActiveSession
import logging
2024-08-12 08:11:32 +00:00
from datetime import datetime, timedelta
2024-08-03 10:29:46 +00:00
logger = logging.getLogger(__name__)
2024-08-12 08:11:32 +00:00
# セッションタイムアウトの設定3分
SESSION_TIMEOUT = timedelta(minutes=5)
2024-08-03 10:29:46 +00:00
def login_required(f):
def decorated_function(*args, **kwargs):
if not check_login(session, logger):
return redirect(url_for('login_route'))
return f(*args, **kwargs)
decorated_function.__name__ = f.__name__
return decorated_function
def check_login(session, logger):
if 'logged_in' not in session or not session['logged_in']:
logger.info("セッションにログイン情報がありません")
return False
2024-08-12 08:11:32 +00:00
# セッションタイムアウトのチェック
last_access_time = session.get('last_access_time')
if last_access_time:
last_access_time = datetime.strptime(last_access_time, '%Y-%m-%d %H:%M:%S.%f')
if datetime.now() - last_access_time > SESSION_TIMEOUT:
logger.info("セッションがタイムアウトしました")
logout() # セッションをクリアし、ログアウト
return False
2024-08-03 10:29:46 +00:00
db = get_db()
active_session_count = db.query(ActiveSession).filter_by(session_id=session['session_id']).count()
if active_session_count != 1:
logger.info(f"セッションID {session['session_id']} のアクティブセッション数: {active_session_count}")
return False
2024-08-12 08:11:32 +00:00
# 最後のアクセス時間を更新
session['last_access_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
2024-08-03 10:29:46 +00:00
return True
def login():
if request.method == 'POST':
user_id = request.form['user_id']
password = request.form['password'].encode('utf-8')
db = get_db()
account = db.query(Account).filter_by(user_id=user_id).first()
2024-08-12 08:11:32 +00:00
2024-08-03 10:29:46 +00:00
if account and bcrypt.checkpw(password, account.password.encode('utf-8')):
2024-08-12 08:11:32 +00:00
# 既存のアクティブセッションをチェック
existing_session = db.query(ActiveSession).filter_by(user_id=account.id).first()
if existing_session:
# セッションのタイムアウトを確認
last_active = existing_session.last_active
if last_active and datetime.now() - last_active > SESSION_TIMEOUT:
# セッションがタイムアウトしている場合、削除
db.delete(existing_session)
db.commit()
else:
return "他のユーザーが既にログインしています。", 403
# 新しいセッションの作成
2024-08-03 10:29:46 +00:00
session['logged_in'] = True
2024-08-12 08:11:32 +00:00
session['session_id'] = secrets.token_hex(16)
session['last_access_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
new_session = ActiveSession(session_id=session['session_id'], user_id=account.id, last_active=datetime.now())
2024-08-03 10:29:46 +00:00
db.add(new_session)
db.commit()
return redirect(url_for('index_route'))
else:
return "Invalid credentials", 403
return render_template('login.html')
def logout():
db = get_db()
2024-08-12 08:11:32 +00:00
db.query(ActiveSession).filter_by(session_id=session.get('session_id')).delete()
2024-08-03 10:29:46 +00:00
db.commit()
session.clear()
return redirect(url_for('login_route'))
def create_account():
user_id = secrets.token_hex(8)
password = secrets.token_hex(16)
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
db = get_db()
new_account = Account(user_id=user_id, password=hashed_password.decode('utf-8'))
db.add(new_account)
db.commit()
return jsonify({'message': 'アカウントが作成されました', 'user_id': user_id, 'password': password})