discord_intro_quiz_bot/web/auth.py

95 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import logging
import secrets
from datetime import datetime, timedelta

import bcrypt
from flask import request, session, redirect, url_for, render_template, jsonify

from db import get_db
from models import Account, ActiveSession
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Session timeout setting.
# NOTE(review): the original comment said 3 minutes, but the value below is
# 5 minutes — confirm which one is intended.
SESSION_TIMEOUT = timedelta(minutes=5)
def login_required(f):
    """Decorator that only runs the wrapped view for a valid login session.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper around ``f``. On each call it runs
        ``check_login(session, logger)``; if that returns a falsy value the
        wrapper returns a redirect to ``login_route``, otherwise it returns
        the result of ``f(*args, **kwargs)``.
    """
    # functools.wraps copies __name__ (which the previous hand-written
    # assignment did) and additionally __doc__, __module__, __qualname__,
    # and records the original function as __wrapped__.
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        if not check_login(session, logger):
            return redirect(url_for('login_route'))
        return f(*args, **kwargs)

    return decorated_function
def check_login(session, logger):
    """Return whether ``session`` represents a live, logged-in session.

    The checks run in this order:

    1. ``session['logged_in']`` must be present and truthy.
    2. If ``session['last_access_time']`` is set and older than
       ``SESSION_TIMEOUT``, ``logout()`` is called and the check fails.
    3. ``session['session_id']`` must be present.
    4. Exactly one ``ActiveSession`` row must match that session id.

    On success the last-access timestamp is refreshed both in ``session`` and
    on the matching ``ActiveSession`` row.

    Args:
        session: Mapping-like session object (supports ``get`` and item
            assignment).
        logger: Logger used for informational messages.

    Returns:
        bool: True when every check passes, False otherwise.
    """
    if not session.get('logged_in'):
        logger.info("セッションにログイン情報がありません")
        return False

    # Session timeout check.
    last_access_time = session.get('last_access_time')
    if last_access_time:
        last_access_time = datetime.strptime(last_access_time, '%Y-%m-%d %H:%M:%S.%f')
        if datetime.now() - last_access_time > SESSION_TIMEOUT:
            logger.info("セッションがタイムアウトしました")
            # Clears the session and removes its ActiveSession row; the
            # redirect response that logout() returns is intentionally unused.
            logout()
            return False

    # A session flagged as logged in but carrying no session id cannot be
    # matched to an ActiveSession row; treat it as not logged in instead of
    # raising KeyError.
    session_id = session.get('session_id')
    if not session_id:
        logger.info("セッションにセッションIDがありません")
        return False

    db = get_db()
    active_sessions = db.query(ActiveSession).filter_by(session_id=session_id).all()
    active_session_count = len(active_sessions)
    if active_session_count != 1:
        logger.info(f"セッションID {session_id} のアクティブセッション数: {active_session_count}")
        return False

    # Refresh the last-access time. The ActiveSession row is updated as well
    # because login() compares its last_active against SESSION_TIMEOUT to
    # decide whether an existing session is stale; without this refresh an
    # actively used session would look expired once SESSION_TIMEOUT has
    # elapsed since login.
    now = datetime.now()
    active_sessions[0].last_active = now
    db.commit()
    session['last_access_time'] = now.strftime('%Y-%m-%d %H:%M:%S.%f')
    return True
def login():
    """Render the login form, or authenticate a submitted login.

    Non-POST requests get the ``login.html`` template. For POST requests the
    ``user_id`` and ``password`` form fields are read and the password is
    verified with bcrypt against the matching ``Account``.

    Returns:
        * the rendered ``login.html`` template for non-POST requests;
        * ``("Invalid credentials", 403)`` when no account matches or the
          password check fails;
        * a 403 "another user is already logged in" message when an
          ``ActiveSession`` for the account exists and is not older than
          ``SESSION_TIMEOUT`` (or has no ``last_active`` value);
        * a redirect to ``index_route`` once a new session has been stored in
          both the Flask session and the ``ActiveSession`` table.
    """
    if request.method != 'POST':
        return render_template('login.html')

    submitted_user_id = request.form['user_id']
    submitted_password = request.form['password'].encode('utf-8')

    db = get_db()
    account = db.query(Account).filter_by(user_id=submitted_user_id).first()
    credentials_ok = account and bcrypt.checkpw(
        submitted_password, account.password.encode('utf-8')
    )
    if not credentials_ok:
        return "Invalid credentials", 403

    # Look for an active session already registered for this account.
    lingering = db.query(ActiveSession).filter_by(user_id=account.id).first()
    if lingering:
        last_seen = lingering.last_active
        is_expired = last_seen and datetime.now() - last_seen > SESSION_TIMEOUT
        if not is_expired:
            return "他のユーザーが既にログインしています。", 403
        # The previous session has timed out, so remove it before continuing.
        db.delete(lingering)
        db.commit()

    # Create the new session: first in the Flask session, then in the table.
    session['logged_in'] = True
    session['session_id'] = secrets.token_hex(16)
    session['last_access_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

    db.add(
        ActiveSession(
            session_id=session['session_id'],
            user_id=account.id,
            last_active=datetime.now(),
        )
    )
    db.commit()
    return redirect(url_for('index_route'))
def logout():
    """Remove the current session's ActiveSession rows and clear the session.

    Returns:
        A redirect response to ``login_route``.
    """
    db = get_db()
    # Select the rows whose session_id matches the one stored in the session
    # (None when the session has no 'session_id' key) and delete them.
    matching_rows = db.query(ActiveSession).filter_by(
        session_id=session.get('session_id')
    )
    matching_rows.delete()
    db.commit()

    session.clear()
    login_url = url_for('login_route')
    return redirect(login_url)
def create_account():
    """Create an account with a random user id and a random password.

    The user id is 8 random bytes and the password 16 random bytes, both
    hex-encoded. Only the bcrypt hash of the password is stored on the new
    ``Account`` row.

    Returns:
        A JSON response containing a message, the generated ``user_id`` and
        the generated plain-text ``password``.
    """
    generated_user_id = secrets.token_hex(8)
    plain_password = secrets.token_hex(16)

    salted_hash = bcrypt.hashpw(plain_password.encode('utf-8'), bcrypt.gensalt())
    stored_password = salted_hash.decode('utf-8')

    db = get_db()
    db.add(Account(user_id=generated_user_id, password=stored_password))
    db.commit()

    payload = {
        'message': 'アカウントが作成されました',
        'user_id': generated_user_id,
        'password': plain_password,
    }
    return jsonify(payload)