Translate

miércoles, 23 de julio de 2014

Parte 4 - Descripción técnica del servicio


Indice
Parte 3 - Entorno de desarrollo utilizado

 

Código del ejemplo


https://github.com/davsuapas/QueueService



Descripción técnica del servicio



El servicio de procesos de negocios de alto rendimiento se ha construido siguiendo un modelo de capas. De tal forma que las interacciones entre las diferentes capas se hace siempre pasando por las capas anteriores. Se ha seguido una arquitectura top-down, pero existe la posibilidad de comunicar las capas de nivel inferior con las de nivel superior a través de eventos. Como podemos ver en la figura de abajo el sistema se divide en cinco capas principales que se describen a continuación.


 
  Ilustración 1. Capas del servicio de procesos de alto rendimiento


Descripción de cada capa




Capa 1. Manejador de la información de configuración (clase: SubcriptionConfigurationSectionHandler)

Esta capa se encarga de recopilar la información creada en xml y convertirla a entidades (clases) que posteriormente se distribuirán por todas las capas que las necesiten. Esta clase debe heredar de IConfigurationSectionHandler y a través del método Create() el sistema inyecta el xml definido por el usuario en el fichero de configuración. A continuación se describen los parámetros que puede configurar el usuario:
  • Name: Nombre de la conexión asignado por el usuario.
  • Uri: Uri del servidor de colas RabbitMQ.
  • Por cada canal de la conexión:
- Name: Nombre del canal.
- Durable: Indica si la cola se persiste incluso si el servidor es pagado.
- Exchange: Tipo de acceso a la cola de RabbitMQ.
- FaultRetry: Intentos de la apertura de una canal antes de reiniciar la conexión por fallo del sistema.
- QueueName: Nombre de la cola de RabbitMQ.
- RoutingKey: Identificador de acceso a la cola.
- WorkerhreadMax: Número máximo de hilos de ejecución de procesos.
- InSecondProcessMaxTimeWarning: Tiempo máximo que podrá usar un proceso antes de que el   sistema informe al usuario por superar el tiempo prefijado.
- Process: Proceso que ejecuta el servicio cuando llega un menaje por este canal a través de la cola configurada.

Capa 2. Manejador del ciclo de vida del servicio (clase: SubcriptionWorker)

Esta capa funciona como un interface con el host que aloja el servicio y se encarga de controlar el ciclo de vida del servicio. También se encarga de inyectar la configuración personalizada por el usuario a través del fichero configuración y distribuirla a las capas inferiores. El sistema contiene los siguientes métodos:
  • Start: Inicia el servicio de colas. Se comunica con la capa de conexión y le informa del inicio del servicio.
  • Stop: Para el servicio de colas. Se comunica con la capa de conexión y le informa de la finalización del servicio. En el apartado que explica el "CIERRE ORDENADO DEL SERVICIO" se describirá con más detalle.

Capa 3. Pool de conexiones (clase: ConnectionManagerPool y ConnectionManager)

Esta capa gestiona todas las conexiones posibles configuradas por el usuario. Existirá un array por cada conexión configurada por el usuario.

Antes de continuar haremos un paréntesis y explicaremos la clase que hereda de ConnecionManagerPool. Esta clase centraliza toda la gestión de hilos del sistema y se llama ThreadInterface. Esta clase mantiene dos variables protegidas que le permiten controlar el ciclo de vida del hilo. Los estados son Running y Stopped. Siempre que el sistema recibe la señal de parar un hilo a través del método Stop(), el gestor de hilos envía una señal a través de un monitor para sacar del estado dormido al hilo, (en caso de que estuviera en este estado), y marca al hilo como Running = false. El gestor de hilos permite controlar errores asociados a él, y como comentamos anteriormente el gestor de hilos permite dormir y despertar un determinado hilo, a través de monitores que se gestionan con los métodos del sistema SetSignal() y WaitSignal(). Por último RunProcess() es el método abstracto de ejecución del proceso en el hilo abierto por el sistema. En definitiva automatiza todas las tareas rutinarias que necesitamos en el servicio para la gestión de hilos.

