"""OTA upload service.

A small Flask application, served by gevent's ``pywsgi``, that:

* accepts an uploaded ``.tar.gz`` / ``.tgz`` package,
* runs the fixed ``scripts/run.sh`` on it in a background thread,
* streams the script output to the browser over Server-Sent Events, and
* offers the ``*.log`` files of the known software components as a ZIP.
"""

import io
import json
import os
import subprocess
import sys
import threading
import time
import uuid
import zipfile
from datetime import datetime
from queue import Empty, Queue

import gevent
from flask import Flask, Response, jsonify, render_template, request, send_file
from gevent import pywsgi

app = Flask(__name__)

# Resolve the directory that holds the bundled resources.
if getattr(sys, 'frozen', False):
    # Running from a PyInstaller bundle: resources live in sys._MEIPASS.
    BASE_DIR = sys._MEIPASS
else:
    # Running as a plain Python script: use the script's own directory.
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Configuration
UPLOAD_FOLDER = os.path.join(BASE_DIR, 'uploads')
SCRIPT_FOLDER = os.path.join(BASE_DIR, 'scripts')
ALLOWED_EXTENSIONS = {'tar.gz', 'tgz'}
LOG_FOLDERS = {
    'network': '/home/forlinx/Desktop/workspace/TPMFM-A/logs',
    'compute': '/home/forlinx/Desktop/workspace/dataParsing/logs',
    'monitor': '/home/forlinx/monitor/storage/logs',
}

# Suffixes (with leading dot) accepted by allowed_file(); str.endswith takes a
# tuple, so this is derived once from ALLOWED_EXTENSIONS.
_ALLOWED_SUFFIXES = tuple(f'.{ext}' for ext in ALLOWED_EXTENSIONS)

# Markup appended to every output line sent to the browser.
# NOTE(review): in the reviewed copy of this file the markup inside these
# string literals was garbled into a raw newline. '<br>' is restored from the
# original comment ("add HTML line break") -- confirm against index.html.
LINE_BREAK = '<br>'

# Create the required directories.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(SCRIPT_FOLDER, exist_ok=True)

# Global execution state shared by the request handlers and worker threads.
execution_data = {
    'is_running': False,   # True while a script run is in progress
    'output': [],          # output lines collected so far
    'process': None,       # subprocess.Popen of the current run, if any
    'current_file': None,  # name of the file being processed
    'collector': None,     # thread moving lines from the queue to 'output'
}


def allowed_file(filename):
    """Return True if *filename* ends with an allowed archive suffix.

    The comparison is case-insensitive and matches the full suffix
    (``.tar.gz`` or ``.tgz``), not just the text after the last dot.
    """
    return filename.lower().endswith(_ALLOWED_SUFFIXES)


def run_fixed_shell_script(software_type, uploaded_file_path, output_queue):
    """Run the fixed ``run.sh`` on an uploaded file and queue its output.

    Args:
        software_type: First argument passed to the script.
        uploaded_file_path: Path of the uploaded archive (second argument).
        output_queue: Queue receiving one string per output line, each
            terminated by ``LINE_BREAK``; errors and the final status line
            are reported through the same queue.

    Always clears ``execution_data['is_running']`` and
    ``execution_data['process']`` when it returns.
    """
    try:
        fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')

        # The original code called an undefined create_default_script() here,
        # which only produced a confusing NameError message. Report the real
        # problem instead.
        if not os.path.exists(fixed_script_path):
            output_queue.put(
                f"执行错误: 固定脚本不存在: {fixed_script_path}{LINE_BREAK}"
            )
            return

        # Make sure the script is executable.
        os.chmod(fixed_script_path, 0o755)

        # The context manager closes stdout and waits for the child on exit.
        # errors='replace' keeps undecodable bytes from aborting the loop.
        with subprocess.Popen(
            ['bash', fixed_script_path, software_type, uploaded_file_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors='replace',
            bufsize=1,
        ) as process:
            execution_data['process'] = process

            # Forward output line by line as it is produced.
            for line in iter(process.stdout.readline, ''):
                clean_line = line.rstrip('\n')
                output_queue.put(f"{clean_line}{LINE_BREAK}")

        return_code = process.returncode
        if return_code == 0:
            output_queue.put(f"文件处理完成!退出码: {return_code}{LINE_BREAK}")
        else:
            output_queue.put(f"文件处理失败!退出码: {return_code}{LINE_BREAK}")

    except Exception as e:
        # Top-level boundary of the worker thread: surface the error to the
        # client instead of letting the thread die silently.
        output_queue.put(f"执行错误: {str(e)}{LINE_BREAK}")
    finally:
        execution_data['is_running'] = False
        execution_data['process'] = None


def _collect_output(output_queue, worker):
    """Move queued lines into ``execution_data['output']``.

    Runs until *worker* has finished and the queue is empty, so the final
    status line queued by the worker is never left behind.
    """
    while worker.is_alive() or not output_queue.empty():
        try:
            # Short timeout so the loop notices promptly when the worker ends.
            line = output_queue.get(timeout=0.2)
        except Empty:
            continue
        execution_data['output'].append(line)


def _collector_alive():
    """Return True while the output collector thread is still running."""
    collector = execution_data.get('collector')
    return collector is not None and collector.is_alive()


def _sse_event(text):
    """Format *text* as one Server-Sent Events ``data:`` message."""
    return f"data: {json.dumps({'output': text})}\n\n"


@app.route('/')
def index():
    """Render the main page."""
    return render_template('index.html')


@app.route('/upload', methods=['POST'])
def upload_file():
    """Save the uploaded archive and start processing it in the background.

    Expects a multipart form with a ``file`` part and a ``software_type``
    field. Returns a JSON object with ``success`` and ``message`` (plus
    ``filename`` on success).
    """
    if execution_data['is_running']:
        return jsonify({
            'success': False,
            'message': '当前有任务正在执行,请等待完成'
        })

    if 'file' not in request.files:
        return jsonify({'success': False, 'message': '没有选择文件'})

    file = request.files['file']
    software_type = request.form.get('software_type')

    # The client controls the file name: keep only its final path component
    # so it cannot escape UPLOAD_FOLDER (e.g. '../../etc/x.tgz').
    filename = os.path.basename((file.filename or '').replace('\\', '/'))
    if not filename:
        return jsonify({'success': False, 'message': '没有选择文件'})

    if not allowed_file(filename):
        return jsonify({'success': False, 'message': '不支持的文件类型'})

    # A missing value would otherwise reach Popen as None and fail late.
    if not software_type:
        return jsonify({'success': False, 'message': '未指定软件类型'})

    filepath = os.path.join(UPLOAD_FOLDER, filename)
    file.save(filepath)

    # Reset the shared state for the new run.
    execution_data['output'] = []
    execution_data['is_running'] = True
    execution_data['current_file'] = filename

    output_queue = Queue()

    # Worker thread: runs the fixed shell script.
    worker = threading.Thread(
        target=run_fixed_shell_script,
        args=(software_type, filepath, output_queue),
        daemon=True
    )
    # Collector thread: drains the queue into execution_data['output'].
    collector = threading.Thread(
        target=_collect_output,
        args=(output_queue, worker),
        daemon=True
    )
    execution_data['collector'] = collector

    worker.start()
    collector.start()

    return jsonify({
        'success': True,
        'message': '文件上传成功,开始处理',
        'filename': filename
    })


@app.route('/stream')
def stream():
    """Stream the collected output as Server-Sent Events."""
    def generate():
        last_index = 0
        while True:
            # Sample liveness BEFORE draining: if the producers were already
            # finished here, the drain below is guaranteed to see everything.
            active = execution_data['is_running'] or _collector_alive()

            current_output = execution_data['output']
            if len(current_output) < last_index:
                # The list was replaced (cleared or new run): start over.
                last_index = 0

            pending = current_output[last_index:]
            for chunk in pending:
                yield _sse_event(chunk)
            last_index += len(pending)

            if not active:
                break
            # Cooperative sleep: time.sleep would block the gevent hub and
            # stall every other request while a stream is open.
            gevent.sleep(0.1)

        # End-of-stream marker.
        yield _sse_event(f'{LINE_BREAK}处理完成')

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )


@app.route('/status')
def get_status():
    """Return the current execution state as JSON."""
    return jsonify({
        'is_running': execution_data['is_running'],
        'current_file': execution_data['current_file'],
        'output_length': len(execution_data['output'])
    })


# GET is kept for existing clients; POST is accepted as well because this
# endpoint changes state.
@app.route('/stop', methods=['GET', 'POST'])
def stop_execution():
    """Terminate the running script, if any."""
    # Read once: the worker thread may reset the entry to None concurrently.
    process = execution_data['process']
    if execution_data['is_running'] and process:
        try:
            process.terminate()
            execution_data['is_running'] = False
            return jsonify({
                'success': True,
                'message': '已停止处理'
            })
        except Exception as e:
            return jsonify({
                'success': False,
                'message': f'停止失败: {str(e)}'
            })

    return jsonify({
        'success': False,
        'message': '没有正在执行的任务'
    })


@app.route('/clear_output', methods=['POST'])
def clear_output():
    """Discard the collected output."""
    execution_data['output'] = []
    return jsonify({'success': True, 'message': '输出已清空'})


@app.route('/download_logs', methods=['GET'])
def download_logs():
    """Send the ``*.log`` files of one software component as a ZIP archive.

    The component is selected with the ``software`` query parameter
    (default ``network``) and must be a key of ``LOG_FOLDERS``.
    """
    software_type = request.args.get('software', 'network')

    if software_type not in LOG_FOLDERS:
        return jsonify({
            'success': False,
            'message': '不支持的软件类型'
        }), 400

    log_folder = LOG_FOLDERS[software_type]

    if not os.path.exists(log_folder):
        return jsonify({
            'success': False,
            'message': f'日志文件夹不存在: {log_folder}'
        }), 404

    # Collect every .log file below the folder, recursively.
    log_files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(log_folder)
        for name in names
        if name.endswith('.log')
    ]

    if not log_files:
        return jsonify({
            'success': False,
            'message': '未找到日志文件'
        }), 404

    # Build the archive in memory.
    temp_zip = io.BytesIO()

    try:
        with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for log_file in log_files:
                # Store paths relative to the log folder.
                rel_path = os.path.relpath(log_file, log_folder)
                zipf.write(log_file, arcname=rel_path)

        temp_zip.seek(0)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'{software_type}_logs_{timestamp}.zip'

        return send_file(
            temp_zip,
            mimetype='application/zip',
            as_attachment=True,
            download_name=filename
        )

    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'创建ZIP文件失败: {str(e)}'
        }), 500


if __name__ == '__main__':
    fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')

    print(f"固定脚本位置: {fixed_script_path}")
    print(f"上传目录: {UPLOAD_FOLDER}")

    server = pywsgi.WSGIServer(('0.0.0.0', 5000), app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("OTA server stopped by user")
        server.stop()