-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdds.cpp
More file actions
106 lines (90 loc) · 3.65 KB
/
dds.cpp
File metadata and controls
106 lines (90 loc) · 3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include "dds.hpp"
#include <map>
#include <signal.h>
/// DataReader listener that forwards every valid JSONMessage sample to a Dds
/// instance and logs subscription-match changes to stdout.
class SubListener : public DataReaderListener {
public:
    JSONMessage message;  // sample buffer that take_next_sample() writes into
    Dds* dds;             // target of the on_message() calls; not deleted here

    SubListener(Dds* dds_) : dds(dds_) {}

    ~SubListener() override {}

    /// Prints one line describing info.current_count_change (+1, -1, or other).
    void on_subscription_matched(DataReader*, const SubscriptionMatchedStatus& info) override {
        switch (info.current_count_change) {
            case 1:
                std::cout << "Subscriber matched." << std::endl;
                break;
            case -1:
                std::cout << "Subscriber unmatched." << std::endl;
                break;
            default:
                std::cout << info.current_count_change
                          << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
                break;
        }
    }

    /// Takes one sample from the reader; when it carries valid data, passes the
    /// reader's topic name and the message payload to dds->on_message().
    void on_data_available(DataReader* reader) override {
        SampleInfo info;
        const bool taken = reader->take_next_sample(&message, &info) == ReturnCode_t::RETCODE_OK;
        if (!taken || !info.valid_data) {
            return;
        }
        dds->on_message(reader->get_topicdescription()->get_name(), message.message());
        // std::cout << "Message: " << message.message() << " RECEIVED." << std::endl;
    }
};
// File-scope state used by the Dds member functions below.

