Consumir mensaje en RabbitMQ con Golang (parte 2)

Ayer publiqué la parte 1 de esta serie de dos post: Publicar y consumir RabbitMQ con golang. Si queréis más información sobre la parte de publicar y un pequeño punto de vista de RabbitMQ, visitar la parte 1.

Repositorio

Si os sirve de ayuda el código en el repositorio una estrellita no estaría de más

Enlace: Github repo (Golang)

Consumir mensajes RabbitMQ con GO

Para poder consumir, tenemos una parte donde generaremos unos msg de prueba para después de su publicación, podamos consumirlos.

Por otro lado, la estructura del proyecto es similar al de la publicación, pero con nombres diferentes, no encontraréis nada nuevo aquí, si habéis pasado ya por la parte 1

Las dependencias

module apascualco.com/rabbitmq-publisher-example

go 1.15

require github.com/streadway/amqp v1.0.0

Nuestro fichero main

Hasta la parte de generación de msg es prácticamente el proyecto de publicación de msg ya que necesitamos generar un feed de datos para poder consumir.

La gracia viene cuando definimos un QoS (Quality of service), configurando la forma de consumir la cola.

Después de la configuraron de Qos, construimos el consumer para la cola donde hemos generado los msg y consumimos tas los msg que existen en la cola

package main

import (
	"bytes"
	"log"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

const QUEUE = "example_task_queue"

func connectRabbitMQ() *amqp.Connection {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/apascualco")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %s", err)
	}
	return conn
}

func openChannel(conn *amqp.Connection) *amqp.Channel {
	c, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %s", err)
	}
	return c
}

func declareQueue(channel *amqp.Channel) amqp.Queue {
	q, err := channel.QueueDeclare(QUEUE, true, false, false, false, nil)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %s", err)
	}
	return q
}

func publish(text string, channel *amqp.Channel, queue amqp.Queue) {
	err := channel.Publish(
		"",
		queue.Name,
		false,
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "text/plain",
			Body:         []byte(text),
		})
	if err != nil {
		log.Fatalf("Failed to publish a message: %s", err)
	}
}

func generateMessages(count int, channel *amqp.Channel, queue amqp.Queue) {
	n := 0
	for n < count {
		n++
		msg := "apascualco count (" + strconv.Itoa(n) + "): " + time.Now().String()
		log.Printf("Publishing: %s", msg)
		publish(msg, channel, queue)
		time.Sleep(2 * time.Second)
	}
}

func qualityOfService(channel *amqp.Channel) {
	err := channel.Qos(1, 0, false)
	if err != nil {
		log.Fatalf("Failed to configure channel with quality of services: %s", err)
	}
}

func consumer(channel *amqp.Channel, queue amqp.Queue) <-chan amqp.Delivery {
	consumer, err := channel.Consume(queue.Name, "", false, false, false, false, nil)
	if err != nil {
		log.Fatalf("Failed to configure consumer")
	}
	return consumer
}

func main() {
	conn := connectRabbitMQ()
	defer conn.Close()

	channel := openChannel(conn)
	defer channel.Close()

	queue := declareQueue(channel)

	log.Println("Generating test messages")
	generateMessages(10, channel, queue)

	qualityOfService(channel)

	log.Println("Consuming all messages")
	chanDelivery := consumer(channel, queue)
	for d := range chanDelivery {
		log.Printf("Consuming: %s", d.Body)
		dotCount := bytes.Count(d.Body, []byte("."))
		t := time.Duration(dotCount)
		time.Sleep(t * time.Second)
		d.Ack(false)
	}
}

Para facilitar la comprobación, he puesto un sleep, para retrasar tanto la publicación como el consumo. De esta manera podréis ver como se insertan y se consumen en el “admin” de RabbitMQ.

En cualquier caso, podéis subir el sleep para verlo con más detalle

How to use it

Para comprobar el funcionamiento, primero tenemos que tener un RabbitMQ, corriendo y acceso a el (para validar de forma más cómoda que se publica el msg). 

Para ello, he añadido en el repo un Docker compone que levantara un rabbit y su herramienta de administración. Para lanzarlo tan solo tenéis que lanzar el comando

docker-composer up -d

Esto, nos levantará on Docker con el rabbit, preparado para ser utilizado. Podéis comprobarlo de la siguiente manera:

Si no os funciona el ejemplo por lo menos tenéis un RabbitMQ en local :).

Una vez que tenemos la infra necesaria, construimos el ejecutable lanzando y luego lo ejecutas:

go build cmd/publish_example.go

./publish_example

Esto provocará que si entramos en nuestro RabbitMQ, seguramente en la dirección: http://localhost:15672 (guest/guest).

Cuando lancéis la aplicación, los msg que publiquéis en la cola de ejemplo serán consumidos para parar deberéis usar “ctrl+c”

CategoriesGO

Deja una respuesta

A %d blogueros les gusta esto: