import numpy as np
from .gem_controller import GemController
[docs]
class MPCCurrentController(GemController):
def __init__(self, env, env_id, prediction_horizon=1, w_d=1.0, w_q=1.0):
"""
Args:
env: Gym environment instance
env_id: Environment ID string
prediction_horizon: Prediction horizon (N)
w_d: Weight for d-axis current error
w_q: Weight for q-axis current error
"""
super().__init__()
self.env_id = env_id
self.prediction_horizon = prediction_horizon
self.w_d = w_d
self.w_q = w_q
"""Assign self.step from the environment wrapper, default to 0 if no DeadTimeProcessor"""
ps_wrapper = env.unwrapped.physical_system
self.step = getattr(ps_wrapper, 'dead_time', 0)
print(f"DeadTimeProcessor steps: {self.step}")
"""Retrieve environment info and motor parameters"""
self.state_names = env.get_wrapper_attr('state_names')
self.physical_system = env.get_wrapper_attr('physical_system')
self.tau = self.physical_system.tau
self.limits = self.physical_system.limits
self.motor_params = self.physical_system.electrical_motor.motor_parameter
for key, value in self.motor_params.items():
setattr(self, key, value)
"""Identify indices of key states and inputs"""
self.i_sd_idx = self.state_names.index('i_sd')
self.i_sq_idx = self.state_names.index('i_sq')
self.omega_idx = self.state_names.index('omega')
self.u_sd_idx = self.state_names.index('u_sd')
self.u_lim = self.limits[self.u_sd_idx]
"""Setup coordinate transforms and precompute voltage combinations"""
self.abc_to_dq = self.physical_system.abc_to_dq_space
self.subactions = -np.power(-1, self.physical_system._converter._subactions)
self.u_abc_k1 = self.u_lim * self.subactions
"""Load motor model constants and motor-specific state names"""
self._model_constants = self.physical_system.electrical_motor._model_constants
motor_type = type(self.physical_system.electrical_motor).__name__
if motor_type in ["PermanentMagnetSynchronousMotor", "SynchronousReluctanceMotor"]:
self.motor_state_names = ["i_sd", "i_sq", "epsilon"]
else:
raise NotImplementedError(f"MPC controller not implemented for motor type: {motor_type}")
"""Initialize delay compensation variables"""
self.previous_voltage_idx = 0
self.past_references = []
self.extrapolation_order = 2
# -------- Delay Compensation Helpers ----------
"""
# Extrapolate reference for delay compensation using past references. (works for sinusodia only for e.g. alfa-beta frames)
def _extrapolate_reference(self, current_ref, n=2):
self.past_references.append(current_ref.copy())
if len(self.past_references) > n + 1:
self.past_references.pop(0)
if len(self.past_references) < n + 1:
return current_ref
ref_k = self.past_references[-1]
ref_km1 = self.past_references[-2]
ref_km2 = self.past_references[-3]
return 6 * ref_k - 8 * ref_km1 + 3 * ref_km2
"""
"""Estimate currents at next timestep using previous voltage for delay compensation"""
def _estimate_currents(self, model_constants, x, omega, voltage_idx):
v_abc = self.u_abc_k1[voltage_idx]
v_dq = np.transpose(
np.array([self.abc_to_dq(v_abc, x[-1] + 0.5 * omega * self.tau)])
)
v_d, v_q = v_dq[0], v_dq[1]
omega_Isd = omega * x[0]
omega_Isq = omega * x[1]
ext_vec = np.array([omega, x[0], x[1], float(v_d), float(v_q), omega_Isd, omega_Isq])
dx = model_constants @ ext_vec
return x + self.tau * dx
# -------- Prediction / Cost Evaluation ----------
"""Simulate all possible voltage sequences to find the one minimizing the cost"""
def _simulate_sequence(self, model_constants, x, omega, ref_i_d, ref_i_q, depth):
min_cost = float('inf')
best_sequence = []
for idx, (v_a, v_b, v_c) in enumerate(self.u_abc_k1):
v_dq = np.transpose(
np.array([self.abc_to_dq(np.array([v_a, v_b, v_c]), x[-1] + 0.5 * omega * self.tau)])
)
v_d, v_q = v_dq[0], v_dq[1]
omega_Isd = omega * x[0]
omega_Isq = omega * x[1]
ext_vec = np.array([omega, x[0], x[1], float(v_d), float(v_q), omega_Isd, omega_Isq])
dx = model_constants @ ext_vec
x_next = x + self.tau * dx
"""Compute cost based on tracking error"""
cost = self.w_d * (x_next[0] - ref_i_d) ** 2 + self.w_q * (x_next[1] - ref_i_q) ** 2
if depth == self.prediction_horizon - 1:
total_cost, sequence = cost, [idx]
else:
future_cost, future_sequence = self._simulate_sequence(
model_constants, x_next, omega, ref_i_d, ref_i_q, depth + 1
)
total_cost = cost + future_cost
sequence = [idx] + future_sequence
if total_cost < min_cost:
min_cost = total_cost
best_sequence = sequence
return min_cost, best_sequence
# -------- Control Interface ----------
"""Compute the best voltage index based on current state and reference"""
[docs]
def control(self, state, reference):
x_measured = np.array([state[self.state_names.index(n)] * self.limits[self.state_names.index(n)]
for n in self.motor_state_names])
omega = state[self.omega_idx] * self.limits[self.omega_idx]
ref_i_d = reference[0] * self.limits[self.i_sd_idx]
ref_i_q = reference[1] * self.limits[self.i_sq_idx]
if self.step == 0:
"""Without delay compensation"""
_, best_sequence = self._simulate_sequence(
self._model_constants, x_measured, omega, ref_i_d, ref_i_q, depth=0
)
else:
"""With delay compensation"""
x_est = self._estimate_currents(self._model_constants, x_measured, omega, self.previous_voltage_idx)
_, best_sequence = self._simulate_sequence(
self._model_constants, x_est, omega, ref_i_d, ref_i_q, depth=0
)
best_idx = best_sequence[0] if best_sequence else 0
self.previous_voltage_idx = best_idx
return best_idx
"""Reset delay compensation state"""
[docs]
def reset(self):
self.previous_voltage_idx = 0
self.past_references = []