Line data Source code
1 : /*
2 : * Copyright 2004-2024 the Pacemaker project contributors
3 : *
4 : * The version control history for this file may have further details.
5 : *
6 : * This source code is licensed under the GNU Lesser General Public License
7 : * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 : */
9 :
10 : #include <crm_internal.h>
11 :
12 : #include <stdio.h>
13 : #include <errno.h>
14 : #include <bzlib.h>
15 : #include <sys/stat.h>
16 : #include <sys/types.h>
17 :
18 : #include <crm/crm.h>
19 : #include <crm/common/xml.h>
20 : #include <crm/common/ipc.h>
21 : #include <crm/common/ipc_internal.h>
22 : #include "crmcommon_private.h"
23 :
24 : /* Evict clients whose event queue grows this large (by default) */
25 : #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
26 :
27 : static GHashTable *client_connections = NULL;
28 :
29 : /*!
30 : * \internal
31 : * \brief Count IPC clients
32 : *
33 : * \return Number of active IPC client connections
34 : */
35 : guint
36 0 : pcmk__ipc_client_count(void)
37 : {
38 0 : return client_connections? g_hash_table_size(client_connections) : 0;
39 : }
40 :
41 : /*!
42 : * \internal
43 : * \brief Execute a function for each active IPC client connection
44 : *
45 : * \param[in] func Function to call
46 : * \param[in,out] user_data Pointer to pass to function
47 : *
48 : * \note The parameters are the same as for g_hash_table_foreach().
49 : */
50 : void
51 0 : pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
52 : {
53 0 : if ((func != NULL) && (client_connections != NULL)) {
54 0 : g_hash_table_foreach(client_connections, func, user_data);
55 : }
56 0 : }
57 :
58 : pcmk__client_t *
59 0 : pcmk__find_client(const qb_ipcs_connection_t *c)
60 : {
61 0 : if (client_connections) {
62 0 : return g_hash_table_lookup(client_connections, c);
63 : }
64 :
65 0 : crm_trace("No client found for %p", c);
66 0 : return NULL;
67 : }
68 :
69 : pcmk__client_t *
70 0 : pcmk__find_client_by_id(const char *id)
71 : {
72 0 : if ((client_connections != NULL) && (id != NULL)) {
73 : gpointer key;
74 0 : pcmk__client_t *client = NULL;
75 : GHashTableIter iter;
76 :
77 0 : g_hash_table_iter_init(&iter, client_connections);
78 0 : while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
79 0 : if (strcmp(client->id, id) == 0) {
80 0 : return client;
81 : }
82 : }
83 : }
84 0 : crm_trace("No client found with id='%s'", pcmk__s(id, ""));
85 0 : return NULL;
86 : }
87 :
88 : /*!
89 : * \internal
90 : * \brief Get a client identifier for use in log messages
91 : *
92 : * \param[in] c Client
93 : *
94 : * \return Client's name, client's ID, or a string literal, as available
95 : * \note This is intended to be used in format strings like "client %s".
96 : */
97 : const char *
98 0 : pcmk__client_name(const pcmk__client_t *c)
99 : {
100 0 : if (c == NULL) {
101 0 : return "(unspecified)";
102 :
103 0 : } else if (c->name != NULL) {
104 0 : return c->name;
105 :
106 0 : } else if (c->id != NULL) {
107 0 : return c->id;
108 :
109 : } else {
110 0 : return "(unidentified)";
111 : }
112 : }
113 :
114 : void
115 0 : pcmk__client_cleanup(void)
116 : {
117 0 : if (client_connections != NULL) {
118 0 : int active = g_hash_table_size(client_connections);
119 :
120 0 : if (active > 0) {
121 0 : crm_warn("Exiting with %d active IPC client%s",
122 : active, pcmk__plural_s(active));
123 : }
124 0 : g_hash_table_destroy(client_connections);
125 0 : client_connections = NULL;
126 : }
127 0 : }
128 :
129 : void
130 0 : pcmk__drop_all_clients(qb_ipcs_service_t *service)
131 : {
132 0 : qb_ipcs_connection_t *c = NULL;
133 :
134 0 : if (service == NULL) {
135 0 : return;
136 : }
137 :
138 0 : c = qb_ipcs_connection_first_get(service);
139 :
140 0 : while (c != NULL) {
141 0 : qb_ipcs_connection_t *last = c;
142 :
143 0 : c = qb_ipcs_connection_next_get(service, last);
144 :
145 : /* There really shouldn't be anyone connected at this point */
146 0 : crm_notice("Disconnecting client %p, pid=%d...",
147 : last, pcmk__client_pid(last));
148 0 : qb_ipcs_disconnect(last);
149 0 : qb_ipcs_connection_unref(last);
150 : }
151 : }
152 :
153 : /*!
154 : * \internal
155 : * \brief Allocate a new pcmk__client_t object based on an IPC connection
156 : *
157 : * \param[in] c IPC connection (NULL to allocate generic client)
158 : * \param[in] key Connection table key (NULL to use sane default)
159 : * \param[in] uid_client UID corresponding to c (ignored if c is NULL)
160 : *
161 : * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
162 : */
163 : static pcmk__client_t *
164 0 : client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
165 : {
166 0 : pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));
167 :
168 0 : if (c) {
169 0 : client->user = pcmk__uid2username(uid_client);
170 0 : if (client->user == NULL) {
171 0 : client->user = pcmk__str_copy("#unprivileged");
172 0 : crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged",
173 : uid_client);
174 : }
175 0 : client->ipcs = c;
176 0 : pcmk__set_client_flags(client, pcmk__client_ipc);
177 0 : client->pid = pcmk__client_pid(c);
178 0 : if (key == NULL) {
179 0 : key = c;
180 : }
181 : }
182 :
183 0 : client->id = crm_generate_uuid();
184 0 : if (key == NULL) {
185 0 : key = client->id;
186 : }
187 0 : if (client_connections == NULL) {
188 0 : crm_trace("Creating IPC client table");
189 0 : client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
190 : }
191 0 : g_hash_table_insert(client_connections, key, client);
192 0 : return client;
193 : }
194 :
195 : /*!
196 : * \brief Allocate a new pcmk__client_t object and generate its ID
197 : *
198 : * \param[in] key What to use as connections hash table key (NULL to use ID)
199 : *
200 : * \return Pointer to new pcmk__client_t (asserts on failure)
201 : */
202 : pcmk__client_t *
203 0 : pcmk__new_unauth_client(void *key)
204 : {
205 0 : return client_from_connection(NULL, key, 0);
206 : }
207 :
208 : pcmk__client_t *
209 0 : pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
210 : {
211 0 : gid_t uid_cluster = 0;
212 0 : gid_t gid_cluster = 0;
213 :
214 0 : pcmk__client_t *client = NULL;
215 :
216 0 : CRM_CHECK(c != NULL, return NULL);
217 :
218 0 : if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
219 : static bool need_log = TRUE;
220 :
221 0 : if (need_log) {
222 0 : crm_warn("Could not find user and group IDs for user %s",
223 : CRM_DAEMON_USER);
224 0 : need_log = FALSE;
225 : }
226 : }
227 :
228 0 : if (uid_client != 0) {
229 0 : crm_trace("Giving group %u access to new IPC connection", gid_cluster);
230 : /* Passing -1 to chown(2) means don't change */
231 0 : qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
232 : }
233 :
234 : /* TODO: Do our own auth checking, return NULL if unauthorized */
235 0 : client = client_from_connection(c, NULL, uid_client);
236 :
237 0 : if ((uid_client == 0) || (uid_client == uid_cluster)) {
238 : /* Remember when a connection came from root or hacluster */
239 0 : pcmk__set_client_flags(client, pcmk__client_privileged);
240 : }
241 :
242 0 : crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
243 : client->id, client->pid, uid_client, gid_client);
244 0 : return client;
245 : }
246 :
247 : static struct iovec *
248 0 : pcmk__new_ipc_event(void)
249 : {
250 0 : return (struct iovec *) pcmk__assert_alloc(2, sizeof(struct iovec));
251 : }
252 :
253 : /*!
254 : * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
255 : *
256 : * \param[in,out] event I/O vector to free
257 : */
258 : void
259 0 : pcmk_free_ipc_event(struct iovec *event)
260 : {
261 0 : if (event != NULL) {
262 0 : free(event[0].iov_base);
263 0 : free(event[1].iov_base);
264 0 : free(event);
265 : }
266 0 : }
267 :
268 : static void
269 0 : free_event(gpointer data)
270 : {
271 0 : pcmk_free_ipc_event((struct iovec *) data);
272 0 : }
273 :
274 : static void
275 0 : add_event(pcmk__client_t *c, struct iovec *iov)
276 : {
277 0 : if (c->event_queue == NULL) {
278 0 : c->event_queue = g_queue_new();
279 : }
280 0 : g_queue_push_tail(c->event_queue, iov);
281 0 : }
282 :
283 : void
284 0 : pcmk__free_client(pcmk__client_t *c)
285 : {
286 0 : if (c == NULL) {
287 0 : return;
288 : }
289 :
290 0 : if (client_connections) {
291 0 : if (c->ipcs) {
292 0 : crm_trace("Destroying %p/%p (%d remaining)",
293 : c, c->ipcs, g_hash_table_size(client_connections) - 1);
294 0 : g_hash_table_remove(client_connections, c->ipcs);
295 :
296 : } else {
297 0 : crm_trace("Destroying remote connection %p (%d remaining)",
298 : c, g_hash_table_size(client_connections) - 1);
299 0 : g_hash_table_remove(client_connections, c->id);
300 : }
301 : }
302 :
303 0 : if (c->event_timer) {
304 0 : g_source_remove(c->event_timer);
305 : }
306 :
307 0 : if (c->event_queue) {
308 0 : crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
309 0 : g_queue_free_full(c->event_queue, free_event);
310 : }
311 :
312 0 : free(c->id);
313 0 : free(c->name);
314 0 : free(c->user);
315 0 : if (c->remote) {
316 0 : if (c->remote->auth_timeout) {
317 0 : g_source_remove(c->remote->auth_timeout);
318 : }
319 0 : free(c->remote->buffer);
320 0 : free(c->remote);
321 : }
322 0 : free(c);
323 : }
324 :
325 : /*!
326 : * \internal
327 : * \brief Raise IPC eviction threshold for a client, if allowed
328 : *
329 : * \param[in,out] client Client to modify
330 : * \param[in] qmax New threshold (as non-NULL string)
331 : *
332 : * \return true if change was allowed, false otherwise
333 : */
334 : bool
335 0 : pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
336 : {
337 0 : if (pcmk_is_set(client->flags, pcmk__client_privileged)) {
338 : long long qmax_ll;
339 :
340 0 : if ((pcmk__scan_ll(qmax, &qmax_ll, 0LL) == pcmk_rc_ok)
341 0 : && (qmax_ll > 0LL) && (qmax_ll <= UINT_MAX)) {
342 0 : client->queue_max = (unsigned int) qmax_ll;
343 0 : return true;
344 : }
345 : }
346 0 : return false;
347 : }
348 :
349 : int
350 0 : pcmk__client_pid(qb_ipcs_connection_t *c)
351 : {
352 : struct qb_ipcs_connection_stats stats;
353 :
354 0 : stats.client_pid = 0;
355 0 : qb_ipcs_connection_stats_get(c, &stats, 0);
356 0 : return stats.client_pid;
357 : }
358 :
359 : /*!
360 : * \internal
361 : * \brief Retrieve message XML from data read from client IPC
362 : *
363 : * \param[in,out] c IPC client connection
364 : * \param[in] data Data read from client connection
365 : * \param[out] id Where to store message ID from libqb header
366 : * \param[out] flags Where to store flags from libqb header
367 : *
368 : * \return Message XML on success, NULL otherwise
369 : */
370 : xmlNode *
371 0 : pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id,
372 : uint32_t *flags)
373 : {
374 0 : xmlNode *xml = NULL;
375 0 : char *uncompressed = NULL;
376 0 : char *text = ((char *)data) + sizeof(pcmk__ipc_header_t);
377 0 : pcmk__ipc_header_t *header = data;
378 :
379 0 : if (!pcmk__valid_ipc_header(header)) {
380 0 : return NULL;
381 : }
382 :
383 0 : if (id) {
384 0 : *id = ((struct qb_ipc_response_header *)data)->id;
385 : }
386 0 : if (flags) {
387 0 : *flags = header->flags;
388 : }
389 :
390 0 : if (pcmk_is_set(header->flags, crm_ipc_proxied)) {
391 : /* Mark this client as being the endpoint of a proxy connection.
392 : * Proxy connections responses are sent on the event channel, to avoid
393 : * blocking the controller serving as proxy.
394 : */
395 0 : pcmk__set_client_flags(c, pcmk__client_proxied);
396 : }
397 :
398 0 : if (header->size_compressed) {
399 0 : int rc = 0;
400 0 : unsigned int size_u = 1 + header->size_uncompressed;
401 0 : uncompressed = pcmk__assert_alloc(1, size_u);
402 :
403 0 : crm_trace("Decompressing message data %u bytes into %u bytes",
404 : header->size_compressed, size_u);
405 :
406 0 : rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0);
407 0 : text = uncompressed;
408 :
409 0 : rc = pcmk__bzlib2rc(rc);
410 :
411 0 : if (rc != pcmk_rc_ok) {
412 0 : crm_err("Decompression failed: %s " CRM_XS " rc=%d",
413 : pcmk_rc_str(rc), rc);
414 0 : free(uncompressed);
415 0 : return NULL;
416 : }
417 : }
418 :
419 0 : CRM_ASSERT(text[header->size_uncompressed - 1] == 0);
420 :
421 0 : xml = pcmk__xml_parse(text);
422 0 : crm_log_xml_trace(xml, "[IPC received]");
423 :
424 0 : free(uncompressed);
425 0 : return xml;
426 : }
427 :
428 : static int crm_ipcs_flush_events(pcmk__client_t *c);
429 :
430 : static gboolean
431 0 : crm_ipcs_flush_events_cb(gpointer data)
432 : {
433 0 : pcmk__client_t *c = data;
434 :
435 0 : c->event_timer = 0;
436 0 : crm_ipcs_flush_events(c);
437 0 : return FALSE;
438 : }
439 :
440 : /*!
441 : * \internal
442 : * \brief Add progressive delay before next event queue flush
443 : *
444 : * \param[in,out] c Client connection to add delay to
445 : * \param[in] queue_len Current event queue length
446 : */
447 : static inline void
448 0 : delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
449 : {
450 : /* Delay a maximum of 1.5 seconds */
451 0 : guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
452 :
453 0 : c->event_timer = g_timeout_add(delay, crm_ipcs_flush_events_cb, c);
454 0 : }
455 :
456 : /*!
457 : * \internal
458 : * \brief Send client any messages in its queue
459 : *
460 : * \param[in,out] c Client to flush
461 : *
462 : * \return Standard Pacemaker return value
463 : */
464 : static int
465 0 : crm_ipcs_flush_events(pcmk__client_t *c)
466 : {
467 0 : int rc = pcmk_rc_ok;
468 0 : ssize_t qb_rc = 0;
469 0 : unsigned int sent = 0;
470 0 : unsigned int queue_len = 0;
471 :
472 0 : if (c == NULL) {
473 0 : return rc;
474 :
475 0 : } else if (c->event_timer) {
476 : /* There is already a timer, wait until it goes off */
477 0 : crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
478 0 : return rc;
479 : }
480 :
481 0 : if (c->event_queue) {
482 0 : queue_len = g_queue_get_length(c->event_queue);
483 : }
484 0 : while (sent < 100) {
485 0 : pcmk__ipc_header_t *header = NULL;
486 0 : struct iovec *event = NULL;
487 :
488 0 : if (c->event_queue) {
489 : // We don't pop unless send is successful
490 0 : event = g_queue_peek_head(c->event_queue);
491 : }
492 0 : if (event == NULL) { // Queue is empty
493 0 : break;
494 : }
495 :
496 0 : qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
497 0 : if (qb_rc < 0) {
498 0 : rc = (int) -qb_rc;
499 0 : break;
500 : }
501 0 : event = g_queue_pop_head(c->event_queue);
502 :
503 0 : sent++;
504 0 : header = event[0].iov_base;
505 0 : if (header->size_compressed) {
506 0 : crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent",
507 : header->qb.id, c->ipcs, c->pid, (long long) qb_rc);
508 : } else {
509 0 : crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s",
510 : header->qb.id, c->ipcs, c->pid, (long long) qb_rc,
511 : (char *) (event[1].iov_base));
512 : }
513 0 : pcmk_free_ipc_event(event);
514 : }
515 :
516 0 : queue_len -= sent;
517 0 : if (sent > 0 || queue_len) {
518 0 : crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
519 : sent, queue_len, c->ipcs, c->pid,
520 : pcmk_rc_str(rc), (long long) qb_rc);
521 : }
522 :
523 0 : if (queue_len) {
524 :
525 : /* Allow clients to briefly fall behind on processing incoming messages,
526 : * but drop completely unresponsive clients so the connection doesn't
527 : * consume resources indefinitely.
528 : */
529 0 : if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
530 0 : if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
531 : /* Don't evict for a new or shrinking backlog */
532 0 : crm_warn("Client with process ID %u has a backlog of %u messages "
533 : CRM_XS " %p", c->pid, queue_len, c->ipcs);
534 : } else {
535 0 : crm_err("Evicting client with process ID %u due to backlog of %u messages "
536 : CRM_XS " %p", c->pid, queue_len, c->ipcs);
537 0 : c->queue_backlog = 0;
538 0 : qb_ipcs_disconnect(c->ipcs);
539 0 : return rc;
540 : }
541 : }
542 :
543 0 : c->queue_backlog = queue_len;
544 0 : delay_next_flush(c, queue_len);
545 :
546 : } else {
547 : /* Event queue is empty, there is no backlog */
548 0 : c->queue_backlog = 0;
549 : }
550 :
551 0 : return rc;
552 : }
553 :
554 : /*!
555 : * \internal
556 : * \brief Create an I/O vector for sending an IPC XML message
557 : *
558 : * \param[in] request Identifier for libqb response header
559 : * \param[in] message XML message to send
560 : * \param[in] max_send_size If 0, default IPC buffer size is used
561 : * \param[out] result Where to store prepared I/O vector
562 : * \param[out] bytes Size of prepared data in bytes
563 : *
564 : * \return Standard Pacemaker return code
565 : */
566 : int
567 0 : pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message,
568 : uint32_t max_send_size, struct iovec **result,
569 : ssize_t *bytes)
570 : {
571 : struct iovec *iov;
572 0 : unsigned int total = 0;
573 0 : GString *buffer = NULL;
574 0 : pcmk__ipc_header_t *header = NULL;
575 0 : int rc = pcmk_rc_ok;
576 :
577 0 : if ((message == NULL) || (result == NULL)) {
578 0 : rc = EINVAL;
579 0 : goto done;
580 : }
581 :
582 0 : header = calloc(1, sizeof(pcmk__ipc_header_t));
583 0 : if (header == NULL) {
584 0 : rc = ENOMEM;
585 0 : goto done;
586 : }
587 :
588 0 : buffer = g_string_sized_new(1024);
589 0 : pcmk__xml_string(message, 0, buffer, 0);
590 :
591 0 : if (max_send_size == 0) {
592 0 : max_send_size = crm_ipc_default_buffer_size();
593 : }
594 0 : CRM_LOG_ASSERT(max_send_size != 0);
595 :
596 0 : *result = NULL;
597 0 : iov = pcmk__new_ipc_event();
598 0 : iov[0].iov_len = sizeof(pcmk__ipc_header_t);
599 0 : iov[0].iov_base = header;
600 :
601 0 : header->version = PCMK__IPC_VERSION;
602 0 : header->size_uncompressed = 1 + buffer->len;
603 0 : total = iov[0].iov_len + header->size_uncompressed;
604 :
605 0 : if (total < max_send_size) {
606 0 : iov[1].iov_base = pcmk__str_copy(buffer->str);
607 0 : iov[1].iov_len = header->size_uncompressed;
608 :
609 : } else {
610 : static unsigned int biggest = 0;
611 :
612 0 : char *compressed = NULL;
613 0 : unsigned int new_size = 0;
614 :
615 0 : if (pcmk__compress(buffer->str,
616 0 : (unsigned int) header->size_uncompressed,
617 : (unsigned int) max_send_size, &compressed,
618 : &new_size) == pcmk_rc_ok) {
619 :
620 0 : pcmk__set_ipc_flags(header->flags, "send data", crm_ipc_compressed);
621 0 : header->size_compressed = new_size;
622 :
623 0 : iov[1].iov_len = header->size_compressed;
624 0 : iov[1].iov_base = compressed;
625 :
626 0 : biggest = QB_MAX(header->size_compressed, biggest);
627 :
628 : } else {
629 0 : crm_log_xml_trace(message, "EMSGSIZE");
630 0 : biggest = QB_MAX(header->size_uncompressed, biggest);
631 :
632 0 : crm_err("Could not compress %u-byte message into less than IPC "
633 : "limit of %u bytes; set PCMK_ipc_buffer to higher value "
634 : "(%u bytes suggested)",
635 : header->size_uncompressed, max_send_size, 4 * biggest);
636 :
637 0 : free(compressed);
638 0 : pcmk_free_ipc_event(iov);
639 0 : rc = EMSGSIZE;
640 0 : goto done;
641 : }
642 : }
643 :
644 0 : header->qb.size = iov[0].iov_len + iov[1].iov_len;
645 0 : header->qb.id = (int32_t)request; /* Replying to a specific request */
646 :
647 0 : *result = iov;
648 0 : CRM_ASSERT(header->qb.size > 0);
649 0 : if (bytes != NULL) {
650 0 : *bytes = header->qb.size;
651 : }
652 :
653 0 : done:
654 0 : if (buffer != NULL) {
655 0 : g_string_free(buffer, TRUE);
656 : }
657 0 : return rc;
658 : }
659 :
660 : int
661 0 : pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
662 : {
663 0 : int rc = pcmk_rc_ok;
664 : static uint32_t id = 1;
665 0 : pcmk__ipc_header_t *header = iov[0].iov_base;
666 :
667 0 : if (c->flags & pcmk__client_proxied) {
668 : /* _ALL_ replies to proxied connections need to be sent as events */
669 0 : if (!pcmk_is_set(flags, crm_ipc_server_event)) {
670 : /* The proxied flag lets us know this was originally meant to be a
671 : * response, even though we're sending it over the event channel.
672 : */
673 0 : pcmk__set_ipc_flags(flags, "server event",
674 : crm_ipc_server_event
675 : |crm_ipc_proxied_relay_response);
676 : }
677 : }
678 :
679 0 : pcmk__set_ipc_flags(header->flags, "server event", flags);
680 0 : if (flags & crm_ipc_server_event) {
681 0 : header->qb.id = id++; /* We don't really use it, but doesn't hurt to set one */
682 :
683 0 : if (flags & crm_ipc_server_free) {
684 0 : crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
685 0 : add_event(c, iov);
686 :
687 : } else {
688 0 : struct iovec *iov_copy = pcmk__new_ipc_event();
689 :
690 0 : crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
691 0 : iov_copy[0].iov_len = iov[0].iov_len;
692 0 : iov_copy[0].iov_base = malloc(iov[0].iov_len);
693 0 : memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
694 :
695 0 : iov_copy[1].iov_len = iov[1].iov_len;
696 0 : iov_copy[1].iov_base = malloc(iov[1].iov_len);
697 0 : memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
698 :
699 0 : add_event(c, iov_copy);
700 : }
701 :
702 : } else {
703 : ssize_t qb_rc;
704 :
705 0 : CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */
706 :
707 0 : qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
708 0 : if (qb_rc < header->qb.size) {
709 0 : if (qb_rc < 0) {
710 0 : rc = (int) -qb_rc;
711 : }
712 0 : crm_notice("Response %d to pid %d failed: %s "
713 : CRM_XS " bytes=%u rc=%lld ipcs=%p",
714 : header->qb.id, c->pid, pcmk_rc_str(rc),
715 : header->qb.size, (long long) qb_rc, c->ipcs);
716 :
717 : } else {
718 0 : crm_trace("Response %d sent, %lld bytes to %p[%d]",
719 : header->qb.id, (long long) qb_rc, c->ipcs, c->pid);
720 : }
721 :
722 0 : if (flags & crm_ipc_server_free) {
723 0 : pcmk_free_ipc_event(iov);
724 : }
725 : }
726 :
727 0 : if (flags & crm_ipc_server_event) {
728 0 : rc = crm_ipcs_flush_events(c);
729 : } else {
730 0 : crm_ipcs_flush_events(c);
731 : }
732 :
733 0 : if ((rc == EPIPE) || (rc == ENOTCONN)) {
734 0 : crm_trace("Client %p disconnected", c->ipcs);
735 : }
736 0 : return rc;
737 : }
738 :
739 : int
740 0 : pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
741 : uint32_t flags)
742 : {
743 0 : struct iovec *iov = NULL;
744 0 : int rc = pcmk_rc_ok;
745 :
746 0 : if (c == NULL) {
747 0 : return EINVAL;
748 : }
749 0 : rc = pcmk__ipc_prepare_iov(request, message, crm_ipc_default_buffer_size(),
750 : &iov, NULL);
751 0 : if (rc == pcmk_rc_ok) {
752 0 : pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
753 0 : rc = pcmk__ipc_send_iov(c, iov, flags);
754 : } else {
755 0 : pcmk_free_ipc_event(iov);
756 0 : crm_notice("IPC message to pid %d failed: %s " CRM_XS " rc=%d",
757 : c->pid, pcmk_rc_str(rc), rc);
758 : }
759 0 : return rc;
760 : }
761 :
762 : /*!
763 : * \internal
764 : * \brief Create an acknowledgement with a status code to send to a client
765 : *
766 : * \param[in] function Calling function
767 : * \param[in] line Source file line within calling function
768 : * \param[in] flags IPC flags to use when sending
769 : * \param[in] tag Element name to use for acknowledgement
770 : * \param[in] ver IPC protocol version (can be NULL)
771 : * \param[in] status Exit status code to add to ack
772 : *
773 : * \return Newly created XML for ack
774 : * \note The caller is responsible for freeing the return value with free_xml().
775 : */
776 : xmlNode *
777 0 : pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
778 : const char *tag, const char *ver, crm_exit_t status)
779 : {
780 0 : xmlNode *ack = NULL;
781 :
782 0 : if (pcmk_is_set(flags, crm_ipc_client_response)) {
783 0 : ack = pcmk__xe_create(NULL, tag);
784 0 : crm_xml_add(ack, PCMK_XA_FUNCTION, function);
785 0 : crm_xml_add_int(ack, PCMK__XA_LINE, line);
786 0 : crm_xml_add_int(ack, PCMK_XA_STATUS, (int) status);
787 0 : crm_xml_add(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
788 : }
789 0 : return ack;
790 : }
791 :
792 : /*!
793 : * \internal
794 : * \brief Send an acknowledgement with a status code to a client
795 : *
796 : * \param[in] function Calling function
797 : * \param[in] line Source file line within calling function
798 : * \param[in] c Client to send ack to
799 : * \param[in] request Request ID being replied to
800 : * \param[in] flags IPC flags to use when sending
801 : * \param[in] tag Element name to use for acknowledgement
802 : * \param[in] ver IPC protocol version (can be NULL)
803 : * \param[in] status Status code to send with acknowledgement
804 : *
805 : * \return Standard Pacemaker return code
806 : */
807 : int
808 0 : pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
809 : uint32_t request, uint32_t flags, const char *tag,
810 : const char *ver, crm_exit_t status)
811 : {
812 0 : int rc = pcmk_rc_ok;
813 0 : xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, tag, ver, status);
814 :
815 0 : if (ack != NULL) {
816 0 : crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
817 : pcmk__client_name(c), tag, status);
818 0 : crm_log_xml_trace(ack, "sent-ack");
819 0 : c->request_id = 0;
820 0 : rc = pcmk__ipc_send_xml(c, request, ack, flags);
821 0 : free_xml(ack);
822 : }
823 0 : return rc;
824 : }
825 :
826 : /*!
827 : * \internal
828 : * \brief Add an IPC server to the main loop for the pacemaker-based API
829 : *
830 : * \param[out] ipcs_ro New IPC server for read-only pacemaker-based API
831 : * \param[out] ipcs_rw New IPC server for read/write pacemaker-based API
832 : * \param[out] ipcs_shm New IPC server for shared-memory pacemaker-based API
833 : * \param[in] ro_cb IPC callbacks for read-only API
834 : * \param[in] rw_cb IPC callbacks for read/write and shared-memory APIs
835 : *
836 : * \note This function exits fatally if unable to create the servers.
837 : */
838 0 : void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
839 : qb_ipcs_service_t **ipcs_rw,
840 : qb_ipcs_service_t **ipcs_shm,
841 : struct qb_ipcs_service_handlers *ro_cb,
842 : struct qb_ipcs_service_handlers *rw_cb)
843 : {
844 0 : *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
845 : QB_IPC_NATIVE, ro_cb);
846 :
847 0 : *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
848 : QB_IPC_NATIVE, rw_cb);
849 :
850 0 : *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
851 : QB_IPC_SHM, rw_cb);
852 :
853 0 : if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
854 0 : crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
855 0 : crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
856 0 : crm_exit(CRM_EX_FATAL);
857 : }
858 0 : }
859 :
860 : /*!
861 : * \internal
862 : * \brief Destroy IPC servers for pacemaker-based API
863 : *
864 : * \param[out] ipcs_ro IPC server for read-only pacemaker-based API
865 : * \param[out] ipcs_rw IPC server for read/write pacemaker-based API
866 : * \param[out] ipcs_shm IPC server for shared-memory pacemaker-based API
867 : *
868 : * \note This is a convenience function for calling qb_ipcs_destroy() for each
869 : * argument.
870 : */
871 : void
872 0 : pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
873 : qb_ipcs_service_t *ipcs_rw,
874 : qb_ipcs_service_t *ipcs_shm)
875 : {
876 0 : qb_ipcs_destroy(ipcs_ro);
877 0 : qb_ipcs_destroy(ipcs_rw);
878 0 : qb_ipcs_destroy(ipcs_shm);
879 0 : }
880 :
881 : /*!
882 : * \internal
883 : * \brief Add an IPC server to the main loop for the pacemaker-controld API
884 : *
885 : * \param[in] cb IPC callbacks
886 : *
887 : * \return Newly created IPC server
888 : */
889 : qb_ipcs_service_t *
890 0 : pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
891 : {
892 0 : return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
893 : }
894 :
895 : /*!
896 : * \internal
897 : * \brief Add an IPC server to the main loop for the pacemaker-attrd API
898 : *
899 : * \param[out] ipcs Where to store newly created IPC server
900 : * \param[in] cb IPC callbacks
901 : *
902 : * \note This function exits fatally if unable to create the servers.
903 : */
904 : void
905 0 : pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
906 : struct qb_ipcs_service_handlers *cb)
907 : {
908 0 : *ipcs = mainloop_add_ipc_server(PCMK__VALUE_ATTRD, QB_IPC_NATIVE, cb);
909 :
910 0 : if (*ipcs == NULL) {
911 0 : crm_err("Failed to create pacemaker-attrd server: exiting and inhibiting respawn");
912 0 : crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
913 0 : crm_exit(CRM_EX_FATAL);
914 : }
915 0 : }
916 :
917 : /*!
918 : * \internal
919 : * \brief Add an IPC server to the main loop for the pacemaker-fenced API
920 : *
921 : * \param[out] ipcs Where to store newly created IPC server
922 : * \param[in] cb IPC callbacks
923 : *
924 : * \note This function exits fatally if unable to create the servers.
925 : */
926 : void
927 0 : pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
928 : struct qb_ipcs_service_handlers *cb)
929 : {
930 0 : *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
931 : QB_LOOP_HIGH);
932 :
933 0 : if (*ipcs == NULL) {
934 0 : crm_err("Failed to create fencer: exiting and inhibiting respawn.");
935 0 : crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
936 0 : crm_exit(CRM_EX_FATAL);
937 : }
938 0 : }
939 :
940 : /*!
941 : * \internal
942 : * \brief Add an IPC server to the main loop for the pacemakerd API
943 : *
944 : * \param[out] ipcs Where to store newly created IPC server
945 : * \param[in] cb IPC callbacks
946 : *
947 : * \note This function exits with CRM_EX_OSERR if unable to create the servers.
948 : */
949 : void
950 0 : pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
951 : struct qb_ipcs_service_handlers *cb)
952 : {
953 0 : *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);
954 :
955 0 : if (*ipcs == NULL) {
956 0 : crm_err("Couldn't start pacemakerd IPC server");
957 0 : crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
958 : /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
959 : * if we want to prevent pacemakerd from restarting them.
960 : * With pacemakerd we leave the exit-code shown to e.g. systemd
961 : * to what it was prior to moving the code here from pacemakerd.c
962 : */
963 0 : crm_exit(CRM_EX_OSERR);
964 : }
965 0 : }
966 :
967 : /*!
968 : * \internal
969 : * \brief Add an IPC server to the main loop for the pacemaker-schedulerd API
970 : *
971 : * \param[in] cb IPC callbacks
972 : *
973 : * \return Newly created IPC server
974 : * \note This function exits fatally if unable to create the servers.
975 : */
976 : qb_ipcs_service_t *
977 0 : pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
978 : {
979 0 : return mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_NATIVE, cb);
980 : }
981 :
982 : /*!
983 : * \brief Check whether string represents a client name used by cluster daemons
984 : *
985 : * \param[in] name String to check
986 : *
987 : * \return true if name is standard client name used by daemons, false otherwise
988 : *
989 : * \note This is provided by the client, and so cannot be used by itself as a
990 : * secure means of authentication.
991 : */
992 : bool
993 0 : crm_is_daemon_name(const char *name)
994 : {
995 0 : return pcmk__str_any_of(pcmk__message_name(name),
996 : "attrd",
997 : CRM_SYSTEM_CIB,
998 : CRM_SYSTEM_CRMD,
999 : CRM_SYSTEM_DC,
1000 : CRM_SYSTEM_LRMD,
1001 : CRM_SYSTEM_MCP,
1002 : CRM_SYSTEM_PENGINE,
1003 : CRM_SYSTEM_STONITHD,
1004 : CRM_SYSTEM_TENGINE,
1005 : "pacemaker-remoted",
1006 : "stonith-ng",
1007 : NULL);
1008 : }
|