diff --git a/python/quadruped_reactive_walking/WB_MPC/CasadiOCP.py b/python/quadruped_reactive_walking/WB_MPC/CasadiOCP.py index 2c050cb41cb10b9e366858bb1f95b50a92bc4088..966cb15828226baaac7fc153bdbbac3cd40f34d1 100644 --- a/python/quadruped_reactive_walking/WB_MPC/CasadiOCP.py +++ b/python/quadruped_reactive_walking/WB_MPC/CasadiOCP.py @@ -11,8 +11,7 @@ import pinocchio.casadi as cpin class NodeData: - """ - Data stored by each shooting node of the OCP + """Data stored by each shooting node of the OCP Attributes: x (vector): State vector (q + dq) @@ -22,6 +21,9 @@ class NodeData: u (vector): Control vector (tau) f (vector): Contact forces xnext (vector): Next state vector + + Args: + isTerminal (bool): whether this is a terminal node """ def __init__(self, isTerminal=False): @@ -36,8 +38,7 @@ class NodeData: class OcpResult: - """ - Result of the OCP + """Result of the OCP Attributes: xs (vector): State trajectory (q + dq) @@ -54,34 +55,35 @@ class OcpResult: class OCP: - """ - Optimal Control Problem + """Optimal Control Problem Args: pd (ProblemData): data related to the OCP - gait (list): list of list containing contact pattern i.e. for two steps[[1,1,1,1], [1,0,0,1]] - Its length determines the horizon of the ocp + gait (array): array containing the contact pattern i.e. for two steps[[1,1,1,1], [1,0,0,1]] + It has one column for each feet, the number of rows determines the horizon of the ocp Attributes: pd (ProblemData): data related to the OCP results (OcpResult): result of the OCP - shooting_nodes (set): set of shooting nodes of the OCP + shooting_nodes (dict): shooting nodes corresponding to possible contact status + runningModels (list of ShootingNode): running nodes ordered according to gait matrix + terminalModel (ShootingNode): terminal node of the OCP + datas (list of NodeData): list of node data, one for each node of the OCP + opti (casadi.Opti): optimization problem wrapper + xs (vector): State trajectory (q + dq) + dxs (vector): Derivative of state trajectory + a (vector): Acceleration slack trajectory + us (vector): Control trajectory (tau) + fs (vector): Contact forces trajectories + cost(casadi.MX): Costs of the OCP """ - def __init__(self, pd: ProblemData, tasks, gait): - """Define an optimal ocntrol problem. - :param robot: Pinocchio RobotWrapper instance - :param gait: list of list containing contact pattern i.e. for two steps[ [1,1,1,1], [1,0,0,1] ]. \ - Its length determines the horizon of the ocp - :param x0: starting configuration - :param x_ref: reference configuration - :param target: array of target positions - :param dt: timestep integration - """ + def __init__(self, pd: ProblemData, gait): + self.pd = pd self.results = OcpResult() - # build the different shooting nodes + # Build the types of shooting nodes based on the contact statuses in the gait matrix self.shooting_nodes = { contacts: ShootingNode(pd, contacts) for contacts in set( @@ -92,28 +94,40 @@ class OCP: ) } - def solve(self, x0, tasks, gait, guess={}): - """ Solve the NLP - :param guess: ditctionary in the form: {'xs':array([T+1, n_states]), 'acs':array([T, nv]),\ - 'us':array([T, nu]), 'fs': [array([T, nv]),]} + def solve(self, x0, targets, gait, guess={}): + """Solve the OCP + + Args: + x0 (vector): initial state + targets (list of arrays): targets of the whole-body tasks (lin and ang velocities) + gait (array): array containing the contact pattern + guess (dict): dict containing values for the warm start + In the form: {'xs':array([T+1, n_states]), 'acs':array([T, nv]), + 'us':array([T, nu]), 'fs': [array([T, nv]),]} """ + + # Assemble the sequence of nodes from the dict of shooting nodes created at initialization self.runningModels = [ self.shooting_nodes[self.pattern_to_id(gait[t])] for t in range(self.pd.T) ] self.terminalModel = self.shooting_nodes[self.pattern_to_id(gait[self.pd.T])] - self.datas = [] - for i in range(self.pd.T + 1): - if i == self.pd.T: - self.datas += [NodeData(True)] - else: - self.datas += [NodeData()] + # Create node data for the T running nodes and the terminal node + self.datas = [NodeData() for _ in range(self.pd.T)] + [NodeData(True)] + + # Create the various optimization variables of the problem and initialize them self.make_opti_variables(x0) - self.cost, eq_constraints = self.make_ocp(tasks, gait) + # Assemble the optimization control problem + self.cost, eq_constraints = self.make_ocp(targets, gait) + + # Set the costs to be minimized ... self.opti.minimize(self.cost) + + # ... subject to these equality constraints self.opti.subject_to(eq_constraints == 0) + # Problem and solver parameters p_opts = {"expand": False} s_opts = { "tol": 1e-4, @@ -124,14 +138,15 @@ class OCP: # "resto_failure_feasibility_threshold": 1 # "linear_solver": "ma57" } - self.opti.solver("ipopt", p_opts, s_opts) + # Warm start the OCP with initial value and guess self.warmstart(x0, guess) - # SOLVE + # Solve the optimal control problem self.opti.solve_limited() + # Store state of each node after the solving process for data in self.datas: data.x = self.opti.value(data.x) data.cost = self.opti.value(data.cost) @@ -140,8 +155,16 @@ class OCP: data.xnext = self.opti.value(data.xnext) def make_opti_variables(self, x0): + """Create the optimization variables of the OCP and initialize them + + Args: + x0 (vector): initial state + """ + + # Casadi optimization problem opti = casadi.Opti() self.opti = opti + # Optimization variables self.dxs = [ opti.variable(self.pd.ndx) @@ -159,10 +182,22 @@ class OCP: self.fs.append(f_tmp) self.fs = self.fs - def make_ocp(self, tasks, gait): + def make_ocp(self, targets, gait): + """Assemble the optimization control problem + + Args: + targets (list of arrays): targets of the whole-body tasks (lin and ang velocities) + gait (array): array containing the contact pattern + + Returns: + totalcost (casadi.MX): Costs of the OCP + eq_constraints (casadi.MX): Equality constraints of the OCP + """ + totalcost = 0 eq = [] eq.append(self.dxs[0]) + # Gather costs and equality constraints for each running node for t in range(self.pd.T): # These change every time! Do not use runningModels[0].x to get the state!!!! use xs[0] self.runningModels[t].init( @@ -170,7 +205,7 @@ class OCP: ) xnext, rcost = self.runningModels[t].calc( - x_ref=self.pd.xref, u_ref=self.pd.uref, target=tasks + x_ref=self.pd.xref, u_ref=self.pd.uref, target=targets ) # If it is landing @@ -189,6 +224,7 @@ class OCP: totalcost += rcost eq.append(self.xs[self.pd.T][self.pd.nq :]) + # Gather costs and equality constraints for terminal node self.terminalModel.init( data=self.datas[self.pd.T], x=self.xs[self.pd.T], isTerminal=True ) @@ -200,6 +236,15 @@ class OCP: return totalcost, eq_constraints def warmstart(self, x0, guess=None): + """Warm start the OCP + + Args: + x0 (vector): initial state + guess (dict): dict containing values for the warm start + In the form: {'xs':array([T+1, n_states]), 'acs':array([T, nv]), + 'us':array([T, nu]), 'fs': [array([T, nv]),]} + """ + for g in guess: if guess[g] == []: print("No warmstart provided") @@ -276,11 +321,16 @@ class OCP: return self.results.x, self.results.a, self.results.u, self.results.fs, 0 def get_feet_position(self, xs_sol): - """Get the feet positions computed durnig one ocp + """Get feet positions over the prediction horizon - :param xs_sol: Array of dimension [n_steps, n_states] + Args: + xs_sol (array): State trajectory solution of the OCP + Array of dimension [n_steps, n_states] + Returns: + feet_log (dict): dict of position trajectories, one entry per foot """ + feet_log = {i: [] for i in self.pd.allContactIds} for foot in feet_log: tmp = [] @@ -291,11 +341,16 @@ class OCP: return feet_log def get_feet_velocity(self, xs_sol): - """Get the feet velocities in LOCAL_WORLD_ALIGNED frame computed durnig one ocp + """Get feet velocities over the prediction horizon - :param xs_sol: Array of dimension [n_steps, n_states] + Args: + xs_sol (array): State trajectory solution of the OCP + Array of dimension [n_steps, n_states] + Returns: + feet_log (dict): dict of velocity trajectories, one entry per foot """ + feet_log = {i: [] for i in self.pd.allContactIds} for foot in feet_log: tmp = [] @@ -306,11 +361,16 @@ class OCP: return feet_log def get_base_log(self, xs_sol): - """Get the base positions computed durnig one ocp + """Get base positions over the prediction horizon - :param xs_sol: Array of dimension [n_steps, n_states] + Args: + xs_sol (array): State trajectory solution of the OCP + Array of dimension [n_steps, n_states] + Returns: + base_pos_log (array): position of the base over the prediction horizon """ + base_pos_log = [] [ base_pos_log.append( @@ -322,4 +382,13 @@ class OCP: return base_pos_log def pattern_to_id(self, gait): + """Get the key corresponding to a contact status, to be used with the dict of shooting nodes + + Args: + gait (array): Contact status + + Returns: + tuple: key corresponding to the contact status + """ + return tuple(self.pd.allContactIds[i] for i, c in enumerate(gait) if c == 1) diff --git a/python/quadruped_reactive_walking/WB_MPC_Wrapper.py b/python/quadruped_reactive_walking/WB_MPC_Wrapper.py index 2f93c1a13d78568eb11751409d52446ffe2c4cff..71a9e2f1c1073774b3b6b7074f17ca5b4a42c8f0 100644 --- a/python/quadruped_reactive_walking/WB_MPC_Wrapper.py +++ b/python/quadruped_reactive_walking/WB_MPC_Wrapper.py @@ -52,7 +52,7 @@ class MPC_Wrapper: self.out_k = Array("d", [0] * (pd.T * pd.nu * pd.ndx)) self.out_solving_time = Value("d", 0.0) else: - self.ocp = OCP(pd, footsteps, gait) + self.ocp = OCP(pd, gait) self.last_available_result = Result(pd) self.new_result = Value("b", False) diff --git a/python/quadruped_reactive_walking/main_solo12_control.py b/python/quadruped_reactive_walking/main_solo12_control.py index 97f58caa2d21a6e16cfdbb411a1c2d603bf3febf..1a8220778305034f0fbaf5d5b2023657af39ddbe 100644 --- a/python/quadruped_reactive_walking/main_solo12_control.py +++ b/python/quadruped_reactive_walking/main_solo12_control.py @@ -2,7 +2,7 @@ import threading import time from pathlib import Path import numpy as np -import git +# import git from datetime import datetime import quadruped_reactive_walking as qrw @@ -14,9 +14,11 @@ from .WB_MPC.ProblemData import ProblemData, ProblemDataFull params = qrw.Params() # Object that holds all controller parameters pd = ProblemData(params) +""" repo = git.Repo(search_parent_directories=True) sha = repo.head.object.hexsha msg = repo.head.object.message + "\nCommit: " + sha +""" if params.SIMULATION: from .tools.PyBulletSimulator import PyBulletSimulator @@ -222,8 +224,8 @@ def control_loop(): log_path = Path("/tmp") / "logs" / date_str log_path.mkdir(parents=True) loggerControl.save(str(log_path)) - with open(str(log_path / "readme.txt"), "w") as f: - f.write(msg) + # with open(str(log_path / "readme.txt"), "w") as f: + # f.write(msg) if params.PLOTTING: loggerControl.plot(save=True, fileName=str(log_path))