Agents

Agents are an uncoordinated, asynchronous reference type. This means that changes to an agent’s state are independent of changes to other agents’ states, and that all such changes are made away from the thread of execution that schedules them. Agents further possess two characteristics that uniquely separate them from atoms and refs:

  1. I/O and other side-effecting functions may be safely used in conjunction with agents.

  2. Agents are STM-aware, so that they may be safely used in the context of retrying transactions.

Agent state may be altered via two functions, send and send-off. They follow the same pattern as other reference state-change functions: each accepts a function that will produce the agent’s new state from the agent’s current state, along with optional additional arguments to pass to that function.

Taken together, each function plus its optional set of arguments passed to send or send-off is called an agent action, and each agent maintains a queue of actions. Both send and send-off return immediately after queueing the specified action; queued actions are evaluated serially, in the order in which they are “sent,” on one of many threads dedicated to the evaluation of agent actions. The result of each evaluation is installed as the agent’s new state.

The sole difference between send and send-off is the type of action that may be provided to each. Actions queued using send are evaluated within a fixed-size thread pool that is configured to not exceed the parallelizability of the current hardware.[150] Thus, send must never be used for actions that might perform I/O or other blocking operations, lest the blocking action prevent other nonblocking, CPU-bound actions from fully utilizing that resource.

In contrast, actions queued using send-off are evaluated within an unbounded thread pool (incidentally, the same one used by futures), which allows any number of potentially blocking, non-CPU-bound actions to be evaluated concurrently.

Knowing all this, we can get a picture of how agents work in general:

Figure 4-7. Queueing and evaluation of agent actions resulting in state changes

Actions are queued for an agent using either send or send-off (represented in Figure 4-7 as different-colored units of work). The agent applies those actions to its state in order, performing each evaluation on a thread from the pool associated with the function used to queue the action. So, if the black actions are CPU-bound, then threads t2 and t3 are from the dedicated, fixed-size send thread pool, and t9 and t18 are from the unbounded send-off thread pool. The return value of each action becomes the agent’s new state.

While the semantics of agents may be subtle, using them is extraordinarily easy:

(def a (agent 500))                            1
;= #'user/a
(send a range 1000)                            2
;= #<Agent@53d2f8be: 500>
@a
;= (500 501 502 503 504 ... 999)

An agent is created with an initial value of 500.


We send an action to the agent, consisting of the range function and an additional argument 1000; in another thread, the agent’s value will be set to the result of (range @a 1000).

Both send and send-off return the agent involved. When sending actions in the REPL, it is possible that you’ll see the result of the sent action’s evaluation immediately in the printing of the agent; depending on the complexity of the action and how quickly it can be scheduled to be evaluated, it may be complete by the time the REPL has a chance to print the agent returned from send or send-off:

(def a (agent 0))
;= #'user/a
(send a inc)
;= #<Agent@65f7bb1f: 1>

On the other hand, you may find yourself needing the result of a pending action’s evaluation, and polling the agent for the result would be daft. You can block on one or more agents completing evaluation of all actions sent from your current thread using await:[152]

(def a (agent 5000))
(def b (agent 10000))

(send-off a #(Thread/sleep %))
;= #<Agent@da7d7b5: 5000>
(send-off b #(Thread/sleep %))
;= #<Agent@c0cd75b: 10000>
@a                              1
;= 5000
(await a b)                     2
;= nil
@a                              3
;= nil

The function sent to a will take five seconds to complete, so its value has not been updated yet.


We can use await to block until all of the actions sent to the passed agents from this thread have completed. This particular call will block for up to 10 seconds, since that is how long the function sent to b will take to evaluate.


After await has returned, the sent actions will have been evaluated, and the agents’ values will have been updated. Note that another action could have modified a’s value before you dereference it!

A companion function, await-for, does the same but allows you to provide a timeout in milliseconds; it returns a logically false value if the timeout lapses before all of the sent actions have completed, and a logically true value otherwise.
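
For example, here is a quick REPL sketch of our own (not from the original text; the agent name and sleep durations are arbitrary):

(def slow (agent 0))
(send-off slow (fn [n] (Thread/sleep 2000) (inc n)))
(await-for 100 slow)            ; gives up after 100 milliseconds
;= false
@slow                           ; the sleeping action has not completed yet
;= 0
(await-for 5000 slow)           ; ample time for the action to finish
;= true
@slow
;= 1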

Dealing with Errors in Agent Actions

Because agent actions are run asynchronously, an exception thrown in the course of their evaluation cannot be dealt with in the same thread of execution that dispatches the offending action. By default, encountering an error will cause an agent to fail silently: you’ll still be able to dereference its last state, but further actions will fail to queue up:

(def a (agent nil))
;= #'user/a
(send a (fn [_] (throw (Exception. "something is wrong"))))
;= #<Agent@3cf71b00: nil>
a
;= #<Agent@3cf71b00 FAILED: nil>
(send a identity)                                           1
;= #<Exception java.lang.Exception: something is wrong>

Attempting to send an action to a failed agent will return the exception that caused the failure. If you explicitly want to check for an error, you should use agent-error, which will return the exception or nil if the provided agent isn’t in a failed state.

A failed agent can be salvaged with restart-agent, which will reset the agent’s state to the provided value and enable it to receive actions again. An optional flag to restart-agent, :clear-actions, will clear any pending actions on the agent. Otherwise those pending actions will be attempted immediately.

(restart-agent a 42)
;= 42
(send a inc)                                                       1
;= #<Agent@5f2308c9: 43>
(reduce send a (for [x (range 3)]                                  2
                 (fn [_] (throw (Exception. (str "error #" x))))))
;= #<Agent@5f2308c9: 43>
(agent-error a)
;= #<Exception java.lang.Exception: error #0>
(restart-agent a 42)
;= 42
(agent-error a)                                                    3
;= #<Exception java.lang.Exception: error #1>
(restart-agent a 42 :clear-actions true)                           4
;= 42
(agent-error a)
;= nil

Restarting an agent will reset its failed status and allow it to receive actions again.


However, if an agent’s queue contains other actions that will cause further errors…


…then restart-agent would need to be called once per erroring action.


Adding the :clear-actions option to a restart-agent call will clear the agent’s queue prior to resetting its failed status, ensuring that any doomed actions in the queue will not immediately fail the agent.

This default error-handling mode—where agents drop into a failed status and need to be resuscitated again—is most useful when you can rely upon manual intervention, usually via a REPL.[153] More flexible and potentially hands-off error handling can be had by changing the defaults for each agent as appropriate.

Agent error handlers and modes

The default behavior where an error causes an agent to enter a failed status is one of two failure modes supported by agents. agent accepts an :error-mode option of :fail (the default) or :continue;[154] an agent with a failure mode of :continue will simply ignore an error thrown by the evaluation of an agent action, carrying on with processing any actions in its queue and receiving new actions without difficulty:

(def a (agent nil :error-mode :continue))
;= #'user/a
(send a (fn [_] (throw (Exception. "something is wrong"))))
;= #<Agent@44a5b703: nil>
(send a identity)
;= #<Agent@44a5b703: nil>

This makes restart-agent unnecessary, but dropping errors on the floor by default and without any possible intervention is generally not a good idea. Thus, the :continue error mode is almost always paired with an error handler, a function of two arguments (the agent, and the precipitating error) that is called any time an agent action throws an exception; an error handler can be specified for an agent by using the :error-handler option:[155]

(def a (agent nil
              :error-mode :continue
              :error-handler (fn [the-agent exception]
                               (.println System/out (.getMessage exception)))))
;= #'user/a
(send a (fn [_] (throw (Exception. "something is wrong"))))
;= #<Agent@bb07c59: nil>
; something is wrong
(send a identity)
;= #<Agent@bb07c59: nil>

Of course, far more sophisticated things can be done within an :error-handler function beyond simply dumping information about the exception to the console: some data in the application may be changed to avoid the error, an action or other operation might be retried, or the agent’s :error-mode can be switched back to :fail if you know that shutting down the agent is the only safe course of action:

(set-error-handler! a (fn [the-agent exception]
                        (when (= "FATAL" (.getMessage exception))
                          (set-error-mode! the-agent :fail))))
;= nil
(send a (fn [_] (throw (Exception. "FATAL"))))
;= #<Agent@6fe546fd: nil>
(send a identity)
;= #<Exception java.lang.Exception: FATAL>

I/O, Transactions, and Nested Sends

Unlike refs and atoms, it is perfectly safe to use agents to coordinate I/O and perform other blocking operations. This makes them a vital piece of any complete application that uses refs and Clojure’s STM to maintain program state over time. Further, thanks to their semantics, agents are often an ideal construct for simplifying asynchronous processing involving I/O even if refs are not involved at all.

Because agents serialize all actions sent to them, they provide a natural synchronization point for necessarily side-effecting operations. You can set up an agent to hold as its state a handle to some context—an OutputStream to a file or network socket, a connection to a database, a pipe to a message queue, etc.—and you can be sure that actions sent to that agent will each have exclusive access to that context for the duration of their evaluation. This makes it easy to integrate the Clojure sphere (including refs and atoms), which generally aims to minimize side effects, with the rest of the world, which demands them.

You might wonder how agents could possibly be used from within STM transactions. Sending an agent action is a side-effecting operation, and so would seem to be just as susceptible to unintended effects due to transaction restarts as other side-effecting operations like applying change operations to atoms or writing to a file. Thankfully, this is not the case.

Agents are integrated into Clojure’s STM implementation such that actions dispatched using send and send-off from within the scope of a transaction will be held in reserve until that transaction is successfully committed. This means that, even if a transaction retries 100 times, a sent action is only dispatched once, and that all of the actions sent during the course of a transaction’s runtime will be queued at once after the transaction commits. Similarly, calls to send and send-off made within the scope of evaluation of an agent action—called a nested send—are also held until the action has completed. In both cases, sent actions held pending the completion of an action evaluation or STM transaction may be discarded entirely if a validator aborts the state change associated with either.
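
These held-send semantics are easy to observe in a quick REPL experiment of our own (not from the book; the names and timings are ad hoc, and the retry count may vary with scheduling). A conflicting commit from another thread forces a transaction to retry, yet the action sent from inside that transaction is dispatched only once, after the eventual successful commit:

(def attempts (atom 0))          ; counts how many times the transaction body runs
(def log-agent (agent []))       ; receives an action sent from inside the dosync
(def r (ref 0))

(def retried (future
               (dosync
                 (swap! attempts inc)            ; a bare side effect, so it *is* repeated
                 (send log-agent conj :commit)   ; held until this transaction commits
                 (Thread/sleep 500)              ; leave room for a conflicting commit
                 (alter r inc))))
(Thread/sleep 100)
(dosync (alter r inc))           ; commits first, forcing the transaction above to retry
@retried
(await log-agent)
[@attempts @log-agent]
;= [2 [:commit]]                 ; the body ran twice, but only one action was dispatched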

To illustrate these semantics and see what they enable, let’s take a look at a couple of examples that use agents to simplify the coordination of I/O operations in conjunction with refs and the STM, and as part of a parallelized I/O-heavy workload.

Persisting reference states with an agent-based write-behind log

The game we developed in The Mechanics of Ref Change using refs to maintain character state in the face of relentlessly concurrent player activity proved the capabilities of Clojure’s STM in such an environment. However, any game like this, especially those providing multiplayer capabilities, will track and store player activity and the impact it has on their characters. Of course, we wouldn’t want to stuff any kind of logging, persistence, or other I/O into the core game engine: any persistence we want to perform may itself end up being inconsistent because of transaction restarts.

The simplest way to address this is to use watchers and agents to implement a write-behind log for characters in the game. First, let’s set up the agents that will hold our output sinks; for this example, we’ll assume that all such agents will contain java.io.Writer instances, the Java type that defines the API of character output streams:

(require '[ :as io])

(def console (agent *out*))
(def character-log (agent (io/writer "character-states.log" :append true)))

One of these agents contains *out* (itself an instance of Writer), the other a Writer that drains to a character-states.log file in the current directory. These Writer instances will have content written to them by an agent action, write:

(defn write
  [^java.io.Writer w & content]
  (doseq [x (interpose " " content)]
    (.write w (str x)))
  (doto w
    (.write "\n")
    .flush))
write takes as its first argument a Writer (the state of one of the agents it will be queued for), and any number of other values to write to it. It writes each value separated by a space, then a newline, and then flushes the Writer so outputted content will actually hit the disk or console rather than get caught up in any buffers that might be in use by the Writer.

Finally, we need a function that will add a watcher to any reference type, which we’ll use to connect our character refs with the agents that hold the Writer instances:

(defn log-reference
  [reference & writer-agents]
  (add-watch reference :log
             (fn [_ reference old new]
               (doseq [writer-agent writer-agents]
                 (send-off writer-agent write new)))))

Every time the reference’s state changes, its new state will be sent along with our write function to each of the agents provided to log-reference. All we need to do now is add the watcher for each of the characters for which we want to log state changes, and fire up a battle:

(def smaug (character "Smaug" :health 500 :strength 400))
(def bilbo (character "Bilbo" :health 100 :strength 100))
(def gandalf (character "Gandalf" :health 75 :mana 1000))

(log-reference bilbo console character-log)
(log-reference smaug console character-log)

(wait-futures 1
              (play bilbo attack smaug)
              (play smaug attack bilbo)
              (play gandalf heal bilbo))

; {:max-health 500, :strength 400, :name "Smaug", :items #{}, :health 490.052618}
; {:max-health 100, :strength 100, :name "Bilbo", :items #{}, :health 61.5012391}
; {:max-health 100, :strength 100, :name "Bilbo", :items #{}, :health 100.0}      1
; {:max-health 100, :strength 100, :name "Bilbo", :items #{}, :health 67.3425151}
; {:max-health 100, :strength 100, :name "Bilbo", :items #{}, :health 100.0}
; {:max-health 500, :strength 400, :name "Smaug", :items #{}, :health 480.990141}
; ...

You can see the healing effects of Gandalf made concrete each time Bilbo’s :health goes up in the log.

You’ll find this same content in the character-states.log file as well. Fundamentally, we’re logging states to the console and a file because they’re the most accessible sinks; this approach will work just as well if you were to stream updates to a database, message queue, and so on.

Using a watcher like this gives us the opportunity to make each state change to our characters’ refs durable (e.g., by writing them to disk or to a database) without modifying the functions used to implement those changes.

In order to track and persist in-transaction information—like the amount of each attack and heal, who did what to whom, and so on—we just need to dispatch a write action to our writer agents within the body of any function that makes a change we might want to persist:

(defn attack
  [aggressor target]
  (dosync
    (let [damage (* (rand 0.1) (:strength @aggressor) (ensure daylight))]
      (send-off console write
        (:name @aggressor) "hits" (:name @target) "for" damage)
      (commute target update-in [:health] #(max 0 (- % damage))))))

(defn heal
  [healer target]
  (dosync
    (let [aid (min (* (rand 0.1) (:mana @healer))
                   (- (:max-health @target) (:health @target)))]
      (when (pos? aid)
        (send-off console write
          (:name @healer) "heals" (:name @target) "for" aid)
        (commute healer update-in [:mana] - (max 5 (/ aid 5)))
        (alter target update-in [:health] + aid)))))

(dosync
  (alter smaug assoc :health 500)
  (alter bilbo assoc :health 100))
; {:max-health 100, :strength 100, :name "Bilbo", :items #{}, :health 100}
; {:max-health 500, :strength 400, :name "Smaug", :items #{}, :health 500}

(wait-futures 1
              (play bilbo attack smaug)
              (play smaug attack bilbo)
              (play gandalf heal bilbo))
; {:max-health 500, :strength 400, :name "Smaug", :items #{}, :health 497.414581}
; Bilbo hits Smaug for 2.585418463393845
; {:max-health 100, :strength 100, :name "Bilbo", :items #{}, :health 66.6262521}
; Smaug hits Bilbo for 33.373747881474934
; {:max-health 500, :strength 400, :name "Smaug", :items #{}, :health 494.667477}
; Bilbo hits Smaug for 2.747103668676348
; {:max-health 100, :strength 100, :name "Bilbo", :items #{}, :health 100.0}
; Gandalf heals Bilbo for 33.37374788147494
; ...

The end result of composing these small pieces together with our character refs is a fire-and-forget persistence mechanism that is safe to use in conjunction with the retries that are inevitable when using atoms and transactions over refs. We wrote to the console and a logfile to keep the example simple, but you can just as easily write ref state updates to a database. In any case, this demonstrates how, just as with the general usage of atoms and refs, even things like sharing I/O resources within a concurrent environment can be done without touching a single low-level lock and taking on the risks inherent in their management.

Using agents to parallelize workloads

It may initially seem unnecessary or inconvenient to have to segregate agent actions into two sorts. However, without the separation between blocking and nonblocking actions, agents would lose their ability to efficiently utilize the resources needed to service the different kinds of workloads—CPU, disk I/O, network throughput, and so on.

For example, say our application was dedicated to processing messages pulled from a queue; reading messages from the queue would likely be a blocking operation due to waiting on the network if the queue was not in-process, and depending on the semantics of the queue, waiting for work to be available. However, processing each message is likely to be CPU-bound.

This sounds a lot like a web crawler. Agents make building one that is efficient and flexible quite easy. The one we’ll build here will be extraordinarily basic,[156] but it will demonstrate how agents can be used to orchestrate and parallelize potentially very complicated workloads.

First, we need some basic functions for working with the content of web pages we crawl. links-from takes a base URL and that URL’s HTML content, returning a seq of the links found within that content; words-from takes some HTML content and extracts its text, returning a seq of the words found therein, converted to lowercase:

(require '[net.cgrand.enlive-html :as enlive])
(use '[clojure.string :only (lower-case)])
(import '( URL MalformedURLException))

(defn- links-from
  [base-url html]
  (remove nil? (for [link (enlive/select html [:a])]
                 (when-let [href (-> link :attrs :href)]
                   (try
                     (URL. base-url href)
                     ; ignore bad URLs
                     (catch MalformedURLException e))))))

(defn- words-from
  [html]
  (let [chunks (-> html
                 (enlive/at [:script] nil)
                 (enlive/select [:body enlive/text-node]))]
    (->> chunks
      (mapcat (partial re-seq #"\w+"))
      (remove (partial re-matches #"\d+"))
      (map lower-case))))

This code uses Enlive, a web templating and scraping library that we discuss in detail in Enlive: Selector-Based HTML Transformation, but its details aren’t key to our main focus, the use of agents to soak up all of the resources we have to maximize crawling throughput.

There will be three pools of state associated with our web crawler:

  1. One of Java’s thread-safe queues will hold URLs that are yet to be crawled, which we’ll call url-queue. Then, for each page we retrieve, we will…

  2. Find all of the links in the page so as to crawl them later; these will all be added to a set held within an atom called crawled-urls, and URLs we haven’t visited yet will be queued up in url-queue. Finally…

  3. We’ll extract all of the text of each page, which will be used to maintain a count of cumulative word frequencies observed throughout the crawl. This count will be stored in a map of words to their respective counts, held in an atom called word-freqs:

(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))  ; queue classes used below
(use '[ :only (as-url)])                              ; as-url is used in get-url

(def url-queue (LinkedBlockingQueue.))
(def crawled-urls (atom #{}))
(def word-freqs (atom {}))

We’ll set up a bunch of agents in order to fully utilize all the resources we have available,[157] but we need to think through what state they’ll hold and what actions will be used to transition those states. In many cases, it is useful to think about agent state and the actions applied to it as forming a finite state machine; we’ve already walked through the workflow of our crawler, but we should formalize it.

Figure 4-8. A web crawler’s primary state transitions

The state of an agent at each point in this process should be obvious: prior to retrieving a URL, an agent will need a URL to retrieve (or some source of such URLs); prior to scraping, an agent will need a page’s content to scrape; and, prior to updating the cumulative crawler state, it will need the results of scraping. Since we don’t have very many potential states, we can simplify things for ourselves by allowing each action implementing these transitions to indicate the next action (transition) that should be applied to an agent.

To see what this looks like, let’s define our set of agents; their initial state, corresponding with the initial state preceding the “Retrieve URL” transition in Figure 4-8, is a map containing the queue from which the next URL may be retrieved, and the next transition itself, a function we’ll call get-url:

(declare get-url)

(def agents (set (repeatedly 25 #(agent {::t #'get-url :queue url-queue})))) 1

Our agents’ state will always have a ::t slot,[158] mapped to the next function that implements the next transition.[159]

The three transitions shown in Figure 4-8 are implemented by three agent actions: get-url, process, and handle-results.

get-url will wait on a queue (remember, each of our agents has url-queue as its initial value) for URLs to crawl. It will leave the state of the agent set to a map containing the URL it pulls off the queue and its content:

(declare run process handle-results)           1

(defn ^::blocking get-url
  [{:keys [^BlockingQueue queue] :as state}]
  (let [url (as-url (.take queue))]
    (try
      (if (@crawled-urls url)                  2
        state
        {:url url
         :content (slurp url)
         ::t #'process})
      (catch Exception e
        ;; skip any URL we failed to load
        state)
      (finally (run *agent*)))))

We’ll show run and explain what it’s doing in a little bit.


If we’ve already crawled the URL pulled off the queue (or if we encounter an error while loading the URL’s content), we leave the agent’s state untouched. This implementation detail in our finite state machine adds a cycle to it where get-url will sometimes be invoked on a single agent multiple times before it transitions states.

process will parse a URL’s content, using the links-from and words-from functions to obtain the URL’s content’s links and build a map containing the frequencies of each word found in the content. It will leave the state of the agent set to a map containing these values as well as the originating URL:

(defn process
  [{:keys [url content]}]
  (try
    (let [html (enlive/html-resource ( content))]
      {::t #'handle-results
       :url url
       :links (links-from url html)
       :words (reduce (fn [m word]
                        (update-in m [word] (fnil inc 0)))            1
                      {}
                      (words-from html))})
    (finally (run *agent*))))

The :words map associates found words with their count within the retrieved page, which we produce by reducing a map through the seq of those words. fnil is a HOF that returns a function that uses some default value(s) (here, 0) in place of any nil arguments when calling the function provided to fnil; this keeps us from having to explicitly check if the value in the words map is nil, and returning 1 if so.
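
In isolation (our own REPL aside, not part of the original example), fnil’s effect looks like this:

((fnil inc 0) nil)    ; nil is replaced by the supplied default, 0, before inc is applied
;= 1
((fnil inc 0) 41)     ; non-nil arguments pass through unchanged
;= 42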

handle-results will update our three key pieces of state: adding the just-crawled URL to crawled-urls, pushing each of the newfound links onto url-queue, and merging the URL’s content’s word frequency map with our cumulative word-freqs map. handle-results returns a state map containing url-queue and the get-url transition, thus leaving the agent in its original state.

(defn ^::blocking handle-results
  [{:keys [url links words]}]
  (try
    (swap! crawled-urls conj url)
    (doseq [url links]
      (.put url-queue url))
    (swap! word-freqs (partial merge-with +) words)

    {::t #'get-url :queue url-queue}
    (finally (run *agent*))))

You may have noticed that each of the functions we’ll use as agent actions has a try form with a finally clause that contains a sole call to run with *agent* as its sole argument.[160] We didn’t define *agent* anywhere; usually unbound, it is a var provided by Clojure that, within the scope of the evaluation of an agent action, is bound to the current agent. So, (run *agent*) in each of these actions is calling run with a single argument, the agent that is evaluating the action.
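
A quick check of our own (not from the book) makes that binding visible:

(def probe (agent nil))
(send probe (fn [_] (identical? probe *agent*)))   ; *agent* is bound to probe while the action runs
(await probe)
@probe
;= true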

This is a common idiom used with agents that allows them to run continuously. In our web crawler’s case, run is a function that queues up the next transition function to be applied to an agent based on that agent’s ::t state. If each action already knows which transition function should be applied next, why add a level of indirection in calling run? Two reasons:

  1. While it is reasonable to expect each function used as an agent action to know what the next transition should be given the new agent state it is returning, there’s no way for it to know whether that next transition is a blocking action or not. This is something that is best left up to the transitions themselves (and their informed authors); thus, run will use the presence (or absence) of ::blocking metadata on each transition to determine whether to use send or send-off to dispatch transition functions.[161]

  2. run can check to see if the agent has been marked as being paused—a condition indicated simply by the presence of a logically true ::paused value in the agent’s metadata.

Example 4-8. run, the web crawler’s “main loop”

(defn paused? [agent] (::paused (meta agent)))

(defn run
  ([] (doseq [a agents] (run a)))
  ([a]
    (when (agents a)
      (send a (fn [{transition ::t :as state}]
                (when-not (paused? *agent*)
                  (let [dispatch-fn (if (-> transition meta ::blocking)
                                      send-off
                                      send)]
                    (dispatch-fn *agent* transition)))
                state)))))

run doubles as a way to start all of our (unpaused) agents when called with no arguments.

The pausing capability is particularly important, as we wouldn’t want the crawler to run on with no way to interrupt it. Because run consults the agents’ metadata before dispatching the next transition, pause and restart give us a way to stop and resume the agents’ operation just by changing that metadata:

(defn pause
  ([] (doseq [a agents] (pause a)))
  ([a] (alter-meta! a assoc ::paused true)))

(defn restart
  ([] (doseq [a agents] (restart a)))
  ([a]
    (alter-meta! a dissoc ::paused)
    (run a)))

Now we can crawl some web pages! We’ll want to run the crawler repeatedly from a fresh state, so it will be handy to have a testing function that will reset the crawler state. test-crawler does this, as well as adding a starting URL to url-queue, and letting the agents run for just 60 seconds so we can make some informal throughput comparisons:

(defn test-crawler
  "Resets all state associated with the crawler, adds the given URL to the
   url-queue, and runs the crawler for 60 seconds, returning a vector
   containing the number of URLs crawled, and the number of URLs
   accumulated through crawling that have yet to be visited."
  [agent-count starting-url]
  (def agents (set (repeatedly agent-count 
                     #(agent {::t #'get-url :queue url-queue}))))  1
  (.clear url-queue)
  (swap! crawled-urls empty)
  (swap! word-freqs empty)
  (.add url-queue starting-url)
  (run)
  (Thread/sleep 60000)
  (pause)
  [(count @crawled-urls) (count url-queue)])

We warned you against redefining vars within the body of a function in Vars Are Not Variables, but this may be one of the few contexts where doing so is okay: a function that is never called outside of the REPL, used solely for experimentation and testing.

To establish a baseline, let’s first try it with a single agent, using the BBC’s news page as a crawl root:

(test-crawler 1 "")
;= [86 14598]

Eighty-six pages crawled in a minute. Surely we can do better; let’s use 25 agents, which will parallelize both the blocking retrieval workload as well as the CPU-bound scraping and text processing workload:

(test-crawler 25 "")
;= [670 81775]

Not bad, 670 pages crawled in 60 seconds, a very solid order of magnitude gained simply by tweaking the number of agents being applied to the problem.[162]

Let’s check on the word frequencies being calculated. We can get a selection of the most and least common terms with their frequencies quite easily:

(->> (sort-by val @word-freqs)
  reverse
  (take 10))
;= (["the" 23083] ["to" 14308] ["of" 11243] ["bbc" 10969] ["in" 9473]
;=  ["a" 9214] ["and" 8595] ["for" 5203] ["is" 4844] ["on" 4364])
(->> (sort-by val @word-freqs)
  (take 10))
;= (["relieved" 1] ["karim" 1] ["gnome" 1] ["brummell" 1] ["mccredie" 1]
;=  ["ensinar" 1] ["estrictas" 1] ["arap" 1] ["forcibly" 1] ["kitchin" 1])

Looks like we have a fully functioning crawler that does some marginally interesting work. It’s surely not optimal—as we’ve said, it’s quite basic, and would need a variety of subtle enhancements in order to be used at scale, but the foundation is clearly there.

Now, remember what we were saying earlier in this section, that the division of agent actions into those that may block (due to I/O or other wait conditions) and those that won’t (i.e., CPU-bound processing) enables the maximal utilization of all of the resources at our disposal. We can test this; for example, by marking process as a blocking operation, we will ensure that it is always sent to agents using send-off, and thus handled using the unbounded thread pool:

(alter-meta! #'process assoc ::blocking true)
;= {:arglists ([{:keys [url content]}]), :ns #<Namespace user>, 
;=  :name process, :user/blocking true}

The practical effect of this is that all of the HTML parsing, searching for links, and text processing associated with the word frequency calculations will happen without any limit on how many of those actions are evaluated concurrently:

(test-crawler 25 "")
;= [573 80576]

This actually has a negative impact on throughput—approaching 15 percent overall—as now there can be up to 25 active (and hungry) agents contending for CPU cores, which can cumulatively slow our CPU-bound workload.

[150] The pool’s size is 2 + the number of processors available to the JVM; for example, a two-core CPU will have a send thread pool containing a maximum of four threads, a four-core CPU a pool of six threads, etc.

[151] Many thousands of agents may be created without strain with a default heap configuration; millions may be created by tweaking the JVM’s heap settings.

[152] It is an implementation detail—and so may change in the future—but you can call (.getQueueCount some-agent) in order to check the current size of some-agent’s action queue.

[153] That is, via a REPL connected to your environment, wherever it may be; see Debugging, Monitoring, and Patching Production in the REPL.

[154] You can change an agent’s error mode with set-error-mode!.

[155] You can change an agent’s error handler with set-error-handler!.

[156] And not very well-behaved, especially insofar as it doesn’t throttle connections, a key point of politeness when crawling web content. Our apologies to the BBC for (ab)using them as an example crawl root!

[157] Another shortcoming of our basic web crawler: at nearly any scale, a useful web crawler would use a message queue and suitable database instead of maintaining all state in memory. This thankfully does not affect the semantics of our example, which could be adapted to use such things with relative ease.

[158] We use a namespaced keyword to avoid any potential naming clashes with other parts of state that might be added to the agents, if this crawler implementation were to ever be extended outside of its own namespace.

[159] Depending on the range of states that are to be held by your agents, sending a multimethod or protocol function to them can be an elegant, efficient option to discriminate between a number of different potential agent state transitions. We talk about multimethods and protocols in Chapter 7 and Protocols, respectively.

[160] We’ve repeated this pattern in three functions at this point; any more, and it would be a no-brainer to write a macro that would eliminate that boilerplate.

[161] This metadata access explains in part why we’re using the functions’ vars to denote transitions instead of the functions themselves. Beyond that, using vars helps make it easier to modify the behavior of the web crawler; see Limitations to Redefining Constructs for why.

[162] Of course, your specific results will vary greatly depending upon the CPU power you have available and the speed and latency of your Internet connection; however, the relative improvement from 1 to 25 agents should be similar.
