diff --git a/python/quadruped_reactive_walking/Controller.py b/python/quadruped_reactive_walking/Controller.py index 7ac338a3cfd1765bc6393960fbc5c3a4c215eb8e..20f407191d930976c395918932493cba24737450 100644 --- a/python/quadruped_reactive_walking/Controller.py +++ b/python/quadruped_reactive_walking/Controller.py @@ -4,7 +4,7 @@ import numpy as np import pinocchio as pin import pybullet as pyb -from . import MPC_Wrapper, quadruped_reactive_walking as qrw +from . import WB_MPC_Wrapper from .solo3D.utils import quaternionToRPY from .tools.utils_mpc import init_robot @@ -56,100 +56,12 @@ class Controller: q_init (array): initial position of actuators t (float): time of the simulation """ - self.DEMONSTRATION = params.DEMONSTRATION - self.SIMULATION = params.SIMULATION - self.solo3D = params.solo3D - self.k_mpc = int(params.dt_mpc / params.dt_wbc) - self.type_MPC = params.type_MPC self.enable_pyb_GUI = params.enable_pyb_GUI - self.enable_corba_viewer = params.enable_corba_viewer - self.q_security = np.array([1.2, 2.1, 3.14] * 4) - self.solo = init_robot(q_init, params) - - self.joystick = qrw.Joystick() - self.joystick.initialize(params) - - self.gait = qrw.Gait() - self.gait.initialize(params) - - self.estimator = qrw.Estimator() - self.estimator.initialize(params) - - self.wbcWrapper = qrw.WbcWrapper() - self.wbcWrapper.initialize(params) - - self.h_ref = params.h_ref - self.q_init = np.hstack((np.zeros(6), q_init.copy())) - self.q_init[2] = params.h_ref - self.q_display = np.zeros(19) - - self.mpc_wrapper = MPC_Wrapper.MPC_Wrapper(params, self.q_init) - self.next_footstep = np.zeros((3, 4)) - - self.enable_multiprocessing_mip = params.enable_multiprocessing_mip - if params.solo3D: - from .solo3D.SurfacePlannerWrapper import Surface_planner_wrapper - - self.surfacePlanner = Surface_planner_wrapper(params) - self.statePlanner = qrw.StatePlanner3D() - self.statePlanner.initialize(params) - - self.footstepPlanner = qrw.FootstepPlannerQP() - self.footstepPlanner.initialize( - params, self.gait, self.surfacePlanner.floor_surface - ) - - self.footTrajectoryGenerator = qrw.FootTrajectoryGeneratorBezier() - self.footTrajectoryGenerator.initialize( - params, self.gait, self.surfacePlanner.floor_surface - ) - if self.SIMULATION: - from .solo3D.pyb_environment_3D import PybEnvironment3D - - self.pybEnvironment3D = PybEnvironment3D( - params, - self.gait, - self.statePlanner, - self.footstepPlanner, - self.footTrajectoryGenerator, - ) - self.update_mip = False - else: - self.statePlanner = qrw.StatePlanner() - self.statePlanner.initialize(params) - - self.footstepPlanner = qrw.FootstepPlanner() - self.footstepPlanner.initialize(params, self.gait) - - self.footTrajectoryGenerator = qrw.FootTrajectoryGenerator() - self.footTrajectoryGenerator.initialize(params, self.gait) - - # ForceMonitor to display contact forces in PyBullet with red lines - # import ForceMonitor - # myForceMonitor = ForceMonitor.ForceMonitor(pyb_sim.robotId, pyb_sim.planeId) - - self.t = t - - self.k = 0 - - self.q = np.zeros(18) - self.q_wbc = np.zeros(18) - self.dq_wbc = np.zeros(18) - self.base_targets = np.zeros(12) - - self.filter_q = qrw.Filter() - self.filter_q.initialize(params) - self.filter_h_v = qrw.Filter() - self.filter_h_v.initialize(params) - self.filter_vref = qrw.Filter() - self.filter_vref.initialize(params) + self.mpc = WB_MPC_Wrapper.MPC_Wrapper(params) self.error = False - self.last_q_perfect = np.zeros(6) - self.last_b_vel = np.zeros(3) - self.n_nan = 0 self.result = Result(params) device = DummyDevice() @@ -164,99 +76,29 @@ class Controller: """ t_start = time.time() - self.joystick.update_v_ref(self.k, self.gait.is_static()) - - q_perfect, b_baseVel_perfect = self.get_perfect_data(qc, device) - - oRh, hRb, oTh = self.run_estimator(device, q_perfect, b_baseVel_perfect) - - t_filter = time.time() - self.t_filter = t_filter - t_start - - self.gait.update(self.k, self.k_mpc, self.joystick.get_joystick_code()) - gait_matrix = self.gait.matrix - - if self.solo3D: - self.retrieve_surfaces() + try: + self.mpc_wrapper.solve() + except ValueError: + print("MPC Problem") - self.next_footstep = self.footstepPlanner.update_footsteps( - self.k % self.k_mpc == 0 and self.k != 0, - int(self.k_mpc - self.k % self.k_mpc), - self.q, - self.h_v_windowed, - self.v_ref, - ) - footsteps = self.footstepPlanner.get_footsteps() - - self.statePlanner.compute_reference_states( - self.q_filtered[:6], self.h_v_filtered, self.vref_filtered - ) - reference_state = self.statePlanner.get_reference_states() - - if self.solo3D and self.update_mip: - self.call_planner() - - t_planner = time.time() - self.t_planner = t_planner - t_filter - - self.solve_MPC(reference_state, footsteps, oRh, oTh) - - t_mpc = time.time() - self.t_mpc = t_mpc - t_planner - - if self.solo3D: - self.footTrajectoryGenerator.update( - self.k, - self.next_footstep, - self.surfacePlanner.selected_surfaces, - self.q_filtered, - ) - else: - self.footTrajectoryGenerator.update(self.k, self.next_footstep) - - if not self.error and not self.joystick.get_stop(): - - self.get_base_targets(reference_state, hRb) - - self.get_feet_targets(reference_state, oRh, oTh, hRb) - - self.q_wbc[3:5] = self.q_filtered[3:5] - self.q_wbc[6:] = self.wbcWrapper.qdes - self.dq_wbc[:6] = self.estimator.get_v_estimate()[:6] - self.dq_wbc[6:] = self.wbcWrapper.vdes - - self.wbcWrapper.compute( - self.q_wbc, - self.dq_wbc, - self.mpc_result[12:24, 0:1].copy(), - np.array([gait_matrix[0, :]]), - self.feet_p_cmd, - self.feet_v_cmd, - self.feet_a_cmd, - self.base_targets, - ) + self.mpc_result, self.mpc_cost = self.mpc_wrapper.get_latest_result() - self.result.q_des = self.wbcWrapper.qdes - self.result.v_des = self.wbcWrapper.vdes - self.result.tau_ff = self.wbcWrapper.tau_ff + if not self.error: + self.result.P = np.array(self.params.Kp_main.tolist() * 4) + self.result.D = np.array(self.params.Kd_main.tolist() * 4) + self.result.FF = self.params.Kff_main * np.ones(12) + self.result.q_des = np.zeros(12) + self.result.v_des = np.zeros(12) + self.result.tau_ff = np.zeros(12) - self.t_wbc = time.time() - t_mpc + self.t_wbc = time.time() - t_start self.clamp_result(device) self.security_check() if self.error or self.joystick.get_stop(): self.set_null_control() - - if self.enable_corba_viewer and (self.k % 5 == 0): - self.display_robot() - - if not self.solo3D: - self.pyb_camera(device) - else: - if self.SIMULATION: - self.pybEnvironment3D.update(self.k) - - self.pyb_debug(device, footsteps, gait_matrix, reference_state) + + self.pyb_camera(device) self.t_loop = time.time() - t_start self.k += 1 @@ -276,338 +118,6 @@ class Controller: cameraTargetPosition=[device.height[0], device.height[1], 0.0], ) - def pyb_debug(self, device, footsteps, gait_matrix, xref): - - if self.k > 1 and self.enable_pyb_GUI: - # Display desired feet positions in WBC as green spheres - oTh_pyb = device.base_position.reshape((-1, 1)) - oRh_pyb = pin.rpy.rpyToMatrix(0.0, 0.0, device.imu.attitude_euler[2]) - for i in range(4): - if not self.solo3D: - pos = oRh_pyb @ self.feet_p_cmd[:, i : (i + 1)] + oTh_pyb - pyb.resetBasePositionAndOrientation( - device.pyb_sim.ftps_Ids_deb[i], pos[:, 0].tolist(), [0, 0, 0, 1] - ) - else: - pos = self.next_footstep[:, i] - pyb.resetBasePositionAndOrientation( - device.pyb_sim.ftps_Ids_deb[i], pos, [0, 0, 0, 1] - ) - - # Display desired footstep positions as blue spheres - for i in range(4): - j = 0 - c = 1 - status = gait_matrix[0, i] - while c < gait_matrix.shape[0] and j < device.pyb_sim.ftps_Ids.shape[1]: - while c < gait_matrix.shape[0] and gait_matrix[c, i] == status: - c += 1 - if c < gait_matrix.shape[0]: - status = gait_matrix[c, i] - if status: - pos = ( - oRh_pyb - @ footsteps[c, (3 * i) : (3 * (i + 1))].reshape((-1, 1)) - + oTh_pyb - - np.array([[0.0], [0.0], [oTh_pyb[2, 0]]]) - ) - pyb.resetBasePositionAndOrientation( - device.pyb_sim.ftps_Ids[i, j], - pos[:, 0].tolist(), - [0, 0, 0, 1], - ) - else: - pyb.resetBasePositionAndOrientation( - device.pyb_sim.ftps_Ids[i, j], - [0.0, 0.0, -0.1], - [0, 0, 0, 1], - ) - j += 1 - - # Hide unused spheres underground - for k in range(j, device.pyb_sim.ftps_Ids.shape[1]): - pyb.resetBasePositionAndOrientation( - device.pyb_sim.ftps_Ids[i, k], [0.0, 0.0, -0.1], [0, 0, 0, 1] - ) - - # Display reference trajectory - xref_rot = np.zeros((3, xref.shape[1])) - for i in range(xref.shape[1]): - xref_rot[:, i : (i + 1)] = ( - oRh_pyb @ xref[:3, i : (i + 1)] - + oTh_pyb - + np.array([[0.0], [0.0], [0.05 - self.h_ref]]) - ) - - if len(device.pyb_sim.lineId_red) == 0: - for i in range(xref.shape[1] - 1): - device.pyb_sim.lineId_red.append( - pyb.addUserDebugLine( - xref_rot[:3, i].tolist(), - xref_rot[:3, i + 1].tolist(), - lineColorRGB=[1.0, 0.0, 0.0], - lineWidth=8, - ) - ) - else: - for i in range(xref.shape[1] - 1): - device.pyb_sim.lineId_red[i] = pyb.addUserDebugLine( - xref_rot[:3, i].tolist(), - xref_rot[:3, i + 1].tolist(), - lineColorRGB=[1.0, 0.0, 0.0], - lineWidth=8, - replaceItemUniqueId=device.pyb_sim.lineId_red[i], - ) - - # Display predicted trajectory - mpc_result_rot = np.zeros((3, self.mpc_result.shape[1])) - for i in range(self.mpc_result.shape[1]): - mpc_result_rot[:, i : (i + 1)] = ( - oRh_pyb @ self.mpc_result[:3, i : (i + 1)] - + oTh_pyb - + np.array([[0.0], [0.0], [0.05 - self.h_ref]]) - ) - - if len(device.pyb_sim.lineId_blue) == 0: - for i in range(self.mpc_result.shape[1] - 1): - device.pyb_sim.lineId_blue.append( - pyb.addUserDebugLine( - mpc_result_rot[:3, i].tolist(), - mpc_result_rot[:3, i + 1].tolist(), - lineColorRGB=[0.0, 0.0, 1.0], - lineWidth=8, - ) - ) - else: - for i in range(self.mpc_result.shape[1] - 1): - device.pyb_sim.lineId_blue[i] = pyb.addUserDebugLine( - mpc_result_rot[:3, i].tolist(), - mpc_result_rot[:3, i + 1].tolist(), - lineColorRGB=[0.0, 0.0, 1.0], - lineWidth=8, - replaceItemUniqueId=device.pyb_sim.lineId_blue[i], - ) - - def get_perfect_data(self, qc, device): - """ - Retrieve perfect data from motion capture or simulator - Check if nan and send error if more than 5 nans in a row - - @param qc qualisys client for motion capture - @param device device structure holding simulation data - @return q_perfect 6D perfect position of the base in world frame - @return v_baseVel_perfect 3D perfect linear velocity of the base in base frame - """ - q_perfect = np.zeros(6) - b_baseVel_perfect = np.zeros(3) - if self.solo3D and qc == None: - q_perfect[:3] = device.base_position - q_perfect[3:] = device.imu.attitude_euler - b_baseVel_perfect = device.b_base_velocity - elif self.solo3D and qc != None: - if self.k <= 1: - self.initial_pos = [0.0, 0.0, -0.046] - q_perfect[:3] = qc.getPosition() - self.initial_pos - q_perfect[3:] = quaternionToRPY(qc.getOrientationQuat()) - b_baseVel_perfect = ( - qc.getOrientationMat9().transpose() @ qc.getVelocity().reshape((3, 1)) - ).ravel() - - if np.isnan(np.sum(q_perfect)): - print("Error: nan values in perfect position of the robot") - q_perfect = self.last_q_perfect - self.n_nan += 1 - if not np.any(self.last_q_perfect) or self.n_nan >= 5: - self.error = True - elif np.isnan(np.sum(b_baseVel_perfect)): - print("Error: nan values in perfect velocity of the robot") - b_baseVel_perfect = self.last_b_vel - self.n_nan += 1 - if not np.any(self.last_b_vel) or self.n_nan >= 5: - self.error = True - else: - self.last_q_perfect = q_perfect - self.last_b_vel = b_baseVel_perfect - self.n_nan = 0 - - return q_perfect, b_baseVel_perfect - - def run_estimator(self, device, q_perfect, b_baseVel_perfect): - """ - Call the estimator and retrieve the reference and estimated quantities. - Run a filter on q, h_v and v_ref. - - @param device device structure holding simulation data - @param q_perfect 6D perfect position of the base in world frame - @param v_baseVel_perfect 3D perfect linear velocity of the base in base frame - """ - - self.estimator.run( - self.gait.matrix, - self.footTrajectoryGenerator.get_foot_position(), - device.imu.linear_acceleration, - device.imu.gyroscope, - device.imu.attitude_euler, - device.joints.positions, - device.joints.velocities, - q_perfect, - b_baseVel_perfect, - ) - - self.estimator.update_reference_state(self.joystick.get_v_ref()) - - oRh = self.estimator.get_oRh() - hRb = self.estimator.get_hRb() - oTh = self.estimator.get_oTh().reshape((3, 1)) - - self.v_ref = self.estimator.get_base_vel_ref() - self.h_v = self.estimator.get_h_v() - self.h_v_windowed = self.estimator.get_h_v_filtered() - if self.solo3D: - self.q[:3] = self.estimator.get_q_estimate()[:3] - self.q[6:] = self.estimator.get_q_estimate()[7:] - self.q[3:6] = quaternionToRPY(self.estimator.get_q_estimate()[3:7]).ravel() - else: - self.q = self.estimator.get_q_reference() - self.v = self.estimator.get_v_reference() - - self.q_filtered = self.q.copy() - self.q_filtered[:6] = self.filter_q.filter(self.q[:6], True) - self.h_v_filtered = self.filter_h_v.filter(self.h_v, False) - self.vref_filtered = self.filter_vref.filter(self.v_ref, False) - - return oRh, hRb, oTh - - def retrieve_surfaces(self): - """ - Get last surfaces computed by the planner and send them to the footstep adapter - """ - self.update_mip = self.k % self.k_mpc == 0 and self.gait.is_new_step() - if self.update_mip: - self.statePlanner.update_surface(self.q_filtered[:6], self.vref_filtered) - if self.surfacePlanner.initialized: - self.error = self.surfacePlanner.get_latest_results() - - self.footstepPlanner.update_surfaces( - self.surfacePlanner.potential_surfaces, - self.surfacePlanner.selected_surfaces, - self.surfacePlanner.mip_success, - self.surfacePlanner.mip_iteration, - ) - - def call_planner(self): - """ - Call the planner and show the result in simulation - """ - configs = self.statePlanner.get_configurations().transpose() - self.surfacePlanner.run( - configs, self.gait.matrix, self.next_footstep, self.vref_filtered[:3] - ) - self.surfacePlanner.initialized = True - if not self.enable_multiprocessing_mip and self.SIMULATION: - self.pybEnvironment3D.update_target_SL1M( - self.surfacePlanner.all_feet_pos_syn - ) - - def solve_MPC(self, reference_state, footsteps, oRh, oTh): - """ - Call the MPC and store result in self.mpc_result. Update target footsteps if - necessary - - @param reference_state reference centroideal state trajectory - @param footsteps footsteps positions over horizon - @param oRh rotation between the world and horizontal frame - @param oTh translation between the world and horizontal frame - """ - if (self.k % self.k_mpc) == 0: - try: - if self.type_MPC == 3: - l_targetFootstep = oRh.transpose() @ (self.next_footstep - oTh) - self.mpc_wrapper.solve( - self.k, - reference_state, - footsteps, - self.gait.matrix, - l_targetFootstep, - oRh, - oTh, - self.footTrajectoryGenerator.get_foot_position(), - self.footTrajectoryGenerator.get_foot_velocity(), - self.footTrajectoryGenerator.get_foot_acceleration(), - self.footTrajectoryGenerator.get_foot_jerk(), - self.footTrajectoryGenerator.get_phase_durations() - - self.footTrajectoryGenerator.get_elapsed_durations(), - ) - else: - self.mpc_wrapper.solve( - self.k, - reference_state, - footsteps, - self.gait.matrix, - np.zeros((3, 4)), - ) - except ValueError: - print("MPC Problem") - self.mpc_result, self.mpc_cost = self.mpc_wrapper.get_latest_result() - - if self.k > 100 and self.type_MPC == 3: - for foot in range(4): - if self.gait.matrix[0, foot] == 0: - id = 0 - while self.gait.matrix[id, foot] == 0: - id += 1 - self.next_footstep[:2, foot] = self.mpc_result[ - 24 + 2 * foot : 24 + 2 * foot + 2, id + 1 - ] - - if self.DEMONSTRATION and self.gait.is_static(): - self.mpc_result[12:24, 0] = [0.0, 0.0, 9.81 * 2.5 / 4.0] * 4 - - def get_base_targets(self, reference_state, hRb): - """ - Retrieve the base position and velocity targets - - @params reference_state reference centroideal state trajectory - @params hRb rotation between the horizontal and base frame - """ - if self.DEMONSTRATION and self.gait.is_static(): - hRb = np.eye(3) - - self.base_targets[:6] = np.zeros(6) - if self.DEMONSTRATION and self.joystick.get_l1() and self.gait.is_static(): - p_ref = self.joystick.get_p_ref() - self.base_targets[[3, 4]] = p_ref[[3, 4]] - self.h_ref = p_ref[2] - hRb = pin.rpy.rpyToMatrix(0.0, 0.0, self.p_ref[5]) - else: - self.base_targets[[3, 4]] = reference_state[[3, 4], 1] - self.h_ref = self.q_init[2] - self.base_targets[6:] = self.vref_filtered - - return hRb - - def get_feet_targets(self, reference_state, oRh, oTh, hRb): - """ - Retrieve the feet positions, velocities and accelerations to send to the WBC - (in base frame) - - @params reference_state reference centroideal state trajectory - @params footsteps footsteps positions over horizon - @params oRh rotation between the world and horizontal frame - @params oTh translation between the world and horizontal frame - """ - if self.solo3D: - T = -np.array([0.0, 0.0, reference_state[2, 1]]).reshape((3, 1)) - T[:2] = -self.q_filtered[:2].reshape((-1, 1)) - R = pin.rpy.rpyToMatrix(0.0, 0.0, self.q_filtered[5]).transpose() - else: - T = -oTh - np.array([0.0, 0.0, self.h_ref]).reshape((3, 1)) - R = hRb @ oRh.transpose() - - self.feet_a_cmd = R @ self.footTrajectoryGenerator.get_foot_acceleration() - self.feet_v_cmd = R @ self.footTrajectoryGenerator.get_foot_velocity() - self.feet_p_cmd = R @ (self.footTrajectoryGenerator.get_foot_position() + T) - def security_check(self): """ Check if the command is fine and set the command to zero in case of error @@ -682,17 +192,6 @@ class Controller: print("Clamping torque of motor n " + str(i)) self.error = set_error - def display_robto(self): - """ - Display the robot in corba viewer - """ - self.q_display[:3] = self.q_wbc[:3] - self.q_display[3:7] = pin.Quaternion( - pin.rpy.rpyToMatrix(self.q_wbc[3:6]) - ).coeffs() - self.q_display[7:] = self.q_wbc[6:] - self.solo.display(self.q_display) - def set_null_control(self): """ Send default null values to the robot diff --git a/python/quadruped_reactive_walking/main_solo12_control.py b/python/quadruped_reactive_walking/main_solo12_control.py index fa6b02439b6a9f08cffb4e8f6633ec06885190e5..83b8d647eb6ebfcc991d68e579fae27d87a5d458 100644 --- a/python/quadruped_reactive_walking/main_solo12_control.py +++ b/python/quadruped_reactive_walking/main_solo12_control.py @@ -5,8 +5,7 @@ import numpy as np from . import quadruped_reactive_walking as qrw from .Controller import Controller -from .tools.LoggerControl import LoggerControl -from .tools.LoggerSensors import LoggerSensors +from .tools.LoggerControlSimple import LoggerControl params = qrw.Params() # Object that holds all controller parameters @@ -113,7 +112,7 @@ def damp_control(device, nb_motors): t += params.dt_wbc -def control_loop(des_vel_analysis=None): +def control_loop(): """ Main function that calibrates the robot, get it into a default waiting position then launch the main control loop once the user has pressed the Enter key @@ -125,26 +124,8 @@ def control_loop(des_vel_analysis=None): params.enable_pyb_GUI = False q_init = np.array(params.q_init.tolist()) # Default position after calibration - - if params.SIMULATION and (des_vel_analysis is not None): - print( - "Analysis: %1.1f %1.1f %1.1f" - % (des_vel_analysis[0], des_vel_analysis[1], des_vel_analysis[5]) - ) - acceleration_rate = 0.1 # m/s^2 - steady_state_duration = 3 # s - N_analysis = ( - int(np.max(np.abs(des_vel_analysis)) / acceleration_rate / params.dt_wbc) - + 1 - ) - N_steady = int(steady_state_duration / params.dt_wbc) - params.N_SIMULATION = N_analysis + N_steady - controller = Controller(params, q_init, 0.0) - if params.SIMULATION and (des_vel_analysis is not None): - controller.joystick.update_for_analysis(des_vel_analysis, N_analysis, N_steady) - if params.SIMULATION: device = PyBulletSimulator() qc = None @@ -153,10 +134,7 @@ def control_loop(des_vel_analysis=None): qc = QualisysClient(ip="140.93.16.160", body_id=0) if params.LOGGING or params.PLOTTING: - loggerSensors = LoggerSensors( - device, qualisys=qc, logSize=params.N_SIMULATION - 3 - ) - loggerControl = LoggerControl(params, logSize=params.N_SIMULATION - 3) + loggerControl = LoggerControl(params, log_size=params.N_SIMULATION - 3) if params.SIMULATION: device.Init( @@ -166,8 +144,6 @@ def control_loop(des_vel_analysis=None): params.enable_pyb_GUI, params.dt_wbc, ) - # import ForceMonitor - # myForceMonitor = ForceMonitor.ForceMonitor(device.pyb_sim.robotId, device.pyb_sim.planeId) else: device.initialize(q_init[:]) device.joints.set_zero_commands() @@ -183,12 +159,6 @@ def control_loop(des_vel_analysis=None): k_log_whole = 0 T_whole = time.time() dT_whole = 0.0 - - log_Mddq = np.zeros((params.N_SIMULATION, 6)) - log_NLE = np.zeros((params.N_SIMULATION, 6)) - log_JcTf = np.zeros((params.N_SIMULATION, 6)) - log_Mddq_out = np.zeros((params.N_SIMULATION, 6)) - log_JcTf_out = np.zeros((params.N_SIMULATION, 6)) while (not device.is_timeout) and (t < t_max) and (not controller.error): t_start_whole = time.time() @@ -208,35 +178,13 @@ def control_loop(des_vel_analysis=None): controller.result.FF * controller.result.tau_ff.ravel() ) - log_Mddq[k_log_whole] = controller.wbcWrapper.Mddq - log_NLE[k_log_whole] = controller.wbcWrapper.NLE - log_JcTf[k_log_whole] = controller.wbcWrapper.JcTf - log_Mddq_out[k_log_whole] = controller.wbcWrapper.Mddq_out - log_JcTf_out[k_log_whole] = controller.wbcWrapper.JcTf_out - - # Call logger if params.LOGGING or params.PLOTTING: - loggerSensors.sample(device, qc) - loggerControl.sample( - controller.joystick, - controller.estimator, - controller, - controller.gait, - controller.statePlanner, - controller.footstepPlanner, - controller.footTrajectoryGenerator, - controller.wbcWrapper, - dT_whole, - ) + loggerControl.sample(device, qc, controller) t_end_whole = time.time() - # myForceMonitor.display_contact_forces() - - device.send_command_and_wait_end_of_cycle( - params.dt_wbc - ) # Send command to the robot - t += params.dt_wbc # Increment loop time + device.send_command_and_wait_end_of_cycle(params.dt_wbc) + t += params.dt_wbc dT_whole = T_whole T_whole = time.time() @@ -253,10 +201,6 @@ def control_loop(des_vel_analysis=None): print("Stopping parallel process MPC") controller.mpc_wrapper.stop_parallel_loop() - if params.solo3D and params.enable_multiprocessing_mip: - print("Stopping parallel process MIP") - controller.surfacePlanner.stop_parallel_loop() - # **************************************************************** # Send 0 torques to the motors. @@ -265,27 +209,22 @@ def control_loop(des_vel_analysis=None): if device.is_timeout: print("Masterboard timeout detected.") - print( - "Either the masterboard has been shut down or there has been a connection issue with the cable/wifi." - ) if params.LOGGING: log_path = Path("/tmp") / "logs" log_path.mkdir(parents=True) - loggerControl.saveAll(loggerSensors, str(log_path / "data")) + loggerControl.save(str(log_path / "data")) if params.PLOTTING: - loggerControl.plotAllGraphs(loggerSensors) + loggerControl.plot() if params.SIMULATION and params.enable_pyb_GUI: device.Stop() print("End of script") - return finished, des_vel_analysis if __name__ == "__main__": # os.nice(-20) - f, v = control_loop() # , np.array([1.5, 0.0, 0.0, 0.0, 0.0, 0.0])) - print(f, v) + control_loop() quit() diff --git a/python/quadruped_reactive_walking/tools/LoggerControlSimple.py b/python/quadruped_reactive_walking/tools/LoggerControlSimple.py new file mode 100644 index 0000000000000000000000000000000000000000..edeb92e471bcbf21d99524a1f9e511b48a7cdcfc --- /dev/null +++ b/python/quadruped_reactive_walking/tools/LoggerControlSimple.py @@ -0,0 +1,275 @@ +"""This class will log 1d array in Nd matrix from device and qualisys object""" +from datetime import datetime +from time import time +from pathlib import Path + +import numpy as np +import pinocchio as pin + + +class LoggerControl: + def __init__(self, params, log_size=60e3, loop_buffer=False): + self.log_size = np.int(log_size) + self.i = 0 + self.dt = params.dt_wbc + self.loop_buffer = loop_buffer + + def intialize(self): + size = self.log_size + + # TODO: ADD WHAT YOU WANT TO LOG + + # IMU and actuators: + self.q_mes = np.zeros([size, 12]) + self.v_mes = np.zeros([size, 12]) + self.torquesFromCurrentMeasurment = np.zeros([size, 12]) + self.baseOrientation = np.zeros([size, 3]) + self.baseOrientationQuat = np.zeros([size, 4]) + self.baseAngularVelocity = np.zeros([size, 3]) + self.baseLinearAcceleration = np.zeros([size, 3]) + self.baseAccelerometer = np.zeros([size, 3]) + self.current = np.zeros(size) + self.voltage = np.zeros(size) + self.energy = np.zeros(size) + + # Motion capture: + self.mocapPosition = np.zeros([size, 3]) + self.mocapVelocity = np.zeros([size, 3]) + self.mocapAngularVelocity = np.zeros([size, 3]) + self.mocapOrientationMat9 = np.zeros([size, 3, 3]) + self.mocapOrientationQuat = np.zeros([size, 4]) + + # Timestamps + self.tstamps = np.zeros(size) + + # Controller timings: MPC time, ... + self.t_MPC = np.zeros(size) + + # MPC + # self.mpc_input = np.zeros([size, mpc_result_size]) + # self.mpc_result = np.zeros([size, mpc_result_size]) + self.mpc_solving_duration = np.zeros([size]) + self.mpc_cost = np.zeros([size, 1]) + + # Whole body control + self.wbc_P = np.zeros([size, 12]) # proportionnal gains of the PD+ + self.wbc_D = np.zeros([size, 12]) # derivative gains of the PD+ + self.wbc_q_des = np.zeros([size, 12]) # desired position of actuators + self.wbc_v_des = np.zeros([size, 12]) # desired velocity of actuators + self.wbc_FF = np.zeros([size, 12]) # gains for the feedforward torques + self.wbc_tau_ff = np.zeros([size, 12]) # feedforward torques + + def sample(self, controller, device, qualisys=None): + if self.i >= self.size: + if self.loop_buffer: + self.i = 0 + else: + return + + # Logging from the device (data coming from the robot) + self.q_mes[self.i] = device.joints.positions + self.v_mes[self.i] = device.joints.velocities + self.baseOrientation[self.i] = device.imu.attitude_euler + self.baseOrientationQuat[self.i] = device.imu.attitude_quaternion + self.baseAngularVelocity[self.i] = device.imu.gyroscope + self.baseLinearAcceleration[self.i] = device.imu.linear_acceleration + self.baseAccelerometer[self.i] = device.imu.accelerometer + self.torquesFromCurrentMeasurment[self.i] = device.joints.measured_torques + self.current[self.i] = device.powerboard.current + self.voltage[self.i] = device.powerboard.voltage + self.energy[self.i] = device.powerboard.energy + + # Logging from qualisys (motion capture) + if qualisys is not None: + self.mocapPosition[self.i] = qualisys.getPosition() + self.mocapVelocity[self.i] = qualisys.getVelocity() + self.mocapAngularVelocity[self.i] = qualisys.getAngularVelocity() + self.mocapOrientationMat9[self.i] = qualisys.getOrientationMat9() + self.mocapOrientationQuat[self.i] = qualisys.getOrientationQuat() + else: # Logging from PyBullet simulator through fake device + self.mocapPosition[self.i] = device.baseState[0] + self.mocapVelocity[self.i] = device.baseVel[0] + self.mocapAngularVelocity[self.i] = device.baseVel[1] + self.mocapOrientationMat9[self.i] = device.rot_oMb + self.mocapOrientationQuat[self.i] = device.baseState[1] + + # Logging from model predictive control + # TODO update + # self.mpc_input[self.i] = controller.mpc_result + # self.mpc_result[self.i] = controller.mpc_result + self.mpc_solving_duration[self.i] = controller.mpc.t_mpc_solving_duration + self.mpc_cost[self.i] = controller.mpc_cost + + # Logging from whole body control + self.wbc_P[self.i] = controller.result.P + self.wbc_D[self.i] = controller.result.D + self.wbc_q_des[self.i] = controller.result.q_des + self.wbc_v_des[self.i] = controller.result.v_des + self.wbc_FF[self.i] = controller.result.FF + self.wbc_tau_ff[self.i] = controller.result.tau_ff + + # Logging timestamp + self.tstamps[self.i] = time() + + self.i += 1 + + def plotTimes(self): + """ + Estimated computation time for each step of the control architecture + """ + from matplotlib import pyplot as plt + + t_range = np.array([k * self.dt for k in range(self.tstamps.shape[0])]) + + plt.figure() + plt.plot(t_range, self.t_MPC, "r+") + legend = ["MPC"] + plt.legend(legend) + plt.xlabel("Time [s]") + plt.ylabel("Time [s]") + self.custom_suptitle("Computation time of each block") + + def plotMPCCost(self): + """ + Plot the cost of the OSQP MPC + """ + from matplotlib import pyplot as plt + + t_range = np.array([k * self.dt for k in range(self.tstamps.shape[0])]) + + fig = plt.figure() + plt.plot(t_range[100:], self.mpc_cost[100:], "k+") + plt.legend(["MPC cost"]) + plt.xlabel("Time [s]") + plt.ylabel("Cost value") + self.custom_suptitle("MPC cost value") + + def plotMpcTime(self): + """ + Plot estimated solving time of the model prediction control + """ + from matplotlib import pyplot as plt + + t_range = np.array([k * self.dt for k in range(self.tstamps.shape[0])]) + + fig = plt.figure() + plt.plot(t_range[35:], self.mpc_solving_duration[35:], "k+") + plt.legend(["Solving duration"]) + plt.xlabel("Time [s]") + plt.ylabel("Time [s]") + self.custom_suptitle("MPC solving time") + + def plot(self, self): + """ " + Step in system time at each loop + """ + + from matplotlib import pyplot as plt + + N = self.tstamps.shape[0] + t_range = np.array([k * self.dt for k in range(N)]) + + # TODO add the plots you want + self.plotTimes() + self.plotMpcTime() + self.plotMPCCost() + + plt.show(block=True) + + def save(self, fileName="data"): + date_str = datetime.now().strftime("_%Y_%m_%d_%H_%M") + name = fileName + date_str + "_" + str(self.type_MPC) + ".npz" + + np.savez_compressed( + name, + t_MPC=self.t_MPC, + # mpc_result=self.mpc_result, + mpc_solving_duration=self.mpc_solving_duration, + mpc_cost=self.mpc_cost, + wbc_P=self.wbc_P, + wbc_D=self.wbc_D, + wbc_q_des=self.wbc_q_des, + wbc_v_des=self.wbc_v_des, + wbc_FF=self.wbc_FF, + wbc_tau_ff=self.wbc_tau_ff, + tstamps=self.tstamps, + q_mes=self.q_mes, + v_mes=self.v_mes, + baseOrientation=self.baseOrientation, + baseOrientationQuat=self.baseOrientationQuat, + baseAngularVelocity=self.baseAngularVelocity, + baseLinearAcceleration=self.baseLinearAcceleration, + baseAccelerometer=self.baseAccelerometer, + torquesFromCurrentMeasurment=self.torquesFromCurrentMeasurment, + mocapPosition=self.mocapPosition, + mocapVelocity=self.mocapVelocity, + mocapAngularVelocity=self.mocapAngularVelocity, + mocapOrientationMat9=self.mocapOrientationMat9, + mocapOrientationQuat=self.mocapOrientationQuat, + current=self.current, + voltage=self.voltage, + energy=self.energy, + ) + print("Log saved in " + name) + + def load(self): + + if self.data is None: + print("No data file loaded. Need one in the constructor.") + return + + # TODO: update loader + + self.t_MPC = self.data["t_MPC"] + # self.mpc_x_f = self.data["mpc_x_f"] + self.mpc_solving_duration = self.data["mpc_solving_duration"] + + self.wbc_P = self.data["wbc_P"] + self.wbc_D = self.data["wbc_D"] + self.wbc_q_des = self.data["wbc_q_des"] + self.wbc_v_des = self.data["wbc_v_des"] + self.wbc_FF = self.data["wbc_FF"] + self.wbc_tau_ff = self.data["wbc_tau_ff"] + + self.mpc_cost = self.data["mpc_cost"] + self.tstamps = self.data["tstamps"] + + + # Load sensors arrays + self.q_mes = self.data["q_mes"] + self.v_mes = self.data["v_mes"] + self.baseOrientation = self.data["baseOrientation"] + self.baseOrientationQuat = self.data["baseOrientationQuat"] + self.baseAngularVelocity = self.data["baseAngularVelocity"] + self.baseLinearAcceleration = self.data["baseLinearAcceleration"] + self.baseAccelerometer = self.data["baseAccelerometer"] + self.torquesFromCurrentMeasurment = self.data["torquesFromCurrentMeasurment"] + + self.mocapPosition = self.data["mocapPosition"] + self.mocapVelocity = self.data["mocapVelocity"] + self.mocapAngularVelocity = self.data["mocapAngularVelocity"] + self.mocapOrientationMat9 = self.data["mocapOrientationMat9"] + self.mocapOrientationQuat = self.data["mocapOrientationQuat"] + self.size = self.q_mes.shape[0] + self.current = self.data["current"] + self.voltage = self.data["voltage"] + self.energy = self.data["energy"] + + +if __name__ == "__main__": + import sys + import os + import argparse + import quadruped_reactive_walking as qrw + from quadruped_reactive_walking.tools import self + + sys.path.insert(0, os.getcwd()) + + parser = argparse.ArgumentParser(description="Process logs.") + parser.add_argument("--file", type=str, help="A valid log file path") + args = parser.parse_args() + + params = qrw.Params() + logger = LoggerControl(params) + logger.load(fileName=args.file) + logger.plot()