|
#include "zmq.hpp" |
|
#include <assert.h> |
|
#include <cstdio> |
|
#include <cstring> |
|
#include <map> |
|
#include <set> |
|
#include <vector> |
|
#include <list> |
|
#include <string> |
|
#include <memory> |
|
|
|
// Communication model:
//
// [CLIENT]->[DEALER]<->{[ROUTER|ROUTER]<->[ROUTER|ROUTER]}<->[DEALER]
//
|
|
|
/// Discriminator stored in msg_head::type; selects which union member of
/// msg_head is meaningful.
enum msg_type {
    EN_REPORT = 1,  ///< registration record: msg_head::reg is valid
    EN_TRANS = 2    ///< forwarded message: msg_head::send is valid
};
|
|
|
/// Routing part of an EN_TRANS record: the application names the message
/// travels between. Both are fixed 16-byte fields.
struct msg_send {
    char src[16];  ///< name of the sending application
    char dst[16];  ///< name of the receiving application
};
|
|
|
/// Payload of an EN_REPORT record: announces an application together with
/// the proxy it belongs to and an endpoint string.
struct app_reg {
    char app_name[16];    ///< application name (a proxy reports its own name here)
    char proxy_name[16];  ///< owning proxy name; equals app_name for a proxy, empty for a fresh child
    char listen[64];      ///< endpoint string handed to zmq connect()
};
|
|
|
struct msg_head { |
|
msg_type type; |
|
union { |
|
msg_send send; |
|
app_reg reg; |
|
}; |
|
}; |
|
|
|
/// Fixed part that follows the msg_head of an EN_TRANS record; any bytes
/// after it are treated as text by dump_data().
/// NOTE(review): the counters look like hop/round-trip bookkeeping written
/// by the clients - confirm against the client code.
struct msg_body {
    int left_times;  ///< printed as "left" by dump_data()
    int sum_times;   ///< printed as "sum" by dump_data()
    time_t start;    ///< printed as "start time" by dump_data()
    time_t end;      ///< printed as "end time" by dump_data()
};
|
|
|
void dump_data(void* data, size_t s) { |
|
while (s > 0) { |
|
msg_head* h = (msg_head*) data; |
|
if (s >= sizeof(msg_head) && EN_REPORT == h->type) { |
|
printf("[DEBUG] recv report, app name %s, proxy name %s, listen %s\n", h->reg.app_name, h->reg.proxy_name, h->reg.listen); |
|
s -= sizeof(msg_head); |
|
data = (char*) data + sizeof(msg_head); |
|
continue; |
|
} |
|
|
|
if (s >= sizeof(msg_head) && EN_TRANS == h->type) { |
|
printf("[DEBUG] recv transform, src %s, dst %s\n", h->send.src, h->send.dst); |
|
msg_body* body = (msg_body*) ((char*) data + sizeof(msg_head)); |
|
printf("[-----] start time:%llu, end time: %llu, sum: %d, left %d\n", body->start, body->end, body->sum_times, body->left_times); |
|
char* str = (char*) malloc(s); |
|
memset(str, 0, s); |
|
memcpy(str, (char*) data + sizeof(msg_head) +sizeof(msg_body), s - sizeof(msg_head) -sizeof(msg_body)); |
|
printf("%s\n", str); |
|
free(str); |
|
s = 0; |
|
continue; |
|
} |
|
|
|
printf("dump error msg (len => %d).\n", (int) s); |
|
char* p = (char*) data; |
|
for (size_t i = 0; i < s; ++i) { |
|
if (p[i] <= 126 && p[i] >= 32) |
|
putchar(p[i]); |
|
else |
|
printf("%p", p[i]); |
|
} |
|
putchar('\n'); |
|
s = 0; |
|
} |
|
} |
|
|
|
char my_name[16] = {0}; |
|
std::map<std::string, app_reg> g_app_list; |
|
std::map<std::string, std::shared_ptr<zmq::socket_t> > g_proxy_list; |
|
std::list<std::shared_ptr<zmq::socket_t> > g_proxy_socks; |
|
|
|
/**
 * Records `sock` in g_proxy_list as the socket for the proxy named
 * `proxy.app_name`.
 *
 * @param proxy registration record of the proxy; only app_name is read
 * @param sock  socket to associate with that proxy
 * @return true when the proxy was newly registered, false when a proxy with
 *         that name already exists (the existing entry is left untouched)
 */
