@ryandotsmith
Created April 11, 2012 20:15

The Worker Pattern

2011-05-22

Introduction

This article grew out of a tutorial given as a training session at Red Dirt Ruby Conf (2011) and a formal talk given at RailsConf (2011). To understand the theory behind The Worker Pattern, read over the slides and the slide notes. From there, you can follow the instructions in the Tutorial to practice the concepts discussed in the slides.

Slide Deck

Tutorial

Caching Strategies

There are many techniques for client-side caching. Strategies include Last-Modified, ETags, and others. In this session, I will discuss a strategy that involves server-side caching and client-side polling.

Our goal is to provide a web page where the user submits a search query and we use that query to fetch results from several remote APIs. We will implement these ideas using Sinatra, jQuery, and Memcached. We will take several iterative steps towards our goal, but eventually the flow of our program will look like this:

[Diagram: Program Messages]

To build a system that communicates like the above diagram, we will take the following steps:

  1. Set up a proxy class for each API.
  2. Create a handler that uses the proxies.
  3. Create an HTML view for the form and results page.
  4. Get our search working inside of an HTTP request.
  5. Add code to handle JavaScript requests.
  6. Include a JavaScript helper for polling our server.
  7. Connect our results page to the JavaScript helper.
  8. Add Memcached.

Step 1

We will be fetching data from two remote APIs, Bank and Weather. Let's create a few Ruby classes to proxy requests to the APIs.

# remote_data.rb

class RemoteData
end

class Bank < RemoteData
  def self.search(query)
    RestClient.get ENV["BANK_URL"], :params => query
  end
end

class Weather < RemoteData
  def self.search(query)
    RestClient.get ENV["WEATHER_URL"], :params => query
  end
end

We will not be sending messages to RemoteData itself; rather, we will be sending messages to Bank or Weather. The abstract class RemoteData will help us keep things straight when we begin to add advanced features to our app.

Step 2

# application.rb

class Application < Sinatra::Application
  get "/search" do
    @query = params[:q]
    @bank_results = Bank.search(@query)
    @weather_results = Weather.search(@query)
    erb :search
  end
end

This file contains our Sinatra app. We have mapped /search to an HTTP GET handler. Our search form will send a GET request to this action. Let's hook up the HTML.

Step 3

<!-- views/search.erb -->

<form action="/search" id="search">
  <!-- The field is named "q" because the handler reads params[:q]. -->
  <input type="text" name="q" />
</form>

<section id="bank">
<% @bank_results.each do |result| %>
  <p><%= result %></p>
<% end %>
</section>

<section id="weather">
<% @weather_results.each do |result| %>
  <p><%= result %></p>
<% end %>
</section>

Step 4

<!-- views/layout.erb -->

<!DOCTYPE html>
<html>
  <head>
    <script src="http://ajax.googleapis.com/ajax/libs/jquery/1.5.1/jquery.min.js"
           type="text/javascript"></script>
    <!-- We will add this file in Step 6-->
    <script src="/application.js" type="text/javascript"> </script>
  </head>
  <body>
    <%= yield %>
  </body>
</html>

# config.ru

require 'sinatra'
require 'rest-client'

require './remote_data'
require './application'

run Application

$ brew install postgresql
$ brew install memcached
$ gem install rackup
$ gem install sinatra
$ gem install rest-client
$ gem install dalli
$ gem install queue_classic
$ rackup
# Open web browser and affirm success!

Pause

Let us take a moment and reflect upon what we have done thus far. Our application has a few Ruby classes that know how to call a remote API. They simply use an HTTP library to fetch JSON from an API somewhere on the internet. Our proxies return the results as an array of strings so that we can loop through the values while writing them to our view. The Application class holds all of our Sinatra code and controls interaction between the user and our APIs.
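The proxies in Step 1 hand back whatever the HTTP library returns, which is the raw response body. One way to get the array of strings described above is to parse that body. The following is a minimal sketch, assuming the remote API answers with a JSON array; parse_results is a hypothetical helper that does not appear in the original code.

```ruby
require 'json'

# Hypothetical helper: turn a raw JSON response body into an array of strings.
# Assumes the remote API answers with a JSON array such as '["a", "b"]'.
def parse_results(body)
  JSON.parse(body).map(&:to_s)
end

# Made-up sample body standing in for a real API response.
sample_body = '["checking: 120", "savings: 900"]'

parse_results(sample_body).each { |line| puts line }
```

With a helper like this, each proxy's search method could wrap its HTTP call so that the views can iterate over the results with each.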

This code is great... if you don't care about web scale. There are a few problems with this setup:

  • We are making 2 API requests inside of the user's request to our application.
  • The user's request is bound by the slowest API call.
  • If 10 users submit identical queries, our application sends 20 API calls and receives 2 unique results.
  • UX may suffer from combinations of aforementioned problems.

We can address a few of these problems by using AJAX. We will start by introducing the concept of a fragment. Let's define a fragment as a component of our view that maps to the result of a RemoteData object. We have 2 fragments: a bank fragment and a weather fragment. Let's see what this looks like with some code:

Step 5

# application.rb

class Application < Sinatra::Application

  get "/search" do
    erb :search
  end

  def self.fragment(fragment_name)
    obj = yield
    Application.get "/fragments/#{fragment_name}" do
      obj.request_attrs = params
      res = obj.fetch
      instance_variable_set("@#{fragment_name}",res)
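      # Pass a Symbol so Sinatra looks up the template file; a String
      # would be rendered as the template source itself.
      erb "_#{fragment_name}".to_sym, :layout => false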
    end
  end

  fragment("bank")    { Bank.new    }
  fragment("weather") { Weather.new }

end

We will be moving the heavy lifting out of the /search handler into the /fragments/:id handler.

Application.fragment(fragment_name) is the primary addition to our code. We are applying a bit of meta-programming so that we can accommodate additional APIs that our application may support. You might be wondering why we subclassed Bank and Weather from RemoteData; now it should be clear. We did this in order to provide a protocol for subclasses of RemoteData to support. For example, if we wanted to add a Movie API to query, we would subclass it from RemoteData, implement all of the methods in RemoteData, and finally use our fragment DSL.

  fragment("movie") { Movie.new }
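To see what the fragment DSL does without starting a web server, here is a rough sketch in which a plain Hash stands in for Sinatra's route table. TinyApp, ROUTES, and CannedMovie are hypothetical names used only for illustration; the real Application registers its handlers through Sinatra's get.

```ruby
# A stand-in for the Sinatra app: routes are kept in a plain Hash so the
# registration step can be run without a web server.
class TinyApp
  ROUTES = {}

  # Mirrors the shape of Application.fragment: the block builds the object
  # once, and the registered handler closes over that object.
  def self.fragment(fragment_name)
    obj = yield
    ROUTES["/fragments/#{fragment_name}"] = lambda do |params|
      obj.request_attrs = params
      obj.fetch
    end
  end
end

# Hypothetical fragment object that returns canned data instead of
# calling a remote API.
class CannedMovie
  attr_accessor :request_attrs

  def fetch
    ["Results for #{request_attrs[:q]}"]
  end
end

TinyApp.fragment("movie") { CannedMovie.new }

handler = TinyApp::ROUTES["/fragments/movie"]
puts handler.call(:q => "alien").inspect
```

Note that the block runs once, at registration time, so every request to a given fragment reuses the same object. That matches the original code and is worth keeping in mind if the server ever handles requests concurrently.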

You will also notice that we are calling some methods on obj that we have yet to define. In short, we need to create the following methods in our RemoteData class:

  • request_attrs
  • fetch
  • name

# remote_data.rb

class RemoteData
  attr_accessor :request_attrs

  def name
    # Plain Ruby; String#underscore would require ActiveSupport.
    self.class.to_s.downcase
  end

  def fetch
    raise "should be defined in subclass"
  end

end

class Bank < RemoteData
  def self.search(query)
    RestClient.get ENV["BANK_URL"], :params => query
  end

  def fetch
    self.class.search(request_attrs[:q])
  end

end

class Weather < RemoteData
  def self.search(query)
    RestClient.get ENV["WEATHER_URL"], :params => query
  end

  def fetch
    self.class.search(request_attrs[:q])
  end

end
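As an illustration of the protocol, here is a sketch of what a Movie proxy could look like. It carries a condensed copy of RemoteData so that it runs on its own, it returns canned data instead of calling a real API, and the Unfinished class is a hypothetical example of a subclass that forgot to implement fetch.

```ruby
# Condensed copy of the abstract class, so the sketch runs on its own.
class RemoteData
  attr_accessor :request_attrs

  def name
    self.class.to_s.downcase
  end

  def fetch
    raise "should be defined in subclass"
  end
end

# Hypothetical proxy for a Movie API. The search method returns canned
# data; a real version would call the remote API as Bank and Weather do.
class Movie < RemoteData
  def self.search(query)
    ["#{query} (part 1)", "#{query} (part 2)"]
  end

  def fetch
    self.class.search(request_attrs[:q])
  end
end

# A subclass that does not implement the protocol.
class Unfinished < RemoteData
end

movie = Movie.new
movie.request_attrs = { :q => "alien" }
puts movie.name
puts movie.fetch.inspect

begin
  Unfinished.new.fetch
rescue RuntimeError => e
  puts "Unfinished#fetch failed: #{e.message}"
end
```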

Step 6

So, we have added support to our server for the fragments. Let us now implement some client-side support for fragments. We will start by building a JavaScript function that behaves similarly to fragment().

/*
  ./public/application.js
*/

function fetch(fragment_name) {
  $.ajax({
    url: "/fragments/" + fragment_name,
    data: $('#search').serialize(),
    success: function(data, status, req) {
      $('#' + fragment_name).html(data);
    },
    error: function(request, status) {
      console.log(status);
    }
  });
}

Step 7

<!-- views/search.erb -->

<form action="/search" id="search">
  <input type="text" name="q" />
</form>

<section id="bank"></section>
<section id="weather"></section>

<script type="text/javascript">
  fetch("bank");
  fetch("weather");
</script>

<!-- views/_bank.erb -->
<% @bank.each do |result| %>
  <p><%= result %></p>
<% end %>

<!-- views/_weather.erb -->
<% @weather.each do |result| %>
  <p><%= result %></p>
<% end %>

Here we have done a bit of refactoring. We started by removing the ERB from inside the section tags in our search view. Since fetch() will insert HTML fragments, we can start with empty tags. Then, once our page has loaded, we call fetch() with our 2 fragment names.

Pause

Let's take a look at our updated list of problems:

  • (Solved) We are making 2 API requests inside of the user's request to our application.
  • (Solved) The user's request is bound by the slowest API call.
  • If 10 users submit identical queries, our application sends 20 API calls and receives 2 unique results.
  • (Solved) UX may suffer from combinations of aforementioned problems.

Well, since the end of Step 4, we have added more code and more technology while maintaining the same behaviour. Is all of this complexity worth it? Yes! Here is why:

Let's assume that we are running our Sinatra app on one server that handles one request at a time. The user's browser will execute the calls to fetch() and send 2 HTTP requests to our server. Since that single server can only process 1 request at a time, the requests are handled sequentially, not in parallel.

  fetch("weather")-----|
                       |-------> (WebServer)
  fetch("bank")--------|

However, our Javascript is able to make requests in parallel, so if we were to set up some servers to respond to requests in parallel, then we could achieve total parallel execution. This is horizontal scale.

  fetch("weather")-----|-------> (WebServer 0)

  fetch("bank")--------|-------> (WebServer 1)
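The difference between the two diagrams can be sketched with threads standing in for the two web servers. This is only an approximation under made-up latencies: fake_fetch and the LATENCY table are hypothetical, and sleep plays the role of the slow remote API.

```ruby
# Simulated API latencies in seconds (made-up numbers).
LATENCY = { "bank" => 0.2, "weather" => 0.1 }

# Pretend to fetch a fragment; sleep stands in for the remote API call.
def fake_fetch(name)
  sleep LATENCY[name]
  "#{name} done"
end

# Run the block and return its result together with the elapsed seconds.
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  [result, Process.clock_gettime(Process::CLOCK_MONOTONIC) - start]
end

# One server: the second fragment waits for the first.
sequential, seq_time = timed do
  LATENCY.keys.map { |name| fake_fetch(name) }
end

# Two servers (threads here): both fragments are fetched at the same time.
parallel, par_time = timed do
  LATENCY.keys.map { |name| Thread.new { fake_fetch(name) } }.map(&:value)
end

puts "sequential: #{seq_time.round(2)}s, parallel: #{par_time.round(2)}s"
```

In the sequential case the elapsed time is roughly the sum of the latencies; in the parallel case it is roughly the slowest single call.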

So, we have 1 more problem to solve, and we are ready to set up Memcached. We are using Memcached to solve the problem of users who submit the same query within a short interval of time. We can do this because we know that the data behind our Weather and Bank APIs does not change very often. So, when we fetch data from an API, we will store it in Memcached so that users who submit the same query after us can reuse the result instead of triggering another API call.
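To make the saving concrete, here is a small sketch with an in-memory cache standing in for Memcached. TinyCache is a hypothetical class written for this example; it only imitates the get and set calls with a time-to-live, and the query and result are made up.

```ruby
# In-memory stand-in for Memcached with a per-entry time-to-live (TTL).
class TinyCache
  def initialize
    @store = {}
  end

  # Returns the cached value, or nil if the key is missing or expired.
  def get(key, now = Time.now)
    value, expires_at = @store[key]
    return nil if expires_at.nil? || now >= expires_at
    value
  end

  # Stores the value for ttl seconds.
  def set(key, value, ttl, now = Time.now)
    @store[key] = [value, now + ttl]
  end
end

api_calls = 0
cache = TinyCache.new

# Ten users submit the same query; only the first one reaches the "API".
10.times do
  key = "weather:tulsa"
  if cache.get(key).nil?
    api_calls += 1                 # pretend this is the slow remote call
    cache.set(key, ["sunny"], 10)
  end
end

puts api_calls
```

Ten identical queries cost one API call here, because the other nine are answered from the cache while the entry is still within its TTL.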

Step 8

# application.rb

class Application < Sinatra::Application

  get "/search" do
    erb :search
  end

  def self.fragment(fragment_name)
    obj = yield
    Application.get "/fragments/#{fragment_name}" do
      obj.request_attrs = params
      if obj.exists?
        instance_variable_set "@#{fragment_name}", obj.instance
        erb "_#{fragment_name}".to_sym, :layout => false
      else
        obj.fetch
        status 204
      end
    end
  end

  fragment("bank")    { Bank.new    }
  fragment("weather") { Weather.new }

end

Previously, we called fetch() on our object, set the instance variable, and then rendered the template. Now we first check whether the object exists in the cache. The idea is that the object might take a long time to load the results of the search query, so we let our client poll the server until the server is ready to deliver the results. The interesting code additions are exists?, instance, and fetch. Let's implement these in our RemoteData class.

# remote_data.rb

# A module rather than a class: `extend self` only accepts a module.
module Cache
  extend self

  def instance
    @@instance ||= Dalli::Client.new
  end

  def get(*args)
    instance.get(*args)
  end

  def set(*args)
    instance.set(*args)
  end

end


class RemoteData
  attr_accessor :request_attrs

  def name
    # Plain Ruby; String#underscore would require ActiveSupport.
    self.class.to_s.downcase
  end

  def fetch
    raise "should be defined in subclass"
  end

  def exists?
    not Cache.get(key).nil?
  end

  def key
    raise "should be defined in subclass"
  end

  def instance
    JSON.parse(Cache.get(key))
  end

  def write_to_cache
    ttl = 10
    result = JSON.dump(yield)

    Cache.set(key, result, ttl)
  end

end

class Bank < RemoteData
  def self.search(query)
    RestClient.get ENV["BANK_URL"], :params => query
  end

  def fetch
    write_to_cache { self.class.search(request_attrs[:q]) }
  end

  def key
    [name,query].join(":")
  end

  def query
    request_attrs[:q]
  end

end

class Weather < RemoteData
  def self.search(query)
    RestClient.get ENV["WEATHER_URL"], :params => query
  end

  def fetch
    write_to_cache { self.class.search(request_attrs[:q]) }
  end

  def key
    [name,query].join(":")
  end

  def query
    request_attrs[:q]
  end

end
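The cache key combines the fragment name with the query, so the same query sent to two different APIs never shares a cache entry. The sketch below shows one way to build such keys in plain Ruby. Both helpers are hypothetical: underscore is a simplified stand-in for the ActiveSupport method of the same name, and cache_key mirrors the key method above.

```ruby
# Simplified plain-Ruby stand-in for ActiveSupport's String#underscore,
# good enough for simple class names such as "Bank" or "MovieTimes".
def underscore(class_name)
  class_name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Same shape as the key method: "<fragment name>:<query>".
def cache_key(class_name, query)
  [underscore(class_name), query].join(":")
end

puts cache_key("Bank", "loans")
puts cache_key("Weather", "loans")
puts cache_key("MovieTimes", "alien")
```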

And now we need to tell our fetch() function to poll until we receive our data.

// public/application.js

function fetch(fragment_name) {
  $.ajax({
    url: "/fragments/" + fragment_name,
    data: $('#search').serialize(),
    success: function(data, status, req) {
      if(req.status == 204) {
        setTimeout(function() { fetch(fragment_name) }, 500);
        return;
      }
      $('#' + fragment_name).html(data);
    },
    error: function(request, status) {
      console.log(status);
    }
  });
}

In our JavaScript, we expect the fragment action to respond with an HTTP status code 204 at least once when the cache is cold. The first request checks the cache for our query result; when it does not find the key, the action tells our fragment object to fetch itself. On the 2nd (or any later) request, our handler reads the data from the cache and returns it to the calling JavaScript with an HTTP status code 200. Finally, our JavaScript writes the data to the page.
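The exchange between the polling client and the fragment handler can be simulated in a few lines of Ruby. This is a sketch under simplifying assumptions: a Hash stands in for the cache, handle_fragment and poll are hypothetical names, and the 500 ms delay between attempts is left out.

```ruby
# Stand-in for the fragment handler: returns [status, body].
# A Hash plays the role of the cache; the fetch fills it synchronously,
# as obj.fetch does in Step 8.
def handle_fragment(cache, key)
  if cache.key?(key)
    [200, cache[key]]
  else
    cache[key] = ["result for #{key}"]
    [204, nil]
  end
end

# Stand-in for the JavaScript fetch(): retry until the status is not 204,
# giving up after max_attempts tries.
def poll(cache, key, max_attempts = 5)
  statuses = []
  max_attempts.times do
    status, body = handle_fragment(cache, key)
    statuses << status
    return [statuses, body] unless status == 204
  end
  [statuses, nil]
end

statuses, body = poll({}, "bank:loans")
puts statuses.inspect
puts body.inspect
```

Starting from an empty cache, the client sees a 204 followed by a 200. If the cache already holds the key, the first response is a 200 and no polling is needed.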

Reflection

Well done. We have taken an application that was quite linear and, through a series of refactorings, transformed its execution into something quite parallel. However, there is still some optimization to be had.

obj.fetch()

In the next section, we will discover how we can further optimize this piece of code.


Background Processing

In the last section, we created an app that was quite linear and through a series of refactorings, we parallelized the majority of executions. We can now address the last bottleneck.

Let's take a look at our fragments handler:

# application.rb

class Application < Sinatra::Application

  get "/search" do
    erb :search
  end

  def self.fragment(fragment_name)
    obj = yield
    Application.get "/fragments/#{fragment_name}" do
      obj.request_attrs = params
      if obj.exists?
        instance_variable_set "@#{fragment_name}", obj.instance
        erb "_#{fragment_name}".to_sym, :layout => false
      else
        obj.fetch # <------- Method call in question
        status 204
      end
    end
  end

  fragment("bank")    { Bank.new    }
  fragment("weather") { Weather.new }

end

On the first request, exists? will return false and then we will attempt to fetch(). Currently, the handler will block until we have completed fetch(), which is a call to a remote API. This is undesirable for several reasons:

  • What if the API call takes a long time?
  • What if fetch raises an exception?

Also, it is generally considered best practice to not do any heavy lifting in an HTTP request. Our goal is to respond to the HTTP request as quickly as we can. Therefore, we are going to do the heavy lifting of fetch() in a background process. To do this, we will use a queueing library to distribute the job.

class RemoteData
  attr_accessor :request_attrs

  def name
    # Plain Ruby; String#underscore would require ActiveSupport.
    self.class.to_s.downcase
  end

  def fetch
    raise "should be defined in subclass"
  end

  def exists?
    not Cache.get(key).nil?
  end

  def key
    raise "should be defined in subclass"
  end

  def instance
    # search_and_set stores JSON, so read it back as JSON.
    JSON.parse(Cache.get(key))
  end

  def write_to_cache
    ttl = 10
    result = JSON.dump(yield)

    Cache.set(key, result, ttl)
  end

end

class Bank < RemoteData

  def self.search_and_set(key,query)
    ttl = 10
    result = search(query).to_json
    Cache.set(key,result,ttl)
  end

  def self.search(query)
    RestClient.get ENV["BANK_URL"], :params => query
  end

  def fetch
    QC.enqueue("Bank.search_and_set", key, request_attrs[:q])
  end

  def key
    [name,query].join(":")
  end

  # key relies on query, so it must be defined here as in Step 8.
  def query
    request_attrs[:q]
  end

end

class Weather < RemoteData

  def self.search_and_set(key,query)
    ttl = 10
    result = search(query).to_json
    Cache.set(key,result,ttl)
  end

  def self.search(query)
    RestClient.get ENV["WEATHER_URL"], :params => query
  end

  def fetch
    QC.enqueue("Weather.search_and_set", key, request_attrs[:q])
  end

  def key
    [name,query].join(":")
  end

  # key relies on query, so it must be defined here as in Step 8.
  def query
    request_attrs[:q]
  end

end

We have done a few notable refactorings here. Instead of calling search() within fetch(), we enqueue the job into our queue. Eventually, our worker will call search_and_set(), and this method will call the API and write the return value to the cache. Let's start up a worker now.

First of all, we need to give Queue Classic a database to use. We can do this by setting the DATABASE_URL environment variable.

$ export DATABASE_URL='postgres://username:password@localhost/database_name'
$ rackup

In another terminal, let's use an IRB session to start our worker and fetch some remote data.

$ export DATABASE_URL='postgres://username:password@localhost/database_name'
$ irb -I .
irb: worker = QC::Worker.new
irb: worker.run
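Conceptually, a worker takes a job such as "Bank.search_and_set" together with its arguments, resolves the class and method, and calls it outside of any HTTP request. The sketch below imitates that idea with an in-memory Array and Hash. It is not how queue_classic is implemented: enqueue, work_off, JOBS, CACHE, and FakeBank are all hypothetical names, and the real library stores its jobs in Postgres.

```ruby
require 'json'

CACHE = {}   # stand-in for Memcached
JOBS  = []   # stand-in for the job queue

# Hypothetical proxy that writes canned data instead of calling an API.
class FakeBank
  def self.search_and_set(key, query)
    raise ArgumentError, "empty query" if query.to_s.empty?
    CACHE[key] = JSON.dump(["balance for #{query}"])
  end
end

# Same calling shape as the enqueue call in fetch: a "Class.method"
# string followed by the arguments.
def enqueue(target, *args)
  JOBS << [target, args]
end

# Resolve "Class.method" and call it. A failing job is recorded rather
# than re-raised, so one bad job does not stop the worker.
def work_off(jobs)
  failures = []
  until jobs.empty?
    target, args = jobs.shift
    klass_name, method_name = target.split(".")
    begin
      Object.const_get(klass_name).public_send(method_name, *args)
    rescue StandardError => e
      failures << "#{target}: #{e.message}"
    end
  end
  failures
end

enqueue("FakeBank.search_and_set", "bank:loans", "loans")
enqueue("FakeBank.search_and_set", "bank:", "")
failures = work_off(JOBS)

puts CACHE.inspect
puts failures.inspect
```

This also speaks to the two concerns raised earlier: a slow API call only delays the worker, and an exception stays inside the worker, so in both cases the HTTP handler can keep answering 204 until the data arrives.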