Last active
February 13, 2023 13:38
-
-
Save AleksKu/32ed662350f94ec18556272a70905f16 to your computer and use it in GitHub Desktop.
lua openresty rabbitmq example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#RabbitMq | |
set $connect_rabbitMqStompHost "127.0.0.1"; | |
set $connect_rabbitMqStompPort "61613"; | |
set $connect_rabbitMqStompUsername "test"; | |
set $connect_rabbitMqStompPassword "test"; | |
set $connect_rabbitMqStompVhost "/"; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local function sendToQueue(postbackId) | |
local msg = {postbackId=postbackId} | |
executeSendRabbitMq(msg, "/exchange/lead_process") | |
end | |
function _M.storePostback(data, file_all, file_err) | |
... | |
sendToQueue(res) | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local cjson = require "cjson" | |
function executeSendRabbitMq(msg, destination) | |
local rabbitmq = require "rabbitmqstomp" | |
local opts = { username = ngx.var.connect_rabbitMqStompUsername, | |
password = ngx.var.connect_rabbitMqStompPassword, | |
vhost = ngx.var.connect_rabbitMqStompVhost } | |
local mq, err = rabbitmq:new(opts) | |
if not mq then | |
return | |
end | |
mq:set_timeout(10000) | |
local ok, err = mq:connect(ngx.var.connect_rabbitMqStompHost, ngx.var.connect_rabbitMqStompPort) | |
if not ok then | |
return | |
end | |
local headers = {} | |
headers["destination"] = destination | |
headers["app-id"] = "luaresty" | |
headers["persistent"] = "true" | |
headers["content-type"] = "application/json" | |
local ok, err = mq:send(cjson.encode(msg), headers) | |
if not ok then | |
return | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- lua-resty-rabbitmqstomp: Opinionated RabbitMQ (STOMP) client lib | |
-- Copyright (C) 2013 Rohit 'bhaisaab' Yadav, Wingify | |
-- Opensourced at Wingify in New Delhi under the MIT License | |
local byte = string.byte | |
local concat = table.concat | |
local error = error | |
local find = string.find | |
local gsub = string.gsub | |
local insert = table.insert | |
local len = string.len | |
local pairs = pairs | |
local setmetatable = setmetatable | |
local sub = string.sub | |
local tcp = ngx.socket.tcp | |
module(...) | |
_VERSION = "0.1" | |
local mt = { __index = _M } | |
local LF = "\x0a" | |
local EOL = "\x0d\x0a" | |
local NULL_BYTE = "\x00" | |
local STATE_CONNECTED = 1 | |
local STATE_COMMAND_SENT = 2 | |
function new(self, opts) | |
local sock, err = tcp() | |
if not sock then | |
return nil, err | |
end | |
if opts == nil then | |
opts = {username = "guest", password = "guest", vhost = "/", trailing_lf = true} | |
end | |
return setmetatable({ sock = sock, opts = opts}, mt) | |
end | |
function set_timeout(self, timeout) | |
local sock = self.sock | |
if not sock then | |
return nil, "not initialized" | |
end | |
return sock:settimeout(timeout) | |
end | |
function _build_frame(self, command, headers, body) | |
local frame = {command, EOL} | |
if body then | |
headers["content-length"] = len(body) | |
end | |
for key, value in pairs(headers) do | |
insert(frame, key) | |
insert(frame, ":") | |
insert(frame, value) | |
insert(frame, EOL) | |
end | |
insert(frame, EOL) | |
if body then | |
insert(frame, body) | |
end | |
insert(frame, NULL_BYTE) | |
insert(frame, EOL) | |
return concat(frame, "") | |
end | |
function _send_frame(self, frame) | |
local sock = self.sock | |
if not sock then | |
return nil, "not initialized" | |
end | |
return sock:send(frame) | |
end | |
function _receive_frame(self) | |
local sock = self.sock | |
if not sock then | |
return nil, "not initialized" | |
end | |
local resp = nil | |
if self.opts.trailing_lf == nil or self.opts.trailing_lf == true then | |
resp = sock:receiveuntil(NULL_BYTE .. LF, {inclusive = true}) | |
else | |
resp = sock:receiveuntil(NULL_BYTE, {inclusive = true}) | |
end | |
local data, err, partial = resp() | |
return data, err | |
end | |
function _login(self) | |
local headers = {} | |
headers["accept-version"] = "1.2" | |
headers["login"] = self.opts.username | |
headers["passcode"] = self.opts.password | |
headers["host"] = self.opts.vhost | |
local ok, err = _send_frame(self, _build_frame(self, "CONNECT", headers, nil)) | |
if not ok then | |
return nil, err | |
end | |
local frame, err = _receive_frame(self) | |
if not frame then | |
return nil, err | |
end | |
-- We successfully received a frame, but it was an ERROR frame | |
if sub( frame, 1, len( 'ERROR' ) ) == 'ERROR' then | |
return nil, frame | |
end | |
self.state = STATE_CONNECTED | |
return frame | |
end | |
function _logout(self) | |
local sock = self.sock | |
if not sock then | |
self.state = nil | |
return nil, "not initialized" | |
end | |
if self.state == STATE_CONNECTED then | |
-- Graceful shutdown | |
local headers = {} | |
headers["receipt"] = "disconnect" | |
sock:send(_build_frame(self, "DISCONNECT", headers, nil)) | |
sock:receive("*a") | |
end | |
self.state = nil | |
return sock:close() | |
end | |
function connect(self, ...) | |
local sock = self.sock | |
if not sock then | |
return nil, "not initialized" | |
end | |
local ok, err = sock:connect(...) | |
if not ok then | |
return nil, "failed to connect: " .. err | |
end | |
local reused = sock:getreusedtimes() | |
if reused and reused > 0 then | |
self.state = STATE_CONNECTED | |
return 1 | |
end | |
return _login(self) | |
end | |
function send(self, msg, headers) | |
local ok, err = _send_frame(self, _build_frame(self, "SEND", headers, msg)) | |
if not ok then | |
return nil, err | |
end | |
if headers["receipt"] ~= nil then | |
return _receive_frame(self) | |
end | |
return ok, err | |
end | |
function subscribe(self, headers) | |
return _send_frame(self, _build_frame(self, "SUBSCRIBE", headers)) | |
end | |
function unsubscribe(self, headers) | |
return _send_frame(self, _build_frame(self, "UNSUBSCRIBE", headers)) | |
end | |
function receive(self) | |
local data, err = _receive_frame(self) | |
if not data then | |
return nil, err | |
end | |
local idx = find(data, "\n\n", 1) | |
return sub(data, idx + 2) | |
end | |
function set_keepalive(self, ...) | |
local sock = self.sock | |
if not sock then | |
return nil, "not initialized" | |
end | |
if self.state ~= STATE_CONNECTED then | |
return nil, "cannot be reused in the current connection state: " | |
.. (self.state or "nil") | |
end | |
self.state = nil | |
return sock:setkeepalive(...) | |
end | |
function get_reused_times(self) | |
local sock = self.sock | |
if not sock then | |
return nil, "not initialized" | |
end | |
return sock:getreusedtimes() | |
end | |
function close(self) | |
return _logout(self) | |
end | |
local class_mt = { | |
-- to prevent use of casual module global variables | |
__newindex = function (table, key, val) | |
error('attempt to write to undeclared variable "' .. key .. '"') | |
end | |
} | |
setmetatable(_M, class_mt) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment