40 #include <sys/types.h> 42 #include <sys/socket.h> 45 #include <sys/ioctl.h> 46 #include <sys/param.h> 47 #include <netinet/in.h> 48 #include <arpa/inet.h> 61 #include <qb/qblist.h> 62 #include <qb/qbdefs.h> 63 #include <qb/qbloop.h> 67 #define LOGSYS_UTILS_ONLY 1 74 #define MSG_NOSIGNAL 0 77 #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * UDP_RECEIVE_FRAME_SIZE_MAX) 78 #define NETIF_STATE_REPORT_UP 1 79 #define NETIF_STATE_REPORT_DOWN 2 81 #define BIND_STATE_UNBOUND 0 82 #define BIND_STATE_REGULAR 1 83 #define BIND_STATE_LOOPBACK 2 103 void (*totemudpu_deliver_fn) (
106 unsigned int msg_len,
109 void (*totemudpu_iface_change_fn) (
112 unsigned int ring_no);
114 void (*totemudpu_target_set_completed) (
void *context);
131 void (*totemudpu_log_printf) (
134 const char *
function,
144 struct iovec totemudpu_iov_recv;
146 struct qb_list_head member_list;
158 struct timeval stats_tv_start;
176 int local_loop_sock[2];
187 unsigned int msg_len;
191 static int totemudpu_build_sockets (
196 static int totemudpu_create_sending_socket(
203 static void totemudpu_start_merge_detect_timeout(
206 static void totemudpu_stop_merge_detect_timeout(
227 #define log_printf(level, format, args...) \ 229 instance->totemudpu_log_printf ( \ 230 level, instance->totemudpu_subsys_id, \ 231 __FUNCTION__, __FILE__, __LINE__, \ 232 (const char *)format, ##args); \ 234 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 236 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 237 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 238 instance->totemudpu_log_printf ( \ 239 level, instance->totemudpu_subsys_id, \ 240 __FUNCTION__, __FILE__, __LINE__, \ 241 fmt ": %s (%d)", ##args, _error_ptr, err_num); \ 246 const char *cipher_type,
247 const char *hash_type)
254 static inline void ucast_sendmsg (
258 unsigned int msg_len)
260 struct msghdr msg_ucast;
262 struct sockaddr_storage sockaddr;
267 iovec.iov_base = (
void *)msg;
268 iovec.iov_len = msg_len;
275 memset(&msg_ucast, 0,
sizeof(msg_ucast));
276 msg_ucast.msg_name = &sockaddr;
277 msg_ucast.msg_namelen = addrlen;
278 msg_ucast.msg_iov = (
void *)&iovec;
279 msg_ucast.msg_iovlen = 1;
280 #ifdef HAVE_MSGHDR_CONTROL 281 msg_ucast.msg_control = 0;
283 #ifdef HAVE_MSGHDR_CONTROLLEN 284 msg_ucast.msg_controllen = 0;
286 #ifdef HAVE_MSGHDR_FLAGS 287 msg_ucast.msg_flags = 0;
289 #ifdef HAVE_MSGHDR_ACCRIGHTS 290 msg_ucast.msg_accrights = NULL;
292 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 293 msg_ucast.msg_accrightslen = 0;
300 msg_ucast.msg_name = NULL;
301 msg_ucast.msg_namelen = 0;
312 "sendmsg(ucast) failed (non-critical)");
316 static inline void mcast_sendmsg (
319 unsigned int msg_len,
322 struct msghdr msg_mcast;
325 struct sockaddr_storage sockaddr;
327 struct qb_list_head *
list;
330 iovec.iov_base = (
void *)msg;
331 iovec.iov_len = msg_len;
333 memset(&msg_mcast, 0,
sizeof(msg_mcast));
339 member = qb_list_entry (list,
351 msg_mcast.msg_name = &sockaddr;
352 msg_mcast.msg_namelen = addrlen;
353 msg_mcast.msg_iov = (
void *)&iovec;
354 msg_mcast.msg_iovlen = 1;
355 #ifdef HAVE_MSGHDR_CONTROL 356 msg_mcast.msg_control = 0;
358 #ifdef HAVE_MSGHDR_CONTROLLEN 359 msg_mcast.msg_controllen = 0;
361 #ifdef HAVE_MSGHDR_FLAGS 362 msg_mcast.msg_flags = 0;
364 #ifdef HAVE_MSGHDR_ACCRIGHTS 365 msg_mcast.msg_accrights = NULL;
367 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 368 msg_mcast.msg_accrightslen = 0;
378 "sendmsg(mcast) failed (non-critical)");
394 msg_mcast.msg_name = NULL;
395 msg_mcast.msg_namelen = 0;
396 msg_mcast.msg_iov = (
void *)&iovec;
397 msg_mcast.msg_iovlen = 1;
398 #ifdef HAVE_MSGHDR_CONTROL 399 msg_mcast.msg_control = 0;
401 #ifdef HAVE_MSGHDR_CONTROLLEN 402 msg_mcast.msg_controllen = 0;
404 #ifdef HAVE_MSGHDR_FLAGS 405 msg_mcast.msg_flags = 0;
407 #ifdef HAVE_MSGHDR_ACCRIGHTS 408 msg_mcast.msg_accrights = NULL;
410 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 411 msg_mcast.msg_accrightslen = 0;
418 "sendmsg(local mcast loop) failed (non-critical)");
442 totemudpu_stop_merge_detect_timeout(instance);
447 static int net_deliver_fn (
453 struct msghdr msg_recv;
455 struct sockaddr_storage system_from;
457 int truncated_packet;
465 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
466 msg_recv.msg_iov = iovec;
467 msg_recv.msg_iovlen = 1;
468 #ifdef HAVE_MSGHDR_CONTROL 469 msg_recv.msg_control = 0;
471 #ifdef HAVE_MSGHDR_CONTROLLEN 472 msg_recv.msg_controllen = 0;
474 #ifdef HAVE_MSGHDR_FLAGS 475 msg_recv.msg_flags = 0;
477 #ifdef HAVE_MSGHDR_ACCRIGHTS 478 msg_recv.msg_accrights = NULL;
480 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 481 msg_recv.msg_accrightslen = 0;
484 bytes_received = recvmsg (fd, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
485 if (bytes_received == -1) {
491 truncated_packet = 0;
493 #ifdef HAVE_MSGHDR_FLAGS 494 if (msg_recv.msg_flags & MSG_TRUNC) {
495 truncated_packet = 1;
503 truncated_packet = 1;
507 if (truncated_packet) {
509 "Received too big message. This may be because something bad is happening" 510 "on the network (attack?), or you tried join more nodes than corosync is" 511 "compiled with (%u) or bug in the code (bad estimation of " 516 iovec->iov_len = bytes_received;
531 static int netif_determine (
541 interface_up, interface_num,
553 static void timer_function_netif_check_timeout (
563 netif_determine (instance,
566 &interface_up, &interface_num);
572 interface_up == 0) ||
576 interface_up == 1)) {
582 timer_function_netif_check_timeout,
583 &instance->timer_netif_check_timeout);
598 if (interface_up == 0) {
601 "One of your ip addresses are now bound to localhost. " 602 "Corosync would not work correctly.");
618 timer_function_netif_check_timeout,
619 &instance->timer_netif_check_timeout);
629 totemudpu_build_sockets (instance,
637 POLLIN, instance, net_deliver_fn);
648 "The network interface [%s] is now up.",
661 timer_function_netif_check_timeout,
662 &instance->timer_netif_check_timeout);
668 "The network interface is down.");
678 static void totemudpu_traffic_control_set(
struct totemudpu_instance *instance,
int sock)
683 if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio,
sizeof(
int))) {
685 "Could not set traffic priority");
690 static int totemudpu_build_sockets_ip (
696 struct sockaddr_storage sockaddr;
699 unsigned int recvbuf_size;
700 unsigned int optlen =
sizeof (recvbuf_size);
701 unsigned int retries = 0;
714 res = fcntl (instance->
token_socket, F_SETFL, O_NONBLOCK);
717 "Could not set non-blocking operation on token socket");
727 res = bind (instance->
token_socket, (
struct sockaddr *)&sockaddr, addrlen);
732 "bind token socket failed");
752 res = setsockopt (instance->
token_socket, SOL_SOCKET, SO_RCVBUF,
753 &recvbuf_size, optlen);
756 "Could not set recvbuf size");
765 unsigned int *iface_count)
778 static int totemudpu_build_local_sockets(
782 unsigned int sendbuf_size;
783 unsigned int recvbuf_size;
784 unsigned int optlen =
sizeof (sendbuf_size);
790 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, instance->
local_loop_sock) == -1) {
796 for (i = 0; i < 2; i++) {
801 "Could not set non-blocking operation on multicast socket");
809 res = setsockopt (instance->
local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
812 "Unable to set SO_RCVBUF size on UDP local mcast loop socket");
815 res = setsockopt (instance->
local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
818 "Unable to set SO_SNDBUF size on UDP local mcast loop socket");
822 res = getsockopt (instance->
local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
825 "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
828 res = getsockopt (instance->
local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
831 "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
837 static int totemudpu_build_sockets (
849 res = netif_determine (instance,
861 res = totemudpu_build_sockets_ip (instance,
862 bindnet_address, bound_to, interface_num);
867 "Unable to create sockets, exiting");
872 totemudpu_traffic_control_set(instance, instance->
token_socket);
891 qb_loop_t *poll_handle,
900 unsigned int msg_len,
903 void (*iface_change_fn) (
906 unsigned int ring_no),
908 void (*mtu_changed) (
912 void (*target_set_completed) (
918 if (instance == NULL) {
922 totemudpu_instance_initialize (instance);
958 if (totemudpu_build_local_sockets(instance) == -1) {
967 POLLIN, instance, net_deliver_fn);
975 100*QB_TIME_NS_IN_MSEC,
977 timer_function_netif_check_timeout,
978 &instance->timer_netif_check_timeout);
980 totemudpu_start_merge_detect_timeout((
void*)instance);
982 *udpu_context = instance;
1006 if (processor_count == 1) {
1011 timer_function_netif_check_timeout,
1012 &instance->timer_netif_check_timeout);
1035 unsigned int msg_len)
1040 ucast_sendmsg (instance, &instance->
token_target, msg, msg_len);
1047 unsigned int msg_len)
1052 mcast_sendmsg (instance, msg, msg_len, 0);
1060 unsigned int msg_len)
1065 mcast_sendmsg (instance, msg, msg_len, 1);
1075 timer_function_netif_check_timeout (instance);
1092 struct qb_list_head *
list;
1096 qb_list_for_each(list, &(instance->
member_list)) {
1097 member = qb_list_entry (list,
1117 struct sockaddr_storage system_from;
1118 struct msghdr msg_recv;
1121 int msg_processed = 0;
1128 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
1130 msg_recv.msg_iovlen = 1;
1131 #ifdef HAVE_MSGHDR_CONTROL 1132 msg_recv.msg_control = 0;
1134 #ifdef HAVE_MSGHDR_CONTROLLEN 1135 msg_recv.msg_controllen = 0;
1137 #ifdef HAVE_MSGHDR_FLAGS 1138 msg_recv.msg_flags = 0;
1140 #ifdef HAVE_MSGHDR_ACCRIGHTS 1141 msg_recv.msg_accrights = NULL;
1143 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 1144 msg_recv.msg_accrightslen = 0;
1147 for (i = 0; i < 2; i++) {
1163 ufd.events = POLLIN;
1164 nfds = poll (&ufd, 1, 0);
1165 if (nfds == 1 && ufd.revents & POLLIN) {
1166 res = recvmsg (sock, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
1173 }
while (nfds == 1);
1176 return (msg_processed);
1179 static int totemudpu_create_sending_socket(
1186 unsigned int sendbuf_size;
1187 unsigned int optlen =
sizeof (sendbuf_size);
1188 struct sockaddr_storage sockaddr;
1191 fd = socket (member->
family, SOCK_DGRAM, 0);
1194 "Could not create socket for new member");
1198 res = fcntl (fd, F_SETFL, O_NONBLOCK);
1201 "Could not set non-blocking operation on token socket");
1202 goto error_close_fd;
1210 res = setsockopt (fd, SOL_SOCKET, SO_SNDBUF,
1211 &sendbuf_size, optlen);
1214 "Could not set sendbuf size");
1224 res = bind (fd, (
struct sockaddr *)&sockaddr, addrlen);
1227 "bind token socket failed");
1228 goto error_close_fd;
1240 unsigned short ip_port,
1241 unsigned int iface_no)
1258 if (new_member == NULL) {
1262 memset(new_member, 0,
sizeof(*new_member));
1266 qb_list_init (&new_member->
list);
1269 new_member->
fd = totemudpu_create_sending_socket(udpu_context, member);
1281 struct qb_list_head *
list;
1289 qb_list_for_each(list, &(instance->
member_list)) {
1290 member = qb_list_entry (list,
1296 "removing UDPU member {%s}",
1299 if (member->
fd > 0) {
1301 "Closing socket to: {%s}",
1326 struct qb_list_head *
list;
1331 qb_list_for_each(list, &(instance->
member_list)) {
1332 member = qb_list_entry (list,
1336 if (member->
fd > 0) {
1340 member->
fd = totemudpu_create_sending_socket(udpu_context, &member->
member);
1347 static void timer_function_merge_detect_timeout (
1358 totemudpu_start_merge_detect_timeout(instance);
1361 static void totemudpu_start_merge_detect_timeout(
1370 timer_function_merge_detect_timeout,
1371 &instance->timer_merge_detect_timeout);
1375 static void totemudpu_stop_merge_detect_timeout(
1386 struct totem_config *totem_config)
unsigned int clear_node_high_bit
#define BIND_STATE_UNBOUND
#define NETIF_STATE_REPORT_UP
struct totem_config * totem_config
struct totem_ip_address member
unsigned int my_memb_entries
struct totem_interface * interfaces
void(* totemudpu_iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no)
The totem_ip_address struct.
void(*) void udpu_context)
struct qb_list_head member_list
const char * totemip_print(const struct totem_ip_address *addr)
struct totem_ip_address my_id
struct totemudpu_instance * instance
int totemudpu_iface_set(void *net_context, const struct totem_ip_address *local_addr, unsigned short ip_port, unsigned int iface_no)
#define NETIF_STATE_REPORT_DOWN
int totemip_compare(const void *a, const void *b)
#define log_printf(level, format, args...)
int totemudpu_processor_count_set(void *udpu_context, int processor_count)
int totemudpu_log_level_security
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
int totemudpu_member_remove(void *udpu_context, const struct totem_ip_address *token_target, int ring_no)
unsigned int downcheck_timeout
qb_loop_timer_handle timer_merge_detect_timeout
int send_merge_detect_message
#define totemip_nosigpipe(s)
int totemudpu_log_level_warning
int totemudpu_log_level_debug
struct iovec totemudpu_iov_recv
#define BIND_STATE_REGULAR
int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit)
#define UDP_RECEIVE_FRAME_SIZE_MAX
void totemudpu_buffer_release(void *ptr)
void * totemudpu_buffer_alloc(void)
unsigned int merge_detect_messages_sent_before_timeout
qb_loop_t * totemudpu_poll_handle
int totemudpu_mcast_noflush_send(void *udpu_context, const void *msg, unsigned int msg_len)
#define BIND_RETRIES_INTERVAL
struct totem_ip_address token_target
int totemudpu_crypto_set(void *udpu_context, const char *cipher_type, const char *hash_type)
#define LOGSYS_LEVEL_DEBUG
struct totem_interface * totem_interface
int totemudpu_token_send(void *udpu_context, const void *msg, unsigned int msg_len)
struct totem_ip_address boundto
size_t totemip_udpip_header_size(int family)
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
qb_loop_timer_handle timer_netif_check_timeout
void(* totemudpu_target_set_completed)(void *context)
#define BIND_STATE_LOOPBACK
#define MCAST_SOCKET_BUFFER_SIZE
#define PROCESSOR_COUNT_MAX
int totemudpu_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
char iov_buffer[UDP_RECEIVE_FRAME_SIZE_MAX]
int totemudpu_token_target_set(void *udpu_context, unsigned int nodeid)
int totemudpu_initialize(qb_loop_t *poll_handle, void **udpu_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Create an instance.
int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen)
struct totem_logging_configuration totem_logging_configuration
#define LOGSYS_LEVEL_NOTICE
int totemudpu_recv_mcast_empty(void *udpu_context)
struct srp_addr system_from
int totemudpu_log_level_error
int totemudpu_member_add(void *udpu_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
unsigned int merge_timeout
int totemudpu_reconfigure(void *udpu_context, struct totem_config *totem_config)
void totemudpu_net_mtu_adjust(void *udpu_context, struct totem_config *totem_config)
struct totem_ip_address bindnet
void(* totemudpu_deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
int totemudpu_mcast_flush_send(void *udpu_context, const void *msg, unsigned int msg_len)
int totemudpu_send_flush(void *udpu_context)
int totemudpu_finalize(void *udpu_context)
int totemudpu_recv_flush(void *udpu_context)
#define LOGSYS_PERROR(err_num, level, fmt, args...)
int totemudpu_member_list_rebind_ip(void *udpu_context)
void(* totemudpu_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemudpu_iface_check(void *udpu_context)
int totemudpu_log_level_notice