4. WebSockets

WebFluxopen in new window

此部分参考文档涵盖对 Servlet 堆栈的支持,包括原始 WebSocket 交互的 WebSocket 消息传递,通过 SockJS 进行 WebSocket 仿真以及通过 STOMP 作为 WebSocket 的子协议进行发布、订阅消息传递。

4.1. WebSocket 概论

WebSocket 协议 RFC 6455open in new window 提供了一种标准化方法,可通过单个 TCP 连接在客户端与服务端之间建立全双工双向通信通道。 它是与 HTTP 不同的 TCP 协议,但旨在通过端口 80 与 443 在 HTTP 上工作,并允许重复使用现有的防火墙规则。

WebSocket 交互始于一个 HTTP 请求,该请求使用 HTTP Upgrade 标头进行升级,或者在这种情况下切换到 WebSocket 协议。 以下示例展示了这种交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket 
Connection: Upgrade 
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080


 
 




  1. Upgrade 标头。
  2. 使用 Upgrade 连接。

具有 WebSocket 支持的服务端代替通常的 200 状态代码,返回类似于以下内容的输出:

HTTP/1.1 101 Switching Protocols 
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
 




  1. 协议切换

握手成功后,HTTP 升级请求的基础 TCP 套接字将保持打开状态,客户端与服务端均可继续发送与接收消息。

WebSockets 的工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455,HTML5 的 WebSocket 章节,或 Web 上的许多简介和教程中的任何一个。

请注意,如果 WebSocket 服务端在 Web 服务器(例如 nginx)后面运行,那么可能需要对其进行配置,以将 WebSocket 升级请求传递到 WebSocket 服务端。 同样,如果应用程序在云环境中运行,请检查与 WebSocket 支持相关的云提供商的说明。

4.1.1. HTTP 对比 WebSocket

尽管 WebSocket 被设计为与 HTTP 兼容并以 HTTP 请求开头,但重要的是要了解这两个协议导致了截然不同的体系结构与应用程序编程模型。

在 HTTP 与 REST 中,应用程序被建模为许多 URL。 为了与应用程序交互,客户端访问那些 URL,即请求 - 响应风格。 服务端根据 HTTP URL、方法与标头将请求路由到适当的处理程序。

相比之下,在 WebSockets 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。 这指向了一个完全不同的异步、事件驱动的消息传递体系结构。

WebSocket 也是一种低级传输协议,与 HTTP 不同,它不对消息的内容规定任何语义。 这意味着除非客户端与服务端就消息语义达成一致,否则就无法路由或处理消息。

WebSocket 客户端与服务端可以通过 HTTP 握手请求上的 Sec-WebSocket-Protocol 标头协商使用更高级别的消息协议(例如 STOMP)。 在这种情况下,它们需要提出自己的约定。

4.1.2. 何时使用 WebSockets

WebSockets 可以使网页具有动态性与交互性。 但是,在许多情况下,结合使用 Ajax 与 HTTP 流或长时间轮询可以提供一种简单有效的解决方案。

例如,新闻、邮件与社交订阅源需要动态更新,但是每几分钟进行一次更新可能是完全可以的。 另一方面,协作、游戏与金融应用程序需要更接近实时。

仅延迟并不是决定因素。 如果消息量相对较少(例如,监视网络故障),那么 HTTP 流或轮询可以提供有效的解决方案。 低延迟,高频率与高音量的结合才是使用 WebSocket 的最佳案例。

还请记住,在 Internet 上,不受控制的代理可能会阻止 WebSocket 交互,这可能是因为未将它们配置为传递 Upgrade 标头,或者是因为它们关闭了长期处于空闲状态的连接。 这意味着与面向公众的应用程序相比,将 WebSocket 用于防火墙内部的应用程序是一个更直接的决定。

4.2. WebSocket API

WebFluxopen in new window

Spring 框架提供了一个 WebSocket API,可用于编写处理 WebSocket 消息的客户端与服务端应用程序。

4.2.1. WebSocketHandler

WebFluxopen in new window

创建 WebSocket 服务端就像实现 WebSocketHandler 一样简单,或者更可能地,扩展 TextWebSocketHandlerBinaryWebSocketHandler。 下面的示例使用 TextWebSocketHandler

import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.TextMessage;

public class MyHandler extends TextWebSocketHandler {

    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message) {
        // ...
    }

}

有专用的 WebSocket Java 配置与 XML 命名空间支持,用于将前面的 WebSocket 处理程序映射到特定的 URL,如以下示例所示:

import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

前面的示例用于 Spring MVC 应用程序,应该包含在 DispatcherServletopen in new window 的配置中。 但是,Spring 的 WebSocket 支持不依赖于 Spring MVC。 在 WebSocketHttpRequestHandleropen in new window 的帮助下将 WebSocketHandler 集成到其他 HTTP 服务环境中相对简单。

4.2.2. WebSocket 握手

WebFluxopen in new window

定制初始 HTTP WebSocket 握手请求的最简单方法是通过 HandshakeInterceptor,它公开了“握手之前”与“握手之后”的方法。 可以使用此类拦截器来阻止握手或使任何属性对 WebSocketSession 可用。 以下示例使用内置的拦截器将 HTTP 会话属性传递到 WebSocket 会话:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new MyHandler(), "/myHandler")
            .addInterceptors(new HttpSessionHandshakeInterceptor());
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
        <websocket:handshake-interceptors>
            <bean class="org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor"/>
        </websocket:handshake-interceptors>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

一个更高级的选项是扩展 DefaultHandshakeHandler,它执行 WebSocket 握手的步骤,包括验证客户端来源、协商子协议以及其他详细信息。 如果应用程序需要配置自定义 RequestUpgradeStrategy 以便适应尚不支持的 WebSocket 服务端引擎与版本,那么可能还需要使用此选项 (有关此主题的更多信息,请参阅 部署)。 Java 配置与 XML 命名空间都使配置自定义 HandshakeHandler 成为可能。

WARNING

Spring 提供了一个 WebSocketHandlerDecorator 基类,可用于装饰 WebSocketHandler 并具有其他行为。 使用 WebSocket Java 配置或 XML 命名空间时,默认情况下会提供并添加日志记录与异常处理实现。 ExceptionWebSocketHandlerDecorator 捕获由任何 WebSocketHandler 方法引起的所有未捕获的异常,并关闭状态为 1011(指示服务端错误)的 WebSocket 会话。

4.2.3. 部署

Spring WebSocket API 易于集成到 Spring MVC 应用程序中,在该应用程序中,DispatcherServlet 同时提供 HTTP WebSocket 握手与其他 HTTP 请求。 通过调用 WebSocketHttpRequestHandler,也很容易集成到其他 HTTP 处理方案中。 这是方便且易于理解的。 但是,对于 JSR-356 运行时,需要特别注意。

Java WebSocket API(JSR-356)提供了两种部署机制。 首先涉及启动时的 Servlet 容器类路径扫描(Servlet 3 特性)。 另一个是在 Servlet 容器初始化时使用的注册 API。 这两种机制均无法使用单个“前端控制器”进行所有 HTTP 处理(包括 WebSocket 握手与所有其他 HTTP 请求,例如:Spring MVC 的 DispatcherServlet)。

这是 JSR-356 的一个重大限制,即使在 JSR-356 运行时中运行,Spring 的 WebSocket 支持也可以通过服务端特定的 RequestUpgradeStrategy 实现解决。 Tomcat、Jetty、GlassFish、WebLogic、WebSphere 与 Undertow(与 WildFly)目前存在此类策略。

TIP

已经创建了克服 Java WebSocket API 中的上述限制的请求,可以在 eclipse-ee4j/websocket-api#211open in new window 中进行跟踪。 Tomcat、Undertow 与 WebSphere 提供了自己的 API 替代方案,使之可以做到这一点,而 Jetty 也可以实现。 我们希望更多的服务端可以做到这一点。

第二个考虑因素是,希望支持 JSR-356 的 Servlet 容器执行 ServletContainerInitializer(SCI)扫描,这可能会减慢应用程序的启动速度,在某些情况下会大大降低速度。 如果在升级到具有 JSR-356 支持的 Servlet 容器版本后观察到重大影响,那么应该可以通过使用 web.xml 中的 <absolute-ordering /> 元素有选择地启用或禁用 Web 片段(和 SCI 扫描),如以下示例所示:

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://java.sun.com/xml/ns/javaee
        https://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">

    <absolute-ordering/>

</web-app>

然后,可以按名称选择性地启用 Web 片段,例如 Spring 自己的 SpringServletContainerInitializer,它提供对 Servlet 3 Java 初始化 API 的支持。 以下示例展示了如何执行此操作:

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://java.sun.com/xml/ns/javaee
        https://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">

    <absolute-ordering>
        <name>spring_web</name>
    </absolute-ordering>

</web-app>

4.2.4. 服务端配置

WebFluxopen in new window

每个基础的 WebSocket 引擎都公开配置属性,这些属性控制运行时特征,例如消息缓冲区大小的大小、空闲超时等。

对于 Tomcat、WildFly 与 GlassFish,可以将 ServletServerContainerFactoryBean 添加到 WebSocket Java 配置中,如以下示例所示:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <bean class="org.springframework...ServletServerContainerFactoryBean">
        <property name="maxTextMessageBufferSize" value="8192"/>
        <property name="maxBinaryMessageBufferSize" value="8192"/>
    </bean>

</beans>

TIP

对于客户端 WebSocket 配置,应使用 WebSocketContainerFactoryBean(XML)或 ContainerProvider.getWebSocketContainer()(Java 配置)。

对于 Jetty,需要提供一个预先配置的 Jetty WebSocketServerFactory,然后通过 WebSocket Java 配置将其注入 Spring 的 DefaultHandshakeHandler 中。 以下示例展示了如何执行此操作:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(echoWebSocketHandler(),
            "/echo").setHandshakeHandler(handshakeHandler());
    }

    @Bean
    public DefaultHandshakeHandler handshakeHandler() {

        WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
        policy.setInputBufferSize(8192);
        policy.setIdleTimeout(600000);

        return new DefaultHandshakeHandler(
                new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/echo" handler="echoHandler"/>
        <websocket:handshake-handler ref="handshakeHandler"/>
    </websocket:handlers>

    <bean id="handshakeHandler" class="org.springframework...DefaultHandshakeHandler">
        <constructor-arg ref="upgradeStrategy"/>
    </bean>

    <bean id="upgradeStrategy" class="org.springframework...JettyRequestUpgradeStrategy">
        <constructor-arg ref="serverFactory"/>
    </bean>

    <bean id="serverFactory" class="org.eclipse.jetty...WebSocketServerFactory">
        <constructor-arg>
            <bean class="org.eclipse.jetty...WebSocketPolicy">
                <constructor-arg value="SERVER"/>
                <property name="inputBufferSize" value="8092"/>
                <property name="idleTimeout" value="600000"/>
            </bean>
        </constructor-arg>
    </bean>

</beans>

4.2.5. 允许的来源

WebFluxopen in new window

从 Spring Framework 4.1.5 开始,WebSocket 与 SockJS 的默认行为是仅接受同源请求。 也可以允许所有或指定的来源列表。 此检查主要用于浏览器客户端。 没有任何措施可以阻止其他类型的客户端修改 Origin 标头值(有关更多详细信息,请参阅 RFC 6454: Web 源概念open in new window)。

三种可能的行为是:

  • 仅允许相同来源的请求(默认): 在此模式下,启用 SockJS 后,Iframe HTTP 响应标头 X-Frame-Options 设置为 SAMEORIGIN,并且 JSONP 传输被禁用,因为它不允许检查请求的来源。 因此,启用此模式时,不支持 IE6 与 IE7。

  • 允许指定来源列表: 每个允许的来源必须以 http://https:// 开头。 在此模式下,启用 SockJS 后,将禁用 IFrame 传输。 因此,启用此模式时,不支持 IE6 到 IE9。

  • 允许所有来源: 要启用此模式,应提供 * 作为允许的来源值。 在这种模式下,所有传输都可用。

可以配置 WebSocket 与 SockJS 允许的来源,如以下示例所示:

import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("https://mydomain.com");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers allowed-origins="https://mydomain.com">
        <websocket:mapping path="/myHandler" handler="myHandler" />
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

4.3. SockJS 回退

在公共互联网上,控件外部的限制性代理可能会阻止 WebSocket 交互,这可能是因为未将它们配置为传递 Upgrade 标头,或者是因为它们关闭了长期处于空闲状态的连接。

解决此问题的方法是 WebSocket 模拟,即先尝试使用 WebSocket,然后再尝试使用基于 HTTP 的技术来模拟 WebSocket 交互并公开相同的应用程序级 API。

在 Servlet 技术栈上,Spring 框架为 SockJS 协议提供服务端与客户端支持。

4.3.1. 概览

SockJS 的目标是让应用程序使用 WebSocket API,但在运行时的必要时刻回退到非 WebSocket 替代方案,而无需更改应用程序代码。

SockJS 包含:

SockJS 设计用于浏览器。 它使用多种技术来支持各种浏览器版本。 有关 SockJS 传输类型与浏览器的完整列表,请参见 SockJS 客户端open in new window页面。 传输分为三大类:WebSocket、HTTP 流与 HTTP 长轮询。 有关这些类别的概述,请参阅此博客文章open in new window

SockJS 客户端首先发送 GET/info 以从服务端获取基本信息。 在那之后,它必须决定使用哪种传输方式。 如果可能,将使用 WebSocket。 如果没有,在大多数浏览器中,至少有一个 HTTP 流选项。 如果不是,则使用 HTTP(长)轮询。

所有传输请求都具有以下 URL 结构:

https://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}

此处:

  • {server-id} 用于在集群中路由请求,但在其他情况下不使用。

  • {session-id} 用于关联属于 SockJS 会话的 HTTP 请求。

  • {transport} 用于指示传输类型(例如:websocket、xhr-streaming、等)。

WebSocket 传输仅需要单个 HTTP 请求即可进行 WebSocket 握手。 此后所有消息在该套接字上交换。

HTTP 传输需要更多请求。 例如,Ajax/XHR 流依赖于对服务端到客户端消息的一个长时间运行的请求,以及对客户端到服务端消息的其他 HTTP POST 请求。 长轮询与此类似,不同之处在于长轮询在每次服务端到客户端发送后结束当前请求。

SockJS 增加了最小消息帧。 例如,服务端最初发送字母 o(“开启”帧),消息以 a["message1","message2"](JSON 编码数组)发送,如果 25 秒内(默认)没有消息,那么发送字母 h(“心跳”帧),最后使用字母 c(“关闭”帧)关闭会话。

要了解更多信息,请在浏览器中运行示例并查看 HTTP 请求。 SockJS 客户端允许修复传输列表,因此可以一次查看每个传输。 SockJS 客户端还提供了调试标志,该标志可在浏览器控制台中支持有用的消息。 在服务端,可以为 org.springframework.web.socket 启用 TRACE 日志记录。 有关更多详细信息,请参见 SockJS 协议叙述测试open in new window

4.3.2. 使能 SockJS

可以通过 Java 配置启用 SockJS,如以下示例所示:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler").withSockJS();
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
        <websocket:sockjs/>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

前面的示例用于 Spring MVC 应用程序,应该包含在 DispatcherServletopen in new window 的配置中。 但是,Spring 的 WebSocket 支持不依赖于 Spring MVC。 在 WebSocketHttpRequestHandleropen in new window 的帮助下将 WebSocketHandler 集成到其他 HTTP 服务环境中相对简单。

在浏览器端,应用程序可以使用 sockjs-clientopen in new window(版本 1.0.x)。 它模拟 W3C WebSocket API,并与服务端通信以选择最佳的传输选项,具体取决于运行它的浏览器。 请参阅 sockjs-clientopen in new window 页面与浏览器支持的传输类型列表。 客户端还提供了几个配置选项,例如用于指定要包含的传输。

4.3.3. IE 8 与 9

Internet Explorer 8 与 9 仍在使用。 它们是拥有 SockJS 的关键原因。 本节涵盖有关在这些浏览器中运行的重要注意事项。

SockJS 客户端通过使用 Microsoft 的 XDomainRequestopen in new window 在 IE 8 与 9 中支持 Ajax/XHR 流。 跨域有效,但不支持发送 Cookie。 Cookies 对于 Java 应用程序通常是必不可少的。 但是,由于 SockJS 客户端可用于多种服务端类型(而不仅仅是 Java 类型),因此它需要知道 cookie 是否重要。 如果是这样,那么 SockJS 客户端更倾向 Ajax/XHR 进行流传输。 否则,它依赖于基于 iframe 的技术。

SockJS 客户端的第一个 /info 请求是对详细信息的请求,这些信息可以影响客户端对传输方式的选择。 这些详细信息之一是服务端应用程序是否依赖 Cookie(例如,出于身份验证目的或使用粘性会话进行群集)。 Spring 对 SockJS 的支持包括一个名为 sessionCookieNeeded 的属性。 由于大多数 Java 应用程序都依赖 JSESSIONID cookie,因此默认情况下启用该功能。 如果应用程序不需要它,那么可以关闭此选项,然后 SockJS 客户端应在 IE 8 与 9 中选择 xdr-streaming

如果确实使用基于 iframe 的传输,请记住,可以通过将 HTTP 响应标头 X-Frame-Options 设置为 DENYSAMEORIGINALLOW-FROM <origin> 来指示浏览器阻止在给定页面上使用 iframe。 这用于防止点击劫持open in new window

TIP

Spring Security 3.2+ 提供了对每个响应设置 X-Frame-Options 的支持。 默认情况下,Spring Security Java 配置将其设置为 DENY。 在 3.2 中,Spring Security XML 命名空间默认情况下不设置该标头,但可以配置为这样做。 将来,它可能会成为默认设置。

有关如何配置 X-Frame-Options 标头设置的详细信息,请参见 Spring Security 文档的默认安全标头open in new window。 也可以查看 SEC-2501open in new window 以获取更多背景信息。

如果应用程序添加了 X-Frame-Options 响应标头(应该如此!)并依赖于基于 iframe 的传输,那么需要将标头值设置为 SAMEORIGINALLOW-FROM <origin>。 Spring SockJS 支持还需要知道 SockJS 客户端的位置,因为它是从 iframe 加载的。 默认情况下,iframe 设置为从 CDN 位置下载 SockJS 客户端。 最好将此选项配置为使用与应用程序源相同的 URL。

以下示例展示了如何在 Java 配置中执行此操作:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS()
                .setClientLibraryUrl("http://localhost:8080/myapp/js/sockjs-client.js");
    }

    // ...

}

