reddit stuff for clojure



(this space intentionally left almost blank)

Reddit Scrape Scheduler

Simple task scheduling system to fire off jobs to scrape reddit listings and dump them into a Couchbase bucket.

Stores its state in Couchbase so it will restart unfinished tasks if it dies.

(ns reddalyzr.grabulator
  (:use [ :only (at now)])
  (:require [reddalyzr.reddit :as reddit]
            [reddalyzr.loader :as loader]
            [ :as at-]
            [taoensso.timbre :as log]
            [cheshire.core :as json]
            [clojurewerkz.spyglass.couchbase :as cb]
            [clojurewerkz.spyglass.client :as spy]
            [clj-http.client :as http]))
(def aapool (at-/mk-pool))
(def cb-rest-addr (get (System/getenv) "COUCHBASE_URI" ""))
(def cb-bucket (get (System/getenv) "COUCHBASE_BUCKET" "default"))
(def cb-bucket-password (get (System/getenv) "COUCHBASE_BUCKET_PASSWORD" ))
(def statekey "reddalyzr_grabulator_state")
(def grabulator-config (atom {}))
(def persisted-state (agent {}))
(def initial-state {:kind :configuration
                    :scheduled-tasks {}
                    ;; Grab 300 most recent items every minute
                    :periodic {:all-new [60 "grab" ["r/all/new/" 300 {:query-params {:sort "new"}}]]}})

Use a watch function on the persisted-state agent to store its value into the Couchbase DB every time it changes

(defn- persist-state [_key _atom oldval newval]
  (when (not= oldval newval) (spy/set (:cb-conn @grabulator-config) statekey 0 (json/encode newval))))
(defn- grab-task [path limit & [opts]]
  (log/info "Loading from path" (str "\ path "\) "limit" limit opts)
  (loader/load-reddit (:cb-conn @grabulator-config) path limit opts))
(def tasks
  {:grab grab-task})
(defn r-munge [path] (.replace path \/ \_))
(defn- drop-task [pstate tk]
  (log/info "Task" tk "completed")
  (update-in pstate [:scheduled-tasks] dissoc tk))
(defn- prepare-to-fire [pstate]
  (let [mysid (:sid @grabulator-config)]
    (reduce (fn [s [tk t]]
              (if (not= mysid (:sid t))
                (do (at (:time t) #(do (apply (tasks (keyword (:type t))) (:parm t))
                                       (send persisted-state drop-task tk))
                    (log/info "Scheduled task" tk "to fire at" (:time t))
                    (assoc-in s [:scheduled-tasks tk :sid] mysid))
                s)) pstate (:scheduled-tasks pstate))))
