$docker run -d --hostname my-rabbit \ --name some-rabbit \ -v $(pwd)/rabbit://var/lib/rabbitmq/mnesia/rabbit \ -p 5672:5672 -p 15672:15672 \ rabbitmq:3-management
27 January 2018
En este artículo vamos a desarrollar una solución muy simple para monitorizar ficheros de logs en múltiples máquinas , enviando los cambios que se van produciendo en los mismos a un componente central el cual será el encargado de su tratamiento (persistencia a base de datos, análisis de las lineas recibidas, alarmas, etc).
Para ello vamos a utilizar el mismo script en dos modos diferentes (proporcionando un argumento en su ejecución):
producer , monitoriza un fichero y cada vez que se produzca un cambio en el mismo lo notificará enviando la última línea que se haya escrito en el fichero. Este modo será ejecutado en varias máquinas de forma simultánea.
consumer, recibe los eventos de los diferentes producer’S y los va mostrando por pantalla
Para ello usaremos un gestor de mensajería RabbitMQ, el cual será el encargado de proporcionar el canal de envío y recepción así como de la persistencia de los mensajes hasta su consumo.
RabbitMQ es un software destinado a la gestión del intercambio de mensajes, en su más amplia aceptación, entre aplicaciones. Un software cliente se subscribe al mismo para recibir notificaciones de cuando otro software cliente envía un mensaje siendo el gestor el encargado de gestionar la entrega, persistencia hasta su consumo, reintentos, etc.
Para nuestro scrip vamos a crear una instancia de este broker mediante el uso de la imagen oficial Docker, pero no vamos a entrar en detalles de cómo hacer backups, seguridad etc.
Una vez que tengamos instalado Docker en alguna de nuestras máquinas ejecutaremos:
$docker run -d --hostname my-rabbit \ --name some-rabbit \ -v $(pwd)/rabbit://var/lib/rabbitmq/mnesia/rabbit \ -p 5672:5672 -p 15672:15672 \ rabbitmq:3-management
Básicamente creamos una instancia docker some-rabbit
que va a usar un subdirectorio rabbit
de la máquina donde
se está ejecutando para la persistencia de los mensajes y por último vamos a utlizar un par de puertos para poder
acceder a dicha instancia (sólo necesitamos 5672
pero si quieres acceder a la consola de RabbitMQ vía web para
poder inspeccionarlo usamos 15672
)
Lo primero que hará el script es establecer conexión con el gestor (en nuestro caso vamos a usar una conexión en localhost con las credenciales por defecto) y configurar en qué entorno publicará/recibirá los mensajes:
exchangeName="groovy-script"
queueName="grabbit"
routingKey='#'
factory = new ConnectionFactory()
factory.username='guest'
factory.password='guest'
factory.virtualHost='/'
factory.host= args[0] ?: 'localhost'
factory.port=5672
conn = factory.newConnection()
channel = conn.createChannel()
channel.exchangeDeclare(exchangeName, "direct", true)
channel.queueDeclare(queueName, true, false, false, null)
channel.queueBind(queueName, exchangeName, routingKey)
Por simplificar configuramos tanto el exchange, el routing y la cola en el mismo sitio sin importar si es consumer o producer
RabbitMQ nos permite crear los canales de múltiples formas (con o sin persistencia, subscripción o directo, etc) Nosotros vamos a usar una configuración típica donde los mensajes se guarden para asegurar su entrega.
En el modo producer el script utilizará la librería de Apache VFS2 para monitorizar un fichero de tal forma que cada vez que se modifique este fichero recibiremos una llamada en nuestro código (`void fileChanged(FileChangeEvent evt) throws Exception `)
monitor = args[2]
if (new File(monitor).exists() == false)
new File(monitor).newPrintWriter().println "${new Date()}"
FileSystemManager manager = VFS.getManager();
FileObject fobj = manager.resolveFile(new File(monitor).absolutePath)
DefaultFileMonitor fm = new DefaultFileMonitor(this as FileListener)
fm.delay = 500
fm.addFile(fobj)
fm.start()
Cada vez que recibimos el evento de que el fichero ha sido modificado, lo leeremos desde el final hacia atrás buscando el último retorno de carro y de esta forma obtener la última línea del fichero. Obviamente no es una solución robusta para sistemas donde el cambio en el fichero puedan ser de varias líneas, en nuestro caso es un simple ejemplo sujeto de ser modificado a situaciones más robustas
Por otra parte, la librería de RabbitMQ nos permite una gran granularidad en el envío del mensaje. En nuestro caso vamos a usar la forma más simple en la cual simplemente indicamos dónde queremos publicar el mensaje y lo enviamos como una simple cadena de texto (como bytes)
void fileChanged(FileChangeEvent evt) throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile(evt.file.localFile, "r"); //(1)
long fileLength = evt.file.localFile.length() - 1;
randomAccessFile.seek(fileLength);
StringBuilder stringBuilder = new StringBuilder()
for(long pointer = fileLength; pointer >= 0; pointer--){
randomAccessFile.seek(pointer)
char c = (char)randomAccessFile.read()
if(c == '\n' && fileLength == pointer){
continue
}
if(c == '\n' && fileLength != pointer){
break
}
stringBuilder.append(c)
}
println "Soy el producer enviando ${stringBuilder.toString().reverse()}"
channel.basicPublish(exchangeName, routingKey, null, stringBuilder.toString().reverse().bytes) //(2)
}
Usamos un RandomAccess que nos permite movernos por el fichero en lugar de leerlo secuencial
Publicamos en el sistema la linea leída
La parte "consumer" del script simplemente va a conectarse al gestor de mensajes el cual le avisará cada vez que haya un mensaje que cumpla las condiciones de exchange/routing indicadas. En nuestro caso el script simplemente lo traceará en la consola
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException{
println "Soy el consumer recibiendo ${new String(body)}"
}
});
En este screenshot puedes ver un caso de ejecución. En la ventana inferior se encuentra corriendo el consumer a la espera de eventos de RabbitMQ, en la superior se encuentra el producer esperando eventos de VFS y en la superior derecha se realiza un cambio en el fichero que se está monitorizando (volcando la fecha de ese momento).
El cambio en el fichero es detectado por el producer y propagado hasta el consumer. En este ejemplo todo se ejecuta en el mismo equipo pero como hemos dicho su interés radica en que estos componentes pueden estar ejecutándose en otras máquinas e incluso redes
Para comprobar que la solución puede ser ejecutada en un entorno distribuido vamos a crear una "mini-red" docker con varios contenedores:
rabbitmq-container, será el contenedor donde estará corriendo el servidor de colas RabbitMQ
consumer, será el contenedor donde estará corriendo el script en modo consumer (mostrará por pantalla los eventos que reciba)
producer1, será un contenedor observando un fichero
producer2, será un contenedor observando otro fichero diferente
Para ello vamos a crear un fichero docker-compose.yml donde vamos a configurar todos estos containers:
version: '2'
services:
#(1)
rabbitmq-container:
image: rabbitmq:3-management
hostname: my-rabbit
volumes:
- ./rabbit:/var/lib/rabbitmq/mnesia/rabbit
ports:
- 5672:5672
- 15672:15672
#(2)
consumer:
image: groovy:2.4-jdk8
volumes:
- ./:/home/groovy/
command: groovy /home/groovy/grabbit.groovy rabbitmq-container consumer #(3)
#(4)
producer1:
image: groovy:2.4-jdk8
volumes:
- ./:/home/groovy/
command: groovy /home/groovy/grabbit.groovy rabbitmq-container producer /home/groovy/grabbit.log #(5)
#(4)
producer2:
image: groovy:2.4-jdk8
volumes:
- ./:/home/groovy/
command: groovy /home/groovy/grabbit.groovy rabbitmq-container producer /home/groovy/grabbit2.log #(6)
rabbitmq-container es el "hostname" por el que el resto de containers lo pueden encontrar en su network
consumer es el nombre del container donde corre el script en modo lectura de la cola
indicamos el script a ejecutar así como el nombre del host a conectarse y el modo
tenemos dos producers (producer1 y producer2)
producer1 observará un fichero en lo que en su directorio de trabajo
producer2 observará un fichero diferente en lo que en su directorio de trabajo
Una vez definidos nuestros componentes levantamos el entorno:
$ docker-compose up
y actualizamos cualquiera de los dos ficheros que están siendo observados. En la consola de docker iremos viendo indentificados cada container en las trazas que generan
@Grapes([
@Grab(group='com.rabbitmq', module='amqp-client', version='3.1.2'),
@Grab(group='org.apache.commons', module='commons-vfs2', version='2.2')
])
import com.rabbitmq.client.*
import groovy.json.*
import org.apache.commons.vfs2.FileChangeEvent
import org.apache.commons.vfs2.FileListener
import org.apache.commons.vfs2.FileObject
import org.apache.commons.vfs2.FileSystemManager
import org.apache.commons.vfs2.VFS
import org.apache.commons.vfs2.impl.DefaultFileMonitor
//tag::conexion[]
exchangeName="groovy-script"
queueName="grabbit"
routingKey='#'
factory = new ConnectionFactory()
factory.username='guest'
factory.password='guest'
factory.virtualHost='/'
factory.host= args[0] ?: 'localhost'
factory.port=5672
conn = factory.newConnection()
channel = conn.createChannel()
channel.exchangeDeclare(exchangeName, "direct", true)
channel.queueDeclare(queueName, true, false, false, null)
channel.queueBind(queueName, exchangeName, routingKey)
//end::conexion[]
if( args[1] == 'producer' ) {
//tag::producer[]
monitor = args[2]
if (new File(monitor).exists() == false)
new File(monitor).newPrintWriter().println "${new Date()}"
FileSystemManager manager = VFS.getManager();
FileObject fobj = manager.resolveFile(new File(monitor).absolutePath)
DefaultFileMonitor fm = new DefaultFileMonitor(this as FileListener)
fm.delay = 500
fm.addFile(fobj)
fm.start()
//end::producer[]
}
if( args[1] == 'consumer' ) {
//tag::consumer[]
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException{
println "Soy el consumer recibiendo ${new String(body)}"
}
});
//end::consumer[]
}
//tag::filechanged[]
void fileChanged(FileChangeEvent evt) throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile(evt.file.localFile, "r"); //(1)
long fileLength = evt.file.localFile.length() - 1;
randomAccessFile.seek(fileLength);
StringBuilder stringBuilder = new StringBuilder()
for(long pointer = fileLength; pointer >= 0; pointer--){
randomAccessFile.seek(pointer)
char c = (char)randomAccessFile.read()
if(c == '\n' && fileLength == pointer){
continue
}
if(c == '\n' && fileLength != pointer){
break
}
stringBuilder.append(c)
}
println "Soy el producer enviando ${stringBuilder.toString().reverse()}"
channel.basicPublish(exchangeName, routingKey, null, stringBuilder.toString().reverse().bytes) //(2)
}
//end::filechanged[]
void fileCreated(FileChangeEvent arg0) throws Exception {
}
void fileDeleted(FileChangeEvent arg0) throws Exception {
}