Skip to content

Controller

Initialize the Controller module.

Parameters:

Name Type Description Default
telescope Telescope instance

Telescope instance, provides the sampling time of the simulation.

required
interactionMatrix InteractionMatrixHandler instance

Contains the interaction matrices and modal basis for the simulation configuration.

required
controllerType String

The type of controller that will be used, supported types are: {leaky, forwardPI, backwardPI}.

required
reconstructionMethod String

Type of reconstructor used, supported types are: {inversion, tikhonov}.

required
**kwargs

rcond : list of length equal to nDMs or float. Percentage of the maximum singular value below which the singular values (SV) are discarded. beta : list of length equal to nDMs or float. Regularisation coefficient beta for the Tikhonov regularisation: alfa = beta * (Smax**2). gain : list of length equal to nDMs or float. Proportional gain of the Leaky and PI controllers. decay : list of length equal to nDMs or float. Decay rate for the Leaky integrator. ki : list of length equal to nDMs or float. Integral gain for the PI controllers.

{}
Source code in SAOS/Controller.py
def __init__(self,
             telescope,
             interactionMatrix,
             controllerType,
             reconstructionMethod,
             logger = None,
             **kwargs):
    """
    Initialize the Controller module.

    Parameters
    ----------
    telescope : Telescope instance
        Telescope instance, provides the sampling time of the simulation.
    interactionMatrix : InteractionMatrixHandler instance
        Contains the interaction matrices and modal basis for the simulation configuration.
    controllerType : String
        The type of controller that will be used, supported types are: {leaky, forwardPI, backwardPI}.
    reconstructionMethod : String
        Type of reconstructor used, supported types are: {inversion, tikhonov}.
    logger : logger-like object, optional
        Logger used to report info, warning and error messages. When None,
        the controller calls setup_logging() and reports to the root logger.
    **kwargs
        rcond : list of length equal to nDMs or float
            Percentage of the maximum singular value below which the SV are discarded.
        beta : list of length equal to nDMs or float
            Regularisation coefficient beta for the Tikhonov Regularisation: alfa = beta * (Smax**2)
        control_mask : array-like of shape (nDMs, nLPs), optional
            Mask provided by the user to select specific WFS-DM links.
        gain : list of length equal to nDMs or float
            Proportional gain of the Leaky and PI controllers
        decay : list of length equal to nDMs or float
            Decay rate for the Leaky integrator
        ki : list of length equal to nDMs or float
            Integral gain for the PI controllers

    Raises
    ------
    ValueError
        If reconstructionMethod or controllerType is not supported, or if
        gain, decay or ki is a sequence whose length differs from the
        number of DMs.
    """
    # Setup the logger to handle the queue of info, warning and errors msgs in the simulator
    if logger is None:
        self.queue_listerner = self.setup_logging()
        self.logger = logging.getLogger()
        self.external_logger_flag = False
    else:
        self.external_logger_flag = True
        self.logger = logger

    # Define class attributes
    self.tag = 'controller'

    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    self.samplingTime = telescope.samplingTime

    # Validate both selectors up front so that a typo is reported before the
    # (potentially expensive) reconstructor is computed.
    if reconstructionMethod not in {'inversion', 'tikhonov'}:
        self.logger.error('Controller - Unknown reconstructor.')
        raise ValueError('Unknown reconstructor')
    self.reconstructionMethod = reconstructionMethod

    if controllerType not in {'leaky', 'forwardPI', 'backwardPI'}:
        self.logger.error('Controller - Unknown controller.')
        raise ValueError('Unknown controller')
    self.controllerType = controllerType

    # Default will change to list of size nDMs once the IM is scanned
    self.rcond = kwargs.get('rcond', 0.025)
    self.beta = kwargs.get('beta', 1e-4) # adim, adjusted through trial-error

    # Mask provided by the user to select specific WFS-DM links
    self.control_mask = kwargs.get('control_mask', None)

    # Run the initialization of the reconstructor
    self.reconstructor, self.modal_basis, self.mask, self.discarded_modes = self.initializeReconstructor(self.reconstructionMethod, interactionMatrix)

    # Setup the controller: one reconstructor is produced per DM, so its
    # length is the number of DMs every per-DM parameter must match.
    nDMs = len(self.reconstructor)

    for name in ('gain', 'decay', 'ki'):
        value = kwargs.get(name, 0.0)
        if isinstance(value, tuple):
            # A tuple is a per-DM sequence, not a scalar to replicate
            value = list(value)
        elif not isinstance(value, list):
            # Scalar: use the same value for every DM
            value = [value for _ in range(nDMs)]
        if len(value) != nDMs:
            raise ValueError(f'The {name} should be a float or a a list of size nDMs.')
        setattr(self, name, value)

    # Run the initialization of the controller
    self.initializeController(self.controllerType, self.reconstructor)

