项目中经常用到各种需要阻塞操作的组件,如联网上传、下载,硬件设备通讯,原来多是使用线程来解决,传递一些简单的进度信息。

最近做的项目用到了多台主机采集数据,同样也有一些上传功能,还有串口读卡器,此时很需要一套完善的组件通讯机制,用来分发任务,收集进度反馈。

前端使用消息服务器解决组件通讯问题,后端使用状态机(参考qfFSM有限自动机)来解决逻辑问题,理论上讲一套消息驱动的组件便OK啦。

这份消息队列服务器由两部分组成:

  1. 服务器:独立exe,命令行方式,接受参数,可指定一个tcp端口以及IP,默认为“tcp://*:117058”;
  2. 客户端:Win32接口DLL,提供连接服务器,发送、收取消息,注册、删除服务器消息队列;

服务器exe说明:

服务器启动后界面如下,最下方会显示busy或者idle,后面last 1s为最近一秒内处理的请求数目,sum为服务器启动后处理的请求总数。

下图中可以看到,服务处于busy状态,通讯端点为本机tcp端口117058,最近一秒处理了129192个请求。

客户端Win32 DLL异步接口说明:

以下接口分为3类,连接/关闭服务器、异步请求接口(asyn_request_开头)、异步响应接口(asyn_response_开头)。

所有响应消息有四个字段:id、message、status、data;

message为请求名称;status为“ok”表示成功,其他值为错误信息;

特别说明asyn_request_类接口,返回值为bool型,表示此请求是否被加入本地发送队列,并不是成功发出的标志,后面会提到本地发送队列也是有限额的(10000条),所以队列满时,会淘汰最早的请求消息。

务必将应用设计成以响应消息为准。

1、连接服务器

bool WINAPI asyn_connect_server(char * str_endpoint);

str_endpoint为服务器连接点,默认为“tcp://127.0.0.1:117058”

服务器端未运行时,客户端依然可以连接成功,底层会不但去尝试到服务器的连接;在此期间发送的消息会缓存在客户端进程空间中,为避免大量积压,最多缓冲10000条,超过则按照FIFO方式淘汰最早的消息。

此函数会开启一个线程负责与服务器通讯,并在本地维护一个请求队列和一个响应队列,大小均为10000条,对于普通互动应用而言足够了。

2、断开服务器连接

bool asyn_disconnect_server();

 3、关闭服务器

 bool asyn_request_shutdown_server();

向服务器端发送关闭指令,用于退出服务器端进程;

4、注册消息队列

bool asyn_request_register_message_queue(char * str_queue_name);

发送注册消息队列请求,参数为消息队列名称;

消息队列服务器发送、接收消息均需要指定消息队列名称,消息排序方式为FIFO;

消息队列已存在时,响应为注册成功;

5、删除消息队列

bool asyn_request_remove_message_queue(char * str_queue_name);

发送删除消息队列请求,参数为消息队列名称;

消息队列删除后,向此队列投递消息的客户端会得到“unknown queue name”错误;

6、发送消息到指定队列

bool asyn_request_send_message(char * str_queue_name, char *message_buf, int message_length);

发送一条消息投递请求,参数为接收队列名称和消息数据;

str_queue_name为目标消息队列名称;

message_buf为消息缓冲区地址,message_length为消息数据长度;客户端会将这些数据作为二进制通过socket发送给服务器端,并加入到目标消息队列;

7、发送广播消息到所有队列

bool asyn_request_broadcast_message(char *message_buf, int message_length);

发送一条广播请求,参数为广播消息数据;

message_buf为消息缓冲区地址,message_length为消息数据长度;客户端会将这些数据作为二进制通过socket发送给服务器端,在每个消息队列中添加一份;

8、从指定队列接收消息

bool asyn_request_recv_message(char * str_queue_name);

发送一条接收消息请求,参数为队列名称;

应用可以根据需要注册一个或多个消息队列,与其他组件约定好通讯用的消息队列名称,便可以通过此接口获取其他组件发送来的消息通知;

9、清空指定消息队列

bool asyn_request_clear_message_queue(char * str_queue_name);

发送清空指定消息队列请求,参数为消息队列名称;

消息队列服务器在运行过程中,并不会主动删除、清空一个消息队列,即使客户端进程退出,服务器仍然会保留这些消息队列,待客户端重新连接后,仍可使用。

10、获取响应消息

bool asyn_response_query();

接收一条响应数据,返回值为bool型,如果成功响应数据接收成功,则返回true;

此接口会将响应数据保存在本地,作为当前响应消息,有效时间直达下次调用此接口获取新的响应消息,应用可通过其他asyn_response_开头的接口查询消息内各个字段的内容;

11、获取响应消息id

int asyn_response_get_message_id();

获取当前响应消息的id序号,此id号,由客户端发送时生成,采用递增方式;

12、获取响应消息名称

int asyn_response_get_message_length();

int asyn_response_get_message(char* buf, int length);

此接口由两个接口函数完成,应用首先获取消息名称的长度,然后准备足够的内存缓冲区,然调用获取数据的接口,参数为用户提供的buffer首地址以及buffer长度,接口会将消息名称复制到里面,返回值表明了实际复制的字节数。

13、获取响应消息状态

 int asyn_response_get_status_length();

int asyn_response_get_status(char* buf, int length);

与获取消息名称类似,此接口也由两个接口函数完成,参数和返回值含义均类似。

14、获取响应消息数据

int asyn_response_get_data_length();

int asyn_response_get_data(char* buf, int length);

与获取消息名称类似,此接口也由两个接口函数完成,参数和返回值含义均类似。

目前请求名称如下,分别对应上述ayn_request消息:

 getVersion:此请求只是例行公事,返回服务器版本信息,目前收到后可以简单忽略;

 shutdownServer:关闭服务器请求

 registerMessageQueue:注册消息队列请求
removeMessageQueue:删除消息队列请求

 sendMessage:发送消息请求
 broadcastMessage:广播消息请求
 recvMessage:接收消息请求

 clearMessageQueue:清空消息队列请求

此客户端DLL在C++、Unity3D(C#)脚本中测试OK,效果还是挺不错的。

大家有问题,加QQ技术群(322609996)询问即可。

群共享会发布最新版本,文中提到的qfFSM有限自动机,是本人为自己开发的另一个小类库,感兴趣的老大可以下载他的源码,目前支持几种主流编程语言。