from gevent import pywsgi
import subprocess
import threading
import time
from flask import Flask, render_template, Response, jsonify, request, send_file
from queue import Queue, Empty
import os, sys
import json
import uuid
import io
import zipfile
from datetime import datetime
app = Flask(__name__)
# 获取当前脚本所在目录(即打包后的exe所在目录)
if getattr(sys, 'frozen', False):
# 如果是PyInstaller打包后的exe
BASE_DIR = sys._MEIPASS
else:
# 普通Python脚本
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 配置
UPLOAD_FOLDER = os.path.join(BASE_DIR, 'uploads')
SCRIPT_FOLDER = os.path.join(BASE_DIR, 'scripts')
# UPLOAD_FOLDER = 'uploads'
# SCRIPT_FOLDER = 'scripts'
ALLOWED_EXTENSIONS = {'tar.gz', 'tgz'}
LOG_FOLDERS = {
'network': '/home/forlinx/Desktop/workspace/TPMFM-A/logs',
'compute': '/home/forlinx/Desktop/workspace/dataParsing/logs',
'monitor': '/home/forlinx/Desktop/workspace/monitor/storage/logs'
}
# 创建必要的目录
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(SCRIPT_FOLDER, exist_ok=True)
# 存储执行状态和输出的全局变量
execution_data = {
'is_running': False,
'output': [],
'process': None,
'current_file': None
}
def allowed_file(filename):
"""检查文件扩展名是否允许"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS or \
filename.endswith('.tar.gz') or filename.endswith('.tgz')
def run_fixed_shell_script(software_type, uploaded_file_path, output_queue):
"""运行固定的shell脚本处理上传的文件"""
try:
# 固定的shell脚本路径
fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')
# 检查固定脚本是否存在,如果不存在则创建
if not os.path.exists(fixed_script_path):
create_default_script(fixed_script_path)
# 确保脚本有执行权限
os.chmod(fixed_script_path, 0o755)
# 使用Popen执行固定脚本,传入上传的文件作为参数
process = subprocess.Popen(
['bash', fixed_script_path, software_type, uploaded_file_path],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)
execution_data['process'] = process
# 实时读取输出
for line in iter(process.stdout.readline, ''):
if line:
# 清理行尾的换行符,添加HTML换行
clean_line = line.rstrip('\n')
output_queue.put(f"{clean_line}
")
process.wait()
return_code = process.returncode
if return_code == 0:
output_queue.put(f"文件处理完成!退出码: {return_code}
")
else:
output_queue.put(f"文件处理失败!退出码: {return_code}
")
except Exception as e:
output_queue.put(f"执行错误: {str(e)}
")
finally:
execution_data['is_running'] = False
execution_data['process'] = None
@app.route('/')
def index():
"""主页面"""
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
"""上传文件并自动执行处理"""
if execution_data['is_running']:
return jsonify({
'success': False,
'message': '当前有任务正在执行,请等待完成'
})
# 检查是否有文件
if 'file' not in request.files:
return jsonify({'success': False, 'message': '没有选择文件'})
file = request.files['file']
# 获取 software_type
software_type = request.form.get('software_type')
if file.filename == '':
return jsonify({'success': False, 'message': '没有选择文件'})
if file and allowed_file(file.filename):
# 生成唯一文件名
# filename = f"{uuid.uuid4().hex}_{file.filename}"
filename = file.filename
filepath = os.path.join(UPLOAD_FOLDER, filename)
# 保存文件
file.save(filepath)
# 重置输出
execution_data['output'] = []
execution_data['is_running'] = True
execution_data['current_file'] = filename
# 创建输出队列
output_queue = Queue()
# 在新线程中执行固定的shell脚本
thread = threading.Thread(
target=run_fixed_shell_script,
args=(software_type, filepath, output_queue),
daemon=True
)
thread.start()
# 启动一个线程来定期从队列中获取输出
def collect_output():
while execution_data['is_running'] or not output_queue.empty():
try:
# 从队列获取输出,最多等待1秒
line = output_queue.get(timeout=1)
execution_data['output'].append(line)
except Empty:
continue
except Exception as e:
print(f"收集输出时出错: {e}")
break
collection_thread = threading.Thread(target=collect_output, daemon=True)
collection_thread.start()
return jsonify({
'success': True,
'message': '文件上传成功,开始处理',
'filename': filename
})
return jsonify({'success': False, 'message': '不支持的文件类型'})
@app.route('/stream')
def stream():
"""SSE流式输出"""
def generate():
last_index = 0
while execution_data['is_running']:
# 检查是否有新输出
current_output = execution_data['output']
if len(current_output) > last_index:
# 发送新内容
for i in range(last_index, len(current_output)):
yield f"data: {json.dumps({'output': current_output[i]})}\n\n"
last_index = len(current_output)
time.sleep(0.1) # 短暂休眠减少CPU使用
# 发送最后可能剩余的输出
current_output = execution_data['output']
if len(current_output) > last_index:
for i in range(last_index, len(current_output)):
yield f"data: {json.dumps({'output': current_output[i]})}\n\n"
# 发送结束标志
yield f"data: {json.dumps({'output': '
处理完成'})}\n\n"
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)
@app.route('/status')
def get_status():
"""获取执行状态"""
return jsonify({
'is_running': execution_data['is_running'],
'current_file': execution_data['current_file'],
'output_length': len(execution_data['output'])
})
@app.route('/stop')
def stop_execution():
"""停止当前执行"""
if execution_data['is_running'] and execution_data['process']:
try:
execution_data['process'].terminate()
execution_data['is_running'] = False
return jsonify({
'success': True,
'message': '已停止处理'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'停止失败: {str(e)}'
})
return jsonify({
'success': False,
'message': '没有正在执行的任务'
})
@app.route('/clear_output', methods=['POST'])
def clear_output():
"""清空输出"""
execution_data['output'] = []
return jsonify({'success': True, 'message': '输出已清空'})
@app.route('/download_logs', methods=['GET'])
def download_logs():
"""下载指定软件的日志文件"""
software_type = request.args.get('software', 'network')
if software_type not in LOG_FOLDERS:
return jsonify({
'success': False,
'message': '不支持的软件类型'
}), 400
log_folder = LOG_FOLDERS[software_type]
# 检查日志文件夹是否存在
if not os.path.exists(log_folder):
return jsonify({
'success': False,
'message': f'日志文件夹不存在: {log_folder}'
}), 404
# 获取所有.log文件
log_files = []
for root, dirs, files in os.walk(log_folder):
for file in files:
if file.endswith('.log'):
log_files.append(os.path.join(root, file))
if not log_files:
return jsonify({
'success': False,
'message': '未找到日志文件'
}), 404
# 创建临时ZIP文件
temp_zip = io.BytesIO()
try:
with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
for log_file in log_files:
# 获取相对路径
rel_path = os.path.relpath(log_file, log_folder)
zipf.write(log_file, arcname=rel_path)
temp_zip.seek(0)
# 设置下载文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'{software_type}_logs_{timestamp}.zip'
# 记录下载日志
# download_log_path = os.path.join(log_folder, f'{software_type}_download.log')
# with open(download_log_path, 'a') as f:
# f.write(f'[{datetime.now()}] Logs downloaded: {filename}\n')
# 发送ZIP文件
return send_file(
temp_zip,
mimetype='application/zip',
as_attachment=True,
download_name=filename
)
except Exception as e:
return jsonify({
'success': False,
'message': f'创建ZIP文件失败: {str(e)}'
}), 500
if __name__ == '__main__':
# 确保固定脚本存在
fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')
print(f"固定脚本位置: {fixed_script_path}")
print(f"上传目录: {UPLOAD_FOLDER}")
# app.run(
# debug=True,
# host='0.0.0.0',
# port=5000,
# threaded=True
# )
server = pywsgi.WSGIServer(('0.0.0.0', 5000), app)
try:
server.serve_forever()
except KeyboardInterrupt:
print("OTA server stopped by user")
server.stop()