Sunday, July 12, 2009

SC Engine: Part 6 - Messaging Middleware

- Introduction
- Part One: System Overview
- Part Two: System Overview: Messages and Applications
- Part Three: Screen Scraping
- Part Four: YouTube Parsing
- Part Five: Linking the Video to the Game
- Part Six: Messaging Middleware
- Part Seven: The Console
- Part Eight: The Site

In the previous posts, I've talked about the various applications that deal with the domain logic, and how they communicate through message passing. They are designed to run on top of a messaging bus. This post will talk about the implementation of the message bus software on top of RabbitMQ using amqplib.

First, the ApplicationMessageBus...

Application Message Bus

One ApplicationMessageBus object is created for each "application" (such as YouTubeFetch, or ScheduleFetch). To get an idea of where this is used in everything, here's some code that could potentially be written:

import my_app

name = "my_app"
root_config, config = {}, {}
subscriptions = my_app.make_my_app(name, root_config, config)  # Run the app's builder function

bus = ApplicationMessageBus(name, subscriptions)

Once again, the ApplicationMessageBus is similar to the applications in that it's not tied down to any specific messaging library. Its purpose is to receive a message and, knowing the subscriptions for the app, pass the message to the correct handler. It also takes whatever the handler returns and turns it into a list of outgoing messages, which it returns to its caller. This makes things a bit easier for the caller, since it can always expect a list.
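Here's a minimal sketch of that dispatch-and-normalize step. It is not the real ApplicationMessageBus: the class name SketchApplicationBus, the _as_list helper, and the choice to return an empty list for an unsubscribed message are all assumptions made for illustration.

```python
class SketchApplicationBus(object):
    """Hypothetical stand-in for ApplicationMessageBus: dispatch by message class."""

    def __init__(self, app_name, subscriptions):
        self.app_name = app_name
        # Keep only the subscriptions keyed by a message class.
        self.handlers = dict(
            (key, handler) for key, handler in subscriptions.items()
            if isinstance(key, type)
        )

    def handle(self, msg):
        handler = self.handlers.get(type(msg))
        if handler is None:
            return []  # assumed behaviour: ignore messages nobody subscribed to
        return self._as_list(handler(msg))

    @staticmethod
    def _as_list(results):
        # Normalize "nothing", "one message" and "several messages" to a list.
        if results is None:
            return []
        if isinstance(results, (list, tuple)):
            return list(results)
        return [results]
```

A handler can then return nothing, a single message, or a list of messages, and the caller only ever has to loop over a list.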

A few other notes about this object. First, in addition to a handle method, there is an idle method. This gets called every once in a while, and it allows me to write apps that have handlers for elapsed time as well as handlers for messages. This means I can write an app like this...

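from datetime import timedelta

def make_my_app(name, root_config, config):
    app = MyApp()

    return {
        msgs.AMessage: app.handle_a_message,
        timedelta(minutes=5): app.run_every_five_minutes,
    }

class MyApp(object):
    def handle_a_message(self, msg):
        pass  # react to the message; any messages returned are sent on

    def run_every_five_minutes(self):
        pass  # periodic work goes here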
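As a rough sketch of how an idle method could drive the timedelta-keyed handlers, assuming it simply remembers when each one last ran: the IdleScheduler class and the optional now argument (there only to make the timing easy to test) are hypothetical, not part of the engine.

```python
from datetime import datetime, timedelta

class IdleScheduler(object):
    """Hypothetical helper: fires timedelta-keyed callbacks once they are due."""

    def __init__(self, subscriptions, now=None):
        start = now or datetime.now()
        # One [interval, callback, last_run] entry per timedelta subscription.
        self.timers = [
            [key, callback, start]
            for key, callback in subscriptions.items()
            if isinstance(key, timedelta)
        ]

    def idle(self, now=None):
        now = now or datetime.now()
        results = []
        for timer in self.timers:
            interval, callback, last_run = timer
            if now - last_run >= interval:
                timer[2] = now  # remember this run before calling out
                result = callback()
                if result is not None:
                    results.append(result)
        return results
```

Calling idle more often than the interval is harmless here, since a handler only fires once its own interval has elapsed.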

Also, let's take a closer look at the part that actually calls the application's callback:

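# Requires: import logging, sys, traceback
def _run_callback(self, callback, msg=None):
    log_msg = "Running callback on %s for message %s" % (callback, msg)
    logging.debug(log_msg)  # logger call assumed

    # Intercept everything logged while the callback runs
    root_logger = logging.getLogger()
    logging_handler = CollectionLogHandler()
    root_logger.addHandler(logging_handler)
    try:
        if msg:
            results = callback(msg)
        else:
            results = callback()
    except Exception as e:
        # Package the error, traceback and intercepted logs as a message
        exceptionTraceback = sys.exc_info()[2]
        tb = traceback.extract_tb(exceptionTraceback)
        log = logging_handler.log

        results = AppErrorOccurredMessage(self.app_name, msg, unicode(e), tb, log)
    finally:
        root_logger.removeHandler(logging_handler)  # clean up the interceptor

    log_msg = "Results of callback %s: %s" % (callback, results)
    logging.debug(log_msg)  # logger call assumed

    return results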

First off, before the application's callback is run, a handler is added to the root logger. This catches all log entries made inside the application's handler while it runs. Next, the callback is called in a try block. If the application doesn't throw an error, the function just returns the results after cleaning up the log interceptor. If it does throw an error, all the logs that were intercepted, along with the error and traceback information, are collected and placed into an AppErrorOccurredMessage. This message is just like any other message, and as such can be returned and sent out over the wire.
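The CollectionLogHandler itself isn't shown, so here's a minimal sketch of what such an interceptor could look like using only the standard logging module. CollectingHandler and run_with_captured_logs are hypothetical names, and returning a tuple instead of an AppErrorOccurredMessage is a simplification.

```python
import logging

class CollectingHandler(logging.Handler):
    """Hypothetical stand-in for CollectionLogHandler: keeps log lines in memory."""

    def __init__(self, level=logging.NOTSET):
        logging.Handler.__init__(self, level)
        self.log = []

    def emit(self, record):
        self.log.append(self.format(record))


def run_with_captured_logs(callback):
    """Run callback and return (result, error, captured log lines)."""
    root_logger = logging.getLogger()
    handler = CollectingHandler()
    root_logger.addHandler(handler)
    try:
        return callback(), None, handler.log
    except Exception as e:
        return None, e, handler.log
    finally:
        # Always detach, so one callback's logs never leak into the next.
        root_logger.removeHandler(handler)
```

Because the handler sits on the root logger, it sees records from any named logger that propagates upward, subject to the usual level filtering.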

As a closing word about the ApplicationMessageBus, you'll notice that it can also respond to HeartbeatMessage messages. I'm still debating how exactly I want these to work, as I've discovered some problems with the way I've currently implemented them. Basically, the idea is that I should be able to send out heartbeats, and the applications should send responses, so that I can get an idea of the status of the applications. The problem is with applications that are dutifully reacting to messages but are a bit backlogged. They won't respond to a heartbeat for a while, because the heartbeat message is pretty far down in their FIFO queue. In the meantime, I'll think that the application is somehow down. The original goal of the heartbeat was to determine which applications are up RIGHT NOW, so this doesn't really work too well. However, it may prove useful for determining the load on an application. Anyway, like I said, it's still a work in progress.

Speaking of works in progress, I've arrived at the fun part, the MainMessageBus...

Main Message Bus

This is an object that is potential main-method material. Anyway, here's how you might use it...

import app1
import app2

transport = ...
main = MainMessageBus('main', transport)

root_config = ...
app1_config = ...
app2_config = ...

# Build app1
subscriptions = app1.make_app1('app1', root_config, app1_config)
amb = ApplicationMessageBus('app1', subscriptions)
main.add_app('app1', amb, subscriptions)

# Build app2
subscriptions = app2.make_app2('app2', root_config, app2_config)
amb = ApplicationMessageBus('app2', subscriptions)
main.add_app('app2', amb, subscriptions)


ITransport looks like this...

The interesting part is the retrieve method, which doesn't just return a message, but instead returns a context manager for the message. This lets us write the message-handling code inside the context, and gives the transport a hook to finish whatever it needs to finish once we're sure the message handling is done. This is shown in the MainMessageBus...

with self.transport.retrieve(name) as msg:
    if not msg:
        break  # assumed: no message waiting, so stop handling this app

    data = (msg.__class__.__name__, name, pformat(msg.to_dict(), indent=5))
    log_msg = 'Handling %s in %s\n%s' % data
    logging.debug(log_msg)  # logger call assumed

    results = app_bus.handle(msg)
    handled_count += 1
    outgoing_messages = results

    for outgoing in outgoing_messages:
        self.send(outgoing)

If the system goes down or an exception is raised inside this context, the context manager does not exit cleanly, so the transport is never told to mark the message as done. This matters because it guarantees the message will still be there to be rerun when the system starts up again. You'll see how this works for the AMQPTransport...


First off, you may notice that I'm constantly opening and closing channels. This is because I've experienced problems with sending messages that had no exchanges set up for them. An exchange is set up when the transport is told to set it up, and this happens when the main message bus sees that an application has subscribed to it. It is possible, however, that an application sends out a message that no one has subscribed to (I just haven't written the application to deal with that message yet). Because of the asynchronous nature of AMQP, unless I closed the channel, I might not get the error message until the next time I tried to use the channel. So, instead, I've decided to use a fresh channel in each method. Closing the channel forces the error from AMQP to be raised in the same method as the code that caused the problem (in my "sending a message that no one subscribed to" case, this would happen in "send"), where I can then trap and handle it.
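The channel-per-method idea can be sketched without a broker. In the sketch below, fresh_channel only assumes a connection object with a channel() method whose channels have close(). FakeConnection, FakeChannel and ChannelError are hypothetical test doubles that imitate the deferred error, and their signatures are simplified rather than amqplib's.

```python
from contextlib import contextmanager

@contextmanager
def fresh_channel(connection):
    """Open a channel for one operation and always close it afterwards.

    Closing is what forces a pending channel error to surface here.
    """
    channel = connection.channel()
    try:
        yield channel
    finally:
        channel.close()


class ChannelError(Exception):
    """Hypothetical stand-in for the library's channel exception."""


class FakeChannel(object):
    """Test double: a bad publish only fails later, when the channel is closed."""

    def __init__(self, exchanges):
        self.exchanges = exchanges
        self.pending_error = None

    def basic_publish(self, body, exchange):
        # Returns immediately, like an asynchronous publish would.
        if exchange not in self.exchanges:
            self.pending_error = ChannelError("no exchange %r" % exchange)

    def close(self):
        if self.pending_error is not None:
            raise self.pending_error


class FakeConnection(object):
    def __init__(self, exchanges):
        self.exchanges = set(exchanges)

    def channel(self):
        return FakeChannel(self.exchanges)


def send(connection, body, exchange):
    """Return True if published, False if nobody has set up the exchange."""
    try:
        with fresh_channel(connection) as channel:
            channel.basic_publish(body, exchange)
    except ChannelError:
        return False
    return True
```

With a long-lived channel, the same failure would show up in whichever unrelated method touched the channel next, which is much harder to trap sensibly.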

In the retrieve method you can see the context managers being created. What's nice about ITransport is that it came about as the refactoring point when I wanted to switch from my in-memory prototype transport to AMQP. I still have the "LocalTransport" hanging around:

Local Transport
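A minimal sketch of what an in-memory transport honoring the same retrieve contract could look like. This is not the project's LocalTransport: the class name InMemoryTransport, the send(name, msg) signature and the one-queue-per-application layout are assumptions.

```python
from collections import deque
from contextlib import contextmanager

class InMemoryTransport(object):
    """Hypothetical in-memory transport with one FIFO queue per application."""

    def __init__(self):
        self.queues = {}

    def send(self, name, msg):
        self.queues.setdefault(name, deque()).append(msg)

    @contextmanager
    def retrieve(self, name):
        queue = self.queues.get(name)
        if not queue:
            yield None  # nothing waiting for this application
            return
        msg = queue[0]  # peek; the message stays queued while it is handled
        yield msg
        # Reached only if the with-block finished without raising.
        queue.popleft()
```

If the handler raises inside the with-block, the exception passes straight through the generator and the popleft never runs, so the message is still at the front of the queue on the next retrieve.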