queue_listerner instance-attribute

queue_listerner = setup_logging()

logger instance-attribute

logger = getLogger()

external_logger_flag instance-attribute

external_logger_flag = False

tag instance-attribute

tag = 'controller'

device instance-attribute

device = device('cuda' if is_available() else 'cpu')

samplingTime instance-attribute

samplingTime = samplingTime

reconstructionMethod instance-attribute

reconstructionMethod = reconstructionMethod

rcond instance-attribute

rcond = get('rcond', 0.025)

beta instance-attribute

beta = get('beta', 0.0001)

control_mask instance-attribute

control_mask = get('control_mask', None)

controllerType instance-attribute

controllerType = controllerType

gain instance-attribute

gain = get(
    "gain", [0.0 for _ in (range(len(reconstructor)))]
)

decay instance-attribute

decay = get(
    "decay", [0.0 for _ in (range(len(reconstructor)))]
)

ki instance-attribute

ki = get('ki', [0.0 for _ in (range(len(reconstructor)))])

initializeReconstructor

initializeReconstructor(
    reconstructionMethod, interactionMatrix
)

Initialize the reconstructor matrix from the measured interaction matrices.

Parameters:

Name Type Description Default
reconstructionMethod str

Type of reconstructor ('inversion' or 'tikhonov').

required
interactionMatrix InteractionMatrixHandler

Object containing the measured interaction matrices and modal basis.

required

Returns:

Name Type Description
reconstructor list

List of reconstructor matrices per DM.

modal_basis list

List of modal basis per DM.

mask ndarray

Boolean mask indicating interactions between DMs and light paths.

discarded_modes list

List of number of discarded modes per DM.

