42#include <netinet/in.h>
54#define _COMPONENT_ "manager"
56#ifndef MAX_BACKOFF_CONNECT_RETRY
57 #define MAX_BACKOFF_CONNECT_RETRY 3600
61static void *connect_loop(
void *arg);
66manager_session_lookup_itad_id(
const manager_t *m, uint32_t itad,
69 for (
size_t i = 0; i < m->sessions_size; i++)
70 if (m->sessions[i]->
peer->itad == itad &&
71 m->sessions[i]->
id ==
id)
73 return m->sessions[i];
82 for (
size_t i = 0; i < m->sessions_size; i++)
85 return m->sessions[i];
93 const struct sockaddr_in6 *addr)
95 for (
size_t i = 0; i < m->sessions_size; i++)
97 memcmp(&m->sessions[i]->
peer->addr.sin6_addr, &addr->sin6_addr,
98 sizeof(
struct in6_addr)) == 0)
100 return m->sessions[i];
110send_notification_res(
int fd,
int res)
112 uint8_t code = NOTIF_CODE_ERROR_OPEN, subcode = 0;
115 code = NOTIF_CODE_ERROR_MSG;
116 subcode = NOTIF_SUBCODE_MSG_BAD_TYPE;
118 case ERROR_VERSION: subcode = NOTIF_SUBCODE_OPEN_UNSUP_VERSION;
break;
119 case ERROR_ITAD: subcode = NOTIF_SUBCODE_OPEN_BAD_ITAD;
break;
120 case ERROR_OPT: subcode = NOTIF_SUBCODE_OPEN_UNSUP_OPT;
break;
121 case ERROR_HOLD: subcode = NOTIF_SUBCODE_OPEN_BAD_HOLD;
break;
136 if (m->sessions_size + 1 > m->sessions_capacity) {
137 m->sessions = realloc(m->sessions,
138 2 *
sizeof(
session_t) * m->sessions_capacity);
139 m->sessions_capacity *= 2;
142 m->sessions[m->sessions_size++] = s;
150 for (
int i = 0; i < m->sessions_size; i++)
151 if (m->sessions[i] == s)
156 memcpy(&m->sessions[s_idx], &m->sessions[s_idx + 1],
157 sizeof(
session_t*) * (m->sessions_size - s_idx - 1));
166compare_itad_id(uint32_t itad1, uint32_t id1, uint32_t itad2, uint32_t id2)
168 return (id1 < id2) || (id1 == id2 && (itad1 < itad2));
183 ERROR(
"colliding connections initiated by same ID");
188 return compare_itad_id(m->itad, m->id, s2->
peer->itad, s2->
id);
192 return compare_itad_id(s1->
peer->itad, s1->
id, m->itad, m->id);
203 int res = 0, toread = 0;
204 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_open_t,
goto sock_error);
206 const msg_open_t *open = NULL;
209 res,
goto proto_error
212 DEBUG(
"OPEN(ver %d, hold %d, itad %d, id %s, opts len %d)",
213 open->open_ver, open->open_hold, open->open_itad,
214 id_str(open->open_id), open->open_opts_len);
217 if (open->open_itad != s->
peer->itad) {
218 ERROR(
"peer ITAD mismatch");
220 NOTIF_SUBCODE_OPEN_BAD_ITAD);
225 if ((open->open_itad == m->itad) && (open->open_id == m->id)) {
226 ERROR(
"duplicate ID in ITAD detected, colliding peer rejected");
228 NOTIF_SUBCODE_OPEN_BAD_ID);
233 session_t *coll_s = manager_session_lookup_itad_id(m, open->open_itad,
236 WARNING(
"collision detected with peer (%d, %s)", open->open_itad,
237 inaddr_str(open->open_id));
238 if (coll_s->
state == STATE_OPENCONFIRM) {
239 WARNING(
"collision resolved: kept higher ID or ITAD");
240 if (manager_collision_sessions_compare(m, s, coll_s)) {
246 manager_session_remove(m, coll_s);
248 }
else if (coll_s->
state == STATE_ESTABLISHED) {
249 WARNING(
"collision resolved: kept established connection");
257 if (init_s && (init_s->
initiated == 1) && (init_s != s)) {
258 INFO(
"closing old initiating session");
260 pthread_join(init_s->
thread, NULL);
264 s->
id = open->open_id;
265 s->
hold = MIN(s->
peer->hold, open->open_hold);
270 pthread_mutex_lock(&m->sessions_mutex);
271 manager_session_add(m, s);
272 pthread_mutex_unlock(&m->sessions_mutex);
275 size_t opts_toread = open->open_opts_len;
276 const void *opt_cur = open->open_opts;
277 while (opts_toread) {
278 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_open_opt_t,
281 const msg_open_opt_t *opt = NULL;
284 res,
goto proto_error
292 switch (opt->opt_type) {
293 case OPEN_OPT_TYPE_CAPABILITY_INFO: {
294 size_t capinfos_toread = opt->opt_len;
295 const void *capinfo_cur = opt->opt_val;
296 while (capinfos_toread) {
297 SOCK_TRY_RECV(s->
fd, recv_wnd, capinfo_t,
300 const capinfo_t *capinfo = NULL;
303 res,
goto proto_error
307 capinfos_toread -= res;
309 DEBUG(
" capability info: %s[%d]",
311 capinfo->capinfo_len);
314 switch (capinfo->capinfo_code) {
315 case CAPINFO_CODE_ROUTETYPE: {
316 size_t routetypes_toread = capinfo->capinfo_len;
317 const void *routetype_cur = capinfo->capinfo_val;
320 sizeof(capinfo_routetype_t);
324 while (routetypes_toread) {
325 SOCK_TRY_RECV(s->
fd, recv_wnd,
326 capinfo_routetype_t,
goto sock_error);
328 const capinfo_routetype_t *routetype = NULL;
332 res,
goto proto_error
335 routetype_cur += res;
337 capinfos_toread -= res;
338 routetypes_toread -= res;
340 DEBUG(
" route type: %s:%s",
341 af_strs[routetype->routetype_af],
344 memcpy(crt_cur, routetype, res);
348 case CAPINFO_CODE_TRANSMODE: {
349 SOCK_TRY_RECV(s->
fd, recv_wnd,
356 res,
goto proto_error
360 capinfos_toread -= res;
362 DEBUG(
" transmission mode: %s",
365 if (*transmode != CAPINFO_TRANS_SEND_RECV &&
366 (*transmode == s->
peer->transmode))
368 WARNING(
" transmission mode mismatch");
370 NOTIF_SUBCODE_OPEN_CAP_MISMATCH);
378 capinfo_cur +=
sizeof(capinfo_t) + capinfo->capinfo_len;
383 opt_cur +=
sizeof(msg_open_opt_t) + opt->opt_len;
389 send_notification_res(s->
fd, res);
405peer_handshake(
void *arg)
410 int res = 0, toread = 0;
411 char buff[MAX_MSG_SIZE];
416 s->
peer->hold, m->itad, m->id,
419 res,
goto proto_error
423 send(s->
fd, buff, res, 0),
431 void *recv_wnd = buff;
433 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_t,
goto sock_error);
435 const msg_t *msg = NULL;
438 res,
goto proto_error
444 switch (msg->msg_type) {
445 case MSG_TYPE_OPEN: {
446 if (handle_open(m, s, msg, recv_wnd) < 0)
451 res,
goto proto_error
455 send(s->
fd, buff, res, 0),
463 case MSG_TYPE_NOTIFICATION: {
464 SOCK_TRY_RECV(s->
fd, recv_wnd, msg_notif_t,
goto sock_error);
466 const msg_notif_t *notif= NULL;
469 res,
goto proto_error
472 DEBUG(
"error code: %s, error subcode: %s",
476 [notif->notif_error_subcode]
479 case MSG_TYPE_KEEPALIVE: {
480 if (s->
state == STATE_OPENCONFIRM)
497 if ((ssize_t)connect_loop(arg) < 0)
501 ERROR(
"unexpected %s message");
508 send_notification_res(s->
fd, res);
514 pthread_mutex_lock(&m->sessions_mutex);
515 manager_session_remove(m, s);
516 pthread_mutex_unlock(&m->sessions_mutex);
529listen_loop(
void *arg)
533 struct sockaddr_in6 peer_addr = { 0 };
534 socklen_t peer_addr_size =
sizeof(
struct sockaddr_in6);
538 int request_fd = accept(m->fd, (struct sockaddr*)&peer_addr,
540 if (request_fd < 0) {
541 ERROR(
"could not accept() peer: %s", strerror(errno));
549 INFO(
"rejecting unknown peer connection: %s",
550 sockaddr_str((
struct sockaddr *)&peer_addr));
555 DEBUG(
"accepted connection from %s, initiating peer",
556 sockaddr_str((
struct sockaddr*)&peer_addr));
562 s->
state = STATE_IDLE;
570 void **handshake_data = malloc(2 *
sizeof(
void*));
571 handshake_data[0] = m;
572 handshake_data[1] = s;
574 pthread_create(&s->
thread, NULL, &peer_handshake, handshake_data);
583maintenance_loop(
void *arg)
586 char buff[MAX_MSG_SIZE];
589 time_t now = time(NULL);
591 for (
int i = 0; i < m->sessions_size; i++) {
593 if (s->
state != STATE_ESTABLISHED || s->
hold == 0)
599 DEBUG(
"sending KEEPALIVE to %s", sockaddr6_str(&s->
peer->addr));
606 NOTIF_CODE_ERROR_EXPIRED, 0, 0, NULL);
608 ERROR(
"session %s hold timer expired %ld",
623update_loop(
void *arg)
628 pthread_mutex_lock(&m->update_mut);
629 pthread_cond_wait(&m->update_cond, &m->update_mut);
634 for (
size_t i = 0; i < m->sessions_size; i++)
637 pthread_mutex_unlock(&m->update_mut);
657 m->listen_thread = 0;
658 m->maintenance_thread = 0;
659 m->update_mut = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
660 m->update_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
670 m->sessions_size = 0;
671 m->sessions_capacity = 16;
672 m->sessions = malloc(m->sessions_capacity *
sizeof(
session_t*));
673 memset(m->sessions, 0, m->sessions_capacity *
sizeof(
session_t*));
674 m->sessions_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
676 m->connect_retry = TIMER_CONNECT_RETRY;
677 m->hold = TIMER_HOLD_TIME;
678 m->keepalive = TIMER_KEEPALIVE;
679 m->max_purge_time = TIMER_MAX_PURGE_TIME;
680 m->disable_time = TIMER_DISABLE_TIME;
681 m->min_itad_orig_int = TIMER_MIN_ITAD_ORIG_INT;
682 m->min_route_advert_int = TIMER_MIN_ROUTE_ADVERT_INT;
684 m->def_local_pref = DEF_LOCAL_PREF;
685 m->def_metric = DEF_METRIC;
688 m->fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
690 ERROR(
"could not create listen socket: %s", strerror(errno));
694 if (bind(m->fd, (
const struct sockaddr*)listen_addr,
695 sizeof(
struct sockaddr_in6)) < 0)
697 ERROR(
"could not bind() listen socket: %s", strerror(errno));
701 if (listen(m->fd, SOMAXCONN) < 0) {
702 ERROR(
"could not listen() listen socket: %s", strerror(errno));
706 DEBUG(
"started session manager, listening at [%s]:%d",
707 sockaddr_str((
struct sockaddr *)listen_addr),
708 ntohs(listen_addr->sin6_port));
728connect_loop(
void *arg)
734 s->
fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
736 time_t connect_retry = m->connect_retry;
741 pthread_mutex_lock(&m->sessions_mutex);
742 manager_session_remove(m, s);
743 pthread_mutex_unlock(&m->sessions_mutex);
750 int res = connect(s->
fd, (
struct sockaddr*)&s->
peer->addr,
751 sizeof(
struct sockaddr_in6));
754 ERROR(
"connect(): %s", strerror(errno));
757 uint64_t retry_left = connect_retry * 1e6;
763 if (connect_retry < MAX_BACKOFF_CONNECT_RETRY)
783 manager->hold, CAPINFO_TRANS_SEND_RECV);
789 s->
state = STATE_IDLE;
796 pthread_mutex_lock(&manager->sessions_mutex);
797 manager_session_add(manager, s);
798 pthread_mutex_unlock(&manager->sessions_mutex);
800 void **connect_data = malloc(2 *
sizeof(
void*));
801 connect_data[0] = manager;
804 pthread_create(&s->
thread, NULL, &connect_loop, connect_data);
817 pthread_create(&manager->listen_thread, NULL, &listen_loop, manager);
818 pthread_create(&manager->maintenance_thread, NULL, &maintenance_loop,
820 pthread_create(&manager->update_thread, NULL, &update_loop, manager);
827 pthread_mutex_lock(&manager->update_mut);
828 pthread_cond_signal(&manager->update_cond);
829 pthread_mutex_unlock(&manager->update_mut);
836 shutdown(manager->fd, SHUT_RDWR);
837 pthread_join(manager->listen_thread, NULL);
838 pthread_join(manager->maintenance_thread, NULL);
840 pthread_mutex_lock(&manager->update_mut);
841 pthread_cond_signal(&manager->update_cond);
842 pthread_mutex_unlock(&manager->update_mut);
843 pthread_join(manager->update_thread, NULL);
849 DEBUG(
"beginning shutdown");
853 for (
size_t i = 0; i < manager->sessions_size; i++) {
854 if (!manager->sessions[i])
860 pthread_join(manager->sessions[i]->
thread, NULL);
864 DEBUG(
"shutdown complete");
873 free(manager->sessions);
peer_t * locator_add(locator_t *locator, const struct sockaddr_in6 *addr, uint32_t itad, uint16_t hold, capinfo_transmode_t transmode)
Add a known peer.
Definition locator.c:47
peer_t * locator_lookup(locator_t *locator, const struct sockaddr_in6 *addr)
Look up peer by its address.
Definition locator.c:68
void locator_destroy(locator_t *locator)
Destroy locator object.
Definition locator.c:87
locator_t * locator_new()
Initialize singleton locator known peer list.
Definition locator.c:37
#define DEBUG(format,...)
log for debugging purposes
Definition logging.h:65
#define ERROR(format,...)
log an error
Definition logging.h:56
#define INFO(format,...)
log informational event
Definition logging.h:62
#define WARNING(format,...)
log a warning
Definition logging.h:59
void manager_stop(manager_t *manager)
Stop accept loop.
Definition manager.c:833
manager_t * manager_new(const struct sockaddr_in6 *listen_addr)
Create manager and bind socket.
Definition manager.c:645
void manager_destroy(manager_t *manager)
Destroy manager object.
Definition manager.c:868
session_t * manager_session_lookup_address(const manager_t *m, const struct sockaddr_in6 *addr)
Look up session by the address of its locator peer.
Definition manager.c:92
void manager_shutdown(manager_t *manager)
Shut down manager and all sessions.
Definition manager.c:847
void manager_run(manager_t *manager)
Run accept loop in thread.
Definition manager.c:814
peer_t * manager_peer_find(manager_t *manager, const struct sockaddr_in6 *addr)
Find known peer by address.
Definition manager.c:808
void manager_peer_add(manager_t *manager, const struct sockaddr_in6 *addr, uint32_t itad)
Add known peer to underlying locator.
Definition manager.c:778
void manager_schedule_update(manager_t *manager)
Schedule UPDATEs.
Definition manager.c:824
pib_t * pib_new()
Initialize PIB.
Definition pib.c:50
void pib_destroy(pib_t *pib)
Deinitialize PIB.
Definition pib.c:68
const char * notif_code_strs[]
NOTIFICATION error code strings.
Definition protocol.c:75
runtime_error_t new_msg_notif(void *buff, size_t len, uint8_t error_code, uint8_t error_subcode, size_t datalen, const void *data)
Serialize NOTIFICATION message.
Definition protocol.c:649
const size_t supported_routetypes_size
Supported routetypes constant size.
Definition protocol.c:181
runtime_error_t parse_msg_open_opt(const void *buff, size_t len, const msg_open_opt_t **opt_out)
Deserialize message OPEN optional parameter.
Definition protocol.c:731
runtime_error_t parse_msg_open(const void *buff, size_t len, const msg_open_t **open_out)
Deserialize message OPEN.
Definition protocol.c:708
const char ** notif_code_subcodes_strs[]
subcode strings per code class
Definition protocol.c:112
runtime_error_t parse_capinfo_transmode(const void *buff, size_t len, const capinfo_transmode_t **transmode_out)
Deserialize option transmission mode.
Definition protocol.c:795
runtime_error_t parse_capinfo(const void *buff, size_t len, const capinfo_t **capinfo_out)
Deserialize option capability information.
Definition protocol.c:748
const char * af_strs[]
Address family strings.
Definition protocol.c:65
const capinfo_routetype_t supported_routetypes[]
Supported routetypes constant.
Definition protocol.c:167
runtime_error_t parse_msg_notif(const void *buff, size_t len, const msg_notif_t **notif_out)
Deserialize NOTIFICATION message.
Definition protocol.c:1019
const char * app_proto_str(int app_proto)
Application protocol to string.
Definition protocol.c:125
const char * capinfo_transmode_strs[]
Capability information transmission mode types strings.
Definition protocol.c:58
runtime_error_t new_msg_keepalive(void *buff, size_t len)
Serialize KEEPALIVE message.
Definition protocol.c:630
runtime_error_t new_msg_open(void *buff, size_t len, uint16_t hold, uint32_t itad, uint32_t id, const capinfo_routetype_t *capinfo_routetypes, size_t routetypes_size, capinfo_transmode_t capinfo_transmode)
Serialize OPEN message.
Definition protocol.c:237
const char * open_opt_type_strs[]
OPEN optional parameter type strings.
Definition protocol.c:45
runtime_error_t parse_capinfo_routetype(const void *buff, size_t len, const capinfo_routetype_t **routetype_out)
Deserialize option route type.
Definition protocol.c:768
const char * msg_type_strs[]
Message type strings.
Definition protocol.c:37
runtime_error_t parse_msg(const void *buff, size_t len, const msg_t **msg_out)
Deserialize message.
Definition protocol.c:688
const char * capinfo_code_strs[]
Capability information option type strings.
Definition protocol.c:51
Protocol definition header.
uint32_t capinfo_transmode_t
Capability information transmission mode.
Definition protocol.h:124
@ ERROR_ITAD
Definition protocol.h:434
@ ERROR_OPT
Definition protocol.h:441
@ ERROR_CAPINFO_CODE
Definition protocol.h:442
@ ERROR_MSGTYPE
Definition protocol.h:439
@ ERROR_HOLD
Definition protocol.h:433
@ ERROR_VERSION
Definition protocol.h:440
#define PROTO_TRY(o, res, a)
Try-Catch macro for serialization/deserialization functions.
Definition protocol.h:488
void session_shutdown(session_t *session)
Shut down socket, terminate connection and thread.
Definition session.c:413
int send_notification(int fd, int code, int subcode)
Send notification helper.
Definition session.c:89
const char * id_str(uint32_t id)
LSID string.
Definition session.c:71
void session_update(const session_t *s, uint32_t local_id, uint32_t local_itad)
Update session.
Definition session.c:381
void session_change_state(session_t *s, session_state_t new_state)
Change session state.
Definition session.c:79
void * session_loop(void *arg)
Session loop.
Definition session.c:137
void session_destroy(session_t *session)
Destroy session object.
Definition session.c:423
Manager object.
Definition manager.h:41
int run
Definition manager.h:42
Known peer info object.
Definition locator.h:37
Session object.
Definition session.h:54
pthread_t thread
Definition session.h:55
int mark_stop_init
Definition session.h:58
const peer_t * peer
Definition session.h:68
time_t last_read_time
Definition session.h:73
time_t last_write_time
Definition session.h:74
uint16_t hold
Definition session.h:64
capinfo_routetype_t * routetypes
Definition session.h:81
int fd
Definition session.h:61
session_state_t state
Definition session.h:56
int initiated
Definition session.h:57
uint16_t keepalive
Definition session.h:65
table_t adj_trib_in
Definition session.h:85
uint32_t id
Definition session.h:69
size_t routetypes_count
Definition session.h:82
table_t adj_trib_out
Definition session.h:86
capinfo_transmode_t transmode
Definition session.h:80
routemap_t * routemap
Definition trib.h:103
void trib_destroy(trib_t *trib)
Deinit TRIB structure.
Definition trib.c:174
void trib_adj_pair_add(trib_t *trib, table_t *in, table_t *out)
Add and init pair of tables owned by caller.
Definition trib.c:138
void trib_update_adj_out(trib_t *trib, table_t *adj_trib_out)
Update an Adj-TRIB-Out.
Definition trib.c:339
trib_t * trib_new(uint32_t local_itad)
Initialize TRIB structure.
Definition trib.c:115
Telephony Routing Information Base.