From 3c29e53af54e3b3819cb4827e11ea75d668c32ea Mon Sep 17 00:00:00 2001 From: odri <odri@furano.laas.fr> Date: Fri, 1 Oct 2021 14:10:24 +0200 Subject: [PATCH] Actualization of replay script --- scripts/main_solo12_replay.py | 295 +++++++++------------------------- 1 file changed, 77 insertions(+), 218 deletions(-) diff --git a/scripts/main_solo12_replay.py b/scripts/main_solo12_replay.py index 7bae2bd1..1e7d140a 100644 --- a/scripts/main_solo12_replay.py +++ b/scripts/main_solo12_replay.py @@ -1,10 +1,6 @@ # coding: utf8 import os -import sys - -sys.path.insert(0, './solopython') - import threading from Controller import Controller import numpy as np @@ -12,26 +8,20 @@ import argparse from LoggerSensors import LoggerSensors from LoggerControl import LoggerControl import libquadruped_reactive_walking as lqrw +import time params = lqrw.Params() # Object that holds all controller parameters if params.SIMULATION: from PyBulletSimulator import PyBulletSimulator else: - # from pynput import keyboard - from solopython.solo12 import Solo12 + import libodri_control_interface_pywrap as oci from solopython.utils.qualisysClient import QualisysClient - -key_pressed = False - - def get_input(): - global key_pressed - keystrk = input('Put the robot on the floor and press Enter \n') + keystrk = input() # thread doesn't continue until key is pressed - key_pressed = True - + # and so it remains alive def put_on_the_floor(device, q_init): """Make the robot go to the default initial position and wait for the user @@ -41,69 +31,40 @@ def put_on_the_floor(device, q_init): device (robot wrapper): a wrapper to communicate with the robot q_init (array): the default position of the robot """ - global key_pressed - key_pressed = False - Kp_pos = 3. - Kd_pos = 0.01 - imax = 3.0 - pos = np.zeros(device.nb_motors) - for motor in range(device.nb_motors): - pos[motor] = q_init[device.motorToUrdf[motor]] * device.gearRatioSigned[motor] - i = threading.Thread(target=get_input) - i.start() - while not key_pressed: - device.UpdateMeasurment() - for motor in range(device.nb_motors): - ref = Kp_pos*(pos[motor] - device.hardware.GetMotor(motor).GetPosition() - - Kd_pos*device.hardware.GetMotor(motor).GetVelocity()) - ref = min(imax, max(-imax, ref)) - device.hardware.GetMotor(motor).SetCurrentReference(ref) - device.SendCommand(WaitEndOfCycle=True) - - print("Start the motion.") + print("PUT ON THE FLOOR.") -def clone_movements(name_interface_clone, q_init, cloneP, cloneD, cloneQdes, cloneDQdes, cloneRunning, cloneResult): + Kp_pos = 6. + Kd_pos = 0.3 - print("-- Launching clone interface --") + device.joints.set_position_gains(Kp_pos * np.ones(12)) + device.joints.set_velocity_gains(Kd_pos * np.ones(12)) + device.joints.set_desired_positions(q_init) + device.joints.set_desired_velocities(np.zeros(12)) + device.joints.set_torques(np.zeros(12)) - print(name_interface_clone, params.dt_wbc) - clone = Solo12(name_interface_clone, dt=params.dt_wbc) - clone.Init(calibrateEncoders=True, q_init=q_init) - - while cloneRunning.value and not clone.hardware.IsTimeout(): - - # print(cloneP[:], cloneD[:], cloneQdes[:], cloneDQdes[:], cloneRunning.value, cloneResult.value) - if cloneResult.value: - - clone.SetDesiredJointPDgains(cloneP[:], cloneD[:]) - clone.SetDesiredJointPosition(cloneQdes[:]) - clone.SetDesiredJointVelocity(cloneDQdes[:]) - clone.SetDesiredJointTorque([0.0] * 12) - - clone.SendCommand(WaitEndOfCycle=True) - - cloneResult.value = False - - return 0 + i = threading.Thread(target=get_input) + i.start() + print("Put the robot on the floor and press Enter") + while i.is_alive(): + device.parse_sensor_data() + device.send_command_and_wait_end_of_cycle(params.dt_wbc) + + print("Start the motion.") -def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=None): +def control_loop(name_interface_clone=None, des_vel_analysis=None): """Main function that calibrates the robot, get it into a default waiting position then launch the main control loop once the user has pressed the Enter key Args: - name_interface (string): name of the interface that is used to communicate with the robot name_interface_clone (string): name of the interface that will mimic the movements of the first """ - """name_replay = "/home/odri/git/paleziart/trajectories_replay/solo_sin_wiggle_more" - replay_q = np.loadtxt(name_replay + "_q_sta.dat", delimiter=" ")[:, 1:] - replay_v = np.loadtxt(name_replay + "_v_sta.dat", delimiter=" ")[:, 1:] - replay_tau = np.loadtxt(name_replay + "_tau_sta.dat", delimiter=" ")[:, 1:] - params.N_SIMULATION = replay_q.shape[0]""" + # Check .yaml file for parameters of the controller - replay = np.load("/home/odri/git/paleziart/Recordings_2021_06_17/adata_2021_06_17_10_35.npz") + # Read replay data + replay = np.load("path_of_replay.npz") replay_P = replay["P"] replay_D = replay["D"] replay_q = replay["q"] @@ -111,45 +72,18 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non replay_tau = replay["tau_ff"] params.N_SIMULATION = replay_q.shape[0] - """from matplotlib import pyplot as plt - plt.figure() - plt.plot(replay_q[:, 1]) - plt.plot(replay_q[:, 1])""" - - - """from IPython import embed - embed()""" - - # Check .yaml file for parameters of the controller - # Enable or disable PyBullet GUI - enable_pyb_GUI = params.enable_pyb_GUI if not params.SIMULATION: - enable_pyb_GUI = False + params.enable_pyb_GUI = False # Time variable to keep track of time t = 0.0 # Default position after calibration - q_init = np.array([0.0, 0.7, -1.4, -0.0, 0.7, -1.4, 0.0, -0.7, +1.4, -0.0, -0.7, +1.4]) - - if params.SIMULATION and (des_vel_analysis is not None): - print("Analysis: %1.1f %1.1f %1.1f" % (des_vel_analysis[0], des_vel_analysis[1], des_vel_analysis[5])) - acceleration_rate = 0.1 # m/s^2 - steady_state_duration = 3 # s - N_analysis = int(np.max(np.abs(des_vel_analysis)) / acceleration_rate / params.dt_wbc) + 1 - N_steady = int(steady_state_duration / params.dt_wbc) - params.N_SIMULATION = N_analysis + N_steady + q_init = np.array(params.q_init.tolist()) # Run a scenario and retrieve data thanks to the logger - controller = Controller(q_init, params.envID, params.velID, params.dt_wbc, params.dt_mpc, - int(params.dt_mpc / params.dt_wbc), t, params.T_gait, - params.T_gait * params.N_period, params.N_SIMULATION, params.type_MPC, params.use_flat_plane, - params.predefined_vel, enable_pyb_GUI, params.kf_enabled, params.N_gait, - params.SIMULATION) - - if params.SIMULATION and (des_vel_analysis is not None): - controller.joystick.update_for_analysis(des_vel_analysis, N_analysis, N_steady) + controller = Controller(params, q_init, t) #### @@ -157,26 +91,12 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non device = PyBulletSimulator() qc = None else: - device = Solo12(name_interface, dt=params.dt_wbc) + device = oci.robot_from_yaml_file(params.config_file) qc = QualisysClient(ip="140.93.16.160", body_id=0) - if name_interface_clone is not None: - print("PASS") - from multiprocessing import Process, Array, Value - cloneP = Array('d', [0] * 12) - cloneD = Array('d', [0] * 12) - cloneQdes = Array('d', [0] * 12) - cloneDQdes = Array('d', [0] * 12) - cloneRunning = Value('b', True) - cloneResult = Value('b', True) - clone = Process(target=clone_movements, args=(name_interface_clone, q_init, cloneP, - cloneD, cloneQdes, cloneDQdes, cloneRunning, cloneResult)) - clone.start() - print(cloneResult.value) - if params.LOGGING or params.PLOTTING: loggerSensors = LoggerSensors(device, qualisys=qc, logSize=params.N_SIMULATION-3) - loggerControl = LoggerControl(params.dt_wbc, params.N_gait, joystick=controller.joystick, + loggerControl = LoggerControl(params.dt_wbc, params.gait.shape[0], params.type_MPC, joystick=controller.joystick, estimator=controller.estimator, loop=controller, gait=controller.gait, statePlanner=controller.statePlanner, footstepPlanner=controller.footstepPlanner, @@ -184,14 +104,18 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non logSize=params.N_SIMULATION-3) # Number of motors - nb_motors = device.nb_motors + nb_motors = 12 # Initiate communication with the device and calibrate encoders if params.SIMULATION: device.Init(calibrateEncoders=True, q_init=q_init, envID=params.envID, - use_flat_plane=params.use_flat_plane, enable_pyb_GUI=enable_pyb_GUI, dt=params.dt_wbc) + use_flat_plane=params.use_flat_plane, enable_pyb_GUI=params.enable_pyb_GUI, dt=params.dt_wbc) else: - device.Init(calibrateEncoders=True, q_init=q_init) + # Initialize the communication and the session. + device.initialize(q_init[:]) + device.joints.set_zero_commands() + + device.parse_sensor_data() # Wait for Enter input before starting the control loop put_on_the_floor(device, q_init) @@ -200,43 +124,29 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non t = 0.0 t_max = (params.N_SIMULATION-2) * params.dt_wbc - i_replay = 0 - P = 7 * np.ones(12) - D = 0.5 * np.ones(12) - - while ((not device.hardware.IsTimeout()) and (t < t_max) and (not controller.myController.error)): + k_log_whole = 0 + T_whole = time.time() + dT_whole = 0.0 + while ((not device.is_timeout) and (t < t_max) and (not controller.error)): # Update sensor data (IMU, encoders, Motion capture) - device.UpdateMeasurment() - - - # Desired torques - # controller.compute(device) + device.parse_sensor_data() # Check that the initial position of actuators is not too far from the # desired position of actuators to avoid breaking the robot if (t <= 10 * params.dt_wbc): - if np.max(np.abs(controller.result.q_des - device.q_mes)) > 0.15: - print("DIFFERENCE: ", controller.result.q_des - device.q_mes) + if np.max(np.abs(controller.result.q_des - device.joints.positions)) > 0.15: + print("DIFFERENCE: ", controller.result.q_des - device.joints.positions) print("q_des: ", controller.result.q_des) - print("q_mes: ", device.q_mes) + print("q_mes: ", device.joints.positions) break # Set desired quantities for the actuators - """device.SetDesiredJointPDgains(controller.result.P, controller.result.D) - device.SetDesiredJointPosition(controller.result.q_des) - device.SetDesiredJointVelocity(controller.result.v_des) - device.SetDesiredJointTorque(controller.result.tau_ff.ravel())""" - """device.SetDesiredJointPDgains(np.zeros(12), np.zeros(12)) - device.SetDesiredJointPosition(np.zeros(12)) - device.SetDesiredJointVelocity(np.zeros(12)) - device.SetDesiredJointTorque(np.zeros(12))""" - - device.SetDesiredJointPDgains(replay_P[i_replay, :],replay_D[i_replay, :]) - device.SetDesiredJointPosition(replay_q[i_replay, :]) - device.SetDesiredJointVelocity(replay_v[i_replay, :]) - device.SetDesiredJointTorque(replay_tau[i_replay, :]) - + device.joints.set_position_gains(replay_P[k_log_whole, :]) + device.joints.set_velocity_gains(replay_D[k_log_whole, :]) + device.joints.set_desired_positions(replay_q[k_log_whole, :]) + device.joints.set_desired_velocities(replay_v[k_log_whole, :]) + device.joints.set_torques(params.Kff_main * replay_tau[k_log_whole, :]) # Call logger if params.LOGGING or params.PLOTTING: @@ -244,41 +154,18 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non loggerControl.sample(controller.joystick, controller.estimator, controller, controller.gait, controller.statePlanner, controller.footstepPlanner, controller.footTrajectoryGenerator, - controller.myController) + controller.wbcWrapper, dT_whole) # Send command to the robot for i in range(1): - device.SendCommand(WaitEndOfCycle=True) - """if ((device.cpt % 100) == 0): - # device.Print()""" - - """import os - from matplotlib import pyplot as plt - import pybullet as pyb - if (t == 0.0): - cpt_frames = 0 - step = 10 - if (cpt_frames % step) == 0: - if (cpt_frames % 1000): - print(cpt_frames) - img = pyb.getCameraImage(width=1920, height=1080, renderer=pyb.ER_BULLET_HARDWARE_OPENGL) - if cpt_frames == 0: - newpath = r'/tmp/recording' - if not os.path.exists(newpath): - os.makedirs(newpath) - if (int(cpt_frames/step) < 10): - plt.imsave('/tmp/recording/frame_000'+str(int(cpt_frames/step))+'.png', img[2]) - elif int(cpt_frames/step) < 100: - plt.imsave('/tmp/recording/frame_00'+str(int(cpt_frames/step))+'.png', img[2]) - elif int(cpt_frames/step) < 1000: - plt.imsave('/tmp/recording/frame_0'+str(int(cpt_frames/step))+'.png', img[2]) - else: - plt.imsave('/tmp/recording/frame_'+str(int(cpt_frames/step))+'.png', img[2]) - - cpt_frames += 1""" + device.send_command_and_wait_end_of_cycle(params.dt_wbc) t += params.dt_wbc # Increment loop time - i_replay += 1 + + dT_whole = T_whole + T_whole = time.time() + dT_whole = T_whole - dT_whole + k_log_whole += 1 # **************************************************************** @@ -287,65 +174,39 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non else: finished = False - # Stop clone interface running in parallel process - if not params.SIMULATION and name_interface_clone is not None: - cloneResult.value = False - - # Stop MPC running in a parallel process - if controller.enable_multiprocessing: - print("Stopping parallel process") - controller.mpc_wrapper.stop_parallel_loop() - # controller.view.stop() # Stop viewer - # DAMPING TO GET ON THE GROUND PROGRESSIVELY ********************* t = 0.0 t_max = 2.5 - while ((not device.hardware.IsTimeout()) and (t < t_max)): + while ((not device.is_timeout) and (t < t_max)): - device.UpdateMeasurment() # Retrieve data from IMU and Motion capture + device.parse_sensor_data() # Retrieve data from IMU and Motion capture # Set desired quantities for the actuators - device.SetDesiredJointPDgains(np.zeros(12), 0.1 * np.ones(12)) - device.SetDesiredJointPosition(np.zeros(12)) - device.SetDesiredJointVelocity(np.zeros(12)) - device.SetDesiredJointTorque(np.zeros(12)) + device.joints.set_position_gains(np.zeros(nb_motors)) + device.joints.set_velocity_gains(0.1 * np.ones(nb_motors)) + device.joints.set_desired_positions(np.zeros(nb_motors)) + device.joints.set_desired_velocities(np.zeros(nb_motors)) + device.joints.set_torques(np.zeros(nb_motors)) # Send command to the robot - device.SendCommand(WaitEndOfCycle=True) - if ((device.cpt % 1000) == 0): - device.Print() + device.send_command_and_wait_end_of_cycle(params.dt_wbc) + if (t % 1) < 5e-5: + print('IMU attitude:', device.imu.attitude_euler) + print('joint pos :', device.joints.positions) + print('joint vel :', device.joints.velocities) + device.robot_interface.PrintStats() t += params.dt_wbc # FINAL SHUTDOWN ************************************************* # Whatever happened we send 0 torques to the motors. - device.SetDesiredJointTorque([0]*nb_motors) - device.SendCommand(WaitEndOfCycle=True) + device.joints.set_torques(np.zeros(nb_motors)) + device.send_command_and_wait_end_of_cycle(params.dt_wbc) - if device.hardware.IsTimeout(): + if device.is_timeout: print("Masterboard timeout detected.") print("Either the masterboard has been shut down or there has been a connection issue with the cable/wifi.") - device.hardware.Stop() # Shut down the interface between the computer and the master board - - # Plot estimated computation time for each step for the control architecture - from matplotlib import pyplot as plt - """plt.figure() - plt.plot(controller.t_list_filter[1:], 'r+') - plt.plot(controller.t_list_planner[1:], 'g+') - plt.plot(controller.t_list_mpc[1:], 'b+') - plt.plot(controller.t_list_wbc[1:], '+', color="violet") - plt.plot(controller.t_list_loop[1:], 'k+') - plt.plot(controller.t_list_InvKin[1:], 'o', color="darkgreen") - plt.plot(controller.t_list_QPWBC[1:], 'o', color="royalblue") - plt.legend(["Estimator", "Planner", "MPC", "WBC", "Whole loop", "InvKin", "QP WBC"]) - plt.title("Loop time [s]") - plt.show(block=True)""" - """plt.figure() - for i in range(3): - plt.subplot(3, 1, i+1) - plt.plot(controller.o_log_foot[:, i, 0]) - plt.show(block=True)""" # Plot recorded data if params.PLOTTING: @@ -356,11 +217,11 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non loggerControl.saveAll(loggerSensors) print("Log saved") - if params.SIMULATION and enable_pyb_GUI: + if params.SIMULATION and params.enable_pyb_GUI: # Disconnect the PyBullet server (also close the GUI) device.Stop() - if controller.myController.error: + if controller.error: if (controller.error_flag == 1): print("-- POSITION LIMIT ERROR --") elif (controller.error_flag == 2): @@ -373,22 +234,20 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non return finished, des_vel_analysis + def main(): """Main function """ parser = argparse.ArgumentParser(description='Playback trajectory to show the extent of solo12 workspace.') - parser.add_argument('-i', - '--interface', - required=True, - help='Name of the interface (use ifconfig in a terminal), for instance "enp1s0"') parser.add_argument('-c', '--clone', required=False, help='Name of the clone interface that will reproduce the movement of the first one \ (use ifconfig in a terminal), for instance "enp1s0"') - f, v = control_loop(parser.parse_args().interface, parser.parse_args().clone) + os.nice(-20) # Â Set the process to highest priority (from -20 highest to +20 lowest) + f, v = control_loop(parser.parse_args().clone) # , np.array([1.5, 0.0, 0.0, 0.0, 0.0, 0.0])) print(f, v) quit() -- GitLab