Skip to content
Snippets Groups Projects
Controller.py 13.7 KiB
Newer Older
Fanny Risbourg's avatar
Fanny Risbourg committed
import numpy as np
Fanny Risbourg's avatar
Fanny Risbourg committed
import pybullet as pyb
from . import WB_MPC_Wrapper
    """
    Object to store the result of the control loop
    It contains what is sent to the robot (gains, desired positions and velocities,
    feedforward torques)
    """
    def __init__(self, params):
        self.P = np.array(params.Kp_main.tolist() * 4)
        self.D = np.array(params.Kd_main.tolist() * 4)
        self.FF = params.Kff_main * np.ones(12)
        self.q_des = np.zeros(12)
        self.v_des = np.zeros(12)
        self.tau_ff = np.zeros(12)
Fanny Risbourg's avatar
Fanny Risbourg committed
class Interpolator:
Ale's avatar
Ale committed
    def __init__(self, params):
Fanny Risbourg's avatar
Fanny Risbourg committed
        self.dt = params.dt_mpc
        self.type = params.interpolation_type

        self.q0 = np.zeros(3)
        self.q1 = np.zeros(3)
        self.v0 = np.zeros(3)
        self.v1 = np.zeros(3)

Fanny Risbourg's avatar
Fanny Risbourg committed
        if self.type == 3:
            self.ts = np.repeat(np.linspace(0, 2 * self.dt, 3), 2)
        else:
            self.alpha = np.zeros(3)
            self.beta = np.zeros(3)
            self.gamma = np.zeros(3)
            self.delta = 1.0

    def update(self, x0, x1, x2=None):
Fanny Risbourg's avatar
Fanny Risbourg committed
        self.q0 = x0[:3]
        self.q1 = x1[:3]
        self.v0 = x0[3:]
        self.v1 = x1[3:]
        if self.type == 0:  # Linear
            self.alpha = 0.0
            self.beta = self.v1
            self.gamma = self.q0
        elif self.type == 1:  # Quadratic fixed velocity
            self.alpha = 2 * (self.q1 - self.q0 - self.v0 * self.dt) / self.dt**2
            self.beta = self.v0
            self.gamma = self.q0
        elif self.type == 2:  # Quadratic time variable
            for i in range(3):
                q0 = self.q0[i]
                v0 = self.v0[i]
                q1 = self.q1[i]
                v1 = self.v1[i]
                if (q1 == q0) or (v1 == -v0):
                    self.alpha[i] = 0.0
                    self.beta[i] = 0.0
                    self.gamma[i] = q1
                    self.delta = 1.0
                else:
                    self.alpha[i] = (v1**2 - v0**2) / (2 * (q1 - q0))
                    self.beta[i] = v0
                    self.gamma[i] = q0
                    self.delta = 2 * (q1 - q0) / (v1 + v0) / self.dt
Fanny Risbourg's avatar
Fanny Risbourg committed
        elif self.type == 3: # Spline interpolation
            from scipy.interpolate import KroghInterpolator

            if x2 is not None:
                self.q2 = x2[:3]
                self.v2 = x2[3:]
                self.y = [self.q0, self.v0, self.q1, self.v1, self.q2, self.v2]
                self.krog = KroghInterpolator(self.ts, np.array(self.y))
            else:
                self.y = [self.q0, self.v0, self.q1, self.v1]
                self.krog = KroghInterpolator(self.ts[:4], np.array(self.y))
Ale's avatar
Ale committed

    def interpolate(self, t):
Fanny Risbourg's avatar
Fanny Risbourg committed
        if self.type == 3:
            q = self.krog(t)
            v = self.krog.derivative(t)
            return q, v

Fanny Risbourg's avatar
Fanny Risbourg committed
        if self.type == 2:
            t *= self.delta
        q = 1 / 2 * self.alpha * t**2 + self.beta * t + self.gamma
        v = self.v1 if self.type == 2 else self.alpha * t + self.beta
Ale's avatar
Ale committed

Fanny Risbourg's avatar
Fanny Risbourg committed
        return q, v
Ale's avatar
Ale committed

Fanny Risbourg's avatar
Fanny Risbourg committed
    def plot(self, n, dt):
Ale's avatar
Ale committed
        import matplotlib.pyplot as plt
Fanny Risbourg's avatar
Fanny Risbourg committed
        ts = np.linspace(0.0, 2 * self.dt, 2 * n + 1)
Ale's avatar
Ale committed
        plt.style.use("seaborn")
        for i in range(3):
            plt.subplot(3, 2, (i * 2) + 1)
Ale's avatar
Ale committed
            plt.title("Position interpolation")
Fanny Risbourg's avatar
Fanny Risbourg committed
            plt.plot(ts, [self.interpolate(t)[0][i] for t in ts])
Fanny Risbourg's avatar
Fanny Risbourg committed
            plt.scatter(y=self.q0[i], x=0.0, color="violet", marker="+")
            plt.scatter(y=self.q1[i], x=self.dt, color="violet", marker="+")
Fanny Risbourg's avatar
Fanny Risbourg committed
            if self.type == 3 and self.q2 is not None:
                plt.scatter(y=self.q2[i], x=2 * self.dt, color="violet", marker="+")
Ale's avatar
Ale committed

            plt.subplot(3, 2, (i * 2) + 2)
Ale's avatar
Ale committed
            plt.title("Velocity interpolation")
Fanny Risbourg's avatar
Fanny Risbourg committed
            plt.plot(ts, [self.interpolate(t)[1][i] for t in ts])
Fanny Risbourg's avatar
Fanny Risbourg committed
            plt.scatter(y=self.v0[i], x=0.0, color="violet", marker="+")
            plt.scatter(y=self.v1[i], x=self.dt, color="violet", marker="+")
Fanny Risbourg's avatar
Fanny Risbourg committed
            if self.type == 3 and self.v2 is not None:
                plt.scatter(y=self.v2[i], x=2 * self.dt, color="violet", marker="+")
Ale's avatar
Ale committed

        plt.show()


class DummyDevice:
        self.imu = self.IMU()
        self.joints = self.Joints()
Fanny Risbourg's avatar
Fanny Risbourg committed
        self.base_position = np.zeros(3)
        self.base_position[2] = 0.1944
        self.b_base_velocity = np.zeros(3)
Ale's avatar
Ale committed
        self.baseState = tuple()
        self.baseVel = tuple()
    class IMU:
        def __init__(self):
            self.linear_acceleration = np.zeros(3)
            self.gyroscope = np.zeros(3)
            self.attitude_euler = np.zeros(3)
            self.attitude_quaternion = np.zeros(4)

    class Joints:
        def __init__(self):
            self.positions = np.zeros(12)
            self.velocities = np.zeros(12)
Ale's avatar
Ale committed
    def __init__(self, pd, target, params, q_init, t):