// Listener passed to every DDSSubscriber created in Dds::subscribe();
// assigned with `new` in the Dds constructors and never deleted in this file.
SubListener* listener_;
// Wraps jsonMessagePubSubType; assigned with `new` in the Dds constructors and
// passed to every DDSPublisher / DDSSubscriber created in this file.
TypeSupport* typeSupport;
// Type object whose address is handed to the TypeSupport above.
JSONMessagePubSubType jsonMessagePubSubType;
// Publishers keyed by topic name; filled by Dds::provision_publisher(), read by Dds::publish().
std::map<std::string, std::unique_ptr<DDSPublisher<JSONMessage, JSONMessagePubSubType>>> dds_publishers;
// Subscribers keyed by topic name; filled by Dds::subscribe().
std::map<std::string, std::unique_ptr<DDSSubscriber>> dds_subscribers;
// SIGTERM handler installed by the Dds constructors. Empties both maps, which
// destroys every DDSPublisher and DDSSubscriber they own. The signal number is
// not used.
// NOTE(review): std::map::clear() and the destructors it runs are not
// async-signal-safe, and the handler returns without terminating the process,
// so a later Dds::publish() would find no publishers — confirm this is intended.
void sig_handler(int sig) {
// Publishers are released before subscribers.
dds_publishers.clear();
dds_subscribers.clear();
}
/// Constructs a Dds client without an explicit message type.
///
/// @param client_name  Name later passed to DDSPublisher/DDSSubscriber init().
/// @param domain       Domain id later passed to init().
/// @param func         Callback invoked by on_message() with (topic, message).
///
/// Side effects: installs sig_handler for SIGTERM, (re)assigns the file-scope
/// listener_ and typeSupport, and starts a detached thread running
/// from_dds_thread().
Dds::Dds(std::string client_name, int domain, function<void(std::string, std::string)> func) {
    signal(SIGTERM, sig_handler);

    // The arguments are already private copies (taken by value), so move them
    // into the members instead of copying a second time.
    this->client_name = std::move(client_name);
    this->domain = domain;
    this->on_message_function = std::move(func);

    listener_ = new SubListener(this);
    typeSupport = new TypeSupport(&jsonMessagePubSubType);

    std::thread dds_th(&Dds::from_dds_thread, this);
    dds_th.detach();
}
/// Constructs a Dds client and records a message type.
///
/// @param client_name  Name later passed to DDSPublisher/DDSSubscriber init().
/// @param domain       Domain id later passed to init().
/// @param func         Callback invoked by on_message() with (topic, message).
/// @param msg_type     Stored in message_type; not otherwise used in this file.
///
/// Side effects: installs sig_handler for SIGTERM, (re)assigns the file-scope
/// listener_ and typeSupport, and starts a detached thread running
/// from_dds_thread().
Dds::Dds(std::string client_name, int domain, function<void(std::string, std::string)> func, std::string msg_type) {
    signal(SIGTERM, sig_handler);

    // The arguments are already private copies (taken by value), so move them
    // into the members instead of copying a second time.
    this->client_name = std::move(client_name);
    this->domain = domain;
    this->on_message_function = std::move(func);
    this->message_type = std::move(msg_type);

    listener_ = new SubListener(this);
    typeSupport = new TypeSupport(&jsonMessagePubSubType);

    std::thread dds_th(&Dds::from_dds_thread, this);
    dds_th.detach();
}
/// Nothing to release here: the destructor performs no work of its own.
Dds::~Dds() = default;
/// Publishes `message` on `topic` as a JSONMessage stamped with the current
/// system-clock time in microseconds since the epoch.
///
/// @param topic    Topic name; must have been set up with provision_publisher().
/// @param message  Payload stored in the JSONMessage.
///
/// If no publisher exists for `topic`, an error is written to stderr and the
/// message is dropped. The previous code used operator[], which inserted a
/// null entry for an unknown topic and then dereferenced it.
void Dds::publish(std::string topic, std::string message) {
    const auto it = dds_publishers.find(topic);
    if (it == dds_publishers.end() || !it->second) {
        std::cerr << "Dds::publish: no publisher provisioned for topic '" << topic
                  << "'; message dropped." << std::endl;
        return;
    }

    const auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(
                            std::chrono::system_clock::now().time_since_epoch())
                            .count();

    std::unique_ptr<JSONMessage> jsonMessage = std::make_unique<JSONMessage>();
    jsonMessage->uuid(1);
    // topic and message are by-value copies that are not read again below.
    jsonMessage->topic(std::move(topic));
    jsonMessage->message(std::move(message));
    jsonMessage->datetime(now_us);

    it->second->publish(std::move(jsonMessage));
}
void Dds::subscribe(std::string topic) {
std::unique_ptr<DDSSubscriber> ptr = \
std::make_unique<DDSSubscriber>(listener_, typeSupport);
ptr.get()->init(this->client_name, this->domain, topic, "JSONMessage", TOPIC_QOS_DEFAULT);
dds_subscribers[topic] = std::move(ptr);
}
void Dds::provision_publisher(std::string topic) {
std::unique_ptr<DDSPublisher<JSONMessage, JSONMessagePubSubType>> ptr = \
std::make_unique<DDSPublisher<JSONMessage, JSONMessagePubSubType>>(typeSupport);
ptr.get()->init(this->client_name, this->domain, topic, "JSONMessage", TOPIC_QOS_DEFAULT);
dds_publishers[topic] = std::move(ptr);
}
/// Body of the detached thread started by the constructors: sleeps in
/// 100-second intervals forever and never returns.
void Dds::from_dds_thread() {
    const auto nap = std::chrono::seconds(100);
    for (;;) {
        std::this_thread::sleep_for(nap);
    }
}
/// Forwards a received (topic, message) pair to the callback supplied at
/// construction.
///
/// @param topic    Name of the topic the sample arrived on.
/// @param message  Payload of the sample.
///
/// Does nothing when no callback is set; invoking an empty callback would
/// throw on the thread that delivers the sample.
void Dds::on_message(std::string topic, std::string message) {
    if (!on_message_function) {
        return;
    }
    // Both arguments are by-value copies, so hand them over without copying again.
    on_message_function(std::move(topic), std::move(message));
}