Source code in SAOS/Controller.py
def initializeReconstructor(self, reconstructionMethod, interactionMatrix):
    """
    Initialize the reconstructor matrix from the measured interaction matrices.

    Parameters
    ----------
    reconstructionMethod : str
        Type of reconstructor ('inversion' or 'tikhonov').
    interactionMatrix : InteractionMatrixHandler
        Object containing the measured interaction matrices and modal basis.

    Returns
    -------
    reconstructor : list
        List of reconstructor matrices per DM.
    modal_basis : list
        List of modal basis per DM.
    mask : np.ndarray
        Boolean mask indicating interactions between DMs and light paths.
    discarded_modes : list
        List of number of discarded modes per DM.

    Raises
    ------
    ValueError
        If no DM or no light path is found, if control_mask does not have
        shape (nDMs, nLPs), if rcond / beta is a list whose length differs
        from the number of DMs, if a DM has no measured interaction matrix
        at all, or if the reconstruction method is unknown.
    """
    self.logger.info('Controller::initializeReconstructor - Computing the reconstructor.')
    t0 = time.time()

    # IM warehouse has: nDms x nLPs
    warehouse = interactionMatrix.interaction_matrix_warehouse

    nDMs = len(warehouse)
    if nDMs < 1:
        raise ValueError('Number of DMs detected are less than 1.')

    nLPs = len(warehouse[0])
    if nLPs < 1:
        raise ValueError('Number of LPs detected are less than 1.')

    # Scan for interactions: if the IM is None, then there is no interaction.
    measured = np.array(
        [[warehouse[i][j]['IM'] is not None for j in range(nLPs)] for i in range(nDMs)],
        dtype=bool,
    )
    mask = measured.copy()

    control_mask = getattr(self, 'control_mask', None)
    if control_mask is not None:
        # Check dimensions
        control_mask_arr = np.array(control_mask, dtype=bool)
        if control_mask_arr.shape != (nDMs, nLPs):
            self.logger.error(f'Controller - control_mask shape must be ({nDMs}, {nLPs})')
            raise ValueError(f'control_mask shape mismatch. Expected ({nDMs}, {nLPs}), got {control_mask_arr.shape}')

        # Warn if user requests control where no IM exists
        if np.any(control_mask_arr & (~measured)):
            self.logger.warning('Controller - control_mask requests control for DM/LP pairs without an interaction matrix. These will be ignored.')

        mask = measured & control_mask_arr

    # Check the reconstructor parameters: a scalar is replicated for every DM,
    # a list must already hold one value per DM.
    if reconstructionMethod == 'inversion':
        if isinstance(self.rcond, list):
            if len(self.rcond) != nDMs:
                raise ValueError('Rcond parameter is expected to be a list of size equal to the number of DMs.')
        else:
            self.rcond = [self.rcond for _ in range(nDMs)]
    elif reconstructionMethod == 'tikhonov':
        if isinstance(self.beta, list):
            # The length of beta itself is what must match the number of DMs
            if len(self.beta) != nDMs:
                raise ValueError('Beta parameter is expected to be a list of size equal to the number of DMs.')
        else:
            self.beta = [self.beta for _ in range(nDMs)]

    # Get the modal basis and the discarded-modes metadata from the first
    # measured IM of each DM (the modal basis is common for each DM).
    modal_basis = []
    discarded_modes = []
    for i in range(nDMs):
        measured_lps = np.flatnonzero(measured[i])
        if measured_lps.size == 0:
            # Without any IM the modal basis type of this DM is unknown, and
            # skipping the DM would leave modal_basis out of step with the
            # DM index used everywhere else.
            self.logger.error(f'Controller - DM {i} has no measured interaction matrix.')
            raise ValueError(f'DM {i} has no measured interaction matrix.')
        entry = warehouse[i][int(measured_lps[0])]
        modal_basis.append(torch.as_tensor(interactionMatrix.modal_basis[i][entry['modalBasis']], dtype=torch.float64, device=self.device))
        discarded_modes.append(entry.get('discarded_modes', 0))

    # Now, define the reconstruction matrices for each DM
    reconstructor = []

    for i in range(nDMs):
        # Stack the IMs of every light path controlled by this DM into one matrix
        interaction_matrix_per_DM = [warehouse[i][j]['IM'] for j in range(nLPs) if mask[i, j]]

        if len(interaction_matrix_per_DM) == 0:
            self.logger.warning(f'Controller - DM {i} has no associated WFS in the control mask. Setting reconstructor to zero.')
            # computeControlAction slices the modal basis as
            # [:, offset : offset + nModes], so nModes must leave room for
            # the discarded-modes offset.
            nModes = max(modal_basis[i].shape[1] - discarded_modes[i], 0)
            temp_reconstructor = torch.zeros((nModes, 0), dtype=torch.float64, device=self.device)
        else:
            interaction_matrix_per_DM = torch.as_tensor(np.vstack(interaction_matrix_per_DM), dtype=torch.float64, device=self.device).squeeze()
            if interaction_matrix_per_DM.ndim == 1:
                # squeeze() collapsed the matrix to a vector; restore 2-D
                interaction_matrix_per_DM = interaction_matrix_per_DM.unsqueeze(0)
            if reconstructionMethod == 'inversion':
                temp_reconstructor = torch.linalg.pinv(interaction_matrix_per_DM, self.rcond[i])
            elif reconstructionMethod == 'tikhonov':
                # inv(D.T@D + alfa*I)@D.T --> implemented through SVD to improve the stability of the inversion and the automation of alfa
                U, S, Vh = torch.linalg.svd(interaction_matrix_per_DM, full_matrices=False)
                alfa = self.beta[i] * torch.max(S)**2
                S_reg = S / (S**2 + alfa)
                # Scaling the columns of Vh.T is the same as Vh.T @ diag(S_reg)
                temp_reconstructor = (Vh.T * S_reg) @ U.T
            else:
                self.logger.error('Controller::initializeReconstructor - Unknown reconstructor')
                raise ValueError('Unknown reconstructor method.')
        reconstructor.append(temp_reconstructor)

    self.logger.info(f'Controller::initializeReconstructor - Reconstruction took {time.time()-t0}[s]')

    return reconstructor, modal_basis, mask, discarded_modes

initializeController

initializeController(controllerType, reconstructor)

Initialize the control state (history buffers) based on the controller type.

Parameters:

Name Type Description Default
controllerType str

Type of controller ('leaky', 'forwardPI', 'backwardPI').

required
reconstructor list

List of reconstructor matrices per DM.

required

Returns:

Type Description
bool

True if initialization succeeds.

Source code in SAOS/Controller.py
def initializeController(self, controllerType, reconstructor):
    """
    Allocate the zero-initialised history buffers required by the controller.

    Parameters
    ----------
    controllerType : str
        Type of controller ('leaky', 'forwardPI', 'backwardPI').
    reconstructor : list
        List of reconstructor matrices per DM. The number of rows of each
        matrix sets the length of the corresponding buffer.

    Returns
    -------
    bool
        True if initialization succeeds.

    Raises
    ------
    ValueError
        If controllerType is not one of the supported controllers.
    """
    if controllerType not in ('leaky', 'forwardPI', 'backwardPI'):
        self.logger.error('Controller::initializeController - Unknown controller')
        raise ValueError('Unknown controller.')

    def zero_state():
        # One column vector per DM, with as many rows as the reconstructor
        return [
            torch.zeros((matrix.shape[0], 1), dtype=torch.float64, device=self.device)
            for matrix in reconstructor
        ]

    # Every controller remembers its previous command
    self.command_previous = zero_state()

    # Only the PI controllers also remember the previous error
    if controllerType != 'leaky':
        self.error_previous = zero_state()

    return True

computeControlAction

computeControlAction(lightPaths)

Compute the control action for each DM given the wavefront error from the light paths.

Parameters:

Name Type Description Default
lightPaths list

List of LightPath objects that contain the wavefront error measurements.

required

Returns:

Name Type Description
dm_cmd list

List of command arrays to be sent to each Deformable Mirror.

