Stephen A. Fuqua (SAF) is a Bahá'í, software developer, and conservation and interfaith advocate in the DFW area of Texas.

Node.js, Web API, and RabbitMQ. Part 4

October 13, 2014

Desiring to learn about both Node.js (particularly as an API server) and ASP.Net Web API, I decided to throw one more technology in the mix and see which one is faster at relaying messages to a service bus, namely, RabbitMQ.

Part 1: Test Runner
Part 2: Initial Node.js Code
Part 3: Web API Code
Part 4: Enhanced Node.js code
Part 5: Performance Comparison

Time to turn that Node.js test green. In Part 2, I succeeded in publishing a message to RabbitMQ using Node.js. However, my automated test failed: the .Net test runner could not handle the generated message. Three key elements were missing, which are required for the MassTransit .Net library to interpret the message correctly:

  1. A "Content-Type" header with value "application/vnd.masstransit+json" – not to be confused with the message's content type, which is still JSON.
  2. Loading the message body into a "content" string inside a "message" structure.
  3. Providing the .Net namespace and message class name in a "messageType" array.

You can verify these three differences by looking at the message as received in RabbitMQ, at the end of Part 2 and of Part 3. The application/vnd.masstransit+json content header in particular can be rather tricky. Do not spell it incorrectly (e.g. underscore instead of hyphen). Do not put it in the message body. Do not put it in a message property. It is a header. This is not part of the AMQP protocol used by Rabbit, but rather a convention used by MassTransit (among other library providers).

As far as I can tell, the application/vnd.masstransit+json media type is not formally defined anywhere, except implicitly in the MassTransit code base. While a MassTransit-generated message has more properties in the payload, I found that only those two – message and messageType – were actually required for a message to go through successfully. The message is clear enough, while messageType is used by MassTransit to route a message to a particular Consumer – that is, to a specific .Net class. Thus urn:message:RabbitMQ.WebApi:SimpleMessage will cause a message to be routed to a class named SimpleMessage in the RabbitMQ.WebApi namespace.

With this information, let's modify the Node.js code:

// Before
var publishMessage = {
    "body": message
};
exchange.publish(queue.name, publishMessage);

// After
var publishMessage = {
    "message": {
        "content": message
    },
    "messageType": [
        "urn:message:RabbitMQ.WebApi:SimpleMessage"
    ],
};
var publishOptions = {
    "headers": {
        "Content-Type": "application/vnd.masstransit+json"
    }
};
exchange.publish(queue.name, publishMessage, publishOptions);

Run the test... And the test is green :-). My .Net "back end" code was able to read the service bus message, which had been generated by a service call to a Node.js service.
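
To keep the envelope rules in one place, the "After" snippet can be wrapped in a small helper. This is only a sketch: the function names buildMessageTypeUrn and buildMassTransitPublish are hypothetical (they come from neither MassTransit nor the amqp package), and the "content" property assumes the SimpleMessage shape used in this series.

```javascript
// Hypothetical helpers; the names are illustrative and not part of any library.
var MASSTRANSIT_CONTENT_TYPE = 'application/vnd.masstransit+json';

// Build the URN that MassTransit uses to route to a .Net class.
function buildMessageTypeUrn(namespace, className) {
    return 'urn:message:' + namespace + ':' + className;
}

// Return the two arguments that follow the routing key in exchange.publish():
// the message body and the publish options carrying the header.
function buildMassTransitPublish(namespace, className, content) {
    return {
        body: {
            message: { content: content },
            messageType: [buildMessageTypeUrn(namespace, className)]
        },
        options: {
            // A header, not a body field and not a message property.
            headers: { 'Content-Type': MASSTRANSIT_CONTENT_TYPE }
        }
    };
}

var parts = buildMassTransitPublish('RabbitMQ.WebApi', 'SimpleMessage', 'hello');
console.log(JSON.stringify(parts.body));
```

With such a helper, the publish call would become exchange.publish(queue.name, parts.body, parts.options).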

In Part 3 I noted that I should, perhaps, look into dependency injection in the Node service, which would allow me to open a long-running channel once instead of opening and closing a connection to the RabbitMQ server with every message call. I have not been able to find clear guidance on a best practice in that regard. My experience with other protocols is that connection pooling is vital: opening and closing sockets adds overhead and slows down the individual transactions. For now, I choose the long-running approach. Here is the restructured, working code. As it is not fully object oriented yet, this is an example of Property-style Dependency Injection, instead of Constructor-style.


/**
 * Module dependencies.
 */
var express = require('express');
var http = require('http');
var amqp = require('amqp');
 
 
/**
 * Web Server setup.
 */
var app = express();
 
app.set('port', process.env.PORT || 10025);
 
/**
 * Rabbit MQ connection setup
 */
var rabbitSettings = {
    mqUrl: process.env.RABBITMQ_URL || 'amqp://localhost',
    exchangeName: process.env.EXCHANGE || 'RabbitMQ.WebApi:SimpleMessage',
    queueName: process.env.QUEUE || 'apitest_webapi',
};
 
 
console.log('Opening connection to RabbitMQ');
var connection = amqp.createConnection({ url: rabbitSettings.mqUrl }, {
    reconnect: true, // Enable reconnection
    reconnectBackoffStrategy: 'linear',
    reconnectBackoffTime: 1000, // Try reconnect once a second
});
 
var exchange;
 
connection.on('ready', function () {
    var options = { type: 'fanout', durable: true, autoDelete: false };
 
    console.log('Creating/opening exchange');
    exchange = connection.exchange(rabbitSettings.exchangeName, options, function (exc) {
 
        console.log('Creating/opening queue');
        connection.queue(rabbitSettings.queueName, options, function (queue) {
 
            console.log('Binding queue to exchange');
            queue.bind(exc, queue.name);
        });
    });
});
 
 
/**
 * A RESTful GET request handler
 */
app.get('/Message/:message', function(request, response) {
 
    var message = request.params.message;
 
    var publishMessage = {
        'message': {
            'content': message
        },
        'messageType': [
            'urn:message:RabbitMQ.WebApi:SimpleMessage'
        ],
    };
 
    var publishOptions = {
        'headers': {
            'Content-Type': 'application/vnd.masstransit+json'
        }
    };
 
    exchange.publish(rabbitSettings.queueName, publishMessage, publishOptions);
 
    response.writeHead(202);
    response.end();
});
 
/**
 * Start the web server
 */
http.createServer(app).listen(app.get('port'), function () {
    console.log('Express server listening on port ' + app.get('port'));
});
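
One gap worth noting in the listing above: the exchange variable is only assigned once the connection's 'ready' event has fired, so a request that arrives earlier would call publish on undefined. A minimal sketch of a guard follows. The helper name publishIfReady and the choice of a 503 status are assumptions for illustration, and the fake exchange exists only so the snippet runs without a RabbitMQ server.

```javascript
// Hypothetical helper (not part of the amqp package): publish only when the
// exchange has been opened. Returns the HTTP status the handler should send.
function publishIfReady(exchange, routingKey, body, options) {
    if (!exchange) {
        return 503; // Service Unavailable: exchange not opened yet
    }
    exchange.publish(routingKey, body, options);
    return 202; // Accepted: message handed to the exchange
}

// Stand-in exchange for demonstration; it only records what was published.
var published = [];
var fakeExchange = {
    publish: function (routingKey, body, options) {
        published.push({ routingKey: routingKey, body: body, options: options });
    }
};

var statusBeforeReady = publishIfReady(undefined, 'apitest_webapi', {}, {});
var statusWhenReady = publishIfReady(fakeExchange, 'apitest_webapi',
    { message: { content: 'hi' } }, {});
console.log(statusBeforeReady, statusWhenReady);
```

In the request handler, the returned status could be passed straight to response.writeHead() in place of the hard-coded 202.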
 

Perhaps some day I'll turn this into a proper Node.js library (if there isn't one already) for publishing messages intended for MassTransit consumers.
