How to close an "ESTABLISHED" connection in Aleph?

I followed the TCP example, but changed it so that the server closes the connection once the response has been sent successfully.

(defn short-connection-echo-handler
  [f]
  (fn [s info]
    (println "show sink:" s)
    ;; take a message, and define a default value that tells us if the connection is closed
    (-> (s/take! s ::none)
        (d/chain
          ; first, check if there even was a message, and then transform it on another thread
          (fn [msg]
            (if (= ::none msg)
              ::none
              (d/future (f msg))))

          ;; once the transformation is complete, write it back to the client
          (fn [msg']
            (when-not (= ::none msg')
              (s/put! s msg')))

          ; if we were successful in our response, close the connection
          (fn [result]
            (when result
              (s/close! s))))

        ;; if there were any issues on the far end, send a stringified exception back
        ;; and close the connection
        (d/catch
          (fn [ex]
            (s/put! s (str "ERROR: " ex))
            (s/close! s))))))

Then I found that "netstat" still shows a lot of connections left in the "ESTABLISHED" state. So I added a "println" at line 4 of the handler to show the stream.
Now I know that the stream printed when the client connects and the one printed during data transfer are different connections: the ports in :remote-address below are not the same. So I am only closing the data-transfer connection.

when connection established: << stream: {:type splice, :sink {:type netty, :closed? false, :sink? true, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63583, :writable? true, :readable? true, :closed? false, :direction :outbound}}, :source {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type netty, :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63583, :writable? true, :readable? true, :closed? false, :direction :inbound}, :direction :inbound, :source? true}} >>

when data transfer: << stream: {:type splice, :sink {:type netty, :closed? false, :sink? true, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63588, :writable? true, :readable? true, :closed? false, :direction :outbound}}, :source {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type netty, :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63588, :writable? true, :readable? true, :closed? false, :direction :inbound}, :direction :inbound, :source? true}} >>

I want to know how to close! the sink of the connection that was printed when the connection was established. Thanks a lot.


I solved it another way: I store each connection together with its creation time in an agent. Whenever a new connection is created, I use (send) to close and remove the connections that have timed out.
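
A minimal sketch of that idea, in case it helps someone else. The names here (connections, register-and-sweep, track!) and the timeout value are hypothetical, not taken from my real code, and the close function is passed in as an argument so the sketch does not depend on any particular library:

```clojure
;; Agent holding a map of connection -> creation time in milliseconds.
(def connections (agent {}))

(defn expired?
  "True when a connection created at created-at is older than timeout-ms."
  [now timeout-ms created-at]
  (> (- now created-at) timeout-ms))

(defn register-and-sweep
  "Agent action: closes every connection older than timeout-ms using
  close-fn, removes it from the map, then records conn with time now."
  [conns conn now timeout-ms close-fn]
  (let [stale (filter (fn [[_ created-at]] (expired? now timeout-ms created-at))
                      conns)]
    ;; close the timed-out connections (side effect inside the agent action)
    (doseq [[c _] stale]
      (close-fn c))
    (-> (apply dissoc conns (map first stale))
        (assoc conn now))))

(defn track!
  "Registers conn and sweeps timed-out connections asynchronously via send."
  [conn timeout-ms close-fn]
  (send connections register-and-sweep
        conn (System/currentTimeMillis) timeout-ms close-fn))

;; Assumed usage inside the handler, with s aliased to manifold.stream
;; as in the code above and a 30 second timeout chosen for illustration:
;;   (track! s 30000 s/close!)
```

Because the agent runs its actions one at a time, two connections arriving at the same moment cannot sweep the map concurrently. Note that stale connections are only closed when a new connection arrives, so the last few connections stay open until the next client connects.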
