discord_intro_quiz_bot/web/app.py

353 lines
13 KiB
Python
Raw Normal View History

2024-08-03 10:29:46 +00:00
import os
import sys
import traceback
import secrets
import random
import requests
from flask import Flask, session, redirect, url_for, g, request, render_template, jsonify, flash, send_from_directory
2024-08-03 10:29:46 +00:00
import logging
from datetime import timedelta
from db import get_db, close_connection, initialize_db, reset_active_sessions
from auth import login_required, check_login, login, logout, create_account
from handlers import handle_file_upload, delete_file, play_file, join_channel, leave_channel, index, get_files, get_quiz_master, update_quiz_master, add_quiz_master, delete_quiz_master, get_unique_keywords
from init_account import create_initial_account
from models import ActiveSession, QuizMaster
from sqlalchemy.sql import func
import csv
# Flask application object and its static configuration.
app = Flask(__name__)
app.config.update(
    UPLOAD_FOLDER='bucket',
    # A fresh random key is generated on every process start, so signed
    # session cookies issued by a previous process are no longer valid.
    SECRET_KEY=secrets.token_hex(16),
    DATABASE='/web/data/database.db',
    BOT_API_URL='http://bot:80',
    SESSION_TIMEOUT_MINUTES=30,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# IDs of quizzes already returned by /random_quiz; module-level, so the set
# is shared by every request handled by this process.
drawn_quiz_ids = set()

# Create the upload directory on first start.
if not os.path.exists(app.config['UPLOAD_FOLDER']):
    os.makedirs(app.config['UPLOAD_FOLDER'])
    logger.info(f"{app.config['UPLOAD_FOLDER']}ディレクトリを作成しました")

# One-time start-up work, run inside an application context.
# NOTE(review): the original indentation was lost; all three calls are
# assumed to belong inside the `with` block - confirm.
with app.app_context():
    create_initial_account(app.config['DATABASE'])
    reset_active_sessions(logger)
    initialize_db(logger)
@app.teardown_appcontext
def close_db_connection(exception):
    """Forward app-context teardown to `close_connection` from the db module.

    Args:
        exception: The value Flask passes to teardown handlers; handed on
            unchanged.
    """
    close_connection(exception)
@app.before_request
def before_request():
    """Per-request session bookkeeping and login gate.

    Ensures the session carries a `session_id`, marks it permanent with the
    configured lifetime, and flags it modified on every request. For
    logged-in sessions the matching `ActiveSession` row (if any) gets its
    `last_active` column refreshed. Requests without a logged-in session are
    redirected to the login page unless they target the login or static
    endpoints.

    NOTE(review): the source indentation was lost; the redirect branch is
    assumed to pair with the `logged_in` check rather than with the
    `active_session` check - confirm against the original file.
    """
    needs_session_id = 'session_id' not in session
    if needs_session_id:
        session['session_id'] = secrets.token_hex(16)
        logger.info(f"新しいセッションIDが生成されました: {session['session_id']}")

    session.permanent = True
    lifetime_minutes = app.config['SESSION_TIMEOUT_MINUTES']
    app.permanent_session_lifetime = timedelta(minutes=lifetime_minutes)
    session.modified = True

    db = get_db()

    if not session.get('logged_in'):
        if request.endpoint in ('login_route', 'static'):
            return None
        return redirect(url_for('login_route'))

    tracked = db.query(ActiveSession).filter_by(session_id=session['session_id']).first()
    if tracked:
        tracked.last_active = func.now()
        db.commit()
    return None
@app.route('/login', methods=['GET', 'POST'])
def login_route():
    """Serve GET/POST /login by delegating to `login` from the auth module."""
    result = login()
    return result
@app.route('/logout', methods=['POST'])
def logout_route():
    """Serve POST /logout by delegating to `logout` from the auth module."""
    result = logout()
    return result
@app.route('/create_account', methods=['POST'])
def create_account_route():
    """Serve POST /create_account by delegating to `create_account` (auth module)."""
    result = create_account()
    return result
@app.route('/')
@login_required
def index_route():
    """Serve the root URL by delegating to the `index` handler."""
    result = index()
    return result
@app.route('/upload', methods=['POST'])
@login_required
def upload_file_route():
    """Serve POST /upload by delegating to the `handle_file_upload` handler."""
    result = handle_file_upload()
    return result
@app.route('/uploaded_files', methods=['GET'])
@login_required
def uploaded_files_route():
    """Render the `files.html` template."""
    page = render_template('files.html')
    return page
@app.route('/uploaded_files_list', methods=['GET'])
@login_required
def uploaded_files_list_route():
    """Return the entry names of the upload folder as a JSON array."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return jsonify(os.listdir(upload_dir))
@app.route('/used_files_list', methods=['GET'])
@login_required
def used_files_list_route():
    """Return every audio name referenced by a QuizMaster row as a flat JSON list.

    For each row the full-audio name is followed by the intro-audio name;
    duplicates are not removed.
    """
    db = get_db()
    name_pairs = db.query(QuizMaster.full_audio_name, QuizMaster.intro_audio_name).all()
    flat_names = []
    for pair in name_pairs:
        # Each result row holds (full_audio_name, intro_audio_name).
        flat_names.extend(pair)
    return jsonify(flat_names)
@app.route('/delete/<filename>', methods=['POST'])
@login_required
def delete_file_route(filename):
    """Serve POST /delete/<filename> by delegating to the `delete_file` handler.

    Args:
        filename: The `<filename>` URL segment, passed through unchanged.
    """
    result = delete_file(filename)
    return result
@app.route('/play/<filename>', methods=['POST'])
@login_required
def play_file_route(filename):
    """Serve POST /play/<filename> by delegating to the `play_file` handler.

    The optional JSON body may carry a `volume` value; it defaults to 100
    when the key, or the whole body, is absent.

    Args:
        filename: The `<filename>` URL segment, passed through unchanged.
    """
    # `request.json` fails when the body is missing, malformed or not a JSON
    # object; parse leniently so a body-less request still uses the default.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    volume = payload.get('volume', 100)
    return play_file(filename, volume)
@app.route('/stop_audio', methods=['POST'])
@login_required
def stop_audio_route():
    """Forward a stop-audio request to the bot API and relay its JSON reply.

    Returns:
        The bot's JSON payload on success, or a JSON error with status 502
        when the bot cannot be reached in time or replies with a non-JSON
        body.
    """
    try:
        # Without a timeout an unresponsive bot would block this worker
        # indefinitely. BOT_API_TIMEOUT is optional; 10 seconds by default.
        response = requests.post(
            f"{app.config['BOT_API_URL']}/stop_audio",
            timeout=app.config.get('BOT_API_TIMEOUT', 10),
        )
        return jsonify(response.json())
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on older `requests` versions.
        logger.error(f"stop_audio request to bot failed: {exc}")
        return jsonify({'error': 'Botへのリクエストに失敗しました'}), 502
@app.route('/join', methods=['POST'])
@login_required
def join_channel_route():
    """Serve POST /join by delegating to the `join_channel` handler."""
    result = join_channel()
    return result
@app.route('/leave', methods=['POST'])
@login_required
def leave_channel_route():
    """Serve POST /leave by delegating to the `leave_channel` handler."""
    result = leave_channel()
    return result
@app.route('/files', methods=['GET'])
@login_required
def get_files_route():
    """Serve GET /files by delegating to the `get_files` handler."""
    result = get_files()
    return result
@app.route('/quiz_master_page', methods=['GET'])
@login_required
def quiz_master_page_route():
    """Render the `quiz_master.html` template."""
    page = render_template('quiz_master.html')
    return page
@app.route('/quiz_master', methods=['GET'])
@login_required
def get_quiz_master_route():
    """Serve GET /quiz_master by delegating to the `get_quiz_master` handler."""
    result = get_quiz_master()
    return result
@app.route('/unique_keywords', methods=['GET'])
@login_required
def unique_keywords_route():
    """Serve GET /unique_keywords by delegating to `get_unique_keywords`."""
    result = get_unique_keywords()
    return result
@app.route('/update_quiz_master', methods=['POST'])
@login_required
def update_quiz_master_route():
    """Serve POST /update_quiz_master by delegating to `update_quiz_master`."""
    result = update_quiz_master()
    return result
@app.route('/add_quiz_master', methods=['POST'])
@login_required
def add_quiz_master_route():
    """Serve POST /add_quiz_master by delegating to `add_quiz_master`."""
    result = add_quiz_master()
    return result
@app.route('/delete_quiz_master', methods=['POST'])
@login_required
def delete_quiz_master_route():
    """Serve POST /delete_quiz_master by delegating to `delete_quiz_master`."""
    result = delete_quiz_master()
    return result
@app.route('/random_quiz', methods=['POST'])
@login_required
def random_quiz_route():
    """Pick a random, not-yet-drawn quiz whose keyword columns match the request.

    Expects a JSON object with a `keyword` entry. A quiz matches when any of
    `keyword1`, `keyword2` or `keyword3` equals that value. The chosen quiz's
    ID is added to the module-level `drawn_quiz_ids` set so it is not drawn
    again until that set is reset.

    Returns:
        JSON with the quiz fields on success; 400 when the body is not a
        JSON object; 404 when nothing matches or every match was already
        drawn.
    """
    # `request.json` can be None or a non-dict, which used to surface as an
    # AttributeError (HTTP 500) on `.get`; reject such bodies explicitly.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSONリクエストボディが必要です'}), 400
    keyword = data.get('keyword')

    db = get_db()
    quiz_master_data = db.query(QuizMaster).filter(
        (QuizMaster.keyword1 == keyword) |
        (QuizMaster.keyword2 == keyword) |
        (QuizMaster.keyword3 == keyword)
    ).all()
    if not quiz_master_data:
        return jsonify({'error': '指定されたキーワードでクイズマスターが見つかりません'}), 404

    # Exclude quizzes that have already been handed out.
    available_quizzes = [quiz for quiz in quiz_master_data if quiz.id not in drawn_quiz_ids]
    if not available_quizzes:
        return jsonify({'error': '指定されたキーワードで利用可能なクイズが見つかりません'}), 404

    selected_quiz = random.choice(available_quizzes)
    drawn_quiz_ids.add(selected_quiz.id)
    return jsonify({
        'id': selected_quiz.id,
        'title': selected_quiz.title,
        'full_audio_name': selected_quiz.full_audio_name,
        'intro_audio_name': selected_quiz.intro_audio_name,
        'keyword1': selected_quiz.keyword1,
        'keyword2': selected_quiz.keyword2,
        'keyword3': selected_quiz.keyword3,
    })
@app.route('/reset_quiz', methods=['POST'])
@login_required
def reset_quiz_route():
    """Forget which quizzes were already drawn and confirm with a JSON message."""
    global drawn_quiz_ids
    # Rebind the module-level name to a brand-new empty set.
    fresh_ids = set()
    drawn_quiz_ids = fresh_ids
    confirmation = {'message': 'クイズがリセットされました'}
    return jsonify(confirmation)
@app.route('/start_quiz', methods=['POST'])
@login_required
def start_quiz_route():
    """Ask the bot API to start a quiz.

    Expects a JSON object with `quiz_id` and `intro_audio_name`, plus an
    optional `volume` (default 100), and forwards them to the bot's
    `/start_quiz` endpoint.

    Returns:
        A JSON success message when the bot answers 200; 400 when a required
        field is missing; 502 when the bot cannot be reached in time;
        otherwise a JSON error carrying the bot's status code.
    """
    # A missing or non-object body is treated as empty so that it produces
    # the regular 400 below instead of an AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    quiz_id = data.get('quiz_id')
    intro_audio_name = data.get('intro_audio_name')
    volume = data.get('volume', 100)
    if not quiz_id or not intro_audio_name:
        return jsonify({'error': 'クイズIDとイントロ音源名が必要です'}), 400

    try:
        # Without a timeout an unresponsive bot would block this worker
        # indefinitely. BOT_API_TIMEOUT is optional; 10 seconds by default.
        response = requests.post(
            f"{app.config['BOT_API_URL']}/start_quiz",
            json={'quiz_id': quiz_id, 'intro_audio_name': intro_audio_name, 'volume': volume},
            timeout=app.config.get('BOT_API_TIMEOUT', 10),
        )
    except requests.RequestException as exc:
        logger.error(f"start_quiz request to bot failed: {exc}")
        return jsonify({'error': 'クイズの開始に失敗しました'}), 502

    if response.status_code == 200:
        return jsonify({'message': 'クイズを開始しました'})
    return jsonify({'error': 'クイズの開始に失敗しました'}), response.status_code
@app.route('/search_quiz', methods=['GET'])
@login_required
def search_quiz_route():
    """Return the quizzes whose title contains the `title` query parameter.

    An absent or empty `title` matches every row with a non-NULL title.

    Returns:
        A JSON array of quiz dictionaries.
    """
    title = request.args.get('title', '')
    db = get_db()
    # `autoescape=True` makes `%` and `_` in the user's text match literally
    # instead of acting as LIKE wildcards.
    results = db.query(QuizMaster).filter(
        QuizMaster.title.contains(title, autoescape=True)
    ).all()
    quizzes = [
        {
            'id': quiz.id,
            'title': quiz.title,
            'full_audio_name': quiz.full_audio_name,
            'intro_audio_name': quiz.intro_audio_name,
            'keyword1': quiz.keyword1,
            'keyword2': quiz.keyword2,
            'keyword3': quiz.keyword3,
        }
        for quiz in results
    ]
    return jsonify(quizzes)
@app.route('/select_quiz', methods=['POST'])
@login_required
def select_quiz_route():
    """Look up a single quiz by the `quiz_id` given in the JSON body.

    Returns:
        JSON with the quiz fields, or a 404 JSON error when no quiz has that
        ID (including when the body carries no `quiz_id`).
    """
    # A missing or non-object body is treated as empty so the lookup falls
    # through to the regular 404 instead of raising on `.get`.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    quiz_id = data.get('quiz_id')

    db = get_db()
    quiz = db.query(QuizMaster).filter_by(id=quiz_id).first()
    if not quiz:
        return jsonify({'error': '指定されたIDのクイズが見つかりません'}), 404

    selected_quiz = {
        'id': quiz.id,
        'title': quiz.title,
        'full_audio_name': quiz.full_audio_name,
        'intro_audio_name': quiz.intro_audio_name,
        'keyword1': quiz.keyword1,
        'keyword2': quiz.keyword2,
        'keyword3': quiz.keyword3,
    }
    return jsonify(selected_quiz)
@app.route('/upload_quiz_master', methods=['POST'])
@login_required
def upload_quiz_master_route():
    """Store an uploaded quiz-master CSV in the upload folder.

    The saved path is remembered in the module-level `uploaded_file_path`
    so that the import route can process it afterwards. The outcome is
    reported via `flash`, and the client is always redirected to the
    quiz-master page.
    """
    global uploaded_file_path
    # This route only accepts POST, so redirecting to `request.url` (a GET
    # to the same URL) would end in 405; go to the page that shows the
    # flashed message instead.
    page_url = url_for('quiz_master_page_route')
    try:
        if 'file' not in request.files:
            logger.debug('No file part in the request')
            flash('No file part')
            return redirect(page_url)

        upload = request.files['file']
        # Keep only the final path component of the client-supplied name so
        # that values such as "../../x.csv" cannot escape the upload folder.
        safe_name = os.path.basename((upload.filename or '').replace('\\', '/'))
        if safe_name == '':
            logger.debug('No selected file')
            flash('No selected file')
            return redirect(page_url)

        # Accept the extension regardless of letter case.
        if safe_name.lower().endswith('.csv'):
            uploaded_file_path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
            upload.save(uploaded_file_path)
            logger.debug(f'File {safe_name} uploaded successfully to {uploaded_file_path}')
            flash('File uploaded successfully')
            return redirect(page_url)

        logger.debug('Invalid file format')
        flash('Invalid file format')
        return redirect(page_url)
    except Exception as e:
        logger.error(f'Error during file upload: {str(e)}')
        flash('Error during file upload')
        return redirect(page_url)
@app.route('/import_quiz_master', methods=['POST'])
@login_required
def import_quiz_master_route():
    """Import the most recently uploaded quiz-master CSV into the database.

    Reads the path remembered in the module-level `uploaded_file_path`,
    passes it to `process_csv`, and clears the remembered path after a
    successful import. The outcome is reported via `flash`, and the client
    is always redirected to the quiz-master page.
    """
    global uploaded_file_path
    try:
        # The global is only created by a successful upload; look it up
        # defensively so a premature import reports "no file" rather than
        # raising NameError.
        pending_path = globals().get('uploaded_file_path')
        if pending_path and os.path.exists(pending_path):
            logger.debug(f'Starting import of file {pending_path}')
            outcome = process_csv(pending_path)
            # process_csv handles its own exceptions; an explicit False is
            # treated as failure. Any other return value, including None,
            # counts as success.
            if outcome is False:
                logger.error(f'Import of file {pending_path} failed')
                flash('Error during file import')
                return redirect(url_for('quiz_master_page_route'))
            logger.debug('File imported successfully')
            flash('File imported successfully')
            uploaded_file_path = None  # Reset after processing
            return redirect(url_for('quiz_master_page_route'))

        logger.debug('No file uploaded to import')
        flash('No file uploaded to import')
        return redirect(url_for('quiz_master_page_route'))
    except Exception as e:
        logger.error(f'Error during file import: {str(e)}')
        flash('Error during file import')
        return redirect(url_for('quiz_master_page_route'))
def process_csv(file_path):
    """Load quiz-master rows from a CSV file into the database.

    The file is read with `csv.DictReader`. A row is skipped when its
    `BOT格納状況` column equals `格納済`, when the title or either audio name
    is empty, or when all three keyword columns are empty. Every remaining
    row becomes a `QuizMaster` record with `.mp3` appended to both audio
    names. All rows are committed together after the loop.

    Errors are logged rather than raised; on failure the pending rows are
    rolled back so they cannot leak into a later commit.

    Args:
        file_path: Path of the CSV file to import.

    Returns:
        True when the file was processed and committed, False when an error
        occurred.
    """
    db = None
    try:
        # 'utf-8-sig' reads plain UTF-8 and also strips a leading BOM, which
        # would otherwise become part of the first header name.
        with open(file_path, newline='', encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            db = get_db()
            for row in reader:
                # Skip rows already marked as stored in the bot.
                if row['BOT格納状況'] == '格納済':
                    logger.debug(f'Skipping row due to BOT格納状況 being 格納済: {row}')
                    continue
                # Title and both audio names are required.
                if not (row['曲名'] and row['フル音源名'] and row['イントロ音源名']):
                    logger.debug(f'Skipping row due to missing information: {row}')
                    continue
                # At least one of the three keyword columns must be filled in.
                if not (row['キーワード1(歌唱)'] or row['キーワード2(種別)'] or row['キーワード3']):
                    logger.debug(f'Skipping row due to missing keywords: {row}')
                    continue
                # Queue the record for insertion.
                quiz_master = QuizMaster(
                    title=row['曲名'],
                    full_audio_name=f"{row['フル音源名']}.mp3",
                    intro_audio_name=f"{row['イントロ音源名']}.mp3",
                    keyword1=row['キーワード1(歌唱)'],
                    keyword2=row['キーワード2(種別)'],
                    keyword3=row['キーワード3'],
                )
                db.add(quiz_master)
                logger.debug(f'Added row to database: {row}')
            # NOTE(review): the original indentation was lost; a single
            # commit after the loop is assumed - confirm.
            db.commit()
        logger.debug('CSV file processed and committed to database')
        return True
    except Exception as e:
        logger.error(f'Error processing CSV file: {str(e)}')
        if db is not None:
            try:
                # Discard rows added before the failure.
                db.rollback()
            except Exception as rollback_error:
                logger.error(f'Rollback after CSV error failed: {rollback_error}')
        return False
# Endpoint used to preview an uploaded audio file.
@app.route('/preview_audio/<filename>', methods=['GET'])
@login_required
def preview_audio_route(filename):
    """Serve a file from the upload folder so the client can preview it.

    Args:
        filename: The `<filename>` URL segment naming the file to send.

    Returns:
        The file response, or a 500 JSON error when sending fails for a
        reason other than the file being missing.
    """
    try:
        return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
    except Exception as exc:
        # A missing file surfaces as an exception carrying HTTP code 404;
        # let it propagate so the client gets 404 rather than a misleading
        # 500.
        if getattr(exc, 'code', None) == 404:
            raise
        logger.error(f'Error while previewing {filename}: {exc}')
        return jsonify({'error': 'ファイルのプレビュー中にエラーが発生しました'}), 500
2024-08-03 10:29:46 +00:00
if __name__ == "__main__":
    # The interactive debugger lets anyone who can reach the port run
    # arbitrary code. Because the server binds to every interface, debug
    # mode is now opt-in via the FLASK_DEBUG environment variable instead of
    # always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    try:
        app.run(debug=debug_enabled, host='0.0.0.0', port=80)
    except ImportError as e:
        logger.error(f"ImportErrorが発生しました: {e}")
        traceback.print_exc(file=sys.stderr)
    except Exception as e:
        logger.error(f"例外が発生しました: {e}")
        traceback.print_exc(file=sys.stderr)