Fullmenu null

 

27 January 2018

En este artículo vamos a desarrollar una solución muy simple para monitorizar ficheros de logs en múltiples máquinas , enviando los cambios que se van produciendo en los mismos a un componente central el cual será el encargado de su tratamiento (persistencia a base de datos, análisis de las lineas recibidas, alarmas, etc).

Para ello vamos a utilizar el mismo script en dos modos diferentes (proporcionando un argumento en su ejecución):

  • producer , monitoriza un fichero y cada vez que se produzca un cambio en el mismo lo notificará enviando la última línea que se haya escrito en el fichero. Este modo será ejecutado en varias máquinas de forma simultánea.

  • consumer, recibe los eventos de los diferentes producer’S y los va mostrando por pantalla

Para ello usaremos un gestor de mensajería RabbitMQ, el cual será el encargado de proporcionar el canal de envío y recepción así como de la persistencia de los mensajes hasta su consumo.

RabbitMQ

RabbitMQ es un software destinado a la gestión del intercambio de mensajes, en su más amplia aceptación, entre aplicaciones. Un software cliente se subscribe al mismo para recibir notificaciones de cuando otro software cliente envía un mensaje siendo el gestor el encargado de gestionar la entrega, persistencia hasta su consumo, reintentos, etc.

Para nuestro scrip vamos a crear una instancia de este broker mediante el uso de la imagen oficial Docker, pero no vamos a entrar en detalles de cómo hacer backups, seguridad etc.

Una vez que tengamos instalado Docker en alguna de nuestras máquinas ejecutaremos:

$docker run -d --hostname my-rabbit \
    --name some-rabbit \
    -v $(pwd)/rabbit://var/lib/rabbitmq/mnesia/rabbit \
    -p 5672:5672 -p 15672:15672 \
    rabbitmq:3-management

Básicamente creamos una instancia docker some-rabbit que va a usar un subdirectorio rabbit de la máquina donde se está ejecutando para la persistencia de los mensajes y por último vamos a utlizar un par de puertos para poder acceder a dicha instancia (sólo necesitamos 5672 pero si quieres acceder a la consola de RabbitMQ vía web para poder inspeccionarlo usamos 15672)

Conexion

Lo primero que hará el script es establecer conexión con el gestor (en nuestro caso vamos a usar una conexión en localhost con las credenciales por defecto) y configurar en qué entorno publicará/recibirá los mensajes:

exchangeName="groovy-script"
queueName="grabbit"
routingKey='#'

factory = new ConnectionFactory()
	factory.username='guest'
	factory.password='guest'
	factory.virtualHost='/'
	factory.host= args[0] ?: 'localhost'
	factory.port=5672
conn = factory.newConnection()

channel = conn.createChannel()
channel.exchangeDeclare(exchangeName, "direct", true)
channel.queueDeclare(queueName, true, false, false, null)
channel.queueBind(queueName, exchangeName, routingKey)

Por simplificar configuramos tanto el exchange, el routing y la cola en el mismo sitio sin importar si es consumer o producer

RabbitMQ nos permite crear los canales de múltiples formas (con o sin persistencia, subscripción o directo, etc) Nosotros vamos a usar una configuración típica donde los mensajes se guarden para asegurar su entrega.

Monitorizar fichero (producer)

En el modo producer el script utilizará la librería de Apache VFS2 para monitorizar un fichero de tal forma que cada vez que se modifique este fichero recibiremos una llamada en nuestro código (`void fileChanged(FileChangeEvent evt) throws Exception `)

	monitor = args[2]

	if (new File(monitor).exists() == false)
		new File(monitor).newPrintWriter().println "${new Date()}"

	FileSystemManager manager = VFS.getManager();
	FileObject fobj = manager.resolveFile(new File(monitor).absolutePath)
	DefaultFileMonitor fm = new DefaultFileMonitor(this as FileListener)
	fm.delay = 500
	fm.addFile(fobj)
	fm.start()

Cada vez que recibimos el evento de que el fichero ha sido modificado, lo leeremos desde el final hacia atrás buscando el último retorno de carro y de esta forma obtener la última línea del fichero. Obviamente no es una solución robusta para sistemas donde el cambio en el fichero puedan ser de varias líneas, en nuestro caso es un simple ejemplo sujeto de ser modificado a situaciones más robustas

Por otra parte, la librería de RabbitMQ nos permite una gran granularidad en el envío del mensaje. En nuestro caso vamos a usar la forma más simple en la cual simplemente indicamos dónde queremos publicar el mensaje y lo enviamos como una simple cadena de texto (como bytes)

void fileChanged(FileChangeEvent evt) throws Exception {

	RandomAccessFile randomAccessFile = new RandomAccessFile(evt.file.localFile, "r");	//(1)
	long fileLength = evt.file.localFile.length() - 1;
	randomAccessFile.seek(fileLength);
	StringBuilder stringBuilder = new StringBuilder()

	for(long pointer = fileLength; pointer >= 0; pointer--){
		randomAccessFile.seek(pointer)
		char c = (char)randomAccessFile.read()
		if(c == '\n' && fileLength == pointer){
			continue
		}
		if(c == '\n' && fileLength != pointer){
			break
		}
		stringBuilder.append(c)
	}
	println "Soy el producer enviando ${stringBuilder.toString().reverse()}"
	channel.basicPublish(exchangeName, routingKey, null, stringBuilder.toString().reverse().bytes) //(2)
}
  1. Usamos un RandomAccess que nos permite movernos por el fichero en lugar de leerlo secuencial

  2. Publicamos en el sistema la linea leída

Recibir eventos (consumer)

La parte "consumer" del script simplemente va a conectarse al gestor de mensajes el cual le avisará cada vez que haya un mensaje que cumpla las condiciones de exchange/routing indicadas. En nuestro caso el script simplemente lo traceará en la consola

	boolean autoAck = true;
	channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException{
		println "Soy el consumer recibiendo ${new String(body)}"
         }
	});

Resultado

En este screenshot puedes ver un caso de ejecución. En la ventana inferior se encuentra corriendo el consumer a la espera de eventos de RabbitMQ, en la superior se encuentra el producer esperando eventos de VFS y en la superior derecha se realiza un cambio en el fichero que se está monitorizando (volcando la fecha de ese momento).

El cambio en el fichero es detectado por el producer y propagado hasta el consumer. En este ejemplo todo se ejecuta en el mismo equipo pero como hemos dicho su interés radica en que estos componentes pueden estar ejecutándose en otras máquinas e incluso redes

rabbit

Docker

Para comprobar que la solución puede ser ejecutada en un entorno distribuido vamos a crear una "mini-red" docker con varios contenedores:

  • rabbitmq-container, será el contenedor donde estará corriendo el servidor de colas RabbitMQ

  • consumer, será el contenedor donde estará corriendo el script en modo consumer (mostrará por pantalla los eventos que reciba)

  • producer1, será un contenedor observando un fichero

  • producer2, será un contenedor observando otro fichero diferente

Para ello vamos a crear un fichero docker-compose.yml donde vamos a configurar todos estos containers:

docker-compose.yml
version: '2'

