Using RabbitMQ with multiple consumers from one .NET executable
RabbitMQ is a popular message broker that uses AMQP protocol. Helps creating and working with queues of data messages. Queues are needed almost everywhere now: everyone needs better performance and scalability. With the help of message broker you achieve both: you can use it to transfer data asynchronously between services, so that publisher sends the data to the queue without really waiting for it to be processed, the queue is persistent (can survive service/server restarts) and it will be dequeued by consumer/subscriber of that queue. There can be 1 or more consumers, based on that there are various strategies of distributing messages between them (round-robin algorithm by default, sending next message to the next free customer in circular order). Sure you can send the SAME message to several queues, each of which will distribute them to their subscribers (using this way you can broadcast messages to large audiences; exchange is being used in RabbitMQ)
This was a brief introduction, I want to show a sample of using multi subscriber simple example in C#.
Example project on GitHub.
We assume in the samples that RabbitMQ is hosted on local machine; why not use the docker image right from the docker hub (works on Windows too):
Rabbitmq - Official Image | Docker Hub
RabbitMQ is an open source multi-protocol messaging broker.
The task is to process the messages in queue “faster”, so the main idea is that we need to subscribe to it using several consumers. Sure we can create 1 executable and just run it as many times as needed, but why not make it from single executable, which also gives us the ability to scale using some configuration for example. You can check the sample on GitHub.
The main points
- We use event based message receive model (allows to get the messages as they arrive), create a share connection, but different channels to work with the queue.
- We use “exchange” here, just to show the exchange mechanics in same sample, it’s not really needed for the task (check Worker2 project, it works with another queue, which is binded to the same exchange):
channel.ExchangeDeclare(exchange: “logs”, type: ExchangeType.Fanout);
- After that we create multiple consumers that use that newly created channels to work with the queue.
The main part is setting the QOS parameters: in case we use the default ones — all the messages will be distributed to last connected subscriber:
channel.BasicQos(0, 1, false);
Which means in the current example: take 1 message per consumer and while he is busy — continue distributing to the next free one.
That is probably it, the other things are standard ones: we create cannels with consumers in a loop, each of them receives 1 message and “works” on it. While working — distribution continues. RabbitMQ has async receive implementation too, but I was not using it in that case.
Full sample of consumption (ignore the timer in the beginning, it’s there just for another test O_O. Each consumer does some “intensive” “work” using
Thread.Sleep to emulate the processing process):
Sample of publisher that sends several messages to the queue
Sample response from
[x] Received in 9 -> 9 at 2/10/2022 22:28:20
[x] Done 4 -> 4 at 2/10/2022 22:28:29
[x] Done 2 -> 2 at 2/10/2022 22:28:29
[-] CANT PROCESS 5 consumer! Error with -> 5
[x] Done 1 -> 20 at 2/10/2022 22:28:29
[x] Done 3 -> 3 at 2/10/2022 22:28:29
[x] Received in 2 -> 10 at 2/10/2022 22:28:29
[x] Received in 3 -> 11 at 2/10/2022 22:28:29
[x] Received in 4 -> 13 at 2/10/2022 22:28:29
[x] Received in 1 -> 12 at 2/10/2022 22:28:29
[x] Received in 5 -> 5 at 2/10/2022 22:28:29
[x] Done 6 -> 6 at 2/10/2022 22:28:29
[x] Received in 6 -> 14 at 2/10/2022 22:28:29
[x] Done 7 -> 7 at 2/10/2022 22:28:29
[x] Received in 7 -> 15 at 2/10/2022 22:28:29
[x] Done 8 -> 8 at 2/10/2022 22:28:29
[x] Received in 8 -> 16 at 2/10/2022 22
Full sample project