Showing posts with label messaging. Show all posts

Sunday, July 12, 2009

SC Engine: Part 6 - Messaging Middleware

- Introduction
- Part One: System Overview
- Part Two: System Overview: Messages and Applications
- Part Three: Screen Scraping
- Part Four: YouTube Parsing
- Part Five: Linking the Video to the Game
- Part Six: Messaging Middleware
- Part Seven: The Console
- Part Eight: The Site

In the previous posts, I've talked about the various applications that deal with the domain logic, and how they communicate through message passing. They are designed to run on top of a messaging bus. This post will talk about the implementation of the message bus software on top of RabbitMQ using amqplib.

First, the ApplicationMessageBus...

Application Message Bus

One ApplicationMessageBus object is created for each "application" (such as YouTubeFetch, or ScheduleFetch). To get an idea of where this is used in everything, here's some code that could potentially be written:


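import my_app

name = "my_app"
root_config, config = {}, {}  # empty configuration dictionaries for this example
subscriptions = my_app.make_my_app(name, root_config, config)  # Run the app's builder function

# ApplicationMessageBus comes from the engine's messaging middleware
bus = ApplicationMessageBus(name, subscriptions)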


Once again, the ApplicationMessageBus is similar to the applications in that it's not tied to any specific messaging library. Its purpose is to receive a message and, knowing the subscriptions for the app, pass the message to the correct handler. It also takes whatever the handler returns and turns it into a list of outgoing messages, which it returns to its caller. This makes things a bit easier for the caller, since it can always expect a list.
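To make the dispatch-and-normalize idea concrete, here is a minimal sketch. It is not the real ApplicationMessageBus: the class name ApplicationMessageBusSketch, the helper _as_list, and the Ping/Pong messages are all made up for illustration, and the real object does more (logging, error handling, heartbeats).

```python
class ApplicationMessageBusSketch(object):
    """Hypothetical stand-in: routes a message to its handler by message type."""

    def __init__(self, app_name, subscriptions):
        self.app_name = app_name
        # Keep only the subscriptions keyed by a message class
        self.handlers = dict(
            (key, handler) for key, handler in subscriptions.items()
            if isinstance(key, type)
        )

    def handle(self, msg):
        handler = self.handlers.get(type(msg))
        if handler is None:
            return []  # this app is not subscribed to the message
        return self._as_list(handler(msg))

    @staticmethod
    def _as_list(results):
        # Handlers may return nothing, one message, or a list of messages
        if results is None:
            return []
        if isinstance(results, (list, tuple)):
            return list(results)
        return [results]


class Ping(object):  # hypothetical message
    pass


class Pong(object):  # hypothetical message
    pass


def on_ping(msg):
    return Pong()  # a single message, not a list


bus = ApplicationMessageBusSketch("demo", {Ping: on_ping})
outgoing = bus.handle(Ping())
```

Whatever the handler returns, the caller of handle only ever has to deal with a list.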

A few other notes about this object. First, in addition to a handle method, there is an idle method. This gets called every once in a while, and allows me to write apps that have handlers for elapsed time in addition to handlers for messages. This means I can write an app like this...


from datetime import timedelta

def make_my_app(name, root_config, config):
    app = MyApp()

    return {
        msgs.AMessage: app.handle_a_message,                # handler for a message type
        timedelta(minutes=5): app.run_every_five_minutes,   # handler for elapsed time
    }

class MyApp(object):
    def handle_a_message(self, msg):
        ...

    def run_every_five_minutes(self):
        ...
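How the idle method decides when to run those time-based handlers isn't shown here, so the following is only a guess at one simple way to do it. IdleScheduler and its now parameter are hypothetical names; the sketch just remembers when each timedelta-keyed callback last ran and calls it again once the interval has elapsed.

```python
from datetime import datetime, timedelta


class IdleScheduler(object):
    """Hypothetical helper: runs timedelta-keyed callbacks when they are due."""

    def __init__(self, subscriptions, now=None):
        start = now or datetime.now()
        # One [interval, callback, last_run] entry per time-based subscription
        self.timers = [
            [interval, callback, start]
            for interval, callback in subscriptions.items()
            if isinstance(interval, timedelta)
        ]

    def idle(self, now=None):
        now = now or datetime.now()
        ran = []
        for timer in self.timers:
            interval, callback, last_run = timer
            if now - last_run >= interval:
                callback()
                timer[2] = now  # remember when this callback last ran
                ran.append(callback)
        return ran


calls = []
start = datetime(2009, 7, 12, 12, 0)
scheduler = IdleScheduler(
    {timedelta(minutes=5): lambda: calls.append("tick")}, now=start)

scheduler.idle(now=start + timedelta(minutes=1))  # too early, nothing runs
scheduler.idle(now=start + timedelta(minutes=5))  # due, the callback runs
scheduler.idle(now=start + timedelta(minutes=6))  # only a minute since the last run
```

Passing the current time in explicitly keeps the sketch easy to test without waiting for real minutes to pass.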


Also, let's take a closer look at the part that actually calls the application's callback:


def _run_callback(self, callback, msg=None):
    log_msg = "Running callback on %s for message %s" % (callback, msg)
    self.logger.info(log_msg)

    # Temporarily intercept everything logged while the callback runs
    root_logger = logging.getLogger()
    logging_handler = CollectionLogHandler()
    root_logger.addHandler(logging_handler)
    try:
        if msg is not None:
            results = callback(msg)
        else:
            results = callback()  # time-based callbacks take no message

    except Exception as e:
        # Package the error, traceback, and intercepted log into a message
        exceptionTraceback = sys.exc_info()[2]
        tb = traceback.extract_tb(exceptionTraceback)
        log = logging_handler.log

        results = AppErrorOccurredMessage(self.app_name, msg, unicode(e), tb, log)
    finally:
        root_logger.removeHandler(logging_handler)

    log_msg = "Results of callback %s: %s" % (callback, results)
    self.logger.info(log_msg)

    return results


First off, before the application's callback is called, a handler is added to the root logger. This catches all log entries made inside the application's handler while it runs. Next, the callback is called inside a try block. If the application doesn't throw an error, the function just returns the results after cleaning up the log interceptor. If it does throw an error, all the logs that were intercepted, along with the error and traceback information, are collected and placed into an AppErrorOccurredMessage. This message is just like any other message, and as such can be returned and expected to be sent out over the wire.
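The CollectionLogHandler itself isn't shown in this post, so here is a guess at what such a handler could look like. Only the class name and its log attribute come from the code above; the body, the run_and_collect helper, and failing_callback are assumptions made for illustration.

```python
import logging


class CollectionLogHandler(logging.Handler):
    """Assumed implementation: keeps every formatted record in a list."""

    def __init__(self):
        logging.Handler.__init__(self)
        self.log = []

    def emit(self, record):
        self.log.append(self.format(record))


def run_and_collect(callback):
    # Hypothetical helper mirroring the add/try/finally pattern above
    root_logger = logging.getLogger()
    handler = CollectionLogHandler()
    root_logger.addHandler(handler)
    try:
        return callback(), handler.log
    except Exception as e:
        return e, handler.log
    finally:
        root_logger.removeHandler(handler)


def failing_callback():
    logging.getLogger("demo").warning("about to fail")
    raise ValueError("boom")


result, captured = run_and_collect(failing_callback)
```

One thing to keep in mind with this approach: the handler only sees records that pass the loggers' level checks, so with the default WARNING level on the root logger, info and debug entries would not be collected unless the levels are lowered.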

As a closing note on the ApplicationMessageBus, you'll notice that it can also respond to HeartbeatMessage messages. I'm still debating how exactly I want these to work, as I've discovered some problems with the way I've currently implemented them. The idea is that I should be able to send out heartbeats and have the applications send responses, so that I can get an idea of the status of each application. The problem is an application that is dutifully reacting to messages but is a bit backlogged: it won't respond to a heartbeat for a while because the heartbeat message is pretty far down in its FIFO queue. In the meantime, I'll think that the application is somehow down. The original goal of the heartbeat was to determine which applications are up RIGHT NOW, so this doesn't really work too well. However, it may prove useful for determining the load on an application. Anyway, like I said, it's still a work in progress.

Speaking of works in progress, I've arrived at the fun part, the MainMessageBus...

Main Message Bus

This is an object that is potential main-method material. Anyway, here's how you might use it...


import app1
import app2

transport = ...
main = MainMessageBus('main', transport)

root_config = ...
app1_config = ...
app2_config = ...

# Build app1
subscriptions = app1.make_app1('app1', root_config, app1_config)
amb = ApplicationMessageBus('app1', subscriptions)
main.add_app('app1', amb, subscriptions)

# Build app2 (its builder lives in the app2 module)
subscriptions = app2.make_app2('app2', root_config, app2_config)
amb = ApplicationMessageBus('app2', subscriptions)
main.add_app('app2', amb, subscriptions)

try:
    main.run()
finally:
    main.close()  # always release the transport, even if run() fails



ITransport looks like this...



The interesting part is the retrieve method, which doesn't just return a message, but instead returns a context manager for the message. This lets us write the message-handling code easily, while giving the transport a way to finish whatever it needs to finish once we're sure that the message handling is done. This is shown in the MainMessageBus...


with self.transport.retrieve(name) as msg:
    if not msg:
        continue

    data = (msg.__class__.__name__, name, pformat(msg.to_dict(), indent=5))
    log_msg = 'Handling %s in %s\n%s' % data
    self.messages_logger.info(log_msg)

    results = app_bus.handle(msg)
    handled_count += 1
    outgoing_messages = results

    # Send out each resulting message
    for outgoing in outgoing_messages:
        self.send(outgoing)


If the system goes down or an exception occurs inside this context for any reason, the context manager does not exit normally, so the transport is never told to mark the message as done. This matters because it ensures the message is still there to be rerun when the system starts up again. The AMQPTransport section below shows how this works in practice.
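To make that behavior concrete, here is a small in-memory sketch of a transport whose retrieve returns a context manager. It is not the ITransport, LocalTransport, or AMQPTransport from the engine: InMemoryTransport and send_to are hypothetical names, and a plain deque stands in for a real queue. The message is only removed from the queue if the with block finishes without raising.

```python
from collections import deque
from contextlib import contextmanager


class InMemoryTransport(object):
    """Hypothetical transport: one in-memory queue per application name."""

    def __init__(self):
        self.queues = {}

    def send_to(self, name, msg):
        self.queues.setdefault(name, deque()).append(msg)

    @contextmanager
    def retrieve(self, name):
        queue = self.queues.get(name)
        if not queue:
            yield None  # nothing waiting for this application
            return
        msg = queue[0]   # peek at the message without removing it
        yield msg        # the caller's with block runs here
        queue.popleft()  # reached only if the with block did not raise


transport = InMemoryTransport()
transport.send_to("app1", "hello")

try:
    with transport.retrieve("app1") as msg:
        raise RuntimeError("handler failed")
except RuntimeError:
    pass
remaining_after_failure = len(transport.queues["app1"])  # still queued

with transport.retrieve("app1") as msg:
    handled = msg
remaining_after_success = len(transport.queues["app1"])  # now removed
```

An exception raised inside the with block is re-raised at the yield, so the popleft call is skipped and the message stays available for a later retry.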

AMQPTransport

First off, you may notice that I'm constantly opening and closing channels. This is because I've had problems with sending messages that had no exchange set up for them. An exchange is set up when the transport is told to set it up, which happens when the main message bus sees that an application has subscribed to it. It is possible, however, for an application to send out a message that no one has subscribed to (I just haven't written the application to deal with that message yet). Because of the asynchronous nature of AMQP, unless I closed the channel, I might not get the error until the next time I tried to use the channel. So I've decided to use a fresh channel in each method instead. Closing the channel forces the error from AMQP to be raised in the same method as the code that caused the problem (in my "sending a message that no one subscribed to" case, this would be "send"), where I can then trap and handle it.

You see in the retrieve method the context managers being created. What's nice about this ITransport is that it came about as the refactoring point when I wanted to switch from my contained-in-memory prototype transport to AMQP. I still have the "LocalTransport" hanging around:

Local Transport

Saturday, July 11, 2009

SC Engine: Part 1 - System Overview

- Introduction
- Part One: System Overview
- Part Two: System Overview: Messages and Applications
- Part Three: Screen Scraping
- Part Four: YouTube Parsing
- Part Five: Linking the Video to the Game
- Part Six: Messaging Middleware
- Part Seven: The Console
- Part Eight: The Site

This first part will give you an overview of the architecture of SC Engine.

First off, for those interested, a snapshot of the code is available online on Google Code. The point is to allow people to browse through it, as I have no plans to update that repository regularly (if at all). Also, please note that this is by no means a finished product, and it probably has plenty of bugs...

Google Code Repository

Although the main goal is to get YouTube videos, sorting and organizing them by game means I need a database of the games. I found this database on the website of the Korean e-Sports Player Association, or KeSPA. It has data on games played, as well as player and map info. While this site is being scraped for data, YouTube is being searched, and if a video is found that looks like it's about a game we know about, a "link" is made between the two. All data needed by the final web site is eventually uploaded there as it comes rolling in.

There's a lot that can be going on, and I felt this app was a nice opportunity to try out a Message Bus. The idea is that, rather than writing a monolithic application where one part of the app directly calls the next, you write a bunch of small apps that each do their simple job and send out a message stating the results. Any application can listen for these messages and do its own computation, sending out its own results in turn. The small applications don't know about each other, only about the messages themselves.

So, using a message bus, a workflow for fetching and storing the schedule might look like this:


  • Every once in a while, a ScheduleFetchRequested message is sent out.

  • The schedule fetcher application is listening for the ScheduleFetchRequested message, and upon receiving it, goes out and fetches the schedule for the date detailed in the message. This involves running code to go to the website and do some scraping. It then sends out a ScheduleFetchAnnouncement message with the results of what it found. Some things it can determine immediately (player ids, map ids), while others it might not be able to (team names, and the league and stage that the game is played in).

  • Another application gets this ScheduleFetchAnnouncement, and will do lookups on the information we couldn't find directly from the web page (such as the team ids or league name). It gathers the results and spits out a ScheduleParseAnnouncement.

  • Another application that actually stores the schedules onto disk receives the ScheduleParseAnnouncement message, and goes ahead and saves them. It doesn't need to send out any message.
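The workflow above can be sketched with a toy in-memory bus. The message names come from the list, but everything else here is an assumption for illustration: the message fields, the three handler functions, the canned data, and the run loop are not taken from the engine.

```python
from collections import deque


class ScheduleFetchRequested(object):
    def __init__(self, date):
        self.date = date


class ScheduleFetchAnnouncement(object):
    def __init__(self, date, games):
        self.date, self.games = date, games


class ScheduleParseAnnouncement(object):
    def __init__(self, date, games):
        self.date, self.games = date, games


stored = []


def fetch_schedule(msg):
    # A real app would scrape the site here; this returns canned data
    games = [{"player_ids": (1, 2), "team": "raw team text"}]
    return ScheduleFetchAnnouncement(msg.date, games)


def parse_schedule(msg):
    # Look up what the scrape could not determine directly (hypothetical id)
    games = [dict(game, team_id=42) for game in msg.games]
    return ScheduleParseAnnouncement(msg.date, games)


def store_schedule(msg):
    stored.append((msg.date, msg.games))  # no outgoing message


subscriptions = {
    ScheduleFetchRequested: [fetch_schedule],
    ScheduleFetchAnnouncement: [parse_schedule],
    ScheduleParseAnnouncement: [store_schedule],
}


def run(first_message):
    pending = deque([first_message])
    seen = []
    while pending:
        msg = pending.popleft()
        seen.append(type(msg).__name__)
        for handler in subscriptions.get(type(msg), []):
            result = handler(msg)
            if result is not None:
                pending.append(result)
    return seen


flow = run(ScheduleFetchRequested("2009-07-12"))
```

None of the three functions calls another directly. Each one only knows the message it receives and the message it returns, and another listener can be added to any list in subscriptions without touching them.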



So why three apps? Why not have all this done in one app? Or the fetch and parsing in one app, and the storing in another?


  1. Although the merits are debatable, the individual applications would be more complex if we merged them into two or one. The trade-off is that with three apps, the system as a whole is more complex.

  2. More importantly, by separating the steps by messages, you can later make other applications that can get at the data in the middle of the flow. For example, we can create an UnknownData application that also listens for the ScheduleParseAnnouncement messages, and if it sees any players or maps that we've never heard of before, can send out appropriate messages to try to fetch information on those players and maps. Adding this new functionality can be done without touching existing components.



Of course, you need to use your own discretion about how finely you separate your apps. There really can't be a hard rule, just experience and preference to help here.

Now, let's look at what happens regarding YouTube videos:


  • Every once in a while, a YouTubeFetchRequested message is sent out.

  • An application hears this message and runs a query on YouTube. Each resulting video's id, title, description, and author are sent in a YouTubeFetchAnnouncement message.

  • Another application is listening for these YouTubeFetchAnnouncement messages. For each one, it runs the title and description through a parser to grab as much information as possible: team names, player names, game number (game X of a set of 5 games), date played, and so on. Sometimes the parse doesn't find enough data to even assume that the video is a StarCraft commentary. In that case, the application doesn't send out a message about it. If the parse does find at least SOME data, the application puts it into a YouTubeParseAnnouncement message.

  • The YouTubeParseAnnouncement message is sent to the application we talked about earlier that stores the scheduling data. That application runs the YouTube information against the stored schedules and looks for possible links. If it finds one, it sends out a VideoGameLinkAnnouncement. If it can't find any match, or the video could match more than one game, the app instead sends out a YouTubePartialParseAnnouncement.



From here, the VideoGameLinkAnnouncement messages eventually go to the web site, whereas the "partial parses", as I call them, are collected by another application so that I can sort through them manually and see what's up. By going through these results, I can come up with changes to the YouTube parsing code, or add data to help the search engine in future tries. I then have the option to retry the parse, or to manually send out the VideoGameLinkAnnouncement myself with the correct data.

There are other apps and messages that I haven't spoken of, such as fetching and updating data on players and maps, the applications that trigger periodic fetches, and what happens to all those "partial parse" messages. But I think this gives a good idea of how things would tend to work.

So, that's the overview of the system. For the most part, it's a bunch of small apps that share data by passing messages. There is no real central database, as each app is responsible for saving its data in whatever way it sees fit.

It's a completely different style than the typical system where all the data is in a central database and all parts of the app work on the same data. I've read about this style of programming on blogs like Udi Dahan's and Ayende's. It's my first time using such a system, and I'm interested in learning about its strengths and pitfalls.

SC Engine: Part 2 - System Overview: Messages and Applications

- Introduction
- Part One: System Overview
- Part Two: System Overview: Messages and Applications
- Part Three: Screen Scraping
- Part Four: YouTube Parsing
- Part Five: Linking the Video to the Game
- Part Six: Messaging Middleware
- Part Seven: The Console
- Part Eight: The Site

This post will go over some of the general implementation for SC Engine.

The system is written in Python and, as stated in Part 1, uses a Message Bus to pass messages with information between "applications" inside the engine.

Messages



A message is a simple entity meant to store data that is being sent from one part of the system to another. Here is what a message might look like...


class PlayerFetchRequested(BaseMessage):
    def __init__(self, player_id):
        # int
        self.player_id = player_id


For those interested, you can see all the messages in the source.

As you can see, there's not much meat here. It's just a class that inherits from BaseMessage, with a constructor that takes the data. It could have been implemented as a tuple with a string for the message name and a dictionary for the data, but it was nice to have an error thrown if I don't include all the arguments. BaseMessage also adds a few helpful methods to the messages, and is used by the messaging middleware I wrote (which will be discussed in another part).

See all the messages for SC Engine

Applications


Messages are sent between applications. An "application" is a Python object that defines a number of methods which respond to incoming messages and optionally send out resulting messages. The point is that each application does one job, and sends out messages with the results of what it has done. Where those messages go or who uses them is of no concern to the application. Nor does it care where the messages it receives have come from. In this sense, the entire system is implicitly "pub/sub", or "publish and subscribe".

Typically, an application consists of two parts: a builder, and the application itself. Let's look at an application first:

Player Fetch App

This application takes a callable in its constructor (in the end, this callable turns out to be a function that launches the actual fetching and scraping of a web page containing the player data). It uses constructor dependency injection so that I can replace lookup with a mock and easily test the app.

There is one method here that handles incoming messages, and that is player_fetch_requested. This method is called when the application receives a PlayerFetchRequested message. An application can handle more than one type of message: just add a method for each. The fact that the name of the method is similar to the message name is just a convention, and doesn't have any significance other than to be self-documenting.

The method takes one argument: the message to handle. A method that handles messages like this can then return a message or a list of messages of its own, which is how it sends out messages. The point is that this application runs on top of some messaging middleware that passes the message to the correct method, gets the results of the method (which are always messages), and then sends out the resulting messages.

In this case, the lookup is done, and depending on the results the method returns either a message with the info on the player, or a message saying that the player doesn't exist.
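Based only on the description above and the test further down, the application might look roughly like this. Treat it as a reconstruction, not the actual source: the namedtuple messages are stand-ins for the real BaseMessage subclasses, the field names name and race are inferred from the test, and fake_lookup is a made-up replacement for the real scraping function.

```python
from collections import namedtuple

# Stand-ins for the real message classes (field names are assumptions)
PlayerFetchRequested = namedtuple("PlayerFetchRequested", "player_id")
PlayerFetchAnnouncement = namedtuple(
    "PlayerFetchAnnouncement", "player_id name race")
PlayerFetchNonExistantPlayer = namedtuple(
    "PlayerFetchNonExistantPlayer", "player_id")


class PlayerFetchApp(object):
    def __init__(self, lookup):
        # lookup is injected so a test can pass in a mock instead
        self.lookup = lookup

    def player_fetch_requested(self, msg):
        info = self.lookup(msg.player_id)
        if info is None:
            return PlayerFetchNonExistantPlayer(msg.player_id)
        return PlayerFetchAnnouncement(
            msg.player_id, info["name"], info["race"])


def fake_lookup(player_id):
    # Hypothetical replacement for the function that scrapes the player page
    known = {1: {"name": "test", "race": "zerg"}}
    return known.get(player_id)


app = PlayerFetchApp(fake_lookup)
found = app.player_fetch_requested(PlayerFetchRequested(1))
missing = app.player_fetch_requested(PlayerFetchRequested(2))
```

The app never touches a queue or a socket. It receives a message object and returns another one, which is what makes it easy to drive from a test.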

Now, the application by itself is useless, since there's no way of telling the middleware that uses it which messages map to which methods. I experimented with naming conventions on the methods and with decorators, but eventually settled on a simpler solution: a builder function that does this mapping, as well as giving the application object any dependencies...


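def make_app(app_name, root_config, config):
    lookup = get_player_info      # the function that does the real fetching
    app = PlayerFetchApp(lookup)  # inject it into the application

    # Map each message type to the method that handles it
    return {
        msgs.PlayerFetchRequested: app.player_fetch_requested,
    }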



Originally, this builder function was parameterless, but over time it has grown to three parameters. app_name is the name as determined by the message bus for the application. The application shouldn't hard-code this or figure it out by itself, because other parts of the system might prepend or append parts to it. The main reason an application would need it is to use it as the name of the logger or of data files.

The root_config and config parameters are dictionaries containing items that might be run-time options (typically stored in ini files and parsed by a lower part of the engine). root_config typically contains configuration information for all applications (such as the directory to store any data files), while config stores information specific to that application (an example might be the exact url to use when fetching, although I've decided to hardcode this right now).
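As a rough illustration of how the two dictionaries could come out of an ini file, here is a sketch using the standard library's configparser (Python 3). The section names, keys, values, and the load_configs helper are all invented for this example. The engine's actual ini layout isn't shown in this post.

```python
import configparser

# Hypothetical ini content: one shared section plus one section per app
INI_TEXT = """
[root]
data_dir = /tmp/sc_engine

[player_fetch]
url = http://example.invalid/players
"""


def load_configs(ini_text, app_name):
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    root_config = dict(parser["root"])
    # An app without its own section simply gets an empty dictionary
    config = dict(parser[app_name]) if parser.has_section(app_name) else {}
    return root_config, config


root_config, config = load_configs(INI_TEXT, "player_fetch")
```

The builder function then only ever sees two plain dictionaries, so it doesn't need to know that ini files are involved at all.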

The application builder uses the arguments to create the application object. It then returns a dictionary that maps the message type to the method on the app that should handle that message. Typically, this method is simple enough that it doesn't need testing.

Notice that the application is a POPO (Plain Ole' Python Object). It has no dependencies on any messaging system. It just takes in messages and returns resulting messages. The actual job of sending and receiving those messages on any sort of message bus is up to the object which calls the application's methods.

This allowed me to easily set up prototypes in the early stages by using a hacked-together message bus that would just repeatedly send a message to an app's handler, get the resulting messages, then send those out as well. In fact, I pretty much used this system through most of development, not setting up RabbitMQ or anything like that until much later.

Also, because it's a POPO, this application is extremely simple to test. Here is what the test looks like:


class TestPlayerFetchApp(AppTestBase):
    def setUp(self):
        self.lookup = Mock()
        self.sut = PlayerFetchApp(self.lookup)

    def test_fetches_player(self):
        self.lookup.return_value = {'name' : 'test', 'race' : 'zerg'}

        input = msgs.PlayerFetchRequested(1)
        expected = msgs.PlayerFetchAnnouncement(1, 'test', 'zerg')

        result = self.sut.player_fetch_requested(input)

        self.assertContainsMsg(result, expected)

    def test_fetches_non_existant_player(self):
        self.lookup.return_value = None

        input = msgs.PlayerFetchRequested(1)
        expected = msgs.PlayerFetchNonExistantPlayer(1)

        result = self.sut.player_fetch_requested(input)

        self.assertContainsMsg(result, expected)


assertContainsMsg is a helper function that covers all the cases of an application returning a message (returning the message itself, or returning a list of messages with that message in the list). Also, the messages are easy to spot because the BaseMessage object implements value equality. This means...


>>> a = PlayerFetchAnnouncement(1, 'test', 'zerg')
>>> b = PlayerFetchAnnouncement(1, 'test', 'zerg')
>>> c = PlayerFetchAnnouncement(2, 'test2', 'zerg')
>>> a == b
True
>>> a == c
False


Even though object a and b are different objects, BaseMessage overrides the equality operators to ensure that messages with the same data are equal. The main point is to make testing much easier: just make the message you're expecting, rather than having to check all the individual attributes yourself.
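For the curious, value equality like this can be implemented in a few lines. The sketch below is an assumption about how it could be done, not the engine's actual BaseMessage: only the to_dict method name appears elsewhere in the code, and contains_msg is a hypothetical function showing how a helper like assertContainsMsg could make its check.

```python
class BaseMessage(object):
    """Hypothetical sketch; the real BaseMessage may differ."""

    def to_dict(self):
        return dict(vars(self))  # the message's attributes as a dictionary

    def __eq__(self, other):
        # Equal when both are the same message type carrying the same data
        return type(self) is type(other) and self.to_dict() == other.to_dict()

    def __ne__(self, other):
        return not self == other

    def __repr__(self):
        return "%s(%r)" % (type(self).__name__, self.to_dict())


class PlayerFetchAnnouncement(BaseMessage):
    def __init__(self, player_id, name, race):
        self.player_id = player_id
        self.name = name
        self.race = race


def contains_msg(result, expected):
    """True if result is the expected message or a list containing it."""
    if isinstance(result, (list, tuple)):
        return expected in result  # 'in' relies on the equality defined above
    return result == expected


a = PlayerFetchAnnouncement(1, 'test', 'zerg')
b = PlayerFetchAnnouncement(1, 'test', 'zerg')
c = PlayerFetchAnnouncement(2, 'test2', 'zerg')
single_ok = contains_msg(a, b)
list_ok = contains_msg([c, a], b)
```

Because the list membership test uses the same equality, one helper can cover both a handler that returns a single message and one that returns a list.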

That's a pretty simple overview of what an individual application might look like. Typically, your application is either a simple application in itself, just storing and sending data, or a front-end to more advanced functionality, such as this fetch application. It's easy to test, and while it is built with messaging in mind, it has no dependencies on any messaging framework.