Face/app.py

450 lines
17 KiB
Python
Raw Permalink Normal View History

2024-07-29 11:24:25 +08:00
import time
import faiss
from flask import Flask, render_template, request, jsonify, send_from_directory
from markupsafe import escape, escape_silent
from werkzeug.utils import secure_filename
from anti import anti_spoofing, load_anti_model
from face_api import load_arcface_model, load_npy, findOne, load_image, face_verification, findAll, add_one_to_database, \
get_claster_tmp_file_embedding, cluster, detect_video
from gender_age import set_gender_conf, gender_age, load_gender_model
from retinaface_detect import load_retinaface_model, detect_one, set_retinaface_conf
from werkzeug.exceptions import RequestEntityTooLarge
import zipfile
import os
import shutil
import re
import numpy as np
import torch
ALLOWED_IMG = set(['png', 'jpg', 'jpeg', 'bmp', 'PNG', 'JPG', 'JPEG'])
# 限制上传的图片最大为10M
ALLOWED_IMG_SIZE = 10 * 1024 * 1024
ALLOWED_FILE = set(['zip'])
ALLOWED_VIDEO = set(['mp4'])
app = Flask(__name__)
# 限制上传的文件最大为100M
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
# 使用jsonify避免中文乱码
app.config['JSON_AS_ASCII'] = False
# 设置使用CPU或者GPU传入cuda
cpu_or_cuda = "cuda" if torch.cuda.is_available() else "cpu"
# 加载人脸识别模型
arcface_model = load_arcface_model("./model/backbone100.pth", cpu_or_cuda=cpu_or_cuda)
# 加载人脸检测模型
retinaface_args = set_retinaface_conf(cpu_or_cuda=cpu_or_cuda)
retinaface_model = load_retinaface_model(retinaface_args)
# 加载性别年龄识别模型
gender_args = set_gender_conf()
gender_model = load_gender_model(gender_args, 'fc1')
anti_spoofing_model_path = "model/anti_spoof_models"
anti_model = load_anti_model(anti_spoofing_model_path, cpu_or_cuda)
# 读取人脸库
@app.route('/')
def index():
return "model"
@app.route('/hello')
@app.route('/hello/<name>')
def hello(name=None):
return render_template('hello.html', name=name)
@app.route('/user', methods=['GET'])
def show_user_name():
return request.args.get('username', '')
# 创建返回的json数据
# 函数参数用是否=None判断函数中定义的data,result用truefalse判断
def create_response(status, name=None, distance=None, verification=None, gender=None, age=None, num=None, anti=None,
score=None, box_and_point=None, addfile_names=None,fail_names=None,database_name=None,msg=None,
delete_names=None,not_exist_names=None):
# res为总的json结构体
res = {}
res['status'] = status
data = {}
try:
data["box_and_point"] = box_and_point.tolist()
except AttributeError:
pass
if anti != None and score != None:
liveness = {}
liveness["spoofing"] = anti
liveness['score'] = score
data['liveness'] = liveness
if distance!=None:
data['distance'] = float(distance)
if verification!=None:
data['verification'] = verification
if num!=None:
data['number'] = num
if gender!=None:
data['gender'] = gender
if age!=None:
data['age'] = age
if name!=None:
data['name'] = name
if data:
res['data'] = data
# 数据库增删接口返回数据
result = {}
if msg!=None:
res['msg'] = msg
if database_name!=None:
result['database_name'] = database_name
# 增加人脸
if addfile_names!=None or fail_names!=None:
result['success_names'] = addfile_names
result['fail_names'] = fail_names
# 删除人脸
if delete_names!=None or not_exist_names!=None:
result['delete_names'] = delete_names
result['not_exist_names'] = not_exist_names
if result:
res['result'] = result
return jsonify(res)
# 创建cluster接口返回的json数据
def create_cluster_response(status, all_cluster):
res = {}
data = {}
for index, cluster in enumerate(all_cluster):
data['cluster' + str(index)] = cluster
res['data'] = data
res['status'] = status
return res
# 检查上传文件格式
def check_file_format(file_name, format):
if '.' in file_name:
file_format = file_name.rsplit('.')[1]
if file_format in format:
return True
return False
# 检查img大小大于10M抛出异常
def check_img_size(img_path):
fsize = os.path.getsize(img_path)
if fsize > ALLOWED_IMG_SIZE:
raise RequestEntityTooLarge
# 解压zip文件存到某路径
def unzip(zip_src, dst_dir):
f = zipfile.is_zipfile(zip_src)
if f:
fz = zipfile.ZipFile(zip_src, 'r')
for file in fz.namelist():
fz.extract(file, dst_dir)
return True
else:
return False
# 解压文件
def un_zip(file_path, output_path):
zip_file = zipfile.ZipFile(file_path)
if os.path.isdir(output_path):
pass
else:
os.mkdir(output_path)
zip_file.extractall(output_path)
# for names in zip_file.namelist():
# zip_file.extract(names,output_path)
zip_file.close()
# 人脸识别、性别年龄识别
@app.route('/recognition', methods=['POST'])
def recognition():
try:
f = request.files['file_name']
if f and check_file_format(f.filename, ALLOWED_IMG):
img_path = './img/recognition/' + secure_filename(f.filename)
f.save(img_path)
check_img_size(img_path)
# img3 = load_image('./file/'+secure_filename(f.filename))
# img3 = torch.from_numpy(img3)
tic = time.time()
img3, box_and_point = detect_one(img_path, retinaface_model, retinaface_args)
print('detect time: {:.4f}'.format(time.time() - tic))
if len(img3) == 0:
return create_response('no face')
elif len(img3) > 1:
namelist = findAll(img3, arcface_model, index, database_name_list, cpu_or_cuda)
gender_list, age_list = [], []
# gender_list, age_list = gender_age(img3, gender_model)
res = create_response('success', namelist, gender=gender_list, age=age_list,
box_and_point=box_and_point)
else:
b = box_and_point[0]
w = b[2] - b[0]
h = b[3] - b[1]
b[2] = w
b[3] = h
label, value = anti_spoofing(img_path, anti_spoofing_model_path, cpu_or_cuda, np.array(b[:4], int),
anti_model)
# print(index,database_name_list)
name, distance = findOne(img3, arcface_model, index, database_name_list, cpu_or_cuda)
gender_list, age_list = [], []
# gender_list, age_list = gender_age(img3, gender_model)
res = create_response('success', name, gender=gender_list, age=age_list, distance=distance,
anti=label, score=value, box_and_point=box_and_point)
return res
else:
return create_response('png jpg jpeg bmp are allowed')
except RequestEntityTooLarge:
return create_response('image size should be less than 10M')
# 两张图片比对
@app.route('/compare', methods=['POST'])
def compare_file():
try:
file1 = request.files['file1_name']
file2 = request.files['file2_name']
if file1 and check_file_format(file1.filename, ALLOWED_IMG) and file2 and check_file_format(file2.filename,
ALLOWED_IMG):
img1_path = './img/compare/' + secure_filename(file1.filename)
img2_path = './img/compare/' + secure_filename(file2.filename)
file1.save(img1_path)
file2.save(img2_path)
check_img_size(img1_path)
check_img_size(img2_path)
img1, box_and_point1 = detect_one(img1_path, retinaface_model,
retinaface_args)
img2, box_and_point2 = detect_one(img2_path, retinaface_model, retinaface_args)
if len(img1) == 1 and len(img2) == 1:
result,distance = face_verification(img1, img2, arcface_model, cpu_or_cuda)
print(result,distance)
return create_response('success', verification=result,distance=distance)
else:
return create_response('image contains no face or more than 1 face')
else:
return create_response('png jpg jpeg bmp are allowed')
except RequestEntityTooLarge:
return create_response('image size should be less than 10M')
# 数据库增加人脸,可实现向“现有/新建”数据库增加“单张/多张”人脸
# 增和改
@app.route('/databaseAdd', methods=['POST'])
def DB_add_face():
try:
# 上传人脸图片(>=1
# key都为file_listvalue为不同的值可实现批量上传图片
upload_files = request.files.getlist("file_list")
# '',[],{},0都可以视为False
if not upload_files:
msg = "上传文件为空"
return create_response(0,msg=msg)
database_name = request.form.get("database_name")
database_path = "./Database/" + database_name + ".npy"
if not os.path.exists(database_path):
msg = "数据库不存在"
return create_response(0,msg=msg)
# 数据库中已存在的人名
names = load_npy(database_path).keys()
# print(names)
# 这是服务器上用于暂存上传图片的文件夹,每次上传前重建,使用后删除
# 后面可根据需要改为定期删除
file_temp_path = './img/uploadNew/'
if not os.path.exists(file_temp_path):
os.makedirs(file_temp_path)
# 正则表达式用于提取文件名中的中文,用于.npy中的keys
r = re.compile('[\u4e00-\u9fa5]+')
# 分别存取添加成功或失败的名字
success_names = []
fail_names = {}
# 添加失败的两种情况:格式错误或已经存在
format_wrong = []
alreadyExist = []
# 分别处理每一张图片,先判断格式对不对,再判断是否存在
for file in upload_files:
filename = file.filename
name = r.findall(filename)[0]
if file and check_file_format(filename, ALLOWED_IMG):
if name in names:
alreadyExist.append(name)
continue
save_path = file_temp_path + filename
file.save(save_path)
check_img_size(save_path)
img_file, box_and_point = detect_one(save_path, retinaface_model, retinaface_args)
add_one_to_database(img=img_file, model=arcface_model, name=name, database_path=database_path,
cpu_or_cuda=cpu_or_cuda)
success_names.append(name)
else:
format_wrong.append(name)
continue
shutil.rmtree(file_temp_path)
# 如果有错误情况
if format_wrong or alreadyExist:
status = 0
else:
status = 1
fail_names['formatWrong'] = format_wrong
fail_names['alreadyExist'] = alreadyExist
return create_response(status=status,addfile_names=success_names,fail_names=fail_names,database_name=database_name,msg="新增人脸操作执行完成")
except RequestEntityTooLarge:
return create_response(0,msg='image size should be less than 10M')
# 数据库删除人脸,可实现在现有数据库中删除’单/多‘张人脸
@app.route('/databaseDelete', methods=['POST'])
def DB_delete_face():
try:
delete_names = request.form.getlist("delete_names")
database_name = request.form.get("database_name")
database_path = "./Database/" + database_name + ".npy"
if not os.path.exists(database_path):
msg = "数据库不存在"
return create_response(0,msg=msg)
if not delete_names:
msg = "delete_names参数为空"
return create_response(0,msg=msg)
k_v = load_npy(database_path)
print(k_v.keys())
success_list = []
fail_list = []
for name in delete_names:
if name in k_v.keys():
del k_v[name]
success_list.append(name)
else:
fail_list.append(name)
continue
np.save(database_path, k_v)
status = 1
if fail_list:
status = 0
return create_response(status=status,delete_names=success_list,not_exist_names=fail_list,database_name=database_name,
msg="删除人脸操作完成")
except RequestEntityTooLarge:
return create_response(0,'image size should be less than 10M')
# 以图搜图接口:
# 上传图片压缩包建图片库
@app.route('/uploadZip', methods=['POST'])
def upload_Zip():
try:
zip = request.files['zip_name']
dst_dir = './img/search/'
if unzip(zip, dst_dir):
return create_response('upload zip success')
else:
return create_response('upload zip file please')
except RequestEntityTooLarge:
return create_response('image size should be less than 10M')
# 以图搜图
@app.route('/imgSearchImg', methods=['POST'])
def img_search_img():
searchfile = './img/search/face'
try:
file = request.files['img_name']
if file and check_file_format(file.filename, ALLOWED_IMG):
img_path = './img/search/' + secure_filename(file.filename)
file.save(img_path)
check_img_size(img_path)
img, box_and_point = detect_one(img_path, retinaface_model,
retinaface_args)
if len(img) == 1:
Onename = []
num = 0
for filenames in os.listdir(searchfile):
imgpath = os.path.join(searchfile, filenames)
imgdata, box_and_point = detect_one(imgpath, retinaface_model, retinaface_args)
result = face_verification(img, imgdata, arcface_model, cpu_or_cuda)
isOne, distance = result.split(' ', -1)[0], result.split(' ', -1)[1]
if isOne == 'same':
Onename.append(filenames)
num += 1
return create_response('success', name=Onename, num=num)
else:
return create_response('image contains no face or more than 1 face')
else:
return create_response('png jpg jpeg bmp are allowed')
except RequestEntityTooLarge:
return create_response('image size should be less than 10M')
# 人脸聚类接口
@app.route('/cluster', methods=['POST'])
def zip_cluster():
try:
f = request.files['file_name']
if f and check_file_format(f.filename, ALLOWED_FILE):
zip_name = secure_filename(f.filename)
f.save('./img/cluster_tmp_file/' + zip_name)
un_zip('./img/cluster_tmp_file/' + zip_name, './img/cluster_tmp_file/')
emb_list, name_list = get_claster_tmp_file_embedding("./img/cluster_tmp_file/" + zip_name.rsplit('.')[0],
retinaface_model,
retinaface_args, arcface_model, cpu_or_cuda)
return create_cluster_response("success", cluster(emb_list, name_list))
else:
return create_response('zip are allowed')
except RequestEntityTooLarge:
return create_response('file size should be less than 100M')
# 视频识别接口
@app.route('/videorecognition', methods=['POST'])
def video_recognition():
try:
f = request.files['file_name']
if f and check_file_format(f.filename, ALLOWED_VIDEO):
video_name = secure_filename(f.filename)
f.save('./video/' + video_name)
detect_video('./video/' + video_name, './videoout/' + video_name, retinaface_model, arcface_model, k_v,
retinaface_args)
return create_response("success")
else:
return create_response('mp4 are allowed')
except RequestEntityTooLarge:
return create_response('file size should be less than 100M')
@app.route('/download/<string:filename>', methods=['GET'])
def download(filename):
if os.path.isfile(os.path.join('./videoout/', filename)):
return send_from_directory('./videoout/', filename, as_attachment=True)
else:
return create_response("Download failed")
if __name__ == '__main__':
k_v = load_npy("./Database/student.npy")
database_name_list = list(k_v.keys())
vector_list = np.array(list(k_v.values()))
print(vector_list.shape)
#print(database_name_list)
nlist = 50
quantizer = faiss.IndexFlatL2(512) # the other index
index = faiss.IndexIVFFlat(quantizer, 512, nlist, faiss.METRIC_L2)
index.train(vector_list)
# index = faiss.IndexFlatL2(512)
index.add(vector_list)
index.nprobe = 50
app.run(host="0.0.0.0", port=5000)