4.21开发报告

发布时间 2023-04-21 22:12:12 作者: Joranger

1.昨天开发遇到的问题

昨天开发遇到的问题主要是各种上课不认真行为的权重设置有些不合理。转念一想,我觉得任何不认真的行为本质上都是一样的,所以我打算将权重去掉。

2.今天开发的任务

将课堂学生专注度检测模块彻底完成,周六日两天进行总体项目验收。

3.代码

import time
from abc import ABC, abstractmethod

import torch

single_process = True
from queue import Empty

if single_process:
from queue import Queue
from threading import Thread, Lock
else:
from torch.multiprocessing import Queue, Lock
from torch.multiprocessing import Process as Thread

torch.multiprocessing.set_start_method('forkserver', force=True)
torch.multiprocessing.set_sharing_strategy('file_system')

# Capacity (maxsize) of the input queue each BaseModule creates for itself.
queueSize = 50

# Flags stored in TaskData.task_flag and compared against the value returned by
# BaseModule.process_data inside BaseModule._run.
TASK_DATA_OK = 0  # the task is processed by the module
TASK_DATA_CLOSE = 1  # the module closes itself and still forwards the task
TASK_DATA_IGNORE = 2  # flag of the placeholder used on queue timeout; never forwarded
TASK_DATA_SKIP = 3  # only modules with skippable=False process such a task
# Upper bound applied to the queue-scaled processing time measured in BaseModule._run.
BALANCE_CEILING_VALUE = 50


class DictData:
    """Empty attribute container.

    Instances start with no attributes; code that holds one assigns whatever
    fields it needs directly on the instance.
    """


class ModuleBalancer:
    """Shared pacing helper that tells modules how long to wait after a task.

    One module at a time is remembered as the reference (`short_stab_module`).
    The reference and any module that reports an interval larger than
    `short_stab_interval` get a wait of 0; everybody else is told to wait for
    the gap to `max_interval`, limited to `ceiling_interval` seconds.
    """

    def __init__(self):
        self.lock = Lock()
        # Longest wait (in the unit passed to time.sleep by callers) ever returned.
        self.ceiling_interval = 0.1
        self.max_interval = 0
        self.short_stab_module = None
        self.short_stab_interval = self.max_interval

    def get_suitable_interval(self, process_interval, module):
        """Return the wait time suggested for `module` after one task.

        Args:
            process_interval: the interval the caller measured for the task.
            module: the reporting module; its `skippable` and
                `process_interval` attributes are read when it is (or becomes)
                the reference module.

        Returns:
            0 for the reference module or a new reference module, otherwise
            `max_interval - process_interval` clamped to
            `[0, ceiling_interval]`.
        """
        with self.lock:
            if module == self.short_stab_module:
                # The current reference reports again: smooth the tracked maximum.
                self.max_interval = (process_interval + self.max_interval) / 2
            elif process_interval > self.short_stab_interval:
                # A slower module takes over as the reference.
                self.short_stab_module = module
                self.max_interval = process_interval
            else:
                gap = self.max_interval - process_interval
                return max(min(gap, self.ceiling_interval), 0)

            # Shared tail of the two reference branches above.
            if module.skippable:
                self.short_stab_interval = module.process_interval
            else:
                self.short_stab_interval = self.max_interval
            return 0


class TaskData:
    """One item travelling through the pipeline.

    Attributes:
        data: a fresh DictData instance created per task.
        task_stage: the stage object passed in by the creator.
        task_flag: one of the TASK_DATA_* flags, TASK_DATA_OK by default.
    """

    def __init__(self, task_stage, task_flag=TASK_DATA_OK):
        self.task_flag = task_flag
        self.task_stage = task_stage
        self.data = DictData()


