Skip to content

Instantly share code, notes, and snippets.

@frankosterfeld
Created November 5, 2021 14:47
Show Gist options
  • Save frankosterfeld/528fcbc85e1146ad6b36cbc142404304 to your computer and use it in GitHub Desktop.
From 463d4ac5dd00523906bd335765111013725de53a Mon Sep 17 00:00:00 2001
From: Frank Osterfeld <frank.osterfeld@kdab.com>
Date: Fri, 5 Nov 2021 14:25:01 +0100
Subject: [PATCH] MdpMessage: Add template parameter for 8 vs. 9 frame distinction
---
.../include/majordomo/BasicMdpWorker.hpp | 19 +---
src/majordomo/include/majordomo/Message.hpp | 78 +++++++------
src/majordomo/include/majordomo/broker.hpp | 68 ++++++------
src/majordomo/include/majordomo/client.hpp | 14 +--
src/majordomo/test/majordomo_tests.cpp | 105 ++++++++++++------
src/yaxxzmq/include/yaz/kill/Message.hpp | 4 -
6 files changed, 157 insertions(+), 131 deletions(-)
diff --git a/src/majordomo/include/majordomo/BasicMdpWorker.hpp b/src/majordomo/include/majordomo/BasicMdpWorker.hpp
index c51a3ae..3cd3a70 100644
--- a/src/majordomo/include/majordomo/BasicMdpWorker.hpp
+++ b/src/majordomo/include/majordomo/BasicMdpWorker.hpp
@@ -19,13 +19,6 @@ class BasicMdpWorker {
std::atomic<bool> _shutdown_requested = false;
yaz::Socket<MdpMessage, BasicMdpWorker *> _socket;
- void send(MdpMessage &&message) {
- auto &frames = message.parts_ref();
- assert(frames.size() == 9);
- auto span = std::span(frames);
- _socket.send_parts(span.subspan(1, span.size() - 1));
- }
-
protected:
MdpMessage create_message(MdpMessage::WorkerCommand command) {
auto message = MdpMessage::createWorkerMessage(command);
@@ -58,10 +51,6 @@ public:
}
void handle_message(MdpMessage &&message) {
- // we receive 8 frames here, add first empty frame for MdpMessage
- auto &frames = message.parts_ref();
- frames.emplace(frames.begin(), yaz::MessagePart{});
-
if (!message.isValid()) {
debug() << "invalid MdpMessage received\n";
return;
@@ -71,12 +60,12 @@ public:
switch (message.workerCommand()) {
case MdpMessage::WorkerCommand::Get:
if (auto reply = handle_get(std::move(message))) {
- send(std::move(*reply));
+ _socket.send(std::move(*reply));
}
return;
case MdpMessage::WorkerCommand::Set:
if (auto reply = handle_set(std::move(message))) {
- send(std::move(*reply));
+ _socket.send(std::move(*reply));
}
return;
case MdpMessage::WorkerCommand::Heartbeat:
@@ -100,14 +89,14 @@ public:
auto ready = create_message(MdpMessage::WorkerCommand::Ready);
ready.setBody(_service_description, yaz::MessagePart::dynamic_bytes_tag{});
- send(std::move(ready));
+ _socket.send(std::move(ready));
return true;
}
bool disconnect() {
auto msg = create_message(MdpMessage::WorkerCommand::Disconnect);
- send(std::move(msg));
+ _socket.send(std::move(msg));
return _socket.disconnect();
}
diff --git a/src/majordomo/include/majordomo/Message.hpp b/src/majordomo/include/majordomo/Message.hpp
index 6978eea..6e1cecc 100644
--- a/src/majordomo/include/majordomo/Message.hpp
+++ b/src/majordomo/include/majordomo/Message.hpp
@@ -12,25 +12,35 @@ namespace Majordomo::OpenCMW {
using yaz::Bytes;
using yaz::MessagePart;
-class MdpMessage : public yaz::Message {
+enum class MessageFormat {
+ WithSourceId, ///< 9-frame format, contains the source ID as frame 0, used with ROUTER sockets (broker)
+ WithoutSourceId ///< 8-frame format, does not contain the source ID frame
+};
+
+template<MessageFormat Format>
+class BasicMdpMessage;
+
+using MdpMessage = BasicMdpMessage<MessageFormat::WithoutSourceId>;
+
+template<MessageFormat Format>
+class BasicMdpMessage : public yaz::Message {
private:
static constexpr auto clientProtocol = "MDPC03";
static constexpr auto workerProtocol = "MDPW03";
- static constexpr auto numFrames = 9;
- // std::vector<Bytes> _frames{ numFrames };
+ static constexpr auto FrameCount = Format == MessageFormat::WithSourceId ? 9 : 8;
enum class Frame : std::size_t {
- SourceId = 0,
- Protocol = 1,
- Command = 2,
- ServiceName = 3,
- ClientSourceId = ServiceName,
- ClientRequestId = 4,
- Topic = 5,
- Body = 6,
- Error = 7,
- RBAC = 8
+ SourceId = 0,
+ Protocol = Format == MessageFormat::WithSourceId ? 1 : 0,
+ Command,
+ ServiceName,
+ ClientSourceId = ServiceName,
+ ClientRequestId,
+ Topic,
+ Body,
+ Error,
+ RBAC
};
template<typename T>
@@ -48,12 +58,12 @@ private:
return operator[](index(value));
}
- MdpMessage() {
- resize(numFrames);
+ BasicMdpMessage() {
+ resize(FrameCount);
}
- explicit MdpMessage(char command) {
- resize(numFrames);
+ explicit BasicMdpMessage(char command) {
+ resize(FrameCount);
setCommand(command);
assert(this->command() == command);
}
@@ -62,8 +72,8 @@ private:
setFrameData(Frame::Command, new std::string(1, command), MessagePart::dynamic_bytes_tag{});
}
- MdpMessage(const MdpMessage &) = default;
- MdpMessage &operator=(const MdpMessage &) = default;
+ BasicMdpMessage(const BasicMdpMessage &) = default;
+ BasicMdpMessage &operator=(const BasicMdpMessage &) = default;
[[nodiscard]] char command() const {
assert(frameAt(Frame::Command).data().length() == 1);
@@ -74,7 +84,7 @@ private:
template<typename Message>
static bool isMessageValid(const Message &ymsg) {
// TODO better error reporting
- if (ymsg.parts_count() != numFrames) {
+ if (ymsg.parts_count() != FrameCount) {
return false;
}
@@ -128,30 +138,31 @@ public:
Worker
};
- explicit MdpMessage(std::vector<yaz::MessagePart> &&parts)
+ explicit BasicMdpMessage(std::vector<yaz::MessagePart> &&parts)
: yaz::Message(std::move(parts)) {
}
- ~MdpMessage() = default;
- MdpMessage(MdpMessage &&other) = default;
- MdpMessage &operator=(MdpMessage &&other) = default;
+ ~BasicMdpMessage() = default;
+ BasicMdpMessage(BasicMdpMessage &&other) = default;
+ BasicMdpMessage &operator=(BasicMdpMessage &&other) = default;
- static MdpMessage createClientMessage(ClientCommand cmd) {
- MdpMessage msg{ static_cast<char>(cmd) };
+ static BasicMdpMessage createClientMessage(ClientCommand cmd) {
+ BasicMdpMessage msg{ static_cast<char>(cmd) };
msg.setFrameData(Frame::Protocol, clientProtocol, MessagePart::static_bytes_tag{});
return msg;
}
- static MdpMessage createWorkerMessage(WorkerCommand cmd) {
- MdpMessage msg{ static_cast<char>(cmd) };
+ static BasicMdpMessage createWorkerMessage(WorkerCommand cmd) {
+ BasicMdpMessage msg{ static_cast<char>(cmd) };
msg.setFrameData(Frame::Protocol, workerProtocol, MessagePart::static_bytes_tag{});
return msg;
}
- MdpMessage clone() const {
+ BasicMdpMessage clone() const {
// TODO make this nicer...
- MdpMessage tmp;
- for (size_t i = 0; i < parts_count(); ++i)
+ BasicMdpMessage tmp;
+ assert(parts_count() == FrameCount);
+ for (size_t i = 0; i < FrameCount; ++i)
tmp.add_part(std::make_unique<std::string>((*this)[i].data()));
return tmp;
}
@@ -205,10 +216,12 @@ public:
template<typename T, typename Tag>
void setSourceId(T &&sourceId, Tag tag) {
+ static_assert(Format == MessageFormat::WithSourceId, "not available for WithoutSourceId format");
setFrameData(Frame::SourceId, YAZ_FWD(sourceId), tag);
}
[[nodiscard]] std::string_view sourceId() const {
+ static_assert(Format == MessageFormat::WithSourceId, "not available for WithoutSourceId format");
return frameAt(Frame::SourceId).data();
}
@@ -276,7 +289,8 @@ public:
}
};
-static_assert(std::is_nothrow_move_constructible<MdpMessage>::value, "MdpMessage should be noexcept MoveConstructible");
+static_assert(std::is_nothrow_move_constructible<BasicMdpMessage<MessageFormat::WithSourceId>>::value, "MdpMessage should be noexcept MoveConstructible");
+static_assert(std::is_nothrow_move_constructible<BasicMdpMessage<MessageFormat::WithoutSourceId>>::value, "MdpMessage should be noexcept MoveConstructible");
} // namespace Majordomo::OpenCMW
#endif
diff --git a/src/majordomo/include/majordomo/broker.hpp b/src/majordomo/include/majordomo/broker.hpp
index 9f50dd5..7500402 100644
--- a/src/majordomo/include/majordomo/broker.hpp
+++ b/src/majordomo/include/majordomo/broker.hpp
@@ -38,7 +38,7 @@ constexpr int HEARTBEAT_LIVENESS = 3;
constexpr int HEARTBEAT_INTERVAL = 1000;
constexpr auto CLIENT_TIMEOUT = std::chrono::seconds(10); // TODO
-using Majordomo::OpenCMW::MdpMessage;
+using BrokerMessage = BasicMdpMessage<MessageFormat::WithSourceId>;
template<typename Message, typename Handler>
class BaseSocket : public yaz::Socket<Message, Handler> {
@@ -62,28 +62,28 @@ public:
};
template<typename Handler>
-class RouterSocket : public BaseSocket<MdpMessage, Handler> {
+class RouterSocket : public BaseSocket<BrokerMessage, Handler> {
public:
explicit RouterSocket(yaz::Context &context, Handler &&handler)
- : BaseSocket<MdpMessage, Handler>(context, ZMQ_ROUTER, std::move(handler)) {
+ : BaseSocket<BrokerMessage, Handler>(context, ZMQ_ROUTER, std::move(handler)) {
this->bind(INTERNAL_ADDRESS_BROKER);
}
};
template<typename Handler>
-class SubSocket : public BaseSocket<MdpMessage, Handler> {
+class SubSocket : public BaseSocket<BrokerMessage, Handler> {
public:
explicit SubSocket(yaz::Context &context, Handler &&handler)
- : BaseSocket<MdpMessage, Handler>(context, ZMQ_XSUB, std::move(handler)) {
+ : BaseSocket<BrokerMessage, Handler>(context, ZMQ_XSUB, std::move(handler)) {
this->bind(INTERNAL_ADDRESS_SUBSCRIBE);
}
};
template<typename Handler>
-class DnsSocket : public BaseSocket<MdpMessage, Handler> {
+class DnsSocket : public BaseSocket<BrokerMessage, Handler> {
public:
explicit DnsSocket(yaz::Context &context, Handler &&handler)
- : BaseSocket<MdpMessage, Handler>(context, ZMQ_DEALER, std::move(handler)) {
+ : BaseSocket<BrokerMessage, Handler>(context, ZMQ_DEALER, std::move(handler)) {
}
};
@@ -107,12 +107,12 @@ private:
using Timestamp = std::chrono::time_point<std::chrono::steady_clock>;
using SocketGroup = yaz::SocketGroup<Broker *, RouterSocket, PubSocket, SubSocket, DnsSocket>;
- using SocketType = yaz::Socket<MdpMessage, Broker::SocketGroup *>;
+ using SocketType = yaz::Socket<BrokerMessage, Broker::SocketGroup *>;
struct Client {
SocketType *socket;
const std::string id;
- std::deque<MdpMessage> requests;
+ std::deque<BrokerMessage> requests;
explicit Client(SocketType *s, std::string id_)
: socket(s)
@@ -148,19 +148,19 @@ private:
std::string name;
std::string description;
std::deque<Worker *> waiting;
- std::deque<MdpMessage> requests;
+ std::deque<BrokerMessage> requests;
explicit Service(std::string name_, std::string description_)
: name(std::move(name_))
, description(std::move(description_)) {
}
- void put_message(MdpMessage &&message) {
+ void put_message(BrokerMessage &&message) {
// TODO prioritise by RBAC role
requests.emplace_back(std::move(message));
}
- MdpMessage take_next_message() {
+ BrokerMessage take_next_message() {
assert(!requests.empty());
auto msg = std::move(requests.front());
requests.pop_front();
@@ -248,13 +248,13 @@ private:
assert(worker);
message.setClientSourceId(message.sourceId(), MessagePart::dynamic_bytes_tag{});
message.setSourceId(worker->id, MessagePart::dynamic_bytes_tag{});
- message.setProtocol(MdpMessage::Protocol::Worker);
+ message.setProtocol(BrokerMessage::Protocol::Worker);
// TODO assert that command exists in both protocols?
worker->socket->send(std::move(message));
}
}
- void send_with_source_id(MdpMessage &&message, std::string_view source_id) {
+ void send_with_source_id(BrokerMessage &&message, std::string_view source_id) {
message.setSourceId(source_id, MessagePart::dynamic_bytes_tag{});
_sockets.get<RouterSocket>().send(std::move(message));
}
@@ -264,7 +264,7 @@ private:
return subscription_topic.starts_with(topic);
}
- void dispatch_message_to_matching_subscribers(MdpMessage &&message) {
+ void dispatch_message_to_matching_subscribers(BrokerMessage &&message) {
const auto it = _subscribed_clients_by_topic.find(std::string(message.topic()));
const auto has_router_subscriptions = it != _subscribed_clients_by_topic.end();
@@ -325,7 +325,7 @@ private:
// not implemented -- reply according to Majordomo Management Interface (MMI) as defined in http://rfc.zeromq.org/spec:8
- auto reply = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final);
+ auto reply = BrokerMessage::createClientMessage(BrokerMessage::ClientCommand::Final);
constexpr auto tag = yaz::MessagePart::dynamic_bytes_tag{};
reply.setSourceId(client_message.sourceId(), tag);
reply.setClientSourceId(client_message.clientSourceId(), tag);
@@ -362,7 +362,7 @@ private:
}
template<typename Socket>
- void process_worker(Socket &socket, MdpMessage &&message) {
+ void process_worker(Socket &socket, BrokerMessage &&message) {
assert(message.isWorkerMessage());
const auto service_name = std::string(message.serviceName());
@@ -372,13 +372,13 @@ private:
worker.update_expiry();
switch (message.workerCommand()) {
- case MdpMessage::WorkerCommand::Ready: {
+ case BrokerMessage::WorkerCommand::Ready: {
debug() << "log new local/external worker for service " << service_name << " - " << message << std::endl;
std::ignore = require_service(service_name, std::string(message.body()));
worker_waiting(worker);
// notify potential listeners
- auto notify = MdpMessage::createWorkerMessage(MdpMessage::WorkerCommand::Notify);
+ auto notify = BrokerMessage::createWorkerMessage(BrokerMessage::WorkerCommand::Notify);
const auto dynamic_tag = yaz::MessagePart::dynamic_bytes_tag{};
notify.setServiceName(INTERNAL_SERVICE_NAMES, dynamic_tag);
notify.setTopic(INTERNAL_SERVICE_NAMES, dynamic_tag);
@@ -387,11 +387,11 @@ private:
pub_socket().send(std::move(notify));
break;
}
- case MdpMessage::WorkerCommand::Disconnect:
+ case BrokerMessage::WorkerCommand::Disconnect:
// delete_worker(worker); // TODO handle? also commented out in java impl
break;
- case MdpMessage::WorkerCommand::Partial:
- case MdpMessage::WorkerCommand::Final: {
+ case BrokerMessage::WorkerCommand::Partial:
+ case BrokerMessage::WorkerCommand::Final: {
if (known_worker) {
const auto client_id = message.clientSourceId();
auto client = _clients.find(std::string(client_id));
@@ -403,17 +403,17 @@ private:
message.setServiceName(worker.service_name, yaz::MessagePart::dynamic_bytes_tag{});
const auto client_command = [](auto worker_cmd) {
switch (worker_cmd) {
- case MdpMessage::WorkerCommand::Partial:
- return MdpMessage::ClientCommand::Partial;
- case MdpMessage::WorkerCommand::Final:
- return MdpMessage::ClientCommand::Final;
+ case BrokerMessage::WorkerCommand::Partial:
+ return BrokerMessage::ClientCommand::Partial;
+ case BrokerMessage::WorkerCommand::Final:
+ return BrokerMessage::ClientCommand::Final;
default:
assert(!"unexpected command");
- return MdpMessage::ClientCommand::Final;
+ return BrokerMessage::ClientCommand::Final;
}
}(message.workerCommand());
- message.setProtocol(MdpMessage::Protocol::Client);
+ message.setProtocol(BrokerMessage::Protocol::Client);
message.setClientCommand(client_command);
client->second.socket->send(std::move(message));
worker_waiting(worker);
@@ -422,9 +422,9 @@ private:
}
break;
}
- case MdpMessage::WorkerCommand::Notify: {
- message.setProtocol(MdpMessage::Protocol::Client);
- message.setClientCommand(MdpMessage::ClientCommand::Final);
+ case BrokerMessage::WorkerCommand::Notify: {
+ message.setProtocol(BrokerMessage::Protocol::Client);
+ message.setClientCommand(BrokerMessage::ClientCommand::Final);
message.setSourceId(message.serviceName(), yaz::MessagePart::dynamic_bytes_tag{});
message.setServiceName(worker.service_name, yaz::MessagePart::dynamic_bytes_tag{});
@@ -447,7 +447,7 @@ private:
}
void disconnect_worker(Worker &worker) {
- auto disconnect = MdpMessage::createWorkerMessage(MdpMessage::WorkerCommand::Disconnect);
+ auto disconnect = BrokerMessage::createWorkerMessage(BrokerMessage::WorkerCommand::Disconnect);
constexpr auto dynamic_tag = yaz::MessagePart::dynamic_bytes_tag{};
disconnect.setSourceId(worker.id, dynamic_tag);
disconnect.setServiceName(worker.service_name, dynamic_tag);
@@ -575,7 +575,7 @@ public:
if (message.isClientMessage()) {
switch (message.clientCommand()) {
// TODO handle READY (client)?
- case MdpMessage::ClientCommand::Subscribe: {
+ case BrokerMessage::ClientCommand::Subscribe: {
auto it = _subscribed_clients_by_topic.try_emplace(std::string(message.topic()), std::set<std::string>{});
// TODO check for duplicate subscriptions?
it.first->second.emplace(message.sourceId());
@@ -585,7 +585,7 @@ public:
}
return true;
}
- case MdpMessage::ClientCommand::Unsubscribe: {
+ case BrokerMessage::ClientCommand::Unsubscribe: {
auto it = _subscribed_clients_by_topic.find(std::string(message.topic()));
if (it != _subscribed_clients_by_topic.end()) {
it->second.erase(std::string(message.sourceId()));
diff --git a/src/majordomo/include/majordomo/client.hpp b/src/majordomo/include/majordomo/client.hpp
index a14fd5a..3592685 100644
--- a/src/majordomo/include/majordomo/client.hpp
+++ b/src/majordomo/include/majordomo/client.hpp
@@ -43,7 +43,7 @@ public:
Request get(std::string_view service_name, BodyType request) {
auto [handle, message] = create_request_template(MdpMessage::ClientCommand::Get, service_name);
message.setBody(YAZ_FWD(request), MessagePart::dynamic_bytes_tag{});
- send(std::move(message));
+ _socket.send(std::move(message));
return handle;
}
@@ -58,7 +58,7 @@ public:
Request set(std::string_view service_name, BodyType request) {
auto [handle, message] = create_request_template(MdpMessage::ClientCommand::Set, service_name);
message.setBody(YAZ_FWD(request), MessagePart::dynamic_bytes_tag{});
- send(std::move(message));
+ _socket.send(std::move(message));
return handle;
}
@@ -70,10 +70,6 @@ public:
}
void handle_message(MdpMessage &&message) {
- // we receive 8 frames here, add first empty frame for MdpMessage
- auto &frames = message.parts_ref();
- frames.emplace(frames.begin(), yaz::MessagePart{});
-
if (!message.isValid()) {
debug() << "Received invalid message" << message << std::endl;
return;
@@ -114,12 +110,6 @@ private:
Request make_request_handle() {
return Request{ _next_request_id++ };
}
-
- void send(MdpMessage &&message) {
- auto &frames = message.parts_ref();
- auto span = std::span(frames);
- _socket.send_parts(span.subspan(1, span.size() - 1));
- }
};
} // namespace Majordomo::OpenCMW
diff --git a/src/majordomo/test/majordomo_tests.cpp b/src/majordomo/test/majordomo_tests.cpp
index 4011442..c453245 100644
--- a/src/majordomo/test/majordomo_tests.cpp
+++ b/src/majordomo/test/majordomo_tests.cpp
@@ -13,8 +13,6 @@
#include <deque>
#include <thread>
-using Majordomo::OpenCMW::MdpMessage;
-
class TestNode {
std::deque<yaz::Message> _receivedMessages;
@@ -50,39 +48,75 @@ public:
};
TEST_CASE("OpenCMW::Message basics", "[Majordomo]") {
- using Majordomo::OpenCMW::MdpMessage;
+ {
+ using MdpMessage = Majordomo::OpenCMW::BasicMdpMessage<Majordomo::OpenCMW::MessageFormat::WithSourceId>;
+ auto msg = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final);
+ REQUIRE(msg.isClientMessage());
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final);
+
+ auto tag = yaz::MessagePart::static_bytes_tag{};
+ msg.setTopic("I'm a topic", tag);
+ msg.setServiceName("service://abc", tag);
+ msg.setClientRequestId("request 1", tag);
+ msg.setBody("test body test body test body test body test body test body test body", tag);
+ msg.setError("fail!", tag);
+ msg.setRbac("password", tag);
+
+ REQUIRE(msg.isClientMessage());
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final);
+ REQUIRE(msg.topic() == "I'm a topic");
+ REQUIRE(msg.serviceName() == "service://abc");
+ REQUIRE(msg.clientRequestId() == "request 1");
+ REQUIRE(msg.body() == "test body test body test body test body test body test body test body");
+ REQUIRE(msg.error() == "fail!");
+ REQUIRE(msg.rbac() == "password");
+
+ REQUIRE(msg.parts_count() == 9);
+ REQUIRE(msg[0].data() == "");
+ REQUIRE(msg[1].data() == "MDPC03");
+ REQUIRE(msg[2].data() == "\x6");
+ REQUIRE(msg[3].data() == "service://abc");
+ REQUIRE(msg[4].data() == "request 1");
+ REQUIRE(msg[5].data() == "I'm a topic");
+ REQUIRE(msg[6].data() == "test body test body test body test body test body test body test body");
+ REQUIRE(msg[7].data() == "fail!");
+ REQUIRE(msg[8].data() == "password");
+ }
+
+ {
+ using MdpMessage = Majordomo::OpenCMW::BasicMdpMessage<Majordomo::OpenCMW::MessageFormat::WithoutSourceId>;
+ auto msg = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final);
+ REQUIRE(msg.isClientMessage());
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final);
+
+ auto tag = yaz::MessagePart::static_bytes_tag{};
+ msg.setTopic("I'm a topic", tag);
+ msg.setServiceName("service://abc", tag);
+ msg.setClientRequestId("request 1", tag);
+ msg.setBody("test body test body test body test body test body test body test body", tag);
+ msg.setError("fail!", tag);
+ msg.setRbac("password", tag);
+
+ REQUIRE(msg.isClientMessage());
+ REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final);
+ REQUIRE(msg.topic() == "I'm a topic");
+ REQUIRE(msg.serviceName() == "service://abc");
+ REQUIRE(msg.clientRequestId() == "request 1");
+ REQUIRE(msg.body() == "test body test body test body test body test body test body test body");
+ REQUIRE(msg.error() == "fail!");
+ REQUIRE(msg.rbac() == "password");
+
+ REQUIRE(msg.parts_count() == 8);
+ REQUIRE(msg[0].data() == "MDPC03");
+ REQUIRE(msg[1].data() == "\x6");
+ REQUIRE(msg[2].data() == "service://abc");
+ REQUIRE(msg[3].data() == "request 1");
+ REQUIRE(msg[4].data() == "I'm a topic");
+ REQUIRE(msg[5].data() == "test body test body test body test body test body test body test body");
+ REQUIRE(msg[6].data() == "fail!");
+ REQUIRE(msg[7].data() == "password");
+ }
- auto msg = MdpMessage::createClientMessage(MdpMessage::ClientCommand::Final);
- REQUIRE(msg.isClientMessage());
- REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final);
-
- auto tag = yaz::MessagePart::static_bytes_tag{};
- msg.setTopic("I'm a topic", tag);
- msg.setServiceName("service://abc", tag);
- msg.setClientRequestId("request 1", tag);
- msg.setBody("test body test body test body test body test body test body test body", tag);
- msg.setError("fail!", tag);
- msg.setRbac("password", tag);
-
- REQUIRE(msg.isClientMessage());
- REQUIRE(msg.clientCommand() == MdpMessage::ClientCommand::Final);
- REQUIRE(msg.topic() == "I'm a topic");
- REQUIRE(msg.serviceName() == "service://abc");
- REQUIRE(msg.clientRequestId() == "request 1");
- REQUIRE(msg.body() == "test body test body test body test body test body test body test body");
- REQUIRE(msg.error() == "fail!");
- REQUIRE(msg.rbac() == "password");
-
- REQUIRE(msg.parts_count() == 9);
- REQUIRE(msg[0].data().empty());
- REQUIRE(msg[1].data() == "MDPC03");
- REQUIRE(msg[2].data() == "\x6");
- REQUIRE(msg[3].data() == "service://abc");
- REQUIRE(msg[4].data() == "request 1");
- REQUIRE(msg[5].data() == "I'm a topic");
- REQUIRE(msg[6].data() == "test body test body test body test body test body test body test body");
- REQUIRE(msg[7].data() == "fail!");
- REQUIRE(msg[8].data() == "password");
}
TEST_CASE("Request answered with unknown service", "[Broker]") {
@@ -443,9 +477,12 @@ TEST_CASE("pubsub example using router socket", "[Broker]") {
}
}
+using Majordomo::OpenCMW::MdpMessage;
+
class TestIntWorker : public Majordomo::OpenCMW::BasicMdpWorker {
int _x = 10;
public:
+
explicit TestIntWorker(yaz::Context &context, std::string service_name, int initial_value)
: Majordomo::OpenCMW::BasicMdpWorker(context, service_name)
, _x(initial_value) {
diff --git a/src/yaxxzmq/include/yaz/kill/Message.hpp b/src/yaxxzmq/include/yaz/kill/Message.hpp
index 19c8a33..4fea62c 100644
--- a/src/yaxxzmq/include/yaz/kill/Message.hpp
+++ b/src/yaxxzmq/include/yaz/kill/Message.hpp
@@ -203,10 +203,6 @@ public:
void resize(std::size_t size) {
_parts.resize(size);
}
-
- std::vector<MessagePart> &parts_ref() {
- return _parts;
- }
};
} // namespace yaz
--
2.32.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment