diff --git a/agent/agent-priv.h b/agent/agent-priv.h index cea86b3..7dee415 100644 --- a/agent/agent-priv.h +++ b/agent/agent-priv.h @@ -61,6 +61,8 @@ #include "stun/usages/turn.h" #include "stun/usages/ice.h" +#include "memlist.h" + /* XXX: starting from ICE ID-18, Ta SHOULD now be set according * to session bandwidth -> this is not yet implemented in NICE */ @@ -125,6 +127,8 @@ struct _NiceAgent gboolean media_after_tick; /* Received media after keepalive tick */ gchar *software_attribute; /* SOFTWARE attribute */ gboolean reliable; /* property: reliable */ + + MemlistInterface **mem_list_interface; /* mem_list_interface: used for recvmmsg buffer access */ /* XXX: add pointer to internal data struct for ABI-safe extensions */ }; diff --git a/agent/agent.c b/agent/agent.c index f6fa8f0..bb9b29f 100644 --- a/agent/agent.c +++ b/agent/agent.c @@ -146,6 +146,8 @@ static gboolean priv_attach_stream_component (NiceAgent * agent, Stream * stream, Component * component); static void priv_detach_stream_component (NiceAgent * agent, Stream * stream, Component * component); +static void mem_list_interface_clean_up_and_replace_locked (NiceAgent * agent, + MemlistInterface ** replacement); void agent_lock (NiceAgent * agent) @@ -1020,6 +1022,48 @@ nice_agent_set_property (GObject * object, } +void nice_agent_set_mem_list_interface(NiceAgent * agent, MemlistInterface **ml_interface){ + /* Clean up and propagate new mem_list_interface */ + agent_lock (agent); + mem_list_interface_clean_up_and_replace_locked (agent, ml_interface); + agent->mem_list_interface = ml_interface; + agent_unlock (agent); +} + +static void +mem_list_interface_clean_up_and_replace_locked (NiceAgent * agent, + MemlistInterface ** replacement) +{ + (void) agent; + MemlistInterface **old_interface = agent->mem_list_interface; + /* We need to go trough all (udp) sockets and release any pending buffers */ + GSList *stream_index; + + for (stream_index = agent->streams; stream_index; + stream_index = stream_index->next) { + GSList *component_index; + Stream *stream = stream_index->data; + for (component_index = stream->components; component_index; + component_index = component_index->next) { + GSList *socket_index; + Component *component = component_index->data; + for (socket_index = component->sockets; socket_index; + socket_index = socket_index->next) { + NiceSocket *udpsocket = socket_index->data; + if (udpsocket->type == NICE_SOCKET_TYPE_UDP_BSD) + { + if ( old_interface != NULL){ + nice_udp_socket_buffers_and_interface_unref (udpsocket); + } + if (replacement != NULL) { + nice_udp_socket_interface_set (udpsocket, replacement); + } + } + } + } + } +} + static void log_local_candidate_event (NiceAgent * agent, NiceCandidate * local_candidate) { @@ -2321,14 +2365,15 @@ _nice_should_have_padding (NiceCompatibility compatibility) return TRUE; } } - -static gint -_nice_agent_recv (NiceAgent * agent, +/* Returns whether the buffer is processed (TRUE) or if it should be forwarded + to the client (FALSE) */ +static gboolean nice_agent_recv_process(NiceAgent * agent, + NiceSocket *socket, Stream * stream, Component * component, - NiceSocket * socket, guint buf_len, gchar * buf, NiceAddress * from) + gint *buf_len, gchar * buf, NiceAddress * from) { - gint len; + gint len = *buf_len; GList *item; gboolean has_padding = _nice_should_have_padding (agent->compatibility); NiceAddress stun_server; @@ -2336,30 +2381,6 @@ _nice_agent_recv (NiceAgent * agent, gchar *stun_server_ip = NULL; guint stun_server_port; - len = nice_socket_recv (socket, from, buf_len, buf); - - if (len <= 0) - return len; - -#ifndef NDEBUG - if (len > 0) { - gchar tmpbuf[INET6_ADDRSTRLEN]; - nice_address_to_string (from, tmpbuf); - GST_LOG_OBJECT (agent, - "Packet received on local %s socket %u from [%s]:%u (%u octets).", - socket_type_to_string (socket->type), - socket->fileno ? g_socket_get_fd (socket->fileno) : 0, tmpbuf, - nice_address_get_port (from), len); - } -#endif - - - if ((guint) len > buf_len) { - /* buffer is not big enough to accept this packet */ - /* XXX: test this case */ - return 0; - } - /* * If the packet comes from a relayed candidate then let the turn socket * have first crack at it @@ -2426,18 +2447,146 @@ _nice_agent_recv (NiceAgent * agent, } agent->media_after_tick = TRUE; + *buf_len = len; if (len > 0) { if (stun_message_validate_buffer_length ((uint8_t *) buf, (size_t) len, has_padding) != len) { /* If the retval is no 0, its not a valid stun packet, probably data */ - return len; + return FALSE; } if (conn_check_handle_inbound_stun (agent, stream, component, socket, from, buf, len)) /* handled STUN message */ - return 0; + return TRUE; + } + return FALSE; + +} + +/* Buffers and addresses should be of at least NICE_UDP_SOCKET_MMSG_TOTAL size */ +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG +static gint +_nice_agent_recv_multiple (NiceAgent * agent, + Stream * stream, + Component * component, + NiceSocket * socket, + NiceMemoryBufferRef ** buffers, + NiceAddress * from_addresses) +{ + gint num_packets_received; + + /* Make sure the agent user has set a mem list interface, and that this socket + supports receiving multiple messages in one call. */ + if (agent->mem_list_interface == NULL){ + return -ENOTSUP; + } + + if (socket->type != NICE_SOCKET_TYPE_UDP_BSD){ + return -ENOTSUP; + } + + num_packets_received = nice_udp_socket_recvmmsg (socket); + + if (num_packets_received <= 0) + return num_packets_received; + + MemlistInterface **memlist_interface_ptr = agent->mem_list_interface; + MemlistInterface *memlist_interface = *(agent->mem_list_interface); + int out_pkt_idx = 0; + + for(int pkt_idx = 0; pkt_idx < num_packets_received; pkt_idx++) + { + gboolean handled_internally; + NiceMemoryBufferRef *retrieved_buffer; + retrieved_buffer = nice_udp_socket_packet_retrieve(socket, pkt_idx, &from_addresses[out_pkt_idx]); + g_assert(retrieved_buffer != NULL); + gsize buf_len = memlist_interface->buffer_size(memlist_interface_ptr, retrieved_buffer); + char* buf_contents = memlist_interface->buffer_contents(memlist_interface_ptr, retrieved_buffer); + +#ifndef NDEBUG + if (buf_len > 0) { + gchar tmpbuf[INET6_ADDRSTRLEN]; + nice_address_to_string (&from_addresses[out_pkt_idx], tmpbuf); + GST_INFO_OBJECT (agent, + "Packet received multiple on local %s socket %u from [%s]:%u (%lu octets, %lu packets).", + socket_type_to_string (socket->type), + socket->fileno ? g_socket_get_fd (socket->fileno) : 0, tmpbuf, + nice_address_get_port (&from_addresses[out_pkt_idx]), buf_len, num_packets_received); + } +#endif + /* Figure out if this is a buffer that we handle internally, or if we should forward it on to the client */ + gsize new_len = buf_len; + handled_internally = nice_agent_recv_process(agent, + socket, stream, component, &new_len, buf_contents, &from_addresses[out_pkt_idx]); + + if (buf_len != new_len) { + GST_INFO_OBJECT (agent, "Packet resized: %d -> %d", buf_len, new_len); + memlist_interface->buffer_resize(memlist_interface_ptr, retrieved_buffer, new_len); + } + if (handled_internally) { + /* Unref buffer */ + memlist_interface->buffer_return(memlist_interface_ptr, retrieved_buffer); + } + else{ + buffers[out_pkt_idx] = retrieved_buffer; + + out_pkt_idx++; + } + } + + nice_udp_socket_recvmmsg_structures_fill_new_buffers(socket, 0, num_packets_received); + + return out_pkt_idx; +} +#endif + + +static gint +_nice_agent_recv (NiceAgent * agent, + Stream * stream, + Component * component, + NiceSocket * socket, guint buf_len, gchar * buf, NiceAddress * from) +{ + gint len; + + len = nice_socket_recv (socket, from, buf_len, buf); + + if (len <= 0) + return len; + +#ifndef NDEBUG + if (len > 0) { + gchar tmpbuf[INET6_ADDRSTRLEN]; + nice_address_to_string (from, tmpbuf); + GST_LOG_OBJECT (agent, + "Packet received on local %s socket %u from [%s]:%u (%u octets).", + socket_type_to_string (socket->type), + socket->fileno ? g_socket_get_fd (socket->fileno) : 0, tmpbuf, + nice_address_get_port (from), len); + } +#endif + if ((guint) len > buf_len) { +#ifndef NDEBUG + gchar tmpbuf[INET6_ADDRSTRLEN]; + nice_address_to_string (from, tmpbuf); + GST_WARNING_OBJECT (agent, + "TOO BIG Packet received on local %s socket %u from [%s]:%u (%u octets).", + socket_type_to_string (socket->type), + socket->fileno ? g_socket_get_fd (socket->fileno) : 0, tmpbuf, + nice_address_get_port (from), len); +#endif + /* buffer is not big enough to accept this packet */ + /* XXX: test this case */ + return FALSE; + } + + if (nice_agent_recv_process(agent, socket, stream, component, &len, buf, from)) { + GST_LOG_OBJECT (agent, + "Handled packet as an internal STUN packet, don't pass it downstream to the client."); + /* Handeled stun, don't pass to the client */ + return FALSE; } /* unhandled STUN, pass to client */ @@ -2871,8 +3020,6 @@ nice_agent_g_source_cb (GSocket * gsocket, NiceAgent *agent = ctx->agent; Stream *stream = ctx->stream; Component *component = ctx->component; - NiceAddress from; - gchar buf[MAX_BUFFER_SIZE]; gint len; agent_lock (agent); @@ -2882,26 +3029,73 @@ nice_agent_g_source_cb (GSocket * gsocket, return FALSE; } - len = _nice_agent_recv (agent, stream, component, ctx->socket, - MAX_BUFFER_SIZE, buf, &from); +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + if (component->g_source_io_multiple_cb + && ctx->socket->type == NICE_SOCKET_TYPE_UDP_BSD + && agent->mem_list_interface != NULL) { + NiceMemoryBufferRef *buffers[NICE_UDP_SOCKET_MMSG_TOTAL]; + NiceAddress from_addresses[NICE_UDP_SOCKET_MMSG_TOTAL]; - if (len > 0 && component->g_source_io_cb) { - gpointer data = component->data; - gint sid = stream->id; - gint cid = component->id; - NiceAgentRecvFunc callback = component->g_source_io_cb; - /* Unlock the agent before calling the callback */ - agent_unlock (agent); - callback (agent, sid, cid, len, buf, data, &from, &ctx->socket->addr); - goto done; - } else if (len < 0) { - GSource *source = ctx->source; + len = _nice_agent_recv_multiple(agent, stream, component, ctx->socket, + buffers, from_addresses); - GST_WARNING_OBJECT (agent, "_nice_agent_recv returned %d, errno (%d) : %s", - len, errno, g_strerror (errno)); - component->gsources = g_slist_remove (component->gsources, source); - g_source_destroy (source); - g_source_unref (source); + if (len > 0) { + gpointer data = component->data; + gint sid = stream->id; + gint cid = component->id; + NiceAgentRecvMultipleFunc callback = component->g_source_io_multiple_cb; + /* Unlock the agent before calling the callback */ + agent_unlock (agent); + callback (agent, sid, cid, len, buffers, from_addresses, + &ctx->socket->addr, data); + goto done; + } else if (len < 0 && len != -ENOTSUP && len != -ENOMEM) { + /* If ENOTSUP is received, try to call the non mmsg receive function */ + GSource *source = ctx->source; + + GST_WARNING_OBJECT (agent, "_nice_agent_recv_multiple returned %d, errno (%d) : %s", + len, errno, g_strerror (errno)); + component->gsources = g_slist_remove (component->gsources, source); + g_source_destroy (source); + g_source_unref (source); + /* If a unknown error is received, skip the non mmsg receive function, + by unlocking the agent and going to done. */ + agent_unlock(agent); + goto done; + } + else + { + GST_WARNING_OBJECT (agent, "_nice_agent_recv_multiple skipped, returned %d", + len); + } + /* If receiving multiple packets were not supported, fall back to receiving single packets below */ + } +#endif + { + NiceAddress from; + gchar buf[MAX_BUFFER_SIZE]; + + len = _nice_agent_recv (agent, stream, component, ctx->socket, + MAX_BUFFER_SIZE, buf, &from); + + if (len > 0 && component->g_source_io_cb) { + gpointer data = component->data; + gint sid = stream->id; + gint cid = component->id; + NiceAgentRecvFunc callback = component->g_source_io_cb; + /* Unlock the agent before calling the callback */ + agent_unlock (agent); + callback (agent, sid, cid, len, buf, data, &from, &ctx->socket->addr); + goto done; + } else if (len < 0) { + GSource *source = ctx->source; + + GST_WARNING_OBJECT (agent, "_nice_agent_recv returned %d, errno (%d) : %s", + len, errno, g_strerror (errno)); + component->gsources = g_slist_remove (component->gsources, source); + g_source_destroy (source); + g_source_unref (source); + } } agent_unlock (agent); @@ -2990,7 +3184,10 @@ NICEAPI_EXPORT gboolean nice_agent_attach_recv (NiceAgent * agent, guint stream_id, guint component_id, - GMainContext * ctx, NiceAgentRecvFunc func, gpointer data) + GMainContext * ctx, + NiceAgentRecvFunc func, + NiceAgentRecvMultipleFunc multiple_func, + gpointer data) { Component *component = NULL; Stream *stream = NULL; @@ -3014,6 +3211,7 @@ nice_agent_attach_recv (NiceAgent * agent, ret = TRUE; component->g_source_io_cb = NULL; + component->g_source_io_multiple_cb = NULL; component->data = NULL; if (component->ctx) g_main_context_unref (component->ctx); @@ -3021,6 +3219,8 @@ nice_agent_attach_recv (NiceAgent * agent, if (func) { component->g_source_io_cb = func; + component->g_source_io_multiple_cb = multiple_func; + component->data = data; component->ctx = ctx; if (ctx) diff --git a/agent/agent.h b/agent/agent.h index c97447e..0ba4d30 100644 --- a/agent/agent.h +++ b/agent/agent.h @@ -112,6 +112,7 @@ typedef struct _NiceAgent NiceAgent; #include "address.h" #include "candidate.h" #include "debug.h" +#include "memlist.h" G_BEGIN_DECLS @@ -263,6 +264,31 @@ typedef void (*NiceAgentRecvFunc) ( NiceAgent *agent, guint stream_id, guint component_id, guint len, gchar *buf, gpointer user_data, const NiceAddress *from, const NiceAddress *to); +/** + * NiceAgentRecvMultipleFunc: + * @agent: The #NiceAgent Object + * @stream_id: The id of the stream + * @component_id: The id of the component of the stream + * which received the data + * @num_buffers: Count of buffers retrieved + * @buffers: Pointers to received buffers (of num_buffers length) + * @from: Pointers to from addresses (of num_buffers length) + * @to: Ponter to to address (i.e addr of agent, of 1 length) + * @user_data: The user data set in nice_agent_attach_recv() + * + * Callback function when data is received on a component + * In order to get the buffers that are retrieved, call + * nice_agent_memory_buffer_retrieve with indices from 0 to num_buffers-1 + * while executing this function. + */ +typedef void (*NiceAgentRecvMultipleFunc) ( + NiceAgent *agent, guint stream_id, guint component_id, guint num_buffers, + NiceMemoryBufferRef **buffers, const NiceAddress *from, const NiceAddress *to, gpointer user_data); + +/* This function should only be called inside the NiceAgentRecvMultipleFunc callback */ +NICE_EXPORT NiceMemoryBufferRef *nice_agent_memory_buffer_retrieve(NiceAgent *agent, + guint stream_id, guint component_id, gsize buffer_index, gpointer user_data, + const NiceAddress *from, const NiceAddress *to); /** * nice_agent_new: @@ -278,6 +304,8 @@ typedef void (*NiceAgentRecvFunc) ( NICE_EXPORT NiceAgent * nice_agent_new (GMainContext *ctx, NiceCompatibility compat, NiceCompatibility turn_compat); +NICE_EXPORT void +nice_agent_set_mem_list_interface(NiceAgent * agent, MemlistInterface **ml_interface); /** * nice_agent_add_local_address: (skip) @@ -763,6 +791,9 @@ nice_agent_restart_stream ( * @ctx: The Glib Mainloop Context to use for listening on the component * @func: The callback function to be called when data is received on * the stream's component + * @func: The callback function to be called when data spanning multiple buffers + * is received on the stream's component, or null if receiving mutiple buffers + * are not received by the agent user. * @data: user data associated with the callback * * Attaches the stream's component's sockets to the Glib Mainloop Context in @@ -777,6 +808,7 @@ nice_agent_attach_recv ( guint component_id, GMainContext *ctx, NiceAgentRecvFunc func, + NiceAgentRecvMultipleFunc multi_func, gpointer data); /** diff --git a/agent/component.h b/agent/component.h index fdf40ba..ab135df 100644 --- a/agent/component.h +++ b/agent/component.h @@ -118,6 +118,7 @@ struct _Component see ICE 11.1. "Sending Media" (ID-19) */ NiceCandidate *restart_candidate; /**< for storing active remote candidate during a restart */ NiceAgentRecvFunc g_source_io_cb; /**< function called on io cb */ + NiceAgentRecvMultipleFunc g_source_io_multiple_cb; /**< function called on io cb for multiple packets */ gpointer data; /**< data passed to the io function */ GMainContext *ctx; /**< context for data callbacks for this component */ diff --git a/gst/gstnicesrc.c b/gst/gstnicesrc.c index d8f82ec..618766d 100644 --- a/gst/gstnicesrc.c +++ b/gst/gstnicesrc.c @@ -41,22 +41,36 @@ #include #include "gstnicesrc.h" -#if GST_CHECK_VERSION (1,0,0) #include -#else -#include -#endif GST_DEBUG_CATEGORY_STATIC (nicesrc_debug); #define GST_CAT_DEFAULT nicesrc_debug -#define BUFFER_SIZE (65536) +//#define BUFFER_SIZE (65536) +#define BUFFER_SIZE (4096) -static GstFlowReturn -gst_nice_src_create ( - GstPushSrc *basesrc, - GstBuffer **buffer); +static gboolean gst_nice_src_query ( + GstBaseSrc * src, + GstQuery * query); + +static GstFlowReturn gst_nice_src_create ( + GstBaseSrc * bsrc, + guint64 offset, + guint length, + GstBuffer ** ret); + +static GstFlowReturn gst_nice_src_alloc ( + GstBaseSrc * bsrc, + guint64 offset, + guint length, + GstBuffer ** ret); + +static GstFlowReturn gst_nice_src_fill ( + GstBaseSrc * bsrc, + guint64 offset, + guint length, + GstBuffer * ret); static gboolean gst_nice_src_unlock ( @@ -66,9 +80,14 @@ static gboolean gst_nice_src_unlock_stop ( GstBaseSrc *basesrc); +static gboolean +gst_nice_src_decide_allocation ( + GstBaseSrc * bsrc, + GstQuery * query); + static gboolean gst_nice_src_negotiate ( - GstBaseSrc * src); + GstBaseSrc * basesrc); static void gst_nice_src_set_property ( @@ -87,12 +106,32 @@ gst_nice_src_get_property ( static void gst_nice_src_dispose (GObject *object); +static +void gst_nice_src_clean_up_pool(GstNiceSrc * src); static GstStateChangeReturn gst_nice_src_change_state ( GstElement * element, GstStateChange transition); + +static void gst_nice_src_mem_buffer_ref_array_clear(void *element); + +NiceMemoryBufferRef* gst_nice_src_buffer_get(MemlistInterface **ml_interface, gsize size); +void gst_nice_src_buffer_return(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer); +char* gst_nice_src_buffer_contents(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer); +gsize gst_nice_src_buffer_size(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer); +void gst_nice_src_buffer_resize(MemlistInterface **ml_interface, + NiceMemoryBufferRef* buffer, gsize new_size); + +static const MemlistInterface nice_src_mem_interface = { + .buffer_get = gst_nice_src_buffer_get, + .buffer_return = gst_nice_src_buffer_return, + .buffer_contents = gst_nice_src_buffer_contents, + .buffer_size = gst_nice_src_buffer_size, + .buffer_resize = gst_nice_src_buffer_resize, +}; + static GstStaticPadTemplate gst_nice_src_src_template = GST_STATIC_PAD_TEMPLATE ( "src", @@ -102,7 +141,7 @@ GST_STATIC_PAD_TEMPLATE ( #define gst_nice_src_parent_class parent_class -G_DEFINE_TYPE (GstNiceSrc, gst_nice_src, GST_TYPE_PUSH_SRC); +G_DEFINE_TYPE (GstNiceSrc, gst_nice_src, GST_TYPE_BASE_SRC); enum { @@ -151,7 +190,6 @@ gst_nice_src_handle_event (GstBaseSrc *basesrc, GstEvent * event) static void gst_nice_src_class_init (GstNiceSrcClass *klass) { - GstPushSrcClass *gstpushsrc_class; GstBaseSrcClass *gstbasesrc_class; GstElementClass *gstelement_class; GObjectClass *gobject_class; @@ -159,15 +197,21 @@ gst_nice_src_class_init (GstNiceSrcClass *klass) GST_DEBUG_CATEGORY_INIT (nicesrc_debug, "nicesrc", 0, "libnice source"); - gstpushsrc_class = (GstPushSrcClass *) klass; - gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_nice_src_create); - gstbasesrc_class = (GstBaseSrcClass *) klass; gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_nice_src_unlock); gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_nice_src_unlock_stop); gstbasesrc_class->negotiate = GST_DEBUG_FUNCPTR (gst_nice_src_negotiate); +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + gstbasesrc_class->decide_allocation = GST_DEBUG_FUNCPTR (gst_nice_src_decide_allocation); +#endif gstbasesrc_class->event = GST_DEBUG_FUNCPTR (gst_nice_src_handle_event); + /* Reimplementation of gstpushsrc in order to support buffer lists */ + gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_nice_src_create); + gstbasesrc_class->alloc = GST_DEBUG_FUNCPTR (gst_nice_src_alloc); + gstbasesrc_class->fill = GST_DEBUG_FUNCPTR (gst_nice_src_fill); + gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_nice_src_query); + gobject_class = (GObjectClass *) klass; gobject_class->set_property = gst_nice_src_set_property; gobject_class->get_property = gst_nice_src_get_property; @@ -178,11 +222,7 @@ gst_nice_src_class_init (GstNiceSrcClass *klass) gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&gst_nice_src_src_template)); -#if GST_CHECK_VERSION (1,0,0) gst_element_class_set_metadata (gstelement_class, -#else - gst_element_class_set_details_simple (gstelement_class, -#endif "ICE source", "Source", "Interactive UDP connectivity establishment", @@ -345,8 +385,32 @@ gst_nice_src_init (GstNiceSrc *src) gst_nice_src_nice_address_compare, gst_nice_src_destroy_hash_key, gst_object_unref); +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + src->mem_list_interface.function_interface = &nice_src_mem_interface; + src->mem_list_interface.gst_src = src; + src->mem_list_interface.temp_refs = g_array_sized_new(FALSE, TRUE, + sizeof(GstNiceSrcMemoryBufferRef*), GST_NICE_SRC_MEM_BUFFERS_PREALLOCATED); + g_array_set_clear_func(src->mem_list_interface.temp_refs, &gst_nice_src_mem_buffer_ref_array_clear); + src->mem_list_interface_set = FALSE; +#endif } +static void gst_nice_buffer_address_meta_add( + GstNiceSrc *nicesrc, + const NiceAddress *from, + GstBuffer* buffer + ){ + if (from != NULL) { + GSocketAddress * saddr = gst_nice_src_gsocket_addr_create_or_retrieve( + nicesrc, from); + if (saddr != NULL) { + gst_buffer_add_net_address_meta (buffer, saddr); + g_object_unref (saddr); + } else { + GST_ERROR_OBJECT (nicesrc, "Could not convert address to GSocketAddress"); + } + } +} static void gst_nice_src_read_callback (NiceAgent *agent, guint stream_id, @@ -360,9 +424,6 @@ gst_nice_src_read_callback (NiceAgent *agent, GstBaseSrc *basesrc = GST_BASE_SRC (data); GstNiceSrc *nicesrc = GST_NICE_SRC (basesrc); GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc); -#if !GST_CHECK_VERSION (1,0,0) - GstNetBuffer *netbuffer = NULL; -#endif GstBuffer *buffer = NULL; (void)stream_id; @@ -370,7 +431,6 @@ gst_nice_src_read_callback (NiceAgent *agent, GST_LOG_OBJECT (agent, "Got buffer, getting out of the main loop"); -#if GST_CHECK_VERSION (1,0,0) (void)to; GstFlowReturn status = bclass->alloc(basesrc, 0, len, &buffer); if (status != GST_FLOW_OK) @@ -379,55 +439,54 @@ gst_nice_src_read_callback (NiceAgent *agent, ", allocate using local allocator instead"); buffer = gst_buffer_new_allocate (NULL, len, NULL); } - gst_buffer_fill (buffer, 0, buf, len); - if (from != NULL) { - GSocketAddress * saddr = gst_nice_src_gsocket_addr_create_or_retrieve( - nicesrc, from); - if (saddr != NULL) { - gst_buffer_add_net_address_meta (buffer, saddr); - g_object_unref (saddr); - } else { - GST_ERROR_OBJECT (nicesrc, "Could not convert address to GSocketAddress"); - } + if (gst_buffer_get_size(buffer) != len) + { + gst_buffer_resize(buffer, 0, len); + g_assert(gst_buffer_get_size(buffer) == len); } -#else - if (from != NULL && to != NULL) { - netbuffer = gst_netbuffer_new(); - GST_BUFFER_DATA(netbuffer) = g_memdup(buf, len); - GST_BUFFER_MALLOCDATA(netbuffer) = GST_BUFFER_DATA(netbuffer); - GST_BUFFER_SIZE(netbuffer) = len; + gst_buffer_fill (buffer, 0, buf, len); - switch (from->s.addr.sa_family) { - case AF_INET: - { - gst_netaddress_set_ip4_address (&netbuffer->from, from->s.ip4.sin_addr.s_addr, from->s.ip4.sin_port); - gst_netaddress_set_ip4_address (&netbuffer->to, to->s.ip4.sin_addr.s_addr, to->s.ip4.sin_port); - } - break; - case AF_INET6: - { - gst_netaddress_set_ip6_address (&netbuffer->from, (guint8 *)(&from->s.ip6.sin6_addr), from->s.ip6.sin6_port); - gst_netaddress_set_ip6_address (&netbuffer->to, (guint8 *)(&to->s.ip6.sin6_addr), to->s.ip6.sin6_port); - } - break; - default: - GST_ERROR_OBJECT (nicesrc, "Unknown address family"); - break; - } + gst_nice_buffer_address_meta_add(nicesrc, from, buffer); + g_queue_push_tail (nicesrc->outbufs, buffer); - buffer = GST_BUFFER_CAST(netbuffer); - } else { - buffer = gst_buffer_new_and_alloc (len); - memcpy (GST_BUFFER_DATA (buffer), buf, len); + g_main_loop_quit (nicesrc->mainloop); +} +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG +/* NB: This function does not support pre 1.0 gstreamer */ +static void +gst_nice_src_read_multiple_callback (NiceAgent *agent, + guint stream_id, + guint component_id, + guint num_buffers, + NiceMemoryBufferRef **buffers, + const NiceAddress *from, + const NiceAddress *to, + gpointer data) +{ + GstBaseSrc *basesrc = GST_BASE_SRC (data); + GstNiceSrc *nicesrc = GST_NICE_SRC (basesrc); + + GstBufferList *outlist = gst_buffer_list_new_sized (num_buffers); + for (int i = 0; i < num_buffers; ++i) { + GstNiceSrcMemoryBufferRef *buffer_ref = (GstNiceSrcMemoryBufferRef*)buffers[i]; + GstBuffer *gbuffer = buffer_ref->buffer; + gst_buffer_unmap(gbuffer, &(buffer_ref->buf_map)); + gst_nice_buffer_address_meta_add(nicesrc, &from[i], gbuffer); + gst_buffer_list_insert (outlist, -1, gbuffer); + buffer_ref->buffer = NULL; + gst_nice_src_buffer_return((MemlistInterface**)&(nicesrc->mem_list_interface.function_interface), buffer_ref); } -#endif - g_queue_push_tail (nicesrc->outbufs, buffer); + + GST_LOG_OBJECT (agent, "Got multiple buffers (%d), getting out of the main loop", num_buffers); + + g_queue_push_tail (nicesrc->outbufs, outlist); g_main_loop_quit (nicesrc->mainloop); } +#endif static gboolean gst_nice_src_unlock_idler (gpointer data) @@ -497,7 +556,7 @@ gst_nice_src_negotiate (GstBaseSrc * basesrc) gboolean result = FALSE; caps = gst_pad_get_allowed_caps (GST_BASE_SRC_PAD (basesrc)); - if (!caps) + if (caps == NULL) caps = gst_pad_get_pad_template_caps (GST_BASE_SRC_PAD (basesrc)); GST_OBJECT_LOCK (src); @@ -512,24 +571,136 @@ gst_nice_src_negotiate (GstBaseSrc * basesrc) result = TRUE; } else { GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc); - if (bclass->fixate) + if (bclass->fixate){ + GstCaps *oldcaps = caps; caps = bclass->fixate (basesrc, caps); + } GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps); if (gst_caps_is_fixed (caps)) { result = gst_base_src_set_caps (basesrc, caps); } } - gst_caps_unref (caps); } else { GST_DEBUG_OBJECT (basesrc, "no common caps"); } + if (caps != NULL){ + gst_caps_unref (caps); + } return result; } +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG +static gboolean +gst_nice_src_decide_allocation (GstBaseSrc * bsrc, GstQuery * query) +{ + GstBufferPool *pool; + gboolean update; + GstStructure *config; + GstCaps *caps = NULL; + guint size = BUFFER_SIZE; + + GstNiceSrc *src = GST_NICE_SRC_CAST (bsrc); + + if (gst_query_get_n_allocation_pools (query) > 0) { + update = TRUE; + } else { + update = FALSE; + } + + pool = gst_buffer_pool_new (); + + config = gst_buffer_pool_get_config (pool); + + gst_query_parse_allocation (query, &caps, NULL); + + gst_buffer_pool_config_set_params (config, caps, size, 0, 0); + + gst_buffer_pool_set_config (pool, config); + + if (update) + gst_query_set_nth_allocation_pool (query, 0, pool, size, 0, 0); + else + gst_query_add_allocation_pool (query, pool, size, 0, 0); + + gst_nice_src_clean_up_pool(src); + + src->mem_list_interface.pool = pool; + + if (src->agent){ + if (src->mem_list_interface_set == FALSE) + { + nice_agent_set_mem_list_interface(src->agent, (MemlistInterface**)&src->mem_list_interface); + src->mem_list_interface_set = TRUE; + } + } + + return TRUE; +} + +static void gst_nice_src_clean_up_pool(GstNiceSrc * src) +{ + if (src->mem_list_interface.pool != NULL) { + /* The entries that already exists will be with the old pool until they die + as we have no way of moving them to the new pool. */ + //gst_buffer_pool_set_active (src->mem_list_interface.pool, FALSE); + gst_object_unref (src->mem_list_interface.pool); + src->mem_list_interface.pool = NULL; + } +} +#endif + +static gboolean +gst_nice_src_query (GstBaseSrc * src, GstQuery * query) +{ + gboolean ret; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_SCHEDULING: + { + /* a pushsrc can by default never operate in pull mode override + * if you want something different. */ + gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEQUENTIAL, 1, -1, + 0); + gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH); + + ret = TRUE; + break; + } + default: + ret = GST_BASE_SRC_CLASS (parent_class)->query (src, query); + break; + } + return ret; +} +static void gst_nice_src_set_timestamp(GstBaseSrc * basesrc, GstBuffer *buffer) +{ + GstClock *clock; + GstClockTime running_time, now; + GstClockTime base_time = GST_ELEMENT_CAST (basesrc)->base_time; + + /* get clock, if no clock, we can't sync or do timestamps */ + if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL) + { + return; + } + else + { + gst_object_ref (clock); + } + + now = gst_clock_get_time (clock); + running_time = now - base_time; + + GST_BUFFER_DTS (buffer) = running_time; + + GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT, + GST_TIME_ARGS (running_time)); + + gst_object_unref(clock); +} static GstFlowReturn -gst_nice_src_create ( - GstPushSrc *basesrc, - GstBuffer **buffer) +gst_nice_src_create (GstBaseSrc * basesrc, guint64 offset, guint length, + GstBuffer ** ret) { GstNiceSrc *nicesrc = GST_NICE_SRC (basesrc); @@ -537,33 +708,66 @@ gst_nice_src_create ( GST_OBJECT_LOCK (basesrc); if (nicesrc->unlocked) { + GST_LOG_OBJECT (nicesrc, "Source unlinkend, transitioning to flushing"); GST_OBJECT_UNLOCK (basesrc); -#if GST_CHECK_VERSION (1,0,0) return GST_FLOW_FLUSHING; -#else - return GST_FLOW_WRONG_STATE; -#endif } GST_OBJECT_UNLOCK (basesrc); if (g_queue_is_empty (nicesrc->outbufs)) g_main_loop_run (nicesrc->mainloop); - *buffer = g_queue_pop_head (nicesrc->outbufs); - if (*buffer != NULL) { - GST_LOG_OBJECT (nicesrc, "Got buffer, pushing"); + gpointer bufptr = g_queue_pop_head (nicesrc->outbufs); + + if (bufptr != NULL) { + if (GST_IS_BUFFER_LIST(bufptr)){ + *ret = NULL; + guint bl_len = gst_buffer_list_length(bufptr); + for(int i = 0; i < bl_len; i++){ + GstBuffer *buf = gst_buffer_list_get(bufptr, i); + gst_nice_src_set_timestamp(basesrc, buf); + } + gst_base_src_submit_buffer_list (basesrc, bufptr); + GST_LOG_OBJECT (nicesrc, "Got buffer list, pushing"); + } + else + { + *ret = bufptr; + gst_nice_src_set_timestamp(basesrc, bufptr); + + GST_LOG_OBJECT (nicesrc, "Got buffer, pushing"); + } return GST_FLOW_OK; } else { + *ret = NULL; GST_LOG_OBJECT (nicesrc, "Got interrupting, returning wrong-state"); -#if GST_CHECK_VERSION (1,0,0) return GST_FLOW_FLUSHING; -#else - return GST_FLOW_WRONG_STATE; -#endif } } +static GstFlowReturn +gst_nice_src_alloc (GstBaseSrc * bsrc, guint64 offset, guint length, + GstBuffer ** ret) +{ + GstFlowReturn fret; + + fret = GST_BASE_SRC_CLASS (parent_class)->alloc (bsrc, offset, length, ret); + + return fret; +} + +static GstFlowReturn +gst_nice_src_fill (GstBaseSrc * bsrc, guint64 offset, guint length, + GstBuffer * ret) +{ + GstFlowReturn fret; + + fret = GST_BASE_SRC_CLASS (parent_class)->fill (bsrc, offset, length, ret); + + return fret; +} + static void gst_nice_src_dispose (GObject *object) { @@ -575,10 +779,32 @@ gst_nice_src_dispose (GObject *object) } src->idle_source = NULL; - if (src->agent) + if (src->agent){ +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + if (src->mem_list_interface_set == TRUE) + { + nice_agent_set_mem_list_interface(src->agent, NULL); + src->mem_list_interface_set = FALSE; + } +#endif g_object_unref (src->agent); + } + src->agent = NULL; +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + if (src->mem_list_interface.temp_refs){ + GArray *temp_refs = src->mem_list_interface.temp_refs; + /* Clean up all elements in array */ + for(int i=temp_refs->len-1;i==0; i--) + { + g_array_remove_index(temp_refs, i); + } + g_array_free(temp_refs, TRUE); + } + gst_nice_src_clean_up_pool(src); +#endif + if (src->mainloop) g_main_loop_unref (src->mainloop); src->mainloop = NULL; @@ -727,12 +953,18 @@ gst_nice_src_change_state (GstElement * element, GstStateChange transition) else { nice_agent_attach_recv (src->agent, src->stream_id, src->component_id, - src->mainctx, gst_nice_src_read_callback, (gpointer) src); + src->mainctx, gst_nice_src_read_callback, +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + gst_nice_src_read_multiple_callback, +#else + NULL, +#endif + (gpointer) src); } break; case GST_STATE_CHANGE_READY_TO_NULL: nice_agent_attach_recv (src->agent, src->stream_id, src->component_id, - src->mainctx, NULL, NULL); + src->mainctx, NULL, NULL, NULL); break; default: break; @@ -743,3 +975,97 @@ gst_nice_src_change_state (GstElement * element, GstStateChange transition) return ret; } + +NiceMemoryBufferRef* gst_nice_src_buffer_ref_allocate(MemlistInterface **ml_interface){ + struct _GstNiceMemlistInterface *mem_list_interface = (struct _GstNiceMemlistInterface *)ml_interface; + + GstNiceSrcMemoryBufferRef* ref; + if (mem_list_interface->temp_refs->len > 0) + { + /* Use an existing allocated reference */ + int last_index = mem_list_interface->temp_refs->len-1; + ref = (GstNiceSrcMemoryBufferRef*) g_array_index(mem_list_interface->temp_refs, GstNiceSrcMemoryBufferRef*, last_index); + // Make sure the ref is not freed when removed from the array + g_array_index(mem_list_interface->temp_refs, GstNiceSrcMemoryBufferRef*, last_index) = NULL; + g_array_remove_index(mem_list_interface->temp_refs, last_index); + } + else + { + /* No existing elements are stored, allocate a new one */ + ref = g_new0(GstNiceSrcMemoryBufferRef, 1); + } + g_assert_cmpint((gsize)ref, !=, (gsize)NULL); + return ref; +} + +NiceMemoryBufferRef* gst_nice_src_buffer_get(MemlistInterface **ml_interface, gsize size){ + struct _GstNiceMemlistInterface *mem_list_interface = (struct _GstNiceMemlistInterface *)ml_interface; + GstBufferPoolAcquireParams params = { 0 }; + GstNiceSrcMemoryBufferRef *ref = gst_nice_src_buffer_ref_allocate(ml_interface); + GstBuffer *buffer = NULL; + + g_assert(mem_list_interface->pool != NULL); + gint status = gst_buffer_pool_acquire_buffer (mem_list_interface->pool, &buffer, + ¶ms); + if(status != GST_FLOW_OK) + { + gst_nice_src_buffer_return(ml_interface, ref); + return NULL; + } + g_assert_cmpint(status, ==, GST_FLOW_OK); + g_assert(buffer != NULL); + ref->buffer = buffer; + + gboolean mapped = gst_buffer_map (ref->buffer, &ref->buf_map, + GST_MAP_WRITE | GST_MAP_READ); + g_assert(mapped); + + return (NiceMemoryBufferRef*) ref; +} + +void gst_nice_src_buffer_return(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer){ + struct _GstNiceMemlistInterface *mem_list_interface = (struct _GstNiceMemlistInterface *)ml_interface; + GstNiceSrcMemoryBufferRef *buffer_ref = (GstNiceSrcMemoryBufferRef*)buffer; + if(buffer_ref->buffer){ + /* Return allocated buffer to the pool after it has been used */ + gst_buffer_unmap (buffer_ref->buffer, &buffer_ref->buf_map); + gst_buffer_unref (buffer_ref->buffer); + } + /* TODO: The ref should be added to the array, this is not done at the moment */ + memset (buffer_ref, 0, sizeof (GstNiceSrcMemoryBufferRef)); + g_array_append_vals(mem_list_interface->temp_refs, &buffer_ref, 1); + +} + +char* gst_nice_src_buffer_contents(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer){ + GstNiceSrcMemoryBufferRef *buffer_ref = (GstNiceSrcMemoryBufferRef*)buffer; + return (char*) buffer_ref->buf_map.data; +} + +gsize gst_nice_src_buffer_size(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer){ + GstNiceSrcMemoryBufferRef *buffer_ref = (GstNiceSrcMemoryBufferRef*)buffer; + return buffer_ref->buf_map.size; +} + +void gst_nice_src_buffer_resize(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer, gsize new_size) { + GstNiceSrcMemoryBufferRef *buffer_ref = (GstNiceSrcMemoryBufferRef*)buffer; + guint8* data_location; + g_assert(new_size <= buffer_ref->buf_map.size); + data_location = buffer_ref->buf_map.data; + gst_buffer_unmap (buffer_ref->buffer, &buffer_ref->buf_map); + gst_buffer_resize(buffer_ref->buffer, 0, new_size); + gboolean mapped = gst_buffer_map (buffer_ref->buffer, &buffer_ref->buf_map, + GST_MAP_WRITE | GST_MAP_READ); + g_assert(mapped); + g_assert(buffer_ref->buf_map.data == data_location); + g_assert(buffer_ref->buf_map.size == new_size); +} + +/* Only to be used as a clear function for the temp_refs array, which contains uninitialised refs */ +static void gst_nice_src_mem_buffer_ref_array_clear(void *element){ + GstNiceSrcMemoryBufferRef **ref = (GstNiceSrcMemoryBufferRef**)element; + if (ref != NULL){ + g_free(*ref); + *ref = NULL; + } +} \ No newline at end of file diff --git a/gst/gstnicesrc.h b/gst/gstnicesrc.h index 8d81502..a8482b0 100644 --- a/gst/gstnicesrc.h +++ b/gst/gstnicesrc.h @@ -44,6 +44,7 @@ #include G_BEGIN_DECLS +#define GST_NICE_SRC_MEM_BUFFERS_PREALLOCATED 32 #define GST_TYPE_NICE_SRC \ (gst_nice_src_get_type()) @@ -57,12 +58,35 @@ G_BEGIN_DECLS (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NICE_SRC)) #define GST_NICE_SRC_CAST(obj) \ ((GstNiceSrc *)(obj)) +#define GST_NICE_SRC_GET_CLASS(obj) \ + (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_NICE_SRC, GstNiceSrcClass)) typedef struct _GstNiceSrc GstNiceSrc; +typedef struct _GstNiceSrcMemoryBufferRef GstNiceSrcMemoryBufferRef; + +struct _GstNiceSrcMemoryBufferRef{ + GstBuffer *buffer; + GstMapInfo buf_map; +}; + +struct _GstNiceMemlistInterface{ + /* This must be first in struct for upcasts to work */ + const MemlistInterface *function_interface; + /* When embedded in GstNiceSrc this will be a self referencing pointer, + that is not refcounted */ + GstNiceSrc *gst_src; + /* Pointers to GstNiceSrcMemoryBufferRefs that have been returned that are + waiting to be given out again. It is assumed that the buffer and mapping + refed to by the reference is in an unintitialised state */ + GArray *temp_refs; + GstBufferPool *pool; + +}; + struct _GstNiceSrc { - GstPushSrc parent; + GstBaseSrc parent; GstPad *srcpad; NiceAgent *agent; guint stream_id; @@ -74,13 +98,17 @@ struct _GstNiceSrc GSource *idle_source; GstCaps *caps; GHashTable *socket_addresses; +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + struct _GstNiceMemlistInterface mem_list_interface; + gboolean mem_list_interface_set; +#endif }; typedef struct _GstNiceSrcClass GstNiceSrcClass; struct _GstNiceSrcClass { - GstPushSrcClass parent_class; + GstBaseSrcClass parent_class; }; GType gst_nice_src_get_type (void); diff --git a/nice/memlist.h b/nice/memlist.h new file mode 100644 index 0000000..4cc1432 --- /dev/null +++ b/nice/memlist.h @@ -0,0 +1,77 @@ +/* + * Lightweight abstraction over GStreamers buffer lists / buffer pools to allow + * allocating buffers for use with recvmmsg. + * This file is part of the Nice GLib ICE library. + * + * + * (C) 2022 Pexip AS + * + * The contents of this file are subject to the Mozilla Public License VersioVn + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is the Nice GLib ICE library. + * + * The Initial Developers of the Original Code are Collabora Ltd and Nokia + * Corporation. All Rights Reserved. + * + * Contributors: + * Frederik Vestre, Pexip AS + * + * Alternatively, the contents of this file may be used under the terms of the + * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which + * case the provisions of LGPL are applicable instead of those above. If you + * wish to allow use of your version of this file only under the terms of the + * LGPL and not to allow others to use your version of this file under the + * MPL, indicate your decision by deleting the provisions above and replace + * them with the notice and other provisions required by the LGPL. If you do + * not delete the provisions above, a recipient may use your version of this + * file under either the MPL or the LGPL. + */ + +#ifndef _NICE_MEMLIST_H +#define _NICE_MEMLIST_H + +#ifdef __linux__ +#define NICE_UDP_SOCKET_HAVE_RECVMMSG 1 +#define _GNU_SOURCE +#endif + + +#include + +G_BEGIN_DECLS + +typedef struct _MemlistInterface MemlistInterface; +typedef void NiceMemoryBufferRef; +/* Libnice expects the buffers received trough the interface to be accessable until they are either + returned though nice_return_memory_buffer or by passing them as a result of a read operation. + All buffers will be returned when the corresponding agent is destroyed */ +typedef NiceMemoryBufferRef* (*nice_memory_buffer_get)(MemlistInterface **ml_interface, gsize size); +/* Return a memory buffer to the provider */ +typedef void (*nice_memory_buffer_return)(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer); +/* Get a pointer to the contents (i.e. bytes) of the memory buffer */ +typedef char* (*nice_memory_buffer_contents)(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer); +/* Get the size of the memory buffer */ +typedef gsize (*nice_memory_buffer_size)(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer); +/* Reduce the size of the memory buffer */ +typedef void (*nice_memory_buffer_resize)(MemlistInterface **ml_interface, NiceMemoryBufferRef* buffer, gsize new_size); + +struct _MemlistInterface { + nice_memory_buffer_get buffer_get; + nice_memory_buffer_return buffer_return; + nice_memory_buffer_contents buffer_contents; + nice_memory_buffer_size buffer_size; + nice_memory_buffer_resize buffer_resize; +}; + +G_END_DECLS + +#endif /* _NICE_MEMLIST_H */ + diff --git a/nice/meson.build b/nice/meson.build index afe3194..afcfec3 100644 --- a/nice/meson.build +++ b/nice/meson.build @@ -1,4 +1,4 @@ -install_headers('nice.h', 'niceconfig.h', subdir : 'nice') +install_headers('nice.h', 'niceconfig.h', 'memlist.h', subdir : 'nice') pkg_install_dir = join_paths(get_option('libdir'), 'pkgconfig') pkgconf = configuration_data() diff --git a/socket/socket.c b/socket/socket.c index 0f73f5e..26864b9 100644 --- a/socket/socket.c +++ b/socket/socket.c @@ -98,20 +98,42 @@ nice_socket_attach (NiceSocket *sock, GMainContext* ctx) } } -const char* socket_type_to_string (NiceSocketType type) +void +nice_socket_buffers_and_interface_unref (NiceSocket *sock) +{ + if (sock) { + if (sock->type == NICE_SOCKET_TYPE_UDP_BSD) + { + nice_udp_socket_buffers_and_interface_unref (sock); + } + } +} + +void +nice_socket_buffer_interface_set (NiceSocket *sock, MemlistInterface **ml_interface) +{ + if (sock) { + if (sock->type == NICE_SOCKET_TYPE_UDP_BSD) + { + nice_udp_socket_interface_set(sock, ml_interface); + } + } +} + +const char* socket_type_to_string (NiceSocketType socket_type) { - switch (type) { - case NICE_SOCKET_TYPE_UDP_BSD: return "udp"; - case NICE_SOCKET_TYPE_TCP_BSD: return "tcp-bsd"; - case NICE_SOCKET_TYPE_TCP_ACTIVE: return "tcp-active"; - case NICE_SOCKET_TYPE_TCP_PASSIVE: return "tcp-passive"; - case NICE_SOCKET_TYPE_TCP_ESTABLISHED: return "tcp-established"; - case NICE_SOCKET_TYPE_TCP_SO: return "tcp-so"; - case NICE_SOCKET_TYPE_PSEUDOSSL: return "pseudossl"; - case NICE_SOCKET_TYPE_HTTP: return "http"; - case NICE_SOCKET_TYPE_SOCKS5: return "socks5"; - case NICE_SOCKET_TYPE_TURN: return "turn"; - case NICE_SOCKET_TYPE_TCP_TURN: return "tcp-turn"; + switch (socket_type) { + case NICE_SOCKET_TYPE_UDP_BSD: return "udp"; + case NICE_SOCKET_TYPE_TCP_BSD: return "tcp-bsd"; + case NICE_SOCKET_TYPE_TCP_ACTIVE: return "tcp-active"; + case NICE_SOCKET_TYPE_TCP_PASSIVE: return "tcp-passive"; + case NICE_SOCKET_TYPE_TCP_ESTABLISHED: return "tcp-established"; + case NICE_SOCKET_TYPE_TCP_SO: return "tcp-so"; + case NICE_SOCKET_TYPE_PSEUDOSSL: return "pseudossl"; + case NICE_SOCKET_TYPE_HTTP: return "http"; + case NICE_SOCKET_TYPE_SOCKS5: return "socks5"; + case NICE_SOCKET_TYPE_TURN: return "turn"; + case NICE_SOCKET_TYPE_TCP_TURN: return "tcp-turn"; } return "(invalid)"; } diff --git a/socket/socket.h b/socket/socket.h index 0b68d49..9df0bb4 100644 --- a/socket/socket.h +++ b/socket/socket.h @@ -37,6 +37,7 @@ #ifndef _SOCKET_H #define _SOCKET_H +#include "memlist.h" #include "address.h" #include @@ -111,7 +112,7 @@ nice_socket_set_rx_enabled (NiceSocket *sock, gboolean enabled); void nice_socket_free (NiceSocket *sock); -const char *socket_type_to_string (NiceSocketType type); +const char *socket_type_to_string (NiceSocketType socket_type); #include "udp-bsd.h" #include "tcp-bsd.h" diff --git a/socket/udp-bsd.c b/socket/udp-bsd.c index 31941e1..6435b8f 100644 --- a/socket/udp-bsd.c +++ b/socket/udp-bsd.c @@ -40,22 +40,27 @@ * Implementation of UDP socket interface using Berkeley sockets. (See * http://en.wikipedia.org/wiki/Berkeley_sockets.) */ + #ifdef HAVE_CONFIG_H # include "config.h" #endif +#include "udp-bsd.h" #include #include #include -#include "udp-bsd.h" + +#include "agent-priv.h" +#include "memlist.h" + + #ifndef G_OS_WIN32 #include #endif - static void socket_close (NiceSocket *sock); static gint socket_recv (NiceSocket *sock, NiceAddress *from, guint len, gchar *buf); @@ -63,10 +68,40 @@ static gint socket_send (NiceSocket *sock, const NiceAddress *to, guint len, const gchar *buf); static gboolean socket_is_reliable (NiceSocket *sock); +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + +struct _MessageData +{ + NiceMemoryBufferRef *buffer; + + struct iovec iovec; + struct sockaddr_storage remote; +}; +typedef struct _MessageData MessageData; + +static void socket_recvmmsg_structures_fill_entry_with_buffer(MemlistInterface **memory_interface, + MessageData *message_data, struct mmsghdr *hdr); +/* TODO: Use those where appropriate */ +static void socket_recvmmsg_structures_set_up(NiceSocket *udp_socket); +#endif + + struct UdpBsdSocketPrivate { NiceAddress niceaddr; GSocketAddress *gaddr; +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + MemlistInterface **ml_interface; + /* Alloc buffers outside callback, to avoid reallocing buffers for messages + that are not received. Any messages that are passed along are replaced with + freshly allocated memory. */ + MessageData message_datas[NICE_UDP_SOCKET_MMSG_TOTAL]; + /* This is stored outside the MessageData struct as this must be in a + continous list to be able to be passed to recvmmsg */ + struct mmsghdr message_headers[NICE_UDP_SOCKET_MMSG_TOTAL]; + uint64_t buffer_available[((NICE_UDP_SOCKET_MMSG_TOTAL-1)/sizeof(uint64_t))+1]; + gboolean missing_buffers; +#endif }; NiceSocket * @@ -146,6 +181,12 @@ nice_udp_bsd_socket_new (NiceAddress *addr) sock->close = socket_close; sock->attach = NULL; +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + memset(priv->message_datas, 0, sizeof(MessageData)*NICE_UDP_SOCKET_MMSG_TOTAL); + memset(priv->message_headers, 0, sizeof(struct mmsghdr)*NICE_UDP_SOCKET_MMSG_TOTAL); + priv->missing_buffers = TRUE; +#endif + return sock; } @@ -156,6 +197,9 @@ socket_close (NiceSocket *sock) if (priv->gaddr) g_object_unref (priv->gaddr); + + nice_udp_socket_buffers_and_interface_unref(sock); + g_slice_free (struct UdpBsdSocketPrivate, sock->priv); if (sock->fileno) { @@ -163,8 +207,48 @@ socket_close (NiceSocket *sock) g_object_unref (sock->fileno); sock->fileno = NULL; } + } +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG +gint nice_udp_socket_recvmmsg(NiceSocket *sock) +{ + g_assert(sock->type == NICE_SOCKET_TYPE_UDP_BSD); + struct UdpBsdSocketPrivate *priv = sock->priv; + MemlistInterface **memory_interface_ptr = priv->ml_interface; + MemlistInterface *memory_interface; + + if(memory_interface_ptr != NULL) + { + memory_interface = *memory_interface_ptr; + } + else + { + return -ENOTSUP; + } + + if(priv->missing_buffers){ + priv->missing_buffers = FALSE; + nice_udp_socket_recvmmsg_structures_fill_new_buffers(sock, 0, NICE_UDP_SOCKET_MMSG_LEN); + if(priv->missing_buffers == TRUE){ + return -ENOMEM; + } + } + + /* What do we do here if we haven't been able to initiate enough buffers to put data in?*/ + int socket_fd = g_socket_get_fd(sock->fileno); + gssize result = + recvmmsg (socket_fd, priv->message_headers, NICE_UDP_SOCKET_MMSG_LEN, MSG_WAITFORONE, NULL); + + // Resize buffers to the actual received length + for( int i = 0; i < result; i++ ) { + MessageData* message_data = &(priv->message_datas[i]); + struct mmsghdr *message_header = &(priv->message_headers[i]); + memory_interface->buffer_resize(memory_interface_ptr, message_data->buffer, message_header->msg_len); + } + return result; +} +#endif static gint socket_recv (NiceSocket *sock, NiceAddress *from, guint len, gchar *buf) { @@ -225,3 +309,187 @@ socket_is_reliable (NiceSocket *sock) return FALSE; } +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG + +void nice_udp_socket_interface_set(NiceSocket *udp_socket, MemlistInterface **ml_interface){ + g_assert(udp_socket->type == NICE_SOCKET_TYPE_UDP_BSD); + struct UdpBsdSocketPrivate *priv = udp_socket->priv; + g_assert(priv->ml_interface == NULL); + priv->ml_interface = ml_interface; + socket_recvmmsg_structures_set_up(udp_socket); +} + +NiceMemoryBufferRef *nice_udp_socket_packet_retrieve(NiceSocket *udp_socket, + guint packet_index, NiceAddress *from) +{ + g_assert(packet_index < NICE_UDP_SOCKET_MMSG_TOTAL); + struct UdpBsdSocketPrivate *priv = udp_socket->priv; + MessageData *message_data = &(priv->message_datas[packet_index]); + struct mmsghdr *message_header = &(priv->message_headers[packet_index]); + nice_address_set_from_sockaddr (from, (struct sockaddr *)&(message_data->remote)); + + NiceMemoryBufferRef *result = message_data->buffer; + message_data->buffer = NULL; + /* Replace the entry with a fresh buffer for next recvmmsg call */ + socket_recvmmsg_structures_fill_entry_with_buffer (priv->ml_interface, + message_data, message_header); + return result; +} + +/* This ensures no references to any buffers are present for this socket. + If this function is called the mem_interface may be changed, as long as it + happens before any more data is received or sent (practically while the agent + lock is locked). It is safe to call this function even if no MessageInterface + has been set earlier, and thus no buffers are in need of beeing cleaned up. */ +void nice_udp_socket_buffers_and_interface_unref(NiceSocket *udp_socket) +{ + struct UdpBsdSocketPrivate *priv = udp_socket->priv; + MemlistInterface **memory_interface_ptr = priv->ml_interface; + if(memory_interface_ptr != NULL) + { + g_assert(priv->message_datas != NULL); + MemlistInterface *memory_interface = *memory_interface_ptr; + for(int i = 0; i < NICE_UDP_SOCKET_MMSG_TOTAL; i++) + { + MessageData* msgdata = &(priv->message_datas[i]); + struct mmsghdr *message_header = &(priv->message_headers[i]); + + if (msgdata != NULL){ + if (msgdata->buffer != NULL){ + memory_interface->buffer_return(memory_interface_ptr, msgdata->buffer); + } + msgdata->iovec.iov_len = 0; + msgdata->iovec.iov_base = NULL; + + memset(message_header, 0, sizeof(struct mmsghdr)); + memset(msgdata, 0, sizeof(MessageData)); + + } + } + } + /* Currently we don't manage buffers when not recvmmsg is supported. + This may change in the future. However until then do nothing here. */ + + /* Clear the interface pointer, regardless if it is set or not */ + priv->ml_interface = NULL; +} + + +static void socket_recvmmsg_structures_fill_entry_with_buffer(MemlistInterface **memory_interface_ptr, + MessageData *message_data, struct mmsghdr *hdr) +{ + MemlistInterface *memory_interface = *memory_interface_ptr; + if (message_data->buffer == NULL) + { + message_data->buffer = memory_interface->buffer_get(memory_interface_ptr, NICE_UDP_SOCKET_BUFFER_ALLOC_SIZE); + } + if (message_data->buffer == NULL){ + return; + } + + gsize buffer_size = memory_interface->buffer_size(memory_interface_ptr, message_data->buffer); + message_data->iovec.iov_len = buffer_size; + message_data->iovec.iov_base = memory_interface->buffer_contents(memory_interface_ptr, message_data->buffer); + hdr->msg_len = buffer_size; + +} + +static void socket_recvmmsg_structures_set_up(NiceSocket *udp_socket) +{ + struct UdpBsdSocketPrivate *priv = udp_socket->priv; + MemlistInterface **memory_interface = priv->ml_interface; + gboolean missing_buffers = FALSE; + + for(int i = 0; i < NICE_UDP_SOCKET_MMSG_TOTAL; i++) + { + MessageData *message_data = &(priv->message_datas[i]); + struct mmsghdr *hdr = &(priv->message_headers[i]); + g_assert(i < NICE_UDP_SOCKET_MMSG_TOTAL); + + priv->message_headers[i].msg_hdr.msg_control = NULL; + priv->message_headers[i].msg_hdr.msg_controllen = 0; + priv->message_headers[i].msg_hdr.msg_iovlen = 1; + priv->message_headers[i].msg_hdr.msg_iov = &message_data->iovec; + priv->message_headers[i].msg_hdr.msg_name = (struct sockaddr *) &message_data->remote; + priv->message_headers[i].msg_hdr.msg_namelen = sizeof (struct sockaddr_storage); + + if (memory_interface != NULL){ + socket_recvmmsg_structures_fill_entry_with_buffer(memory_interface, message_data, hdr); + if (message_data->buffer != NULL){ + continue; + } + else{ + missing_buffers = TRUE; + } + } + + message_data->iovec.iov_len = 0; + message_data->iovec.iov_base = NULL; + hdr->msg_len = 0; + } + priv->missing_buffers = missing_buffers; + +} + +void nice_udp_socket_recvmmsg_structures_fill_new_buffers(NiceSocket *udp_socket, guint iter_start, guint iter_end) +{ + struct UdpBsdSocketPrivate *priv = udp_socket->priv; + MemlistInterface **memory_interface = priv->ml_interface; + g_assert(iter_start < iter_end); + g_assert(iter_end <= NICE_UDP_SOCKET_MMSG_TOTAL); + g_assert(memory_interface != NULL); + + for(int i = iter_start; i < iter_end; i++) + { + MessageData *message_data = &(priv->message_datas[i]); + struct mmsghdr *hdr = &(priv->message_headers[i]); + + socket_recvmmsg_structures_fill_entry_with_buffer(memory_interface, message_data, hdr); + if (message_data->buffer == NULL){ + message_data->iovec.iov_len = 0; + message_data->iovec.iov_base = NULL; + hdr->msg_len = 0; + priv->missing_buffers = TRUE; + } + } +} + +#else + +void nice_udp_socket_buffers_and_interface_unref(NiceSocket *udp_socket) +{ + (void) udp_socket; +} +void clean_up_recvmmsg_structures(NiceSocket *udp_socket) +{ + (void)udp_socket; +} +void socket_recvmmsg_structures_set_up(NiceSocket *udp_socket) +{ + (void)udp_socket; +} +gint nice_udp_socket_recvmmsg(NiceSocket *sock); +NiceMemoryBufferRef *nice_udp_socket_packet_retrieve(NiceSocket *udp_socket, + guint packet_index, NiceAddress *from); + +NiceMemoryBufferRef *nice_udp_socket_packet_retrieve(NiceSocket *udp_socket, + guint packet_index, NiceAddress *from) +{ + (void)udp_socket; + (void)packet_index; + (void)from; + return NULL; +} +void nice_udp_socket_recvmmsg_structures_fill_new_buffers(NiceSocket *udp_socket, + guint iter_start, guint iter_end) +{ + (void)udp_socket; + (void)iter_start; + (void)iter_end; +} +void nice_udp_socket_interface_set(NiceSocket *udp_socket, MemlistInterface **ml_interface){ + (void)udp_socket; + (void)ml_interface; +} + +#endif \ No newline at end of file diff --git a/socket/udp-bsd.h b/socket/udp-bsd.h index c8d6190..64a4a47 100644 --- a/socket/udp-bsd.h +++ b/socket/udp-bsd.h @@ -39,13 +39,37 @@ #ifndef _UDP_BSD_H #define _UDP_BSD_H +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG +#define _GNU_SOURCE +#include +#endif + #include "socket.h" +#include "agent.h" G_BEGIN_DECLS +#ifdef NICE_UDP_SOCKET_HAVE_RECVMMSG +#define NICE_UDP_SOCKET_MSG_RECEIVE_TIMES 1 +#define NICE_UDP_SOCKET_MMSG_LEN 32 +#else +#define NICE_UDP_SOCKET_MSG_RECEIVE_TIMES 32 +#define NICE_UDP_SOCKET_MMSG_LEN 1 +#endif +#define NICE_UDP_SOCKET_BUFFER_ALLOC_SIZE 1500 +#define NICE_UDP_SOCKET_MMSG_TOTAL (NICE_UDP_SOCKET_MSG_RECEIVE_TIMES * NICE_UDP_SOCKET_MMSG_LEN) + NiceSocket * nice_udp_bsd_socket_new (NiceAddress *addr); +void nice_udp_socket_interface_set(NiceSocket *udp_socket, MemlistInterface **ml_interface); +void nice_udp_socket_buffers_and_interface_unref(NiceSocket *udp_socket); +gint nice_udp_socket_recvmmsg(NiceSocket *sock); +NiceMemoryBufferRef *nice_udp_socket_packet_retrieve(NiceSocket *udp_socket, + guint packet_index, NiceAddress *from); +void nice_udp_socket_recvmmsg_structures_fill_new_buffers(NiceSocket *udp_socket, + guint iter_start, guint iter_end); + G_END_DECLS #endif /* _UDP_BSD_H */