class TaskStage:
    """Link in the pipeline chain: names the module and the stage that follow.

    Attributes:
        next_module: zero-argument callable returning the module that receives
            the task next, or None when nothing has been linked.
        next_stage: the stage the task is in once it was handed over, or None.
    """

    def __init__(self):
        self.next_module = None
        self.next_stage = None

    def to_next_stage(self, task_data: "TaskData"):
        """Advance `task_data` to the next stage and enqueue it on the next module.

        The stage is updated BEFORE the task is enqueued. Once the task sits in
        the receiving module's queue, that module's worker may dequeue it and
        read `task_data.task_stage` at any moment. Updating the stage after the
        hand-over would let the receiver observe the previous stage.

        Args:
            task_data: the task to hand over; its `task_stage` is overwritten.
        """
        receiver = self.next_module()
        task_data.task_stage = self.next_stage
        receiver.put_task_data(task_data)


class BaseModule(ABC):
    """Base class of a pipeline module that processes queued tasks on a worker.

    Each module owns a bounded input queue. `start()` launches a worker
    (whatever `Thread` is bound to in this file) that runs `_run()`: it pulls
    tasks from the queue, calls `process_data`, forwards the task to the next
    stage, and optionally sleeps for the time suggested by the balancer.

    Attributes:
        skippable: when False, tasks flagged TASK_DATA_SKIP are still processed.
        queue: input queue with capacity `queueSize`.
        balancer: optional ModuleBalancer consulted after every forwarded task.
        process_interval: last measured (queue-scaled) processing time.
        process_interval_scale: current queue size, at least 1.
        running: loop flag; set by `_run`/`open`, cleared by `close`.
        result_worker: the worker returned by `start()`, None before that.
    """

    def __init__(self, balancer=None, skippable=True):
        self.skippable = skippable
        # Placeholder handed out by product_task_data when the queue stays empty.
        self.ignore_task_data = TaskData(task_stage=None, task_flag=TASK_DATA_IGNORE)
        self.queue = Queue(maxsize=queueSize)
        self.balancer: ModuleBalancer = balancer
        self.process_interval = 0.01
        self.process_interval_scale = 1
        # Defined up front so close()/wait_for_end() are safe before start().
        self.running = False
        self.result_worker = None
        print(f'created: {self}')

    @abstractmethod
    def process_data(self, data):
        """Process the payload of one task.

        Args:
            data: the `data` attribute of the TaskData being handled.

        Returns:
            A TASK_DATA_* flag. `_run` reacts to TASK_DATA_SKIP,
            TASK_DATA_IGNORE and TASK_DATA_CLOSE; any other value leads to the
            task being forwarded unchanged.
        """
        pass

    @abstractmethod
    def open(self):
        """Hook called once by `_run` before the loop starts.

        Subclasses must override it; calling this base implementation marks
        the module as running.
        """
        self.running = True

    def _run(self):
        """Worker loop: fetch, process, forward and pace until `running` is cleared."""
        self.running = True
        self.open()
        while self.running:
            task_data = self.product_task_data()
            # Execute on OK tasks, and on SKIP-flagged tasks when this module
            # is not allowed to skip.
            execute_condition = task_data.task_flag == TASK_DATA_OK
            execute_condition = execute_condition or (task_data.task_flag == TASK_DATA_SKIP and not self.skippable)
            # Run the task (or pass its flag through) and time it.
            start_time = time.time()
            execute_result = self.process_data(task_data.data) if execute_condition else task_data.task_flag
            process_interval = min((time.time() - start_time) * self.process_interval_scale, BALANCE_CEILING_VALUE)
            task_stage = task_data.task_stage
            if execute_result != TASK_DATA_SKIP:
                self.process_interval = process_interval
            else:
                # Mark the task so later modules see that it was skipped.
                task_data.task_flag = TASK_DATA_SKIP
            if execute_result == TASK_DATA_IGNORE:
                # Nothing to forward (e.g. the queue-timeout placeholder).
                continue
            if execute_result == TASK_DATA_CLOSE:
                # Stop this module, but still forward the flag below.
                task_data.task_flag = TASK_DATA_CLOSE
                self.close()
            if task_stage.next_stage is not None:
                task_stage.to_next_stage(task_data)
            if self.balancer is not None:
                suitable_interval = self.balancer.get_suitable_interval(process_interval, self)
                if suitable_interval > 0:
                    time.sleep(suitable_interval)

    def start(self):
        """Launch the worker running `_run` and return it."""
        p = Thread(target=self._run, args=())
        p.start()
        self.result_worker = p
        return p

    def put_task_data(self, task_data):
        """Enqueue a task for this module (blocks while the queue is full)."""
        self.queue.put(task_data)
        self._refresh_process_interval_scale()

    def _refresh_process_interval_scale(self):
        """Set the timing scale to the current queue size, with a floor of 1."""
        self.process_interval_scale = max(self.queue.qsize(), 1)

    def product_task_data(self):
        """Return the next queued task, or the ignore placeholder after a 1 s timeout."""
        try:
            task_data = self.queue.get(block=True, timeout=1)
            self._refresh_process_interval_scale()
            return task_data
        except Empty:
            return self.ignore_task_data

    def close(self):
        """Ask the worker loop to stop after its current iteration."""
        print(f'closing: {self}')
        self.running = False

    def wait_for_end(self):
        """Block until the worker finishes; a no-op if `start()` was never called."""
        worker = getattr(self, 'result_worker', None)
        if worker is not None:
            worker.join()
