From bcf492f0813e3a3ae93a58cb3b4d8f99c065819b Mon Sep 17 00:00:00 2001 From: thomascbrs <thomas.corberes@laposte.net> Date: Fri, 17 Sep 2021 09:35:38 +0100 Subject: [PATCH] Pos, vel and jerk feet into the MPC planner for costs --- scripts/Controller.py | 11 +- scripts/LoggerControl.py | 18 +- scripts/MPC_Wrapper.py | 142 ++++++++---- .../crocoddyl_class/MPC_crocoddyl_planner.py | 219 +++++++++++++----- 4 files changed, 280 insertions(+), 110 deletions(-) diff --git a/scripts/Controller.py b/scripts/Controller.py index 57cc0dc6..9c857110 100644 --- a/scripts/Controller.py +++ b/scripts/Controller.py @@ -279,9 +279,14 @@ class Controller: if self.type_MPC == 3: # Compute the target foostep in local frame, to stop the optimisation around it when t_lock overpass l_targetFootstep = oRh.transpose() @ (self.o_targetFootstep - oTh) - self.mpc_wrapper.solve(self.k, xref, fsteps, cgait, l_targetFootstep, oRh, oTh) - else: - self.mpc_wrapper.solve(self.k, xref, fsteps, cgait, np.zeros((3, 4))) + self.mpc_wrapper.solve(self.k, xref, fsteps, cgait, l_targetFootstep, oRh, oTh, + self.footTrajectoryGenerator.getFootPosition(), + self.footTrajectoryGenerator.getFootVelocity(), + self.footTrajectoryGenerator.getFootAcceleration(), + self.footTrajectoryGenerator.getFootJerk(), + self.footTrajectoryGenerator.getTswing() - self.footTrajectoryGenerator.getT0s()) + else : + self.mpc_wrapper.solve(self.k, xref, fsteps, cgait, np.zeros((3,4))) except ValueError: print("MPC Problem") diff --git a/scripts/LoggerControl.py b/scripts/LoggerControl.py index 9cba02ea..f7cfa85c 100644 --- a/scripts/LoggerControl.py +++ b/scripts/LoggerControl.py @@ -1,4 +1,5 @@ '''This class will log 1d array in Nd matrix from device and qualisys object''' +import pickle import numpy as np from datetime import datetime as datetime from time import time @@ -73,9 +74,10 @@ class LoggerControl(): self.planner_h_ref = np.zeros([logSize]) # reference height of the planner # Foot Trajectory Generator - self.planner_goals = np.zeros([logSize, 3, 4]) # 3D target feet positions + self.planner_goals = np.zeros([logSize, 3, 4]) # 3D target feet positions self.planner_vgoals = np.zeros([logSize, 3, 4]) # 3D target feet velocities self.planner_agoals = np.zeros([logSize, 3, 4]) # 3D target feet accelerations + self.planner_jgoals = np.zeros([logSize, 3, 4]) # 3D target feet accelerations # Model Predictive Control # output vector of the MPC (next state + reference contact force) @@ -160,6 +162,7 @@ class LoggerControl(): self.planner_goals[self.i] = footTrajectoryGenerator.getFootPosition() self.planner_vgoals[self.i] = footTrajectoryGenerator.getFootVelocity() self.planner_agoals[self.i] = footTrajectoryGenerator.getFootAcceleration() + self.planner_jgoals[self.i] = footTrajectoryGenerator.getFootJerk() self.planner_is_static[self.i] = gait.getIsStatic() self.planner_h_ref[self.i] = loop.h_ref @@ -741,6 +744,7 @@ class LoggerControl(): planner_goals=self.planner_goals, planner_vgoals=self.planner_vgoals, planner_agoals=self.planner_agoals, + planner_jgoals=self.planner_jgoals, planner_is_static=self.planner_is_static, planner_h_ref=self.planner_h_ref, @@ -788,7 +792,7 @@ class LoggerControl(): import glob fileName = np.sort(glob.glob('data_2021_*.npz'))[-1] # Most recent file - data = np.load(fileName) + data = np.load(fileName, allow_pickle = True) # Load LoggerControl arrays self.joy_v_ref = data["joy_v_ref"] @@ -836,6 +840,7 @@ class LoggerControl(): self.planner_goals = data["planner_goals"] self.planner_vgoals = data["planner_vgoals"] self.planner_agoals = data["planner_agoals"] + self.planner_jgoals = data["planner_jgoals"] self.planner_is_static = data["planner_is_static"] self.planner_h_ref = data["planner_h_ref"] @@ -1068,13 +1073,20 @@ class LoggerControl(): if __name__ == "__main__": import LoggerSensors + import sys + import os + from sys import argv + sys.path.insert(0, os.getcwd()) # adds current directory to python path + + # Data file name to load + file_name = "/home/thomas_cbrs/Desktop/edin/quadruped-reactive-walking/scripts/crocoddyl_eval/logs/explore_weight_acc/data_2021_09_16_15_10_3.npz" # Create loggers loggerSensors = LoggerSensors.LoggerSensors(logSize=20000-3) logger = LoggerControl(0.001, 30, logSize=20000-3) # Load data from .npz file - logger.loadAll(loggerSensors) + logger.loadAll(loggerSensors, fileName= file_name) # Call all ploting functions logger.plotAll(loggerSensors) diff --git a/scripts/MPC_Wrapper.py b/scripts/MPC_Wrapper.py index 3061c7c7..fd82392a 100644 --- a/scripts/MPC_Wrapper.py +++ b/scripts/MPC_Wrapper.py @@ -1,5 +1,7 @@ # coding: utf8 +from operator import pos +from typing import FrozenSet import numpy as np import libquadruped_reactive_walking as MPC from multiprocessing import Process, Value, Array @@ -8,6 +10,9 @@ import crocoddyl_class.MPC_crocoddyl_planner as MPC_crocoddyl_planner import pinocchio as pin from time import time from enum import Enum +import libquadruped_reactive_walking as lqrw +import ctypes +from ctypes import Structure, c_double class MPC_type(Enum): OSQP = 0 @@ -16,6 +21,32 @@ class MPC_type(Enum): CROCODDYL_PLANNER = 3 CROCODDYL_PLANNER_TIME = 4 +class DataInCtype(Structure): + ''' Ctype data structure for the shared memory between processes. + ''' + params = lqrw.Params() # Object that holds all controller parameters + mpc_type = MPC_type(params.type_MPC) # MPC type + n_steps = np.int(params.T_mpc/params.dt_mpc) # Colomn size for xref (12 x n_steps) + N_gait = int(5 + params.T_mpc / params.dt_mpc) # Row size for fsteps (N_gait x 12), from utils_mpc.py + + if mpc_type == MPC_type.CROCODDYL_PLANNER: + _fields_ = [('k', ctypes.c_int64 ), + ('xref', ctypes.c_double * 12 * (n_steps+1) ), + ('fsteps', ctypes.c_double * 12 * N_gait ), + ('l_fsteps_target', ctypes.c_double * 3 * 4 ), + ('oRh', ctypes.c_double * 3 * 3 ), + ('oTh', ctypes.c_double * 3 * 1 ), + ('position', ctypes.c_double * 3 * 4 ), + ('velocity', ctypes.c_double * 3 * 4 ), + ('acceleration', ctypes.c_double * 3 * 4 ), + ('jerk', ctypes.c_double * 3 * 4 ), + ('dt_flying', ctypes.c_double * 4)] + else: + _fields_ = [('k', ctypes.c_int64 ), + ('xref', ctypes.c_double * 12 * (n_steps+1) ), + ('fsteps', ctypes.c_double * 12 * N_gait )] + + class Dummy: """Dummy class to store variables""" @@ -60,18 +91,17 @@ class MPC_Wrapper: self.N_gait = params.N_gait self.gait_past = np.zeros(4) self.gait_next = np.zeros(4) - self.mass = params.mass + self.mass = params.mass self.mpc_type = MPC_type(params.type_MPC) self.multiprocessing = params.enable_multiprocessing if self.multiprocessing: # Setup variables in the shared memory self.newData = Value('b', False) - self.newResult = Value('b', False) - if self.mpc_type == MPC_type.CROCODDYL_PLANNER: # Need more space to store optimized footsteps and l_fsteps to stop the optimization around it (+ oTh and oRh) - self.dataIn = Array('d', [0.0] * (1 + (np.int(self.n_steps)+1) * 12 + 12*self.N_gait + 12 + 9 + 3) ) + self.newResult = Value('b', False) + self.dataIn = Value(DataInCtype) + if self.mpc_type == MPC_type.CROCODDYL_PLANNER: self.dataOut = Array('d', [0] * 32 * (np.int(self.n_steps))) - else: - self.dataIn = Array('d', [0.0] * (1 + (np.int(self.n_steps)+1) * 12 + 12*self.N_gait)) + else: self.dataOut = Array('d', [0] * 24 * (np.int(self.n_steps))) self.fsteps_future = np.zeros((self.N_gait, 12)) self.running = Value('b', True) @@ -99,7 +129,8 @@ class MPC_Wrapper: self.last_available_result = np.zeros((24, (np.int(self.n_steps)))) self.last_available_result[:24, 0] = np.hstack((x_init, np.array([0.0, 0.0, 8.0] * 4))) - def solve(self, k, xref, fsteps, gait, l_targetFootstep, oRh = np.eye(3), oTh = np.zeros((3,1)) ): + def solve(self, k, xref, fsteps, gait, l_fsteps_target, oRh = np.eye(3), oTh = np.zeros((3,1)), + position = np.zeros((3,4)), velocity = np.zeros((3,4)) , acceleration = np.zeros((3,4)), jerk = np.zeros((3,4)) , dt_flying = np.zeros(4)): """Call either the asynchronous MPC or the synchronous MPC depending on the value of multiprocessing during the creation of the wrapper @@ -108,15 +139,15 @@ class MPC_Wrapper: xref (12xN): Desired state vector for the whole prediction horizon fsteps (12xN array): the [x, y, z]^T desired position of each foot for each time step of the horizon gait (4xN array): Contact state of feet (gait matrix) - l_targetFootstep (3x4 array) : 4*[x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it + l_fsteps_target (3x4 array) : 4*[x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it """ self.t_mpc_solving_start = time() if self.multiprocessing: # Run in parallel process - self.run_MPC_asynchronous(k, xref, fsteps, l_targetFootstep, oRh, oTh) + self.run_MPC_asynchronous(k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying) else: # Run in the same process than main loop - self.run_MPC_synchronous(k, xref, fsteps, l_targetFootstep, oRh, oTh) + self.run_MPC_synchronous(k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying) if not np.allclose(gait[0, :], self.gait_past): # If gait status has changed if np.allclose(gait[0, :], self.gait_next): # If we're still doing what was planned the last time MPC was solved @@ -157,14 +188,14 @@ class MPC_Wrapper: self.not_first_iter = True return self.last_available_result - def run_MPC_synchronous(self, k, xref, fsteps, l_targetFootstep, oRh, oTh): + def run_MPC_synchronous(self, k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying): """Run the MPC (synchronous version) to get the desired contact forces for the feet currently in stance phase Args: k (int): Number of inv dynamics iterations since the start of the simulation xref (12xN): Desired state vector for the whole prediction horizon fsteps (12xN array): the [x, y, z]^T desired position of each foot for each time step of the horizon - l_targetFootstep (3x4 array) : [x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it + l_fsteps_target (3x4 array) : [x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it """ # Run the MPC to get the reference forces and the next predicted state @@ -175,7 +206,7 @@ class MPC_Wrapper: self.mpc.run(np.int(k), xref.copy(), fsteps.copy()) elif self.mpc_type == MPC_type.CROCODDYL_PLANNER: # Add goal position to stop the optimisation # Crocoddyl MPC - self.mpc.solve(k, xref.copy(), fsteps.copy(), l_targetFootstep) + self.mpc.solve(k, xref.copy(), fsteps.copy(), l_fsteps_target, position, velocity, acceleration, jerk, oRh, oTh, dt_flying) else: # Crocoddyl MPC self.mpc.solve(k, xref.copy(), fsteps.copy()) @@ -186,7 +217,7 @@ class MPC_Wrapper: else: self.f_applied = self.mpc.get_latest_result() - def run_MPC_asynchronous(self, k, xref, fsteps, l_targetFootstep, oRh, oTh): + def run_MPC_asynchronous(self, k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying): """Run the MPC (asynchronous version) to get the desired contact forces for the feet currently in stance phase Args: @@ -194,7 +225,7 @@ class MPC_Wrapper: xref (12xN): Desired state vector for the whole prediction horizon fsteps (12xN array): the [x, y, z]^T desired position of each foot for each time step of the horizon params (object): stores parameters - l_targetFootstep (3x4 array) : [x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it + l_fsteps_target (3x4 array) : [x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it """ # If this is the first iteration, creation of the parallel process @@ -204,7 +235,7 @@ class MPC_Wrapper: p.start() # Stacking data to send them to the parallel process - self.compress_dataIn(k, xref, fsteps, l_targetFootstep, oRh, oTh) + self.compress_dataIn(k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying) self.newData.value = True return 0 @@ -231,17 +262,9 @@ class MPC_Wrapper: # Retrieve data thanks to the decompression function and reshape it if self.mpc_type != MPC_type.CROCODDYL_PLANNER: - kf, xref_1dim, fsteps_1dim = self.decompress_dataIn(dataIn) + k, xref, fsteps = self.decompress_dataIn(dataIn) else: - kf, xref_1dim, fsteps_1dim, l_target_1dim, oRh, oTh = self.decompress_dataIn(dataIn) - l_target = np.reshape(l_target_1dim, (3,4)) - oRh = np.reshape(oRh, (3,3)) - oTh = np.reshape(oTh, (3,1)) - - # Reshaping 1-dimensional data - k = int(kf[0]) - xref = np.reshape(xref_1dim, (12, self.n_steps+1)) - fsteps = np.reshape(fsteps_1dim, (self.N_gait, 12)) + k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying = self.decompress_dataIn(dataIn) # Create the MPC object of the parallel process during the first iteration if k == 0: @@ -262,7 +285,9 @@ class MPC_Wrapper: fsteps[np.isnan(fsteps)] = 0.0 loop_mpc.run(np.int(k), xref, fsteps) elif self.mpc_type == MPC_type.CROCODDYL_PLANNER: - loop_mpc.solve(k, xref.copy(), fsteps.copy(), l_target.copy()) + loop_mpc.solve(k, xref.copy(), fsteps.copy(), l_fsteps_target.copy(), + position.copy(), velocity.copy(), acceleration.copy(), jerk.copy(), + oRh.copy(), oTh.copy(), dt_flying.copy()) else: loop_mpc.solve(k, xref.copy(), fsteps.copy()) @@ -280,46 +305,69 @@ class MPC_Wrapper: return 0 - def compress_dataIn(self, k, xref, fsteps, l_targetFootstep, oRh, oTh): - """Compress data in a single C-type array that belongs to the shared memory to send data from the main control + def compress_dataIn(self, k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying): + """Compress data in a C-type structure that belongs to the shared memory to send data from the main control loop to the asynchronous MPC Args: k (int): Number of inv dynamics iterations since the start of the simulation fstep_planner (object): FootstepPlanner object of the control loop - l_targetFootstep (3x4 array) : [x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it + l_fsteps_target (3x4 array) : [x, y, z]^T target position in local frame, to stop the optimisation of the feet location around it """ # Replace NaN values by 0.0 to be stored in C-type arrays fsteps[np.isnan(fsteps)] = 0.0 # Compress data in the shared input array - if self.mpc_type == MPC_type.CROCODDYL_PLANNER: - self.dataIn[:] = np.concatenate([[(k/self.k_mpc)], xref.ravel(), - fsteps.ravel(), l_targetFootstep.ravel(), oRh.ravel(), oTh.ravel()], axis=0) - else: - self.dataIn[:] = np.concatenate([[(k/self.k_mpc)], xref.ravel(), - fsteps.ravel()], axis=0) + with self.dataIn.get_lock(): + if self.mpc_type == MPC_type.CROCODDYL_PLANNER: + self.dataIn.k = k + np.frombuffer(self.dataIn.xref).reshape((12, self.n_steps+1))[:,:] = xref + np.frombuffer(self.dataIn.fsteps).reshape((self.N_gait, 12))[:,:] = fsteps + np.frombuffer(self.dataIn.l_fsteps_target).reshape((3, 4))[:,:] = l_fsteps_target + np.frombuffer(self.dataIn.oRh).reshape((3, 3))[:,:] = oRh + np.frombuffer(self.dataIn.oTh).reshape((3, 1))[:,:] = oTh + np.frombuffer(self.dataIn.position).reshape((3, 4))[:,:] = position + np.frombuffer(self.dataIn.velocity).reshape((3, 4))[:,:] = velocity + np.frombuffer(self.dataIn.acceleration).reshape((3, 4))[:,:] = acceleration + np.frombuffer(self.dataIn.jerk).reshape((3, 4))[:,:] = jerk + np.frombuffer(self.dataIn.dt_flying).reshape(4)[:] = dt_flying - return 0.0 + else: + self.dataIn.k = k + np.frombuffer(self.dataIn.xref).reshape((12, self.n_steps+1))[:,:] = xref + np.frombuffer(self.dataIn.fsteps).reshape((self.N_gait, 12))[:,:] = fsteps def decompress_dataIn(self, dataIn): - """Decompress data from a single C-type array that belongs to the shared memory to retrieve data from the main control + """Decompress data from a C-type structure that belongs to the shared memory to retrieve data from the main control loop in the asynchronous MPC Args: - dataIn (Array): shared array that contains the data the asynchronous MPC will use as inputs + dataIn (Array): shared C-type structure that contains the data the asynchronous MPC will use as inputs """ - # Sizes of the different variables that are stored in the C-type array - if self.mpc_type == MPC_type.CROCODDYL_PLANNER: - sizes = [0, 1, (np.int(self.n_steps)+1) * 12, 12*self.N_gait, 12, 9, 3] - else: - sizes = [0, 1, (np.int(self.n_steps)+1) * 12, 12*self.N_gait] - csizes = np.cumsum(sizes) + with dataIn.get_lock(): + if self.mpc_type == MPC_type.CROCODDYL_PLANNER: + k = self.dataIn.k + xref = np.frombuffer(self.dataIn.xref).reshape((12, self.n_steps+1)) + fsteps = np.frombuffer(self.dataIn.fsteps).reshape((self.N_gait, 12)) + l_fsteps_target = np.frombuffer(self.dataIn.l_fsteps_target).reshape((3,4)) + oRh = np.frombuffer(self.dataIn.oRh).reshape((3,3)) + oTh = np.frombuffer(self.dataIn.oTh).reshape((3,1)) + position = np.frombuffer(self.dataIn.position).reshape((3,4)) + velocity = np.frombuffer(self.dataIn.velocity).reshape((3,4)) + acceleration = np.frombuffer(self.dataIn.acceleration).reshape((3,4)) + jerk = np.frombuffer(self.dataIn.jerk).reshape((3,4)) + dt_flying = np.frombuffer(self.dataIn.dt_flying).reshape(4) + + return k, xref, fsteps, l_fsteps_target, oRh, oTh, position, velocity, acceleration, jerk, dt_flying + + else: + k = self.dataIn.k + xref = np.frombuffer(self.dataIn.xref).reshape((12, self.n_steps+1)) + fsteps = np.frombuffer(self.dataIn.fsteps).reshape((self.N_gait, 12)) - # Return decompressed variables in a list - return [dataIn[csizes[i]:csizes[i+1]] for i in range(len(sizes)-1)] + return k, xref, fsteps def convert_dataOut(self): """Return the result of the asynchronous MPC (desired contact forces) that is stored in the shared memory diff --git a/scripts/crocoddyl_class/MPC_crocoddyl_planner.py b/scripts/crocoddyl_class/MPC_crocoddyl_planner.py index 0b9ac289..dee9bfbb 100644 --- a/scripts/crocoddyl_class/MPC_crocoddyl_planner.py +++ b/scripts/crocoddyl_class/MPC_crocoddyl_planner.py @@ -5,6 +5,10 @@ import numpy as np import quadruped_walkgen as quadruped_walkgen import pinocchio as pin + + + + class MPC_crocoddyl_planner(): """Wrapper class for the MPC problem to call the ddp solver and retrieve the results. @@ -13,29 +17,29 @@ class MPC_crocoddyl_planner(): params (obj): Params object containing the simulation parameters mu (float): Friction coefficient inner (float): Inside or outside approximation of the friction cone (mu * 1/sqrt(2)) - linearModel(bool) : Approximation in the cross product by using desired state """ - def __init__(self, params, mu=1, inner=True, warm_start=True, min_fz=0.0): + def __init__(self, params, mu=1, inner=True): - self.dt = params.dt_mpc # Time step of the solver - self.T_mpc = params.T_mpc # Period of the MPC + self.dt = params.dt_mpc # Time step of the solver + self.dt_wbc = params.dt_wbc # Time step of the WBC + self.T_mpc = params.T_mpc # Period of the MPC + self.T_gait = params.T_gait # Period of the gait + self.N_gait = params.N_gait # Number of gait row self.n_nodes = int(self.T_mpc/self.dt) # Number of nodes self.mass = params.mass # Mass of the robot self.max_iteration = 10 # Max iteration ddp solver - self.gI = np.array(params.I_mat.tolist()).reshape((3, 3)) # Inertia matrix in ody frame + self.gI = np.array(params.I_mat.tolist()).reshape((3, 3)) # Inertia matrix in body frame # Friction coefficient if inner: self.mu = (1/np.sqrt(2))*mu else: - self.mu = mu - - # self.stateWeights = np.sqrt([2.0, 2.0, 20.0, 0.25, 0.25, 10.0, 0.2, 0.2, 0.2, 0.0, 0.0, 0.3]) + self.mu = mu # Weights Vector : States + # self.stateWeights = np.sqrt([2.0, 2.0, 20.0, 0.25, 0.25, 10.0, 0.2, 0.2, 0.2, 0.0, 0.0, 0.3]) self.w_x = 0.3 - # self.w_y = 0.3 self.w_y = 0.3 self.w_z = 20 self.w_roll = 0.9 @@ -50,33 +54,51 @@ class MPC_crocoddyl_planner(): self.stateWeights = np.array([self.w_x, self.w_y, self.w_z, self.w_roll, self.w_pitch, self.w_yaw, self.w_vx, self.w_vy, self.w_vz, self.w_vroll, self.w_vpitch, self.w_vyaw]) - self.forceWeights = 1*np.array(4*[0.007, 0.007, 0.007]) # Weight Vector : Force Norm - self.frictionWeights = 1. # Weight Vector : Friction cone cost - self.heuristicWeights = 0.*np.array(4*[0.03, 0.04]) # Weights on the heuristic term - self.stepWeights = 0.*np.full(8, 0.005) # Weight on the step command (distance between steps) - self.stopWeights = 0.*np.ones(8) # Weights to stop the optimisation at the end of the flying phase - self.shoulderContactWeight = 0.5*np.full(4,1.) # Weight for shoulder-to-contact penalty - self.shoulder_hlim = 0.235 - self.shoulderReferencePosition = False # Use the reference trajectory of the Com (True) or not (False) for shoulder/contact cost + self.forceWeights = 1*np.array(4*[0.007, 0.007, 0.007]) # Weight vector : Force Norm + self.frictionWeights = 1. # Weight vector : Friction cone penalisation + self.heuristicWeights = 0.*np.array(4*[0.03, 0.04]) # Weight vector : Heuristic term + self.stepWeights = 0.*np.full(8, 0.005) # Weight vector : Norm of step command (distance between steps) + self.stopWeights = 0.*np.ones(8) # Weight vector : Stop the optimisation at the end of the flying phase (position penalisation) + self.shoulderContactWeight = 1.5*np.full(4,1.) # Weight vector : Shoulder-to-contact penalisation + self.shoulder_hlim = 0.235 # Distance to activate the shoulder-to-contact penalisation + self.shoulderReferencePosition = False # Use the reference trajectory of the Com (True) or not (False) for shoulder-to-contact penalisation # TODO : create a proper warm-start with the previous optimisation self.warm_start = True - # Minimum normal force(N) and reference force vector bool + # Minimum/Maximal normal force(N) and relative_forces activation self.min_fz = 0. self.max_fz = 25. - self.relative_forces = True + self.relative_forces = True # F_ref = m*g/nb_contact + + # Acceleration penalty + self.is_acc_activated = False + self.acc_limit = np.array([60.,60.]) # Acceleration to activate the penalisation + self.acc_weight = 0.0001 + self.n_sampling = 8 + self.flying_foot_old = 4*[False] + self.dx_new_phase = 0. + self.dy_new_phase = 0. + + # Velocity penalty + self.is_vel_activated = True + self.vel_limit = np.array([4.,4.]) # Velocity to activate the penalisation + self.vel_weight = 1. + + # Jerk penalty + self.is_jerk_activated = True + self.jerk_weight = 1e-7 + self.jerk_alpha = 23 # Define the slope of the cost, not used + self.jerk_beta = 0. # Define the slope of the cost, not used # Offset CoM self.offset_com = -0.03 + self.vert_time = params.vert_time # Gait matrix self.gait = np.zeros((params.N_gait, 4)) self.gait_old = np.zeros(4) - # Position of the feet in local frame - self.fsteps = np.zeros((params.N_gait, 12)) - # List to generate the problem self.action_models = [] self.x_init = [] @@ -105,24 +127,21 @@ class MPC_crocoddyl_planner(): self.index_inital_stance_phase = [] # List of index to reset the stopWeights to 0 after optimisation # Initialize the lists of models - self.initialize_models(params) + self.initialize_models() - def initialize_models(self, params): + def initialize_models(self): ''' Reset the two lists of augmented and step-by-step models, to avoid recreating them at each loop. Not all models here will necessarily be used. - - Args : - - params : object containing the parameters of the simulation ''' self.models_augmented = [] self.models_step = [] - for _ in range(params.N_gait): + for _ in range(self.N_gait): model = quadruped_walkgen.ActionModelQuadrupedAugmented() self.update_model_augmented(model) self.models_augmented.append(model) - for _ in range(4 * int(params.T_mpc / params.T_gait)): + for _ in range(4 * int(self.T_mpc / self.T_gait)): model = quadruped_walkgen.ActionModelQuadrupedStep() self.update_model_step(model) self.models_step.append(model) @@ -131,52 +150,60 @@ class MPC_crocoddyl_planner(): self.terminal_model = quadruped_walkgen.ActionModelQuadrupedAugmented() self.update_model_augmented(self.terminal_model, terminal=True) - def solve(self, k, xref, footsteps, l_stop): - """ Solve the MPC problem + def solve(self, k, xref, footsteps, l_stop, position, velocity, acceleration, jerk, oRh, oTh, dt_flying): + """ Solve the MPC problem. Args: - k : Iteration - xref : the desired state vector - footsteps : current position of the feet (given by planner) - l_stop : current and target position of the feet (given by footstepTragectory generator) + k (int) : Iteration + xref (Array 12xN) : Desired state vector + footsteps (Array 12xN) : current position of the feet (given by planner) + l_stop (Array 3x4) : Current target position of the feet + position (Array 3x4) : Position of the flying feet + velocity (Array 3x4) : Velocity of the flying feet + acceleration (Array 3x4): Acceleration position of the flying feet + jerk (Array 3x4) : Jerk position of the flying feet + oRh (Array 3x3) : Rotation matrix from base to ideal/world frame + oTh (Array 3x1) : Translation vector from base to ideal/world frame + dt_flying (Array 4x) : Remaining timing of flying feet """ self.xref[:,:] = xref self.xref[2,:] += self.offset_com - self.updateProblem(k, self.xref, footsteps, l_stop) + self.updateProblem(k, self.xref, footsteps, l_stop, position, velocity, acceleration, jerk, oRh, oTh, dt_flying) self.ddp.solve(self.x_init, self.u_init, self.max_iteration) - - # np.set_printoptions(linewidth=300, precision=3) - # print(self.gait[:30,:]) - # idx = 0 - # for model in self.action_models: - # if model.__class__.__name__ == "ActionModelQuadrupedStep" : - # print(model.__class__.__name__) - # else: - # print(model.__class__.__name__ + " : " + str(model.shoulderContactWeight) + " - " + str(self.gait[idx,:])) - # idx += 1 - # from IPython import embed - # embed() - - # Reset to 0 the stopWeights for next optimisation + + # Reset to 0 the stopWeights for next optimisation (already 0.) for index_stopped in self.index_stop_optimisation: self.models_augmented[index_stopped].stopWeights = np.zeros(8) for index_stance in self.index_inital_stance_phase: self.models_augmented[index_stance].shoulderContactWeight = self.shoulderContactWeight - def updateProblem(self, k, xref, footsteps, l_stop): + def updateProblem(self, k, xref, footsteps, l_stop, position, velocity, acceleration, jerk, oRh, oTh, dt_flying): """ - Update the dynamic of the model list according to the predicted position of the feet, - and the desired state. + Update the models of the nodes according to parameters of the simulations. Args: + k (int) : Iteration + xref (Array 12xN) : Desired state vector + footsteps (Array 12xN) : current position of the feet (given by planner) + l_stop (Array 3x4) : Current target position of the feet + position (Array 3x4) : Position of the flying feet + velocity (Array 3x4) : Velocity of the flying feet + acceleration (Array 3x4): Acceleration position of the flying feet + jerk (Array 3x4) : Jerk position of the flying feet + oRh (Array 3x3) : Rotation matrix from base to ideal/world frame + oTh (Array 3x1) : Translation vector from base to ideal/world frame + dt_flying (Array 4x) : Remaining timing of flying feet """ + # Update internal matrix self.compute_gait_matrix(footsteps) + # Set up intial feet position : shoulder for flying feet and current foot position for feet on the ground p0 = (1.0 - np.repeat(self.gait[0, :], 2)) * self.shoulders \ + np.repeat(self.gait[0, :], 2) * footsteps[0, [0, 1, 3, 4, 6, 7, 9, 10]] + # Clear lists of models self.x_init.clear() self.u_init.clear() self.action_models.clear() @@ -208,17 +235,76 @@ class MPC_crocoddyl_planner(): self.index_inital_stance_phase.append(index_augmented) self.action_models.append(self.models_augmented[index_augmented]) - index_augmented += 1 + # Warm-start self.x_init.append(np.concatenate([xref[:, 0], p0])) self.u_init.append(np.repeat(self.gait[0, :], 3) * np.array(4*[0., 0., 2.5*9.81/np.sum(self.gait[0, :])])) + # Bool to activate the speed, acc or jerk on only current flying feet + is_first_step = True + while np.any(self.gait[j, :]): - if np.any(self.gait[j, :] - self.gait[j-1, :]): - # Step model + if np.any(self.gait[j, :] - self.gait[j-1, :]): # TODO Adapt this [1,1,1,1] --> [1,0,0,1] True here but not step model to add + + flying_feet = np.where(self.gait[j-1, :] == 0)[0] # Current flying feet + + # Get remaining dt incurrent flying phase + # TODO : dt_ is the same for all flying foot, could be different + dt_ = dt_flying[flying_feet[0]] + + # part_curve explanation for foot trajectory : + # ------- <- 2 (foot stopped, during params.vert_time) + # / + # / <- 1 (foot moving) + # / + # ------ <- 0 (foot stopped, during params.vert_time) + # TODO : get total flying period from gait and not T_gait/2 + part_curve = 0 + dt_ -= self.dt_wbc # dt_ has been computed on the previous loop, dt_wbc late + if dt_ <= 0.001 or dt_ >= self.T_gait/2 - self.vert_time: # Because of the delay, dt_ = 0 --> new flying phase + dt_ = self.T_gait/2 - 2*self.vert_time + part_curve = 0 + elif dt_ >= self.vert_time : + dt_ -= self.vert_time + part_curve = 1 + else: + dt_ = 0.002 # Close to 0 to avoid high jerk and thus keep the optimised fstep from previous cycle + part_curve = 2 + + if is_first_step : + # Acceleration cost + # if self.flying_foot_old != self.flying_foot : + # self.dx_new_phase = abs(footsteps[j, 3*flying_feet[0] ] - position[0,flying_feet[0] ]) + # self.dy_new_phase = abs(footsteps[j, 3*flying_feet[0]+1 ] - position[1,flying_feet[0] ]) + # self.acc_limit = 2*5.625*np.array([ self.dx_new_phase / 0.24**2 , self.dy_new_phase / 0.24**2]) + self.models_step[index_step].is_acc_activated = self.is_acc_activated + # self.models_step[index_step].acc_limit = self.acc_limit + + # Velocity cost + self.models_step[index_step].is_vel_activated = self.is_vel_activated + + if part_curve == 1 or part_curve == 2 : + self.models_step[index_step].is_jerk_activated = self.is_jerk_activated + # self.models_step[index_step].jerk_weight = self.exponential_cost(self.T_gait/2 - dt_) + # self.models_step[index_step].jerk_weight = self.jerk_weight + else : + self.models_step[index_step].jerk_weight = 0. + + # Update is_first_step bool + is_first_step = False + + else: + # Cost on the feet trajectory disabled + # TODO : Only activate cost on the maximum speed (simplify the equations in step model, vmax = T/2) + self.models_step[index_step].is_acc_activated = False + self.models_step[index_step].is_vel_activated = False + self.models_step[index_step].is_jerk_activated = False + dt_ = 0.24 + self.models_step[index_step].updateModel(np.reshape(footsteps[j, :], (3, 4), order='F'), - xref[:, j], self.gait[j, :] - self.gait[j-1, :]) + xref[:, j], self.gait[j, :] - self.gait[j-1, :], + position, velocity, acceleration, jerk, oRh, oTh, dt_) self.action_models.append(self.models_step[index_step]) # Augmented model @@ -366,6 +452,15 @@ class MPC_crocoddyl_planner(): model.heuristicWeights = np.zeros(8) model.stateWeights = self.stateWeights model.stepWeights = self.stepWeights + model.acc_limit = self.acc_limit + model.acc_weight = self.acc_weight + model.is_acc_activated = False + model.vel_limit = self.vel_limit + model.vel_weight = self.vel_weight + model.is_vel_activated = False + model.is_jerk_activated = False + model.jerk_weight = self.jerk_weight + model.set_sample_feet_traj(self.n_sampling) def compute_gait_matrix(self, footsteps): """ @@ -383,6 +478,7 @@ class MPC_crocoddyl_planner(): j += 1 # Get the current flying feet and the number of nodes + self.flying_foot_old[:] = self.flying_foot[:] # Update old matrix, usefull when new phase start for foot in range(4): row = 0 if self.gait[0, foot] == 0: @@ -403,5 +499,14 @@ class MPC_crocoddyl_planner(): self.stance_foot_nodes[foot] = int(row) else: self.stance_foot[foot] = False + + def exponential_cost(self, t): + """ + Returns weight_jerk * exp(alpha(t-beta)) - 1 + Args: + t (float) : time in s + """ + + return self.jerk_weight * (np.exp(self.jerk_alpha*(t - self.jerk_beta)) - 1) -- GitLab