# -*- coding: utf-8 -*-
"""
Created on Wed April 03 14:27:26 2019
Description: Controller class implementations for:
https://www.frontiersin.org/articles/10.3389/fnins.2020.00166/
@author: John Fleming, john.fleming@ucdconnect.ie
"""
import math
class Constant_Controller:
"""Constant DBS Parameter Controller Class"""
def __init__(self, SetPoint=0.0, MinValue=0.0, MaxValue=1e9, ConstantValue=0.0, Ts=0.0, units='mA'):
# Initial Controller Values
self.SetPoint = SetPoint
self.MaxValue = MaxValue
self.MinValue = MinValue
self.ConstantValue = ConstantValue
self.Ts = Ts # should be in sec as per above
self.units = units
self.label = ('Constant_Controller/%f%s' % (self.ConstantValue, self.units))
# Set output value
self.OutputValue = 0
# Lists for tracking controller history
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
def clear(self):
"""Clears current On-Off controller output value and history"""
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
self.OutputValue = 0.0
def update(self, state_value, current_time):
"""Calculates biomarker for constant DBS value
u = self.ConstantValue
"""
# Calculate Error - if SetPoint > 0.0, then normalize error with respect to setpoint
if self.SetPoint==0.0:
error = state_value - self.SetPoint
else:
error = (state_value - self.SetPoint)/self.SetPoint
# Bound the controller output (between MinValue - MaxValue)
if self.ConstantValue > self.MaxValue:
self.OutputValue = self.MaxValue
elif self.ConstantValue < self.MinValue:
self.OutputValue = self.MinValue
else:
self.OutputValue = self.ConstantValue
# Record state, error and sample time values
self.state_history.append(state_value)
self.error_history.append(error)
self.output_history.append(self.OutputValue)
self.sample_times.append(current_time/1000) # Convert from msec to sec
return self.OutputValue
def setMaxValue(self, max_value):
"""Sets the upper bound for the controller output"""
self.MaxValue = max_value
def setMinValue(self, min_value):
"""Sets the lower bound for the controller output"""
self.MinValue = min_value
def setConstantValue(self, constant_value):
"""Sets the constant controller output"""
self.ConstantValue = ConstantValue
def setTs(self, Ts):
"""Sets the sampling rate of the controller"""
self.Ts = Ts
def setLabel(self, label):
"""Sets the label of the controller"""
self.label = label
def setSetPoint(self, set_point):
self.SetPoint = set_point
def get_state_history(self):
return self.state_history
def get_error_history(self):
return self.error_history
def get_output_history(self):
return self.output_history
def get_sample_times(self):
return self.sample_times
def get_label(self):
return self.label
class ON_OFF_Controller:
"""On-Off Controller Class"""
def __init__(self, SetPoint=0.0, MinValue=0.0, MaxValue=1e9, RampDuration=0.25, Ts=0.02):
# Initial Controller Values
self.SetPoint = SetPoint
self.MaxValue = MaxValue
self.MinValue = MinValue
self.RampDuration = RampDuration # should be defined in sec, i.e. 0.25 sec
self.Ts = Ts # should be in sec as per above
self.label = 'On_Off_Controller'
# Calculate how much controller output value will change each controller call
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/math.ceil(self.RampDuration/self.Ts)
# Initialize the output value of the controller
self.LastOutputValue = 0
self.OutputValue = 0
# Lists for tracking controller history
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
def clear(self):
"""Clears current On-Off controller output value and history"""
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
self.LastOutputValue = 0.0
self.OutputValue = 0.0
def update(self, state_value, current_time):
"""Calculates updated controller output value for given reference feedback
y(t) = y(t-1) + u(t)
where:
u(t) = MaxValue / (RampDuration/Ts) if e(t) > SetPoint or -MaxValue / (RampDuration/Ts) if e(t) < SetPoint
"""
# Calculate Error - if SetPoint > 0.0, then normalize error with respect to setpoint
if self.SetPoint==0.0:
error = state_value - self.SetPoint
increment = 0.0
else:
error = (state_value - self.SetPoint)/self.SetPoint
if error > 0.0:
increment = self.OutputValueIncrement
else:
increment = -self.OutputValueIncrement
# Bound the controller output (between MinValue - MaxValue)
if self.LastOutputValue+increment > self.MaxValue:
self.OutputValue = self.MaxValue
elif self.LastOutputValue+increment < self.MinValue:
self.OutputValue = self.MinValue
else:
self.OutputValue = self.LastOutputValue+increment
# Record state, error and sample time values
self.state_history.append(state_value)
self.error_history.append(error)
self.output_history.append(self.OutputValue)
self.sample_times.append(current_time/1000) # Convert from msec to sec
self.LastOutputValue = self.OutputValue
return self.OutputValue
def setMaxValue(self, max_value):
"""Sets the upper bound for the controller output"""
self.MaxValue = max_value
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setMinValue(self, min_value):
"""Sets the lower bound for the controller output"""
self.MinValue = min_value
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setRampDuration(self, ramp_duration):
"""Sets the how long the controller output takes to reach it's max value"""
self.RampDuration = ramp_duration
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setTs(self, Ts):
"""Sets the sampling rate of the controller"""
self.Ts = Ts
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setLabel(self, label):
"""Sets the label of the controller"""
self.label = label
def setSetPoint(self, set_point):
self.SetPoint = set_point
def get_state_history(self):
return self.state_history
def get_error_history(self):
return self.error_history
def get_output_history(self):
return self.output_history
def get_sample_times(self):
return self.sample_times
def get_label(self):
return self.label
class Dual_Threshold_Controller:
"""Dual-Threshold Controller Class"""
def __init__(self, LowerThreshold=0.0, UpperThreshold=0.1, MinValue=0.0, MaxValue=1e9, RampDuration=0.25, Ts=0.02):
# Initial Controller Values
self.UpperThreshold = UpperThreshold
self.LowerThreshold = LowerThreshold
self.MaxValue = MaxValue
self.MinValue = MinValue
self.RampDuration = RampDuration # should be defined in sec, i.e. 0.25 sec
self.Ts = Ts # should be in sec as per above
self.label = 'Dual_Threshold_Controller'
# Calculate how much controller output value will change each controller call
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/math.ceil(self.RampDuration/self.Ts)
# Initialize the output value of the controller
self.LastOutputValue = 0.0
self.OutputValue = 0.0
# Lists for tracking controller history
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
def clear(self):
"""Clears current dual-threshold controller output value and history"""
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
self.LastOutputValue = 0.0
self.OutputValue = 0.0
def update(self, state_value, current_time):
"""Calculates updated controller output value for given reference feedback
if state_value > upper_threshold:
y(t) = y(t-1) + u(t)
elif state_value < lower_threshold:
y(t) = y(t-1) + u(t)
else:
y(t) = y(t-1)
where:
u(t) = MaxValue / (RampDuration/Ts) if state_value(t) > UpperThreshold or -MaxValue / (RampDuration/Ts) if state_value(t) < LowerThreshold
"""
# Check how to update controller value and calculate error with respect to upper/lower threshold
if state_value > self.UpperThreshold: # Increase if above upper threshold
error = (state_value - self.UpperThreshold)/self.UpperThreshold
increment = self.OutputValueIncrement
elif state_value < self.LowerThreshold: # Decrease if below lower threshold
error = (state_value - self.LowerThreshold)/self.LowerThreshold
increment = -self.OutputValueIncrement
else: # Do nothing when within upper and lower thresholds
error = 0
increment = 0
# Bound the controller output (between MinValue - MaxValue)
if self.LastOutputValue+increment > self.MaxValue:
self.OutputValue = self.MaxValue
elif self.LastOutputValue+increment < self.MinValue:
self.OutputValue = self.MinValue
else:
self.OutputValue = self.LastOutputValue+increment
# Record state, error and sample time values
self.state_history.append(state_value)
self.error_history.append(error)
self.output_history.append(self.OutputValue)
self.sample_times.append(current_time/1000) # Convert from msec to sec
self.LastOutputValue = self.OutputValue
return self.OutputValue
def setUpperThreshold(self, upper_threshold):
"""Sets the upper threshold for the measured state"""
self.UpperThreshold = upper_threshold
def setLowerThreshold(self, lower_threshold):
"""Sets the lower threshold for the measured state"""
self.LowerThreshold = lower_threshold
def setMaxValue(self, max_value):
"""Sets the upper bound for the controller output"""
self.MaxValue = max_value
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setMinValue(self, min_value):
"""Sets the lower bound for the controller output"""
self.MinValue = min_value
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setRampDuration(self, ramp_duration):
"""Sets the how long the controller output takes to reach it's max value"""
self.RampDuration = ramp_duration
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setTs(self, Ts):
"""Sets the sampling rate of the controller"""
self.Ts = Ts
self.OutputValueIncrement = (self.MaxValue - self.MinValue)/(self.RampDuration/self.Ts)
def setLabel(self, label):
"""Sets the label of the controller"""
self.label = label
def setSetPoint(self, set_point):
self.SetPoint = set_point
def get_state_history(self):
return self.state_history
def get_error_history(self):
return self.error_history
def get_output_history(self):
return self.output_history
def get_sample_times(self):
return self.sample_times
def get_label(self):
return self.label
class standard_PID_Controller:
"""Standard PID Controller Class"""
def __init__(self, SetPoint=0.0, Kp=0.0, Ti=0.0, Td=0.0, Ts=0.02, MinValue=0.0, MaxValue=1e9):
self.SetPoint = SetPoint
self.Kp = Kp
self.Ti = Ti
self.Td = Td
# Set output value bounds
self.MinValue = MinValue
self.MaxValue = MaxValue
self.label = "standard_PID_Controller/Kp=%f, Ti=%f, Td=%f" % (self.Kp, self.Ti, self.Td)
self.Ts = Ts
self.current_time = 0.0 # (sec)
self.last_time = 0.0
# Initialize controller terms
self.ITerm = 0.0
self.DTerm = 0.0
self.last_error = 0.0
self.last_OutputValue = 0.0
# Initialize the output value of the controller
self.OutputValue = 0.0
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
def clear(self):
"""Clears PID computations and coefficients"""
self.ITerm = 0.0
self.DTerm = 0.0
self.last_error = 0.0
self.state_history = []
self.error_history = []
self.output_history = []
self.sample_times = []
self.OutputValue = 0.0
def update(self, state_value, current_time):
"""Calculates controller output signal for given reference feedback
where:
u(t) = K_p (e(t) + (1/T_i)* \int_{0}^{t} e(t)dt + T_d {de}/{dt})
where the error calculated is the tracking error (r(t) - y(t))
"""
# Calculate Error - if SetPoint > 0.0, then normalize error with respect to setpoint
if self.SetPoint==0.0:
error = state_value - self.SetPoint
else:
error = (state_value - self.SetPoint)/self.SetPoint
self.current_time = current_time/1000.0 # Converting from msec to sec
delta_time = self.Ts
delta_error = error - self.last_error
self.ITerm += error * delta_time
self.DTerm = 0.0
if delta_time > 0:
self.DTerm = delta_error / delta_time
# Remember last time and last error for next calculation
self.last_time = self.current_time
self.last_error = error
# Calculate u(t) - catch potential division by zero error
try:
u = self.Kp * (error + ((1.0/self.Ti) * self.ITerm) + (self.Td * self.DTerm))
except ZeroDivisionError:
u = self.Kp * (error + (0.0 * self.ITerm) + (self.Td * self.DTerm))
# Bound the controller output if necessary (between MinValue - MaxValue)
if u > self.MaxValue:
self.OutputValue = self.MaxValue
self.ITerm -= error * delta_time # Back-calculate the integral error
elif u < self.MinValue:
self.OutputValue = self.MinValue
self.ITerm -= error * delta_time # Back-calculate the integral error
else:
self.OutputValue = u
# Update the last output value
self.last_OutputValue = self.OutputValue
# Record state, error, y(t), and sample time values
self.state_history.append(state_value)
self.error_history.append(error)
self.output_history.append(self.OutputValue)
self.sample_times.append(current_time/1000) # Convert from msec to sec
# Return controller output
return self.OutputValue
def setKp(self, proportional_gain):
"""Determine how aggressively the controller reacts to the current error with setting Proportional Gain"""
self.Kp = proportional_gain
self.label = "standard_PID_Controller/Kp=%f, Ti=%f, Td=%f" % (self.Kp, self.Ti, self.Td)
def setTi(self, Ti):
"""Determine how fast the controller integrates the error history by setting Integral Time Constant"""
self.Ti = Ti
self.label = "standard_PID_Controller/Kp=%f, Ti=%f, Td=%f" % (self.Kp, self.Ti, self.Td)
def setTd(self, Td):
"""Determine far into the future the controller predicts future errors setting Derivative Time Constant"""
self.Td = Td
self.label = "standard_PID_Controller/Kp=%f, Ti=%f, Td=%f" % (self.Kp, self.Ti, self.Td)
def setSetPoint(self, set_point):
"""Set target setpoint value"""
self.SetPoint = set_point
def setMaxValue(self, max_value):
"""Sets the upper bound for the controller output"""
self.MaxValue = max_value
def setMinValue(self, min_value):
"""Sets the lower bound for the controller output"""
self.MinValue = min_value
def get_state_history(self):
return self.state_history
def get_error_history(self):
return self.error_history
def get_output_history(self):
return self.output_history
def get_sample_times(self):
return self.sample_times
def get_label(self):
return self.label