Skip to content
Snippets Groups Projects
Commit 3c29e53a authored by odri's avatar odri
Browse files

Actualization of replay script

parent 7cc68d9a
No related branches found
No related tags found
No related merge requests found
# coding: utf8
import os
import sys
sys.path.insert(0, './solopython')
import threading
from Controller import Controller
import numpy as np
......@@ -12,26 +8,20 @@ import argparse
from LoggerSensors import LoggerSensors
from LoggerControl import LoggerControl
import libquadruped_reactive_walking as lqrw
import time
params = lqrw.Params() # Object that holds all controller parameters
if params.SIMULATION:
from PyBulletSimulator import PyBulletSimulator
else:
# from pynput import keyboard
from solopython.solo12 import Solo12
import libodri_control_interface_pywrap as oci
from solopython.utils.qualisysClient import QualisysClient
key_pressed = False
def get_input():
global key_pressed
keystrk = input('Put the robot on the floor and press Enter \n')
keystrk = input()
# thread doesn't continue until key is pressed
key_pressed = True
# and so it remains alive
def put_on_the_floor(device, q_init):
"""Make the robot go to the default initial position and wait for the user
......@@ -41,69 +31,40 @@ def put_on_the_floor(device, q_init):
device (robot wrapper): a wrapper to communicate with the robot
q_init (array): the default position of the robot
"""
global key_pressed
key_pressed = False
Kp_pos = 3.
Kd_pos = 0.01
imax = 3.0
pos = np.zeros(device.nb_motors)
for motor in range(device.nb_motors):
pos[motor] = q_init[device.motorToUrdf[motor]] * device.gearRatioSigned[motor]
i = threading.Thread(target=get_input)
i.start()
while not key_pressed:
device.UpdateMeasurment()
for motor in range(device.nb_motors):
ref = Kp_pos*(pos[motor] - device.hardware.GetMotor(motor).GetPosition() -
Kd_pos*device.hardware.GetMotor(motor).GetVelocity())
ref = min(imax, max(-imax, ref))
device.hardware.GetMotor(motor).SetCurrentReference(ref)
device.SendCommand(WaitEndOfCycle=True)
print("Start the motion.")
print("PUT ON THE FLOOR.")
def clone_movements(name_interface_clone, q_init, cloneP, cloneD, cloneQdes, cloneDQdes, cloneRunning, cloneResult):
Kp_pos = 6.
Kd_pos = 0.3
print("-- Launching clone interface --")
device.joints.set_position_gains(Kp_pos * np.ones(12))
device.joints.set_velocity_gains(Kd_pos * np.ones(12))
device.joints.set_desired_positions(q_init)
device.joints.set_desired_velocities(np.zeros(12))
device.joints.set_torques(np.zeros(12))
print(name_interface_clone, params.dt_wbc)
clone = Solo12(name_interface_clone, dt=params.dt_wbc)
clone.Init(calibrateEncoders=True, q_init=q_init)
while cloneRunning.value and not clone.hardware.IsTimeout():
# print(cloneP[:], cloneD[:], cloneQdes[:], cloneDQdes[:], cloneRunning.value, cloneResult.value)
if cloneResult.value:
clone.SetDesiredJointPDgains(cloneP[:], cloneD[:])
clone.SetDesiredJointPosition(cloneQdes[:])
clone.SetDesiredJointVelocity(cloneDQdes[:])
clone.SetDesiredJointTorque([0.0] * 12)
clone.SendCommand(WaitEndOfCycle=True)
cloneResult.value = False
return 0
i = threading.Thread(target=get_input)
i.start()
print("Put the robot on the floor and press Enter")
while i.is_alive():
device.parse_sensor_data()
device.send_command_and_wait_end_of_cycle(params.dt_wbc)
print("Start the motion.")
def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=None):
def control_loop(name_interface_clone=None, des_vel_analysis=None):
"""Main function that calibrates the robot, get it into a default waiting position then launch
the main control loop once the user has pressed the Enter key
Args:
name_interface (string): name of the interface that is used to communicate with the robot
name_interface_clone (string): name of the interface that will mimic the movements of the first
"""
"""name_replay = "/home/odri/git/paleziart/trajectories_replay/solo_sin_wiggle_more"
replay_q = np.loadtxt(name_replay + "_q_sta.dat", delimiter=" ")[:, 1:]
replay_v = np.loadtxt(name_replay + "_v_sta.dat", delimiter=" ")[:, 1:]
replay_tau = np.loadtxt(name_replay + "_tau_sta.dat", delimiter=" ")[:, 1:]
params.N_SIMULATION = replay_q.shape[0]"""
# Check .yaml file for parameters of the controller
replay = np.load("/home/odri/git/paleziart/Recordings_2021_06_17/adata_2021_06_17_10_35.npz")
# Read replay data
replay = np.load("path_of_replay.npz")
replay_P = replay["P"]
replay_D = replay["D"]
replay_q = replay["q"]
......@@ -111,45 +72,18 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
replay_tau = replay["tau_ff"]
params.N_SIMULATION = replay_q.shape[0]
"""from matplotlib import pyplot as plt
plt.figure()
plt.plot(replay_q[:, 1])
plt.plot(replay_q[:, 1])"""
"""from IPython import embed
embed()"""
# Check .yaml file for parameters of the controller
# Enable or disable PyBullet GUI
enable_pyb_GUI = params.enable_pyb_GUI
if not params.SIMULATION:
enable_pyb_GUI = False
params.enable_pyb_GUI = False
# Time variable to keep track of time
t = 0.0
# Default position after calibration
q_init = np.array([0.0, 0.7, -1.4, -0.0, 0.7, -1.4, 0.0, -0.7, +1.4, -0.0, -0.7, +1.4])
if params.SIMULATION and (des_vel_analysis is not None):
print("Analysis: %1.1f %1.1f %1.1f" % (des_vel_analysis[0], des_vel_analysis[1], des_vel_analysis[5]))
acceleration_rate = 0.1 # m/s^2
steady_state_duration = 3 # s
N_analysis = int(np.max(np.abs(des_vel_analysis)) / acceleration_rate / params.dt_wbc) + 1
N_steady = int(steady_state_duration / params.dt_wbc)
params.N_SIMULATION = N_analysis + N_steady
q_init = np.array(params.q_init.tolist())
# Run a scenario and retrieve data thanks to the logger
controller = Controller(q_init, params.envID, params.velID, params.dt_wbc, params.dt_mpc,
int(params.dt_mpc / params.dt_wbc), t, params.T_gait,
params.T_gait * params.N_period, params.N_SIMULATION, params.type_MPC, params.use_flat_plane,
params.predefined_vel, enable_pyb_GUI, params.kf_enabled, params.N_gait,
params.SIMULATION)
if params.SIMULATION and (des_vel_analysis is not None):
controller.joystick.update_for_analysis(des_vel_analysis, N_analysis, N_steady)
controller = Controller(params, q_init, t)
####
......@@ -157,26 +91,12 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
device = PyBulletSimulator()
qc = None
else:
device = Solo12(name_interface, dt=params.dt_wbc)
device = oci.robot_from_yaml_file(params.config_file)
qc = QualisysClient(ip="140.93.16.160", body_id=0)
if name_interface_clone is not None:
print("PASS")
from multiprocessing import Process, Array, Value
cloneP = Array('d', [0] * 12)
cloneD = Array('d', [0] * 12)
cloneQdes = Array('d', [0] * 12)
cloneDQdes = Array('d', [0] * 12)
cloneRunning = Value('b', True)
cloneResult = Value('b', True)
clone = Process(target=clone_movements, args=(name_interface_clone, q_init, cloneP,
cloneD, cloneQdes, cloneDQdes, cloneRunning, cloneResult))
clone.start()
print(cloneResult.value)
if params.LOGGING or params.PLOTTING:
loggerSensors = LoggerSensors(device, qualisys=qc, logSize=params.N_SIMULATION-3)
loggerControl = LoggerControl(params.dt_wbc, params.N_gait, joystick=controller.joystick,
loggerControl = LoggerControl(params.dt_wbc, params.gait.shape[0], params.type_MPC, joystick=controller.joystick,
estimator=controller.estimator, loop=controller,
gait=controller.gait, statePlanner=controller.statePlanner,
footstepPlanner=controller.footstepPlanner,
......@@ -184,14 +104,18 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
logSize=params.N_SIMULATION-3)
# Number of motors
nb_motors = device.nb_motors
nb_motors = 12
# Initiate communication with the device and calibrate encoders
if params.SIMULATION:
device.Init(calibrateEncoders=True, q_init=q_init, envID=params.envID,
use_flat_plane=params.use_flat_plane, enable_pyb_GUI=enable_pyb_GUI, dt=params.dt_wbc)
use_flat_plane=params.use_flat_plane, enable_pyb_GUI=params.enable_pyb_GUI, dt=params.dt_wbc)
else:
device.Init(calibrateEncoders=True, q_init=q_init)
# Initialize the communication and the session.
device.initialize(q_init[:])
device.joints.set_zero_commands()
device.parse_sensor_data()
# Wait for Enter input before starting the control loop
put_on_the_floor(device, q_init)
......@@ -200,43 +124,29 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
t = 0.0
t_max = (params.N_SIMULATION-2) * params.dt_wbc
i_replay = 0
P = 7 * np.ones(12)
D = 0.5 * np.ones(12)
while ((not device.hardware.IsTimeout()) and (t < t_max) and (not controller.myController.error)):
k_log_whole = 0
T_whole = time.time()
dT_whole = 0.0
while ((not device.is_timeout) and (t < t_max) and (not controller.error)):
# Update sensor data (IMU, encoders, Motion capture)
device.UpdateMeasurment()
# Desired torques
# controller.compute(device)
device.parse_sensor_data()
# Check that the initial position of actuators is not too far from the
# desired position of actuators to avoid breaking the robot
if (t <= 10 * params.dt_wbc):
if np.max(np.abs(controller.result.q_des - device.q_mes)) > 0.15:
print("DIFFERENCE: ", controller.result.q_des - device.q_mes)
if np.max(np.abs(controller.result.q_des - device.joints.positions)) > 0.15:
print("DIFFERENCE: ", controller.result.q_des - device.joints.positions)
print("q_des: ", controller.result.q_des)
print("q_mes: ", device.q_mes)
print("q_mes: ", device.joints.positions)
break
# Set desired quantities for the actuators
"""device.SetDesiredJointPDgains(controller.result.P, controller.result.D)
device.SetDesiredJointPosition(controller.result.q_des)
device.SetDesiredJointVelocity(controller.result.v_des)
device.SetDesiredJointTorque(controller.result.tau_ff.ravel())"""
"""device.SetDesiredJointPDgains(np.zeros(12), np.zeros(12))
device.SetDesiredJointPosition(np.zeros(12))
device.SetDesiredJointVelocity(np.zeros(12))
device.SetDesiredJointTorque(np.zeros(12))"""
device.SetDesiredJointPDgains(replay_P[i_replay, :],replay_D[i_replay, :])
device.SetDesiredJointPosition(replay_q[i_replay, :])
device.SetDesiredJointVelocity(replay_v[i_replay, :])
device.SetDesiredJointTorque(replay_tau[i_replay, :])
device.joints.set_position_gains(replay_P[k_log_whole, :])
device.joints.set_velocity_gains(replay_D[k_log_whole, :])
device.joints.set_desired_positions(replay_q[k_log_whole, :])
device.joints.set_desired_velocities(replay_v[k_log_whole, :])
device.joints.set_torques(params.Kff_main * replay_tau[k_log_whole, :])
# Call logger
if params.LOGGING or params.PLOTTING:
......@@ -244,41 +154,18 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
loggerControl.sample(controller.joystick, controller.estimator,
controller, controller.gait, controller.statePlanner,
controller.footstepPlanner, controller.footTrajectoryGenerator,
controller.myController)
controller.wbcWrapper, dT_whole)
# Send command to the robot
for i in range(1):
device.SendCommand(WaitEndOfCycle=True)
"""if ((device.cpt % 100) == 0):
# device.Print()"""
"""import os
from matplotlib import pyplot as plt
import pybullet as pyb
if (t == 0.0):
cpt_frames = 0
step = 10
if (cpt_frames % step) == 0:
if (cpt_frames % 1000):
print(cpt_frames)
img = pyb.getCameraImage(width=1920, height=1080, renderer=pyb.ER_BULLET_HARDWARE_OPENGL)
if cpt_frames == 0:
newpath = r'/tmp/recording'
if not os.path.exists(newpath):
os.makedirs(newpath)
if (int(cpt_frames/step) < 10):
plt.imsave('/tmp/recording/frame_000'+str(int(cpt_frames/step))+'.png', img[2])
elif int(cpt_frames/step) < 100:
plt.imsave('/tmp/recording/frame_00'+str(int(cpt_frames/step))+'.png', img[2])
elif int(cpt_frames/step) < 1000:
plt.imsave('/tmp/recording/frame_0'+str(int(cpt_frames/step))+'.png', img[2])
else:
plt.imsave('/tmp/recording/frame_'+str(int(cpt_frames/step))+'.png', img[2])
cpt_frames += 1"""
device.send_command_and_wait_end_of_cycle(params.dt_wbc)
t += params.dt_wbc # Increment loop time
i_replay += 1
dT_whole = T_whole
T_whole = time.time()
dT_whole = T_whole - dT_whole
k_log_whole += 1
# ****************************************************************
......@@ -287,65 +174,39 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
else:
finished = False
# Stop clone interface running in parallel process
if not params.SIMULATION and name_interface_clone is not None:
cloneResult.value = False
# Stop MPC running in a parallel process
if controller.enable_multiprocessing:
print("Stopping parallel process")
controller.mpc_wrapper.stop_parallel_loop()
# controller.view.stop() # Stop viewer
# DAMPING TO GET ON THE GROUND PROGRESSIVELY *********************
t = 0.0
t_max = 2.5
while ((not device.hardware.IsTimeout()) and (t < t_max)):
while ((not device.is_timeout) and (t < t_max)):
device.UpdateMeasurment() # Retrieve data from IMU and Motion capture
device.parse_sensor_data() # Retrieve data from IMU and Motion capture
# Set desired quantities for the actuators
device.SetDesiredJointPDgains(np.zeros(12), 0.1 * np.ones(12))
device.SetDesiredJointPosition(np.zeros(12))
device.SetDesiredJointVelocity(np.zeros(12))
device.SetDesiredJointTorque(np.zeros(12))
device.joints.set_position_gains(np.zeros(nb_motors))
device.joints.set_velocity_gains(0.1 * np.ones(nb_motors))
device.joints.set_desired_positions(np.zeros(nb_motors))
device.joints.set_desired_velocities(np.zeros(nb_motors))
device.joints.set_torques(np.zeros(nb_motors))
# Send command to the robot
device.SendCommand(WaitEndOfCycle=True)
if ((device.cpt % 1000) == 0):
device.Print()
device.send_command_and_wait_end_of_cycle(params.dt_wbc)
if (t % 1) < 5e-5:
print('IMU attitude:', device.imu.attitude_euler)
print('joint pos :', device.joints.positions)
print('joint vel :', device.joints.velocities)
device.robot_interface.PrintStats()
t += params.dt_wbc
# FINAL SHUTDOWN *************************************************
# Whatever happened we send 0 torques to the motors.
device.SetDesiredJointTorque([0]*nb_motors)
device.SendCommand(WaitEndOfCycle=True)
device.joints.set_torques(np.zeros(nb_motors))
device.send_command_and_wait_end_of_cycle(params.dt_wbc)
if device.hardware.IsTimeout():
if device.is_timeout:
print("Masterboard timeout detected.")
print("Either the masterboard has been shut down or there has been a connection issue with the cable/wifi.")
device.hardware.Stop() # Shut down the interface between the computer and the master board
# Plot estimated computation time for each step for the control architecture
from matplotlib import pyplot as plt
"""plt.figure()
plt.plot(controller.t_list_filter[1:], 'r+')
plt.plot(controller.t_list_planner[1:], 'g+')
plt.plot(controller.t_list_mpc[1:], 'b+')
plt.plot(controller.t_list_wbc[1:], '+', color="violet")
plt.plot(controller.t_list_loop[1:], 'k+')
plt.plot(controller.t_list_InvKin[1:], 'o', color="darkgreen")
plt.plot(controller.t_list_QPWBC[1:], 'o', color="royalblue")
plt.legend(["Estimator", "Planner", "MPC", "WBC", "Whole loop", "InvKin", "QP WBC"])
plt.title("Loop time [s]")
plt.show(block=True)"""
"""plt.figure()
for i in range(3):
plt.subplot(3, 1, i+1)
plt.plot(controller.o_log_foot[:, i, 0])
plt.show(block=True)"""
# Plot recorded data
if params.PLOTTING:
......@@ -356,11 +217,11 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
loggerControl.saveAll(loggerSensors)
print("Log saved")
if params.SIMULATION and enable_pyb_GUI:
if params.SIMULATION and params.enable_pyb_GUI:
# Disconnect the PyBullet server (also close the GUI)
device.Stop()
if controller.myController.error:
if controller.error:
if (controller.error_flag == 1):
print("-- POSITION LIMIT ERROR --")
elif (controller.error_flag == 2):
......@@ -373,22 +234,20 @@ def control_loop(name_interface, name_interface_clone=None, des_vel_analysis=Non
return finished, des_vel_analysis
def main():
"""Main function
"""
parser = argparse.ArgumentParser(description='Playback trajectory to show the extent of solo12 workspace.')
parser.add_argument('-i',
'--interface',
required=True,
help='Name of the interface (use ifconfig in a terminal), for instance "enp1s0"')
parser.add_argument('-c',
'--clone',
required=False,
help='Name of the clone interface that will reproduce the movement of the first one \
(use ifconfig in a terminal), for instance "enp1s0"')
f, v = control_loop(parser.parse_args().interface, parser.parse_args().clone)
os.nice(-20) # Set the process to highest priority (from -20 highest to +20 lowest)
f, v = control_loop(parser.parse_args().clone) # , np.array([1.5, 0.0, 0.0, 0.0, 0.0, 0.0]))
print(f, v)
quit()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment