
In Part 1 of this series I described the problem statement: manage conversion of hundreds of files simultaneously on concurrent EC2 instances. In this post I’ll walk through some of the Python code that powered the workflow.

Distribute work

I chose to write this as a Fabric program. Fabric is typically used for code deployment, but I also find it a handy framework for writing command-line programs in which I may want to run different functions at different times. It’s a lot simpler to type fab function_i_want_to_run with arguments than to implement a __main__ method with a bunch of command-line options. (As it turns out I also needed to write some remote file commands, which Fabric is very convenient for.)

Helpers

These two get called repeatedly, but only from inside other Fabric tasks, so they’re just normal functions:

def connect_to_ec2():
    return boto.ec2.connect_to_region(EC2_REGION, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

def connect_to_sqs():
    return boto.sqs.connect_to_region(EC2_REGION, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
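
The snippets in this post assume a small preamble of imports and configuration constants at the top of the fabfile. Roughly the following, with placeholder values standing in for my own credentials, region, AMI and key pair:

# Placeholder preamble: substitute your own credentials, region, AMI, key pair, etc.
import logging

import boto.ec2
import boto.sqs
from fabric.api import local, task

log = logging.getLogger('provisioner')
logging.basicConfig(level=logging.INFO)

AWS_ACCESS_KEY_ID = 'YOUR-ACCESS-KEY'
AWS_SECRET_ACCESS_KEY = 'YOUR-SECRET-KEY'
EC2_REGION = 'us-east-1'
EC2_AMIS = ['ami-xxxxxxxx']          # AMI id(s) of the worker image
EC2_KEY_PAIR = 'my-keypair'
EC2_SECURITY_GROUP = ['default']
EC2_PLACEMENT_GROUP = None           # or the name of a placement group
EC2_INSTANCE_TYPE = 'c1.medium'
INSTANCE_LIMIT = 20                  # EC2's default cap on concurrent instances
SQS_QUEUE_NAME = 'ebook-conversion'
SQS_VISIBILITY_TIMEOUT = 60 * 60     # one hour, in seconds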

Queue up the work

Now comes the first function I designated as an actual Fabric task:

@task
def create_message(workfile):
    '''Add a work file (filename) to the Amazon SQS'''
    conn = connect_to_sqs()
    queue = conn.get_queue(SQS_QUEUE_NAME)
    if not queue:
        queue = conn.create_queue(SQS_QUEUE_NAME)
    queue.set_timeout(SQS_VISIBILITY_TIMEOUT)
    message = queue.new_message(workfile)
    queue.write(message)

The visibility timeout is how long an item that has been pulled off the queue is presumed to still be in process. If the item hasn’t been deleted by the time the timeout expires, SQS assumes the job failed and puts it back on the queue for re-processing. The default is a mere 30 seconds, but the whole reason for this infrastructure is that the ebook conversions can take a long time, so I set SQS_VISIBILITY_TIMEOUT to one hour to prevent other instances from grabbing a book off the queue while it’s still being converted.
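
The other half of that contract is the worker’s startup script (the process_from_queue.sh from Part 1): pull a message, convert the file, and delete the message only once the conversion succeeds. A minimal Python sketch of that loop, using the same boto SQS calls and with convert_ebook standing in for the real conversion step, looks like this:

def process_work_from_queue():
    '''Illustrative consumer loop; convert_ebook() is a placeholder for the real work.'''
    conn = connect_to_sqs()
    queue = conn.get_queue(SQS_QUEUE_NAME)
    while True:
        messages = queue.get_messages(num_messages=1)
        if not messages:
            break  # queue drained; the instance is free to shut itself down
        message = messages[0]
        convert_ebook(message.get_body())
        # Delete only after a successful conversion. If the worker dies mid-job,
        # the message reappears once the one-hour visibility timeout expires.
        queue.delete_message(message)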

(It was important that this program be repeatable and not unnecessarily re-convert files, so it wasn’t sufficient to just have a static list of filenames: I had to check on the remote server which files still needed to be worked on and which had already been converted. This is where Fabric’s remote filesystem commands came in handy.)
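
The shape of that check is simple: list the source files and the finished files on the remote box, and queue only the difference. A rough sketch of the idea — SOURCE_HOST, SOURCE_DIR and OUTPUT_DIR are illustrative stand-ins, not the real server and paths:

from fabric.api import env, run

@task
def queue_pending_workfiles():
    '''Queue only the files that have no converted counterpart yet (sketch).'''
    env.host_string = SOURCE_HOST            # stand-in for the server holding the files
    sources = run("ls %s" % SOURCE_DIR).split()
    converted = run("ls %s" % OUTPUT_DIR).split()
    done = set(name.rsplit('.', 1)[0] for name in converted)
    for workfile in sources:
        if workfile.rsplit('.', 1)[0] not in done:
            create_message(workfile)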

Since this was all new to me I wrote a number of helper tasks that I could use to quickly monitor what was going on:


@task
def show_queue():
    '''Show how many messages are waiting in the queue'''
    conn = connect_to_sqs()
    queue = conn.get_queue(SQS_QUEUE_NAME)
    if queue:
        log.info("%d messages in the queue" % queue.count())

Start up the instances

Once the queue is primed, it’s time to start up the EC2 workers. Here’s the code to launch a single instance.

@task
def create_server(name):
    '''Create an EC2 instance with the given name.'''
    conn = connect_to_ec2()
    image = conn.get_all_images(EC2_AMIS)

    reservation = image[0].run(1, 1,
                               key_name=EC2_KEY_PAIR,
                               security_groups=EC2_SECURITY_GROUP,
                               placement=EC2_PLACEMENT_GROUP,
                               instance_type=EC2_INSTANCE_TYPE,
                               user_data=open('process_from_queue.sh').read())

    instance = reservation.instances[0]
    conn.create_tags([instance.id], {"Name": name})
    return instance

That’s pretty standard boilerplate EC2 code except for the user-data option. The boto framework passes through user-data scripts as strings, and I wanted to ensure that the startup script was always up to date and individually source-code managed, independent of this deployment script. Hence the call to open().

(The content of that user-data script is available in Part 1.)

@task
def provision_servers():
    '''Provision as many EC2 instances as we can, up to the limit, starting each one in turn'''
    conn = connect_to_ec2()

    # Amazon returns cranky errors if you try to start too many instances, so to be clean
    # don't even try to run more instances than the hard limit of 20.
    res = conn.get_all_instances(filters={'instance-state-name': 'running'})
    instances = [i for r in res for i in r.instances]
    server_count = INSTANCE_LIMIT - len(instances)

    # Messages still waiting on the queue, reported purely for the log line below.
    queue = connect_to_sqs().get_queue(SQS_QUEUE_NAME)
    workfile_count = queue.count() if queue else 0

    log.info("Provisioning %d servers for %d PDF conversions" % (server_count, workfile_count))

    for i in range(0, server_count):
        server_name = "pdf-worker-%d" % i # The server name can be anything, but I liked naming them incrementally
        create_server(server_name)
        log.info("Provisioned %s" % server_name)

Since the instances are self-terminating, that’s pretty much all I needed to do. I could monitor the queue directly through my Fabric task or by using Amazon’s web-based console.
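
If you’d rather check the instance side from the terminal too, a show_queue-style counterpart is easy to add — something along these lines:

@task
def show_instances():
    '''List the running worker instances: name, instance id and public DNS.'''
    conn = connect_to_ec2()
    res = conn.get_all_instances(filters={'instance-state-name': 'running'})
    for reservation in res:
        for instance in reservation.instances:
            log.info("%s\t%s\t%s" % (instance.tags.get('Name', '-'),
                                     instance.id,
                                     instance.public_dns_name))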

The last novel bit is the simple task I wrote to invoke multitail (as seen in Part 1) to tail the log files of all the remote servers at once:

@task
def watch_logs():
    '''Watch the logs on all of the remote servers at once. Expects 'multitail' to be installed (brew installable)'''
    conn = connect_to_ec2()
    res = conn.get_all_instances(filters = {'instance-state-name' : 'running'})
    instances = [i for r in res for i in r.instances]
    logfile = '/home/ubuntu/ebook_converter/process_from_queue.log'
    cmd = []
    for instance in instances:
        cmd.append("-s 2 -l 'ssh -i MY_EC2_PEM_FILE.pem -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@%s "tail -f %s"'" % (instance.public_dns_name, logfile))
    local("multitail " + ' '.join(cmd))

Happy provisioning!
