Coordinate git push from different threads with Clojure

Hello!

I want my server to push data with git. I’ll be calling out to git from clojure.java.io/sh. When everything is synchronized, all is well. But when I try to solve the problem concurrently, my head goes spinning.

:loudspeaker: Beware: beginner mistakes in concurrency are likely to follow! :loudspeaker:

First draft: synchronize with locks

Plan:

  • Use a lock for commits and a lock for pushes
  • Wait for the lock before doing any work
(require
 '[clojure.java.shell :refer [sh]]
 '[clojure.string :as string])

(defn- unstaged-stuff? [directory]
  (let [git-status (-> (sh "git" "status" "--porcelain"
                           :dir directory)
                       :out)]
    (if-not (string/blank? git-status)
      git-status
      nil)))

(defn- now []
  (-> (sh "date" "+%Y-%m-%d %H:%M:%S")
      :out
      clojure.string/trim))

(let [commit-lock (Object.)]
  (defn stage-and-commit! [directory]
    (locking commit-lock
      (when (unstaged-stuff? directory)
        (clojure.java.shell/with-sh-dir directory
          ;; Wrap in bash to ensure that Git auth works as expected
          (sh "bash" "-c"
              "git add .")
          (sh "bash" "-c"
              (str "git commit -m "
                   "\"Autocommit @ "
                   (now)
                   "\"")))))))

(let [push-lock (Object.)]
 (defn push! [lock directory]
  (locking push-lock
    (sh "bash" "-c"
        "git push"
        :dir directory))))

Problem with first draft

Running the commits in sequence is fine.

Running pushes in sequence is potentially really stupid. If i make a 100 commits, I would rather not have 100 pushes happen. One push takes about three seconds, and if I get one write every second, I would be pushing all the time when running the server. Bad programmer!

Idea: don’t queue more work when there are already pushes queued

Here’s what I’m thinking:

  • No push queued: start a push
  • Push running: enque another push, to ensure that the latest writes come along
  • Push alreade enqueued: do nothing.

I’ve tried implementing this with a lock and an atom counting the number of running pushes:

(defn make-synchronizer
  "Create a handler for submitting new work, and limit waiting work queue to 1"
  []
  (let [work-counter (atom 0)
        lock (Object.)]
    (fn add-work [f]
      (future
        (cond
          ;; Empty queue or one in queue: allow adding more work
          (<= @work-counter 1) (do
                                 (swap! work-counter inc)
                                 (f)
                                 (swap! work-counter dec))
          ;; Otherwise, do nothing.
          :else nil)))))

(let [synchronizer (make-synchronizer)]
  (doseq [i (range 6)]
    (Thread/sleep 10)
    (synchronizer #(do (prn :start i)
                       (Thread/sleep 400)
                       (prn :done i)))))
;; Prints:
;;  :start 0
;;  :start 1
;;  :done 0
;;  :done 1

Problem: Worker 1 starts before worker 0 is done!

This isn’t what I want! There’s also a race condition, between (<= @work-counter 1) and (swap! work-counter inc). Also, I’m not using my lock, because I couldn’t see how I could use it. But I think I need it. So it’s there!

Any suggestions?

Fancy solutions with refs, agents or other concepts I haven’t fully grasped are much welcome!

If the answer is “read the F manual”, I’ll be glad to. I’ve had a look at the resources at Clojure.org, but I’m missing a step in between. In which case, please provide a link :slight_smile:

Thanks!

Teodor

There is a nice locking macro availiable in clojure core.

One way to queue up various operations is through an agent, to which you send various function calls. You could store some timing state in the agent which could help the function call you sent to it do determine wether it should also make a push. However, this wont solve the problem where new commits arrive while you are pushing.

The decision to create a push needs some more “locking-state” than just a lock.

One way to achieve that would be to use core.async, a nice tutorial is avail at https://github.com/halgari/clojure-conj-2013-core.async-examples.

In this particular case, I guess you would like to do pushes at most every minute or so, and make them at once if there is no queued up commit operations left.

The nice thing here is that you can create code that works somewhat like a state machine, for instance

Look for commit-“requests” in one exposed API-channel or timed out push-instructions on the push-channel, which ever comes first. (alt! )

This blocks until something happends.

If there are requests on the commit-channel, start an inner loop that drains them. When the loop is finished, add a push-request to the internal push-channel and recur to the outer loop.

The above mentioned receipe would look something like

(def commit-request-chan (chan 100))
(def internal-push-chan (dropping-buffer 1))

(go (loop [[value port] (alt! commit-request-chan]
(if (= port commit-request-chan)
(do (drain-the-chan and don’t forget the first value that we took)
(!> internal-push-chan :push-things))
;; we are in push chan (= port push-chan)
(do-the-push))
(recur))

The code above is very sketchy and wont execute.

The commit and push operations must catch their own errors or the loop will terminate silently.

One problem with this solution is that it wont block other changes to the file system, which makes it hard to determine what should be commited in which commit. I guess you really would like to have some better control over this as well…

1 Like

Hello, @Linus_Ericsson! Thanks for the suggestion on core.async. The examples from the talk were very helpful. clojure.core.async/dropping-buffer seems like what I needed.

And thanks for the formatting, @martinklepsch!

2 Likes

This topic was automatically closed 182 days after the last reply. New replies are no longer allowed.