Line data Source code
1 : /*
2 : * Copyright 2004-2024 the Pacemaker project contributors
3 : *
4 : * The version control history for this file may have further details.
5 : *
6 : * This source code is licensed under the GNU Lesser General Public License
7 : * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 : */
9 :
10 : #include <crm_internal.h>
11 :
12 : #include <arpa/inet.h>
13 : #include <inttypes.h> // PRIu32
14 : #include <netdb.h>
15 : #include <netinet/in.h>
16 : #include <stdbool.h>
17 : #include <stdint.h> // uint32_t
18 : #include <sys/socket.h>
19 : #include <sys/types.h> // size_t
20 : #include <sys/utsname.h>
21 :
22 : #include <bzlib.h>
23 : #include <corosync/corodefs.h>
24 : #include <corosync/corotypes.h>
25 : #include <corosync/hdb.h>
26 : #include <corosync/cpg.h>
27 : #include <qb/qbipc_common.h>
28 : #include <qb/qbipcc.h>
29 : #include <qb/qbutil.h>
30 :
31 : #include <crm/cluster/internal.h>
32 : #include <crm/common/ipc.h>
33 : #include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
34 : #include <crm/common/mainloop.h>
35 : #include <crm/common/xml.h>
36 :
37 : #include "crmcluster_private.h"
38 :
/* @TODO Once we can update the public API to require pcmk_cluster_t* in more
 * functions, we can ditch this in favor of cluster->cpg_handle.
 */
// File-wide copy of the CPG handle: set on connect, zeroed on disconnect
static cpg_handle_t pcmk_cpg_handle = 0;

// @TODO These could be moved to pcmk_cluster_t* at that time as well

// Set when a configuration change no longer lists the local node as a member
static bool cpg_evicted = false;

// Outgoing messages (each element is a struct iovec *) awaiting crm_cs_flush()
static GList *cs_message_queue = NULL;

// glib source ID of the pending flush timer, or 0 if no timer is scheduled
static int cs_message_timer = 0;
48 :
/* Description of one endpoint (recipient or sender) of a CPG message.
 *
 * The structure is packed and embedded in pcmk__cpg_msg_s, so the order and
 * size of its fields must not change.
 */
struct pcmk__cpg_host_s {
    uint32_t id;                    // Corosync node ID (0 if not set)
    uint32_t pid;                   // Process ID (filled in for the sender)
    gboolean local;                 // If set, ais_dest() reports "local"
    enum crm_ais_msg_types type;    // Message type (see msg_type2text())
    uint32_t size;                  // Length of uname (0 if no name is set)
    char uname[MAX_NAME];           // Node name, zero-padded
} __attribute__ ((packed));

typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
59 :
60 : struct pcmk__cpg_msg_s {
61 : struct qb_ipc_response_header header __attribute__ ((aligned(8)));
62 : uint32_t id;
63 : gboolean is_compressed;
64 :
65 : pcmk__cpg_host_t host;
66 : pcmk__cpg_host_t sender;
67 :
68 : uint32_t size;
69 : uint32_t compressed_size;
70 : /* 584 bytes */
71 : char data[0];
72 :
73 : } __attribute__ ((packed));
74 :
75 : typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
76 :
// Forward declaration: crm_cs_flush_cb() and crm_cs_flush() call each other
static void crm_cs_flush(gpointer data);
78 :
/* Number of payload bytes actually carried by a CPG message: the compressed
 * size for a compressed message, otherwise the plain size.
 *
 * The argument is parenthesized so that any pointer expression (for example
 * &local_msg) expands correctly.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
80 :
/* Run a Corosync call, retrying while it reports a transient condition.
 *
 * rc      - lvalue that receives the result of each attempt
 * counter - integer lvalue counting failed attempts (caller initializes to 0)
 * max     - stop retrying once counter reaches this value
 * code    - expression to evaluate for each attempt
 *
 * After each CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL result, counter is
 * incremented and the macro sleeps for that many seconds (including after the
 * final failed attempt). Any other result ends the loop immediately.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
91 :
92 : /*!
93 : * \internal
94 : * \brief Get the local Corosync node ID (via CPG)
95 : *
96 : * \param[in] handle CPG connection to use (or 0 to use new connection)
97 : *
98 : * \return Corosync ID of local node (or 0 if not known)
99 : */
100 : uint32_t
101 0 : pcmk__cpg_local_nodeid(cpg_handle_t handle)
102 : {
103 0 : cs_error_t rc = CS_OK;
104 0 : int retries = 0;
105 : static uint32_t local_nodeid = 0;
106 0 : cpg_handle_t local_handle = handle;
107 0 : cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
108 0 : int fd = -1;
109 0 : uid_t found_uid = 0;
110 0 : gid_t found_gid = 0;
111 0 : pid_t found_pid = 0;
112 0 : int rv = 0;
113 :
114 0 : if (local_nodeid != 0) {
115 0 : return local_nodeid;
116 : }
117 :
118 0 : if (handle == 0) {
119 0 : crm_trace("Creating connection");
120 0 : cs_repeat(rc, retries, 5,
121 : cpg_model_initialize(&local_handle, CPG_MODEL_V1,
122 : (cpg_model_data_t *) &cpg_model_info,
123 : NULL));
124 0 : if (rc != CS_OK) {
125 0 : crm_err("Could not connect to the CPG API: %s (%d)",
126 : cs_strerror(rc), rc);
127 0 : return 0;
128 : }
129 :
130 0 : rc = cpg_fd_get(local_handle, &fd);
131 0 : if (rc != CS_OK) {
132 0 : crm_err("Could not obtain the CPG API connection: %s (%d)",
133 : cs_strerror(rc), rc);
134 0 : goto bail;
135 : }
136 :
137 : // CPG provider run as root (at least in given user namespace)?
138 0 : rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
139 : &found_uid, &found_gid);
140 0 : if (rv == 0) {
141 0 : crm_err("CPG provider is not authentic:"
142 : " process %lld (uid: %lld, gid: %lld)",
143 : (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
144 : (long long) found_uid, (long long) found_gid);
145 0 : goto bail;
146 :
147 0 : } else if (rv < 0) {
148 0 : crm_err("Could not verify authenticity of CPG provider: %s (%d)",
149 : strerror(-rv), -rv);
150 0 : goto bail;
151 : }
152 : }
153 :
154 0 : if (rc == CS_OK) {
155 0 : retries = 0;
156 0 : crm_trace("Performing lookup");
157 0 : cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
158 : }
159 :
160 0 : if (rc != CS_OK) {
161 0 : crm_err("Could not get local node id from the CPG API: %s (%d)",
162 : pcmk__cs_err_str(rc), rc);
163 : }
164 :
165 0 : bail:
166 0 : if (handle == 0) {
167 0 : crm_trace("Closing connection");
168 0 : cpg_finalize(local_handle);
169 : }
170 0 : crm_debug("Local nodeid is %u", local_nodeid);
171 0 : return local_nodeid;
172 : }
173 :
174 : /*!
175 : * \internal
176 : * \brief Callback function for Corosync message queue timer
177 : *
178 : * \param[in] data CPG handle
179 : *
180 : * \return FALSE (to indicate to glib that timer should not be removed)
181 : */
182 : static gboolean
183 0 : crm_cs_flush_cb(gpointer data)
184 : {
185 0 : cs_message_timer = 0;
186 0 : crm_cs_flush(data);
187 0 : return FALSE;
188 : }
189 :
190 : // Send no more than this many CPG messages in one flush
191 : #define CS_SEND_MAX 200
192 :
193 : /*!
194 : * \internal
195 : * \brief Send messages in Corosync CPG message queue
196 : *
197 : * \param[in] data CPG handle
198 : */
199 : static void
200 0 : crm_cs_flush(gpointer data)
201 : {
202 0 : unsigned int sent = 0;
203 0 : guint queue_len = 0;
204 0 : cs_error_t rc = 0;
205 0 : cpg_handle_t *handle = (cpg_handle_t *) data;
206 :
207 0 : if (*handle == 0) {
208 0 : crm_trace("Connection is dead");
209 0 : return;
210 : }
211 :
212 0 : queue_len = g_list_length(cs_message_queue);
213 0 : if (((queue_len % 1000) == 0) && (queue_len > 1)) {
214 0 : crm_err("CPG queue has grown to %d", queue_len);
215 :
216 0 : } else if (queue_len == CS_SEND_MAX) {
217 0 : crm_warn("CPG queue has grown to %d", queue_len);
218 : }
219 :
220 0 : if (cs_message_timer != 0) {
221 : /* There is already a timer, wait until it goes off */
222 0 : crm_trace("Timer active %d", cs_message_timer);
223 0 : return;
224 : }
225 :
226 0 : while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
227 0 : struct iovec *iov = cs_message_queue->data;
228 :
229 0 : rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
230 0 : if (rc != CS_OK) {
231 0 : break;
232 : }
233 :
234 0 : sent++;
235 0 : crm_trace("CPG message sent, size=%llu",
236 : (unsigned long long) iov->iov_len);
237 :
238 0 : cs_message_queue = g_list_remove(cs_message_queue, iov);
239 0 : free(iov->iov_base);
240 0 : free(iov);
241 : }
242 :
243 0 : queue_len -= sent;
244 0 : do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
245 : "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
246 : sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
247 : (int) rc);
248 :
249 0 : if (cs_message_queue) {
250 0 : uint32_t delay_ms = 100;
251 0 : if (rc != CS_OK) {
252 : /* Proportionally more if sending failed but cap at 1s */
253 0 : delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
254 : }
255 0 : cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
256 : }
257 : }
258 :
259 : /*!
260 : * \internal
261 : * \brief Dispatch function for CPG handle
262 : *
263 : * \param[in,out] user_data Cluster object
264 : *
265 : * \return 0 on success, -1 on error (per mainloop_io_t interface)
266 : */
267 : static int
268 0 : pcmk_cpg_dispatch(gpointer user_data)
269 : {
270 0 : cs_error_t rc = CS_OK;
271 0 : pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
272 :
273 0 : rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
274 0 : if (rc != CS_OK) {
275 0 : crm_err("Connection to the CPG API failed: %s (%d)",
276 : pcmk__cs_err_str(rc), rc);
277 0 : cpg_finalize(cluster->cpg_handle);
278 0 : cluster->cpg_handle = 0;
279 0 : return -1;
280 :
281 0 : } else if (cpg_evicted) {
282 0 : crm_err("Evicted from CPG membership");
283 0 : return -1;
284 : }
285 0 : return 0;
286 : }
287 :
288 : static inline const char *
289 0 : ais_dest(const pcmk__cpg_host_t *host)
290 : {
291 0 : if (host->local) {
292 0 : return "local";
293 0 : } else if (host->size > 0) {
294 0 : return host->uname;
295 : } else {
296 0 : return "<all>";
297 : }
298 : }
299 :
300 : static inline const char *
301 0 : msg_type2text(enum crm_ais_msg_types type)
302 : {
303 0 : const char *text = "unknown";
304 :
305 0 : switch (type) {
306 0 : case crm_msg_none:
307 0 : text = "unknown";
308 0 : break;
309 0 : case crm_msg_ais:
310 0 : text = "ais";
311 0 : break;
312 0 : case crm_msg_cib:
313 0 : text = "cib";
314 0 : break;
315 0 : case crm_msg_crmd:
316 0 : text = "crmd";
317 0 : break;
318 0 : case crm_msg_pe:
319 0 : text = "pengine";
320 0 : break;
321 0 : case crm_msg_te:
322 0 : text = "tengine";
323 0 : break;
324 0 : case crm_msg_lrmd:
325 0 : text = "lrmd";
326 0 : break;
327 0 : case crm_msg_attrd:
328 0 : text = "attrd";
329 0 : break;
330 0 : case crm_msg_stonithd:
331 0 : text = "stonithd";
332 0 : break;
333 0 : case crm_msg_stonith_ng:
334 0 : text = "stonith-ng";
335 0 : break;
336 : }
337 0 : return text;
338 : }
339 :
340 : /*!
341 : * \internal
342 : * \brief Check whether a Corosync CPG message is valid
343 : *
344 : * \param[in] msg Corosync CPG message to check
345 : *
346 : * \return true if \p msg is valid, otherwise false
347 : */
348 : static bool
349 0 : check_message_sanity(const pcmk__cpg_msg_t *msg)
350 : {
351 0 : int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
352 :
353 0 : if (payload_size < 1) {
354 0 : crm_err("%sCPG message %d from %s invalid: "
355 : "Claimed size of %d bytes is too small "
356 : CRM_XS " from %s[%u] to %s@%s",
357 : (msg->is_compressed? "Compressed " : ""),
358 : msg->id, ais_dest(&(msg->sender)),
359 : (int) msg->header.size,
360 : msg_type2text(msg->sender.type), msg->sender.pid,
361 : msg_type2text(msg->host.type), ais_dest(&(msg->host)));
362 0 : return false;
363 : }
364 :
365 0 : if (msg->header.error != CS_OK) {
366 0 : crm_err("%sCPG message %d from %s invalid: "
367 : "Sender indicated error %d "
368 : CRM_XS " from %s[%u] to %s@%s",
369 : (msg->is_compressed? "Compressed " : ""),
370 : msg->id, ais_dest(&(msg->sender)),
371 : msg->header.error,
372 : msg_type2text(msg->sender.type), msg->sender.pid,
373 : msg_type2text(msg->host.type), ais_dest(&(msg->host)));
374 0 : return false;
375 : }
376 :
377 0 : if (msg_data_len(msg) != payload_size) {
378 0 : crm_err("%sCPG message %d from %s invalid: "
379 : "Total size %d inconsistent with payload size %d "
380 : CRM_XS " from %s[%u] to %s@%s",
381 : (msg->is_compressed? "Compressed " : ""),
382 : msg->id, ais_dest(&(msg->sender)),
383 : (int) msg->header.size, (int) msg_data_len(msg),
384 : msg_type2text(msg->sender.type), msg->sender.pid,
385 : msg_type2text(msg->host.type), ais_dest(&(msg->host)));
386 0 : return false;
387 : }
388 :
389 0 : if (!msg->is_compressed &&
390 : /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
391 : * but checking the last byte or two should be quick
392 : */
393 0 : (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
394 0 : || (msg->data[msg->size - 1] != '\0'))) {
395 0 : crm_err("CPG message %d from %s invalid: "
396 : "Payload does not end at byte %llu "
397 : CRM_XS " from %s[%u] to %s@%s",
398 : msg->id, ais_dest(&(msg->sender)),
399 : (unsigned long long) msg->size,
400 : msg_type2text(msg->sender.type), msg->sender.pid,
401 : msg_type2text(msg->host.type), ais_dest(&(msg->host)));
402 0 : return false;
403 : }
404 :
405 0 : crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
406 : (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
407 : msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
408 : ais_dest(&(msg->sender)),
409 : msg_type2text(msg->host.type), ais_dest(&(msg->host)));
410 0 : return true;
411 : }
412 :
413 : /*!
414 : * \internal
415 : * \brief Extract text data from a Corosync CPG message
416 : *
417 : * \param[in] handle CPG connection (to get local node ID if not known)
418 : * \param[in] sender_id Corosync ID of node that sent message
419 : * \param[in] pid Process ID of message sender (for logging only)
420 : * \param[in,out] content CPG message
421 : * \param[out] kind If not \c NULL, will be set to CPG header ID
422 : * (which should be an <tt>enum crm_ais_msg_class</tt>
423 : * value, currently always \c crm_class_cluster)
424 : * \param[out] from If not \c NULL, will be set to sender uname
425 : * (valid for the lifetime of \p content)
426 : *
427 : * \return Newly allocated string with message data
428 : *
429 : * \note The caller is responsible for freeing the return value using \c free().
430 : */
431 : char *
432 0 : pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
433 : void *content, uint32_t *kind, const char **from)
434 : {
435 0 : char *data = NULL;
436 0 : pcmk__cpg_msg_t *msg = content;
437 :
438 0 : if (handle != 0) {
439 : // Do filtering and field massaging
440 0 : uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
441 0 : const char *local_name = pcmk__cluster_local_node_name();
442 :
443 0 : if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
444 0 : crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32
445 : ": claimed nodeid=%" PRIu32,
446 : sender_id, pid, msg->sender.id);
447 0 : return NULL;
448 : }
449 0 : if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
450 0 : crm_trace("Not for us: %" PRIu32" != %" PRIu32,
451 : msg->host.id, local_nodeid);
452 0 : return NULL;
453 : }
454 0 : if ((msg->host.size > 0)
455 0 : && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
456 :
457 0 : crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
458 0 : return NULL;
459 : }
460 :
461 0 : msg->sender.id = sender_id;
462 0 : if (msg->sender.size == 0) {
463 : const crm_node_t *peer =
464 0 : pcmk__get_node(sender_id, NULL, NULL,
465 : pcmk__node_search_cluster_member);
466 :
467 0 : if (peer->uname == NULL) {
468 0 : crm_err("No uname for peer with nodeid=%u", sender_id);
469 :
470 : } else {
471 0 : crm_notice("Fixing uname for peer with nodeid=%u", sender_id);
472 0 : msg->sender.size = strlen(peer->uname);
473 0 : memset(msg->sender.uname, 0, MAX_NAME);
474 0 : memcpy(msg->sender.uname, peer->uname, msg->sender.size);
475 : }
476 : }
477 : }
478 :
479 0 : crm_trace("Got new%s message (size=%d, %d, %d)",
480 : msg->is_compressed ? " compressed" : "",
481 : msg_data_len(msg), msg->size, msg->compressed_size);
482 :
483 0 : if (kind != NULL) {
484 0 : *kind = msg->header.id;
485 : }
486 0 : if (from != NULL) {
487 0 : *from = msg->sender.uname;
488 : }
489 :
490 0 : if (msg->is_compressed && (msg->size > 0)) {
491 0 : int rc = BZ_OK;
492 0 : char *uncompressed = NULL;
493 0 : unsigned int new_size = msg->size + 1;
494 :
495 0 : if (!check_message_sanity(msg)) {
496 0 : goto badmsg;
497 : }
498 :
499 0 : crm_trace("Decompressing message data");
500 0 : uncompressed = pcmk__assert_alloc(1, new_size);
501 0 : rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
502 : msg->compressed_size, 1, 0);
503 :
504 0 : rc = pcmk__bzlib2rc(rc);
505 :
506 0 : if (rc != pcmk_rc_ok) {
507 0 : crm_err("Decompression failed: %s " CRM_XS " rc=%d",
508 : pcmk_rc_str(rc), rc);
509 0 : free(uncompressed);
510 0 : goto badmsg;
511 : }
512 :
513 0 : CRM_ASSERT(new_size == msg->size);
514 :
515 0 : data = uncompressed;
516 :
517 0 : } else if (!check_message_sanity(msg)) {
518 0 : goto badmsg;
519 :
520 : } else {
521 0 : data = strdup(msg->data);
522 : }
523 :
524 : // Is this necessary?
525 0 : pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
526 : pcmk__node_search_cluster_member);
527 :
528 0 : crm_trace("Payload: %.200s", data);
529 0 : return data;
530 :
531 0 : badmsg:
532 0 : crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
533 : " min=%d, total=%d, size=%d, bz2_size=%d",
534 : msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
535 : ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
536 : msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
537 : msg->header.size, msg->size, msg->compressed_size);
538 :
539 0 : free(data);
540 0 : return NULL;
541 : }
542 :
543 : /*!
544 : * \internal
545 : * \brief Compare cpg_address objects by node ID
546 : *
547 : * \param[in] first First cpg_address structure to compare
548 : * \param[in] second Second cpg_address structure to compare
549 : *
550 : * \return Negative number if first's node ID is lower,
551 : * positive number if first's node ID is greater,
552 : * or 0 if both node IDs are equal
553 : */
554 : static int
555 0 : cmp_member_list_nodeid(const void *first, const void *second)
556 : {
557 0 : const struct cpg_address *const a = *((const struct cpg_address **) first),
558 0 : *const b = *((const struct cpg_address **) second);
559 0 : if (a->nodeid < b->nodeid) {
560 0 : return -1;
561 0 : } else if (a->nodeid > b->nodeid) {
562 0 : return 1;
563 : }
564 : /* don't bother with "reason" nor "pid" */
565 0 : return 0;
566 : }
567 :
568 : /*!
569 : * \internal
570 : * \brief Get a readable string equivalent of a cpg_reason_t value
571 : *
572 : * \param[in] reason CPG reason value
573 : *
574 : * \return Readable string suitable for logging
575 : */
576 : static const char *
577 0 : cpgreason2str(cpg_reason_t reason)
578 : {
579 0 : switch (reason) {
580 0 : case CPG_REASON_JOIN: return " via cpg_join";
581 0 : case CPG_REASON_LEAVE: return " via cpg_leave";
582 0 : case CPG_REASON_NODEDOWN: return " via cluster exit";
583 0 : case CPG_REASON_NODEUP: return " via cluster join";
584 0 : case CPG_REASON_PROCDOWN: return " for unknown reason";
585 0 : default: break;
586 : }
587 0 : return "";
588 : }
589 :
590 : /*!
591 : * \internal
592 : * \brief Get a log-friendly node name
593 : *
594 : * \param[in] peer Node to check
595 : *
596 : * \return Node's uname, or readable string if not known
597 : */
598 : static inline const char *
599 0 : peer_name(const crm_node_t *peer)
600 : {
601 0 : if (peer == NULL) {
602 0 : return "unknown node";
603 0 : } else if (peer->uname == NULL) {
604 0 : return "peer node";
605 : } else {
606 0 : return peer->uname;
607 : }
608 : }
609 :
610 : /*!
611 : * \internal
612 : * \brief Process a CPG peer's leaving the cluster
613 : *
614 : * \param[in] cpg_group_name CPG group name (for logging)
615 : * \param[in] event_counter Event number (for logging)
616 : * \param[in] local_nodeid Node ID of local node
617 : * \param[in] cpg_peer CPG peer that left
618 : * \param[in] sorted_member_list List of remaining members, qsort()-ed by ID
619 : * \param[in] member_list_entries Number of entries in \p sorted_member_list
620 : */
621 : static void
622 0 : node_left(const char *cpg_group_name, int event_counter,
623 : uint32_t local_nodeid, const struct cpg_address *cpg_peer,
624 : const struct cpg_address **sorted_member_list,
625 : size_t member_list_entries)
626 : {
627 : crm_node_t *peer =
628 0 : pcmk__search_node_caches(cpg_peer->nodeid, NULL,
629 : pcmk__node_search_cluster_member);
630 0 : const struct cpg_address **rival = NULL;
631 :
632 : /* Most CPG-related Pacemaker code assumes that only one process on a node
633 : * can be in the process group, but Corosync does not impose this
634 : * limitation, and more than one can be a member in practice due to a
635 : * daemon attempting to start while another instance is already running.
636 : *
637 : * Check for any such duplicate instances, because we don't want to process
638 : * their leaving as if our actual peer left. If the peer that left still has
639 : * an entry in sorted_member_list (with a different PID), we will ignore the
640 : * leaving.
641 : *
642 : * @TODO Track CPG members' PIDs so we can tell exactly who left.
643 : */
644 0 : if (peer != NULL) {
645 0 : rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
646 : sizeof(const struct cpg_address *),
647 : cmp_member_list_nodeid);
648 : }
649 :
650 0 : if (rival == NULL) {
651 0 : crm_info("Group %s event %d: %s (node %u pid %u) left%s",
652 : cpg_group_name, event_counter, peer_name(peer),
653 : cpg_peer->nodeid, cpg_peer->pid,
654 : cpgreason2str(cpg_peer->reason));
655 0 : if (peer != NULL) {
656 0 : crm_update_peer_proc(__func__, peer, crm_proc_cpg,
657 : PCMK_VALUE_OFFLINE);
658 : }
659 0 : } else if (cpg_peer->nodeid == local_nodeid) {
660 0 : crm_warn("Group %s event %d: duplicate local pid %u left%s",
661 : cpg_group_name, event_counter,
662 : cpg_peer->pid, cpgreason2str(cpg_peer->reason));
663 : } else {
664 0 : crm_warn("Group %s event %d: "
665 : "%s (node %u) duplicate pid %u left%s (%u remains)",
666 : cpg_group_name, event_counter, peer_name(peer),
667 : cpg_peer->nodeid, cpg_peer->pid,
668 : cpgreason2str(cpg_peer->reason), (*rival)->pid);
669 : }
670 0 : }
671 :
672 : /*!
673 : * \internal
674 : * \brief Handle a CPG configuration change event
675 : *
676 : * \param[in] handle CPG connection
677 : * \param[in] group_name CPG group name
678 : * \param[in] member_list List of current CPG members
679 : * \param[in] member_list_entries Number of entries in \p member_list
680 : * \param[in] left_list List of CPG members that left
681 : * \param[in] left_list_entries Number of entries in \p left_list
682 : * \param[in] joined_list List of CPG members that joined
683 : * \param[in] joined_list_entries Number of entries in \p joined_list
684 : *
685 : * \note This is of type \c cpg_confchg_fn_t, intended to be used in a
686 : * \c cpg_callbacks_t object.
687 : */
688 : void
689 0 : pcmk__cpg_confchg_cb(cpg_handle_t handle,
690 : const struct cpg_name *group_name,
691 : const struct cpg_address *member_list,
692 : size_t member_list_entries,
693 : const struct cpg_address *left_list,
694 : size_t left_list_entries,
695 : const struct cpg_address *joined_list,
696 : size_t joined_list_entries)
697 : {
698 : static int counter = 0;
699 :
700 0 : bool found = false;
701 0 : uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
702 0 : const struct cpg_address **sorted = NULL;
703 :
704 0 : sorted = pcmk__assert_alloc(member_list_entries,
705 : sizeof(const struct cpg_address *));
706 :
707 0 : for (size_t iter = 0; iter < member_list_entries; iter++) {
708 0 : sorted[iter] = member_list + iter;
709 : }
710 :
711 : // So that the cross-matching of multiply-subscribed nodes is then cheap
712 0 : qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
713 : cmp_member_list_nodeid);
714 :
715 0 : for (int i = 0; i < left_list_entries; i++) {
716 0 : node_left(group_name->value, counter, local_nodeid, &left_list[i],
717 : sorted, member_list_entries);
718 : }
719 0 : free(sorted);
720 0 : sorted = NULL;
721 :
722 0 : for (int i = 0; i < joined_list_entries; i++) {
723 0 : crm_info("Group %s event %d: node %u pid %u joined%s",
724 : group_name->value, counter, joined_list[i].nodeid,
725 : joined_list[i].pid, cpgreason2str(joined_list[i].reason));
726 : }
727 :
728 0 : for (int i = 0; i < member_list_entries; i++) {
729 0 : crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL,
730 : pcmk__node_search_cluster_member);
731 :
732 0 : if (member_list[i].nodeid == local_nodeid
733 0 : && member_list[i].pid != getpid()) {
734 : // See the note in node_left()
735 0 : crm_warn("Group %s event %d: detected duplicate local pid %u",
736 : group_name->value, counter, member_list[i].pid);
737 0 : continue;
738 : }
739 0 : crm_info("Group %s event %d: %s (node %u pid %u) is member",
740 : group_name->value, counter, peer_name(peer),
741 : member_list[i].nodeid, member_list[i].pid);
742 :
743 : /* If the caller left auto-reaping enabled, this will also update the
744 : * state to member.
745 : */
746 0 : peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
747 : PCMK_VALUE_ONLINE);
748 :
749 0 : if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
750 : /* The node is a CPG member, but we currently think it's not a
751 : * cluster member. This is possible only if auto-reaping was
752 : * disabled. The node may be joining, and we happened to get the CPG
753 : * notification before the quorum notification; or the node may have
754 : * just died, and we are processing its final messages; or a bug
755 : * has affected the peer cache.
756 : */
757 0 : time_t now = time(NULL);
758 :
759 0 : if (peer->when_lost == 0) {
760 : // Track when we first got into this contradictory state
761 0 : peer->when_lost = now;
762 :
763 0 : } else if (now > (peer->when_lost + 60)) {
764 : // If it persists for more than a minute, update the state
765 0 : crm_warn("Node %u is member of group %s but was believed "
766 : "offline",
767 : member_list[i].nodeid, group_name->value);
768 0 : pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
769 : }
770 : }
771 :
772 0 : if (local_nodeid == member_list[i].nodeid) {
773 0 : found = true;
774 : }
775 : }
776 :
777 0 : if (!found) {
778 0 : crm_err("Local node was evicted from group %s", group_name->value);
779 0 : cpg_evicted = true;
780 : }
781 :
782 0 : counter++;
783 0 : }
784 :
785 : /*!
786 : * \brief Set the CPG deliver callback function for a cluster object
787 : *
788 : * \param[in,out] cluster Cluster object
789 : * \param[in] fn Deliver callback function to set
790 : *
791 : * \return Standard Pacemaker return code
792 : */
793 : int
794 6 : pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
795 : {
796 6 : if (cluster == NULL) {
797 2 : return EINVAL;
798 : }
799 4 : cluster->cpg.cpg_deliver_fn = fn;
800 4 : return pcmk_rc_ok;
801 : }
802 :
803 : /*!
804 : * \brief Set the CPG config change callback function for a cluster object
805 : *
806 : * \param[in,out] cluster Cluster object
807 : * \param[in] fn Configuration change callback function to set
808 : *
809 : * \return Standard Pacemaker return code
810 : */
811 : int
812 6 : pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
813 : {
814 6 : if (cluster == NULL) {
815 2 : return EINVAL;
816 : }
817 4 : cluster->cpg.cpg_confchg_fn = fn;
818 4 : return pcmk_rc_ok;
819 : }
820 :
821 : /*!
822 : * \brief Connect to Corosync CPG
823 : *
824 : * \param[in,out] cluster Initialized cluster object to connect
825 : *
826 : * \return Standard Pacemaker return code
827 : */
828 : int
829 0 : pcmk__cpg_connect(pcmk_cluster_t *cluster)
830 : {
831 : cs_error_t rc;
832 0 : int fd = -1;
833 0 : int retries = 0;
834 0 : uint32_t id = 0;
835 0 : crm_node_t *peer = NULL;
836 0 : cpg_handle_t handle = 0;
837 0 : const char *message_name = pcmk__message_name(crm_system_name);
838 0 : uid_t found_uid = 0;
839 0 : gid_t found_gid = 0;
840 0 : pid_t found_pid = 0;
841 : int rv;
842 :
843 0 : struct mainloop_fd_callbacks cpg_fd_callbacks = {
844 : .dispatch = pcmk_cpg_dispatch,
845 0 : .destroy = cluster->destroy,
846 : };
847 :
848 0 : cpg_model_v1_data_t cpg_model_info = {
849 : .model = CPG_MODEL_V1,
850 0 : .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
851 0 : .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
852 : .cpg_totem_confchg_fn = NULL,
853 : .flags = 0,
854 : };
855 :
856 0 : cpg_evicted = false;
857 0 : cluster->group.length = 0;
858 0 : cluster->group.value[0] = 0;
859 :
860 : /* group.value is char[128] */
861 0 : strncpy(cluster->group.value, message_name, 127);
862 0 : cluster->group.value[127] = 0;
863 0 : cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
864 :
865 0 : cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
866 0 : if (rc != CS_OK) {
867 0 : crm_err("Could not connect to the CPG API: %s (%d)",
868 : cs_strerror(rc), rc);
869 0 : goto bail;
870 : }
871 :
872 0 : rc = cpg_fd_get(handle, &fd);
873 0 : if (rc != CS_OK) {
874 0 : crm_err("Could not obtain the CPG API connection: %s (%d)",
875 : cs_strerror(rc), rc);
876 0 : goto bail;
877 : }
878 :
879 : /* CPG provider run as root (in given user namespace, anyway)? */
880 0 : if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
881 : &found_uid, &found_gid))) {
882 0 : crm_err("CPG provider is not authentic:"
883 : " process %lld (uid: %lld, gid: %lld)",
884 : (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
885 : (long long) found_uid, (long long) found_gid);
886 0 : rc = CS_ERR_ACCESS;
887 0 : goto bail;
888 0 : } else if (rv < 0) {
889 0 : crm_err("Could not verify authenticity of CPG provider: %s (%d)",
890 : strerror(-rv), -rv);
891 0 : rc = CS_ERR_ACCESS;
892 0 : goto bail;
893 : }
894 :
895 0 : id = pcmk__cpg_local_nodeid(handle);
896 0 : if (id == 0) {
897 0 : crm_err("Could not get local node id from the CPG API");
898 0 : goto bail;
899 :
900 : }
901 0 : cluster->nodeid = id;
902 :
903 0 : retries = 0;
904 0 : cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
905 0 : if (rc != CS_OK) {
906 0 : crm_err("Could not join the CPG group '%s': %d", message_name, rc);
907 0 : goto bail;
908 : }
909 :
910 0 : pcmk_cpg_handle = handle;
911 0 : cluster->cpg_handle = handle;
912 0 : mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
913 :
914 0 : bail:
915 0 : if (rc != CS_OK) {
916 0 : cpg_finalize(handle);
917 : // @TODO Map rc to more specific Pacemaker return code
918 0 : return ENOTCONN;
919 : }
920 :
921 0 : peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
922 0 : crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
923 0 : return pcmk_rc_ok;
924 : }
925 :
926 : /*!
927 : * \internal
928 : * \brief Disconnect from Corosync CPG
929 : *
930 : * \param[in,out] cluster Cluster object to disconnect
931 : */
932 : void
933 0 : pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
934 : {
935 0 : pcmk_cpg_handle = 0;
936 0 : if (cluster->cpg_handle != 0) {
937 0 : crm_trace("Disconnecting CPG");
938 0 : cpg_leave(cluster->cpg_handle, &cluster->group);
939 0 : cpg_finalize(cluster->cpg_handle);
940 0 : cluster->cpg_handle = 0;
941 :
942 : } else {
943 0 : crm_info("No CPG connection");
944 : }
945 0 : }
946 :
947 : /*!
948 : * \internal
949 : * \brief Send string data via Corosync CPG
950 : *
951 : * \param[in] data Data to send
952 : * \param[in] local What to set as host "local" value (which is never used)
953 : * \param[in] node Cluster node to send message to
954 : * \param[in] dest Type of message to send
955 : *
956 : * \return \c true on success, or \c false otherwise
957 : */
958 : static bool
959 0 : send_cpg_text(const char *data, bool local, const crm_node_t *node,
960 : enum crm_ais_msg_types dest)
961 : {
962 : // @COMPAT Drop local argument when send_cluster_text is dropped
963 : static int msg_id = 0;
964 : static int local_pid = 0;
965 : static int local_name_len = 0;
966 : static const char *local_name = NULL;
967 :
968 0 : char *target = NULL;
969 : struct iovec *iov;
970 0 : pcmk__cpg_msg_t *msg = NULL;
971 :
972 0 : CRM_CHECK(dest != crm_msg_ais, return false);
973 :
974 0 : if (local_name == NULL) {
975 0 : local_name = pcmk__cluster_local_node_name();
976 : }
977 0 : if ((local_name_len == 0) && (local_name != NULL)) {
978 0 : local_name_len = strlen(local_name);
979 : }
980 :
981 0 : if (data == NULL) {
982 0 : data = "";
983 : }
984 :
985 0 : if (local_pid == 0) {
986 0 : local_pid = getpid();
987 : }
988 :
989 0 : msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
990 :
991 0 : msg_id++;
992 0 : msg->id = msg_id;
993 0 : msg->header.id = crm_class_cluster;
994 0 : msg->header.error = CS_OK;
995 :
996 0 : msg->host.type = dest;
997 0 : msg->host.local = local;
998 :
999 0 : if (node != NULL) {
1000 0 : if (node->uname != NULL) {
1001 0 : target = pcmk__str_copy(node->uname);
1002 0 : msg->host.size = strlen(node->uname);
1003 0 : memset(msg->host.uname, 0, MAX_NAME);
1004 0 : memcpy(msg->host.uname, node->uname, msg->host.size);
1005 :
1006 : } else {
1007 0 : target = crm_strdup_printf("%u", node->id);
1008 : }
1009 0 : msg->host.id = node->id;
1010 :
1011 : } else {
1012 0 : target = pcmk__str_copy("all");
1013 : }
1014 :
1015 0 : msg->sender.id = 0;
1016 0 : msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name);
1017 0 : msg->sender.pid = local_pid;
1018 0 : msg->sender.size = local_name_len;
1019 0 : memset(msg->sender.uname, 0, MAX_NAME);
1020 :
1021 0 : if ((local_name != NULL) && (msg->sender.size != 0)) {
1022 0 : memcpy(msg->sender.uname, local_name, msg->sender.size);
1023 : }
1024 :
1025 0 : msg->size = 1 + strlen(data);
1026 0 : msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
1027 :
1028 0 : if (msg->size < CRM_BZ2_THRESHOLD) {
1029 0 : msg = pcmk__realloc(msg, msg->header.size);
1030 0 : memcpy(msg->data, data, msg->size);
1031 :
1032 : } else {
1033 0 : char *compressed = NULL;
1034 0 : unsigned int new_size = 0;
1035 :
1036 0 : if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
1037 : &new_size) == pcmk_rc_ok) {
1038 :
1039 0 : msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1040 0 : msg = pcmk__realloc(msg, msg->header.size);
1041 0 : memcpy(msg->data, compressed, new_size);
1042 :
1043 0 : msg->is_compressed = TRUE;
1044 0 : msg->compressed_size = new_size;
1045 :
1046 : } else {
1047 : // cppcheck seems not to understand the abort logic in pcmk__realloc
1048 : // cppcheck-suppress memleak
1049 0 : msg = pcmk__realloc(msg, msg->header.size);
1050 0 : memcpy(msg->data, data, msg->size);
1051 : }
1052 :
1053 0 : free(compressed);
1054 : }
1055 :
1056 0 : iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1057 0 : iov->iov_base = msg;
1058 0 : iov->iov_len = msg->header.size;
1059 :
1060 0 : if (msg->compressed_size > 0) {
1061 0 : crm_trace("Queueing CPG message %u to %s "
1062 : "(%llu bytes, %d bytes compressed payload): %.200s",
1063 : msg->id, target, (unsigned long long) iov->iov_len,
1064 : msg->compressed_size, data);
1065 : } else {
1066 0 : crm_trace("Queueing CPG message %u to %s "
1067 : "(%llu bytes, %d bytes payload): %.200s",
1068 : msg->id, target, (unsigned long long) iov->iov_len,
1069 : msg->size, data);
1070 : }
1071 :
1072 0 : free(target);
1073 :
1074 0 : cs_message_queue = g_list_append(cs_message_queue, iov);
1075 0 : crm_cs_flush(&pcmk_cpg_handle);
1076 :
1077 0 : return true;
1078 : }
1079 :
1080 : /*!
1081 : * \internal
1082 : * \brief Send an XML message via Corosync CPG
1083 : *
1084 : * \param[in] msg XML message to send
1085 : * \param[in] node Cluster node to send message to
1086 : * \param[in] dest Type of message to send
1087 : *
1088 : * \return TRUE on success, otherwise FALSE
1089 : */
1090 : bool
1091 0 : pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
1092 : enum crm_ais_msg_types dest)
1093 : {
1094 0 : bool rc = true;
1095 0 : GString *data = g_string_sized_new(1024);
1096 :
1097 0 : pcmk__xml_string(msg, 0, data, 0);
1098 :
1099 0 : rc = send_cpg_text(data->str, false, node, dest);
1100 0 : g_string_free(data, TRUE);
1101 0 : return rc;
1102 : }
1103 :
1104 : // Deprecated functions kept only for backward API compatibility
1105 : // LCOV_EXCL_START
1106 :
1107 : #include <crm/cluster/compat.h>
1108 :
1109 : gboolean
1110 : cluster_connect_cpg(pcmk_cluster_t *cluster)
1111 : {
1112 : return pcmk__cpg_connect(cluster) == pcmk_rc_ok;
1113 : }
1114 :
1115 : void
1116 : cluster_disconnect_cpg(pcmk_cluster_t *cluster)
1117 : {
1118 : pcmk__cpg_disconnect(cluster);
1119 : }
1120 :
1121 : uint32_t
1122 : get_local_nodeid(cpg_handle_t handle)
1123 : {
1124 : return pcmk__cpg_local_nodeid(handle);
1125 : }
1126 :
1127 : void
1128 : pcmk_cpg_membership(cpg_handle_t handle,
1129 : const struct cpg_name *group_name,
1130 : const struct cpg_address *member_list,
1131 : size_t member_list_entries,
1132 : const struct cpg_address *left_list,
1133 : size_t left_list_entries,
1134 : const struct cpg_address *joined_list,
1135 : size_t joined_list_entries)
1136 : {
1137 : pcmk__cpg_confchg_cb(handle, group_name, member_list, member_list_entries,
1138 : left_list, left_list_entries,
1139 : joined_list, joined_list_entries);
1140 : }
1141 :
1142 : gboolean
1143 : send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
1144 : gboolean local, const crm_node_t *node,
1145 : enum crm_ais_msg_types dest)
1146 : {
1147 : switch (msg_class) {
1148 : case crm_class_cluster:
1149 : return send_cpg_text(data, local, node, dest);
1150 : default:
1151 : crm_err("Invalid message class: %d", msg_class);
1152 : return FALSE;
1153 : }
1154 : }
1155 :
1156 : char *
1157 : pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid,
1158 : void *content, uint32_t *kind, const char **from)
1159 : {
1160 : return pcmk__cpg_message_data(handle, nodeid, pid, content, kind, from);
1161 : }
1162 :
1163 : enum crm_ais_msg_types
1164 : text2msg_type(const char *text)
1165 : {
1166 : int type = crm_msg_none;
1167 :
1168 : CRM_CHECK(text != NULL, return type);
1169 : text = pcmk__message_name(text);
1170 : if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1171 : type = crm_msg_ais;
1172 : } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1173 : type = crm_msg_cib;
1174 : } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1175 : type = crm_msg_crmd;
1176 : } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1177 : type = crm_msg_te;
1178 : } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1179 : type = crm_msg_pe;
1180 : } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1181 : type = crm_msg_lrmd;
1182 : } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1183 : type = crm_msg_stonithd;
1184 : } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1185 : type = crm_msg_stonith_ng;
1186 : } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1187 : type = crm_msg_attrd;
1188 :
1189 : } else {
1190 : /* This will normally be a transient client rather than
1191 : * a cluster daemon. Set the type to the pid of the client
1192 : */
1193 : int scan_rc = sscanf(text, "%d", &type);
1194 :
1195 : if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1196 : /* Ensure it's sane */
1197 : type = crm_msg_none;
1198 : }
1199 : }
1200 : return type;
1201 : }
1202 :
1203 : // LCOV_EXCL_STOP
1204 : // End deprecated API
|