美文网首页
clojure&clojurescript前后端实现websoc

clojure&clojurescript前后端实现websoc

作者: 小马将过河 | 来源:发表于2019-10-31 00:28 被阅读0次

    服务端

    服务端配置websocket相对来说挺简单,因为我们项目当初new的时候没有加websocket,现在是参考luminusweb websocket往里增加关键代码

      1. 创建ws-route
        创建如下route,并跟普通http api route一样加入handler序列中。
    (ns alk-wxapi.routes.websockets
      (:require
       [alk-wxapi.routes.service.ws-service :refer [ws-handler countdown]]))
    
    (defn websocket-routes []
      ["/ws"
       {:swagger {:tags ["websocket"]}}
    
       [""
        {:get {:summary    "websocket 入口"
               :parameters {}
               :handler    (fn [request]
                             (ws-handler request))}}]
       ])
    
      1. 创建ws-service
        即上面require的ws-service,内容如下:
    (ns alk-wxapi.routes.service.ws-service
      (:require
       [clojure.tools.logging :as log]
       [immutant.web.async :as async]
       [immutant.web.sse :as sse]))
    
    (defonce channels (atom #{}))
    
    (defn broadcast-msg
      "广播消息"
      [data]
      (log/info "广播消息:" data)
      (doseq [channel @channels]
        (async/send! channel (str data))))
    
    (defn notify-clients! [channel data]
      (log/info (java.util.Date.) "收到客户端发送的message:" data)
      (async/send! channel (str {:message data
                                 :type "reply"})))
    
    (defn connect! [channel]
      (log/info "channel open")
      (swap! channels conj channel))
    
    (defn disconnect! [channel {:keys [code reason]}]
      (log/info "close code:" code "reason:" reason)
      (swap! channels #(remove #{channel} %)))
    
    (def websocket-callbacks
      "WebSocket callback functions"
      {:on-open connect!
       :on-close disconnect!
       :on-message notify-clients!})
    
    (defn ws-handler [request]
      (async/as-channel request websocket-callbacks))
    
    
      1. 测试连接
        这样再次启动repl,在启动http 接口的同时就在同一端口启动了ws协议的websocket服务,可以用wscat命令行测试工具或者在在线测试工具进行连接测试。
    $ wscat -c ws://localhost:3000/api/ws
    Connected (press CTRL+C to quit)
    >
    
    

    客户端

    在任意一个页面找个契机开始创建连接,参考multi-client-ws-immutant创建时指定消息处理回调函数。
    代码如下:

      1. 根目录或者任意目录创建ws连接工具类
    (defn- receive-transit-msg!
      [update-fn]
      (fn [msg]
        (js/clearTimeout clear-time)
        (update-fn (cljs.reader/read-string (.-data msg)))))
    
    (defn- send-transit-msg!
      [msg]
      (if @ws-chan
        (.send @ws-chan msg)
        (throw (js/Error. "Websocket is not available!"))))
    (defn make-websocket! [url receive-handler]
     (println "attempting to connect websocket")
     (if-let [chan (js/WebSocket. url)]
       (do
         (set! (.-onmessage chan) (receive-transit-msg! receive-handler))
         (reset! ws-chan chan)
         (println "Websocket connection established with: " url))
       (throw (js/Error. "Websocket connection failed!"))))
    
      1. 在某个页面触发websocket连接
    (require ' [alk-doc.ws :as ws :refer [webSocketUrl click-one]])
    
    (defn handler-ws-msg
     “可以根据消息内容或者自定义的类型,进行相应的业务处理”
     [msg]
     (js/console.info "收到广播消息:" msg))
    
    (ws/make-websocket! webSocketUrl handler-ws-msg)
    

    这样就建立了前后端ws通信。

    生产环境上的特殊处理

    但是在生产环境(https+nginx)上要做点特殊处理,这里的处理,服务端和客户端都有。

      1. 生产环境多是https的,这就要求页面里面的websocket必须要是wss的,而不能是ws

    因此websocket的连接变成了wss://www.abc.com/api/ws/

      1. 服务端nginx配置支持websocket

    生产环境服务端的接口使用nginx做了反向代理,不出意外的话,上面的连接即使改成wss也连接不成功,需要nginx的配置如下:

    http {
        map $http_upgrade $connection_upgrade {
            default upgrade;
            ''      close;
        }
    
        server {
            ...
    
            location /chat/ {
                proxy_pass http://backend;
                proxy_http_version 1.1;
                proxy_set_header Upgrade $http_upgrade;
                proxy_set_header Connection $connection_upgrade;
                proxy_read_timeout 600s; 
            }
      }
    
      1. 最重要的,客户端加入心跳检测,端口后自动重连

    我们的程序是一个jar直接java启动的,这样在连接上后不会自己断开的,但是上面说了,生产环境使用nginx代理的,默认情况下,利用nginx代理websocket的时候,发现客户端和服务器握手成功后,如果在60s时间内没有数据交互,连接就会自动断开。因此上面配置了proxy_read_timeout 600s,也就是10分钟没有通信再断开。
    这个时间可以长,但我没试过是不是可以无限长。
    所以最好是在客户端加上心跳检测,断开后有能力自己重连,及时服务器不设置10分钟,60秒钟断开也可以自己重连。
    终极方案是心跳检测+延长read-timeout时间,客户端的代码:

    (require '[reagent.core :as reagent :refer [atom]])
    (defonce ws-chan (atom nil))
    (def lockReconnect (atom false))
    (def clear-time (atom nil))
    (def click-one (atom true))
    (def webSocketUrl "ws://182.61.51.177:3055/api/ws")
    
    (defn receive-transit-msg!
      [update-fn]
      (fn [msg]
        (js/clearTimeout clear-time)
        (update-fn (cljs.reader/read-string (.-data msg)))))
    
    (defn send-transit-msg!
      [msg]
      (if @ws-chan
        (.send @ws-chan msg)
        (throw (js/Error. "Websocket is not available!"))))
    
    (set! (.-onbeforeunload js/window) (fn []
                                         (.close (js/WebSocket. webSocketUrl))))
    
    (defn initEventHandle [url chan receive-handler]
      (do
        (set! (.-onmessage chan) (receive-transit-msg! receive-handler))
        (reset! ws-chan chan)
        (js/console.log "Websocket connection established with: " url)
        (set! (.-onopen chan) (fn []
                                (js/console.log "连接成功")
                                (js/clearTimeout clear-time)))
        (set! (.-onclose chan) (fn []
                                 (js/console.log "连接断开>>>")
                                 (reset! lockReconnect true)
                                 (if (true? @lockReconnect)
                                   (do                                 
                                     ;;没连接上会一直重连,设置延迟避免请求过多
                                     (reset! clear-time
                                             (js/setTimeout (fn []
                                                              (initEventHandle webSocketUrl
                                                                               (js/WebSocket. url)
                                                                               receive-handler)
                                                              (reset! lockReconnect false)) 2000))
                                     (reset! lockReconnect false))
                                   (do                                 
                                     (js/clearTimeout clear-time)))))
        (set! (.-onerror chan) (fn []
                                 (js/console.Error "连接错误")
                                 (reset! lockReconnect true)
                                 (if (true? @lockReconnect)
                                   (do                                 
                                     ;;没连接上会一直重连,设置延迟避免请求过多
                                     (reset! clear-time
                                             (js/setTimeout (fn []
                                                              (initEventHandle webSocketUrl
                                                                               (js/WebSocket. url)
                                                                               receive-handler)
                                                              (reset! lockReconnect false)) 2000))
                                     (reset! lockReconnect false))
                                   (do                                 
                                     (js/clearTimeout clear-time)))))))
    
    (defn make-websocket! [url receive-handler]
      (js/console.log "attempting to connect websocket")
      (if-let [chan (js/WebSocket. url)]
        (initEventHandle url chan receive-handler)
        (throw (js/Error. "Websocket connection failed!"))))
    
    

    参考官方demo的实现后,只是成功进行了连接,并不能保证真正的长连,因此上面代码里对当前channel的oncloseonerror事件的处理是我们前端的姑娘参考WebSocket加入心跳包防止自动断开连接js版本用cljs做的实现,给她点个赞👍。

    到这里,clojure和clojurescript里搭建websocket通信就算是可用了。

    client testing

    (require '[reagent.core :as reagent :refer [atom]])
    
    (defonce messages (reagent/atom []))
    
    (defn message-list []
      [:ul
       (for [[i message] (map-indexed vector @messages)]
         ^{:key i}
         [:li message])])
    
    (defn message-input []
      (reagent/with-let [value (reagent/atom nil)]
        [:input
         {:type        :text
          :placeholder "输入&回车发送"
          :value       @value
          :on-change   #(reset! value (-> % .-target .-value))
          :on-key-down #(when (= (.-keyCode %) 13)
                          (send-transit-msg!
                           {:message @value})
                          (reset! value nil))}]))
    
    (defn update-messages! [{:keys [message]}]
      (js/console.info "收到服务端返回的消息")
      (swap! messages #(vec (take 10 (conj % message)))))
    
    
    (make-websocket! webSocketUrl update-messages!)
    (defn home-page []
     [:div.container
      [:div.row
       [:div.col-md-12
        [:h2 "Welcome to chat"]
        [:h4 "服务端将你发送的内容加了个key做了回答"]]]
      [:div.row
       [:div.col-sm-6
        [message-list]]]
      [:div.row
       [:div.col-sm-6
        [message-input]]]])
    
    [home-page]
    

    参考

    immutant websockets sample

    WebSocket proxying

    WebSocket加入心跳包防止自动断开连接

    相关文章

      网友评论

          本文标题:clojure&clojurescript前后端实现websoc

          本文链接:https://www.haomeiwen.com/subject/hztzvctx.html