40#include <netinet/in.h>
53#define _COMPONENT_ "manager"
55#ifndef MAX_BACKOFF_CONNECT_RETRY
56 #define MAX_BACKOFF_CONNECT_RETRY 3600
60static void *connect_loop(
void *arg);
65manager_session_lookup_itad_id(
const manager_t *m, uint32_t itad,
68 for (
size_t i = 0; i < m->sessions_size; i++)
69 if (m->sessions[i]->
peer->itad == itad &&
70 m->sessions[i]->
id ==
id)
72 return m->sessions[i];
81 for (
size_t i = 0; i < m->sessions_size; i++)
84 return m->sessions[i];
92 const struct sockaddr_in6 *addr)
94 for (
size_t i = 0; i < m->sessions_size; i++)
96 memcmp(&m->sessions[i]->
peer->addr.sin6_addr, &addr->sin6_addr,
97 sizeof(
struct in6_addr)) == 0)
99 return m->sessions[i];
109send_notification_res(
int fd,
int res)
111 uint8_t code = NOTIF_CODE_ERROR_OPEN, subcode = 0;
114 code = NOTIF_CODE_ERROR_MSG;
115 subcode = NOTIF_SUBCODE_MSG_BAD_TYPE;
117 case ERROR_VERSION: subcode = NOTIF_SUBCODE_OPEN_UNSUP_VERSION;
break;
118 case ERROR_ITAD: subcode = NOTIF_SUBCODE_OPEN_BAD_ITAD;
break;
119 case ERROR_OPT: subcode = NOTIF_SUBCODE_OPEN_UNSUP_OPT;
break;
120 case ERROR_HOLD: subcode = NOTIF_SUBCODE_OPEN_BAD_HOLD;
break;
135 if (m->sessions_size + 1 > m->sessions_capacity) {
136 m->sessions = realloc(m->sessions,
137 2 *
sizeof(
session_t) * m->sessions_capacity);
138 m->sessions_capacity *= 2;
141 m->sessions[m->sessions_size++] = s;
149 for (
int i = 0; i < m->sessions_size; i++)
150 if (m->sessions[i] == s)
154 memcpy(&m->sessions[s_idx], &m->sessions[s_idx + 1],
155 sizeof(
session_t*) * (m->sessions_size - s_idx));
164compare_itad_id(uint32_t itad1, uint32_t id1, uint32_t itad2, uint32_t id2)
166 return (id1 < id2) || (id1 == id2 && (itad1 < itad2));
181 ERROR(
"colliding connections initiated by same ID");
186 return compare_itad_id(m->itad, m->id, s2->
peer->itad, s2->
id);
190 return compare_itad_id(s1->
peer->itad, s1->
id, m->itad, m->id);
201 int res = 0, toread = 0;
202 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_open_t,
goto sock_error);
204 const msg_open_t *open = NULL;
207 res,
goto proto_error
210 DEBUG(
"OPEN(ver %d, hold %d, itad %d, id %s, opts len %d)",
211 open->open_ver, open->open_hold, open->open_itad,
212 id_str(open->open_id), open->open_opts_len);
215 if (open->open_itad != s->
peer->itad) {
216 ERROR(
"peer ITAD mismatch");
218 NOTIF_SUBCODE_OPEN_BAD_ITAD);
223 if ((open->open_itad == m->itad) && (open->open_id == m->id)) {
224 ERROR(
"duplicate ID in ITAD detected, colliding peer rejected");
226 NOTIF_SUBCODE_OPEN_BAD_ID);
231 session_t *coll_s = manager_session_lookup_itad_id(m, open->open_itad,
234 WARNING(
"collision detected with peer (%d,%d)", open->open_itad,
236 if (coll_s->
state == STATE_OPENCONFIRM) {
237 WARNING(
"collision resolved: kept higher ID or ITAD");
238 if (manager_collision_sessions_compare(m, s, coll_s)) {
245 manager_session_remove(m, coll_s);
247 }
else if (coll_s->
state == STATE_ESTABLISHED) {
248 WARNING(
"collision resolved: kept established connection");
256 if (init_s && (init_s->
initiated == 1) && (init_s != s)) {
257 INFO(
"closing old initiating session");
262 s->
id = open->open_id;
263 s->
hold = MIN(s->
peer->hold, open->open_hold);
268 manager_session_add(m, s);
270 size_t opts_toread = open->open_opts_len;
271 const void *opt_cur = open->open_opts;
272 while (opts_toread) {
273 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_open_opt_t,
276 const msg_open_opt_t *opt = NULL;
279 res,
goto proto_error
287 switch (opt->opt_type) {
288 case OPEN_OPT_TYPE_CAPABILITY_INFO: {
289 size_t capinfos_toread = opt->opt_len;
290 const void *capinfo_cur = opt->opt_val;
291 while (capinfos_toread) {
292 SOCK_TRY_RECV(s->
fd, recv_wnd, capinfo_t,
295 const capinfo_t *capinfo = NULL;
298 res,
goto proto_error
302 capinfos_toread -= res;
304 DEBUG(
" capability info: %s[%d]",
306 capinfo->capinfo_len);
309 switch (capinfo->capinfo_code) {
310 case CAPINFO_CODE_ROUTETYPE: {
311 size_t routetypes_toread = capinfo->capinfo_len;
312 const void *routetype_cur = capinfo->capinfo_val;
315 sizeof(capinfo_routetype_t);
319 while (routetypes_toread) {
320 SOCK_TRY_RECV(s->
fd, recv_wnd,
321 capinfo_routetype_t,
goto sock_error);
323 const capinfo_routetype_t *routetype = NULL;
327 res,
goto proto_error
330 routetype_cur += res;
332 capinfos_toread -= res;
333 routetypes_toread -= res;
335 DEBUG(
" route type: %s:%s",
336 af_strs[routetype->routetype_af],
339 memcpy(crt_cur, routetype, res);
343 case CAPINFO_CODE_TRANSMODE: {
344 SOCK_TRY_RECV(s->
fd, recv_wnd,
351 res,
goto proto_error
355 capinfos_toread -= res;
357 DEBUG(
" transmission mode: %s",
360 if (*transmode != CAPINFO_TRANS_SEND_RECV &&
361 (*transmode == s->
peer->transmode))
363 WARNING(
" transmission mode mismatch");
365 NOTIF_SUBCODE_OPEN_CAP_MISMATCH);
373 capinfo_cur +=
sizeof(capinfo_t) + capinfo->capinfo_len;
378 opt_cur +=
sizeof(msg_open_opt_t) + opt->opt_len;
384 send_notification_res(s->
fd, res);
400peer_handshake(
void *arg)
405 int res = 0, toread = 0;
406 char buff[MAX_MSG_SIZE];
411 s->
peer->hold, m->itad, m->id,
414 res,
goto proto_error
418 send(s->
fd, buff, res, 0),
426 void *recv_wnd = buff;
428 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_t,
goto sock_error);
430 const msg_t *msg = NULL;
433 res,
goto proto_error
439 switch (msg->msg_type) {
440 case MSG_TYPE_OPEN: {
441 if (handle_open(m, s, msg, recv_wnd) < 0)
446 res,
goto proto_error
450 send(s->
fd, buff, res, 0),
456 case MSG_TYPE_NOTIFICATION: {
457 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_notif_t,
goto sock_error);
459 const msg_notif_t *notif= NULL;
462 res,
goto proto_error
465 DEBUG(
"error code: %s, error subcode: %s",
468 [notif->notif_error_subcode]);
470 case MSG_TYPE_KEEPALIVE: {
471 if (s->
state == STATE_OPENCONFIRM)
473 time_t now = time(NULL);
490 ERROR(
"unexpected %s message");
497 send_notification_res(s->
fd, res);
511listen_loop(
void *arg)
515 struct sockaddr_in6 peer_addr = { 0 };
516 socklen_t peer_addr_size =
sizeof(
struct sockaddr_in6);
520 int request_fd = accept(m->fd, (struct sockaddr*)&peer_addr,
522 if (request_fd < 0) {
523 ERROR(
"could not accept() peer: %s", strerror(errno));
531 INFO(
"rejecting unknown peer connection: %s",
532 sockaddr_str((
struct sockaddr *)&peer_addr));
537 DEBUG(
"accepted connection from %s, initiating peer",
538 sockaddr_str((
struct sockaddr*)&peer_addr));
548 void **handshake_data = malloc(2 *
sizeof(
void*));
549 handshake_data[0] = m;
550 handshake_data[1] = s;
552 pthread_create(&s->
thread, NULL, &peer_handshake, handshake_data);
561maintenance_loop(
void *arg)
564 char buff[MAX_MSG_SIZE];
567 time_t now = time(NULL);
569 for (
int i = 0; i < m->sessions_size; i++) {
571 if (s->
state != STATE_ESTABLISHED || s->
hold == 0)
577 DEBUG(
"sending KEEPALIVE");
584 NOTIF_CODE_ERROR_EXPIRED, 0, 0, NULL);
586 ERROR(
"session %s hold timer expired %ld",
616 m->listen_thread = 0;
617 m->maintenance_thread = 0;
625 m->sessions_size = 0;
626 m->sessions_capacity = 16;
627 m->sessions = malloc(m->sessions_capacity *
sizeof(
session_t*));
628 memset(m->sessions, 0, m->sessions_capacity *
sizeof(
session_t*));
630 m->connect_retry = TIMER_CONNECT_RETRY;
631 m->hold = TIMER_HOLD_TIME;
632 m->keepalive = TIMER_KEEPALIVE;
633 m->max_purge_time = TIMER_MAX_PURGE_TIME;
634 m->disable_time = TIMER_DISABLE_TIME;
635 m->min_itad_orig_int = TIMER_MIN_ITAD_ORIG_INT;
636 m->min_route_advert_int = TIMER_MIN_ROUTE_ADVERT_INT;
639 m->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
641 ERROR(
"could not create listen socket: %s", strerror(errno));
645 if (bind(m->fd, (
const struct sockaddr*)listen_addr,
646 sizeof(
struct sockaddr_in6)) < 0)
648 ERROR(
"could not bind() listen socket: %s", strerror(errno));
652 if (listen(m->fd, SOMAXCONN) < 0) {
653 ERROR(
"could not listen() listen socket: %s", strerror(errno));
657 DEBUG(
"started session manager, listening at [%s]:%d",
658 sockaddr_str((
struct sockaddr *)listen_addr),
659 ntohs(listen_addr->sin6_port));
672connect_loop(
void *arg)
678 s->
fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
680 time_t connect_retry = m->connect_retry;
684 manager_session_remove(m, s);
690 int res = connect(s->
fd, (
struct sockaddr*)&s->
peer->addr,
691 sizeof(
struct sockaddr_in6));
694 ERROR(
"connect(): %s", strerror(errno));
697 uint64_t retry_left = connect_retry * 1e6;
703 if (connect_retry < MAX_BACKOFF_CONNECT_RETRY)
723 manager->hold, CAPINFO_TRANS_SEND_RECV);
729 s->
state = STATE_IDLE;
733 manager_session_add(manager, s);
735 void **connect_data = malloc(2 *
sizeof(
void*));
736 connect_data[0] = manager;
739 pthread_create(&s->
thread, NULL, &connect_loop, connect_data);
746 pthread_create(&manager->listen_thread, NULL, &listen_loop, manager);
747 pthread_create(&manager->maintenance_thread, NULL, &maintenance_loop,
755 shutdown(manager->fd, SHUT_RDWR);
756 pthread_join(manager->listen_thread, NULL);
757 pthread_join(manager->maintenance_thread, NULL);
763 DEBUG(
"beginning shutdown");
765 for (
size_t i = 0; i < manager->sessions_size; i++) {
766 if (!manager->sessions[i])
772 for (
size_t i = 0; i < manager->sessions_size; i++) {
773 if (!manager->sessions[i])
775 pthread_join(manager->sessions[i]->
thread, NULL);
778 for (
size_t i = 0; i < manager->sessions_size; i++) {
779 if (!manager->sessions[i])
786 DEBUG(
"shutdown complete");
793 free(manager->sessions);
const peer_t * locator_lookup(locator_t *locator, const struct sockaddr_in6 *addr)
Lookup peer by its address.
Definition locator.c:66
const peer_t * locator_add(locator_t *locator, const struct sockaddr_in6 *addr, uint32_t itad, uint16_t hold, capinfo_transmode_t transmode)
Add a known peer.
Definition locator.c:47
void locator_destroy(locator_t *locator)
Destroy locator object.
Definition locator.c:85
locator_t * locator_new()
Initialize singleton locator known peer list.
Definition locator.c:37
#define DEBUG(format,...)
log for debugging purposes
Definition logging.h:65
#define ERROR(format,...)
log an error
Definition logging.h:56
#define INFO(format,...)
log informational event
Definition logging.h:62
#define WARNING(format,...)
log a warning
Definition logging.h:59
void manager_stop(manager_t *manager)
Stop accept loop.
Definition manager.c:752
manager_t * manager_new(const struct sockaddr_in6 *listen_addr)
Create manager and bind socket.
Definition manager.c:604
void manager_destroy(manager_t *manager)
Destroy manager object.
Definition manager.c:790
session_t * manager_session_lookup_address(const manager_t *m, const struct sockaddr_in6 *addr)
Look up session by the address of its locator peer.
Definition manager.c:91
void manager_shutdown(manager_t *manager)
Shut down manager and all sessions.
Definition manager.c:761
void manager_run(manager_t *manager)
Run accept loop in thread.
Definition manager.c:743
void manager_add_peer(manager_t *manager, const struct sockaddr_in6 *addr, uint32_t itad)
Add known peer to underlying locator.
Definition manager.c:718
const char * notif_code_strs[]
NOTIFICATION error code strings.
Definition protocol.c:75
runtime_error_t new_msg_notif(void *buff, size_t len, uint8_t error_code, uint8_t error_subcode, size_t datalen, const void *data)
Serialize NOTIFICATION message.
Definition protocol.c:651
const size_t supported_routetypes_size
Supported routetypes constant size.
Definition protocol.c:178
runtime_error_t parse_msg_open_opt(const void *buff, size_t len, const msg_open_opt_t **opt_out)
Deserialize message OPEN optional parameter.
Definition protocol.c:733
runtime_error_t parse_msg_open(const void *buff, size_t len, const msg_open_t **open_out)
Deserialize message OPEN.
Definition protocol.c:710
const char ** notif_code_subcodes_strs[]
subcode strings per code class
Definition protocol.c:112
runtime_error_t parse_capinfo_transmode(const void *buff, size_t len, const capinfo_transmode_t **transmode_out)
Deserialize option transmission mode.
Definition protocol.c:797
runtime_error_t parse_capinfo(const void *buff, size_t len, const capinfo_t **capinfo_out)
Deserialize option capability information.
Definition protocol.c:750
const char * af_strs[]
Address family strings.
Definition protocol.c:65
const capinfo_routetype_t supported_routetypes[]
Supported routetypes constant.
Definition protocol.c:164
runtime_error_t parse_msg_notif(const void *buff, size_t len, const msg_notif_t **notif_out)
Deserialize NOTIFICATION message.
Definition protocol.c:1021
const char * app_proto_str(int app_proto)
Application protocol to string.
Definition protocol.c:122
const char * capinfo_transmode_strs[]
Capability information transmission mode types strings.
Definition protocol.c:58
runtime_error_t new_msg_keepalive(void *buff, size_t len)
Serialize KEEPALIVE message.
Definition protocol.c:632
runtime_error_t new_msg_open(void *buff, size_t len, uint16_t hold, uint32_t itad, uint32_t id, const capinfo_routetype_t *capinfo_routetypes, size_t routetypes_size, capinfo_transmode_t capinfo_transmode)
Serialize OPEN message.
Definition protocol.c:234
const char * open_opt_type_strs[]
OPEN optional parameter type strings.
Definition protocol.c:45
runtime_error_t parse_capinfo_routetype(const void *buff, size_t len, const capinfo_routetype_t **routetype_out)
Deserialize option route type.
Definition protocol.c:770
const char * msg_type_strs[]
Message type strings.
Definition protocol.c:37
runtime_error_t parse_msg(const void *buff, size_t len, const msg_t **msg_out)
Deserialize message.
Definition protocol.c:690
const char * capinfo_code_strs[]
Capability information option type strings.
Definition protocol.c:51
Protocol definition header.
uint32_t capinfo_transmode_t
Capability information transmission mode.
Definition protocol.h:124
@ ERROR_ITAD
Definition protocol.h:431
@ ERROR_OPT
Definition protocol.h:438
@ ERROR_CAPINFO_CODE
Definition protocol.h:439
@ ERROR_MSGTYPE
Definition protocol.h:436
@ ERROR_HOLD
Definition protocol.h:430
@ ERROR_VERSION
Definition protocol.h:437
#define PROTO_TRY(o, res, a)
Try-Catch macro for serialization/deserialization functions.
Definition protocol.h:480
void session_shutdown(session_t *session)
Shutdown socket, terminate connection and thread.
Definition session.c:204
int send_notification(int fd, int code, int subcode)
Send notification helper.
Definition session.c:87
const char * id_str(uint32_t id)
LSID string.
Definition session.c:70
void session_change_state(session_t *s, session_state_t new_state)
Change session state.
Definition session.c:78
void * session_loop(void *arg)
Session loop.
Definition session.c:136
void session_destroy(session_t *session)
Destroy session object.
Definition session.c:213
Manager object.
Definition manager.h:38
int run
Definition manager.h:39
Known peer info object.
Definition locator.h:36
Session object.
Definition session.h:53
pthread_t thread
Definition session.h:54
int mark_stop_init
Definition session.h:57
const peer_t * peer
Definition session.h:66
time_t last_read_time
Definition session.h:71
time_t last_write_time
Definition session.h:72
time_t established_time
Definition session.h:70
uint16_t hold
Definition session.h:62
capinfo_routetype_t * routetypes
Definition session.h:76
int fd
Definition session.h:59
session_state_t state
Definition session.h:55
int initiated
Definition session.h:56
uint16_t keepalive
Definition session.h:63
uint32_t id
Definition session.h:67
size_t routetypes_count
Definition session.h:77
capinfo_transmode_t transmode
Definition session.h:75