
oneacrefund/superset-websocket一个Node.js WebSocket服务器,用于向Superset Web应用前端发送异步事件数据。
要使用此功能,Superset需配置启用全局异步查询并使用WebSocket作为传输方式(详见下文)。
基于SIP-39定义的架构实现。
异步事件从Superset Flask应用推送到Redis流。特定用户的事件会发布到两个流:1)包含所有用户事件的全局事件流;2)仅该用户的频道/会话特定流。这种方式在性能(从单个全局流读取)和容错性(断开的连接可通过频道特定流“追赶”)之间取得平衡。
注意:此处未使用Redis流的消费者组,因为每个组仅接收流数据的子集,而WebSocket客户端与每个应用实例有持久连接,需要访问流中的所有数据。WebSocket应用的水平扩展需要多个WebSocket服务器,每个都能完全访问Redis流数据。
用户浏览器初始通过HTTP连接WebSocket服务器,请求中包含Flask应用设置的JWT认证Cookie。注意:由于基于Cookie的认证方式,WebSocket服务器必须与Web应用运行在同一主机上。 服务器使用共享密钥(配置:jwtSecret)验证JWT令牌,验证通过后将连接升级为WebSocket。JWT中包含用户会话的“频道”ID,作为向用户连接的socket发送事件的依据。
用户在单个频道(会话)ID下可能有多个WebSocket连接(例如多个浏览器标签)。在此情况下,特定频道的所有事件会发送到所有连接的socket,由消费者决定哪些事件与当前应用上下文相关。
由于网络波动,用户的WebSocket连接可能断开。Superset前端代码跟踪最后接收的异步事件ID,并尝试使用初始HTTP请求中的last_id查询参数重连WebSocket服务器。如果连接包含有效的last_id,服务器会从用户特定的Redis流中读取可能已接收但发送失败的事件,并重新发送到新连接。之后全局事件流负责向连接的socket发送后续事件。
服务器利用标准WebSocket的ping/pong功能判断活动连接是否存活。定期(配置:pingSocketsIntervalMs)向活动socket发送ping,收到pong响应时更新内部socket注册表的时间戳。若在超时时间(配置:socketResponseTimeoutMs)内未收到pong响应,将终止socket并从注册表中移除。
此外,内部频道注册表会定期清理(配置:gcChannelsIntervalMs)以删除过期引用,防止长期运行导致的内存过度消耗。
安装依赖:
bashnpm ci
将config.example.json复制为config.json,并根据环境调整值。也支持通过环境变量配置,完整配置项可参考src/config.ts。
需在Superset Flask应用中配置(superset_config.py)以启用全局异步查询:
启用GLOBAL_ASYNC_QUERIES特性标志:
python"GLOBAL_ASYNC_QUERIES": True
配置以下Superset值:
pythonGLOBAL_ASYNC_QUERIES_TRANSPORT = "ws" GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://<主机>:<端口>/"
注意:WebSocket服务器必须与Web应用运行在同一主机名(不同端口),以便Cookie在Flask应用和WebSocket服务器之间共享。此外,localhost和127.0.0.1不视为同一主机。例如,若浏览器访问localhost:<端口>的Superset,则WebSocket URL需配置为localhost:<端口>。
以下配置值在Flask应用配置和config.json中必须相同:
textGLOBAL_ASYNC_QUERIES_CACHE_BACKEND GLOBAL_ASYNC_QUERIES_REDIS_STREAM_PREFIX GLOBAL_ASYNC_QUERIES_JWT_COOKIE_NAME GLOBAL_ASYNC_QUERIES_JWT_SECRET
更多Superset异步查询配置信息:[***]
应用使用hot-shots库通过StatsD跟踪指标,如连接客户端数量、向客户端发送消息失败尝试次数等。
可通过配置文件中的statsd对象配置StatsD。默认配置如下:
json{ "statsd": { "host": "127.0.0.1", "port": 8125, "globalTags": [] } }
bashnpm run dev-server
bashnpm run build && npm start
WebSocket服务器支持通过以下方式进行健康检查:
GET /health
或
HEAD /health
TODO:容器化WebSocket服务器
manifest unknown 错误
TLS 证书验证失败
DNS 解析超时
410 错误:版本过低
402 错误:流量耗尽
身份认证失败错误
429 限流错误
凭证保存错误
来自真实用户的反馈,见证轩辕镜像的优质服务