bool reg_proxy(app_reg& proxy, std::shared_ptr<zmq::socket_t> sock) {
    // app_name is a fixed-size field that may come straight from the network
    // without a terminator, so never read past its last byte.
    const size_t max_len = sizeof(proxy.app_name) - 1;
    const void* nul = memchr(proxy.app_name, '\0', max_len);
    const size_t name_len =
        nul ? static_cast<size_t>(static_cast<const char*>(nul) - proxy.app_name) : max_len;
    const std::string app_name(proxy.app_name, name_len);

    // emplace() inserts only when the key is absent: one lookup instead of
    // find() followed by operator[].
    if (!g_proxy_list.emplace(app_name, sock).second) {
        return false;
    }

    printf("reg proxy %s success.\n", app_name.c_str());
    return true;
}
|
|
|
/**
 * Connects a new DEALER socket to another proxy and performs the
 * registration handshake: sends `proxy_reg`, waits up to one second for the
 * reply, and registers the socket under the name reported in the reply.
 *
 * On timeout, on a malformed or non-EN_REPORT reply, or when the remote name
 * is already registered, the socket is dropped when the function returns.
 *
 * @param ctx       ZeroMQ context that owns the new socket
 * @param listen    endpoint of the remote proxy, passed to connect()
 * @param proxy_reg this node's EN_REPORT record, sent as the handshake
 */
void connect_to_proxy(zmq::context_t& ctx, const char* listen, msg_head& proxy_reg) {
    std::shared_ptr<zmq::socket_t> send_endpoint = std::make_shared<zmq::socket_t>(ctx, ZMQ_DEALER);
    send_endpoint->connect(listen);
    send_endpoint->send(&proxy_reg, sizeof(proxy_reg), 0);

    zmq::pollitem_t poll_items[1];
    poll_items[0].socket = *send_endpoint;
    poll_items[0].fd = 0;
    poll_items[0].events = ZMQ_POLLIN;
    poll_items[0].revents = 0;

    const int rc = zmq::poll(poll_items, 1, 1000);
    if (rc <= 0) {
        fprintf(stderr, "poll ret: %d\n", rc);
        return;
    }

    zmq::message_t msg;
    if (!send_endpoint->recv(&msg, 0)) {
        fprintf(stderr, "poll success but recv failed\n");
        return;
    }

    dump_data(msg.data(), msg.size());

    // A reply shorter than one header cannot be a registration record;
    // reading its type would run past the end of the message.
    if (msg.size() < sizeof(msg_head))
        return;

    // Aligned local copy with forced terminators: the reply is untrusted.
    msg_head head;
    memcpy(&head, msg.data(), sizeof(head));
    if (EN_REPORT != head.type)
        return;

    head.reg.app_name[sizeof(head.reg.app_name) - 1] = '\0';
    head.reg.proxy_name[sizeof(head.reg.proxy_name) - 1] = '\0';
    head.reg.listen[sizeof(head.reg.listen) - 1] = '\0';

    if (!reg_proxy(head.reg, send_endpoint))
        return;

    g_proxy_socks.push_back(send_endpoint);
}
|
|
|
/**
 * Stores an application registration in g_app_list, keyed by its name.
 *
 * The stored copy always has NUL-terminated fields, so later strcmp(),
 * strlen() and "%s" uses of the entry cannot run past a field even when the
 * record came from the network without terminators.
 *
 * @param app registration record to store (not modified)
 * @return true when the application was newly registered, false when an
 *         application with that name already exists (existing entry kept)
 */
bool reg_app(app_reg& app) {
    app_reg entry = app;
    entry.app_name[sizeof(entry.app_name) - 1] = '\0';
    entry.proxy_name[sizeof(entry.proxy_name) - 1] = '\0';
    entry.listen[sizeof(entry.listen) - 1] = '\0';

    // emplace() inserts only when the key is absent: one lookup instead of
    // find() followed by operator[] and a raw memcpy into the map slot.
    const std::string app_name(entry.app_name);
    if (!g_app_list.emplace(app_name, entry).second) {
        return false;
    }

    printf("reg app %s(proxy: %s) success.\n", entry.app_name, entry.proxy_name);
    return true;
}
|
|
|
/**
 * Tells application `dst` about application `src` by sending it an
 * EN_REPORT record through the ROUTER socket. The message has two frames:
 * the routing identity (dst.app_name) and the record.
 *
 * @param router_sock ROUTER socket the destination is connected to
 * @param src         application being announced
 * @param dst         application that receives the announcement
 * @return true when both frames were handed to the socket; false when the
 *         destination name is empty or a non-blocking send would block
 */