XML 命名空间通过 <websocket:sockjs> 元素提供了类似的选项。

TIP

在初始开发过程中,请启用 SockJS 客户端开发模式,以防止浏览器缓存本应缓存的 SockJS 请求(如 iframe)。 有关如何启用它的详细信息,请参见 SockJS 客户端open in new window 页面。

4.3.4. 心跳

SockJS 协议要求服务端发送心跳消息,以防止代理断定连接挂起。 Spring SockJS 配置具有一个名为 heartbeatTime 的属性,可用于自定义频率。 默认情况下,假设没有其他消息在该连接上发送,那么心跳将在 25 秒后发送。 这个 25 秒的值符合以下对公共互联网应用程序的 IETF 建议open in new window

TIP

在 WebSocket 与 SockJS 上使用 STOMP 时,如果 STOMP 客户端与服务端协商要交换的心跳,将会禁用 SockJS 心跳。

Spring SockJS 支持还允许配置 TaskScheduler 来计划心跳任务。 任务调度程序由线程池支持,其默认设置基于可用处理器的数量。 应该考虑根据特定需求自定义设置。

4.3.5. 客户端挂断

HTTP 流与 HTTP 长轮询 SockJS 传输要求连接保持打开的时间比平常更长。 有关这些技术的概述,请参见此博客文章open in new window

在 Servlet 容器中,这是通过 Servlet 3 异步支持完成的,该支持允许退出 Servlet 容器线程,处理请求并继续写入另一个线程的响应。

一个特定的问题是,Servlet API 不会为已离开的客户端提供通知。 请参阅 eclipse-ee4j/servlet-api#44open in new window。 但是,Servlet 容器在随后尝试写入响应时会引发异常。 由于 Spring 的 SockJS 服务支持服务端发送的心跳信号(默认情况下,每 25 秒发送一次),这意味着通常会在该时间段内(如果消息发送频率更高将更早)检测到客户端断开连接。

TIP

结果,由于客户端已断开连接,可能会发生网络 I/O 故障,这可能会在日志中填充不必要的堆栈跟踪。 Spring 会尽最大努力找出代表客户端断开连接(特定于每个服务器)的网络故障,并通过使用专用日志类别 DISCONNECTED_CLIENT_LOG_CATEGORY(在 AbstractSockJsSession 中定义)来记录一条最小的消息。 如果需要查看堆栈跟踪,可以将该日志类别设置为 TRACE

4.3.6. SockJS 与 CORS

如果允许跨域请求(请参阅“允许的来源”),那么 SockJS 协议使用 CORS 在 XHR 流与轮询传输中提供跨域支持。 因此,除非在响应中检测到 CORS 标头的存在,否则将自动添加 CORS 标头。 因此,如果已经将应用程序配置为提供 CORS 支持(例如,通过 Servlet 过滤器),那么 Spring 的 SockJsService 会跳过这一部分。

也可以通过在 Spring 的 SockJsService 中设置 suppressCors 属性来禁止添加这些 CORS 标头。

SockJS 需要以下标头与值:

  • Access-Control-Allow-Origin:从 Origin 请求标头的值初始化。

  • Access-Control-Allow-Credentials:始终设置为 true

  • Access-Control-Request-Headers:从等效请求标头中的值初始化。

  • Access-Control-Allow-Methods:传输支持的 HTTP 方法(请参见 TransportType 枚举)。

  • Access-Control-Max-Age:设置为 31536000(1 年)。

有关确切的实现,请参见 AbstractSockJsService 中的 addCorsHeaders 与源代码中的 TransportType 枚举。

另外,如果 CORS 配置允许,请考虑排除带有 SockJS 端点前缀的 URL,从而让 Spring 的 SockJsService 处理它。

4.3.7. SockJsClient

Spring 提供了一个 SockJS Java 客户端,无需使用浏览器即可连接到远程 SockJS 端点。 当需要通过公共网络在两个服务器之间进行双向通信时(这是网络代理可以阻止使用 WebSocket 协议的地方),这特别有用。 SockJS Java 客户端对于测试目的(例如,模拟大量并发用户)也非常有用。

SockJS Java 客户端支持 websocketxhr-streamingxhr-polling 传输。 其余的仅在浏览器中有意义。

可以使用以下命令配置 WebSocketTransport

  • JSR-356 运行时的 StandardWebSocketClient

  • 通过使用 Jetty 9+ 本机 WebSocket API 来创建 JettyWebSocketClient

  • Spring 的 WebSocketClient 的任何实现。

根据定义,XhrTransport 支持 xhr-streamingxhr-polling,因为从客户端的角度来看,除了用于连接服务器的 URL 之外没有其他区别。 当前有两种实现:

  • RestTemplateXhrTransport 使用 Spring 的 RestTemplate 进行 HTTP 请求。

  • JettyXhrTransport 使用 Jetty 的 HttpClient 进行 HTTP 请求。

以下示例展示了如何创建 SockJS 客户端并连接到 SockJS 端点:

List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());

SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.doHandshake(new MyWebSocketHandler(), "ws://example.com:8080/sockjs");

TIP

SockJS 对消息使用 JSON 格式的数组。 默认情况下,使用 Jackson 2,并且需要在类路径上。 或者,可以配置 SockJsMessageCodec 的自定义实现,并在 SockJsClient 上对其进行配置。

要使用 SockJsClient 模拟大量并发用户,需要配置基础 HTTP 客户端(用于 XHR 传输)以允许足够数量的连接与线程。 以下示例展示了如何使用 Jetty 进行操作:

HttpClient jettyHttpClient = new HttpClient();
jettyHttpClient.setMaxConnectionsPerDestination(1000);
jettyHttpClient.setExecutor(new QueuedThreadPool(1000));

下面的示例展示了与服务端 SockJS 相关的属性(有关详细信息,请参见 javadoc),还应该考虑自定义:

@Configuration
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/sockjs").withSockJS()
            .setStreamBytesLimit(512 * 1024) 
            .setHttpMessageCacheSize(1000) 
            .setDisconnectDelay(30 * 1000); 
    }

    // ...
}






 
 
 




  1. streamBytesLimit 属性设置为 512KB(默认值为 128KB——128 * 1024)。

  2. httpMessageCacheSize 属性设置为 1,000(默认值为 100)。

  3. 将断开连接延迟属性设置为 30 个属性秒(默认值为 5 秒——5 * 1000)。

4.4. STOMP

WebSocket 协议定义了两种消息类型(文本消息与二进制消息),但是其内容未定义。 该协议定义了一种机制,供客户端与服务端协商用于在 WebSocket 之上使用的子协议(即高级消息协议),以定义每条消息可以发送的类型、格式是什么、每条消息的内容,等等。 子协议的使用是可选的,但是无论哪种方式,客户端和服务端都需要就定义消息内容的某种协议达成共识。

4.4.1. 概览

STOMPopen in new window(面向简单文本的消息传递协议)最初是为脚本语言(例如:Ruby、Python 与 Perl)创建的,以连接到企业消息代理。 它旨在解决常用消息传递模式的最小子集。 STOMP 可以在任何可靠的双向流网络协议上使用,例如 TCP 与 WebSocket。 尽管 STOMP 是面向文本的协议,但是消息有效负载可以是文本或二进制。

STOMP 是基于帧的协议,其帧是基于 HTTP 的。 以下清单显示了 STOMP 帧的结构:

COMMAND
header1:value1
header2:value2

Body^@

客户端可以使用 SENDSUBSCRIBE 命令发送或订阅消息,以及描述消息的内容与接收者的 destination 标头。 这启用了一种简单的发布 - 订阅机制,可以使用该机制通过代理将消息发送到其他连接的客户端,或者将消息发送到服务端以请求执行某些工作。

使用 Spring 的 STOMP 支持时,Spring WebSocket 应用程序将充当客户端的 STOMP 代理。 消息被路由到 @Controller 消息处理方法或简单的内存中代理,该代理跟踪订阅并向订阅的用户广播消息。 还可以将 Spring 配置为与专用的 STOMP 代理(例如:RabbitMQ、ActiveMQ 与其他代理)一起使用,以实际广播消息。 在那种情况下,Spring 维护与代理的 TCP 连接,将消息中继给它,并将消息从它传递到连接的 WebSocket 客户端。 因此,Spring Web 应用程序可以依靠基于 HTTP 的统一安全性,通用验证以及用于消息处理的熟悉的编程模型。

