YuYuYouErMessageServer – 消息队列服务器
项目中经常用到各种需要阻塞操作的组件,如联网上传、下载,硬件设备通讯,原来多是使用线程来解决,传递一些简单的进度信息。
最近做的项目用到了多台主机采集数据,同样也有一些上传功能,还有串口读卡器,此时很需要一套完善的组件通讯机制,用来分发任务,收集进度反馈。
前端使用消息服务器解决组件通讯问题,后端使用状态机(参考qfFSM有限自动机)来解决逻辑问题,理论上讲一套消息驱动的组件便OK啦。
这份消息队列服务器由两部分组成:
- 服务器:独立exe,命令行方式,接受参数,可指定一个tcp端口以及IP,默认为“tcp://*:117058”;
- 客户端:Win32接口DLL,提供连接服务器,发送、收取消息,注册、删除服务器消息队列;
服务器exe说明:
服务器启动后界面如下,最下方会显示busy或者idle,后面last 1s为最近一秒内处理的请求数目,sum为服务器启动后处理的请求总数。
下图中可以看到,服务处于busy状态,通讯端点为本机tcp端口117058,最近一秒处理了129192个请求。
客户端Win32 DLL异步接口说明:
以下接口分为3类,连接/关闭服务器、异步请求接口(asyn_request_开头)、异步响应接口(asyn_response_开头)。
所有响应消息有四个字段:id、message、status、data;
message为请求名称;status为“ok”表示成功,其他值为错误信息;
特别说明asyn_request_类接口,返回值为bool型,表示此请求是否被加入本地发送队列,并不是成功发出的标志,后面会提到本地发送队列也是有限额的(10000条),所以队列满时,会淘汰最早的请求消息。
务必将应用设计成以响应消息为准。
1、连接服务器
bool WINAPI asyn_connect_server(char * str_endpoint);
str_endpoint为服务器连接点,默认为“tcp://127.0.0.1:117058”
服务器端未运行时,客户端依然可以连接成功,底层会不但去尝试到服务器的连接;在此期间发送的消息会缓存在客户端进程空间中,为避免大量积压,最多缓冲10000条,超过则按照FIFO方式淘汰最早的消息。
此函数会开启一个线程负责与服务器通讯,并在本地维护一个请求队列和一个响应队列,大小均为10000条,对于普通互动应用而言足够了。
2、断开服务器连接
bool asyn_disconnect_server();
3、关闭服务器
bool asyn_request_shutdown_server();
向服务器端发送关闭指令,用于退出服务器端进程;
4、注册消息队列
bool asyn_request_register_message_queue(char * str_queue_name);
发送注册消息队列请求,参数为消息队列名称;
消息队列服务器发送、接收消息均需要指定消息队列名称,消息排序方式为FIFO;
消息队列已存在时,响应为注册成功;
5、删除消息队列
bool asyn_request_remove_message_queue(char * str_queue_name);
发送删除消息队列请求,参数为消息队列名称;
消息队列删除后,向此队列投递消息的客户端会得到“unknown queue name”错误;
6、发送消息到指定队列
bool asyn_request_send_message(char * str_queue_name, char *message_buf, int message_length);
发送一条消息投递请求,参数为接收队列名称和消息数据;
str_queue_name为目标消息队列名称;
message_buf为消息缓冲区地址,message_length为消息数据长度;客户端会将这些数据作为二进制通过socket发送给服务器端,并加入到目标消息队列;
7、发送广播消息到所有队列
bool asyn_request_broadcast_message(char *message_buf, int message_length);
发送一条广播请求,参数为广播消息数据;
message_buf为消息缓冲区地址,message_length为消息数据长度;客户端会将这些数据作为二进制通过socket发送给服务器端,在每个消息队列中添加一份;
8、从指定队列接收消息
bool asyn_request_recv_message(char * str_queue_name);
发送一条接收消息请求,参数为队列名称;
应用可以根据需要注册一个或多个消息队列,与其他组件约定好通讯用的消息队列名称,便可以通过此接口获取其他组件发送来的消息通知;
9、清空指定消息队列
bool asyn_request_clear_message_queue(char * str_queue_name);
发送清空指定消息队列请求,参数为消息队列名称;
消息队列服务器在运行过程中,并不会主动删除、清空一个消息队列,即使客户端进程退出,服务器仍然会保留这些消息队列,待客户端重新连接后,仍可使用。
10、获取响应消息
bool asyn_response_query();
接收一条响应数据,返回值为bool型,如果成功响应数据接收成功,则返回true;
此接口会将响应数据保存在本地,作为当前响应消息,有效时间直达下次调用此接口获取新的响应消息,应用可通过其他asyn_response_开头的接口查询消息内各个字段的内容;
11、获取响应消息id
int asyn_response_get_message_id();
获取当前响应消息的id序号,此id号,由客户端发送时生成,采用递增方式;
12、获取响应消息名称
int asyn_response_get_message_length();
int asyn_response_get_message(char* buf, int length);
此接口由两个接口函数完成,应用首先获取消息名称的长度,然后准备足够的内存缓冲区,然调用获取数据的接口,参数为用户提供的buffer首地址以及buffer长度,接口会将消息名称复制到里面,返回值表明了实际复制的字节数。
13、获取响应消息状态
int asyn_response_get_status_length();
int asyn_response_get_status(char* buf, int length);
与获取消息名称类似,此接口也由两个接口函数完成,参数和返回值含义均类似。
14、获取响应消息数据
int asyn_response_get_data_length();
int asyn_response_get_data(char* buf, int length);
与获取消息名称类似,此接口也由两个接口函数完成,参数和返回值含义均类似。
目前请求名称如下,分别对应上述ayn_request消息:
getVersion:此请求只是例行公事,返回服务器版本信息,目前收到后可以简单忽略;
shutdownServer:关闭服务器请求
registerMessageQueue:注册消息队列请求
removeMessageQueue:删除消息队列请求sendMessage:发送消息请求
broadcastMessage:广播消息请求
recvMessage:接收消息请求clearMessageQueue:清空消息队列请求
此客户端DLL在C++、Unity3D(C#)脚本中测试OK,效果还是挺不错的。
大家有问题,加QQ技术群(322609996)询问即可。
群共享会发布最新版本,文中提到的qfFSM有限自动机,是本人为自己开发的另一个小类库,感兴趣的老大可以下载他的源码,目前支持几种主流编程语言。