python局域网U盘
2026-06-15 19:09:25
发布于:广东
AI生成
import os
import socket
from flask import Flask, request, render_template_string, send_from_directory, redirect, url_for, flash
import shutil
# ------------------------- 配置 -------------------------
UPLOAD_FOLDER = '局域网U盘的临时文件夹' # 共享文件夹
MAX_STORAGE = 1 * 1024 * 1024 * 1024 # 总容量限制 1GB
# --------------------------------------------------------
app = Flask(__name__)
app.secret_key = 'replace_with_random_string' # 用于 flash 消息
# 确保上传文件夹存在
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def get_dir_size(path):
"""递归计算文件夹总大小(字节)"""
total = 0
for dirpath, _, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
if not os.path.islink(fp):
try:
total += os.path.getsize(fp)
except OSError:
continue
return total
def get_local_ip():
"""获取本机局域网 IP"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('10.255.255.255', 1))
ip = s.getsockname()
s.close()
return ip
except Exception:
return '127.0.0.1'
# ------------------------- HTML 模板 -------------------------
HTML_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>局域网U盘 - 1GB共享空间</title>
<style>
body { font-family: 'Segoe UI', sans-serif; max-width: 700px; margin: 30px auto; padding: 20px; }
h1 { color: #333; }
.usage { font-size: 18px; margin: 10px 0; }
.progress { background: #eee; border-radius: 10px; height: 24px; width: 100%; }
.progress-bar { background: #4CAF50; height: 100%; border-radius: 10px; text-align: center; color: white; }
table { width: 100%; border-collapse: collapse; }
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #***; }
a { color: #2196F3; text-decoration: none; }
a:hover { text-decoration: underline; }
.upload-form { margin: 20px 0; }
.flash { color: red; }
.flash-success { color: green; }
</style>
</head>
<body>
<h1> 局域网共享U盘</h1>
<p>总容量限制:1 GB | 本机访问地址:<strong>http://{{ ip }}:5000</strong></p>
<div class="usage">
已用空间:{{ used_mb }} MB / {{ limit_mb }} MB
</div>
<div class="progress">
<div class="progress-bar" style="width: {{ percent }}%;">{{ percent|round(1) }}%</div>
</div>
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<p class="flash{% if category=='success' %}-success{% endif %}">{{ message }}</p>
{% endfor %}
{% endif %}
{% endwith %}
<div class="upload-form">
<form method="POST" enctype="multipart/form-data">
<input type="file" name="file" required>
<button type="submit">上传文件</button>
</form>
</div>
<h2>文件列表</h2>
{% if files %}
<table>
<tr><th>文件名</th><th>大小</th><th>操作</th></tr>
{% for file in files %}
<tr>
<td>{{ file.name }}</td>
<td>{{ file.size_mb }} MB</td>
<td><a href="{{ url_for('download_file', filename=file.name) }}" download>下载</td>
</tr>
{% endfor %}
</table>
{% else %}
<p>还没有文件,快来上传吧!</p>
{% endif %}
</body>
</html>
'''
# ------------------------- 路由 -------------------------
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
# 处理上传
if 'file' not in request.files:
flash('没有选择文件', 'error')
return redirect(url_for('index'))
file = request.files['file']
if file.filename == '':
flash('文件名为空', 'error')
return redirect(url_for('index'))
# 检查容量限制
current_size = get_dir_size(UPLOAD_FOLDER)
# 需要先知道文件大小,flask 的 file 对象可读取一部分来判断,但安全做法是保存到临时位置再测大小
# 这里我们简单保存到一个临时路径,获取大小后再决定
# 更好的办法:使用 file.seek(0, os.SEEK_END) + file.tell() 但 flask 的 FileStorage 不支持直接 seek
# 我们先把文件流读取到一个 BytesIO 中得到大小
file_bytes = file.read() # 一次性读入内存(文件过大需调整)
file_size = len(file_bytes)
if current_size + file_size > MAX_STORAGE:
flash(f'容量不足!当前已用 {current_size / 1024 / 1024:.1f} MB,超出 1 GB 限制', 'error')
return redirect(url_for('index'))
# 处理文件名冲突:如果已存在,自动加序号
original_name = file.filename
save_path = os.path.join(UPLOAD_FOLDER, original_name)
base, ext = os.path.splitext(original_name)
counter = 1
while os.path.exists(save_path):
new_name = f"{base}_{counter}{ext}"
save_path = os.path.join(UPLOAD_FOLDER, new_name)
counter += 1
# 写入文件
try:
with open(save_path, 'wb') as f:
f.write(file_bytes)
flash(f'文件 “{os.path.basename(save_path)}” 上传成功!', 'success')
except Exception as e:
flash(f'保存失败: {e}', 'error')
return redirect(url_for('index'))
# GET 请求:显示文件列表与用量
files = []
if os.path.exists(UPLOAD_FOLDER):
for fname in os.listdir(UPLOAD_FOLDER):
fpath = os.path.join(UPLOAD_FOLDER, fname)
if os.path.isfile(fpath):
size = os.path.getsize(fpath)
files.append({
'name': fname,
'size_mb': round(size / 1024 / 1024, 2)
})
# 按文件名排序
files.sort(key=lambda x: x['name'])
used = get_dir_size(UPLOAD_FOLDER)
percent = (used / MAX_STORAGE) * 100 if MAX_STORAGE > 0 else 0
return render_template_string(
HTML_TEMPLATE,
ip=get_local_ip(),
files=files,
used_mb=round(used / 1024 / 1024, 2),
limit_mb=round(MAX_STORAGE / 1024 / 1024, 2),
percent=percent
)
@app.route('/download/<filename>')
def download_file(filename):
try:
return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)
except Exception:
flash('文件不存在', 'error')
return redirect(url_for('index'))
# ------------------------- 启动 -------------------------
if os.path.exists(UPLOAD_FOLDER):
shutil.rmtree(UPLOAD_FOLDER)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
if __name__ == '__main__':
local_ip = get_local_ip()
print(f'✅ 局域网U盘已启动')
print(f' 访问地址: http://{local_ip}:5000')
print(f' (按 Ctrl+C 停止服务)')
# 允许所有主机访问(局域网内其他设备皆可连接)
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
这里空空如也





















有帮助,赞一个