欢迎进入Wiki » FAQ » 基于 CometD 的消息总线?

基于 CometD 的消息总线?

在2014-07-07 21:56上被李小翔修改
评论 (0) · 附件 (0) · 记录 · 信息

grails-platform-core插件的事件总线(Events Bus) 中,通过插件 platform-core 实现了一个应用内部的事件总线,但是该总线存在下面两个问题:

  1. 只适用于单一 JVM、单一应用,无法跨 JVM、应用传递消息
  2. 无法通过Web端发送或监听消息(除非安装其他插件)

为了解决上述问题,BroToolkit/Framework还内置了插件cometd 并对其做了改造,集成了支持 JDK 1.6 的最新版本的 CometD,实现了支持集群、集群内消息广播、跨应用、支持BS结构的异步消息总线。

当然,作为消息总线,和前者的区别还在于:

  • 消息主体不能是复杂对象,比如 Domain 实例,除非能够完全序列化
  • 消息推送与监听都是异步的,消息发布方完全无法了解到监听方的执行情况
  • 不能监听 gorm 事件(当然也可以扩展)

后台API

BroFramework 提供了CometdService 服务,封装了常用的 CometD API,主要包括下面的API:

  • publish:发布消息
  • subscribe:监听频道
  • unsubscribe:取消监听

注意这里的频道和 platform-core 插件里的频道的区别,这里频道是一个以字符“/”分层的字符串,如 /foo/bar。

发布、监听时,同样可以使用通配符,如 /foo/** 代表 /foo 下所有频道(含孙子频道)、/foo/ 仅代表子频道。

关于频道的详细定义可以参考这里

调用示例:

  • 本应用内:
def cometdService = ctx.cometdService

// 监听频道 /test
// 第二个参数为一个闭包,参数为 publish 的消息对象
def listener = cometdService.subscribe("/test", {data->
    log.info "test data is " + data
})

// 发布消息
// 这里的消息可以是字符串、Map等各种可以序列化的对象
// 消息发布后,后台日志中会输出 “test data is  foobar”
cometdService.publish("/test", "foobar")

// 取消监听
cometdService.unsubscribe( "/test", listener )
// 或者取消该频道内所有的监听器
cometdService.unsubscribe( "/test" )
  • 跨应用:
import bropen.framework.core.security.DomainApplication
def app = DomainApplication.findByCode("Foobar")

// 发布消息到应用 Foobar 的 /test 频道
cometdService.publish( app, "/test", "foobar")

// 监听等示例略
...
  • 跨应用(直接访问CometD服务):
// 发布消息到某远程服务器的 /test 频道
cometdService.publish( "http://foobar.com/Foobar/cometd", "/test", "foobar")

// 监听等示例略
...

Web API

Comet 本身是一种基于 HTTP 长连接的“服务器推”技术。基于这种架构开发的应用中,服务器端会主动以异步的方式向客户端程序推送数据,而不需要客户端显式的发出请求。Comet 架构非常适合事件驱动的 Web 应用,以及对交互性和实时性要求很强的应用,如股票交易行情分析、聊天室和 Web 版在线游戏等。

而 CometD 是一个 Comet 的技术实现,因此,天生就提供了基于 jQuery 和 Dojo 框架的 Web API,详细可以参考这里

BroFramework 中,对其稍稍作了封装,示例如下:

// 新增了 ready 方法,自动连接当前应用(支持断线重连),成功后执行回调函数
$j.cometd.ready(function(){
   // 回调函数中,监听 /test 频道
   $j.cometd.subscribe( "/test", function(message){
        console.log( message.data )
    });
   // 发布消息到 /test 频道
   $j.cometd.publish("/test", {foo: "bar"});
})

集群

CometD 的集群方案叫做 Oort,通过应用前端的软硬件(如Apache)负载均衡器,能够实现负载均衡、失效转移、集群间消息广播等功能。

集群配置

BroToolkit 默认提供了 CometD 集群功能,但是未启用,而 BroFramework 中默认启用了集群(通过Config.groovy配置 bropen.toolkit.cluster.cometd.enabled)。

但是,Oort 启动时,通过组播技术来发现集群节点时,需要告诉其他节点本服务器的地址(含端口),而 Java 技术中并没有现成的解决方案获得本 JVM 绑定的 IP 地址和端口(如8080),因此,这里需要做一点配置:

  • 在应用服务器的 /bropen (bropen.toolkit.cluster.config.file)文件夹下,创建一个 clusters.properties 文件
  • 文件内容可以参考 BroToolkit 中的示例,这里主要需要配置上面所说的两个信息:server.host、server.port
  • 需要注意的是,如果是垂直集群,则两个节点下需要分别创建的配置文件,且 server.host 必须指向不同的 IP 地址

例如节点1配置:

server.host = 192.168.0.11
server.port = 8881

节点2配置:

server.host = 192.168.0.12
server.port = 8882

应用启动时,日志中会陆续输出下面这些日志:

# 本节点(节点1)的 cometURL
CometD oort.url is: http://192.168.0.11:8080/Foobar/cometd

# 根据应用名自动计算的组播端口
CometD oort multicast port: 6129

# 注册广播频道
CometD oort.channels: /broadcast/**

# 节点2加入集群
Comet joined the cloud http://192.168.0.12:8882/Foobar/cometd

各节点都加入到集群后,就能实现集群的负载均衡、失效转移等功能了。

消息广播

在集群环境下,可能有不同的 CometD 客户端(web端或后台)在发送消息,默认情况下,这些消息是不会被广播到其他集群节点的。

Oort可以通过配置Servlet参数,即上面日志里的 oort.channels 来实现对这些频道的广播。BroToolkit 中,默认对频道 /broadcast/** 进行了广播配置。

因此,如果需要将消息广播到所有集群节点中,只需要将消息发送到 /broadcast 的下级频道即可,如:

subscribe( "/broadcast/test", .... )

publish( "/broadcast/test", "test data" )
在2014-07-07 18:49上被李小翔创建

Copyright © 2013 北京博瑞开源软件有限公司
京ICP备12048974号