以下示例展示了一个客户订阅的股票报价,服务端可能会定期发出该股票报价(例如:通过计划的任务,该任务通过 SimpMessagingTemplate 将消息发送给代理):

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@

以下示例展示了一个发送交易请求的客户端,服务端可以通过 @MessageMapping 方法处理该请求:

SEND
destination:/queue/trade
content-type:application/json
content-length:44

{"action":"BUY","ticker":"MMM","shares",44}^@

执行后,服务端可以向客户广播交易确认消息与详细信息。

在 STOMP 规范中,目的地的含义是故意不透明的。 它可以是任何字符串,并且完全由 STOMP 服务端定义它们支持的目的地的语义和语法。 但是,目的地通常是类似路径的字符串,其中 /topic/.. 表示发布 - 订阅(一对多),而 /queue/ 表示点对点(一对一)消息交流。

STOMP 服务端可以使用 MESSAGE 命令向所有订阅者广播消息。 以下示例展示了服务端向订阅的客户端发送股票报价:

MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

服务端无法发送未经请求的消息。 来自服务端的所有消息都必须响应特定的客户端订阅,并且服务端消息的 subscription-id 标头必须与客户端订阅的 id 标头匹配。

前面的概述旨在提供对 STOMP 协议的最基本的了解。 建议全面阅读协议 规范open in new window

4.4.2. 优点

与使用原始 WebSocket 相比,使用 STOMP 作为子协议使 Spring Framework 与 Spring Security 提供了更丰富的编程模型。 关于 HTTP 与原始 TCP 以及它如何让 Spring MVC 与其他 Web 框架提供丰富的功能,可以得出相同的观点。 以下是优点列表:

  • 无需发明自定义消息协议与消息格式。

  • 可以使用 STOMP 客户端,包括 Spring Framework 中的 Java 客户端open in new window

  • 可以(可选)使用消息代理(例如:RabbitMQ、ActiveMQ 与其他代理)来管理订阅与广播消息。

  • 可以在任意数量的 @Controller 实例中组织应用程序逻辑,并且可以基于 STOMP 目的地标头将消息路由到它们,而不是通过给定连接使用单个 WebSocketHandler 处理原始 WebSocket 消息。

  • 可以使用 Spring Security 基于 STOMP 目的地与消息类型来保护消息。

4.4.3. 启用 STOMP

spring-messagingspring-websocket 模块中提供了 STOMP 通过 WebSocket 支持。 一旦有了这些依赖关系,就可以使用 SockJS 回退 通过 WebSocket 公开 STOMP 端点,如以下示例所示:

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();  
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app"); 
        config.enableSimpleBroker("/topic", "/queue"); 
    }
}









 




 
 


  1. /portfolio 是 WebSocket(或 SockJS)客户端为了 WebSocket 握手需要连接到的端点的 HTTP URL。

  2. 其目的地标头以 /app 开头的 STOMP 消息被路由到 @Controller 类中的 @MessageMapping 方法。

  3. 使用内置的消息代理进行订阅与广播,以及将目的地标头以 /topic/queue 开头的消息路由到代理。

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>

</beans>

TIP

对于内置的简单代理,/topic/queue 前缀没有任何特殊含义。 它们仅是区分发布订阅消息传递与点对点消息传递的约定(即,许多订阅者与一个消费者)。 使用外部代理时,请检查代理的 STOMP 页面以了解其支持哪种 STOMP 目标与前缀。

要从浏览器连接 SockJS,可以使用 sockjs-clientopen in new window。 对于 STOMP,许多应用程序都使用了 jmesnil/stomp-websocketopen in new window 库(也称为 stomp.js),该库功能齐全,已在生产中使用多年,但不再维护。 目前,JSteunou/webstomp-clientopen in new window 是该库中最活跃且发展最快的后继程序。 以下示例代码基于此:

var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);

stompClient.connect({}, function(frame) {
}

请注意,前面示例中的 stompClient 不需要指定 loginpassword 标头。 即使这样做,它们也会在服务器端被忽略(或更确切地说,被覆盖)。 有关身份验证的更多信息,请参见连接到代理身份验证

有关更多示例代码,请参见:

4.4.4. WebSocket 服务端

要配置基础 WebSocket 服务器,请应用“服务端配置”中的信息。 但是对于Jetty,需要通过 StompEndpointRegistry 设置 HandshakeHandlerWebSocketPolicy

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
    }

    @Bean
    public DefaultHandshakeHandler handshakeHandler() {

        WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
        policy.setInputBufferSize(8192);
        policy.setIdleTimeout(600000);

        return new DefaultHandshakeHandler(
                new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    }
}

4.4.5. 消息流

公开 STOMP 端点后,Spring 应用程序将成为已连接客户端的 STOMP 代理。 本节描述服务端的消息流。

spring-messaging 模块包含对起源于 Spring Integrationopen in new window 的消息传递应用程序的基础支持,后来被提取并合并到 Spring Framework 中,以在许多 Spring 项目与应用程序场景中更广泛地使用。 下面的列表简要描述了一些可用的消息传递抽象:

Java 配置(即 @EnableWebSocketMessageBroker)与 XML 命名空间配置(即 <websocket:message-broker>)都使用前面的组件来组装消息工作流。 下图展示了启用简单内置消息代理时使用的组件:

消息流简单代理

上图展示了三个消息通道:

  • clientInboundChannel:用于传递从 WebSocket 客户端收到的消息。

  • clientOutboundChannel:用于向 WebSocket 客户端发送服务端消息。

  • brokerChannel:用于从服务端应用程序代码内将消息发送到消息代理。

下图展示了将外部代理(例如 RabbitMQ)配置为用于管理订阅与广播消息时使用的组件:

消息流代理转发

前面两个图之间的主要区别是使用“代理中继”将消息通过 TCP 传递到外部 STOMP 代理,以及将消息从代理传递到订阅的客户端。

从 WebSocket 连接接收到消息后,它们将被解码为 STOMP 帧,转换为 Spring 消息表示形式,然后发送到 clientInboundChannel 进行进一步处理。 例如:目的地标头以 /app 开头的 STOMP 消息可以路由到控制器中带 @MessageMapping 注解的方法,而 /topic/queue 消息可以直接路由到消息代理。

处理来自客户端的 STOMP 消息的带注解的 @Controller 可以通过 brokerChannel 将消息发送到消息代理,并且代理通过 clientOutboundChannel 将消息广播给匹配的订阅者。 相同的控制器还可以响应 HTTP 请求执行相同的操作,因此客户端可以执行 HTTP POST,然后 @PostMapping 方法可以将消息发送到消息代理,以广播到订阅的客户端。

可以通过一个简单的示例跟踪流程。 考虑以下示例,该示例设置了服务器:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }
}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }
}

前面的示例支持以下流程:

  1. 客户端连接到 http://localhost:8080/portfolioopen in new window,一旦建立了 WebSocket 连接,STOMP 帧就开始在其上流动。

  2. 客户端发送 SUBSCRIBE 帧,其目的地标头为 /topic/greeting。 接收并解码后,该消息将发送到 clientInboundChannel,然后路由到消息代理,该代理存储客户端订阅。

  3. 客户端向 /app/greeting 发送一个 aSEND 帧。 /app 前缀有助于将其路由到带注解的控制器。 删除 /app 前缀后,目标的其余 /greeting 部分将映射到 GreetingController 中的 @MessageMapping 方法。

  4. GreetingController 返回的值被转换为带有有效负载的 Spring 消息,该有效负载基于返回值和 /topic/greeting 的默认目的地标头(从输入目的地派生,其中 /app 替换为 /topic)。 结果消息将发送到 brokerChannel 并由消息代理处理。

  5. 消息代理查找所有匹配的订阅者,并通过 clientOutboundChannel 向每个消息订阅者发送一个 MESSAGE 帧,消息在该消息中被编码为 STOMP 帧并通过 WebSocket 连接发送。

下一节将提供有关带注解方法的更多详细信息,包括支持的参数类型与返回值。

4.4.6. 带注解的控制器

应用程序可以使用带注解的 @Controller 类来处理来自客户端的消息。 这样的类可以声明 @MessageMapping@SubscribeMapping@ExceptionHandler 方法,如以下主题所述:

@MessageMapping

可以使用 @MessageMapping 来注解基于目的地路由消息的方法。 在方法级与类型级都支持它。 在类级,@MessageMapping 用于表示控制器中所有方法之间的共享映射。