from pipeline_module.core.base_module import BaseModule, TaskStage, ModuleBalancer


class TaskSolution:
    """Assembles and drives a chain of modules: one source plus following modules.

    Every module registered here receives the same ModuleBalancer. Consecutive
    modules are connected through TaskStage objects, starting at `start_stage`.
    The setters return `self` so calls can be chained.
    """

    def __init__(self):
        first_stage = TaskStage()
        self.modules = []
        self.source_module = None
        self.start_stage: TaskStage = first_stage
        # Tail of the chain; the next registered module is linked from here.
        self.current_stage: TaskStage = first_stage
        self.balancer = ModuleBalancer()

    def set_source_module(self, source_module):
        """Register the source module and attach the balancer and first stage to it."""
        self.source_module = source_module
        source_module.task_stage = self.start_stage
        source_module.balancer = self.balancer
        return self

    def set_next_module(self, next_module: "BaseModule"):
        """Append `next_module` to the chain behind the current tail stage."""
        tail = self.current_stage
        following = TaskStage()
        next_module.balancer = self.balancer
        # The stage stores a callable that yields the module, not the module itself.
        tail.next_module = lambda: next_module
        tail.next_stage = following
        self.modules.append(next_module)
        self.current_stage = following
        return self

    def start(self):
        """Start every chained module first, then the source module."""
        for module in self.modules:
            print(f'starting modules {module}')
            module.start()
        self.source_module.start()

    def wait_for_end(self):
        """Wait for the source module, then for each chained module in order."""
        self.source_module.wait_for_end()
        for module in self.modules:
            module.wait_for_end()

    def close(self):
        """Close the source module, then each chained module in order."""
        self.source_module.close()
        for module in self.modules:
            print(f'closing modules {module}')
            module.close()
import numpy as np
import torch

from models.action_analysis import CheatingActionAnalysis
from models.classroom_action_classifier import ClassroomActionClassifier
from models.concentration_evaluator import ConcentrationEvaluator
from models.pose_estimator import PnPPoseEstimator
from pipeline_module.core.base_module import TASK_DATA_OK, BaseModule

# Head-pose values below this threshold mark an otherwise normal person as
# "peeping" in CheatingActionModule.process_data.
peep_threshold = -60  # the first video used a threshold of -50


