I followed the example of tcp, but close the connection by sever when response is successful.
(defn short-connection-echo-handler
[f]
(fn [s info]
(println "show sink:" s)
;; take a message, and define a default value that tells us if the connection is closed
(-> (s/take! s ::none)
(d/chain
; first, check if there even was a message, and then transform it on another thread
(fn [msg]
(if (= ::none msg)
::none
(d/future (f msg))))
;; once the transformation is complete, write it back to the client
(fn [msg']
(when-not (= ::none msg')
(s/put! s msg')))
; if we were successful in our response, close the connection
(fn [result]
(when result
(s/close! s))))
;; if there were any issues on the far end, send a stringified exception back
;; and close the connection
(d/catch
(fn [ex]
(s/put! s (str "ERROR: " ex))
(s/close! s))))))
Then, I found that a lot of “ESTABLISHED” left by “netstat”. So I add a “println” to show at line 4.
Now I knows that: the sink of client connection and data transfer is different, the port of :remote-address below not the same. I just close the transfer connection.
when connection established: << stream: {:type splice, :sink {:type netty, :closed? false, :sink? true, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63583, :writable? true, :readable? true, :closed? false, :direction :outbound}}, :source {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type netty, :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63583, :writable? true, :readable? true, :closed? false, :direction :inbound}, :direction :inbound, :source? true}} >>
when data transfer: << stream: {:type splice, :sink {:type netty, :closed? false, :sink? true, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63588, :writable? true, :readable? true, :closed? false, :direction :outbound}}, :source {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type netty, :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :connection {:local-address /127.0.0.1:8295, :remote-address /127.0.0.1:63588, :writable? true, :readable? true, :closed? false, :direction :inbound}, :direction :inbound, :source? true}} >>
I want to know how to close! the sink when connection established, thanks a lot.