reevoolabs : open source technology

April 8, 2009

beanstalk-messaging

Filed under: Uncategorized — lukeredpath @ 1:27 pm

Beanstalkd is a lightweight messaging system that we use at Reevoo to communicate between our various Rails applications. You can interact with Beanstalkd queues using the beanstalk-client RubyGem.

Whilst newer versions of Beanstalkd support the concept of multiple “queues” within a single beanstalkd process, earlier versions (we currently use 0.6.1) were limited to one queue per process. Because of this, and our need for multiple queues to store different types of message, we ran each queue as a separate beanstalkd process. This approach still has its advantages, because each process can be managed and monitored independently.

The beanstalk_messaging plugin for Rails takes all of the work out of managing individual beanstalkd processes and also provides a higher-level interface than that provided by the beanstalk-client gem for interfacing with and polling queues.

Installation

The plugin can be installed from Subversion:

$ ./script/plugin install http://svn.reevoo.com/repos/plugins/beanstalk_messaging/

Then create a symlink to the beanstalk control script under the script/ folder:

$ ln -s vendor/plugins/beanstalk_messaging/script/beanstalk script/beanstalk

Managing beanstalkd processes

The first step to integrate beanstalk_messaging into your Rails application is to configure the queues that you need. Create a config/beanstalk.yml file with contents similar to the following:

:queue_one:
  :host: 0.0.0.0
  :port: 11000

:queue_two:
  :host: 0.0.0.0
  :port: 11001
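
To make the shape of this file concrete, here is a minimal sketch of how such a configuration parses in plain Ruby. It does not use the plugin; the `queue_addresses` helper and the `SAMPLE_CONFIG` constant are hypothetical names introduced only for illustration.

```ruby
require 'yaml'

# Same structure as the config/beanstalk.yml example (inlined so the sketch is self-contained)
SAMPLE_CONFIG = <<~CONFIG
  :queue_one:
    :host: 0.0.0.0
    :port: 11000

  :queue_two:
    :host: 0.0.0.0
    :port: 11001
CONFIG

# Hypothetical helper: maps each queue name to a "host:port" string
def queue_addresses(yaml_text)
  config = YAML.load(yaml_text)
  config.each_with_object({}) do |(name, settings), addresses|
    addresses[name] = "#{settings[:host]}:#{settings[:port]}"
  end
end

queue_addresses(SAMPLE_CONFIG).each do |name, address|
  puts "#{name} listens on #{address}"
end
```

Note that the leading colons make the keys Ruby symbols once parsed, which is why the settings are read with `settings[:host]` rather than `settings['host']`.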

You can now use the beanstalk control script to start up your queues:

$ ./script/beanstalk start

To view the current status of your queues (messages received, pending etc.), run the following command [1]:

$ ./script/beanstalk status

The beanstalk control script also lets you stop and restart your queues [2].

You can also run any of these commands for a specific queue. To only start queue_two, run:

$ ./script/beanstalk start queue_two

For a convenient way to manage your queue configuration, take a look at Jack.

Accessing your queues from within your Rails application

The plugin provides a Beanstalk::QueueManager class, a high-level interface for accessing your queues. The easiest way to use it is to assign an instance of this class to a constant in your environment.rb file (or a separate initializer file) – it just requires the path to your beanstalk.yml file.

QUEUE_MANAGER = Beanstalk::QueueManager.new( File.join(Rails.root, 'config', 'beanstalk.yml') )

Use the queue method to return a Beanstalk::Queue instance for the specified queue – this class acts as a high level wrapper around the Beanstalk::Pool class provided by the beanstalk-client gem.

my_queue = QUEUE_MANAGER.queue(:queue_one)

You can also disable certain queues – this can be useful in a test environment where you do not actually have any queue instances running, but do not want to change your beanstalk.yml configuration. When a queue is disabled, calling queue with that queue’s name will return a Beanstalk::NullQueue rather than a real queue, which responds in the same way as a Beanstalk::Queue without actually doing anything.

QUEUE_MANAGER.disable(:queue_one) # => just disable queue_one
QUEUE_MANAGER.disable_all! # => disable all queues
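
The idea behind a disabled queue is the null object pattern. The sketch below illustrates that pattern with hypothetical stand-in classes (`SketchQueue`, `SketchNullQueue`, `SketchQueueManager`); it is not the plugin's implementation, only an assumption about how such behaviour could be built.

```ruby
# Hypothetical stand-in for a real queue: stores messages in memory
class SketchQueue
  attr_reader :messages

  def initialize
    @messages = []
  end

  def push(message)
    @messages << message
    self
  end
  alias_method :<<, :push
end

# Hypothetical null object: same interface, but silently does nothing
class SketchNullQueue
  def push(_message)
    self
  end
  alias_method :<<, :push

  def next_message
    nil
  end
end

# Hypothetical manager: hands out a null queue for any disabled name
class SketchQueueManager
  def initialize(names)
    @queues = names.each_with_object({}) { |name, queues| queues[name] = SketchQueue.new }
    @disabled = []
  end

  def disable(name)
    @disabled << name
  end

  def queue(name)
    @disabled.include?(name) ? SketchNullQueue.new : @queues.fetch(name)
  end
end

manager = SketchQueueManager.new([:queue_one, :queue_two])
manager.disable(:queue_one)
manager.queue(:queue_one) << "silently dropped"
manager.queue(:queue_two) << "stored as usual"
```

Because calling code never has to check whether a queue is enabled, the same application code can run unchanged in a test environment.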

Working with queues

Once you have an instance of a Beanstalk::Queue, you can push new messages on to it as if it were a Ruby Array:

# both of these are functionally equivalent
my_queue.push("hello world")
my_queue << "hello world"

Messages will be added to the queue in YAML format; this means you can push any Ruby object onto the queue as long as you have defined a to_yaml method:

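class MyClass
  # Return a YAML string representing this object (placeholder data shown here)
  def to_yaml(*args)
    { 'example' => 'some data' }.to_yaml
  end
end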

object = MyClass.new
my_queue << object
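
To see what YAML serialization means for a message, the following sketch round-trips a plain hash through YAML using only Ruby's standard library. It does not touch a queue; the `round_trip` helper and the payload contents are hypothetical and only mimic what happens between pushing a message and reading its parsed body.

```ruby
require 'yaml'

# Hypothetical helper: serialize to YAML and parse back, as a message
# would be on its way through the queue
def round_trip(object)
  wire_format = object.to_yaml # the string that would sit on the queue
  YAML.load(wire_format)       # what a consumer gets after parsing
end

payload = { 'product_id' => 42, 'action' => 'reindex' }
restored = round_trip(payload)

puts payload.to_yaml
puts restored == payload
```

Plain data such as strings, numbers, arrays and hashes survives this trip unchanged, which makes it a safe choice for message payloads.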

There are several methods available for inspecting the status of the queue:

my_queue.number_of_pending_messages # => number of jobs awaiting processing
my_queue.total_jobs # => number of jobs ever received by the queue
my_queue.raw_stats # => a hash of lower level queue statistics

To pull the next pending message off the queue, call next_message. This returns a Beanstalk::Job object, or nil if there are no pending jobs on the queue. To get the parsed YAML body from this object, call the ybody method. Once you have finished with the job’s data, you must remember to do something with the job itself, such as delete or release it [3].

Alternatively, you can call next_message with a block; in this case, the parsed YAML body will be yielded to the block and once the block has finished executing, the job will be deleted. The following two blocks of code are functionally equivalent:

# without a block
job = my_queue.next_message
MyJobHandler.process(job.ybody)
job.delete

# with a block
my_queue.next_message do |body|
  MyJobHandler.process(body)
end
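
The equivalence between the two forms can be sketched with an in-memory stand-in. `InMemoryJob` and `InMemoryQueue` below are hypothetical classes, not the plugin's code; in particular, what the block form returns and what happens if the block raises are assumptions of this sketch.

```ruby
require 'yaml'

# Hypothetical stand-in for a job: holds a YAML body and tracks deletion
class InMemoryJob
  attr_reader :deleted

  def initialize(body)
    @body = body
    @deleted = false
  end

  # Parse the stored YAML body
  def ybody
    YAML.load(@body)
  end

  def delete
    @deleted = true
  end
end

# Hypothetical stand-in for a queue with both forms of next_message
class InMemoryQueue
  def initialize
    @jobs = []
  end

  def <<(object)
    @jobs << InMemoryJob.new(object.to_yaml)
    self
  end

  def next_message
    job = @jobs.shift
    return nil if job.nil?          # nothing pending
    return job unless block_given?  # caller must delete or release it

    yield job.ybody                 # block form: yield the parsed body...
    job.delete                      # ...then delete the job automatically
    job
  end
end

queue = InMemoryQueue.new
queue << "hello world"
queue.next_message { |body| puts "processing #{body}" }
```

In this sketch a job is only deleted after the block returns normally, so an exception in the block leaves the job undeleted.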

It is recommended that you use the block form unless you need more fine-grained control over how the job is handled.

Polling queues

The plugin also provides a convenient QueuePoller class for continuous polling of a queue for new messages. This makes it possible to handle new messages as they arrive on the queue.

queue_manager = Beanstalk::QueueManager.new( File.join(Rails.root, 'config', 'beanstalk.yml') )
queue_poller  = Beanstalk::QueuePoller.new(queue_manager)

# begin continuous loop
queue_poller.poll(:my_queue) do |message|
  MessageHandler.handle(message.ybody)
  message.delete
end

Note that, unlike the block form of queue.next_message, the poller yields the actual message object rather than the parsed body, so you need to delete the message yourself once you are finished with it.

If you would prefer to handle your messages in batches rather than one at a time, the poller has a poll_with_buffer method that lets you define how many messages should be received before they are yielded.

# yields once 10 messages have been received
queue_poller.poll_with_buffer(:my_queue, 10) do |messages|
  messages.each do |message|
    MessageHandler.handle(message.ybody)
    message.delete
  end
end
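
The buffering behaviour can be pictured with a small standalone sketch. The `each_batch` helper below is hypothetical and works on any enumerable instead of a live queue; it is an assumption about the general technique, not the plugin's QueuePoller code.

```ruby
# Hypothetical helper: collect items into a buffer and yield the buffer
# only once it holds `size` items
def each_batch(messages, size)
  buffer = []
  messages.each do |message|
    buffer << message
    next if buffer.size < size # keep waiting until the buffer is full

    yield buffer
    buffer = [] # start a fresh buffer for the next batch
  end
end

batches = []
each_batch(1..7, 3) { |batch| batches << batch }
# batches is now [[1, 2, 3], [4, 5, 6]]; 7 never reached a full batch
```

One consequence of this design is that messages in a partially filled buffer are not handled until enough further messages arrive, so a small buffer size suits low-traffic queues better.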

Further information

[1] If you want to continually monitor your queues, you can run this command using the Unix `watch` command.

[2] Don’t forget – beanstalkd is not a persistent queue and any messages remaining on your queue will be lost if you restart the process.

[3] See the beanstalk-client gem documentation for more details.