Fanny Risbourg's avatar
Fanny Risbourg committed
        """
        Function that computes the reference control (tau, q_des, v_des and gains)
            q_init (array): initial position of actuators
            t (float): time of the simulation
Fanny Risbourg's avatar
Fanny Risbourg committed
        self.q_security = np.array([1.2, 2.1, 3.14] * 4)
thomascbrs's avatar
thomascbrs committed

Ale's avatar
Ale committed
        self.pd = pd
        self.target = target
        self.params = params
        self.q_init = pd.q0
Fanny Risbourg's avatar
Fanny Risbourg committed
        self.k = 0
        self.error = False
Your Name's avatar
Your Name committed
        self.initialized = False
Fanny Risbourg's avatar
Fanny Risbourg committed

        self.result = Result(params)
Fanny Risbourg's avatar
Fanny Risbourg committed
        self.result.q_des = self.pd.q0[7:].copy()
        self.result.v_des = self.pd.v0[6:].copy()
Fanny Risbourg's avatar
Fanny Risbourg committed

        self.mpc = WB_MPC_Wrapper.MPC_Wrapper(pd, target, params)
        self.mpc_solved = False
        self.k_result = 0
        self.k_solve = 0
        if self.params.interpolate_mpc:
            self.interpolator = Interpolator(params)
Ale's avatar
Ale committed
        try:
            file = np.load("/tmp/init_guess.npy", allow_pickle=True).item()
            self.xs_init = list(file["xs"])
            self.us_init = list(file["us"])
            print("Initial guess loaded \n")
Ale's avatar
Ale committed
        except:
            self.xs_init = None
            self.us_init = None
            print("No initial guess\n")

        device = DummyDevice()
        device.joints.positions = q_init
        self.compute(device)

thomascbrs's avatar
thomascbrs committed
    def compute(self, device, qc=None):
        """
        Run one iteration of the main control loop
        Args:
            device (object): Interface with the masterboard or the simulation
        """
        t_start = time.time()
Ale's avatar
Ale committed
        m = self.read_state(device)
        t_measures = time.time()
        self.t_measures = t_measures - t_start

Fanny Risbourg's avatar
Fanny Risbourg committed
        if self.k % self.pd.mpc_wbc_ratio == 0:
            if self.mpc_solved:
                self.k_solve = self.k
                self.mpc_solved = False
Fanny Risbourg's avatar
Fanny Risbourg committed
            self.target.update(self.k // self.pd.mpc_wbc_ratio)
            self.target.shift_gait()
            try:
odri's avatar
odri committed
                self.mpc.solve(self.k, m["x_m"], self.xs_init, self.us_init)
odri's avatar
odri committed
            except ValueError:
                self.error = True
                print("MPC Problem")
Fanny Risbourg's avatar
Fanny Risbourg committed

        t_mpc = time.time()
        self.t_mpc = t_mpc - t_measures
        if not self.error:
            self.mpc_result = self.mpc.get_latest_result()
Fanny Risbourg's avatar
Fanny Risbourg committed
            xs = self.mpc_result.xs
Fanny Risbourg's avatar
Fanny Risbourg committed
            if self.mpc_result.new_result:
                self.mpc_solved = True
                self.k_new = self.k
                print(f"MPC solved in {self.k - self.k_solve} iterations")
Fanny Risbourg's avatar
Fanny Risbourg committed
            if not self.initialized and self.params.save_guess:
                self.save_guess()
Ale's avatar
Ale committed

Fanny Risbourg's avatar
Fanny Risbourg committed
            self.result.FF = self.params.Kff_main * np.ones(12)
            self.result.tau_ff[3:6] = self.compute_torque(m)[:]
Fanny Risbourg's avatar
Fanny Risbourg committed

            if self.params.interpolate_mpc:
Fanny Risbourg's avatar
Fanny Risbourg committed
                if self.mpc_result.new_result:
Fanny Risbourg's avatar
Fanny Risbourg committed
                    if self.params.interpolation_type == 3:
                        self.interpolator.update(xs[0], xs[1], xs[2])
Fanny Risbourg's avatar
Fanny Risbourg committed
                    # self.interpolator.plot(self.pd.mpc_wbc_ratio, self.pd.dt_wbc)
Ale's avatar
Ale committed

Fanny Risbourg's avatar
Fanny Risbourg committed
                t = (self.k - self.k_solve + 1) * self.pd.dt_wbc
                q, v = self.interpolator.interpolate(t)
Fanny Risbourg's avatar
Fanny Risbourg committed
            else:
                q, v = self.integrate_x(m)
Fanny Risbourg's avatar
Fanny Risbourg committed

Fanny Risbourg's avatar
Fanny Risbourg committed
            self.result.q_des[3:6] = q[:]
            self.result.v_des[3:6] = v[:]
            self.xs_init = self.mpc_result.xs[1:] + [self.mpc_result.xs[-1]]
            self.us_init = self.mpc_result.us[1:] + [self.mpc_result.us[-1]]
Ale's avatar
Ale committed

        t_send = time.time()
        self.t_send = t_send - t_mpc
Ale's avatar
Ale committed
        # self.clamp_result(device)
        # self.security_check(m)
Fanny Risbourg's avatar
Fanny Risbourg committed

        if self.error:
            self.set_null_control()
        # self.pyb_camera(device)
        self.t_loop = time.time() - t_start
Your Name's avatar
Your Name committed
        self.initialized = True
        return self.error
Fanny Risbourg's avatar
Fanny Risbourg committed
    def pyb_camera(self, device):
Fanny Risbourg's avatar
Fanny Risbourg committed
        Update position of PyBullet camera on the robot position to do as if it was
        attached to the robot
Ale's avatar
Ale committed
        if self.k > 10 and self.params.enable_pyb_GUI:
Fanny Risbourg's avatar
Fanny Risbourg committed
            pyb.resetDebugVisualizerCamera(
                cameraDistance=0.6,
                cameraYaw=45,
                cameraPitch=-39.9,
                cameraTargetPosition=[device.height[0], device.height[1], 0.0],
Fanny Risbourg's avatar
Fanny Risbourg committed
            )
Your Name's avatar
Your Name committed
    def security_check(self, m):
        """
        Check if the command is fine and set the command to zero in case of error
        """
Fanny Risbourg's avatar
Fanny Risbourg committed

Your Name's avatar
Your Name committed
        if not self.error:
            if (np.abs(m["qj_m"]) > self.q_security).any():
Fanny Risbourg's avatar
Fanny Risbourg committed
                print("-- POSITION LIMIT ERROR --")
Your Name's avatar
Your Name committed
                print(m["qj_m"])
Your Name's avatar
Your Name committed
                print(np.abs(m["qj_m"]) > self.q_security)
Fanny Risbourg's avatar
Fanny Risbourg committed
                self.error = True
Ale's avatar
Ale committed
            elif (np.abs(m["vj_m"]) > 1000 * np.pi / 180).any():
Fanny Risbourg's avatar
Fanny Risbourg committed
                print("-- VELOCITY TOO HIGH ERROR --")
Your Name's avatar
Your Name committed
                print(m["vj_m"])
Ale's avatar
Ale committed
                print(np.abs(m["vj_m"]) > 1000 * np.pi / 180)
Fanny Risbourg's avatar
Fanny Risbourg committed
                self.error = True
Ale's avatar
Ale committed
            elif (np.abs(self.result.FF) > 3.2).any():
Fanny Risbourg's avatar
Fanny Risbourg committed
                print("-- FEEDFORWARD TORQUES TOO HIGH ERROR --")
Ale's avatar
Ale committed
                print(self.result.FF)
Ale's avatar
Ale committed
                print(np.abs(self.result.FF) > 3.2)
Ale's avatar
Ale committed
                self.error = True

    def clamp(self, num, min_value=None, max_value=None):
        clamped = False
Ale's avatar
Ale committed
        if min_value is not None and num <= min_value:
            num = min_value
            clamped = True
Ale's avatar
Ale committed
        if max_value is not None and num >= max_value:
            num = max_value
            clamped = True
        return clamped

    def clamp_result(self, device, set_error=False):
        """
        Clamp the result
        """
Fanny Risbourg's avatar
Fanny Risbourg committed
        hip_max = 120.0 * np.pi / 180.0
        knee_min = 5.0 * np.pi / 180.0
        for i in range(4):
            if self.clamp(self.result.q_des[3 * i + 1], -hip_max, hip_max):
                print("Clamping hip n " + str(i))
                self.error = set_error
Ale's avatar
Ale committed
            if self.q_init[7 + 3 * i + 2] >= 0.0 and self.clamp(
Fanny Risbourg's avatar
Fanny Risbourg committed
                self.result.q_des[3 * i + 2], knee_min
            ):
                print("Clamping knee n " + str(i))
                self.error = set_error
Ale's avatar
Ale committed
            elif self.q_init[7 + 3 * i + 2] <= 0.0 and self.clamp(
Fanny Risbourg's avatar
Fanny Risbourg committed
                self.result.q_des[3 * i + 2], max_value=-knee_min
            ):
                print("Clamping knee n " + str(i))
                self.error = set_error

        for i in range(12):
Fanny Risbourg's avatar
Fanny Risbourg committed
            if self.clamp(
                self.result.q_des[i],
                device.joints.positions[i] - 4.0,
                device.joints.positions[i] + 4.0,
            ):
                print("Clamping position difference of motor n " + str(i))
                self.error = set_error

Fanny Risbourg's avatar
Fanny Risbourg committed
            if self.clamp(
                self.result.v_des[i],
                device.joints.velocities[i] - 100.0,
                device.joints.velocities[i] + 100.0,
            ):
                print("Clamping velocity of motor n " + str(i))
                self.error = set_error

Ale's avatar
Ale committed
            if self.clamp(self.result.tau_ff[i], -3.2, 3.2):
                print("Clamping torque of motor n " + str(i))
                self.error = set_error

    def set_null_control(self):
        """
        Send default null values to the robot
        """
        self.result.P = np.zeros(12)
        self.result.D = 0.1 * np.ones(12)
Fanny Risbourg's avatar
Fanny Risbourg committed
        self.result.FF = np.zeros(12)
        self.result.q_des[:] = np.zeros(12)
        self.result.v_des[:] = np.zeros(12)
        self.result.tau_ff[:] = np.zeros(12)
Ale's avatar
Ale committed

Fanny Risbourg's avatar
Fanny Risbourg committed
    def save_guess(self):
        """
        Save the result of the MPC in a file called /tmp/init_guess.npy
        """
        np.save(
            open("/tmp/init_guess.npy", "wb"),
            {"xs": self.mpc_result.xs, "us": self.mpc_result.us},
        )
        print("Initial guess saved")

Ale's avatar
Ale committed
    def read_state(self, device):
        qj_m = device.joints.positions
        vj_m = device.joints.velocities
Ale's avatar
Ale committed
        x_m = np.concatenate([qj_m[3:6], vj_m[3:6]])
Fanny Risbourg's avatar
Fanny Risbourg committed
        return {"qj_m": qj_m, "vj_m": vj_m, "x_m": x_m}
Ale's avatar
Ale committed

Fanny Risbourg's avatar
Fanny Risbourg committed
    def compute_torque(self, m):
        """
        Compute the feedforward torque using ricatti gains
        """
        # x_diff = np.concatenate(
        #     [
        #         pin.difference(
        #             self.pd.model,
        #             m["x_m"][: self.pd.nq],
        #             self.mpc_result.xs[0][: self.pd.nq],
        #         ),
        #         m["x_m"][self.pd.nq :] - self.mpc_result.xs[0][self.pd.nq :],
        #     ]
        # )
        x_diff = m["x_m"] - self.mpc_result.xs[0]
Fanny Risbourg's avatar
Fanny Risbourg committed
        tau = self.mpc_result.us[0] + np.dot(self.mpc_result.K[0], x_diff)
        return tau

    def integrate_x(self, m):
Fanny Risbourg's avatar
Fanny Risbourg committed
        """
        Integrate the position and velocity using the acceleration computed from the
        feedforward torque
        """
Fanny Risbourg's avatar
Fanny Risbourg committed
        q0 = m["qj_m"][3:6].copy()
        v0 = m["vj_m"][3:6].copy()
        tau = self.result.tau_ff[3:6].copy()
Fanny Risbourg's avatar
Fanny Risbourg committed

Fanny Risbourg's avatar
Fanny Risbourg committed
        a = pin.aba(self.pd.model, self.pd.rdata, q0, v0, tau)
Fanny Risbourg's avatar
Fanny Risbourg committed

        v = v0 + a * self.params.dt_wbc
        q = q0 + v * self.params.dt_wbc

        return q, v