Toda la gestión del pool de conexiones de centraliza a través de dos métodos principales:

  • RunProcess: Lanza todos los gestores de conexiones manejados por la clase ConnectionManager y espera en estado suspendido hasta posibles errores producidos en capas inferiores. El sistema tolerante a fallos será estudiado más adelante con más detalle.
  • Close: Cierra las conexiones asociadas.
  • La clase ConnectionManager se encarga de crear tanto las conexiones RabbitMQ como el pool de canales configurados por el usuario. Esta clase gestiona los posibles errores asociados a las conexiones y canales y que veremos con más detalle más adelante. También se encarga de poner en marcha los diferentes canales asociados a la conexión así como la finalización y cierre de los mismos.

Capa 4. Pool de canales (clase: ChannelManagerPool y ChannelManager)

Esta capa se encarga de gestionar todos los canales configurados por el usuario. Crea la clase (ChannelManager) para la gestión individual de cada uno de ellos e inicia los diferentes hilos por canal a través del método Run(). Gestiona los errores de los canales, así como la finalización y el cierre de los diferentes canales notificado por las capas superiores del sistema.

La principal función de la clase ChannelManager que hereda de ThreadInterface consiste en gestionar los canales físicos de RabbitMQ y gestionar los diferentes mensajes de las colas. Otra función importante del gestor del canal es en-rutar el mensaje del canal al proceso de negocio asociado al canal configurado por el usuario. El sistema para la gestión de mensajes y procesos se realiza a través de un pool de hilos mediante ranuras (slots) dinámicas. Este proceso lo veremos con más detalle en el apartado "SLOTS DE HILOS PARA PROCESOS DE NEGOCIO".

Como todas las capas, contienen mecanismos para la finalización y cierre ordenados, en este caso de canales.

Capa 5. Pool de procesos (clase: WorkerProcessPool y WorkerManager)

El pool de procesos tiene como finalidad controlar los diferentes hilos asociados a un canal. Como ya hemos comentado anteriormente esto se realiza a través de una serie de ranuras que coinciden cada una con un proceso de negocio.

Otro aspecto importante y que gestiona la clase WorkerProcess es la inyección de procesos descubiertos por el sistema en el directorio donde se encuentra hosteado el servicio. Esto posibilita el desacoplamiento total del servicio con los procesos que gestionan los mensajes. En el punto "PROCESOS DE NEGOCIO DESACOPLADOS" hablaremos de ello con más detalle.

Como todas las capas, contienen mecanismos para la finalización y cierre ordenados, en este caso de procesos.

Transversalmente a todas las capas existen diferentes módulos que permiten las buenas prácticas del sistema. Una de estos módulos es la clase que permite la gestión de trazas por parte del servicio. Esta clase se llama LogService y dispone de diferentes métodos para el informe de incidencias al usuario.

Por último el servicio dispone de un módulo para el envío de los mensajes al servidor de colas. Este módulo se gestiona mediante la clase PublicationService y existe un ejemplo de uso que se puede ver en la parte 5 (demo) de este blog. Esta clase permite enviar mensajes a través del método Publish(). El mensaje puede estar tipado y es serializado a través de la clase Serializer. La clase PublicationService permite configurar la URI del servidor de cola, así como la cola destino y el tipo de comunicación utilizado.


Slots de hilos para procesos de negocio

 

 
El sistema permite ejecutar los procesos de negocio asociados a un canal de forma paralela. Para entender cómo funciona el sistema sigamos el esquema que podemos ver más abajo.



Ilustración 2. Flujo de mensajes entre canales y el pool de procesos 

El canal en la clase ChannelManager se pone en modo espera de mensajes de la cola asociada de RabbitMQ a través del método Next() de la clase Subsription como podemos ver en la figura de más abajo. En el constructor de la clase Subcription del API de RabbitMQ se debe indicar la cola que se desea escuchar, así como un modelo creado en base a la conexión creada de RabbitMQ ( mqConnection.CreateModel()). Una vez que llega el mensaje el canal envía el mensaje al pool de procesos de negocio que es el encargado de distribuir el mensaje al slot (hilo) que se encuentre libre para ejecutar el consiguiente proceso de negocio asociado.






using (Subscription sub = new Subscription(this.model, this.entityChannel.QueueInfo.QueueName, false)) {

    while (this.IsRunning == ThreadInterface.CONST_TRUE) {
        BasicDeliverEventArgs result;
        if (sub.Next(10000, out result)) {
            this.workerProcessPool.SendMessage(result.Body);
            sub.Ack();
            this.WaitSignal(); // Espera a que se libere un slot
        }
    }
} 


Como podemos observar en la figura de abajo, en el método SendMessage() de la clase WorkerProcessPool se busca un slot libre mirando la variable protegida IsMessageNull. En caso de existir más slot libres, mediante activeChannel activamos el canal modificando el estado del hilo del canal pasando de suspendido a trabajando, (mirar this.WaitSignal() de la figura de más arriba). Esto hace que el canal de nuevo se ponga en modo escucha de mensajes.