class CheatingActionModule(BaseModule):
    """Pipeline module that labels each detected person with a cheating category.

    The raw action classes predicted by the classifier are folded into the four
    categories of `class_names`. People classified as normal are re-checked with
    the passing analysis and the head pose. Per-category head counts are
    written to the task data.
    """

    # Raw class index -> meaning:
    # 0  sitting normally, not moving
    # 1  head down, writing (normal)
    # 2  stretching (normal)
    # 3  right hand raised (low)
    # 4  left hand raised (low)
    # 5  right hand raised (high)
    # 6  left hand raised (high)
    # 7  standing up
    # 8  lifting a hand
    # 9  reaching out to the right
    # 10 reaching out to the left
    # 11 reaching out to the right (variant 2)
    # 12 reaching out to the left (variant 2)
    # 13 turning the body to the right
    # 14 turning the body to the left
    # 15 turning the head to the right
    # 16 turning the head to the left
    # 17 sleeping in class
    # 18 head lowered severely
    raw_class_names = ["seat", "write", "stretch", "hand_up_R", "hand_up_L",
                       "hand_up_highly_R", "hand_up_highly_L",
                       "relax", "hand_up", "pass_R", "pass_L", "pass2_R", "pass2_L",
                       "turn_round_R", "turn_round_L", "turn_head_R", "turn_head_L",
                       "sleep", "lower_head"]
    # Output categories, indexed by the values stored in data.best_preds.
    class_names = ["正常", "传纸条", "低头偷看", "东张西望"]
    # Keypoint indices fed to the action classifier.
    use_keypoints = list(range(11)) + [17, 18, 19]
    # Raw class indices belonging to each non-normal category.
    class_of_passing = [9, 10, 11, 12]
    class_of_peep = [18]
    class_of_gazing_around = [13, 14, 15, 16]

    def __init__(self, weights, device='cpu', img_size=(480, 640), skippable=True):
        """Build the action classifier and the PnP head-pose estimator.

        Args:
            weights: passed to ClassroomActionClassifier.
            device: passed to ClassroomActionClassifier.
            img_size: passed to PnPPoseEstimator.
            skippable: forwarded to BaseModule.
        """
        super().__init__(skippable=skippable)
        self.weights = weights
        self.classifier = ClassroomActionClassifier(weights, device)
        self.pnp = PnPPoseEstimator(img_size=img_size)

    def process_data(self, data):
        """Classify every detection in `data` and store labels and head counts on it.

        The counters are always reset to 0; everything else is only written
        when `data.detections` has at least one row.

        Returns:
            TASK_DATA_OK.
        """
        data.num_of_cheating = 0
        data.num_of_normal = 0
        data.num_of_passing = 0
        data.num_of_peep = 0
        data.num_of_gazing_around = 0

        num_detections = data.detections.shape[0]
        if num_detections > 0:
            # Action recognition on the selected keypoints.
            data.classes_probs = self.classifier.classify(data.keypoints[:, self.use_keypoints])
            # Most probable raw class per person, folded into the four categories.
            data.raw_best_preds = torch.argmax(data.classes_probs, dim=1)
            data.best_preds = [self.reclassify(raw_idx) for raw_idx in data.raw_best_preds]
            data.raw_classes_names = self.raw_class_names
            data.classes_names = self.class_names
            # Head pose estimation from keypoints 26..93 (x, y only).
            face_points = data.keypoints[:, 26:94, :2].numpy()
            data.head_pose = [self.pnp.solve_pose(points) for points in face_points]
            data.draw_axis = self.pnp.draw_axis
            data.head_pose_euler = [self.pnp.get_euler(*pose) for pose in data.head_pose]
            # Passing-action recognition on the full keypoints.
            is_passing_list = CheatingActionAnalysis.is_passing(data.keypoints)
            # Re-check people classified as normal: the passing analysis wins,
            # otherwise the head pose decides whether they count as peeping.
            for person, label in enumerate(data.best_preds):
                if label != 0:
                    continue
                if is_passing_list[person] != 0:
                    data.best_preds[person] = 1
                elif data.head_pose_euler[person][1][0] < peep_threshold:
                    data.best_preds[person] = 2
            data.pred_class_names = [self.class_names[label] for label in data.best_preds]
            # Head counts per category.
            labels = data.best_preds
            data.num_of_normal = labels.count(0)
            data.num_of_passing = labels.count(1)
            data.num_of_peep = labels.count(2)
            data.num_of_gazing_around = labels.count(3)
            data.num_of_cheating = data.detections.shape[0] - data.num_of_normal

        return TASK_DATA_OK

    def reclassify(self, class_idx):
        """Map a raw class index to a category: 1 passing, 2 peep, 3 gazing around, else 0."""
        category_members = (
            (1, self.class_of_passing),
            (2, self.class_of_peep),
            (3, self.class_of_gazing_around),
        )
        for category, members in category_members:
            if class_idx in members:
                return category
        return 0

    def open(self):
        """Nothing to prepare beyond the base-class behaviour."""
        super().open()