默认情况下,映射值是 Ant 风格的路径模式(例如:/thing*/thing/**),包括对模板变量(例如:/thing/{id})的支持。 可以通过 @DestinationVariable 方法参数引用这些值。 应用程序还可以切换到以点分隔的映射的目标约定,如“将点作为分隔符”中所述。

支持的方法参数

下表描述了方法参数:

方法参数描述
Message用于访问完整的消息。
MessageHeaders用于访问 Message 中的标头。
MessageHeaderAccessorSimpMessageHeaderAccessorStompHeaderAccessor用于通过类型化访问器方法访问标头。
@Payload为了访问消息的有效负载,由配置的 MessageConverter 转换(例如,从 JSON)。
不需要此注解,因为默认情况下会假定没有其它自变量匹配。
可以使用 @javax.validation.Valid 或 Spring 的 @Validated 注解有效负载参数,以自动验证有效负载参数。
@Header为了访问特定的标头值——以及必要时使用 org.springframework.core.convert.converter.Converter 进行类型转换。
@Headers用于访问消息中的所有标头。此参数必须可分配给 java.util.Map
@DestinationVariable用于访问从消息目的地中提取的模板变量。根据需要将值转换为声明的方法参数类型。
java.security.Principal反映在 WebSocket HTTP 握手时登录的用户。
返回值

默认情况下,@MessageMapping 方法的返回值通过匹配的 MessageConverter 序列化为有效负载,并作为消息发送到 brokerChannel,并从该消息广播到订阅者。 出站邮件的目的地与入站邮件的目的地相同,但以 /topic 为前缀。

可以使用 @SendTo@SendToUser 注解来自定义输出消息的目的地。 @SendTo 用于自定义目标位置或指定多个目标。 @SendToUser 用于将输出消息定向到仅与输入消息关联的用户。 请参阅用户目的地

可以在同一方法上同时使用 @SendTo@SendToUser,并且两者在类级都受支持,在这种情况下,它们充当类中方法的默认值。 但是,请记住,任何方法级的 @SendTo@SendToUser 注解都会在类级别覆盖所有此类批注。

消息可以异步处理,@MessageMapping 方法可以返回 ListenableFutureCompletableFutureCompletionStage

请注意,@SendTo@SendToUser 只是一种便利,等同于使用 SimpMessagingTemplate 发送消息。 如有必要,对于更高级的方案,可以直接使用 SimpMessagingTemplate 来使用 @MessageMapping 方法。 这可以代替返回值,也可以附加于返回值。 请参阅发送消息

@SubscribeMapping

@SubscribeMapping@MessageMapping 相似,但是将映射范围缩小到仅订阅消息。 它支持与 @MessageMapping 相同的方法参数。 但是,在默认情况下对于返回值,消息直接发送到客户端(通过 clientOutboundChannel,作为对订阅的响应),而不发送到代理(通过 brokerChannel,作为对匹配订阅的广播)。 添加 @SendTo@SendToUser 会覆盖此行为,而是发送给代理。

什么时候有用? 假定代理映射到 /topic/queue,而应用程序控制器映射到 /app。 在此设置中,代理将存储 /topic/queue 的所有订阅,这些订阅旨在进行重复广播,并且不需要应用程序参与。 客户端还可以订阅某个 /app 目的地,并且控制器可以响应该订阅而返回一个值,而无需代理,而无需再次存储或使用该订阅(实际上是一次请求 - 答复交换)。 一个用例是在启动时用初始数据填充 UI。

什么时候没用? 不要尝试将代理与控制器映射到相同的目标前缀,除非出于某种原因希望两者都独立处理消息(包括订阅)。 入站消息是并行处理的。 不能保证代理或控制器首先处理给定的消息。 如果要在存储订阅并准备广播时通知目标,那么客户端应询问服务端是否支持收据(简单代理不支持)。 例如,对于 Java STOMP 客户端,可以执行以下操作添加收据:

@Autowired
private TaskScheduler messageBrokerTaskScheduler;

// During initialization..
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);

// When subscribing..
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(() -> {
    // Subscription ready...
});

服务端的一种选择是在 brokerChannel注册一个 ExecutorChannelInterceptor 并实现 afterMessageHandled 方法,该方法在处理消息(包括订阅)之后被调用。

@ExceptionHandler

应用程序可以使用 @MessageExceptionHandler 方法来处理 @MessageMapping 方法中的异常。 如果要访问异常实例,那么可以在注解本身中声明异常,也可以通过方法参数声明异常。 以下示例通过方法参数声明异常:

@Controller
public class MyController {

    // ...

    @MessageExceptionHandler
    public ApplicationError handleException(MyException exception) {
        // ...
        return appError;
    }
}

@MessageExceptionHandler 方法支持灵活的方法签名,并支持与 @MessageMapping 方法相同的方法参数类型与返回值。

通常,@MessageExceptionHandler 方法适用于声明它们的 @Controller 类(或类层次结构)。 如果希望此类方法在全局范围内(跨控制器)应用,那么可以在标有 @ControllerAdvice 的类中声明它们。 这与 Spring MVC 中可用的类似支持open in new window相当。

4.4.7. 发送消息

如果要从应用程序的任何部分向连接的客户端发送消息怎么办? 任何应用程序组件都可以将消息发送到 brokerChannel。 最简单的方法是注入 SimpMessagingTemplate 并使用它发送消息。 通常,将按类型注入它,如以下示例所示:

@Controller
public class GreetingController {

    private SimpMessagingTemplate template;

    @Autowired
    public GreetingController(SimpMessagingTemplate template) {
        this.template = template;
    }

    @RequestMapping(path="/greetings", method=POST)
    public void greet(String greeting) {
        String text = "[" + getTimestamp() + "]:" + greeting;
        this.template.convertAndSend("/topic/greetings", text);
    }

}

但是,如果存在另一个相同类型的 bean,也可以通过其名称(brokerMessagingTemplate)对其进行限定。

4.4.8. 简单代理

内置的简单消息代理处理来自客户端的订阅请求,将其存储在内存中,并将消息广播到具有匹配目的地的已连接客户端。 该代理支持类似路径的目的地,包括订阅 Ant 风格的目的地模式。

TIP

应用程序还可以使用点分隔(而不是斜杠分隔)目的地。 请参阅将点作为分隔符

如果配置了任务调度程序,那么简单代理支持 STOMP 心跳。 为此,可以声明自己的调度程序,也可以使用内部自动声明与使用的调度程序。 以下示例展示如何声明自己的调度程序:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private TaskScheduler messageBrokerTaskScheduler;

    @Autowired
    public void setMessageBrokerTaskScheduler(TaskScheduler taskScheduler) {
        this.messageBrokerTaskScheduler = taskScheduler;
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {

        registry.enableSimpleBroker("/queue/", "/topic/")
                .setHeartbeatValue(new long[] {10000, 20000})
                .setTaskScheduler(this.messageBrokerTaskScheduler);

        // ...
    }
}

4.4.9. 外部代理

简单代理非常适合入门,但仅支持 STOMP 命令的子集(它不支持 ack、收据与其他一些特性),依赖于简单的消息发送循环,并且不适合于集群。 或者,可以升级应用程序以使用功能齐全的消息代理。

请参阅 STOMP 文档以了解选择的消息代理(例如 RabbitMQopen in new windowActiveMQopen in new window 与其它),安装代理,并在启用 STOMP 支持的情况下运行它。 然后,可以在 Spring 配置中启用 STOMP 代理中继(而不是简单代理)。

以下示例配置启用了功能齐全的代理:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

</beans>

先前配置中的 STOMP 代理中继是一个 Spring MessageHandleropen in new window,它通过将消息转发到外部消息代理来处理消息。 为此,它建立到代理的 TCP 连接,将所有消息转发给代理,然后通过客户端的 WebSocket 会话将从代理收到的所有消息转发给客户端。 本质上,它充当双向转发消息的“中继”。

TIP

io.projectreactor.netty:reactor-nettyio.netty:netty-all 所有依赖项添加到项目中以进行 TCP 连接管理。

此外,应用程序组件(例如 HTTP 请求处理方法、业务服务等)还可以将消息发送到代理中继,如“发送消息”中所述,将消息广播到订阅的 WebSocket 客户端。

实际上,代理中继可实现健壮且可伸缩的消息广播。

4.4.10. 连接到代理

STOMP 代理中继器维护与代理的单个“系统”TCP 连接。 此连接仅用于源自服务端应用程序的消息,而不用于接收消息。 可以为此连接配置 STOMP 凭据(即 STOMP 帧 loginpasscode 标头)。 这在 XML 命名空间与 Java 配置中都以 systemLoginsystemPasscode 属性(默认值为 guestguest)公开。

TIP

STOMP 代理中继始终在代表客户端转发给代理的每个 CONNECT 帧上设置 loginpasscode 标头。 因此,WebSocket 客户端无需设置这些标头。 它们将被忽略。 如“身份验证”部分所述,WebSocket 客户端应改为依靠 HTTP 身份验证来保护 WebSocket 端点并建立客户端身份。

STOMP 代理中继还通过“系统” TCP 连接向消息代理发送与从消息代理接收心跳。 可以配置发送与接收心跳的间隔(默认间隔为 10 秒)。 如果与代理的连接断开,那么代理中继每 5 秒继续尝试重新连接,直到成功。

当与代理的“系统”连接丢失并重新建立时,任何 Spring bean 都可以实现 ApplicationListener<BrokerAvailabilityEvent> 来接收通知。 例如,当没有活动的“系统”连接时,广播股票报价的股票报价服务可以停止尝试发送消息。

默认情况下,STOMP 代理中继始终连接到同一主机和端口,如果连接断开,那么根据需要重新连接。 如果希望提供多个地址,那么在每次尝试连接时,都可以配置地址供应商,而不是固定的主机与端口。 以下示例展示了如何执行此操作:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    // ...

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
        registry.setApplicationDestinationPrefixes("/app");
    }

    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
                client -> client.addressSupplier(() -> ... ),
                new StompReactorNettyCodec());
    }
}

还可以使用 virtualHost 属性配置 STOMP 代理中继。 此属性的值设置为每个 CONNECT 帧的 host 标头,这很有用(例如,在云环境中,建立 TCP 连接的实际主机与提供基于云的 STOMP 服务的主机是不同的)。

4.4.11. 将点作为分隔符

将消息路由到 @MessageMapping 方法时,它们将与 AntPathMatcher 进行匹配。 默认情况下,模式应使用斜杠(/)作为分隔符。 这是 Web 应用程序中的一个良好约定,类似于 HTTP URL。 但是,如果更习惯于消息传递约定,那么可以切换为使用点(.)作为分隔符。

以下示例展示了如何在Java配置中执行此操作:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    // ...

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.enableStompBrokerRelay("/queue", "/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:websocket="http://www.springframework.org/schema/websocket"
        xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                https://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/websocket
                https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
        <websocket:stomp-endpoint path="/stomp"/>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

    <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
        <constructor-arg index="0" value="."/>
    </bean>

</beans>

然后,控制器可以使用点(.)作为 @MessageMapping 方法中的分隔符,如以下示例所示:

@Controller
@MessageMapping("red")
public class RedController {

    @MessageMapping("blue.{green}")
    public void handleGreen(@DestinationVariable String green) {
        // ...
    }
}

客户端现在可以将消息发送到 /app/red.blue.green123

在前面的示例中,没有更改“代理中继”上的前缀,因为这些前缀完全取决于外部消息代理。 有关使用的代理的信息,请参见 STOMP 文档页面,以查看其对目的地标头支持的约定。

另一方面,“简单代理”确实依赖于已配置的 PathMatcher,因此,如果切换分隔符,该更改也将应用于代理,以及代理将目的地从消息匹配到订阅中的模式的方式。

4.4.12. 身份验证

每个通过 WebSocket 进行的 STOMP 消息传递会话均以 HTTP 请求开头。 这可以是升级到 WebSockets 的请求(即 WebSocket 握手),或者在 SockJS 回退的情况下,可以是一系列 SockJS HTTP 传输请求。

许多 Web 应用程序已经具有身份验证和授权来保护 HTTP 请求。 通常,使用某种机制(例如:登录页面、HTTP 基本认证或其他方式)通过 Spring Security 对用户进行认证。 经过身份验证的用户的安全性上下文保存在 HTTP 会话中,并与同一基于 cookie 的会话中的后续请求相关联。

因此,对于 WebSocket 握手或 SockJS HTTP 传输请求,通常已经有一个可以通过 HttpServletRequest#getUserPrincipal() 访问的经过身份验证的用户。 Spring 会自动将该用户与为其创建的 WebSocket 或 SockJS 会话相关联,并随后与通过该用户标头在该会话上传输的所有 STOMP 消息相关联。

简而言之,一个典型的 Web 应用程序除了已经为安全性做的事情之外,不需要做任何其他事情。 通过基于 cookie 的 HTTP 会话(然后与为该用户创建的 WebSocket 或 SockJS 会话相关联)维护的安全上下文在 HTTP 请求级别对用户进行身份验证,并导致在流经应用程序的每个 Message 上加盖用户标头。

请注意,STOMP 协议在 CONNECT 帧上确实具有 loginpasscode 标头。 它们最初是为 TCP 设计的,现在仍然需要它们,例如:基于 TCP 的 STOMP。 但是,对于基于 WebSocket 的 STOMP,默认情况下,Spring 会忽略 STOMP 协议级别的授权标头,假定用户已经在 HTTP 传输级别进行了身份验证,并期望 WebSocket 或 SockJS 会话包含已通过身份验证的用户。

TIP

Spring Security 提供了 WebSocket 子协议授权open in new window,该授权使用 ChannelInterceptor 来基于消息中的用户标头对消息进行授权。 而且,Spring Session 提供了 WebSocket 集成open in new window,以确保当 WebSocket 会话仍处于活动状态时,用户 HTTP 会话不会过期。

4.4.13. 令牌(Token)验证

Spring Security OAuthopen in new window 支持基于令牌的安全支持,包括 JSON Web 令牌(JWT)。 可以将其用作 Web 应用程序中的身份验证机制,包括上一节中所述的 WebAPI 交互中的 STOMP(即通过基于 cookie 的会话维护身份)。

同时,基于 cookie 的会话并非总是最合适的(例如,在不维护服务端会话的应用程序中或在通常使用标头进行身份验证的移动应用程序中)。

WebSocket 协议 RFC 6455open in new window中“在 WebSocket 握手过程中,服务端没有指定任何特定的方式来验证客户端。” 然而,在实践中,浏览器客户端只能使用标准的身份验证标头(即基本 HTTP 身份验证)或 cookie,而不能(例如)提供自定义的标头。 同样,SockJS JavaScript 客户端也不提供通过 SockJS 传输请求发送 HTTP 标头的方法。 请参阅 sockjs-client 问题 196open in new window。 相反,它确实允许发送可用于发送令牌的查询参数,但是有其自身的缺点(例如,令牌可能会无意中与服务器日志中的 URL 一起记录)。

TIP

前述限制适用于基于浏览器的客户端,不适用于基于 Spring Java 的 STOMP 客户端,该客户端确实支持通过 WebSocket 与 SockJS 请求发送标头。

因此,希望避免使用 cookie 的应用程序可能没有在 HTTP 协议级别进行身份验证的任何好的替代方案。 他们可能更喜欢在 STOMP 消息传递协议级别使用标头进行身份验证,而不是使用 cookie。 这样做需要两个简单的步骤:

  1. 使用 STOMP 客户端在连接时传递身份验证标头。

  2. 使用 ChannelInterceptor 处理身份验证标头。

下一个示例使用服务器端配置来注册自定义身份验证拦截器。 请注意,拦截器仅需要认证并在 CONNECT Message 上设置用户标头。 Spring 记录并保存经过身份验证的用户,并将其与同一会话上的后续 STOMP 消息相关联。 以下示例展示了如何注册自定义身份验证拦截器:

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    Authentication user = ... ; // access authentication header(s)
                    accessor.setUser(user);
                }
                return message;
            }
        });
    }
}

另外,请注意,目前,当使用 Spring Security 的消息授权时,需要确保在 Spring Security 之前订阅了 ChannelInterceptor 身份验证配置。 最好通过在其自己的 WebSocketMessageBrokerConfigurer 实现中声明自定义拦截器来完成,该实现用 @Order(Ordered.HIGHEST_PRECEDENCE + 99) 标记。

4.4.14. 用户目的地

应用程序可以发送针对特定用户的消息,并且 Spring 的 STOMP 支持可以识别以 /user/ 为前缀的目的地。 例如,客户端可能订阅了 /user/queue/position-updates 目的地。 该目的地由 UserDestinationMessageHandler 处理,并转换为用户会话唯一的目的地(例如 /queue/position-updates-user123)。 这提供了订阅通用命名目的地的便利,同时确保与订阅相同目的地的其他用户不发生冲突,以便每个用户都可以接收唯一的持仓更新。

在发送方,可以将消息发送到一个目的地,例如 /user/{username}/queue/position-updates,然后将其由 UserDestinationMessageHandler 转换为一个或多个目的地,每个与用户关联的会话一个目的地。 这样,应用程序中的任何组件都可以发送针对特定用户的消息,而不必知道他们的姓名与通用目的地。 注解与消息传递模板也支持此特性。

消息处理方法可以通过 @SendToUser 注解将消息发送给与正在处理的消息相关联的用户(在类级上也支持共享公共目标),如以下示例所示:

@Controller
public class PortfolioController {

    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

如果用户具有多个会话,那么默认情况下,所有订阅给定目标的会话都将成为目的地。 但是,有时可能仅需要将发送正在处理的消息的会话作为目的地。 可以通过将 broadcast 属性设置为 false 来做到这一点,如以下示例所示:

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // raise MyBusinessException here
    }

    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}

TIP

尽管用户目的地通常暗指经过身份验证的用户,但这并不是严格要求的。 与已认证用户不关联的 WebSocket 会话可以订阅用户目的地。 在这种情况下,@SendToUser 注解的行为与 broadcast=false 完全相同(也就是说,仅针对发送正在处理的消息的会话)。

可以从任何应用程序组件向用户目的地发送消息,例如,注入由 Java 配置或 XML 命名空间创建的 SimpMessagingTemplate。 (如果需要使用 @Qualifier 进行限定,那么 bean 名称为 brokerMessagingTemplate。) 下面的示例演示了如何执行此操作:

@Service
public class TradeServiceImpl implements TradeService {

    private final SimpMessagingTemplate messagingTemplate;

    @Autowired
    public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    // ...

    public void afterTradeExecuted(Trade trade) {
        this.messagingTemplate.convertAndSendToUser(
                trade.getUserName(), "/queue/position-updates", trade.getResult());
    }
}

TIP

将用户目标与外部消息代理一起使用时,应查看代理文档以了解如何管理非活动队列,以便在用户会话结束时,将删除所有唯一的用户队列。 例如,当使用诸如 /exchange/amq.direct/position-updates 之类的目的地时,RabbitMQ 会创建自动删除队列。 因此,在这种情况下,客户端可以订阅 /user/exchange/amq.direct/position-updates。 同样,ActiveMQ 具有用于清除非活动目标的配置选项open in new window

在多应用程序服务器方案中,由于用户连接到其他服务器,因此用户目的地可能无法解析。 在这种情况下,可以配置目的地以广播未解决的消息,以便其他服务器可以尝试。 这可以通过 Java 配置中 MessageBrokerRegistryuserDestinationBroadcast 属性以及 XML 中 message-broker 元素的 user-destination-broadcast 属性来完成。

4.4.15. 消息顺序

来自代理的消息被发布到 clientOutboundChannel,从那里被写入 WebSocket 会话。 由于该通道由 ThreadPoolExecutor 支持,因此消息在不同的线程中处理,并且客户端接收到的结果序列可能与发布的确切顺序不匹配。

如果这是一个问题,请启用 setPreservePublishOrder 标志,如以下示例所示:

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    protected void configureMessageBroker(MessageBrokerRegistry registry) {
        // ...
        registry.setPreservePublishOrder(true);
    }

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker preserve-publish-order="true">
        <!-- ... -->
    </websocket:message-broker>

</beans>

设置该标志后,同一客户端会话中的消息将一次一个地发布到 clientOutboundChannel,这样可以保证发布顺序。 请注意,这会产生较小的性能开销,因此,只有在需要时启用它。

4.4.16. 事件

几个 ApplicationContext 事件被发布,并且可以通过实现 Spring 的 ApplicationListener 接口来接收:

  • BrokerAvailabilityEvent: 指示代理何时可用或不可用。 虽然“简单”代理在启动时立即可用,并在应用程序运行时保持可用,但 STOMP“代理中继”可能会失去与全功能代理的连接(例如,重新启动代理)。 代理中继具有重新连接逻辑,并在代理返回时重新建立与代理的“系统”连接。 结果,只要状态从已连接变为断开,就会发布此事件,反之亦然。 使用 SimpMessagingTemplate 的组件应订阅此事件,并避免在代理不可用时发送消息。 在任何情况下,当发送消息时,都应该准备好处理 MessageDeliveryException

  • SessionConnectEvent: 在收到新的 STOMP CONNECT 帧时发布,以指示新的客户端会话的开始。 该事件包含代表连接的消息,包括:会话 ID、用户信息(如果有)与客户端发送的所有自定义标头。 这对于跟踪客户端会话很有用。 订阅此事件的组件可以使用 SimpMessageHeaderAccessorStompMessageHeaderAccessor 包装包含的消息。

  • SessionConnectedEvent: 当代理发送了一个 STOMP CONNECTED 帧来响应 CONNECT 时,在 SessionConnectEvent 之后不久发布。 此时,可以认为 STOMP 会话已完全建立。

  • SessionSubscribeEvent: 在收到新的 STOMP SUBSCRIBE 帧时发布。

  • SessionUnsubscribeEvent: 在收到新的 STOMP UNSUBSCRIBE 帧时发布。

  • SessionDisconnectEvent: 在 STOMP 会话结束时发布。 DISCONNECT 帧可能是从客户端发送的,也可能是在 WebSocket 会话关闭时自动生成的。 在某些情况下,这个事件在每个会话中发布不止一次。 组件对于多个断开连接事件应该是幂等的。

TIP

当使用全功能代理时,如果代理暂时不可用,那么 STOMP“代理中继”会自动重新连接“系统”连接。 但是,客户端连接不会自动重新连接。 假设启用了心跳,那么客户端通常会注意到代理在 10 秒内没有响应。 客户需要实现自己的重新连接逻辑。

4.4.17. 拦截

事件提供有关 STOMP 连接生命周期的通知,但不提供每条客户端消息的通知。 应用程序还可以注册一个 ChannelInterceptor 来拦截处理链中任何部分的任何消息。 以下示例展示了如何拦截来自客户端的入站消息:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new MyChannelInterceptor());
    }
}

自定义 ChannelInterceptor 可以使用 StompHeaderAccessorSimpMessageHeaderAccessor 访问相关消息的信息,如以下示例所示:

public class MyChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getStompCommand();
        // ...
        return message;
    }
}

应用程序还可以实现 ExecutorChannelInterceptor,它是 ChannelInterceptor 的子接口,在处理消息的线程中具有回调。 对于发送到频道的每个消息,ChannelInterceptor 都会被一次调用,但 ExecutorChannelInterceptor 会在订阅来自该频道的消息的每个 MessageHandler 的线程中都提供钩子。

请注意,与前面所述的 SessionDisconnectEvent 一样,DISCONNECT 消息可以来自客户端,也可以在 WebSocket 会话关闭时自动生成。 在某些情况下,对于每个会话,拦截器可能会多次拦截此消息。 组件对于多个断开连接事件应该是幂等的。

4.4.18. STOMP 客户端

Spring 提供了 WebSocket 客户端与 TCP 客户端。

首先,可以创建并配置 WebSocketStompClient,如下面的示例所示:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在前面的示例中,可以用 SockJsClient 替换 StandardWebSocketClient,因为这也是 WebSocketClient 的实现。 SockJsClient 可以使用 WebSocket 传输或回退基于 HTTP 传输。 有关更多详细信息,请参见 SockJsClientopen in new window

接下来,可以建立连接并为 STOMP 会话提供处理程序,如以下示例所示:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备好使用时,将通知处理程序,如以下示例所示:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

一旦建立会话,就可以发送任何有效负载,并使用配置的 MessageConverter 对其进行序列化,如以下示例所示:

session.send("/topic/something", "payload");

还可以订阅目的地。 subscribe 方法需要一个用于订阅中消息的处理程序,并返回可用于取消订阅的 Subscription 句柄。 对于每个收到的消息,处理程序可以指定应将有效负载反序列化到的目标对象类型,如以下示例所示:

session.subscribe("/topic/something", new StompFrameHandler() {

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }

});

要启用 STOMP 心跳,可以使用 TaskScheduler 配置 WebSocketStompClient 并有选择地自定义心跳间隔(10 秒未写发送心跳,10 秒未读关闭连接)。

WebSocketStompClient 仅在不活动的情况下(即未发送其他消息时)发送心跳。 使用外部代理时,这可能会带来挑战,因为带有非代理目的地的消息表示活动,但实际上不会转发到代理。 在这种情况下,可以在初始化外部代理时配置 TaskScheduler,以确保即使仅发送具有非代理目标的消息,也可以将心跳转发到代理。

TIP

当使用 WebSocketStompClient 进行性能测试,以模拟同一台计算机上的数千个客户端时,请考虑关闭心跳,因为每个连接都计划自己的心跳任务,并且并未针对在同一台计算机上运行的大量客户端进行优化。

STOMP 协议还支持收据,客户端必须在其中添加一个 receipt 标头,在发送或订阅处理之后,服务器以 RECEIPT 帧作为响应标头。 为此,StompSession 提供了 setAutoReceipt(boolean),它会在每个后续发送或订阅事件上添加一个 receipt 报头。 或者,也可以手动将收据标头添加到 StompHeaders。 发送与订阅都返回一个 Receiptable 实例,可以使用该实例注册接收成功与失败的回调。 要使用此特性,必须为客户端配置 TaskScheduler 以及收据过期之前的时间(默认为 15 秒)。

请注意,StompSessionHandler 本身是一个 StompFrameHandler,它除了处理来自消息处理的异常的 handleException 回调与处理包括 ConnectionLostException 的传输级错误的 handleTransportError 之外,还可以处理 ERROR 帧。

4.4.19. WebSocket 作用域

每个 WebSocket 会话都有一个属性映射。 该映射作为标头附加到入站客户端消息,可以通过控制器方法进行访问,如以下示例所示:

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // ...
    }
}

可以在 websocket 作用域内声明一个 Spring 托管的 Bean。 可以将 WebSocket 作用域的 Bean 注入控制器以及在 clientInboundChannel 上注册的任何通道拦截器中。 这些都是典型的单例对象,并且比任何单独的 WebSocket 会话寿命更长。 因此,需要对作用域 WebSocket 的 Bean 使用作用域代理模式,如以下示例所示:

@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {

    @PostConstruct
    public void init() {
        // 注入依赖项后调用
    }

    // ...

    @PreDestroy
    public void destroy() {
        // WebSocket 会话结束时调用
    }
}

@Controller
public class MyController {

    private final MyBean myBean;

    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }

    @MessageMapping("/action")
    public void handle() {
        // 当前 WebSocket 会话中的 this.myBean
    }
}

与任何自定义范围一样,Spring 首次在控制器中对其进行访问时会初始化一个新的 MyBean 实例,并将该实例存储在 WebSocket 会话属性中。 随后将返回相同的实例,直到会话结束。 WebSocket 作用域的 Bean 调用了所有 Spring 生命周期方法,如前面的示例所示。

4.4.20. 性能

关于性能,没有万灵药。 影响它的因素很多,包括消息的大小与数量,应用程序方法是否执行需要阻止的工作以及外部因素(例如网络速度与其他问题)。 本部分的目的是提供可用配置选项的概述,以及有关如何进行扩展的一些想法。

在消息传递应用程序中,消息通过频道传递以进行异步执行,并由线程池支持。 配置这样的应用程序需要对频道与消息流有充分的了解。 因此,建议查看消息流

最明显的起点是配置支持 clientInboundChannelclientOutboundChannel 的线程池。 默认情况下,两者都配置为可用处理器数量的两倍。

如果带注解的方法中的消息处理主要是受 CPU 限制的,那么 clientInboundChannel 的线程数应保持接近处理器数。 如果它们所做的工作更多地受到 IO 限制,并且需要阻塞或等待数据库或其他外部系统,那么可能需要增加线程池大小。

TIP

ThreadPoolExecutor 具有三个重要属性:核心线程池大小、最大线程池大小以及队列存储没有可用线程的任务的容量。

常见的混淆点是,配置核心池大小(例如 10)与最大池大小(例如 20)会导致线程池具有 10 到 20 个线程。 实际上,如果将容量保留为其默认值 Integer.MAX_VALUE,那么由于所有其他任务都已排队,因此线程池永远不会超过核心池的大小。

请参阅 ThreadPoolExecutor 的 JavaDoc,以了解这些属性如何工作并了解各种排队策略。

clientOutboundChannel 方面,所有关于向 WebSocket 客户端发送消息。 如果客户端位于快速网络上,那么线程数应保持接近可用处理器数。 如果客户端很慢或带宽很低,那么将花费更长的时间来消耗消息并给线程池增加负担。 因此,必须增加线程池的大小。

虽然 clientInboundChannel 的工作量可以预测——毕竟,它是基于应用程序的工作——但是,如何配置“clientOutboundChannel”却比较困难,因为它基于应用程序无法控制的因素。 因此,还有两个与消息发送有关的属性:sendTimeLimitsendBufferSizeLimit。 可以使用这些方法来配置发送消息到客户端时允许发送多长时间以及可以缓冲多少数据。

通常的想法是,在任何给定时间,只能使用单个线程将其发送给客户端。 同时,所有其他消息都将被缓冲,可以使用这些属性来决定允许发送消息花费多长时间以及在此期间可以缓冲多少数据。 有关其他重要信息,请参见 XML 模式的 JavaDoc 与文档。

以下示例展示了可能的配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }

    // ...

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

还可以使用前面显示的 WebSocket 传输配置来配置传入 STOMP 消息的最大允许大小。 从理论上讲,WebSocket 消息的大小几乎是无限的。 实际上,WebSocket 服务端会施加限制,例如:Tomcat 8K 与 Jetty 64K。 因此,STOMP 客户端(例如 JavaScript webstomp-clientopen in new window 等)在 16K 边界处拆分较大的 STOMP 消息,并将其作为多个 WebSocket 消息发送,这需要服务器进行缓冲与重新组装。

Spring 的 STOMP-over-WebSocket 支持可以做到这一点,因此应用程序可以为 STOMP 消息配置最大大小,而与 WebSocket 服务器特定的消息大小无关。 请记住,如有必要,将自动调整 WebSocket 消息的大小,以确保它们最多可以承载 16K WebSocket 消息。

以下示例展示了一种可能的配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }

    // ...

}

下面的示例展示与前面的示例等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport message-size="131072" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

关于扩展的重要一点涉及使用多个应用程序实例。 当前,不能使用简单的代理来做到这一点。 但是,当使用全功能代理(例如 RabbitMQ)时,每个应用程序实例都连接到代理,并且从一个应用程序实例广播的消息可以通过代理广播到通过任何其他应用程序实例连接的 WebSocket 客户端。

4.4.21. 监控

当使用 @EnableWebSocketMessageBroker<websocket:message-broker> 时,关键基础结构组件会自动收集统计信息与计数器,这些统计信息与计数器可提供对应用程序内部状态的重要了解。 该配置还声明了一个 WebSocketMessageBrokerStats 类型的 Bean,该 Bean 将所有可用信息收集到一个位置,并且默认情况下每 30 分钟在 INFO 级别记录一次。 该 Bean 可以通过 Spring 的 MBeanExporter 导出到 JMX,以便在运行时查看(例如,通过 JDK 的 jconsole)。 以下列表总结了可用的信息:

  • Client WebSocket Sessions
    • Current 指示当前有多少个客户端会话,计数进一步分解为 WebSocket、HTTP 流与轮询 SockJS 会话。
    • Total 指示已建立的会话总数。
    • Abnormally Closed
      • Connect Failures 已建立但在 60 秒内未收到任何消息后关闭的会话。 这通常表示代理或网络问题。
      • Send Limit Exceeded 超过配置的发送超时或发送缓冲区限制后,会话将关闭,缓慢的客户端可能会发生该会话(请参阅上一节)。
      • Transport Errors 传输错误(例如无法读取或写入 WebSocket 连接或 HTTP 请求或响应)后,会话关闭。
    • STOMP Frames 已处理的 CONNECT、CONNECTED 与 DISCONNECT 帧的总数,指示在 STOMP 级别上连接了多少个客户端。 请注意,当会话异常关闭或客户端未发送 DISCONNECT 帧而关闭时,DISCONNECT 计数可能会偏低。
  • STOMP Broker Relay
    • TCP Connections 指示与代理建立了代表客户端 WebSocket 会话的 TCP 连接数。 这应该等于客户端 WebSocket 会话的数量 +1 个用于从应用程序内部发送消息的附加共享“系统”连接。
    • STOMP Frames 代表客户转发到代理或从代理接收的 CONNECT、CONNECTED 与 DISCONNECT 帧的总数。 请注意,无论客户端 WebSocket 会话如何关闭,DISCONNECT 帧都会发送到代理。 因此,偏低的 DISCONNECT 帧计数表示代理正在主动关闭连接(可能是由于未及时到达的心跳、无效的输入帧或其他问题)。
  • Client Inbound Channel 来自支持 clientInboundChannel 的线程池的统计信息,可深入了解传入消息处理的运行状况。 此处排队的任务表明该应用程序可能太慢而无法处理消息。 如果存在 I/O 绑定的任务(例如:缓慢的数据库查询、对第三方 REST API 的 HTTP 请求等),请考虑增加线程池的大小。
  • Client Outbound Channel 来自支持 clientOutboundChannel 的线程池的统计信息,该统计信息提供了对向客户端广播消息的运行状况的深入了解。 此处排队的任务表明客户端太慢而无法消费消息。 解决此问题的一种方法是增加线程池大小,以容纳并发慢速客户端的预期数量。 另一个选择是减少发送超时与发送缓冲区大小限制(请参阅上一节)。
  • SockJS Task Scheduler 来自 SockJS 任务调度程序的线程池的统计信息,用于发送心跳。 请注意,在 STOMP 级别协商心跳时,将禁用 SockJS 心跳。

4.4.22. 测试

当使用 Spring 的 STOMP-over-WebSocket 支持时,主要有两种方法来测试应用程序。 第一种是编写服务端测试,以验证控制器及其带注解的消息处理方法的功能性。 第二种是编写涉及运行客户端与服务端的完整的端到端测试。

两种方法不是互斥的。 相反,每一种都在整体测试策略中占有一席之地。 服务端测试更加集中,更易于编写和维护。 另一方面,端到端集成测试,更完整并且测试更多,但也更涉及到编写与维护。

服务端测试的最简单形式是编写控制器单元测试。 但是,这还不够有用,因为控制器所做的很多事情都取决于其注释。 纯单元测试根本无法测试。

理想情况下,应该像在运行时那样调用被测控制器,就像使用 Spring MVC Test 框架测试处理 HTTP 请求的控制器的方法一样,即不运行 Servlet 容器而是依靠 Spring 框架来调用被测控制器。 带注解的控制器。 与 Spring MVC Test 一样,有两种可能的选择,要么使用“基于上下文的”设置,要么使用“独立的”设置:

  • 在 Spring TestContext 框架的帮助下加载实际的 Spring 配置,将 clientInboundChannel 注入为测试字段,并使用它发送消息以由控制器方法处理。

  • 手动设置调用控制器(即 SimpAnnotationMethodMessageHandler)所需的最小 Spring 框架基础设施,并将控制器的消息直接传递给它。

tests for the stock portfolioopen in new window 示例应用中演示了这两种设置方案。

第二种方法是创建端到端集成测试。 为此,需要以嵌入式模式运行 WebSocket 服务器,并将其作为 WebSocket 客户端连接到该服务器,该客户端发送包含 STOMP 帧的 WebSocket 消息。 在 tests for the stock portfolioopen in new window 示例应用中,还通过将 Tomcat 用作嵌入式 WebSocket 服务器与用于测试目的的简单 STOMP 客户端来演示此方法。