using (Subscription sub = new Subscription(this.model, this.entityChannel.QueueInfo.QueueName, false)) {

internal void SendMessage(byte[] message) {
    int countFreeSlot = 0;

    for (int i = 0; i < this.workerProcessManagers.Count; i++) {

        var workerProcessManager = workerProcessManagers[i];

        if (workerProcessManager.IsMessageNull) {

            if (++countFreeSlot == 1) { // Solo asigno el mensaje al primero.
                LogService.WriteInfo("Send message to the free process with number: {0}, Channel: {1}", i, this.channelId);
                workerProcessManager.Message = message;
            }
        }
    }

    LogService.WriteInfo("Free slots: {0} for channel: {1}", countFreeSlot, this.channelId);

    if (countFreeSlot > 1) { // Si hay slot libre activo el canal para cojer más mensajes
        activateChannel();
    }
}
 


Al trabajar de esta forma conseguimos dos efectos positivos:
  1. Aislamiento total entre procesos. Incluso trabajando de forma concurrente.
  2. Conseguimos no sobrecargar el servidor de colas RabbitMQ. Esto es posible porque mientras todos los slots estén ocupados el canal no solicita más mensajes y el canal permanece suspendido. Según se van liberando slots el canal es avisado y el sistema solicita más mensajes.


Procesos de negocio desacoplados

 


Uno de los objetivos principales del proyecto es conseguir la independencia total del servicio de colas y los procesos de negocio asociados. Esto significa que los procesos de negocio no pueden ser referenciados por el sistema mediante una referencia directa al ensamblado.

Para conseguir la independencia total usamos el concepto de contenedor de dependencias y extensibilidad mediante Managed Extensibility Framework (MEF). Con MEF conseguimos descubrir todos los procesos asociados a los canales. Recordemos que para indicar que proceso se asocia a cada canal utilizamos el fichero de configuración. Para ver un ejemplo de uso dirigirse a la parte 5 (demo) de este blog.
Para indicar a MEF que escuche un determinado directorio donde se encuentran los procesos de negocio se realiza a través del siguiente comando:


var catalog = new AggregateCatalog(new DirectoryCatalog(Path.Combine(Path.GetDirectoryName(System.Reflection.Assembly.GetAssembly(typeof(Program)).Location), "BusinessProcess")));


En este caso estamos indicando a MEF que escuche el directorio “BusinessProcess”. De esta forma siempre que queramos añadir procesos de negocio ubicaremos el ensamblado con el proceso de negocio pertinente en este directorio, mediante una simple copia de fichero.

El siguiente paso será obtener el proceso de negocio asociado al canal. Para ellos utilizamos el siguiente comando:


this.process = MefService.Container.GetExportedValueOrDefault(channelEntity.Process);

channelEntity.Process es el nombre del proceso de negocio que deseo cargar y es un parámetro del fichero de configuración que vimos anteriormente. La forma de configurar un nombre en el propio proceso de negocio es mediante un atributo de la clase. Un ejemplo sería el siguiente:
[Export("HelloWorldProcess", typeof(IProcessExecute))]
[PartCreationPolicy(CreationPolicy.NonShared)]
public class HelloWorldProcess : ProcessExecute, IDisposable {
  ............
 
El atributo Export expone el nombre del proceso de negocio que en este caso es “HelloWorldProcess”.


Cierre ordenado del servicio 




Ilustración 10. Esquema del cierre ordenado del servicio de procesos

Uno de los estados posibles en el servicio de colas de alto rendimiento es el estado “parado”. Mediante la capa de nivel superior llamada SubcriptionWorker el host que aloja el servicio puede pararlo llamando al método Stop(). El sistema notifica a las capas inferiores que el servicio debe ser parado y cada capa está encargada de cerrar y parar sus funcionalidades. El cierre es ordenado y siempre se cierran las capas de nivel más inferior y después las de nivel superior. A continuación se describe las funcionalidades de cada capa:

  • WorkerProcessPool: Para todos los procesos de negocio y espera a que se cierren. 
  • ChannelManagerPool: Cierra todos los canales de RabbitMQ y espera a que se cierren. 
  • ConnectionManagerPool: Cierra todas las conexiones de RabbitMQ y espera a que se cierren.
Cada una de las capas siempre espera a que se cierre la anterior.


Sistema tolerante a fallos

 


La gestión de errores es un aspecto importante a la hora de abordar un servicio que debe dar cobertura 24 horas al día. El servicio está preparado para rearmarse en caso de fallo de las conexiones del servidor de colas de RabbitMQ. El sistema dispone de mecanismos para dar información de cada incidencia que se pueda producir en el servicio. El control de errores se gestiona desde las capas inferiores a las capas superiores. Cuando se produce un error en el método RunProcess() de la clase ChannelManager como se puede ver en la figura de abajo, el sistema cierra todos los procesos que hubiera ejecutándose, incrementa el contador de reintentos y lanza un evento que recogerá las capas superiores.

catch (Exception ex) {
    Util.TryExecute(() => this.workerProcessPool.CloseAndWait(false));
    this.IncrementRetry();
    new ServiceException(ex, "Error into channel: {0}. Number retry: {1}", this.channelId, this.Retry);
    this.HasError = ThreadInterface.CONST_TRUE;
    Util.TryExecute(() => this.OnErrorEventHandler(ex));
} 
La capa superior activará el hilo de la capa superior ConnectionManagerPool pasando el estado dormido a ejecutándose y realizará dos procesos como se puede ver en la figura de más abajo.

  • ObserverChannelErrors(): Recrea todos los canales que contuvieran errores, teniendo en cuenta siempre el número de reintentos. Esto significa que intenta abrir de nuevo los canales de RabbitMQ. Si se supera el número de reintentos, el canal se marca como erróneo. 
  • ObserverConnectionErrors(): Si todos los canales están rotos el sistema intenta reabrir las conexiones de RabbitMQ hasta que le sea posible. 

while (this.IsRunning == ThreadInterface.CONST_TRUE) {
    if (this.WaitSignal()) {

        foreach (var managerConnection in managerConnections) {
            managerConnection.ObserverChannelErrors();
            managerConnection.ObserverConnectionErrors();
        }
    }
}
  
Un ejemplo donde intervendría el sistema tolerante a fallos sería cuando el servidor de colas estuviera caído. Una vez que el servidor de colas se pusiera de nuevo en marcha el servicio a través del sistema tolerante a fallos recuperaría el sistema

Parte 5 - Caso de uso 

No hay comentarios:

Publicar un comentario