Source code in SAOS/Controller.py
def computeControlAction(self, lightPaths):
    """
    Turn the current wavefront-error measurements into one command per DM.

    For every DM the measurements of the light paths enabled in the mask
    are concatenated, projected onto the modes with the reconstructor,
    passed through the selected control law and finally projected onto the
    DM through the modal basis. The history buffers are updated afterwards.

    Parameters
    ----------
    lightPaths : list
        List of LightPath objects that contain the wavefront error measurements.

    Returns
    -------
    dm_cmd : list
        List of command arrays to be sent to each Deformable Mirror.
    """
    n_dms = len(self.reconstructor)
    n_paths = len(lightPaths)

    # Build the (negated) measurement column vector seen by each DM
    error = []
    for dm in range(n_dms):
        slopes = [
            lightPaths[lp].get_wavefront_error()
            for lp in range(n_paths)
            if self.mask[dm, lp]
        ]
        if slopes:
            stacked = torch.as_tensor(np.hstack(slopes).T, dtype=torch.float64, device=self.device)
            error.append((-1) * stacked.unsqueeze(1))  # -1 for the feedback
        else:
            # DM not fed by any light path: empty measurement vector
            error.append(torch.zeros((0, 1), dtype=torch.float64, device=self.device))

    # Apply the control law in modal space
    modal_error = []
    modal_cmd = []
    for dm in range(n_dms):
        current_error = self.reconstructor[dm] @ error[dm]
        modal_error.append(current_error)

        if self.controllerType == 'leaky':
            command = self.gain[dm] * current_error + self.decay[dm] * self.command_previous[dm]
            modal_cmd.append(command)
        # For the PI (forward and backward), the sampling time is removed from multiplying ki, so that ki is in a closer range to 1,
        # instead of having large ki values and small proportional gains
        elif self.controllerType == 'forwardPI':
            error_step = current_error - self.error_previous[dm]
            command = self.command_previous[dm] + self.gain[dm] * error_step + self.ki[dm] * self.error_previous[dm]
            modal_cmd.append(command)
        elif self.controllerType == 'backwardPI':
            error_step = current_error - self.error_previous[dm]
            command = self.command_previous[dm] + self.gain[dm] * error_step + self.ki[dm] * current_error
            modal_cmd.append(command)

    # Project the modal commands onto each DM, skipping the discarded modes
    dm_cmd = []
    for dm in range(n_dms):
        first_mode = self.discarded_modes[dm]
        last_mode = first_mode + self.reconstructor[dm].shape[0]
        dm_cmd.append(self.modal_basis[dm][:, first_mode:last_mode] @ modal_cmd[dm])

    # Update history buffers for the next iteration
    if self.controllerType in ('leaky', 'forwardPI', 'backwardPI'):
        self.command_previous = list(modal_cmd)
        if self.controllerType != 'leaky':
            self.error_previous = list(modal_error)

    return dm_cmd

setup_logging

setup_logging(logging_level=logging.WARNING)
Source code in SAOS/Controller.py
def setup_logging(self, logging_level=logging.WARNING):
    """
    Install queue-based logging on the root logger.

    A QueueHandler is attached to the root logger and a QueueListener is
    started that forwards the queued records to a console StreamHandler.

    Parameters
    ----------
    logging_level : int
        Level set on the root logger (default: logging.WARNING).

    Returns
    -------
    logging.handlers.QueueListener
        The started listener; the caller is responsible for stopping it.
    """
    # Output to terminal, with the common message layout
    terminal_handler = logging.StreamHandler()
    terminal_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    )

    # The queue decouples the code emitting the records from the code writing them
    record_queue = Queue()

    # Everything reaching the root logger at or above the level is queued
    root = logging.getLogger()
    root.setLevel(logging_level)  # Minimum log level
    root.addHandler(logging.handlers.QueueHandler(record_queue))

    # The listener takes the queued records and passes them to the terminal handler
    listener = logging.handlers.QueueListener(record_queue, terminal_handler)
    listener.start()

    return listener

__del__

__del__()
Source code in SAOS/Controller.py
def __del__(self):
    """
    Release the logging resources owned by this controller.

    Nothing is done when an external logger was supplied. Otherwise the
    queue handler feeding this controller's listener is detached from the
    root logger and the listener is stopped.
    """
    # __del__ also runs when __init__ raised before these attributes were
    # assigned, so they are read defensively.
    if getattr(self, 'external_logger_flag', True):
        return

    listener = getattr(self, 'queue_listerner', None)
    if listener is None:
        return

    # Detach the handler that shares the listener's queue; otherwise the
    # root logger keeps filling a queue that nothing drains any more.
    log_queue = getattr(listener, 'queue', None)
    if log_queue is not None:
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            if getattr(handler, 'queue', None) is log_queue:
                root_logger.removeHandler(handler)

    listener.stop()