services:

  #(1)
  rabbitmq-container:
    image: rabbitmq:3-management
    hostname: my-rabbit
    volumes:
      - ./rabbit:/var/lib/rabbitmq/mnesia/rabbit
    ports:
      - 5672:5672
      - 15672:15672

  #(2)
  consumer:
    image: groovy:2.4-jdk8
    volumes:
      - ./:/home/groovy/
    command: groovy /home/groovy/grabbit.groovy rabbitmq-container consumer #(3)

  #(4)
  producer1:
      image: groovy:2.4-jdk8
      volumes:
        - ./:/home/groovy/
      command: groovy /home/groovy/grabbit.groovy rabbitmq-container producer /home/groovy/grabbit.log #(5)

  #(4)
  producer2:
      image: groovy:2.4-jdk8
      volumes:
        - ./:/home/groovy/
      command: groovy /home/groovy/grabbit.groovy rabbitmq-container producer /home/groovy/grabbit2.log #(6)
  1. rabbitmq-container es el "hostname" por el que el resto de containers lo pueden encontrar en su network

  2. consumer es el nombre del container donde corre el script en modo lectura de la cola

  3. indicamos el script a ejecutar así como el nombre del host a conectarse y el modo

  4. tenemos dos producers (producer1 y producer2)

  5. producer1 observará un fichero en lo que en su directorio de trabajo

  6. producer2 observará un fichero diferente en lo que en su directorio de trabajo

Una vez definidos nuestros componentes levantamos el entorno:

$ docker-compose up

y actualizamos cualquiera de los dos ficheros que están siendo observados. En la consola de docker iremos viendo indentificados cada container en las trazas que generan

docker rabbit

Script
@Grapes([
    @Grab(group='com.rabbitmq', module='amqp-client', version='3.1.2'),
	@Grab(group='org.apache.commons', module='commons-vfs2', version='2.2')
])

import com.rabbitmq.client.*
import groovy.json.*

import org.apache.commons.vfs2.FileChangeEvent
import org.apache.commons.vfs2.FileListener
import org.apache.commons.vfs2.FileObject
import org.apache.commons.vfs2.FileSystemManager
import org.apache.commons.vfs2.VFS
import org.apache.commons.vfs2.impl.DefaultFileMonitor

//tag::conexion[]
exchangeName="groovy-script"
queueName="grabbit"
routingKey='#'

factory = new ConnectionFactory()
	factory.username='guest'
	factory.password='guest'
	factory.virtualHost='/'
	factory.host= args[0] ?: 'localhost'
	factory.port=5672
conn = factory.newConnection()

channel = conn.createChannel()
channel.exchangeDeclare(exchangeName, "direct", true)
channel.queueDeclare(queueName, true, false, false, null)
channel.queueBind(queueName, exchangeName, routingKey)
//end::conexion[]

if( args[1] == 'producer' ) {

	//tag::producer[]
	monitor = args[2]

	if (new File(monitor).exists() == false)
		new File(monitor).newPrintWriter().println "${new Date()}"

	FileSystemManager manager = VFS.getManager();
	FileObject fobj = manager.resolveFile(new File(monitor).absolutePath)
	DefaultFileMonitor fm = new DefaultFileMonitor(this as FileListener)
	fm.delay = 500
	fm.addFile(fobj)
	fm.start()
	//end::producer[]

}

if( args[1] == 'consumer' ) {
	//tag::consumer[]
	boolean autoAck = true;
	channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException{
		println "Soy el consumer recibiendo ${new String(body)}"
         }
	});
	//end::consumer[]
}

//tag::filechanged[]
void fileChanged(FileChangeEvent evt) throws Exception {

	RandomAccessFile randomAccessFile = new RandomAccessFile(evt.file.localFile, "r");	//(1)
	long fileLength = evt.file.localFile.length() - 1;
	randomAccessFile.seek(fileLength);
	StringBuilder stringBuilder = new StringBuilder()

	for(long pointer = fileLength; pointer >= 0; pointer--){
		randomAccessFile.seek(pointer)
		char c = (char)randomAccessFile.read()
		if(c == '\n' && fileLength == pointer){
			continue
		}
		if(c == '\n' && fileLength != pointer){
			break
		}
		stringBuilder.append(c)
	}
	println "Soy el producer enviando ${stringBuilder.toString().reverse()}"
	channel.basicPublish(exchangeName, routingKey, null, stringBuilder.toString().reverse().bytes) //(2)
}
//end::filechanged[]

void fileCreated(FileChangeEvent arg0) throws Exception {

}

void fileDeleted(FileChangeEvent arg0) throws Exception {

}