bool reg_bro_app(zmq::socket_t& router_sock, app_reg& src, app_reg& dst) {
    // Zero-fill, then copy one byte less than each field holds so that every
    // string sent on the wire is terminated.
    msg_head report;
    memset(&report, 0, sizeof(report));
    report.type = EN_REPORT;
    strncpy(report.reg.app_name, src.app_name, sizeof(report.reg.app_name) - 1);
    strncpy(report.reg.proxy_name, src.proxy_name, sizeof(report.reg.proxy_name) - 1);
    strncpy(report.reg.listen, src.listen, sizeof(report.reg.listen) - 1);

    // Bounded length: dst.app_name is a fixed-size field that may not be
    // terminated.
    const void* nul = memchr(dst.app_name, '\0', sizeof(dst.app_name));
    const size_t dst_len =
        nul ? static_cast<size_t>(static_cast<const char*>(nul) - dst.app_name) : sizeof(dst.app_name);
    if (dst_len == 0) {
        return false;  // no identity to route to
    }

    // The buffer overload returns the byte count, 0 meaning "would block".
    // Without its routing frame the record must not be sent: the ROUTER
    // would take the record itself for the identity.
    if (router_sock.send(dst.app_name, dst_len, ZMQ_NOBLOCK | ZMQ_SNDMORE) == 0) {
        return false;
    }

    return router_sock.send(&report, sizeof(report), ZMQ_NOBLOCK) != 0;
}
|
|
|
int main(int argc, char *argv[]) { |
|
if (argc < 4) { |
|
printf("usage: %s <name> <recv endpoint> <local endpoint> [connect proxy] ...\n", argv[0]); |
|
return 0; |
|
} |
|
|
|
strncpy(my_name, argv[1], sizeof(my_name)); |
|
size_t name_len = strlen(my_name); |
|
|
|
|
|
// 当前节点信息 |
|
msg_head proxy_reg; |
|
proxy_reg.type = EN_REPORT; |
|
strncpy(proxy_reg.reg.app_name, my_name, sizeof(proxy_reg.reg.app_name)); |
|
strncpy(proxy_reg.reg.proxy_name, my_name, sizeof(proxy_reg.reg.proxy_name)); |
|
strncpy(proxy_reg.reg.listen, argv[2], sizeof(proxy_reg.reg.listen)); |
|
|
|
|
|
zmq::context_t ctx(1); |
|
|
|
// 接收端点 |
|
std::shared_ptr<zmq::socket_t> recv_endpoint = std::shared_ptr<zmq::socket_t>(new zmq::socket_t(ctx, ZMQ_ROUTER)); |
|
recv_endpoint->setsockopt(ZMQ_IDENTITY, my_name, name_len); |
|
recv_endpoint->bind(argv[2]); |
|
recv_endpoint->bind(argv[3]); |
|
|
|
reg_proxy(proxy_reg.reg, recv_endpoint); |
|
|
|
for (int i = 4; i < argc; ++i) { |
|
// 临时发送端点 |
|
connect_to_proxy(ctx, argv[i], proxy_reg); |
|
} |
|
|
|
while (true) { |
|
zmq::pollitem_t poll_items[1]; |
|
poll_items[0].events = ZMQ_POLLIN; |
|
poll_items[0].fd = 0; |
|
poll_items[0].revents = 0; |
|
poll_items[0].socket = *recv_endpoint; |
|
|
|
int rc = zmq::poll(poll_items, 1); |
|
if (rc < 0) { |
|
fprintf(stderr, "poll ret: %d\n", rc); |
|
continue; |
|
} |
|
|
|
if (poll_items[0].revents & ZMQ_POLLIN) { |
|
zmq::message_t msg, msg_route_node; |
|
|
|
// 第一次会收到路由节点名称 |
|
while (recv_endpoint->recv(&msg_route_node, ZMQ_NOBLOCK)) { |
|
bool flag = recv_endpoint->recv(&msg, ZMQ_NOBLOCK); |
|
if (false == flag) { |
|
fprintf(stderr, "poll success but recv failed\n"); |
|
continue; |
|
} |
|
|
|
// dump_data(msg.data(), msg.size()); |
|
|
|
size_t left_len = msg.size(); |
|
while (left_len >= sizeof(msg_head)) { |
|
msg_head* head = (msg_head*) msg.data(); |
|
|
|
// proxy 注册协议 |
|
if (EN_REPORT == head->type) { |
|
left_len -= sizeof(msg_head); |
|
app_reg& app = head->reg; |
|
|
|
// 如果是proxy则返回自身的信息以供注册 |
|
if (0 == strcmp(app.app_name, app.proxy_name)) { |
|
recv_endpoint->send(msg_route_node.data(), msg_route_node.size(), ZMQ_NOBLOCK | ZMQ_SNDMORE); |
|
recv_endpoint->send(&proxy_reg, sizeof(proxy_reg), ZMQ_NOBLOCK); |
|
|
|
// 反向连接 |
|
if (g_proxy_list.find(app.app_name) == g_proxy_list.end()) { |
|
connect_to_proxy(ctx, app.listen, proxy_reg); |
|
|
|
if (g_proxy_list.find(app.app_name) == g_proxy_list.end()) { |
|
continue; |
|
} |
|
|
|
// 反向注册所有的app到远程proxy |
|
for (std::map<std::string, app_reg>::value_type& app_node : g_app_list) { |
|
if (0 != strcmp(app_node.second.proxy_name, my_name)) |
|
continue; |
|
|
|
msg_head app_reg; |
|
app_reg.type = EN_REPORT; |
|
strncpy(app_reg.reg.app_name, app_node.second.app_name, sizeof(app_reg.reg.app_name)); |
|
strncpy(app_reg.reg.proxy_name, my_name, sizeof(app_reg.reg.proxy_name)); |
|
strncpy(app_reg.reg.listen, argv[2], sizeof(app_reg.reg.listen)); |
|
|
|
g_proxy_list[app.app_name]->send(&app_reg, sizeof(app_reg), ZMQ_NOBLOCK); |
|
} |
|
} |
|
|
|
} |
|
else if (0 == app.proxy_name[0]) { // 子节点同步广播到所有proxy |
|
msg_head app_reg; |
|
app_reg.type = EN_REPORT; |
|
strncpy(app_reg.reg.app_name, app.app_name, sizeof(app_reg.reg.app_name)); |
|
strncpy(app_reg.reg.proxy_name, my_name, sizeof(app_reg.reg.proxy_name)); |
|
strncpy(app_reg.reg.listen, argv[2], sizeof(app_reg.reg.listen)); |
|
|
|
for (const std::map<std::string, std::shared_ptr<zmq::socket_t> >::value_type& proxy : g_proxy_list) { |
|
if (proxy.second == recv_endpoint) |
|
continue; |
|
|
|
proxy.second->send(&app_reg, sizeof(app_reg), ZMQ_NOBLOCK); |
|
} |
|
|
|
strncpy(app_reg.reg.listen, app.listen, sizeof(app_reg.reg.listen)); |
|
reg_app(app_reg.reg); |
|
} |
|
else { |
|
reg_app(app); |
|
} |
|
} |
|
else if (EN_TRANS == head->type) { |
|
left_len = 0; |
|
msg_send& send = head->send; |
|
std::map<std::string, app_reg>::iterator iter = g_app_list.find(send.dst); |
|
if (iter == g_app_list.end()) { |
|
fprintf(stderr, "send msg from src(%s) to dst(%s) failed. dst not found. cur node(%s).\n", send.src, send.dst, my_name); |
|
continue; |
|
} |
|
|
|
// 发送到下属子节点 |
|
if (0 == strcmp(iter->second.proxy_name, my_name)) { |
|
recv_endpoint->send(iter->second.app_name, strlen(iter->second.app_name), ZMQ_SNDMORE); |
|
bool b = recv_endpoint->send(msg, 0); |
|
if (false == b) { |
|
fprintf(stderr, "send msg from src(%s) to dst(%s) failed. send failed. cur node(%s).\n", send.src, send.dst, my_name); |
|
} |
|
} |
|
else { // 发送到远程代理节点 |
|
std::map<std::string, std::shared_ptr<zmq::socket_t> >::iterator proxy_it = g_proxy_list.find(iter->second.proxy_name); |
|
if (proxy_it == g_proxy_list.end()) { |
|
fprintf(stderr, "proxy %s not registered.\n", iter->second.proxy_name); |
|
continue; |
|
} |
|
|
|
bool b = proxy_it->second->send(msg, 0); |
|
if (false == b) { |
|
fprintf(stderr, "send msg from src(%s) to dst(%s) failed. send failed. cur node(%s).\n", send.src, send.dst, my_name); |
|
} |
|
} |
|
|
|
// 下发兄弟节点信息 |
|
std::map<std::string, app_reg>::iterator iter_opt = g_app_list.find(send.src); |
|
if (iter_opt == g_app_list.end()) { |
|
fprintf(stderr, "send msg from src(%s) to dst(%s) warning. src not found. cur node(%s).\n", send.src, send.dst, my_name); |
|
continue; |
|
} |
|
|
|
if (0 == strcmp(iter_opt->second.proxy_name, my_name) && 0 == strcmp(iter->second.proxy_name, my_name)) { |
|
reg_bro_app(*recv_endpoint, iter->second, iter_opt->second); |
|
reg_bro_app(*recv_endpoint, iter_opt->second, iter->second); |
|
} |
|
|
|
} |
|
else { |
|
fprintf(stderr, "got unknown action type %d.\n", head->type); |
|
left_len = 0; |
|
} |
|
} |
|
|
|
if (left_len != 0) { |
|
dump_data(msg.data(), msg.size()); |
|
} |
|
} |
|
} |
|
} |
|
|
|
return 0; |
|
} |