-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmain.cpp
More file actions
307 lines (254 loc) · 10.8 KB
/
main.cpp
File metadata and controls
307 lines (254 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#include <libwebsockets.h>
#include <cstring>
#include <csignal>
#include <iostream>
#include <string>
#include <cstdlib>
#include <atomic>
#include <ev.h>
#include <aws/core/Aws.h>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <memory>
#include "thread-pool.h"
#include "connection-manager.h"
#include "string-utils.h"
#include "s3-client-manager.h"
#include "cloudwatch-client.h"
extern const struct lws_protocols protocols[];
static const lws_retry_bo_t retry = {
.secs_since_valid_ping = 3,
.secs_since_valid_hangup = 10,
};
static struct ev_loop *m_loop = nullptr;
static struct ev_async *m_stop_watcher = nullptr;
static std::atomic<bool> interrupted(false);
#if defined(LWS_WITH_PLUGINS)
/* if plugins enabled, only protocols explicitly named in pvo bind to vhost */
static lws_protocol_vhost_options pvo = { nullptr, nullptr, "lws-minimal", "" };
#endif
void sigint_handler(int sig) {
interrupted = true;
if (m_loop && m_stop_watcher) {
ev_async_send(m_loop, m_stop_watcher);
}
}
// Parse command-line arguments
int parse_port(int argc, const char **argv, int default_port) {
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "--port") == 0 && i + 1 < argc) {
return std::atoi(argv[i + 1]);
}
}
return default_port;
}
// Parse thread count from command line
int parse_thread_count(int argc, const char **argv) {
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "--threads") == 0 && i + 1 < argc) {
return std::atoi(argv[i + 1]);
}
}
return std::thread::hardware_concurrency(); // Default to hardware concurrency
}
// Parse AWS max connections from command line
int parse_aws_max_connections(int argc, const char **argv) {
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "--aws-max-connections") == 0 && i + 1 < argc) {
return std::atoi(argv[i + 1]);
}
}
return std::thread::hardware_concurrency() * 2; // Default to CPU count * 2
}
// Parse buffer process size from command line (in KB)
size_t parse_buffer_process_size(int argc, const char **argv) {
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "--buffer-process-size") == 0 && i + 1 < argc) {
return std::atoi(argv[i + 1]) * 1024; // Convert KB to bytes
}
}
return 512 * 1024; // Default 512KB
}
// Parse max buffer size from command line (in MB)
size_t parse_max_buffer_size(int argc, const char **argv) {
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "--max-buffer-size") == 0 && i + 1 < argc) {
return std::atoi(argv[i + 1]) * 1024 * 1024; // Convert MB to bytes
}
}
return 3 * 1024 * 1024; // Default 3MB
}
void print_usage(const char* program_name) {
std::cout << "Usage: " << program_name << " [OPTIONS]\n"
<< "Options:\n"
<< " --port PORT Server port (default: 3017)\n"
<< " --threads COUNT Thread pool size (default: CPU count)\n"
<< " --aws-max-connections COUNT AWS S3 max connections (default: CPU count * 2)\n"
<< " --buffer-process-size KB Buffer processing threshold in KB (default: 512)\n"
<< " --max-buffer-size MB Maximum buffer size per session in MB (default: 3)\n"
<< " -v, --version Show version\n"
<< " -h, --help Show this help\n"
<< " -d LOG_LEVEL LWS debug log level\n"
<< " -s Enable TLS\n"
<< "\nEnvironment Variables:\n"
<< " LOG_LEVEL Application log level (debug, info, warn, error)\n"
<< " ENCRYPTION_SECRET Required for credential decryption\n"
<< " JAMBONZ_UPLOADER_TMP_FOLDER Temporary upload folder (default: /tmp/uploads)\n"
<< " BASIC_AUTH_USERNAME WebSocket basic auth username\n"
<< " BASIC_AUTH_PASSWORD WebSocket basic auth password\n";
}
int main(int argc, const char **argv) {
lws_context_creation_info info;
lws_context *context;
struct ev_loop *loop;
struct ev_async stop_watcher;
const char *p;
int n = 0;
// Check for help flag first
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "-h") == 0 || std::strcmp(argv[i], "--help") == 0) {
print_usage(argv[0]);
return 0;
}
}
// Create a non-colored stdout sink
auto stdout_sink = std::make_shared<spdlog::sinks::stdout_sink_mt>();
// Create the logger using the non-colored sink
auto logger = std::make_shared<spdlog::logger>("uploader", stdout_sink);
// Set it as the default logger
spdlog::set_default_logger(logger);
// Check for version flag first, before any other logging
for (int i = 1; i < argc; ++i) {
if (std::strcmp(argv[i], "-v") == 0 || std::strcmp(argv[i], "--version") == 0) {
std::cout << "jambonz recording server (ws) version " << UPLOADER_VERSION << std::endl;
return 0;
}
}
// Check for required environment variables only if we're actually starting the server
const char* encryption_secret = std::getenv("ENCRYPTION_SECRET");
if (!encryption_secret) {
throw std::runtime_error("ENCRYPTION_SECRET environment variable is not set");
}
std::string threadId = getThreadIdString();
spdlog::info("Main thread id: {}", threadId);
// Set the log level from an environment variable
const char* env_log_level = std::getenv("LOG_LEVEL");
spdlog::level::level_enum level = spdlog::level::info; // Default level: info
if (env_log_level) {
std::string level_str(env_log_level);
if (level_str == "debug") {
std::cout << "Setting log level to debug" << std::endl;
level = spdlog::level::debug;
}
else if (level_str == "info") level = spdlog::level::info;
else if (level_str == "warn") level = spdlog::level::warn;
else if (level_str == "error") level = spdlog::level::err;
}
spdlog::set_level(level);
spdlog::info("Log level set to: {}", spdlog::level::to_string_view(spdlog::get_level()));
// Parse configuration from command line
int thread_count = parse_thread_count(argc, argv);
int aws_max_connections = parse_aws_max_connections(argc, argv);
size_t buffer_process_size = parse_buffer_process_size(argc, argv);
size_t max_buffer_size = parse_max_buffer_size(argc, argv);
int port = parse_port(argc, argv, 3017);
spdlog::info("Configuration:");
spdlog::info(" Thread pool size: {}", thread_count);
spdlog::info(" AWS max connections: {}", aws_max_connections);
spdlog::info(" Buffer process size: {} KB", buffer_process_size / 1024);
spdlog::info(" Max buffer size: {} MB", max_buffer_size / (1024 * 1024));
spdlog::info(" Server port: {}", port);
Aws::SDKOptions options;
try {
// Disable AWS SDK file logging to prevent log files in working directory
options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Off;
Aws::InitAPI(options);
// Initialize the thread pool with parsed thread count
auto& threadPool = ThreadPool::getInstance(thread_count);
// Initialize the connection manager
auto& connectionManager = ConnectionManager::getInstance();
// Initialize statsd client at startup
ConnectionManager::initializeStatsd();
// Initialize CloudWatch client at startup
ConnectionManager::initializeCloudWatch();
// Set global configuration values for Session class
Session::setGlobalConfig(buffer_process_size, max_buffer_size, aws_max_connections);
int logs = LLL_ERR | LLL_WARN;
// Set the SIGINT handler
std::signal(SIGINT, sigint_handler);
std::signal(SIGTERM, sigint_handler);
// Check for log level argument
if ((p = lws_cmdline_option(argc, argv, "-d"))) {
logs = std::atoi(p);
}
lws_set_log_level(logs, nullptr);
spdlog::info("jambonz recording server (ws) version {} | Listening on http://localhost:{}",
UPLOADER_VERSION, port);
// Initialize info struct
std::memset(&info, 0, sizeof(info));
info.port = port;
info.protocols = protocols;
#if defined(LWS_WITH_PLUGINS)
info.pvo = &pvo;
#endif
info.options = LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE |
LWS_SERVER_OPTION_LIBEV;
#if defined(LWS_WITH_TLS)
if (lws_cmdline_option(argc, argv, "-s")) {
spdlog::info("Server using TLS");
info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.ssl_cert_filepath = "localhost-100y.cert";
info.ssl_private_key_filepath = "localhost-100y.key";
}
#endif
if (lws_cmdline_option(argc, argv, "-h")) {
info.options |= LWS_SERVER_OPTION_VHOST_UPG_STRICT_HOST_CHECK;
}
if (lws_cmdline_option(argc, argv, "-v")) {
info.retry_and_idle_policy = &retry;
}
// create our own libev loop
loop = ev_loop_new(EVFLAG_NOSIGMASK);
if (!loop) {
spdlog::info("Failed to create libev event loop");
return 1;
}
// provide our event loop to lws
void *foreign_loops[1] = { loop };
info.foreign_loops = foreign_loops;
context = lws_create_context(&info);
if (!context) {
spdlog::info("lws init failed\n");
ev_loop_destroy(loop);
return 1;
}
// SET UP ASYNC WATCHER FOR CLEAN SHUTDOWN
ev_async_init(&stop_watcher, [](EV_P_ ev_async *w, int revents) {
// Just waking up is enough - the while loop checks stopFlag
});
ev_async_start(loop, &stop_watcher);
// Store for shutdown from other threads
m_loop = loop;
m_stop_watcher = &stop_watcher;
spdlog::info("starting event loop...");
while (!interrupted) {
ev_run(loop, EVRUN_ONCE);
}
spdlog::info("Shutting down server");
lws_context_destroy(context);
ev_loop_destroy(loop);
spdlog::info("LWS context destroyed");
// Shutdown thread pool
threadPool.shutdown();
S3ClientManager::getInstance().shutdown();
// Shutdown CloudWatch client
CloudWatchClient::getInstance().stopMetricsPublishing();
} catch (const std::exception &e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
// Shutdown AWS SDK
spdlog::info("Shutting down AWS SDK");
Aws::ShutdownAPI(options);
spdlog::info("AWS SDK shutdown complete");
return 0;
}