| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- from gevent import pywsgi
- import subprocess
- import threading
- import time
- from flask import Flask, render_template, Response, jsonify, request, send_file
- from queue import Queue, Empty
- import os, sys
- import json
- import uuid
- import io
- import zipfile
- from datetime import datetime
app = Flask(__name__)

# Base directory for bundled resources. When ``sys.frozen`` is set (the
# original comment attributes this to a PyInstaller-built executable) the
# directory is taken from ``sys._MEIPASS``; otherwise it is the directory
# that contains this script.
BASE_DIR = (
    sys._MEIPASS
    if getattr(sys, 'frozen', False)
    else os.path.dirname(os.path.abspath(__file__))
)

# Configuration: upload storage and the location of the processing script.
UPLOAD_FOLDER = os.path.join(BASE_DIR, 'uploads')
SCRIPT_FOLDER = os.path.join(BASE_DIR, 'scripts')

# Archive extensions that are considered uploadable.
ALLOWED_EXTENSIONS = {'tar.gz', 'tgz'}

# Log directory for each software type offered by the log download endpoint.
LOG_FOLDERS = dict(
    network='/home/forlinx/Desktop/workspace/TPMFM-A/logs',
    compute='/home/forlinx/Desktop/workspace/dataParsing/logs',
    monitor='/home/forlinx/Desktop/workspace/monitor/storage/logs',
)

# Create the working directories up front so later code can assume they exist.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(SCRIPT_FOLDER, exist_ok=True)

# Module-wide state of the (single) processing task, shared by all handlers.
execution_data = dict(
    is_running=False,   # whether a task is currently marked as running
    output=[],          # collected output lines
    process=None,       # subprocess handle of the running script, if any
    current_file=None,  # name of the most recently uploaded file
)
def allowed_file(filename, extensions=('tar.gz', 'tgz')):
    """Return True when *filename* ends with one of the accepted extensions.

    The comparison is case-insensitive for every extension, so ``A.TAR.GZ``
    and ``A.TGZ`` are treated alike (previously only the single-part
    extension was matched case-insensitively).

    Args:
        filename: Name of the uploaded file. Empty or ``None`` is rejected.
        extensions: Iterable of accepted extensions, with or without a
            leading dot. Defaults to gzip-compressed tar archives.

    Returns:
        bool: Whether the file name carries an accepted extension.
    """
    if not filename:
        return False
    # Normalise to ".ext" form so multi-part extensions such as "tar.gz"
    # are matched as a whole suffix instead of by the last dot only.
    suffixes = tuple('.' + ext.lower().lstrip('.') for ext in extensions)
    return filename.lower().endswith(suffixes)
def run_fixed_shell_script(software_type, uploaded_file_path, output_queue):
    """Run the fixed ``run.sh`` script on an uploaded file and report its output.

    The script is started as ``bash run.sh <software_type> <uploaded_file_path>``
    with stderr merged into stdout. Every output line is HTML-escaped, suffixed
    with ``<br>`` and put on *output_queue*; a final coloured status line
    reports the exit code. Any exception is reported on the queue as well
    instead of being raised.

    Args:
        software_type: Passed to the script as its first argument.
        uploaded_file_path: Path of the uploaded file, second script argument.
        output_queue: Queue receiving HTML fragments (strings).

    Side effects:
        Stores the process handle in ``execution_data['process']`` while the
        script runs; on exit always resets ``execution_data['is_running']`` to
        False and ``execution_data['process']`` to None.
    """
    # Local import keeps this block self-contained; output ends up inside an
    # HTML page, so raw text must be escaped before markup is appended.
    from html import escape

    try:
        # Location of the fixed shell script.
        fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')

        # Create the script when it is missing.
        # NOTE(review): create_default_script is not defined in the part of
        # the file reviewed here - confirm it exists; otherwise the NameError
        # is reported through the generic handler below.
        if not os.path.exists(fixed_script_path):
            create_default_script(fixed_script_path)

        # Make sure the script is executable.
        os.chmod(fixed_script_path, 0o755)

        # The context manager closes the stdout pipe and reaps the process
        # even if reading fails part-way.
        with subprocess.Popen(
            ['bash', fixed_script_path, software_type, uploaded_file_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,  # line-buffered so output is forwarded as it appears
        ) as process:
            execution_data['process'] = process

            # readline() returns '' only at end of stream, which ends the loop.
            for line in iter(process.stdout.readline, ''):
                clean_line = line.rstrip('\n')
                output_queue.put(f"{escape(clean_line)}<br>")

            return_code = process.wait()

        if return_code == 0:
            output_queue.put(f"<span style='color: green'>文件处理完成!退出码: {return_code}</span><br>")
        else:
            output_queue.put(f"<span style='color: red'>文件处理失败!退出码: {return_code}</span><br>")

    except Exception as e:
        # Top-level boundary of the worker thread: report instead of crashing.
        output_queue.put(f"<span style='color: red'>执行错误: {escape(str(e))}</span><br>")
    finally:
        execution_data['is_running'] = False
        execution_data['process'] = None
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded archive and start processing it in the background.

    Expects a multipart form with a ``file`` part and a ``software_type``
    field. The file is stored in ``UPLOAD_FOLDER`` under its base name only
    (directory components sent by the client are discarded), then
    ``run_fixed_shell_script`` is started in a daemon thread while a second
    daemon thread moves its queued output into ``execution_data['output']``.

    Returns:
        A JSON response with ``success`` and ``message``; on success it also
        carries the stored ``filename``. Rejections are reported with
        ``success: False`` rather than an HTTP error status.

    Raises:
        Whatever ``file.save`` raises; the running flag is released first.
    """

    def reject(message):
        return jsonify({'success': False, 'message': message})

    busy_message = '当前有任务正在执行,请等待完成'

    # Fail fast before the request body is parsed.
    if execution_data['is_running']:
        return reject(busy_message)

    uploaded = request.files.get('file')
    if uploaded is None or not uploaded.filename:
        return reject('没有选择文件')

    # A missing software_type would only fail later when the script is
    # launched (None cannot be passed as a process argument).
    software_type = request.form.get('software_type')
    if software_type is None:
        return reject('缺少 software_type 参数')

    # The client controls the file name: keep only its last path component
    # (treating backslashes as separators too) so it cannot escape
    # UPLOAD_FOLDER via "../" or an absolute path.
    filename = os.path.basename(uploaded.filename.replace('\\', '/'))
    if not filename or not allowed_file(filename):
        return reject('不支持的文件类型')

    # Re-check right before claiming the slot: parsing the request may have
    # let another upload get here first.
    if execution_data['is_running']:
        return reject(busy_message)
    execution_data['is_running'] = True

    filepath = os.path.join(UPLOAD_FOLDER, filename)
    try:
        uploaded.save(filepath)
    except Exception:
        # Nothing was started; release the slot before propagating.
        execution_data['is_running'] = False
        raise

    # Reset the output of any previous task.
    execution_data['output'] = []
    execution_data['current_file'] = filename

    output_queue = Queue()

    # Run the fixed shell script in its own thread.
    worker = threading.Thread(
        target=run_fixed_shell_script,
        args=(software_type, filepath, output_queue),
        daemon=True,
    )
    worker.start()

    def collect_output():
        """Move queued lines into the shared output list until the task ends."""
        while execution_data['is_running'] or not output_queue.empty():
            try:
                # Wait at most one second so the loop condition is re-checked.
                line = output_queue.get(timeout=1)
                execution_data['output'].append(line)
            except Empty:
                continue
            except Exception as e:
                print(f"收集输出时出错: {e}")
                break

    collector = threading.Thread(target=collect_output, daemon=True)
    collector.start()

    return jsonify({
        'success': True,
        'message': '文件上传成功,开始处理',
        'filename': filename
    })
@app.route('/stream')
def stream():
    """Stream the task output to the client as Server-Sent Events.

    While ``execution_data['is_running']`` is true, newly collected output
    lines are sent as ``data: {"output": ...}`` events, polled every 0.1 s.
    When the task is no longer running, any remaining lines are sent,
    followed by a final completion marker.

    Returns:
        A ``text/event-stream`` response with caching and proxy buffering
        disabled.
    """
    # NOTE(review): the file serves the app through gevent's WSGIServer and
    # no monkey-patching is visible in the reviewed code, so a blocking
    # time.sleep() here would stall every other request while this stream is
    # idle. gevent's sleep yields to the event loop instead.
    from gevent import sleep as cooperative_sleep

    def as_event(text):
        return f"data: {json.dumps({'output': text})}\n\n"

    def generate():
        source = None  # the output list the counter below refers to
        sent = 0       # number of lines of `source` already sent

        def take_new():
            nonlocal source, sent
            lines = execution_data['output']
            # The output list is replaced when it is cleared or a new task
            # starts; restart from the beginning so new lines are not
            # skipped because of a stale position.
            if lines is not source or len(lines) < sent:
                source, sent = lines, 0
            fresh = lines[sent:]
            sent += len(fresh)
            return fresh

        while execution_data['is_running']:
            for text in take_new():
                yield as_event(text)
            cooperative_sleep(0.1)  # short pause to limit CPU usage

        # Give the collector thread a moment to move the last queued lines
        # into the output list before the final drain.
        cooperative_sleep(0.1)
        for text in take_new():
            yield as_event(text)

        # End-of-stream marker.
        yield as_event('<br><strong>处理完成</strong>')

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )
@app.route('/status')
def get_status():
    """Report the running flag, current file name and number of output lines."""
    running = execution_data['is_running']
    current = execution_data['current_file']
    line_count = len(execution_data['output'])
    return jsonify(
        is_running=running,
        current_file=current,
        output_length=line_count,
    )
@app.route('/stop')
def stop_execution():
    """Terminate the running script, if any.

    Returns:
        A JSON response with ``success`` and ``message``: success when the
        process was signalled, failure when signalling raised an error or no
        task is running.
    """
    # Read the handle once: the worker thread resets it to None when the
    # script exits, which could otherwise happen between the check and the
    # terminate() call.
    process = execution_data['process']

    if not execution_data['is_running'] or process is None:
        return jsonify({
            'success': False,
            'message': '没有正在执行的任务'
        })

    try:
        process.terminate()
    except OSError as e:
        return jsonify({
            'success': False,
            'message': f'停止失败: {str(e)}'
        })

    execution_data['is_running'] = False
    return jsonify({
        'success': True,
        'message': '已停止处理'
    })
@app.route('/clear_output', methods=['POST'])
def clear_output():
    """Replace the collected output with a fresh empty list and confirm."""
    execution_data.update(output=[])
    return jsonify(success=True, message='输出已清空')
@app.route('/download_logs', methods=['GET'])
def download_logs():
    """Send all ``.log`` files of one software type as a ZIP download.

    The ``software`` query parameter (default ``network``) selects the
    directory from ``LOG_FOLDERS``. Files are archived in memory under their
    path relative to that directory.

    Returns:
        The ZIP attachment, or a JSON error with status 400 (unknown type),
        404 (directory or log files missing) or 500 (archiving failed).
    """

    def failure(message, status):
        return jsonify({'success': False, 'message': message}), status

    software_type = request.args.get('software', 'network')
    if software_type not in LOG_FOLDERS:
        return failure('不支持的软件类型', 400)

    log_folder = LOG_FOLDERS[software_type]
    if not os.path.exists(log_folder):
        return failure(f'日志文件夹不存在: {log_folder}', 404)

    # Every *.log file anywhere below the log directory.
    log_files = [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(log_folder)
        for name in names
        if name.endswith('.log')
    ]
    if not log_files:
        return failure('未找到日志文件', 404)

    # The archive is built in memory rather than on disk.
    archive = io.BytesIO()

    try:
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
            for path in log_files:
                # Store entries relative to the log directory.
                bundle.write(path, arcname=os.path.relpath(path, log_folder))

        # Rewind so send_file reads the archive from the start.
        archive.seek(0)

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return send_file(
            archive,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f'{software_type}_logs_{stamp}.zip'
        )
    except Exception as e:
        return failure(f'创建ZIP文件失败: {str(e)}', 500)
if __name__ == '__main__':
    # Report the script and upload locations in use.
    fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')

    print(f"固定脚本位置: {fixed_script_path}")
    print(f"上传目录: {UPLOAD_FOLDER}")

    # Serve through gevent's WSGI server on all interfaces, port 5000
    # (Flask's built-in app.run() development server is not used here).
    listen_address = ('0.0.0.0', 5000)
    server = pywsgi.WSGIServer(listen_address, app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C: announce and shut the server down.
        print("OTA server stopped by user")
        server.stop()
|