import csv import os import math import time def save_login_action(uuid, data, csv_path): create_login_csv(csv_path) gap = data['gapList'] offset = data['offsetList'] hold_time, inter_time = get_hold_inter(gap) distance = get_distance(offset) pressures = get_pressures(data['pressures']) touch_size = get_touch_size(data['touch_size']) distanceMove = get_distanceMove(data['distanceMoveList']) create_data = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(int(time.time()))) try: csv_file = open(csv_path, 'a', newline='', encoding='utf-8') writer = csv.writer(csv_file) row = [create_data, uuid, data['data_stamp'], data['uuid'], data['gesture'], data['gesture_phone'], data['input_duration'], data['username_len'], data['pswd_len'], \ hold_time, inter_time, distance, distanceMove, pressures, touch_size] # data['is_ios']] writer.writerow(row) csv_file.close() return True except BaseException: return False def create_login_csv(base_path): if not os.path.exists(base_path): print(f'{base_path} 不存在,创建该文件。') csv_file = open(base_path, 'w', newline='', encoding='utf-8') writer = csv.writer(csv_file) titles = ['create_date', 'uuid', 'data_stamp', 'label', 'gesture', 'gesture_phone', 'input_duration', 'usrn_len', 'pswd_len', \ 'hold-time', 'inter-time', 'distance', 'distanceMove', 'pressures', 'touch_size'] # 'total_time' writer.writerow(titles) csv_file.close() def get_pressures(pressures): pressures = [p for p in pressures if p is not None] return str(pressures) def get_touch_size(touch_size): touch_size = [t for t in touch_size if t is not None] return str(touch_size) def get_distanceMove(distanceMoveList): distanceMove = [d for d in distanceMoveList if d is not None] return str(distanceMove) def get_hold_inter(gap): hold_time = [] inter_time = [0] for i, item in zip(range(len(gap)), gap): if i % 2 == 0: hold_time.append(item) else: inter_time.append(item) hold_time = str(hold_time) inter_time = str(inter_time) return hold_time, inter_time def get_distance(offset): distance = [0] x1, y1 = offset[0], offset[1] x2, y2 = 0, 0 for i, item in zip(range(len(offset) - 2), offset[2:]): if i % 2 == 0: x2 = item else: y2 = item D = math.pow(math.pow(x2 - x1, 2) + math.pow(y2 - y1, 2), 0.5) distance.append(D) x1 = x2 y1 = y2 return str(distance) def save_sensor(base_path, login_action_uuid, data): # try: is_ios = data['is_ios'] file_path = os.path.join(base_path, login_action_uuid) csv_path = f'{file_path}.csv' csv_file = open(csv_path, 'w', newline='', encoding='utf-8') writer = csv.writer(csv_file) writer.writerow(['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ', 'magX', 'magY', 'magZ']) accelerometer, gyroscope = data['accelerometer_data'], data['gyroscope_data'] length = min(len(accelerometer), len(gyroscope)) print(f'is_ios:{is_ios}') if is_ios: row = [0, 1, 2, 3, 4, 5] for i, acc, gyro in zip(range(1, length + 1), accelerometer, gyroscope): if i % 3 == 1: row[0] = acc row[3] = gyro if i % 3 == 2: row[1] = acc row[4] = gyro if i % 3 == 0: row[2] = acc row[5] = gyro if i % 3 == 0: writer.writerow(row) else: ac, gy, mag = getSimultaneousData(data['accelerometer_data'], data['accelerometer_time'], data['gyroscope_data'], data['gyroscope_time'], data['magnetometer_data'], data['magnetometer_time']) # print("--------------------------------") # print(mag) row = [0, 1, 2, 3, 4, 5, 6, 7, 8] for a, g, m in zip(ac, gy, mag): row[0] = a[0] row[1] = a[1] row[2] = a[2] row[3] = g[0] row[4] = g[1] row[5] = g[2] row[6] = m[0] row[7] = m[1] row[8] = m[2] writer.writerow(row) csv_file.close() return True def save_random_action(uuid, data, csv_path): create_random_csv(csv_path) gap = data['gapList'] offset = data['offsetList'] hold_time, inter_time = get_hold_inter(gap) distance = get_distance(offset) pressures = get_pressures(data['pressures']) touch_size = get_touch_size(data['touch_size']) distanceMove = get_distanceMove(data['distanceMoveList']) create_data = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(int(time.time()))) try: csv_file = open(csv_path, 'a', newline='', encoding='utf-8') writer = csv.writer(csv_file) row = [create_data, uuid, data['data_stamp'], data['uuid'], data['gesture'], data['gesture_phone'], data['input_duration'], data['str_len'], hold_time, inter_time, distance, distanceMove, pressures, touch_size] writer.writerow(row) csv_file.close() return True except BaseException: return False def create_random_csv(base_path): if not os.path.exists(base_path): print(f'{base_path} 不存在,创建该文件。') csv_file = open(base_path, 'w', newline='', encoding='utf-8') writer = csv.writer(csv_file) titles = ['create_date', 'uuid', 'data_stamp', 'label', 'gesture', 'gesture_phone', 'input_duration', 'str_len', 'hold-time', 'inter-time', 'distance', 'distanceMove', 'pressures', 'touch_size'] writer.writerow(titles) csv_file.close() def getSimultaneousData(accel, accelTime, gyro, gyroTime, magnet, magnetTime): ac, gy, mag = [], [], [] map_ac, map_gy, map_mag = getMapXX(accel, accelTime), getMapXX(gyro, gyroTime), getMapXX(magnet, magnetTime) ac_keys = list(map_ac.keys()) gy_keys = list(map_gy.keys()) mag_keys = list(map_mag.keys()) error = 5 ac_i, gy_i = 0, 0 while ac_i < len(ac_keys) and gy_i < len(gy_keys): margin_left = ac_keys[ac_i] - error margin_right = ac_keys[ac_i] + error if gy_keys[gy_i] < margin_left: gy_i += 1 elif margin_left <= gy_keys[gy_i] and gy_keys[gy_i] <= margin_right: ac.append(map_ac[ac_keys[ac_i]]) gy.append(map_gy[gy_keys[gy_i]]) # print(ac_keys[ac_i], gy_keys[gy_i]) ac_i += 1 gy_i += 1 elif gy_keys[gy_i] > margin_right: ac_i += 1 ac_i, mag_i = 0, 0 while ac_i < len(ac_keys) and mag_i < len(mag_keys): margin_left = ac_keys[ac_i] - error margin_right = ac_keys[ac_i] + error if mag_keys[mag_i] < margin_left: mag_i += 1 elif margin_left <= mag_keys[mag_i] and mag_keys[mag_i] <= margin_right: mag.append(map_mag[mag_keys[mag_i]]) # print(ac_keys[ac_i], gy_keys[gy_i]) ac_i += 1 mag_i += 1 elif mag_keys[mag_i] > margin_right: ac_i += 1 return ac, gy, mag def getMapXX(xx, xxTime): map_xx = {} j = 0 row = [0, 1, 2] for i, x in zip(range(len(xx)), xx): if i % 3 == 0: row[0] = x if i % 3 == 1: row[1] = x if i % 3 == 2: row[2] = x if i % 3 == 2: map_xx[xxTime[j]] = row.copy() j += 1 return map_xx def getError(l, n, a): gap_sum = 0 bef = l[0] for g in l[1:n]: gap_sum += (g - bef) bef = g error = gap_sum / (n - 1) * a return error