(defn- sched-task [pstate id [time type parm]]
  (let [prex ((:scheduled-tasks pstate) id)
        ntime (+ (* 1000 time) (now))]
    (send persisted-state prepare-to-fire)
    (if (nil? prex)
      (assoc-in pstate [:scheduled-tasks id] {:time ntime :type type :parm parm})

Immediately start scraping the reddit listing at path, dumping up to limit items into the DB

(defn grab-path
  [path & [limit opts]]
  (send persisted-state sched-task
        (keyword (str "fetch-" (r-munge path)))
        [0 "grab" [(str path) (or limit 10000) opts]]))

Set up a periodic schedule to scrape the reddit lisitng at path, dumping limit items into the DB every period seconds.

(defn sched-periodic-grab
  [path period & [limit opts]]
  (send persisted-state assoc-in [:periodic (keyword (str "fetch-" (r-munge path)))]
        [period "grab" [path (or limit 500) opts]]))

Periodically check our list of periodic tasks, and if they don't have an instance in the list of scheduled tasks, schedule them.

(defn- heartbeat [pstate]
  (log/debug "Grabulator heartbeat.")
  ;; Reschedule repeating tasks
  (doseq [[tk tv] (:periodic pstate)] (send persisted-state sched-task tk tv))
  ;; Schedule next heartbeat
  (when (:heartbeat @grabulator-config) (at (+ (now) 5000) #(send persisted-state heartbeat) aapool))
(defn startup []
  (reset! grabulator-config {:cb-conn (cb/connection [cb-rest-addr] cb-bucket cb-bucket-password)
                             :heartbeat true
                             :sid (now)})
  (let [cfg @grabulator-config
        conn (:cb-conn cfg)
        loadps (json/parse-string (or (spy/get conn statekey) "{}") true)]
    (add-watch persisted-state "persistor" persist-state)
    (if (= loadps {})
      (do (log/warn "No state record found in Couchbase" loadps)
          (send persisted-state (fn [_] initial-state)))
      (send persisted-state (fn [_] loadps)))
      (send persisted-state heartbeat)))
(defn start-file-logging [filename]
  (let [stdoutfn (get-in @log/config [:appenders :standard-out :fn])
        fappender {:doc "Prints to file" :min-level nil :enabled? true :async? false
                   :max-message-per-msecs nil
                   :fn (fn [logdata] (spit filename (with-out-str (stdoutfn logdata)) :append true))}]
    (log/set-config! [:appenders :file-appender] fappender)))
(defn shutdown []
  (swap! grabulator-config dissoc :heartbeat)
  (remove-watch persisted-state "persistor")
  (at-/stop-and-reset-pool! aapool))
(ns reddalyzr.loader
  (:require [clojurewerkz.spyglass.client :as spyc]
            [cheshire.core :as json]
            [reddalyzr.reddit :as reddit]))

Load items from a reddit listing into a memcached or Couchbase server

(defn load-reddit
  [conn subreddit & [limit opts]]
  (let [listing (reddit/listing subreddit opts)]
    (doseq [link (if (nil? limit) listing (take limit listing))]
      (.set conn (:id link) 0 (json/encode link)))))
(defn dev-conn [] (spyc/bin-connection ""))

Reddit API Utilities

(ns reddalyzr.reddit
  (:require [clj-http.client :as http]
            [taoensso.timbre :as log]
            [clojure.walk :as walk]))

Reddit will always give us what the reddit docs refer to as a "thing". Types of things are documented here

We'll handle a couple types of things and transform them to be more easily used by all the clojure data processing tools. The "Listing" type for example, would be more useful if it was just a list of the items it contained. We use clojure metadata to attach the original metadata so we can still get to it later.

(letfn [(remap-kind [el kind]
          (let [data (:data el)
                m (merge (dissoc el :data) {:kind kind})]
            (with-meta (merge data {:kind kind}) m)))]
  (defn- rdt-transform [el]
    (if (map? el)
      (condp = (:kind el)
        "Listing" (let [data (:data el)
                        items (:children data)
                        m (merge (dissoc el :data) (dissoc data :children) {:kind :listing})]
                    (with-meta items m))
        "t3" (remap-kind el :link)
        "t1" (remap-kind el :comment)
        "t5" (remap-kind el :subreddit)
(def reddit-base "")
(def ^:private lrqtime (agent 0))

If minwait milliseconds have not passed since the timestamp in timeagent, sleep for the difference. Always update time agent with the current timestamp.

(defn- rlimit
  [minwait timeagent]
  (send-off timeagent (fn [lt]
                        (let [now (System/currentTimeMillis)]
                          (when (< now (+ lt minwait))
                            (Thread/sleep (- (+ lt minwait) now)))
  (await timeagent))

Use clj-http to make a request to a reddit resource. Will append .json to get JSON data, and rate-limit to one request per two seconds, per reddit API guidelines.

(defn request
  [& [path opts]]
  (rlimit 2000 lrqtime) ;rate limit, 1 request per 2 seconds per reddit guidelines
  (log/info "Reddit request" path opts)
  (:body (http/request (merge-with merge
                                   {:method :get
                                    ; descriptive user-agent per reddit guidelines
                                    :headers {"User-Agent" "reddalyzr.clj by /u/apage43"}
                                    :url (str reddit-base "/" path ".json")
                                    :as :json} opts))))

Request as with request, but also transform the things to something more clojure friendly, attaching metadata as clojure metadata.

(defn request-xf
  [& [path opts]]
  (walk/postwalk rdt-transform (request path opts)))

Given a listing and the path that was used to retrieve it, create a lazy sequence that will request the next page of items, if there is another page after the current one.

(defn- listing-seq [path listing opts]
  (let [m (meta listing)
        after (:after m)]
    (if (not= (:kind m) :listing) (throw (ex-info "Not a listing" {:obj listing})))
    (lazy-cat listing
              (if (nil? after) []
                  (listing-seq path (request-xf path (merge-with merge {:query-params {:after after :limit 100}} opts)) opts)))))

A lazy sequence of all the items that would be on the listing page at path. (take 50 (listing "r/gaming"))

(defn listing
  [path & [opts]]
  (listing-seq path (request-xf path (merge-with merge {:query-params {:limit 100}} opts)) opts))

Get a thing by its id

(defn thing
  [id & [opts]]
  (request-xf (str "by_id/" id) opts))
(defn hour-freqs [listing-path amount]
   (map (fn [x] (-> x
                    (* 1000)
                    .getHours)) (take amount (listing listing-path)))))
(defn print-hour-freqs [hh]
  (let [mv (apply max (vals hh))]
    (doseq [[k v] (sort hh)]
      (println (str (if (> 10 k) " " ) k " " (apply str (repeat (* (/ 30 mv) v) "*")))))))
(ns reddalyzr.startup
  (:require [taoensso.timbre :as log]
            [reddalyzr.reddit :as reddit]
            [reddalyzr.grabulator :as g]
            [ :as rs]))
(def server (atom nil))
(defn -main []
  (let [replport (Integer/parseInt (get (System/getenv) "REPL_PORT" "0"))]
    (log/set-level! :info)
    (when (not= 0 replport)
      (log/info "Start REPL Server" @(reset! server (rs/start-server :port replport))))