class ConcentrationEvaluationModule(BaseModule):
    """Pipeline module that evaluates the concentration of each detected person.

    It combines the predicted action class, the facial expression, a head-pose
    component and a face-occlusion mask, and hands them to ConcentrationEvaluator.
    """

    # Keypoint indices fed to the action classifier.
    use_keypoints = [x for x in range(11)] + [17, 18, 19]
    # A region counts as hidden when its highest keypoint score is below the threshold.
    face_hidden_threshold = 0.005
    mouth_hidden_threshold = 0

    def __init__(self, weights, device='cpu', img_size=(480, 640), skippable=True):
        """Build the classifier, head-pose estimator and concentration evaluator.

        Args:
            weights: passed to ClassroomActionClassifier.
            device: passed to ClassroomActionClassifier.
            img_size: passed to PnPPoseEstimator.
            skippable: forwarded to BaseModule.
        """
        super(ConcentrationEvaluationModule, self).__init__(skippable=skippable)
        self.classifier = ClassroomActionClassifier(weights, device)  # action recognition
        self.pnp = PnPPoseEstimator(img_size=img_size)  # head pose estimation
        self.concentration_evaluator = ConcentrationEvaluator()  # fuzzy comprehensive analysis

    def process_data(self, data):
        """Compute `data.concentration_evaluation` when there is at least one detection.

        Returns:
            TASK_DATA_OK, stated explicitly instead of the implicit None the
            method used to fall through with, so the result is a proper
            TASK_DATA_* flag.
        """
        if data.detections.shape[0] > 0:
            # Action recognition.
            data.classes_probs = self.classifier.classify(data.keypoints[:, self.use_keypoints])
            data.raw_best_preds = torch.argmax(data.classes_probs, dim=1)  # most probable action class
            # Head pose estimation from keypoints 26..93 (x, y only).
            data.head_pose = np.array([self.pnp.solve_pose(kp) for kp in data.keypoints[:, 26:94, :2].numpy()])
            data.draw_axis = self.pnp.draw_axis
            data.head_pose_euler = np.array([self.pnp.get_euler(*vec) for vec in data.head_pose])
            # Element 1 of each Euler result is what the evaluator receives as pitch.
            data.pitch_euler = np.array([euler[1] for euler in data.head_pose_euler])
            # Face occlusion: scores of keypoints 26..93; the first 48 are
            # compared with the face threshold, the last 20 with the mouth threshold.
            face_scores = data.keypoints_scores[:, 26:94].numpy().squeeze(2)
            face_hidden = np.max(face_scores[:, 0:48], axis=1) < self.face_hidden_threshold
            mouth_hidden = np.max(face_scores[:, 48:68], axis=1) < self.mouth_hidden_threshold
            # Facial expression classification.
            face_landmarks = data.keypoints[:, 26:94].numpy()
            face_preds = self.concentration_evaluator.get_expressions(face_landmarks)
            # Fuzzy comprehensive analysis.
            data.concentration_evaluation = self.concentration_evaluator.evaluate(data.raw_best_preds.numpy(),
                                                                                  face_preds,
                                                                                  data.pitch_euler,
                                                                                  face_hidden | mouth_hidden
                                                                                  )
        return TASK_DATA_OK

    def open(self):
        """Nothing to prepare beyond the base-class behaviour."""
        super(ConcentrationEvaluationModule, self).open()


if __name__ == '__main__':
    # Scratch checks of two numpy expressions: bincount over a boolean mask,
    # and a thresholded row mean after squeezing the trailing axis.
    samples = np.array([1, 1, 1, 2, 2, 2, 3, 3, 3])
    print(np.bincount(samples > 2))

    nested = np.array([
        [[1], [2], [3]],
        [[2], [2], [4]],
    ])
    row_means = np.mean(nested.squeeze(2), axis=1)
    print(row_means > 2)