Logo Search packages:      
Sourcecode: farsight version File versions

rtpstream.c

/*
 * rtpstream.c - Source for RTP plugin stream implementation
 *
 * Farsight RTP/AVP/SAVP/AVPF Module
 * Copyright (C) 2005,2006 Collabora Ltd.
 * Copyright (C) 2005,2006 Nokia Corporation
 *   @author Rob Taylor <rob.taylor@collabora.co.uk>
 *   @author Philippe Kalaf <philippe.kalaf@collabora.co.uk>
 * Copyright (C) 2005 INdT 
 *   @author Andre Moreira Magalhaes <andre.magalhaes@indt.org.br>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

#include "rtp.h"
#include "rtpstream.h"
#include "rtpsession.h"
#include "rtpgstcodecs.h"
#include "farsight-transport.h"
#include "farsight-transmitter.h"

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "helpers/farsight-interfaces.h"
#ifndef G_OS_WIN32
#include <arpa/inet.h>
#endif

#ifdef HAVE_CLINKC
#include "helpers/upnpigd.h"
#endif

#include <gst/gst.h>

#include <string.h>

/* Value stored in priv->conn_timeout by farsight_rtp_stream_init(); the
 * "conn_timeout" property describes it as a number of seconds. */
#define DEFAULT_CONN_TIMEOUT 45

/* Defaults for the "min-ptime" / "max-ptime" properties. Per the max-ptime
 * property description, -1 means "unlimited up to MTU". */
#define DEFAULT_MIN_PTIME 0
#define DEFAULT_MAX_PTIME -1

/* Query type looked up by the nick "jitterbuffer-statistics". The lookup is
 * performed on every expansion of the macro. */
#define GST_QUERY_JB_STATS \
        (gst_query_type_get_by_nick ("jitterbuffer-statistics"))
/* True when the lookup above yields something other than GST_QUERY_NONE. */
#define GST_QUERY_JB_STATS_SUPPORTED \
        (GST_QUERY_JB_STATS != GST_QUERY_NONE)

/* File name of the codec configuration searched for by
 * load_codecs_configuration(). */
#define GST_CODECS_CONF_FILE "gstcodecs.conf"

/* GObject property identifiers installed in farsight_rtp_stream_class_init()
 * and dispatched on in the set_property / get_property handlers. */
enum
{
  PROP_0,
  PROP_CONN_TIMEOUT,
  PROP_TRANSMITTER,
  PROP_MIN_PTIME,
  PROP_TRANSMITTER_OBJECT,
  PROP_MAX_PTIME
};

/*
 * Per-instance private state of a FarsightRTPStream. The storage is obtained
 * through FARSIGHT_RTP_STREAM_GET_PRIVATE() in farsight_rtp_stream_init().
 */
struct _FarsightRTPStreamPrivate
{
  /* set to TRUE by dispose so a second dispose call returns immediately */
  gboolean disposed;

  /* local_codecs is filled in by ensure_local_codecs(); finalize releases it
   * with g_list_free() only, and remote_codecs with
   * farsight_codec_list_destroy() */
  GList *local_codecs;
  GList *remote_codecs;
  GList *negotiated_codecs;
  /* created in init with g_direct_hash/g_direct_equal and no destroy
   * functions */
  GHashTable *pt_caps_table;

  /* when set, used by ensure_local_codecs() to sort local_codecs */
  GArray *codec_pref_list;
  GHashTable *negotiated_codec_associations;
  GHashTable *local_codec_associations;

  /* loaded lazily by ensure_local_codecs() via load_codecs_configuration() */
  GList *codecs_configuration;

  /* transmitter instance created from transmitter_name when the
   * "transmitter" property is set */
  FarsightTransmitter *transmitter;
  gchar *transmitter_name;

  /* send/recv pipeline */
  GstElement *main_pipeline; /*this is an optional user provided GstPipeline*/
  GstElement *pipeline;
  GstElement *rtpbin;
  /* bin whose elements receive the "min-ptime"/"max-ptime" property values */
  GstElement *send_codec_bin;

  GstElement *src;
  GstCaps *src_filter;

  GstElement *rtpdemux;
  GstElement *sink;
  GstCaps *sink_filter;

  /* initialised in init from the stream direction (SENDONLY bit) */
  gboolean sending;

  guint bus_watch;

  /* main loop source ids (stored as gint) of idle callbacks queued by
   * rtpdemux_pad_blocked_callback(); removed in dispose */
  GArray *pending_src_ids;

  /* all three ids start at -1; for the send/recv ids this tells _start to
   * auto pick a codec */
  gint recv_codec_id;
  gint send_codec_id;

  gint preload_recv_codec_id;

  gboolean build_send_pipeline;

  /* checked by rtpdemux_pad_blocked_callback() to cancel pending work */
  gboolean stopping;

  gboolean prepared; /*set if prepare has been called*/

  /* value of the "conn_timeout" property (described there as seconds) */
  guint conn_timeout;
  guint timeout_src;
  guint stats_timeout;

  /* both lists are released with farsight_transport_list_destroy() in
   * finalize */
  GList *local_candidates;
  GList *remote_candidates;

  /* strings released with g_free() in finalize */
  gchar *active_native_candidate;
  gchar *active_remote_candidate;

  /* only assigned in init when built with HAVE_CLINKC */
  gboolean use_upnp;

  /* NOTE(review): stored unsigned, but the matching properties are int64 and
   * max-ptime uses -1 as "unlimited"; the value round-trips through
   * g_value_get_int64()/g_value_set_int64() -- confirm no caller compares
   * these fields numerically. */
  guint64 min_ptime;
  guint64 max_ptime;
};

/*
 * Context handed from the rtpdemux "new payload type" handler to the pad
 * block callback and on to the idle callback that builds the receive
 * pipeline. Allocated with g_new0() and released with g_free() by whichever
 * callback ends the chain.
 */
typedef struct _DepayloaderChainInfo DepayloaderChainInfo;
struct _DepayloaderChainInfo
{
  FarsightStream *stream; /* stream that received the new payload type */
  GstPad *pad;            /* rtpdemux pad; a reference is taken when stored */
  gint pt;                /* payload type announced for that pad */
};

/* Fetches the FarsightRTPStreamPrivate area registered for the type with
 * g_type_class_add_private() in class_init. */
#define FARSIGHT_RTP_STREAM_GET_PRIVATE(o)  \
  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FARSIGHT_TYPE_RTP_STREAM, \
                                FarsightRTPStreamPrivate))

/* ---- Forward declarations ---- */

/* Generic helper */
static gboolean g_object_has_property (GObject *object, const gchar *property);

/* GObject type boilerplate */
static void farsight_rtp_stream_class_init (FarsightRTPStreamClass *klass);
static void farsight_rtp_stream_init (FarsightRTPStream *rtp_stream);

static void farsight_rtp_stream_dispose (GObject *object);
static void farsight_rtp_stream_finalize (GObject *object);

static void farsight_rtp_stream_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void farsight_rtp_stream_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

/* GStreamer callbacks and FarsightStream codec/source/sink methods */
static gboolean farsight_rtp_stream_bus_watch_cb (GstBus *bus, GstMessage
    *message, gpointer user_data);
static void farsight_rtp_stream_new_payload_type (GstElement *element, 
                                                  gint pt, GstPad *pad, 
                                                  gpointer user_data);
static void farsight_rtp_stream_payload_type_change (GstElement *element, 
                                                     gint pt,
                                                     gpointer user_data);
static void farsight_rtp_stream_create_new_pt_recv_pipeline (FarsightStream *stream, 
                                                             gint id, GstPad *pad);
static GstElement *farsight_rtp_stream_get_pipeline (FarsightStream *stream);
static gboolean farsight_rtp_stream_set_pipeline (FarsightStream *stream,
                                                  GstElement *pipeline);
static void farsight_rtp_stream_set_active_codec (FarsightStream *stream, 
                                                  gint id);
static gint farsight_rtp_stream_get_active_codec (FarsightStream *stream);
static G_CONST_RETURN GList *farsight_rtp_stream_get_local_codecs (FarsightStream *stream);
static gboolean farsight_rtp_stream_set_remote_codecs (FarsightStream *stream,
                                                   const GList *codecs);
static GList *farsight_rtp_stream_get_codec_intersection (FarsightStream *stream);
static void farsight_rtp_stream_set_codec_preference_list (FarsightStream *stream,
    const GArray *codec_pref);
static gboolean farsight_rtp_stream_set_source (FarsightStream *stream,
                                                GstElement *source);
static gboolean farsight_rtp_stream_set_source_filter (FarsightStream *stream,
    GstCaps *filter);
static GstElement *farsight_rtp_stream_get_source (FarsightStream *stream);
static gboolean farsight_rtp_stream_set_sink (FarsightStream *stream,
                                              GstElement *sink);
static gboolean farsight_rtp_stream_set_sink_filter (FarsightStream *stream,
    GstCaps *filter);
static GstElement *farsight_rtp_stream_get_sink (FarsightStream *stream);

/* Pipeline construction and stream life cycle */
static gboolean farsight_rtp_stream_build_base_pipeline (FarsightRTPStream *self);
static gboolean farsight_rtp_stream_build_send_pipeline (FarsightRTPStream *self);
static void farsight_rtp_stream_try_set_playing (FarsightRTPStream *self);
static gboolean farsight_rtp_stream_set_playing (gpointer data);
static gboolean farsight_rtp_stream_start (FarsightStream *stream);
static void farsight_rtp_stream_stop (FarsightStream *stream);
static gboolean farsight_rtp_stream_set_sending (FarsightStream *stream,
    gboolean sending);
static CodecAssociation * farsight_rtp_stream_choose_codec (
    FarsightRTPStream *self);
static gboolean farsight_rtp_stream_candidate_exists (FarsightStream *stream, 
                                                      const GList *candidate_list, 
                                                      const GList *candidate);

/* Transmitter signal handlers */
static void 
farsight_rtp_stream_new_native_candidate (gpointer transmitter,
    const FarsightTransportInfo *candidate, gpointer stream);
static void
farsight_rtp_stream_native_candidates_prepared (gpointer transmitter,
    gpointer stream);
static void
farsight_rtp_stream_new_active_candidate_pair (gpointer transmitter,
    const gchar *native_candidate_id, 
    const gchar *remote_candidate_id,
    gpointer stream);
static void 
farsight_rtp_stream_transmitter_state_changed (gpointer transmitter,
    FarsightTransmitterState state,
    gpointer stream);
static void farsight_rtp_stream_transmitter_error (gpointer transmitter,
    gpointer stream);

/* UPnP helpers */
static void farsight_rtp_stream_upnp_send_request (FarsightRTPStream *self,
    const FarsightTransportInfo *derived_trans);
static void farsight_rtp_stream_upnp_close_ports (FarsightRTPStream *self);

/* Transport candidate handling */
static void farsight_rtp_stream_prepare_transports (FarsightStream *self);
static gboolean farsight_rtp_stream_add_remote_candidate_to_rtpbin (FarsightRTPStream *self, 
                                                                    const gchar *remote_candidate_id);
static gboolean farsight_rtp_stream_set_active_candidate_pair (FarsightStream *stream,
                                                               const gchar *native_candidate_id, 
                                                               const gchar *remote_candidate_id);
static GList *farsight_rtp_stream_get_native_candidate (FarsightStream *stream, 
                                                        const gchar *candidate_id);
static G_CONST_RETURN GList *farsight_rtp_stream_get_native_candidate_list (FarsightStream *stream);
static void farsight_rtp_stream_set_remote_candidate_list (FarsightStream *stream, 
                                                           const GList *remote_candidates);
static void farsight_rtp_stream_add_remote_candidate (FarsightStream *self,
                                                      const GList *remote_candidate);

/* Timeouts, telephony events and receive-pipeline preloading */
static gboolean farsight_rtp_stream_connection_timed_out (gpointer data);
static gboolean farsight_rtp_stream_start_telephony_event (
    FarsightStream *self, guint8 ev, guint8 volume, FarsightStreamDTMFMethod method);
static gboolean farsight_rtp_stream_stop_telephony_event (
    FarsightStream *self, FarsightStreamDTMFMethod method);
static gboolean farsight_rtp_stream_preload_receive_pipeline (
    FarsightStream *self, gint payload_type);


/* Parent class pointer, filled in by farsight_rtp_stream_class_init() and
 * used to chain up in dispose/finalize. */
FarsightStreamClass *rtp_stream_parent_class = NULL;

/* Pad block callback (forward declaration). */
static void
blocked_cb (GstPad *pad, gboolean blocked,
    gpointer user_data);

/* GType of FarsightRTPStream; stays 0 until
 * farsight_rtp_stream_register_type() has run. */
static GType type = 0;

/*
 * Registers the "FarsightRTPStream" type with @module as a subclass of
 * FARSIGHT_TYPE_STREAM and caches the resulting GType in the file-scope
 * `type` variable returned by farsight_rtp_stream_get_type().
 */
void
farsight_rtp_stream_register_type (GTypeModule *module)
{
  static const GTypeInfo rtp_stream_info = {
    .class_size = sizeof (FarsightRTPStreamClass),
    .base_init = NULL,
    .base_finalize = NULL,
    .class_init = (GClassInitFunc) farsight_rtp_stream_class_init,
    .class_finalize = NULL,
    .class_data = NULL,
    .instance_size = sizeof (FarsightRTPStream),
    .n_preallocs = 0,
    .instance_init = (GInstanceInitFunc) farsight_rtp_stream_init
  };

  type = g_type_module_register_type (module, FARSIGHT_TYPE_STREAM,
      "FarsightRTPStream", &rtp_stream_info, 0);
}

/*
 * Returns the GType cached by farsight_rtp_stream_register_type(); the value
 * is 0 if registration has not happened yet.
 */
GType
farsight_rtp_stream_get_type (void)
{
  return type;
}


/*
 * Class initializer: remembers the parent class, installs the GObject
 * properties, hooks up dispose/finalize, fills in the FarsightStream virtual
 * method table and reserves the per-instance private area.
 */
static void
farsight_rtp_stream_class_init (FarsightRTPStreamClass *klass)
{
  GObjectClass *gobject_class;
  FarsightStreamClass *farsight_stream_class;

  gobject_class = (GObjectClass *) klass;
  farsight_stream_class = (FarsightStreamClass *) klass;
  rtp_stream_parent_class = g_type_class_peek_parent (klass);

  gobject_class->set_property = farsight_rtp_stream_set_property;
  gobject_class->get_property = farsight_rtp_stream_get_property;

  /* The advertised default matches what farsight_rtp_stream_init() actually
   * stores, so introspection reports the real initial value. */
  g_object_class_install_property (gobject_class, PROP_CONN_TIMEOUT,
      g_param_spec_uint ("conn_timeout", "Connection timeout",
        "Number of secs before connection timeout", 0, G_MAXUINT,
        DEFAULT_CONN_TIMEOUT,
        G_PARAM_READWRITE));

  g_object_class_install_property (gobject_class, PROP_TRANSMITTER,
      g_param_spec_string ("transmitter", "Transmitter name",
        "The name of the network transmitter to use",
        NULL, G_PARAM_WRITABLE));

  g_object_class_install_property (gobject_class, PROP_MIN_PTIME,
      g_param_spec_int64 ("min-ptime", "Min packet time",
          "Minimum duration of the packet data in ns (can't go above MTU)",
          0, G_MAXINT64, DEFAULT_MIN_PTIME, 
          G_PARAM_READWRITE | G_PARAM_CONSTRUCT));

  g_object_class_install_property (gobject_class, PROP_MAX_PTIME,
      g_param_spec_int64 ("max-ptime", "Max packet time",
          "Maximum duration of the packet data in ns (-1 = unlimited up to MTU)",
          -1, G_MAXINT64, DEFAULT_MAX_PTIME, 
          G_PARAM_READWRITE | G_PARAM_CONSTRUCT));

  g_object_class_install_property (gobject_class, PROP_TRANSMITTER_OBJECT,
      g_param_spec_pointer ("transmitter-object", "Transmitter object",
        "Transmitter object used for this stream", G_PARAM_READABLE));

  gobject_class->dispose = farsight_rtp_stream_dispose;
  gobject_class->finalize = farsight_rtp_stream_finalize;

  /* FarsightStream virtual methods: transport candidates */
  farsight_stream_class->prepare_transports =
      farsight_rtp_stream_prepare_transports;
  farsight_stream_class->get_native_candidate_list =
      farsight_rtp_stream_get_native_candidate_list;
  farsight_stream_class->get_native_candidate =
      farsight_rtp_stream_get_native_candidate;
  farsight_stream_class->set_remote_candidate_list =
      farsight_rtp_stream_set_remote_candidate_list;
  farsight_stream_class->add_remote_candidate =
      farsight_rtp_stream_add_remote_candidate;
  //farsight_stream_class->remove_remote_candidate =
      //farsight_rtp_stream_remove_remote_candidate;
  farsight_stream_class->set_active_candidate_pair =
      farsight_rtp_stream_set_active_candidate_pair;

  /* FarsightStream virtual methods: codecs */
  farsight_stream_class->get_local_codecs =
      farsight_rtp_stream_get_local_codecs;
  farsight_stream_class->set_remote_codecs =
      farsight_rtp_stream_set_remote_codecs;
  farsight_stream_class->get_codec_intersection =
    farsight_rtp_stream_get_codec_intersection;
  farsight_stream_class->set_codec_preference_list =
    farsight_rtp_stream_set_codec_preference_list;
  farsight_stream_class->set_active_codec =
      farsight_rtp_stream_set_active_codec;
  farsight_stream_class->get_active_codec =
      farsight_rtp_stream_get_active_codec;

  /* FarsightStream virtual methods: source, sink and pipeline */
  farsight_stream_class->set_sink = farsight_rtp_stream_set_sink;
  farsight_stream_class->set_sink_filter = farsight_rtp_stream_set_sink_filter;
  farsight_stream_class->get_sink = farsight_rtp_stream_get_sink;
  farsight_stream_class->set_source = farsight_rtp_stream_set_source;
  farsight_stream_class->set_source_filter =
    farsight_rtp_stream_set_source_filter;
  farsight_stream_class->get_source = farsight_rtp_stream_get_source;
  farsight_stream_class->get_pipeline = farsight_rtp_stream_get_pipeline;
  farsight_stream_class->set_pipeline = farsight_rtp_stream_set_pipeline;

  /* FarsightStream virtual methods: life cycle and telephony events */
  farsight_stream_class->start = farsight_rtp_stream_start;
  farsight_stream_class->stop = farsight_rtp_stream_stop;
  farsight_stream_class->set_sending = farsight_rtp_stream_set_sending;
  farsight_stream_class->start_telephony_event =
      farsight_rtp_stream_start_telephony_event;
  farsight_stream_class->stop_telephony_event =
      farsight_rtp_stream_stop_telephony_event;
  farsight_stream_class->preload_receive_pipeline =
      farsight_rtp_stream_preload_receive_pipeline;

  g_type_class_add_private (klass, sizeof (FarsightRTPStreamPrivate));
}

/*
 * Instance initializer: binds the private area to self->priv and puts every
 * field it touches into its starting state. Allocates the pending-source
 * array and the payload-type caps table, and (when built with HAVE_CLINKC and
 * FARSIGHT_DISABLE_UPNP is not set in the environment) initialises UPnP.
 */
static void
farsight_rtp_stream_init (FarsightRTPStream *self)
{
  FarsightRTPStreamPrivate *priv = FARSIGHT_RTP_STREAM_GET_PRIVATE (self);

  self->priv = priv;

  /* codec bookkeeping */
  priv->local_codecs = NULL;
  priv->remote_codecs = NULL;
  priv->negotiated_codecs = NULL;
  priv->codec_pref_list = NULL;
  priv->negotiated_codec_associations = NULL;
  priv->local_codec_associations = NULL;
  priv->codecs_configuration = NULL;

  /* pipeline elements */
  priv->pipeline = NULL;
  priv->rtpbin = NULL;
  priv->send_codec_bin = NULL;
  priv->rtpdemux = NULL;

  priv->src = NULL;
  priv->src_filter = NULL;
  priv->sink = NULL;
  priv->sink_filter = NULL;

  /* sending starts enabled only when the direction has the SENDONLY bit */
  priv->sending = (farsight_stream_get_direction (FARSIGHT_STREAM (self)) &
      FARSIGHT_STREAM_DIRECTION_SENDONLY) ? TRUE : FALSE;

  priv->stopping = FALSE;
  priv->prepared = FALSE;
  priv->build_send_pipeline = FALSE;

  priv->bus_watch = 0;
  priv->stats_timeout = 0;
  priv->timeout_src = 0;
  priv->conn_timeout = DEFAULT_CONN_TIMEOUT;

  priv->pending_src_ids = g_array_new (FALSE, FALSE, sizeof (gint));

  priv->pt_caps_table = g_hash_table_new_full (g_direct_hash,
      g_direct_equal, NULL, NULL);

  priv->local_candidates = NULL;
  priv->remote_candidates = NULL;

  /* this tells _start to auto pick a codec */
  priv->send_codec_id = -1;
  priv->recv_codec_id = -1;
  priv->preload_recv_codec_id = -1;

  priv->min_ptime = DEFAULT_MIN_PTIME;
  priv->max_ptime = DEFAULT_MAX_PTIME;

#ifdef HAVE_CLINKC
  if (getenv ("FARSIGHT_DISABLE_UPNP") == NULL)
  {
    g_message ("Setting uPnP to true");
    priv->use_upnp = upnp_cp_init () ? TRUE : FALSE;
    if (!priv->use_upnp)
      g_warning ("Error setting up uPnP");
  }
#endif
}

static GList *
load_codecs_configuration (void)
{
  GList *list = NULL;
  gchar *path = NULL;

  path = g_build_filename (g_get_home_dir(), ".farsight",
      GST_CODECS_CONF_FILE, NULL);
  g_debug ("Trying to load %s", path);
  list = farsight_codec_list_from_keyfile (path);
  g_free (path);

  if (list)
    return list;

  path = g_build_filename (SYSCONFDIR, "farsight",
          GST_CODECS_CONF_FILE, NULL);
  g_debug ("Trying to load %s", path);
  list = farsight_codec_list_from_keyfile (path);
  g_free (path);

  return list;
}

/*
 * Makes sure priv->local_codecs is populated.
 *
 * Does nothing when the list already exists. Otherwise loads the codecs for
 * the stream's media type, loads (once) and validates the codecs
 * configuration, builds the local codec associations and, if a preference
 * list was set, sorts the resulting local codec list by it.
 *
 * Returns: TRUE when local codecs are available, FALSE when loading the
 * codecs failed (a PIPELINE_SETUP error is signalled in that case) or no
 * codec associations could be created.
 */
static gboolean
ensure_local_codecs (FarsightRTPStream *self)
{
  FarsightRTPStreamPrivate *priv = self->priv;
  GList *codec_list;
  guint media_type;

  if (priv->local_codecs)
    return TRUE;

  g_object_get (G_OBJECT (self), "media-type", &media_type, NULL);

  g_debug ("%s: media type is %d", __FUNCTION__, media_type);

  if (!load_codecs (media_type))
  {
    g_debug ("%s (%d): loading codecs failed", __FUNCTION__, __LINE__);

    farsight_stream_signal_error (FARSIGHT_STREAM (self),
        FARSIGHT_STREAM_ERROR_PIPELINE_SETUP,
        "loading codecs failed");

    return FALSE;
  }

  if (!priv->codecs_configuration)
    priv->codecs_configuration = load_codecs_configuration ();

  if (priv->codecs_configuration)
  {
    /* Now lets remove any invalid entry */
    priv->codecs_configuration = validate_codecs_configuration (media_type,
        priv->codecs_configuration);
  }
  else
  {
    g_debug ("%s (%d): could not load codecs configuration file", __FUNCTION__,
        __LINE__);
  }

  priv->local_codec_associations = create_local_codec_associations (
      media_type, priv->codecs_configuration,
      priv->negotiated_codec_associations, &codec_list);

  if (!priv->local_codec_associations)
    return FALSE;

  priv->local_codecs = codec_list;

  /* What to do with the resorting... Evil resorting done here */
  /* And only for audio.. thats even more evil */
  if (priv->codec_pref_list)
    sort_codecs (&priv->local_codecs, priv->codec_pref_list);

  return TRUE;
}

/*
 * Calls g_source_remove() on every source id recorded in
 * priv->pending_src_ids and then empties the array. A no-op when the array
 * holds no ids.
 */
static void
remove_pending_mainloop_sources (FarsightRTPStreamPrivate *priv)
{
  GArray *pending = priv->pending_src_ids;
  gint idx;

  if (pending->len == 0)
    return;

  for (idx = 0; idx < pending->len; idx++)
    g_source_remove (g_array_index (pending, gint, idx));

  g_array_remove_range (pending, 0, pending->len);
}

/*
 * GObject dispose handler. On the first call it stops the stream, unloads the
 * codecs for the stream's media type, removes pending main loop sources and
 * chains up to the parent class. Later calls return without doing anything.
 */
static void
farsight_rtp_stream_dispose (GObject *object)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (object);

  if (!self->priv->disposed)
  {
    guint media_type;

    /* mark first so a re-entrant dispose is ignored */
    self->priv->disposed = TRUE;

    farsight_rtp_stream_stop (FARSIGHT_STREAM (self));

    g_object_get (G_OBJECT (self), "media-type", &media_type, NULL);
    unload_codecs (media_type);

    remove_pending_mainloop_sources (self->priv);

    /* chain up to parent */
    G_OBJECT_CLASS (rtp_stream_parent_class)->dispose (object);
  }
}

/*
 * GObject finalize handler. Releases the arrays, lists, strings, hash tables,
 * sink/source elements and filter caps still held in the private structure,
 * then chains up to the parent class.
 */
static void
farsight_rtp_stream_finalize (GObject *object)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (object);
  FarsightRTPStreamPrivate *priv;

  g_return_if_fail (self != NULL);
  g_return_if_fail (FARSIGHT_IS_RTP_STREAM (self));

  priv = self->priv;

  /* arrays */
  if (priv->pending_src_ids)
    g_array_free (priv->pending_src_ids, TRUE);

  /* transport candidate lists */
  if (priv->local_candidates)
    farsight_transport_list_destroy (priv->local_candidates);
  if (priv->remote_candidates)
    farsight_transport_list_destroy (priv->remote_candidates);

  if (priv->codec_pref_list)
    g_array_free (priv->codec_pref_list, TRUE);

  /* codec lists: only the list nodes of local_codecs are released here */
  if (priv->local_codecs)
    g_list_free (priv->local_codecs);
  if (priv->remote_codecs)
    farsight_codec_list_destroy (priv->remote_codecs);

  /* g_free() accepts NULL, so no guard is needed for the id strings */
  g_free (priv->active_native_candidate);
  g_free (priv->active_remote_candidate);

  /* hash tables */
  if (priv->pt_caps_table)
    g_hash_table_destroy (priv->pt_caps_table);
  if (priv->negotiated_codec_associations)
    g_hash_table_destroy (priv->negotiated_codec_associations);

  /* sink/source elements and their filter caps */
  if (priv->sink)
    gst_object_unref (priv->sink);
  if (priv->sink_filter)
    gst_caps_unref (priv->sink_filter);
  if (priv->src)
    gst_object_unref (priv->src);
  if (priv->src_filter)
    gst_caps_unref (priv->src_filter);

  /* TODO free GSTelements */

  G_OBJECT_CLASS (rtp_stream_parent_class)->finalize (object);
}

/*
 * Sets a property on every element of @bin that has a property of that name.
 *
 * @bin:      bin whose elements are visited; NULL is accepted and ignored.
 * @property: name of the first property; it is followed by its value and a
 *            NULL terminator, exactly as for g_object_set().
 *
 * Elements without a property called @property are skipped. If the iterator
 * reports that the bin changed during iteration, the iterator is resynced and
 * iteration continues, so some elements may have the value applied more than
 * once. An iterator error is logged and ends the iteration.
 */
static void 
bin_element_set_property (GstBin *bin, const gchar *property, ...) 
{
  GstIterator *it;
  gboolean done = FALSE;

  if (!bin)
    return;

  it = gst_bin_iterate_elements (bin);

  if (!it)
    return;

  while (!done) {
    gpointer item = NULL;

    switch (gst_iterator_next (it, &item)) {
      case GST_ITERATOR_OK:
        {
        GstElement *elem = item;
        GObjectClass *klass = G_OBJECT_GET_CLASS (elem);

        if (g_object_class_find_property (klass, property)) {
          /* the argument list is restarted for every element */
          va_list var_args;
          va_start (var_args, property);
          g_object_set_valist (G_OBJECT (elem), property, var_args);
          va_end (var_args);
        }
        gst_object_unref (GST_OBJECT (elem));
        break;
        }
      case GST_ITERATOR_RESYNC:
        /* the bin changed under us: resync and keep going instead of
         * silently skipping the remaining elements */
        gst_iterator_resync (it);
        break;
      case GST_ITERATOR_ERROR:
        /* log and stop iterating; must not abort the whole process */
        g_warning ("Error iterating contents of send_codec_bin\n");
        done = TRUE;
        break;
      case GST_ITERATOR_DONE:
      default:
        done = TRUE;
        break;
    }
  }
  gst_iterator_free (it);
}

/*
 * GObject set_property handler.
 *
 *  - "conn_timeout": stores the new timeout.
 *  - "transmitter": replaces the stored transmitter name, stops and drops any
 *    existing transmitter, then creates a new one from the factory (a warning
 *    is logged if creation fails).
 *  - "transmitter-object": read-only; an attempt to set it only logs a
 *    warning.
 *  - "min-ptime" / "max-ptime": stores the value and pushes it to every
 *    element of the send codec bin that has a property of that name.
 */
static void farsight_rtp_stream_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (object);

  switch (prop_id) {
    case PROP_CONN_TIMEOUT:
      self->priv->conn_timeout = g_value_get_uint (value);
      break;
    case PROP_TRANSMITTER:
      /* release the previous name so repeated sets do not leak it */
      g_free (self->priv->transmitter_name);
      self->priv->transmitter_name = g_value_dup_string (value);

      if (self->priv->transmitter) {
          farsight_transmitter_stop (self->priv->transmitter);
          g_object_unref (G_OBJECT (self->priv->transmitter));
          self->priv->transmitter = NULL;
      }

      self->priv->transmitter = farsight_transmitter_factory_make
        (self->priv->transmitter_name);

      if (!self->priv->transmitter)
      {
        g_warning ("Error creating %s transmitter", self->priv->transmitter_name);
      }
      break;
    case PROP_TRANSMITTER_OBJECT:
      g_warning ("%s: trying to manually set transmitter object", G_STRFUNC);
      break;
    case PROP_MIN_PTIME:
      self->priv->min_ptime = g_value_get_int64 (value);

      /* bin_element_set_property() ignores a NULL bin, so this is safe
       * before the send codec bin exists */
      bin_element_set_property (GST_BIN (self->priv->send_codec_bin),
          "min-ptime", self->priv->min_ptime, NULL);
      break;
    case PROP_MAX_PTIME:
      self->priv->max_ptime = g_value_get_int64 (value);

      bin_element_set_property (GST_BIN (self->priv->send_codec_bin),
          "max-ptime", self->priv->max_ptime, NULL);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/*
 * GObject get_property handler.
 *
 * For "transmitter-object" the returned pointer carries an extra reference
 * on the transmitter when one exists; the value is NULL (and no reference is
 * taken) when no transmitter has been created yet.
 */
static void farsight_rtp_stream_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (object);

  switch (prop_id) {
    case PROP_CONN_TIMEOUT:
      g_value_set_uint (value, self->priv->conn_timeout);
      break;
    case PROP_TRANSMITTER:
      g_value_set_string (value, self->priv->transmitter_name);
      break;
    case PROP_MIN_PTIME:
      g_value_set_int64 (value, self->priv->min_ptime);
      break;
    case PROP_MAX_PTIME:
      g_value_set_int64 (value, self->priv->max_ptime);
      break;
    case PROP_TRANSMITTER_OBJECT:
      /* g_object_ref() on NULL raises a critical, so only take the
       * reference when a transmitter is actually present */
      if (self->priv->transmitter)
        g_object_ref (self->priv->transmitter);
      g_value_set_pointer (value, self->priv->transmitter);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/*
 * Bus watch for the stream pipeline.
 *
 * On EOS an EOS error is signalled and the stream is stopped. On an ERROR
 * message the error is logged, signalled as FARSIGHT_STREAM_ERROR_RESOURCE
 * (for GST_RESOURCE_ERROR domain errors) or FARSIGHT_STREAM_ERROR_UNKNOWN,
 * and the stream is stopped. All other messages are ignored.
 *
 * @user_data: the FarsightStream the watch was installed for.
 *
 * Returns: always TRUE, so the watch stays installed.
 */
static gboolean
farsight_rtp_stream_bus_watch_cb (GstBus *bus, GstMessage *message,
                                  gpointer user_data)
{
  FarsightStream *stream = FARSIGHT_STREAM (user_data);

  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_EOS:
      {
      gchar *source_name = gst_object_get_name (message->src);
      g_debug ("%s (%d): end of stream on stream pipeline from %s",
          __FUNCTION__, __LINE__, source_name);
      g_free (source_name);
      farsight_stream_signal_error (stream,
                                    FARSIGHT_STREAM_ERROR_EOS, NULL);
      farsight_rtp_stream_stop (stream);
      break;
      }
    case GST_MESSAGE_ERROR:
      {
      gchar *debug = NULL;
      GError *err = NULL;

      gst_message_parse_error (message, &err, &debug);

      if (err == NULL) {
        g_warning (G_STRLOC ": gst_message_parse_error returned err == NULL");

        farsight_stream_signal_error (stream,
                                      FARSIGHT_STREAM_ERROR_UNKNOWN, NULL);
      } else {
        g_warning (G_STRLOC ": error on stream pipeline. "
                   "Error code=%d message=%s",
                   err->code, err->message);
        g_debug (G_STRLOC ": Error: %s", debug);

        if (err->domain == GST_RESOURCE_ERROR) {
          farsight_stream_signal_error (stream,
                                        FARSIGHT_STREAM_ERROR_RESOURCE, err->message);
        } else {
          farsight_stream_signal_error (stream,
                                        FARSIGHT_STREAM_ERROR_UNKNOWN, err->message);
        }

        /* free only after the last read of err->domain / err->message */
        g_error_free (err);
      }

      /* g_free() accepts NULL, so this covers both branches */
      g_free (debug);

      farsight_rtp_stream_stop (stream);
      break;
      }
    default:
      break;
  }

  return TRUE;
}

/*
 * Idle callback queued by rtpdemux_pad_blocked_callback(). Builds the receive
 * pipeline for the payload type described by the DepayloaderChainInfo passed
 * as @user_data, then frees that structure.
 *
 * Returns: FALSE, so the idle source runs only once.
 */
static gboolean
call_create_recv_pipeline (gpointer user_data)
{
  DepayloaderChainInfo *info = user_data;

  g_debug ("%s (%d): calling create_recv_pipeline from main thread",
      __FUNCTION__, __LINE__);

  farsight_rtp_stream_create_new_pt_recv_pipeline (info->stream, info->pt,
      info->pad);
  g_free (info);

  return FALSE;
}

/*
 * Pad block callback for an rtpdemux pad.
 *
 * When the pad is now blocked and the stream is not stopping, an idle source
 * is queued to build the receive pipeline and its id is recorded in
 * priv->pending_src_ids; ownership of the DepayloaderChainInfo in @user_data
 * passes to that idle callback. In every other case (stream stopping, or the
 * pad was unblocked) the DepayloaderChainInfo is freed here.
 */
static void
rtpdemux_pad_blocked_callback (GstPad *pad, gboolean blocked,
    gpointer user_data)
{
  DepayloaderChainInfo *info = user_data;
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (info->stream);

  if (self->priv->stopping)
  {
    g_debug ("%s (%d): stream is stopping, canceling action",
        __FUNCTION__, __LINE__);
  }
  else if (blocked)
  {
    gint src_id;

    g_debug ("%s (%d): rtpdemux pad blocked, idle_adding create_recv_pipeline",
        __FUNCTION__, __LINE__);
    src_id = g_idle_add (call_create_recv_pipeline, user_data);
    g_array_append_val (self->priv->pending_src_ids, src_id);

    /* the idle callback now owns info */
    return;
  }

  g_free (info);
}

/*
 * Handler invoked with a pad and its payload type when a new payload type
 * shows up (see the debug message below). Records the stream, payload type
 * and pad in a freshly allocated DepayloaderChainInfo, takes a reference on
 * the pad and asks for the pad to be blocked asynchronously;
 * rtpdemux_pad_blocked_callback() continues from there and takes over the
 * DepayloaderChainInfo.
 *
 * @user_data: the FarsightRTPStream.
 */
static void
farsight_rtp_stream_new_payload_type (GstElement *element, gint pt, 
                                      GstPad *pad, gpointer user_data)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (user_data);
  DepayloaderChainInfo *info = g_new0 (DepayloaderChainInfo, 1);

  info->stream = (FarsightStream *) self;
  info->pad = pad;
  info->pt = pt;

  /* keep the pad alive while the asynchronous block is pending */
  gst_object_ref (GST_OBJECT (pad));

  g_debug ("%s (%d): received stream with new pt, blocking rtpdemux",
      __FUNCTION__, __LINE__);

  gst_pad_set_blocked_async (pad, TRUE, rtpdemux_pad_blocked_callback, info);
}

/*
 * Handler for a payload type change notification; it only logs the new
 * payload type and takes no other action.
 */
static void
farsight_rtp_stream_payload_type_change (GstElement *element, gint pt,
                                         gpointer user_data)
{
  g_debug ("%s (%d): received stream payload changed to %d",
      __FUNCTION__, __LINE__, pt);
}

/*
 * Tears down the bin returned by get_unique_bin() for the given media type,
 * direction and unique id, if there is one.
 *
 * The bin's "sink" pad is unlinked from its peer, the bin is brought to the
 * NULL state (waiting for the state change to finish) and it is then removed
 * from its parent container.
 *
 * Returns: TRUE when there was no such bin or it was removed; FALSE when the
 * bin has no parent or removing it from the parent failed.
 */
static gboolean
cleanup_unique (FarsightMediaType media_type,
    FarsightStreamDirection direction, gint unique_id)
{
  GstElement *stale_bin;
  GstElement *parent;
  GstPad *sink_pad;
  GstState cur_state;
  gboolean removed = TRUE;

  stale_bin = get_unique_bin (media_type, direction, unique_id);
  if (!stale_bin)
    return TRUE;

  parent = (GstElement*) gst_element_get_parent (stale_bin);
  if (!parent)
    return FALSE;

  /* detach the bin from whatever feeds it */
  sink_pad = gst_element_get_pad (stale_bin, "sink");
  if (gst_pad_is_linked (sink_pad)) {
    GstPad *peer = gst_pad_get_peer (sink_pad);

    gst_pad_unlink (peer, sink_pad);
    gst_object_unref (peer);
  }
  gst_object_unref (sink_pad);

  /* shut the bin down and wait until it has reached its final state */
  gst_element_set_state (stale_bin, GST_STATE_NULL);
  gst_element_get_state (stale_bin, &cur_state, NULL,
      GST_CLOCK_TIME_NONE);

  g_debug ("Removing bin");
  if (!gst_bin_remove (GST_BIN(parent), stale_bin)) {
    gchar *bin_name = gst_element_get_name(stale_bin);
    gchar *parent_name = gst_element_get_name(parent);

    removed = FALSE;
    g_warning ("There was an error removing unique codec bin %s from container %s",
        bin_name, parent_name);
    g_free (bin_name);
    g_free (parent_name);
  }

  /* drop the reference obtained from gst_element_get_parent() */
  gst_object_unref (GST_OBJECT (parent));

  return removed;
}

/*
 * Sets up the receive codec bin for payload type id.
 *
 * A bin named "recv<id>" is reused when the pipeline already contains one;
 * otherwise cleanup_unique() is run for the codec's unique id and a new bin
 * is created and added to the pipeline. With a non-NULL pad the bin is linked
 * to that rtpdemux pad, connected to the sink (unless the codec blueprint has
 * its own sink) and the pad is unblocked again.
 *
 * When called with a NULL pad, it will create the recv pipeline, but not link
 * it
 *
 * On failure the stream is stopped and FARSIGHT_STREAM_ERROR_PIPELINE_SETUP
 * is signalled.
 */

static void
farsight_rtp_stream_create_new_pt_recv_pipeline (FarsightStream *stream,
    gint id, GstPad *pad)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (stream);
  gchar *name = NULL;
  GstElement *codec_bin = NULL;
  CodecAssociation *codec_association = NULL;
  GstPad *sinkpad = NULL;
  /* recv_codec_id is overwritten with the new id further down, before the
   * bin that currently feeds the sink has to be looked up; keep the old id */
  gint previous_recv_codec_id = self->priv->recv_codec_id;

  codec_association = lookup_codec_by_pt (
      self->priv->negotiated_codec_associations, id);
  if (!codec_association) {
    g_warning ("Payload type %d not supported", id);
    goto error;
  }

  g_debug ("%s (%d): active PT change to %d", __FUNCTION__, __LINE__, id);

  if (!self->priv->pipeline)
  {
    g_warning ("%s (%d): Pipeline has dissappeared, not doing anything",
        __FUNCTION__, __LINE__);
    return;
  }

  /* Let's see if a recv codec bin for this codec already exists */
  name = g_strdup_printf ("recv%d", id);
  codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
  g_free (name);

  if (!codec_bin) {
    gchar *padname;

    g_message ("%s (%d): setting up new pipeline for id=%d",
            __FUNCTION__, __LINE__, id);

    if (!cleanup_unique (farsight_stream_get_media_type (stream),
            DIR_RECV, codec_association->codec_blueprint->receive_has_unique)) {
      g_warning ("Could not unload unique codec");
      goto error;
    }


    codec_bin = create_codec_bin (self->priv->negotiated_codec_associations,
        id, DIR_RECV, NULL);
    if (!codec_bin)
    {
      g_warning ("Couldn't create elements, check your gstreamer installation");
      goto error;
    }
    gst_bin_add (GST_BIN (self->priv->pipeline), codec_bin);

    /* pre-loading only: leave the bin unlinked in READY */
    if (!pad) {
      gst_element_set_state (codec_bin, GST_STATE_READY);
      return;
    }

    self->priv->recv_codec_id = id;

    /* we emit now so we can set anything we need on the elements before they
     * are playing */
    farsight_stream_signal_codec_changed (stream, id);

    farsight_stream_signal_state_changed (stream,
        FARSIGHT_STREAM_STATE_CONNECTED,
        farsight_stream_get_current_direction (stream) |
        FARSIGHT_STREAM_DIRECTION_RECEIVEONLY);

    /* TODO why do I go to READY here? */
    gst_element_set_state (codec_bin, GST_STATE_READY);

    padname = gst_pad_get_name (pad);
    gst_element_link_pads (self->priv->rtpdemux, padname, GST_ELEMENT(codec_bin),
            "sink");

    /* TODO why do I go to PLAYING before linking a sink (if required)? */
    gst_element_set_state (codec_bin, GST_STATE_PLAYING);

    g_free (padname);

  } else {
    GstPad *codec_bin_sink_pad;

    if (!pad)
    {
      /* drop the reference gst_bin_get_by_name() handed us */
      gst_object_unref (GST_OBJECT (codec_bin));
      return;
    }

    codec_bin_sink_pad = gst_element_get_pad (codec_bin, "sink");

    if (!codec_bin_sink_pad) {
      g_error ("There is no sink pad on the codec_bin");
      goto error;
    }

    self->priv->recv_codec_id = id;

    if (gst_pad_is_linked (codec_bin_sink_pad)) {
      g_debug ("%s (%d): pipeline for id %d already configured, using it",
          __FUNCTION__, __LINE__, id);
    } else {
      g_debug ("%s (%d): Using pre-loaded element for codec %d",
          __FUNCTION__, __LINE__, id);
      gst_pad_link (pad, codec_bin_sink_pad);
      gst_element_set_state (codec_bin, GST_STATE_PLAYING);
    }
    /* codec_bin is still used below; this only drops the lookup reference */
    gst_object_unref (GST_OBJECT (codec_bin));
    gst_object_unref (codec_bin_sink_pad);
  }

  if (!codec_association->codec_blueprint->has_sink)
  {
    if (self->priv->sink)
    {
      sinkpad = gst_element_get_pad (self->priv->sink, "sink");
      if (!gst_pad_is_linked (sinkpad))
      {
        g_debug ("%s (%d): linking audio sink to codec bin",
            __FUNCTION__, __LINE__);

        if ((self->priv->main_pipeline && 
              gst_element_get_parent (self->priv->sink) == NULL)
            || !self->priv->main_pipeline)
        {
          gst_bin_add (GST_BIN (self->priv->pipeline), self->priv->sink);
        }
      }
      else
      {
        GstElement *old_codec_bin;

        g_debug ("%s (%d): relinking audio sink to new codec bin with pt %d",
            __FUNCTION__, __LINE__, id);

        /* look up the bin that was active before this call, not the one
         * recv_codec_id points at now */
        name = g_strdup_printf ("recv%d", previous_recv_codec_id);
        old_codec_bin = gst_bin_get_by_name (GST_BIN (self->priv->pipeline),
            name);
        g_free (name);

        /* TODO must block the src pad on the codec bin before unlinking or
         * pause it (look at part-block.txt) */

        if (old_codec_bin)
        {
          gst_element_unlink (old_codec_bin, self->priv->sink);

          gst_object_unref (GST_OBJECT (old_codec_bin));
        }
      }
      /* TODO do we really need to be READY before linking if the codec_bin is
       * not PLAYING ? */
      if (!gst_element_set_state (self->priv->sink, GST_STATE_READY))
      {
        g_warning ("%s: Sink state change to READY failed!", G_STRFUNC);
        goto error;
      }
      gst_element_link_filtered (GST_ELEMENT (codec_bin), self->priv->sink, self->priv->sink_filter);
      if (!gst_element_set_state (self->priv->sink, GST_STATE_PLAYING))
      {
        g_warning ("%s: Sink state change to PLAYING failed!", G_STRFUNC);
        goto error;
      }

      gst_object_unref (GST_OBJECT (sinkpad));
      sinkpad = NULL;
    }
    else
    {
      GstPad *codec_bin_src_pad;

      g_warning("received stream while sink unset, blocking recv pipeline");

      codec_bin_src_pad =
        gst_element_get_pad (codec_bin, "src");

      /* blocked_cb releases the pad reference once it runs */
      gst_pad_set_blocked_async (codec_bin_src_pad, TRUE,
          blocked_cb, (gpointer) __FUNCTION__);
    }
  }

  /* restart self pipeline */
  //gst_xml_write_file (GST_ELEMENT (self->priv->pipeline), stdout);
  g_debug ("%s: unblocking rtpdemux src pad", G_STRFUNC);
  gst_pad_set_blocked_async (pad, FALSE, blocked_cb,(gpointer)  __FUNCTION__);
  return;

error:
  {
    /* only set when a sink state change failed half-way */
    if (sinkpad)
      gst_object_unref (GST_OBJECT (sinkpad));

    g_warning ("%s: PT change failed", G_STRFUNC);
    farsight_rtp_stream_stop (stream);
    farsight_stream_signal_error (stream, FARSIGHT_STREAM_ERROR_PIPELINE_SETUP,
        "Error creating new recv pipeline");
  }
}

/*
 * Returns the user supplied main pipeline when one was set, otherwise the
 * stream's own pipeline (which may still be NULL).
 */
static GstElement*
farsight_rtp_stream_get_pipeline (FarsightStream *stream)
{
  FarsightRTPStreamPrivate *priv = ((FarsightRTPStream*) stream)->priv;

  return priv->main_pipeline ? priv->main_pipeline : priv->pipeline;
}

/*
 * Stores pipeline as the stream's main pipeline.
 *
 * The main pipeline can only be set once: a second call leaves the stored
 * value untouched, warns and returns FALSE. Returns TRUE when the pipeline
 * was stored. No reference is taken on pipeline here.
 */
static gboolean
farsight_rtp_stream_set_pipeline (FarsightStream *stream,
    GstElement *pipeline)
{
  FarsightRTPStreamPrivate *priv = ((FarsightRTPStream*) stream)->priv;

  if (!priv->main_pipeline)
  {
    priv->main_pipeline = pipeline;
    return TRUE;
  }

  g_warning ("Pipeline already set! Ignoring");
  return FALSE;
}

/*
 * Selects the codec (payload type id) used for sending.
 *
 * As long as no send codec bin exists the id is simply stored in
 * priv->send_codec_id. Once a send codec bin exists the function currently
 * logs that switching is not supported and returns without changing
 * anything: everything after that early return, up to the set_var label, is
 * unreachable and kept for a future implementation of the switch.
 */
static void
farsight_rtp_stream_set_active_codec (FarsightStream *stream, gint id)
{
  /* TODO This doesn't work yet, need to make sure the audiosrc or whatever
   * else is used sends a new_segment after a reconnection, this is done by
   * overwriting the link/unlink functions
   * in the current case without the new segment the timestamp stops
   * incrementing
   * Also make sure that if there is a user defined pipeline, we change the
   * ghost pad target to the new send_codec_bin */

  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (stream);
  gchar *name;
  GstElement *codec_bin = NULL;
  GstElement *old_codec_bin = NULL;
  GstElement *new_codec_bin = NULL;
  CodecAssociation *codec_association = NULL;

  g_debug ("%s (%d): called to change codec from %d to %d", __FUNCTION__,
          __LINE__, farsight_rtp_stream_get_active_codec(stream), id);

  /* no send pipeline yet: just remember the requested id */
  if (!self->priv->send_codec_bin)
    goto set_var;

  /* switching a live send pipeline is disabled; the code below this return
   * is never executed */
  g_debug ("%s: this does not work yet, returning", __FUNCTION__);
  return;

  if (id == farsight_rtp_stream_get_active_codec(stream))
    return;

  /* Let's check if the given codec id is valid */
  codec_association = lookup_codec_by_pt (
      self->priv->negotiated_codec_associations, id);
  if (!codec_association)
  {
    g_message ("%s (%d): invalid codec id %d", 
              __FUNCTION__, __LINE__, id);
    return;
  }

  /* Let's make sure codec is in the remote_codec list */
  /* FIXME This will not work if the remote codecs are not set. But we usually
   * call set_active_codec before setting the remote codecs. This allows the
   * send pipeline creation code not to auto-chose a codec. We might just have
   * to trust the user to set an available active codec */
#if 0
  if (!self->priv->remote_codecs)
  {
    g_warning ("You need to set the remote codecs before changing the active"
        " codec");
    return;
  }

  GList *codec_list = self->priv->remote_codecs;
  FarsightCodec *cur_codec;
  do {
    cur_codec = codec_list->data;
    if (codec_association->codec->id < 96)
    {
      if (codec_association->codec->id == cur_codec->id)
      {
        break;
      }
    }
    else
    {
      if (g_ascii_strcasecmp (codec_association->codec->encoding_name,
            cur_codec->encoding_name) == 0)
      {
        break;
      }
    }

    codec_list = codec_list->next;
  } while (codec_list);

  if (!codec_list)
  {
    g_warning ("Called farsight_stream_set_active_codec() with codec (%d %s)"
        " not available in remote codec list! Ignoring", 
        id, codec_association->codec->encoding_name);
    return;
  }
#endif

  g_debug ("%s (%d): changing active send PT to %d", __FUNCTION__,
          __LINE__, id);

  /* Let's see if a send codec bin for this codec already exists
   * it shouldn't really exist... */
  name = g_strdup_printf ("send%d", id);
  codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
  g_free (name);
  if (codec_bin) {
    gst_object_unref (GST_OBJECT (codec_bin));
    g_warning ("Send codec already exists for codec %d, this shouldn't happen",
        id);
    goto error;
  }

  /* Let's unlink the existing codec bin and delete it */
  name = g_strdup_printf ("send%d", self->priv->send_codec_id);
  old_codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
  /* NOTE(review): this logs codec_bin, which is NULL at this point;
   * old_codec_bin was probably intended - confirm before re-enabling */
  g_debug ("removing send codec bin %p on pipeline %p", codec_bin, self->priv->pipeline);
  g_free (name);
  if (!old_codec_bin)
  {
    g_warning ("Couldn't find current send codec for codec %d!",
        self->priv->send_codec_id);
    goto error;
  }

  /* TODO must block the src pad before unlinking
   * (look at part-block.txt) */
  gst_element_unlink (old_codec_bin, self->priv->rtpbin);
  if (self->priv->src)
  {
    gst_element_unlink (self->priv->src, old_codec_bin);
  }

  gst_bin_remove (GST_BIN (self->priv->pipeline), old_codec_bin);

  gst_object_unref (GST_OBJECT (old_codec_bin));

  self->priv->send_codec_bin = NULL;

  if (!cleanup_unique (farsight_stream_get_media_type (stream),
          DIR_SEND, codec_association->codec_blueprint->send_has_unique)) {
    g_warning ("Could not unload unique send codec");
    goto error;
  }

  /* Let's create a new send codec bin for this codec */
  new_codec_bin = create_codec_bin (self->priv->negotiated_codec_associations,
      id, DIR_SEND, self->priv->remote_codecs);
  if (!new_codec_bin)
  {
    g_warning ("Couldn't create elements, check your gstreamer installation");
    goto error;
  }

  g_debug ("adding send codec bin %p on pipeline %p", new_codec_bin, self->priv->pipeline);
  gst_bin_add (GST_BIN (self->priv->pipeline), new_codec_bin);
  /* now we link the src and rtpbin to it */
  gst_element_link (new_codec_bin, self->priv->rtpbin);
  if (self->priv->src)
  {
    gst_element_link_filtered (self->priv->src, new_codec_bin, self->priv->src_filter);
  }

  self->priv->send_codec_bin = new_codec_bin;

  /* carry the configured ptime limits over to the new bin */
  bin_element_set_property (GST_BIN (self->priv->send_codec_bin),
      "min-ptime", self->priv->min_ptime, NULL);
  bin_element_set_property (GST_BIN (self->priv->send_codec_bin),
      "max-ptime", self->priv->max_ptime, NULL);

set_var:
  self->priv->send_codec_id = id;

  return;

error:
  /* a failed switch stops the stream and reports a pipeline setup error */
  farsight_rtp_stream_stop (stream);
  farsight_stream_signal_error (stream, FARSIGHT_STREAM_ERROR_PIPELINE_SETUP,
      "Error while changing send codec");
}

/*
 * Returns the id stored as the active send codec (priv->send_codec_id).
 */
static gint
farsight_rtp_stream_get_active_codec (FarsightStream *stream)
{
  return FARSIGHT_RTP_STREAM (stream)->priv->send_codec_id;
}

/*
 * Pad block/unblock callback that logs the outcome and then drops one
 * reference on the pad.
 *
 * user_data is printed as a string naming the requester. Callers must hold
 * a pad reference that this callback is allowed to release.
 */
static void
blocked_cb (GstPad *pad, gboolean blocked,
    gpointer user_data)
{
  const gchar *requester = (gchar *) user_data;
  gchar *pad_name = gst_pad_get_name (pad);

  if (!blocked)
  {
    g_debug ("Pad %s unblocked successfully for %s", pad_name, requester);
  }
  else
  {
    g_debug ("Pad %s blocked successfully for %s", pad_name, requester);
  }

  g_free (pad_name);
  gst_object_unref (GST_OBJECT (pad));
}

/* This function is called once a block on element1's src pad has succeeded.
 * It unlinks element2 from element1 and, when a new element (element4) is
 * given in user_data, links element4 in element2's place and unblocks the
 * pad again. Without a new element the pad is left blocked and unlinked.
 * It also works when the new element is a sink; it implements part-block.txt
 * on dynamically switching elements in PLAYING state.
 * It expects element4 to be in the correct bin. The reference the caller
 * took on element4 is released here once element4 has been linked.
 * The blocked argument is not looked at.
 * 
  .----------.      .----------.      .----------.
  | element1 |      | element2 |      | element3 |
 ...        src -> sink       src -> sink       ...
  '----------'      '----------'      '----------'
                    .----------.
                    | element4 |
                   sink       src
                    '----------'
*/
static void
unlink_and_replace (GstPad *element1_src_pad, gboolean blocked,
    gpointer user_data)
{
  GstPad *element2_sink_pad = NULL;
  GstElement *element4, *element2;

  g_debug ("%s: Blocked pad successfully, unlinking and replacing downstream", G_STRFUNC);

  element2_sink_pad = gst_pad_get_peer (element1_src_pad);

  /* nothing downstream, so there is nothing to replace */
  if (element2_sink_pad == NULL)
  {
    return;
  }

  element4 = (GstElement *)user_data;
  element2 = gst_pad_get_parent_element (element2_sink_pad);

  if (element2 == NULL)
  {
    gst_object_unref (GST_OBJECT (element2_sink_pad));
    return;
  }

  gst_pad_unlink (element1_src_pad, element2_sink_pad);

  if (element4)
  {
    GstElement *element1 = NULL;
    GstPad *element2_src_pad = NULL;

    element1 = gst_pad_get_parent_element (element1_src_pad);
    element2_src_pad = gst_element_get_pad (element2, "src");
    if (element2_src_pad)
    {
      GstPad *element3_sink_pad = gst_pad_get_peer (element2_src_pad);

      /* element2 may have a src pad that is not linked to anything; only
       * move the downstream link over when there really is an element3 */
      if (element3_sink_pad)
      {
        GstElement *element3 =
          gst_pad_get_parent_element (element3_sink_pad);

        gst_pad_unlink (element2_src_pad, element3_sink_pad);
        if (element3)
        {
          gst_element_link (element4, element3);
          gst_object_unref (GST_OBJECT (element3));
        }
        gst_object_unref (GST_OBJECT (element3_sink_pad));
      }
      gst_object_unref (GST_OBJECT (element2_src_pad));
    }
    if (element1)
    {
      gst_element_link (element1, element4);
    }
    //gst_element_sync_state_with_parent (element4);
    gst_element_set_state (element4, GST_STATE_PLAYING);
    if (element1)
    {
      gst_object_unref (GST_OBJECT (element1));
    }
    /* blocked_cb releases the caller's reference on element1_src_pad */
    gst_pad_set_blocked_async (element1_src_pad, FALSE, blocked_cb,
        (gpointer) __FUNCTION__);
    gst_object_unref (GST_OBJECT (element4));
  }

  gst_object_unref (GST_OBJECT (element2_sink_pad));
  gst_object_unref (GST_OBJECT (element2));
}

/*
 * Data handed to unlink_and_replace_sink() through the pad block callback.
 */
typedef struct _replace_sink_data
{
  /* stream whose sink is being replaced */
  FarsightRTPStream *self;
  /* sink to install; set_sink leaves it NULL when the old sink is only
   * being removed */
  GstElement *sink;
} replace_sink_data;

/*
 * Pad block callback used to swap the stream's sink.
 *
 * user_data is a heap allocated replace_sink_data which is freed here. The
 * element downstream of element1_src_pad is unlinked and data->sink (if any)
 * is linked in through unlink_and_replace(). The previous sink is then set
 * to NULL state, removed from the stream's own pipeline when no main
 * pipeline is in use, and unreffed; data->sink becomes the stream's sink.
 *
 * A NULL user_data is tolerated: the downstream element is still unlinked
 * but no stream state can be updated.
 */
static void
unlink_and_replace_sink (GstPad *element1_src_pad, gboolean blocked,
    gpointer user_data)
{
  replace_sink_data *data = (replace_sink_data *)user_data;
  FarsightRTPStream *self;
  gchar *name = gst_pad_get_name (element1_src_pad);
  if (blocked)
  {
    g_debug ("%s: Pad %s blocked successfully", G_STRFUNC, name);
  }
  else
  {
    g_debug ("%s: Pad %s unblocked successfully", G_STRFUNC, name);
  }
  g_free (name);

  if (data == NULL)
  {
    /* without the bookkeeping data we cannot reach the stream; detaching
     * whatever is downstream is all that can be done */
    unlink_and_replace (element1_src_pad, blocked, NULL);
    return;
  }

  self = data->self;

  unlink_and_replace (element1_src_pad, blocked, data->sink);

  /* remove old sink and update references */
  if (self->priv->sink)
  {
    if (gst_element_set_state (self->priv->sink, GST_STATE_NULL) ==
        GST_STATE_CHANGE_ASYNC)
    {
      g_debug ("waiting for state change");
      gst_element_get_state (self->priv->sink, NULL, NULL,
          GST_CLOCK_TIME_NONE);
      g_debug ("done");
    }
    /* with a main pipeline the sink is not removed from our bin here */
    if (!self->priv->main_pipeline)
    {
      gst_bin_remove (GST_BIN (self->priv->pipeline), self->priv->sink);
    }
    /* unref previous sink */
    gst_object_unref (self->priv->sink);
  }
  /* set new sink */
  self->priv->sink = data->sink;
  g_free (data);
}


/*
 * Unlinks the source element from the stream's bin.
 *
 * Nothing is done without a source or without a user defined main pipeline.
 * Otherwise the peer of the bin's "sink" pad is blocked while the stream is
 * sending, the source is unlinked from the bin, and the peer pad is
 * unblocked again.
 */
static void
farsight_rtp_stream_unlink_source (FarsightRTPStream *self)
{
  /* we want to safely unlink the source here */
  if (!self->priv->src)
  {
    return;
  }
  /* FIXME Decide how to properly unlink the source. If the source is a tee
   * we don't need to block it, we just unlink. But if the source is something
   * else, and we just unlink it, it might return a GST_FLOW_ERROR and stop the
   * whole pipeline. Problem with blocking the source is that blocking is done
   * async, and by the time we block+unlink the whole farsight stream might have
   * been unreffed. Doing a sync block might hang forever if the source stops
   * emitting buffers... For now I just unlink if there is a user defined
   * pipeline assuming it will be a tee. If there is no user defined pipeline
   * then there is no need to unlink the source since it will be NULLED along
   * the whole pipeline */
#if 0
  GstPad *codec_bin_sink_pad = gst_element_get_pad (self->priv->send_codec_bin, "sink");
  if (codec_bin_sink_pad == NULL)
  {
    g_warning ("Could not find sink pad on send codec bin! This should not"
        "happen! Source will not be unlinked properly");
    return;
  }
  /* this assumes the send codec bin is connected to the src */
  GstPad *src_src_pad = gst_pad_get_peer (codec_bin_sink_pad);
  gst_object_ref (GST_OBJECT (src_src_pad));
  gst_object_unref (GST_OBJECT (codec_bin_sink_pad));
  if (!src_src_pad)
  {
    return;
  }
  if (!gst_pad_set_blocked_async (src_src_pad, TRUE, unlink_and_replace, NULL))
  {
    g_warning ("Trying to block an already blocked pad!");
  }
  else
  {
    g_debug ("Waiting for source src pad to block");
  }
#endif
  if (self->priv->main_pipeline)
  {
    GstPad *tee_src;
    GstPad *pad = gst_element_get_pad (self->priv->pipeline, "sink");
    if (!pad)
    {
      g_debug("The pipeline doesn't yet have a sink pad");
      return;
    }
    tee_src = gst_pad_get_peer (pad);
    if (!tee_src)
    {
      g_debug ("Nothing linked to pipeline sink");
      /* do not leak the sink pad reference taken above */
      gst_object_unref (pad);
      return;
    }
    g_debug ("unlinking source");
    if (self->priv->sending)
    {
      g_debug ("waiting for pad to block");
      /* blocked_cb drops one pad reference, so hand it one of its own */
      gst_object_ref (tee_src);
      gst_pad_set_blocked_async (tee_src, TRUE, blocked_cb,
          (gpointer) __FUNCTION__);
      if (gst_pad_is_blocking (tee_src)) {
        g_debug ("pad blocked, unlinking");
      } else {
        g_debug ("pad is not blocked, the next unlink may crash farsight");
      }
    }
    gst_element_unlink (self->priv->src, self->priv->pipeline);
    if (self->priv->sending)
    {
      g_debug ("unblocking source");
      gst_object_ref (tee_src);
      gst_pad_set_blocked_async (tee_src, FALSE, blocked_cb,
          (gpointer) __FUNCTION__);
      g_debug ("done unblocking");
    }
    /*
    const gchar *longname = gst_element_factory_get_longname
      (gst_element_get_factory (self->priv->src));
    if (g_ascii_strcasecmp (longname, "Tee pipe fitting") == 0)
    {
      g_debug ("RELEASING REQUEST PAD");
      gst_element_release_request_pad (self->priv->src, tee_src);
    }
    */
    gst_object_unref (pad);
    gst_object_unref (tee_src);
  }
  /* gst_object_unref (GST_OBJECT (codec_bin_sink_pad)); */
}

/*
 * Returns the stream's list of local codecs, or NULL when
 * ensure_local_codecs() fails. The list stays owned by the stream.
 */
static G_CONST_RETURN GList *
farsight_rtp_stream_get_local_codecs (FarsightStream *stream)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (stream);

  return ensure_local_codecs (self) ? self->priv->local_codecs : NULL;
}

/*
 * Sets the codecs offered by the remote side and renegotiates.
 *
 * The remote list is negotiated against the local codecs; on success the
 * stream's remote codec copy, negotiated codec list, negotiated codec
 * associations and payload-type caps table are replaced, the new table is
 * set as "pt-map" on rtpbin, the send pipeline is built and the stream is
 * set to playing.
 *
 * Returns FALSE when the local codecs are unavailable, when negotiation
 * yields nothing, or when rtpbin does not exist yet (an error is signalled
 * in that case; the negotiated state has already been replaced by then).
 */
static gboolean
farsight_rtp_stream_set_remote_codecs (FarsightStream *stream,
    const GList *codecs)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;
  GHashTable *new_negotiated_codec_associations = NULL;
  GList *new_negotiated_codecs = NULL;
  const GList *codec_list;
  GHashTable *old_pt_caps_table;

  if (!ensure_local_codecs (self))
    return FALSE;

  /* an empty list is a NULL pointer, so walk it with a pre-checked loop */
  for (codec_list = codecs; codec_list; codec_list = codec_list->next) {
    FarsightCodec *codec = codec_list->data;
    g_debug ("remote_codec %s %d", codec->encoding_name, codec->clock_rate);
  }

  new_negotiated_codec_associations = negotiate_codecs (codecs,
      self->priv->negotiated_codec_associations,
      self->priv->local_codec_associations,
      self->priv->local_codecs,
      &new_negotiated_codecs);

  if (!new_negotiated_codec_associations) {
    g_warning ("Codec negociation failed, there is no intersection between remote and local codecs");
    return FALSE;
  }

  /* we sort the negotiated codecs according to our priorities */
  /* TODO What is the correct behaviour?
   * it seems wrong to ignore the remote preferences, should they be included in
   * the reply? Should they be only kept in memory for the stream we send while
   * our reply has out preference?
   * GTalk's behaviour is to save our preference, send us what we prefer, but
   * in the intersection reply, he overwrites our preference with his. The
   * result is that we can get 2 different codecs being sent/recved. This won't
   * happen if we sort as done below. */

  /* Sorting codecs is EVIL */
  /* We should respect the remote order
   * We also have to find a way to prevent the remote guy from
   * sending us PCMA/U unless its absolutely required
   * Seems like GTalk no longer behaves like that
   */
  /* NOTE(review): this sorts the previous remote codec list, which is
   * destroyed right below, so the new list keeps the remote order -
   * confirm whether that is intended */
  if (self->priv->codec_pref_list)
    sort_codecs (&self->priv->remote_codecs,self->priv->codec_pref_list);


  if (self->priv->remote_codecs)
    farsight_codec_list_destroy (self->priv->remote_codecs);
  self->priv->remote_codecs = farsight_codec_list_copy (codecs);

  if (self->priv->negotiated_codecs)
    g_list_free (self->priv->negotiated_codecs);
  self->priv->negotiated_codecs = new_negotiated_codecs;

  if (self->priv->negotiated_codec_associations)
    g_hash_table_destroy (self->priv->negotiated_codec_associations);
  self->priv->negotiated_codec_associations = new_negotiated_codec_associations;


  /* the old table is only destroyed after rtpbin got the new one */
  old_pt_caps_table = self->priv->pt_caps_table;

  self->priv->pt_caps_table =
      create_pt_caps_hashtable (self->priv->negotiated_codec_associations);

  if (self->priv->rtpbin)
  {
    g_object_set (G_OBJECT(self->priv->rtpbin), "pt-map", self->priv->pt_caps_table, NULL);
  }
  else
  {
    /* no rtpbin can be using the old table, so do not leak it */
    if (old_pt_caps_table) {
      g_hash_table_destroy (old_pt_caps_table);
    }
    farsight_stream_signal_error (FARSIGHT_STREAM(self),
        FARSIGHT_STREAM_ERROR_UNKNOWN,
        "You need to run farsight_stream_prepare_transports() before setting "
        "the remote codecs");
    return FALSE;
  }


  if (old_pt_caps_table) {
    g_hash_table_destroy (old_pt_caps_table);
  }

  /* We can create the send pipeline now */
  farsight_rtp_stream_build_send_pipeline (self);

  farsight_rtp_stream_set_playing (self);

  return TRUE;
}

/*
 * Returns a copy (made with farsight_codec_list_copy()) of the negotiated
 * codec list, or NULL when nothing has been negotiated yet.
 */
static GList *
farsight_rtp_stream_get_codec_intersection (FarsightStream *stream)
{
  GList *negotiated = FARSIGHT_RTP_STREAM (stream)->priv->negotiated_codecs;

  return negotiated ? farsight_codec_list_copy (negotiated) : NULL;
}

/*
 * Replaces the stream's codec preference list with a copy of
 * codec_pref_list and re-sorts the local codecs accordingly.
 *
 * A NULL codec_pref_list only clears the stored preferences. A non-NULL
 * but empty array is rejected and leaves everything untouched.
 */
static void
farsight_rtp_stream_set_codec_preference_list (FarsightStream *stream,
    const GArray *codec_pref_list)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;
  FarsightRTPStreamPrivate *priv = self->priv;

  /* the emptiness check must not dereference a NULL list */
  g_return_if_fail (codec_pref_list == NULL || codec_pref_list->len);

  if (!ensure_local_codecs (self))
    return;

  /* let's free the previous list and make a copy of this one */
  if (priv->codec_pref_list)
  {
    g_array_free (priv->codec_pref_list, TRUE);
    priv->codec_pref_list = NULL;
  }

  if (codec_pref_list) {
    priv->codec_pref_list = g_array_sized_new (FALSE, FALSE, sizeof
        (FarsightCodecPreference), codec_pref_list->len);
    g_array_append_vals (priv->codec_pref_list, codec_pref_list->data,
        codec_pref_list->len);
    sort_codecs (&priv->local_codecs, priv->codec_pref_list);
  }
}

/*
 * Sets the element used as the stream's source.
 *
 * The stream keeps its own reference on source and releases the one it held
 * on any previous source. When priv->build_send_pipeline is set the send
 * pipeline is (re)built right away.
 *
 * Returns FALSE when the local codecs cannot be set up, TRUE otherwise.
 */
static gboolean
farsight_rtp_stream_set_source (FarsightStream *stream, GstElement *source)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;

  if (!ensure_local_codecs (self))
    return FALSE;

  g_debug ("%s (%d): setting src", __FUNCTION__, __LINE__);

  /* take the new reference first so that setting the same element again can
   * never drop its last reference; NULL is never passed to the ref call */
  if (source)
  {
    gst_object_ref(source);
  }

  if (self->priv->src)
  {
    if (self->priv->send_codec_bin)
    {
      g_warning ("Send pipeline already created,"
          "will attempt to replace while pipeline is running");
      /* TODO do I need to pause the pipeline? pause the src? just remove it and
       * replace a new one? */
    }
    gst_object_unref(self->priv->src);
  }
  self->priv->src = source;

  if (self->priv->build_send_pipeline)
  {
    farsight_rtp_stream_build_send_pipeline (self);
  }
  return TRUE;
}

/*
 * Stores filter as the caps used when linking the source, keeping a
 * reference on it and releasing the previously stored caps.
 * A NULL filter clears the stored caps. Always returns TRUE.
 */
static gboolean
farsight_rtp_stream_set_source_filter (FarsightStream *stream, GstCaps *filter)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;
  g_debug ("%s (%d): setting source filter", __FUNCTION__, __LINE__);

  /* ref before unref: if filter is the caps already stored, releasing the
   * old reference first could free them before they are reffed again */
  if (filter)
    gst_caps_ref(filter);
  if (self->priv->src_filter)
    gst_caps_unref (self->priv->src_filter);
  self->priv->src_filter = filter;
  return TRUE;
}

/*
 * Returns the stream's source element.
 *
 * This is the element set with set_source when there is one. Otherwise the
 * current send codec bin ("send<send_codec_id>") is searched for the first
 * element that is not flagged as a sink and has no sink pads.
 *
 * Returns NULL when no such element exists. No reference is added for the
 * caller on the returned element.
 */
static GstElement *
farsight_rtp_stream_get_source (FarsightStream *stream)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;
  GstElement *codec_bin = NULL;
  GstElement *found = NULL;
  GstIterator *iter;
  gchar *name;
  gboolean done = FALSE;

  if (self->priv->src)
    return self->priv->src;

  g_return_val_if_fail (self->priv->pipeline != NULL, NULL);

  /* see if we have a src inside the pipeline */
  /* Let's see if a send codec bin for the active codec exists */
  name = g_strdup_printf ("send%d", self->priv->send_codec_id);
  codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
  g_free (name);

  if (!codec_bin)
    return NULL;

  iter = gst_bin_iterate_elements (GST_BIN(codec_bin));

  while (!done) {
    gpointer data;
    switch (gst_iterator_next (iter, &data)) {
      case GST_ITERATOR_OK:
        {
          GstElement *child;
          gboolean is_src;
          child = GST_ELEMENT_CAST (data);

          GST_OBJECT_LOCK (child);
          is_src = !GST_OBJECT_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK) &&
              !child->numsinkpads;
          GST_OBJECT_UNLOCK (child);

          /* the iterator's reference is dropped; the bin keeps the child */
          gst_object_unref (child);

          if (is_src)
          {
            found = child;
            done = TRUE;
          }
          break;
        }
      case GST_ITERATOR_RESYNC:
        gst_iterator_resync (iter);
        break;
      case GST_ITERATOR_DONE:
        done = TRUE;
        break;
      case GST_ITERATOR_ERROR:
        g_assert_not_reached ();
        break;
    }
  }

  /* single exit so the iterator and the bin reference are always released,
   * also when an element was found */
  gst_iterator_free (iter);
  gst_object_unref (GST_OBJECT (codec_bin));

  return found;
}

/*
 * Sets, replaces or removes (sink == NULL) the stream's sink element.
 *
 * Without a receive codec bin for the current recv codec the sink is only
 * stored (with a reference) for later use. Otherwise:
 *  - sink == NULL: the codec bin's src pad is blocked and the old sink is
 *    unlinked and torn down from the block callback;
 *  - no previous sink: the new sink is linked directly and the codec bin's
 *    src pad is unblocked;
 *  - previous sink present: the src pad is blocked and the swap is done from
 *    the block callback, or directly when the pad was already blocked.
 *
 * Always returns TRUE.
 */
static gboolean
farsight_rtp_stream_set_sink (FarsightStream *stream, GstElement *sink)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;
  GstElement *codec_bin;

  if (sink)
  {
    /* gst_element_get_name() returns a copy that has to be freed */
    gchar *sink_name = gst_element_get_name (sink);
    g_debug ("%s (%d): setting sink %s", __FUNCTION__, __LINE__, sink_name);
    g_free (sink_name);
  }
  else
  {
    g_debug ("%s (%d): setting sink %s", __FUNCTION__, __LINE__, "NULL");
  }

  codec_bin = NULL;
  if (self->priv->pipeline)
  {
    gchar *name;
    name = g_strdup_printf ("recv%d", self->priv->recv_codec_id);
    codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
    g_free (name);
  }

  if (codec_bin)
  {
    GstPad *codec_bin_src_pad =
      gst_element_get_pad (codec_bin, "src");

    /* no new sink */
    if (sink == NULL)
    {
      /* in this case we just want to block the recv bin and unlink the sink */
      replace_sink_data *user_data = g_new0 (replace_sink_data, 1);
      user_data->self = self;
      user_data->sink = NULL;

      g_debug ("%s: blocking codec_bin_src_pad and removing old sink", G_STRFUNC);
      /* the callback needs user_data to find the old sink; it tears the old
       * sink down, clears priv->sink and frees user_data */
      if (!gst_pad_set_blocked_async (codec_bin_src_pad, TRUE,
            unlink_and_replace_sink, user_data))
      {
        /* already blocked: the callback will never run */
        g_free (user_data);
        if (codec_bin_src_pad)
        {
          gst_object_unref (GST_OBJECT (codec_bin_src_pad));
        }
        /* NOTE(review): a previously stored sink is forgotten here without
         * being unlinked or unreffed, as before - confirm ownership */
        self->priv->sink = NULL;
      }
    }
    /* new sink provided */
    else
    {
      if (!self->priv->main_pipeline)
      {
        gst_bin_add (GST_BIN (self->priv->pipeline), sink);
      }
      /* in this case we just want to unblock the recv bin and connect the new
       * sink */
      if (self->priv->sink == NULL)
      {
        gst_element_set_state (sink, GST_STATE_READY);
        gst_element_link (codec_bin, sink);
        gst_element_set_state (sink, GST_STATE_PLAYING);
        g_debug ("%s: unblocking codec_bin_src_pad and setting new sink", G_STRFUNC);
        /* blocked_cb releases the pad reference */
        gst_pad_set_blocked_async (codec_bin_src_pad, FALSE, blocked_cb,
            "set_sink new, there was none");
        gst_object_ref (sink);
        self->priv->sink = sink;
      }
      else
      {
        replace_sink_data *user_data;
        /* we want to block, remove old sink, add new sink, link and unblock */
        g_debug ("%s: sink already present, replacing old one", G_STRFUNC);
        user_data = g_new0 (replace_sink_data, 1);
        user_data->self = self;
        user_data->sink = sink;
        gst_object_ref (sink);
        if (!gst_pad_set_blocked_async (codec_bin_src_pad, TRUE,
              unlink_and_replace_sink, user_data))
        {
          g_debug ("Recv pipeline already blocked, connecting new sink");
          /* the callback will not run, so nobody else frees this */
          g_free (user_data);
          gst_element_set_state (sink, GST_STATE_READY);
          gst_element_link (codec_bin, sink);
          gst_element_set_state (sink, GST_STATE_PLAYING);
          gst_pad_set_blocked_async (codec_bin_src_pad, FALSE, blocked_cb,
              "set_sink, new there was old");
          self->priv->sink = sink;
        }
      }
    }
    gst_object_unref (GST_OBJECT (codec_bin));
  }
  else
  {
    g_debug ("%s: No codec bin present, setting new sink for future use", G_STRFUNC);
    self->priv->sink = sink;
    if (sink)
    {
      gst_object_ref(sink);
    }
  }

  return TRUE;
}

/*
 * Stores filter as the caps used when linking the sink, keeping a reference
 * on it and releasing the previously stored caps.
 * A NULL filter clears the stored caps. Always returns TRUE.
 */
static gboolean
farsight_rtp_stream_set_sink_filter (FarsightStream *stream, GstCaps *filter)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;
  g_debug ("%s (%d): setting sink filter", __FUNCTION__, __LINE__);

  /* ref before unref: if filter is the caps already stored, releasing the
   * old reference first could free them before they are reffed again */
  if (filter)
    gst_caps_ref(filter);
  if (self->priv->sink_filter)
    gst_caps_unref (self->priv->sink_filter);
  self->priv->sink_filter = filter;
  return TRUE;
}

/*
 * Returns the stream's sink element.
 *
 * This is the element set with set_sink when there is one. Otherwise the
 * current receive codec bin ("recv<recv_codec_id>") is searched for the
 * first element flagged as a sink.
 *
 * Returns NULL when there is no pipeline, no such bin or no sink in it. No
 * reference is added for the caller on the returned element.
 */
static GstElement *
farsight_rtp_stream_get_sink (FarsightStream *stream)
{
  FarsightRTPStream *self = (FarsightRTPStream*) stream;
  GstElement *codec_bin = NULL;
  GstElement *found = NULL;
  GstIterator *iter;
  gchar *name;
  gboolean done = FALSE;

  if (self->priv->sink)
    return self->priv->sink;

  /* see if we have a sink inside the pipeline */
  /* we need a pipeline in this case */
  if (!self->priv->pipeline)
  {
    return NULL;
  }

  /* Let's see if a recv codec bin for this codec already exists */
  name = g_strdup_printf ("recv%d", self->priv->recv_codec_id);
  codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
  g_free (name);

  if (!codec_bin)
    return NULL;

  iter = gst_bin_iterate_elements (GST_BIN(codec_bin));

  while (!done) {
    gpointer data;

    switch (gst_iterator_next (iter, &data)) {
      case GST_ITERATOR_OK:
        {
          GstElement *child;
          gboolean is_sink;
          child = GST_ELEMENT_CAST (data);

          GST_OBJECT_LOCK (child);
          is_sink = GST_OBJECT_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK);
          GST_OBJECT_UNLOCK (child);

          /* the iterator's reference is dropped; the bin keeps the child */
          gst_object_unref (child);

          if (is_sink)
          {
            found = child;
            done = TRUE;
          }
          break;
        }
      case GST_ITERATOR_RESYNC:
        gst_iterator_resync (iter);
        break;
      case GST_ITERATOR_DONE:
        done = TRUE;
        break;
      case GST_ITERATOR_ERROR:
        g_assert_not_reached ();
        break;
    }
  }

  /* single exit so the iterator and the bin reference are always released,
   * also when an element was found */
  gst_iterator_free (iter);
  gst_object_unref (GST_OBJECT (codec_bin));

  return found;
}

/*
 * Timeout callback: queries the jitterbuffer statistics through the
 * "rtpsink" pad of rtpbin and prints them with g_print().
 *
 * user_data is the FarsightRTPStream. Returns FALSE (removing the timeout)
 * once rtpbin is gone, TRUE otherwise.
 */
static gboolean
query_jb_stats (gpointer user_data)
{
  static const char *field_name[5] = {
          "total-packets",
          "late-packets",
          "duplicate-packets",
          "times-overrun",
          "times-underrun" };
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (user_data);
  GstPad *pad;
  GstQuery *query;
  GstStructure *structure;

  if (self->priv->rtpbin == NULL)
  {
    /* Returning FALSE destroys this source, so forget its id to make sure
     * nobody tries to remove it a second time. */
    self->priv->stats_timeout = 0;
    return FALSE;
  }

  pad = gst_element_get_pad (self->priv->rtpbin, "rtpsink");
  if (pad == NULL)
  {
    g_debug ("%s: rtpbin has no rtpsink pad, skipping jitterbuffer stats",
        __FUNCTION__);
    return TRUE;
  }

  /* the query takes ownership of the structure */
  structure = gst_structure_new ("jb-stats", NULL);
  query = gst_query_new_application (GST_QUERY_JB_STATS, structure);

  if (gst_pad_query (pad, query)) {
    const GValue *val;
    gint i;

    g_print ("Jitterbuffers statisics:\n");
    for (i=0; i<5; i++) {
      val = gst_structure_get_value (structure, field_name[i]);
      if (val != NULL)
        /* guint64 needs the GLib format macro, "%lld" is not portable */
        g_print ("%s: %" G_GUINT64_FORMAT "\n", field_name[i],
            g_value_get_uint64 (val));
    }
    g_print ("--------------\n");
  }

  gst_object_unref (pad);
  gst_query_unref (query);

  return TRUE;
}

/* This builds the core of the pipeline: the container (a bin added to the
 * user supplied main pipeline, or a pipeline of our own), rtpbin, the
 * transmitter source and rtpdemux. The send/recv parts are added later.
 *
 * Returns TRUE on success. On failure the partially built container is
 * released, FARSIGHT_STREAM_ERROR_PIPELINE_SETUP is signalled on the stream
 * and FALSE is returned. */
static gboolean
farsight_rtp_stream_build_base_pipeline (FarsightRTPStream *self)
{
  GstElement *rtpbin = NULL;
  GstBus *pipe_bus;
  gchar *jb_stats_freq;

  g_return_val_if_fail (self != NULL, FALSE);

  g_debug ("%s (%d): creating core RTP pipeline", __FUNCTION__,
      __LINE__);
  /* 
   * build base pipeline 
   */
  if (self->priv->pipeline == NULL) {
    GstElement *transmitter_src = NULL;

    if (self->priv->main_pipeline)
    {
      /* Create a bin and a gst bus for it and set it on the bin */
      self->priv->pipeline = gst_bin_new (NULL);
      if (!self->priv->pipeline)
      {
        goto error;
      }
      /* since our bin is independent of the parent state changes, we need to
       * make sure we manage our own async state changes inside the bin */
      /* NOTE async-handling only exists since gst 0.10.13 */
      if (g_object_has_property (G_OBJECT (self->priv->pipeline),
            "async-handling"))
      {
        g_object_set (G_OBJECT (self->priv->pipeline), "async-handling", TRUE,
            NULL);
      }
      /* let's make our bin independent of the parent state changes */
      gst_element_set_locked_state (self->priv->pipeline, TRUE);
      gst_bin_add (GST_BIN (self->priv->main_pipeline), self->priv->pipeline);
      pipe_bus = gst_bus_new ();
      gst_element_set_bus (GST_ELEMENT_CAST (self->priv->pipeline), pipe_bus);
    }
    else
    {
      /* Create a pipeline and listen on its bus */
      self->priv->pipeline = gst_pipeline_new ("pipeline");
      if (!self->priv->pipeline)
      {
        goto error;
      }
      pipe_bus = gst_pipeline_get_bus (GST_PIPELINE (self->priv->pipeline));
    }
    self->priv->bus_watch = gst_bus_add_watch (pipe_bus, farsight_rtp_stream_bus_watch_cb, self);
    gst_object_unref (pipe_bus);

    /* create rtpbin element */
    rtpbin = gst_element_factory_make ("rtpbin", NULL);
    if (!rtpbin)
    {
      g_warning ("Couldn't create rtpbin, check your gstreamer install");
      goto error;
    }
    self->priv->rtpbin = rtpbin;
    gst_bin_add (GST_BIN (self->priv->pipeline), rtpbin);

    g_object_set (G_OBJECT (rtpbin),
        "rtcp-support", FALSE,
        "pt-map", self->priv->pt_caps_table,
        NULL);

    switch(farsight_stream_get_media_type (FARSIGHT_STREAM (self))) {
      case FARSIGHT_MEDIA_TYPE_AUDIO:
        g_object_set (G_OBJECT (rtpbin), "queue-delay", 200, NULL);
        break;
      case FARSIGHT_MEDIA_TYPE_VIDEO:
        g_object_set (G_OBJECT (rtpbin), "queue-delay", 0, NULL);
        break;
    }

    /* create transmitter specific source element */
    g_object_set (G_OBJECT(rtpbin), "bypass-udp", TRUE, NULL);

    transmitter_src = farsight_transmitter_get_gst_src (self->priv->transmitter);
    if (!transmitter_src)
    {
      g_warning ("Transmitter did not provide a source element");
      goto error;
    }

    gst_bin_add (GST_BIN (self->priv->pipeline), transmitter_src);
    g_debug("added transmitter_src %p to pipeline %p",
        transmitter_src, self->priv->pipeline);

    /* TODO HACK ALERT HACK ALERT */
    /* just adding a bogus destination for now */
    /* so that jrtplib releases the packets to someone */
    g_object_set (G_OBJECT (rtpbin), "destinations", "64.34.23.11:5000", NULL);

    /* link rtpbin to rtpdemux to be able to discover what codec 
     * should be used.
     * We don't actually connect the depayloaders decoders yet */
    self->priv->rtpdemux = gst_element_factory_make ("rtpdemux", NULL);
    if (!self->priv->rtpdemux)
    {
      g_warning ("Couldn't create rtpdemux, check your gstreamer install");
      goto error;
    }
    g_signal_connect (G_OBJECT (self->priv->rtpdemux), "new-payload-type",
        G_CALLBACK (farsight_rtp_stream_new_payload_type), self);
    g_signal_connect (G_OBJECT (self->priv->rtpdemux), "payload-type-change",
        G_CALLBACK (farsight_rtp_stream_payload_type_change), self);

    gst_bin_add (GST_BIN (self->priv->pipeline),
        self->priv->rtpdemux);
    if (!gst_element_link_pads (rtpbin, "src%d", self->priv->rtpdemux,
        "sink"))
    {
      g_warning ("Could not link rtpbin:src to rtpdemux:sink");
      goto error;
    }

    if (!gst_element_link_pads (transmitter_src, "src", rtpbin, "rtpsink"))
    {
      g_warning ("Could not link transmitter_src:src to rtpbin:rtpsink");
      goto error;
    }
  }
 
  /* How often should the jitterbuffer stats be queried and dumped */
  jb_stats_freq = getenv ("FS_JB_STATS_FREQUENCY");
  if (jb_stats_freq && GST_QUERY_JB_STATS_SUPPORTED)
  {
    gint freq = atoi (jb_stats_freq);

    /* A zero or negative interval would turn the timeout into a busy loop
     * (or a huge unsigned interval), so only accept positive values. */
    if (freq > 0)
    {
      self->priv->stats_timeout =
              g_timeout_add (freq * 1000, query_jb_stats, self);
    }
    else
    {
      g_warning ("Ignoring invalid FS_JB_STATS_FREQUENCY value '%s'",
          jb_stats_freq);
    }
  }

  if (self->priv->preload_recv_codec_id >= 0)
    farsight_rtp_stream_preload_receive_pipeline (FARSIGHT_STREAM(self),
        self->priv->preload_recv_codec_id);

  farsight_rtp_stream_set_playing (self);

  return TRUE;

error:
  g_warning ("%s (%d): error setting up core RTP pipeline",
      __FUNCTION__, __LINE__);

  if (self->priv->pipeline) {
    GstObject *parent = NULL;

    /* The watch was installed on the bus of the container we are about to
     * drop, don't leave it behind. */
    if (self->priv->bus_watch)
    {
      g_source_remove (self->priv->bus_watch);
      self->priv->bus_watch = 0;
    }

    /* gst_element_get_parent() hands out a reference that must be released */
    if (self->priv->main_pipeline)
      parent = gst_element_get_parent (self->priv->pipeline);

    if (parent)
    {
      gst_object_unref (parent);
      gst_bin_remove (GST_BIN (self->priv->main_pipeline), 
          self->priv->pipeline);
    }
    else
    {
      gst_object_unref (GST_OBJECT (self->priv->pipeline));
    }
    self->priv->pipeline = NULL;
    /* these elements were owned by the container that was just released */
    self->priv->rtpbin = NULL;
    self->priv->rtpdemux = NULL;
  }
  farsight_stream_signal_error (FARSIGHT_STREAM(self),
          FARSIGHT_STREAM_ERROR_PIPELINE_SETUP,
          "Error setting up core RTP pipeline");
  return FALSE;
}

/*
 * Tells whether DTMF can be sent on this stream.
 *
 * That is the case when the stream is an audio stream, the remote codec list
 * contains an audio "telephone-event" codec, and both the "rtpdtmfsrc" and
 * "rtpdtmfmux" element factories are present in the default registry.
 */
static gboolean
farsight_rtp_stream_has_dtmf (FarsightRTPStream *self)
{
  GList *item;
  GstPluginFeature *feature;
  gboolean advertised = FALSE;

  if (farsight_stream_get_media_type (FARSIGHT_STREAM (self)) !=
      FARSIGHT_MEDIA_TYPE_AUDIO)
    return FALSE;

  /* look for telephone-event among the codecs of the remote end */
  item = self->priv->remote_codecs;
  while (item != NULL && !advertised) {
    FarsightCodec *codec = item->data;

    if (codec->media_type == FARSIGHT_MEDIA_TYPE_AUDIO &&
        g_ascii_strcasecmp ("telephone-event", codec->encoding_name) == 0) {
      g_debug ("Found audio/telephone-event with PT %d\n", codec->id);
      advertised = TRUE;
    }
    item = g_list_next (item);
  }

  if (!advertised) {
    /* Remote end does not advertise the telephone-event codec */
    g_debug ("Remote end does not have audio/telephone-event");
    return FALSE;
  }

  /* both DTMF elements have to be installed */
  feature = gst_default_registry_find_feature("rtpdtmfsrc",
      GST_TYPE_ELEMENT_FACTORY);
  if (feature == NULL) {
    g_message ("The rtpdtmfsrc element is not installed");
    return FALSE;
  }
  gst_object_unref (feature);

  feature = gst_default_registry_find_feature("rtpdtmfmux",
      GST_TYPE_ELEMENT_FACTORY);
  if (feature == NULL) {
    g_message ("The rtpdtmfmux element is not installed");
    return FALSE;
  }
  gst_object_unref (feature);

  return TRUE;
}

/*
 * Creates an "rtpdtmfsrc" element configured with the payload type of the
 * audio "telephone-event" codec found in the remote codec list.
 *
 * Returns the new element, or NULL when the remote end has no
 * telephone-event codec or the element cannot be created.
 */
GstElement *
build_dtmf_rtpdtmfsrc (FarsightRTPStream *self)
{
  GList *codec_list;
  GstElement *dtmfsrc = NULL;
  guint pt = 0;

  for (codec_list = self->priv->remote_codecs;
       codec_list;
       codec_list = g_list_next (codec_list)) {
    FarsightCodec *codec = codec_list->data;
    if (codec->media_type == FARSIGHT_MEDIA_TYPE_AUDIO &&
        !g_ascii_strcasecmp ("telephone-event", codec->encoding_name)) {
      pt = codec->id;
      break;
    }
  }

  /* this function returns a pointer, so report "not found" as NULL */
  if (!codec_list)
    return NULL;

  dtmfsrc = gst_element_factory_make ("rtpdtmfsrc", "rtpdtmfsrc");
  if (!dtmfsrc) {
    g_warning ("Error creating rtpdtmfsrc element");
    return NULL;
  }

  g_object_set (dtmfsrc,
      "pt", pt,
      "interval", 30,
      "packet-redundancy", 3, NULL);

  return dtmfsrc;
}

/*
 * Returns TRUE when the class of @object has a property named @property,
 * FALSE otherwise.
 */
gboolean
g_object_has_property (GObject *object, const gchar *property)
{
  GParamSpec *pspec =
      g_object_class_find_property (G_OBJECT_GET_CLASS (object), property);

  return pspec != NULL;
}

static gboolean
farsight_rtp_stream_build_send_pipeline (FarsightRTPStream *self)
{
  CodecAssociation *codec_association = NULL;
  GstElement *codec_bin;
  GstElement *rtpmuxer = NULL;
  GstElement *rtpdtmfsrc = NULL;
  GstIterator *sinkiter;
  gboolean done = FALSE;
  gpointer item;
  GstPad *codec_sinkpad;
  GstElement *transmitter_sink = NULL;
  GstState parentState;
  GstState parentPending;
  GstStateChangeReturn ret;

  g_return_val_if_fail (self != NULL, FALSE);
  /* let's create the base pipeline if not already done so (prepare_transports
   * not called) */
  if (!self->priv->rtpbin || !self->priv->pipeline)
  {
    farsight_rtp_stream_build_base_pipeline (self);
  }

  g_return_val_if_fail (self->priv->rtpbin != NULL, FALSE);
  g_return_val_if_fail (self->priv->pipeline != NULL, FALSE);

  if (self->priv->send_codec_bin)
  {
    g_warning ("Send pipeline already created, will not recreate");
    return TRUE;
  }

  /* Let us automaticaly pick a codec if not set by the user */
  /* TODO farsight_rtp_stream_choose_codec() will pick the first codec from the
   * remote codec list that it finds in the local internal (rtpgstcodecs) list.
   * Should we actually pick the first codec in our list? If so, we need to use
   * the self->local_codecs list since the internal one is not sorted according
   * to codec_prefs */
  if (self->priv->send_codec_id == -1)
  {
    codec_association = farsight_rtp_stream_choose_codec (self);
    if (!codec_association)
      return FALSE;

    self->priv->send_codec_id = codec_association->codec->id;
  }
  else
  {
    /* let's get the CodecBlueprint for the selected PT */
    codec_association = lookup_codec_by_pt (
        self->priv->negotiated_codec_associations, self->priv->send_codec_id);
    if (!codec_association) {
      g_warning ("Codec %d not supported", self->priv->send_codec_id);
      goto error;
    }
  }

  g_message ("%s (%d): creating send pipeline with codec %d", __FUNCTION__,
      __LINE__, self->priv->send_codec_id);

  if (!self->priv->src && !codec_association->codec_blueprint->has_src)
  {
    g_message ("No source has been set yet, send pipeline build for later");
    self->priv->build_send_pipeline = TRUE;
    return FALSE;
  }

  /* add active remote candidate as destination if set */
  if (self->priv->active_remote_candidate)
  {
    farsight_rtp_stream_add_remote_candidate_to_rtpbin(self,
        self->priv->active_remote_candidate);
  }

  /*
   * sending part of pipeline
   */

  if (self->priv->src)
  {
    /* let's add the source to our bin if there is no user provided pipeline or
     * if the given source is not added to the user pipeline */
    if ((self->priv->main_pipeline && gst_element_get_parent (self->priv->src) == NULL)
        || !self->priv->main_pipeline)
      gst_bin_add (GST_BIN (self->priv->pipeline),
          self->priv->src);
  }


  /* Lets build a Muxer in the case we have to add DTMF or some other
   * secondary codec
   */
  rtpmuxer = gst_element_factory_make ("rtpdtmfmux", NULL);
  if (!rtpmuxer) {
    g_warning ("Error creating rtpdtmfmux element");
    goto error;
  }

  gst_bin_add (GST_BIN (self->priv->pipeline), rtpmuxer);

  if (!gst_element_link_pads (rtpmuxer, "src",
          self->priv->rtpbin, "sink%d")) {
    g_warning ("Could not link rtpmuxer and rtpbin\n");
    goto error;
  }

  if (!cleanup_unique (farsight_stream_get_media_type (FARSIGHT_STREAM (self)),
          DIR_SEND, codec_association->codec_blueprint->send_has_unique)) {
    g_warning ("Could not unload unique send codec");
    goto error;
  }

  /* build send part based on send_pipeline_factory */
  /* put all these elements in a bin */
  codec_bin = create_codec_bin (self->priv->negotiated_codec_associations,
      self->priv->send_codec_id, DIR_SEND, self->priv->remote_codecs);
  if (!codec_bin)
  {
    g_warning ("Couldn't create elements for codec %d", self->priv->send_codec_id);
    goto error;
  }

  gst_bin_add (GST_BIN (self->priv->pipeline), codec_bin);

  if (self->priv->src)
  {
    if ((GstElement *)gst_element_get_parent (self->priv->src) == self->priv->pipeline)
    {
      /* connect src to codec_bin */
      gchar *tmp_caps = gst_caps_to_string (self->priv->src_filter);
      g_debug ("linking src %p to codec bin %p with caps %s", self->priv->src, codec_bin,
          tmp_caps);
      g_free (tmp_caps);
      if (!gst_element_link_filtered (self->priv->src, codec_bin, self->priv->src_filter))
      {
        g_warning ("Could not link src to codec bin");
        goto error;
      }
    }
    else
    {
      /* codec_bin and source have different parents, let's use our ghostpad */
      /* let's create a ghostpad for our sink */
      GstPad *ghostpad;
      GstPad *codec_bin_sink_pad = gst_element_get_pad (codec_bin, "sink");
      ghostpad = gst_ghost_pad_new ("sink", codec_bin_sink_pad);
      if (gst_pad_is_active (codec_bin_sink_pad))
        gst_pad_set_active (ghostpad, TRUE);
      gst_element_add_pad (self->priv->pipeline, ghostpad);
      gst_element_link_filtered (self->priv->src, self->priv->pipeline,
          self->priv->src_filter);
      gst_object_unref (codec_bin_sink_pad);
    }
  }


  sinkiter = gst_element_iterate_src_pads (codec_bin);
  while (!done) {
    switch (gst_iterator_next (sinkiter, &item)) {
       case GST_ITERATOR_OK:
         codec_sinkpad = item;
         if (!gst_pad_is_linked (codec_sinkpad)) {
           gchar *padname = gst_pad_get_name (codec_sinkpad);
           GstCaps *caps = farsight_codec_to_gst_caps (
               codec_association->codec);

           if (!gst_pad_accept_caps (codec_sinkpad, caps)) {
             gst_caps_unref (caps);
             caps = NULL;
           }

           if (!gst_element_link_pads_filtered (codec_bin, padname , rtpmuxer,
                   "sink_%d", caps)) {
             g_error ("Can't link pad %s from codec_bin to rtpmuxer\n",
                      padname);
           }
           g_free (padname);
         }
         gst_object_unref (GST_OBJECT (item));
         break;
       case GST_ITERATOR_RESYNC:

         // ...rollback changes to items... ???
         gst_iterator_resync (sinkiter);
         break;
       case GST_ITERATOR_ERROR:

         g_error ("Something is wrong, can't iterate sink pads\n");
         done = TRUE;
         break;
       case GST_ITERATOR_DONE:

         done = TRUE;
         break;
     }
   }
   gst_iterator_free (sinkiter);



   if (farsight_rtp_stream_has_dtmf (self)) {

     rtpdtmfsrc = build_dtmf_rtpdtmfsrc (self);
     gst_bin_add (GST_BIN (self->priv->pipeline), rtpdtmfsrc);

     gst_element_link_pads (rtpdtmfsrc, "src" , rtpmuxer, "sink_%d");

     if (!rtpdtmfsrc) {
       g_warning ("Could not build dtmf source element\n");
     }

     gst_element_sync_state_with_parent (rtpdtmfsrc);
 
   }

  /* create transmitter specific sink element */
  transmitter_sink = farsight_transmitter_get_gst_sink
    (self->priv->transmitter);

  gst_bin_add (GST_BIN (self->priv->pipeline), transmitter_sink);

  if (!gst_element_link_pads (self->priv->rtpbin, "rtpsrc",
        transmitter_sink, "sink"))
  {
    g_warning ("Could not link rtpbin:rtpsrc to transmitter_sink:sink");
    goto error;
  }


  self->priv->send_codec_bin = codec_bin;

  bin_element_set_property (GST_BIN (self->priv->send_codec_bin),
      "min-ptime", self->priv->min_ptime, NULL);
  bin_element_set_property (GST_BIN (self->priv->send_codec_bin),
      "max-ptime", self->priv->max_ptime, NULL)
;
  self->priv->build_send_pipeline = FALSE;

  /* if user requested to block the pad, let's do it */
  if (self->priv->sending == FALSE)
  {
    self->priv->sending = TRUE;
    farsight_rtp_stream_set_sending (FARSIGHT_STREAM (self), FALSE);
  }

  /* Only sync the src if it exists and it was given to us as an element,
   * not if its part of the external pipeline (in that case, the state is
   * managed by the user
   */

  ret = gst_element_get_state(self->priv->pipeline, &parentState, &parentPending, (GstClockTime) 0);
  if (parentPending == GST_STATE_PLAYING ||
      (ret == GST_STATE_CHANGE_SUCCESS &&
       parentPending == GST_STATE_VOID_PENDING &&
          parentState == GST_STATE_PLAYING)) {
    GstClock *clock = NULL;

    if (parentState == GST_STATE_PLAYING &&
        self->priv->main_pipeline == NULL &&
        (clock = gst_element_provide_clock (codec_bin)) != NULL) {
      GstStateChangeReturn ret;
      gst_object_unref (clock);

      ret = gst_element_set_state (self->priv->pipeline, GST_STATE_PAUSED);
      if (ret == GST_STATE_CHANGE_FAILURE) {
        g_error ("Can't change state");
      } else if (ret == GST_STATE_CHANGE_ASYNC) {
        g_debug ("Waiting to go back to paused");
        gst_element_get_state (self->priv->pipeline, NULL, NULL,
            GST_CLOCK_TIME_NONE);
        g_debug ("now paused");
      }
      ret = gst_element_set_state (self->priv->pipeline, GST_STATE_PLAYING);
      if (ret == GST_STATE_CHANGE_FAILURE)
        g_error ("Can't change state");

    } else {
      gst_element_set_state (transmitter_sink, GST_STATE_PLAYING);
      gst_element_set_state (rtpmuxer, GST_STATE_PLAYING);
      gst_element_set_state (codec_bin, GST_STATE_PLAYING);
      if (self->priv->src &&
          ((GstElement *)gst_element_get_parent (self->priv->src) ==
              self->priv->pipeline))
        gst_element_set_state (self->priv->src, GST_STATE_PLAYING);
    }
  }


return TRUE;

error:
  g_warning ("%s (%d): error setting up send codec pipeline",
      __FUNCTION__, __LINE__);
  return FALSE;
}

/*
 * Reports whether the stream is ready to run: TRUE when the pipeline and
 * rtpbin both exist and the stream state is CONNECTED, FALSE otherwise.
 * Nothing is modified.
 */
static gboolean
farsight_rtp_stream_start (FarsightStream *stream)
{
  FarsightRTPStream *self = (FarsightRTPStream *) stream;
  gboolean ready;

  g_return_val_if_fail (self != NULL, FALSE);

  ready = self->priv->pipeline != NULL &&
      self->priv->rtpbin != NULL &&
      farsight_stream_get_state (stream) == FARSIGHT_STREAM_STATE_CONNECTED;

  return ready ? TRUE : FALSE;
}


/*
 * Schedules farsight_rtp_stream_set_playing() as a high priority idle
 * callback, provided the stream is CONNECTED, remote codecs are known and a
 * pipeline exists. The id of the new source is recorded in
 * priv->pending_src_ids.
 */
static void
farsight_rtp_stream_try_set_playing (FarsightRTPStream *self)
{
  guint idle_id = 0;

  if (farsight_stream_get_state (FARSIGHT_STREAM (self)) !=
      FARSIGHT_STREAM_STATE_CONNECTED)
    return;

  if (self->priv->remote_codecs == NULL)
    return;

  if (self->priv->pipeline == NULL)
    return;

  idle_id = g_idle_add_full (G_PRIORITY_HIGH, farsight_rtp_stream_set_playing,
      self, NULL);
  g_array_append_val (self->priv->pending_src_ids, idle_id);
}

/*
 * Sets the pipeline to PLAYING when a pipeline exists, remote codecs are
 * known and the stream is CONNECTED; does nothing otherwise.
 *
 * @data is the FarsightRTPStream. Always returns FALSE, so a GLib source
 * running this callback is not rescheduled.
 */
static gboolean
farsight_rtp_stream_set_playing (gpointer data)
{
  FarsightRTPStream *stream = FARSIGHT_RTP_STREAM (data);
  gboolean can_play;
  GstStateChangeReturn result;

  can_play = stream->priv->pipeline != NULL &&
      stream->priv->remote_codecs != NULL &&
      farsight_stream_get_state (FARSIGHT_STREAM (stream)) ==
      FARSIGHT_STREAM_STATE_CONNECTED;

  if (can_play) {
    g_debug("We are now PLAYING\n");

    result = gst_element_set_state (stream->priv->pipeline, GST_STATE_PLAYING);

    g_debug("Set_state result was %d\n", result);
    if (result == GST_STATE_CHANGE_FAILURE)
      g_warning("Setting the pipeline to playing returned failure\n");
  }

  return FALSE;
}


/**
 * Stop a #FarsightRTPStream instance.
 *
 * Cancels the connection timeout, unblocks and unlinks the source, unlinks
 * the sink (when running inside a user pipeline), brings the pipeline to
 * NULL and releases it, removes the bus watch and the statistics timeout,
 * stops and releases the transmitter, closes UPnP ports if they were used
 * and finally signals the DISCONNECTED state.
 *
 * @param stream #FarsightRTPStream instance
 */
static void
farsight_rtp_stream_stop (FarsightStream *stream)
{
  FarsightRTPStream *self = (FarsightRTPStream *) stream;

  g_return_if_fail (stream != NULL);

  self->priv->stopping = TRUE;

  if (self->priv->timeout_src)
  {
    if (g_source_remove (self->priv->timeout_src) == FALSE) {
      g_warning("Tried to remove non-existent source %u\n", 
          self->priv->timeout_src);
    }  
    self->priv->timeout_src = 0;
  }

  if (self->priv->pipeline)
  {
    GstStateChangeReturn state_set_return;

    g_debug ("%s (%d): stopping media pipeline", __FUNCTION__, __LINE__);

    /* let's make sure we are not blocking the source */
    farsight_rtp_stream_set_sending(FARSIGHT_STREAM (self), TRUE);

    /* let's unlink from the source first */
    farsight_rtp_stream_unlink_source (self);

    /* let's unlink the sink now */
    if (self->priv->main_pipeline)
    {
      gchar *name = g_strdup_printf ("recv%d", self->priv->recv_codec_id);
      GstElement *codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
      g_free (name);

      if (codec_bin && self->priv->sink)
      {
        g_debug ("unlinking sink");
        gst_element_unlink (codec_bin, self->priv->sink);
      }
      if (codec_bin)
      {
        gst_object_unref (GST_OBJECT (codec_bin));
      }
    }

    g_debug ("%s: Setting state to NULL", __FUNCTION__);
    state_set_return = gst_element_set_state (self->priv->pipeline, GST_STATE_NULL);
    g_debug ("%s: DONE Setting state to NULL returned %d", __FUNCTION__, state_set_return);

    if (state_set_return == GST_STATE_CHANGE_ASYNC)
    {
      GstStateChangeReturn state;
      g_debug ("%s: Getting state", __FUNCTION__);
      /* give the asynchronous state change at most 5 seconds to finish */
      state = gst_element_get_state (self->priv->pipeline, NULL, NULL, 5 * GST_SECOND);
      g_debug ("%s: DONE Getting state", __FUNCTION__);
      switch(state)
      {
        case GST_STATE_CHANGE_FAILURE:
          g_warning("Unable to set pipeline to NULL! This could break the "
              "teardown");
          break;
        case GST_STATE_CHANGE_ASYNC:
          g_warning ("State change not finished, after 5 seconds. This could "
              "break the teardown");
          break;
        default:
          break;
      }
    }
    else if (state_set_return == GST_STATE_CHANGE_FAILURE)
    {
      g_warning ("State change unsuccessfull. This could break the teardown");
    }
    else if (state_set_return == GST_STATE_CHANGE_SUCCESS)
    {
      g_debug ("Changed pipeline state to NULL succesfully");
    }

    if (self->priv->bus_watch)
    {
      g_source_remove (self->priv->bus_watch);
      /* forget the id so that a later stop doesn't remove it again */
      self->priv->bus_watch = 0;
    }

    if (!self->priv->main_pipeline)
    {
      gst_object_unref (GST_OBJECT (self->priv->pipeline));
    }
    else
    {
      gst_bin_remove (GST_BIN (self->priv->main_pipeline), self->priv->pipeline);
    }
    self->priv->pipeline = NULL;
    self->priv->rtpbin = NULL;
    self->priv->src = NULL;
    self->priv->sink = NULL;

    if (self->priv->stats_timeout)
    {
      g_source_remove (self->priv->stats_timeout);
      /* same as above: never keep the id of a removed source around */
      self->priv->stats_timeout = 0;
    }
  }
  else
  {
    /* without a pipeline, src and sink are released here directly */
    if (self->priv->src)
    {
      gst_object_unref (GST_OBJECT (self->priv->src));
      self->priv->src = NULL;
    }
    if (self->priv->sink)
    {
      gst_object_unref (GST_OBJECT (self->priv->sink));
      self->priv->sink = NULL;
    }
  }

  remove_pending_mainloop_sources (self->priv);

  if (self->priv->transmitter) {
    farsight_transmitter_stop (self->priv->transmitter);
    g_object_unref (G_OBJECT (self->priv->transmitter));
    self->priv->transmitter = NULL;
  }

  if (self->priv->use_upnp)
  {
    farsight_rtp_stream_upnp_close_ports (self);
#ifdef HAVE_CLINKC
    upnp_cp_shutdown();
#endif
  }

  farsight_stream_signal_state_changed (stream,
      FARSIGHT_STREAM_STATE_DISCONNECTED,
      FARSIGHT_STREAM_DIRECTION_NONE);
}

/*
 * Enables or disables sending by (un)blocking the "src" pad of the
 * "send<id>" codec bin, and signals the resulting stream direction.
 *
 * The requested value is always stored in priv->sending, even when it can't
 * be applied yet.
 *
 * Returns TRUE when nothing had to change, when the codec bin does not exist
 * yet, or when the pad (un)block request was issued. Returns FALSE when
 * there is no pipeline or the codec bin has no "src" pad.
 */
static gboolean
farsight_rtp_stream_set_sending (FarsightStream *stream, gboolean sending)
{
  FarsightRTPStream *self = (FarsightRTPStream *) stream;

  GstElement *codec_bin = NULL;
  GstPad *codec_bin_src_pad  = NULL;
  gchar *name = NULL;

  if (self->priv->sending == sending)
  {
    return TRUE;
  }

  self->priv->sending = sending;

  if (self->priv->pipeline == NULL)
  {
    g_warning ("No pipeline present, will set sending later");
    return FALSE;
  }

  name = g_strdup_printf ("send%d", self->priv->send_codec_id);
  codec_bin = gst_bin_get_by_name (GST_BIN(self->priv->pipeline), name);
  g_free (name);

  if (!codec_bin)
  {
    g_message ("send codec bin not created yet, will set sending later");
    return TRUE;
  }

  codec_bin_src_pad = gst_element_get_pad (codec_bin, "src");
  /* release the lookup reference on every path, including the error one */
  gst_object_unref (GST_OBJECT (codec_bin));
  if (!codec_bin_src_pad)
  {
    g_warning ("send codec has no source pad! This shouldn't happen");
    return FALSE;
  }

  if (sending)
  {
    g_debug ("Setting sending to %d", sending);
    gst_pad_set_blocked_async (codec_bin_src_pad, FALSE, blocked_cb,
        (gpointer) __FUNCTION__);

    /* only advertise sending if we're CONNECTED */
    if (farsight_stream_get_state (stream) == FARSIGHT_STREAM_STATE_CONNECTED)
      farsight_stream_signal_state_changed (stream,
          FARSIGHT_STREAM_STATE_CONNECTED,
          farsight_stream_get_current_direction (stream) |
          FARSIGHT_STREAM_DIRECTION_SENDONLY);
  }
  else
  {
    g_debug ("Setting sending on %d", sending);
    gst_pad_set_blocked_async (codec_bin_src_pad, TRUE, blocked_cb,
        (gpointer) __FUNCTION__);

    farsight_stream_signal_state_changed (stream,
        farsight_stream_get_state (stream),
        farsight_stream_get_current_direction (stream) &
        ~FARSIGHT_STREAM_DIRECTION_SENDONLY);
  }

  /* gst_element_get_pad() returned a reference that we own */
  gst_object_unref (GST_OBJECT (codec_bin_src_pad));

  return TRUE;
}

/*
 * Walks the negotiated codecs in list order and returns the codec
 * association of the first one whose payload type is found in the
 * negotiated codec associations, or NULL when none matches.
 */
CodecAssociation *
farsight_rtp_stream_choose_codec (FarsightRTPStream *self)
{
  GList *walk = self->priv->negotiated_codecs;

  while (walk != NULL) {
    FarsightCodec *candidate = walk->data;
    CodecAssociation *match = lookup_codec_by_pt (
        self->priv->negotiated_codec_associations,
        candidate->id);

    if (match != NULL)
      return match;

    walk = g_list_next (walk);
  }

  return NULL;
}

/*
 * For every local candidate of type LOCAL that has an IP other than
 * "127.0.0.1", asks all IGDs to open the port of @derived_trans (UDP or TCP,
 * depending on its protocol) towards that IP.
 *
 * Compiles to an empty function without HAVE_CLINKC.
 */
static void
farsight_rtp_stream_upnp_send_request(FarsightRTPStream *self,
    const FarsightTransportInfo *derived_trans)
{
#ifdef HAVE_CLINKC
  const GList *node;

  g_debug ("Looking for local ip");

  for (node = self->priv->local_candidates; node; node = g_list_next (node)) {
    FarsightTransportInfo *local = (FarsightTransportInfo *) node->data;

    if (local->type != FARSIGHT_CANDIDATE_TYPE_LOCAL)
      continue;

    if (!local->ip)
      continue;

    /* let's ignore loopback */
    if (g_ascii_strcasecmp(local->ip, "127.0.0.1") == 0)
      continue;

    g_debug ("Found local_ip %s", local->ip);
    /* open ports */
    upnp_open_port_all_igds (local->ip, derived_trans->port, derived_trans->port,
        (derived_trans->proto == FARSIGHT_NETWORK_PROTOCOL_UDP)?
        IPPROTO_UDP:IPPROTO_TCP);
  }
#endif
}

/*
 * Asks all IGDs to close the port of every local candidate of type DERIVED,
 * using UDP or TCP according to the candidate's protocol.
 *
 * Compiles to an empty function without HAVE_CLINKC.
 */
static void
farsight_rtp_stream_upnp_close_ports (FarsightRTPStream *self)
{
#ifdef HAVE_CLINKC
  const GList *node = self->priv->local_candidates;

  while (node != NULL) {
    FarsightTransportInfo *derived = (FarsightTransportInfo *) node->data;

    if (derived->type == FARSIGHT_CANDIDATE_TYPE_DERIVED)
    {
      g_debug ("Found derived ip %s", derived->ip);
      /* close ports */
      upnp_close_port_all_igds (derived->port, 
          (derived->proto == FARSIGHT_NETWORK_PROTOCOL_UDP)?
            IPPROTO_UDP:IPPROTO_TCP);
    }

    node = g_list_next (node);
  }
#endif
}

/*
 * Checks whether @candidate (a list of FarsightTransportInfo components) is
 * already present in @candidate_list.
 *
 * Components are compared with farsight_transport_are_equal(). Returns TRUE
 * when every component of @candidate has a match in @candidate_list and
 * FALSE when none has, or when either list is NULL. A partial match is
 * reported through g_error().
 */
static gboolean 
farsight_rtp_stream_candidate_exists (FarsightStream *stream, 
                                      const GList *candidate_list, 
                                      const GList *candidate)
{
  FarsightTransportInfo *trans = NULL;
  FarsightTransportInfo *trans2 = NULL;
  const GList *lp = NULL;
  const GList *lp2 = NULL;
  guint matched = 0;

  if (candidate_list == NULL || candidate == NULL)
    return FALSE;

  /* we check ip and port */
  for (lp = candidate; lp; lp = g_list_next (lp)) {
    trans = (FarsightTransportInfo *) lp->data;

    for (lp2 = candidate_list; lp2; lp2 = g_list_next (lp2)) {
      trans2 = (FarsightTransportInfo *) lp2->data;
      if (farsight_transport_are_equal (trans, trans2))
      {
        /* Count each component at most once: duplicates in candidate_list
         * must not inflate the count and fake a full or partial match. */
        matched++;
        break;
      }
    }
  }

  if (matched == g_list_length ((GList *)candidate))
    return TRUE;

  if (matched == 0)
    return FALSE;

  g_error("Candidate only partially exists (some components do), "
      "this should not happen!");
  return FALSE;
}

/*
 * Called with a new native candidate coming from the transmitter.
 *
 * A candidate that is already in priv->local_candidates is ignored.
 * Otherwise a copy is appended to that list and announced with
 * farsight_stream_signal_new_native_candidate(). When UPnP is in use and the
 * candidate is of type DERIVED, a port mapping request is sent for it.
 */
static void 
farsight_rtp_stream_new_native_candidate (gpointer transmitter, 
    const FarsightTransportInfo *candidate, gpointer stream)
{
  FarsightRTPStream *self = (FarsightRTPStream *) stream;
  FarsightTransportInfo *stored = NULL;
  GList *probe = NULL;
  gboolean already_known;

  g_debug ("Called farsight_rtp_stream_new_native_candidate");

  /* FIXME let's just add it to a GList for now until the whole
   * Candidate/component/transport issue is solved */
  probe = g_list_append (probe, (gpointer) candidate);
  already_known = farsight_rtp_stream_candidate_exists (stream,
      self->priv->local_candidates, probe);
  g_list_free (probe);

  if (already_known)
  {
    g_message ("Native candidate already in list, not adding");
    return;
  }

  g_debug ("Native candidates found, adding to list");
  stored = farsight_transport_copy (candidate);
  self->priv->local_candidates = g_list_append(self->priv->local_candidates, 
      stored);
  farsight_stream_signal_new_native_candidate (stream,
          stored->candidate_id);

  /* TODO this assumes the local IP is found before the derived IP */
  /* if not it won't send any request since it needs the local ip */
  if (self->priv->use_upnp &&
      stored->type == FARSIGHT_CANDIDATE_TYPE_DERIVED)
  {
    farsight_rtp_stream_upnp_send_request (self, stored);
  }
}

/*
 * Forwards the notification to the stream by calling
 * farsight_stream_signal_native_candidates_prepared(). @transmitter is
 * unused; @stream is the stream to signal on.
 */
static void
farsight_rtp_stream_native_candidates_prepared (gpointer transmitter, gpointer stream)
{
  gpointer target = stream;

  farsight_stream_signal_native_candidates_prepared (target);
}


/*
 * Forwards a new active candidate pair to the stream by calling
 * farsight_stream_signal_new_active_candidate_pair() with both candidate
 * ids. @transmitter is unused; @stream is the stream to signal on.
 */
static void farsight_rtp_stream_new_active_candidate_pair (gpointer transmitter, 
    const gchar *native_candidate_id, 
    const gchar *remote_candidate_id,
    gpointer stream)
{
  gpointer target = stream;

  farsight_stream_signal_new_active_candidate_pair (target,
      native_candidate_id, remote_candidate_id);
}

/*
 * Reacts to a state change of the transmitter.
 *
 * CONNECTED:  cancels the connection timeout, signals the CONNECTED stream
 *             state (adding the SENDONLY direction when sending is enabled)
 *             and makes sure the pipeline goes to PLAYING.
 * CONNECTING: (re)arms the connection timeout and signals the CONNECTING
 *             stream state.
 * Any other state is only logged.
 */
static void 
farsight_rtp_stream_transmitter_state_changed (gpointer transmitter,
    FarsightTransmitterState state, gpointer stream)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (stream);
  gboolean is_connected = (state == FARSIGHT_TRANSMITTER_STATE_CONNECTED);
  gboolean is_connecting = (state == FARSIGHT_TRANSMITTER_STATE_CONNECTING);

  g_message ("connect state changed to %d", state);

  g_return_if_fail(self->priv->disposed == FALSE);

  if (!is_connected && !is_connecting)
    return;

  /* both handled states start by cancelling a pending connection timeout */
  if (self->priv->timeout_src)
  {
    if (!g_source_remove (self->priv->timeout_src)) {
      g_warning("Tried to remove non-existent source %u", 
          self->priv->timeout_src);
    }
    self->priv->timeout_src = 0;
  }

  if (is_connecting)
  {
    /* Let us set a timer for a timeout on reestablishing a connection */
    self->priv->timeout_src = g_timeout_add (self->priv->conn_timeout * 1000,
        farsight_rtp_stream_connection_timed_out,
        self);

    /* farsight_rtp_stream_stop (FARSIGHT_STREAM (stream)); */
    /* What we really want here is some sort of _pause/_resume functionality
     * in farsight */
    farsight_stream_signal_state_changed (stream,
        FARSIGHT_STREAM_STATE_CONNECTING,
        farsight_stream_get_current_direction (stream));
    return;
  }

  /* connected from here on */
  if (!self->priv->sending)
    farsight_stream_signal_state_changed (stream,
      FARSIGHT_STREAM_STATE_CONNECTED,
      farsight_stream_get_current_direction (stream));
  else
    farsight_stream_signal_state_changed (stream,
      FARSIGHT_STREAM_STATE_CONNECTED,
      farsight_stream_get_current_direction (stream) |
      FARSIGHT_STREAM_DIRECTION_SENDONLY);

  if (self->priv->pipeline) {
    /* At this stage, we want to be able to receive data, so make sure the
     * pipeline is playing */
    GstStateChangeReturn result =
        gst_element_set_state (self->priv->pipeline, GST_STATE_PLAYING);
    g_debug ("Setting pipeline to PLAYING returned %d", result);
  }
  farsight_rtp_stream_try_set_playing (self);
}

/* Handler connected to the transmitter's "error" signal.
 *
 * transmitter: the emitting transmitter (not used here).
 * stream:      the stream that was registered as user data.
 *
 * Logs a warning, stops the stream and then reports a network error on it. */
static void 
farsight_rtp_stream_transmitter_error (gpointer transmitter, gpointer stream)
{
  const char *reason = "Network error from the transmitter";

  (void) transmitter;

  g_warning ("error from transmitter.");

  /* Stop first, then tell the listeners why. */
  farsight_rtp_stream_stop (stream);
  farsight_stream_signal_error (stream, FARSIGHT_STREAM_ERROR_NETWORK, reason);
}

/* Timeout callback armed while a connection is being (re)established.
 *
 * data: the FarsightRTPStream the timer was installed for.
 *
 * The stored source id is reset up front because this callback always
 * returns FALSE, which removes the timeout source. If the stream is still in
 * the CONNECTING state when the timer fires, a timeout error is signalled
 * and the stream is stopped.
 *
 * Returns: FALSE, so the timer fires only once. */
static gboolean
farsight_rtp_stream_connection_timed_out (gpointer data)
{
  FarsightRTPStream *self = (FarsightRTPStream *) data;

  /* The source is gone once we return; forget its id. */
  self->priv->timeout_src = 0;

  /* Anything other than CONNECTING means there is nothing to report. */
  if (farsight_stream_get_state (FARSIGHT_STREAM (self)) !=
      FARSIGHT_STREAM_STATE_CONNECTING)
    return FALSE;

  g_warning ("Could not establish a connection");
  farsight_stream_signal_error (FARSIGHT_STREAM (self),
      FARSIGHT_STREAM_ERROR_TIMEOUT, "Could not establish a connection");
  farsight_rtp_stream_stop (FARSIGHT_STREAM (self));

  return FALSE;
}

/* Signals the stream to prepare its list of transport candidates,
 * e.g. start off any network probing, like STUN.
 *
 * Returns early (silently) when the local codecs cannot be ensured or when
 * the stream is already marked as prepared. When no transmitter exists yet,
 * an error is signalled on the stream instead.
 *
 * Otherwise: the stream's media type is copied to the transmitter, the
 * transmitter's signals are connected to this stream's handlers, the
 * transmitter is asked to prepare, a connection timeout is armed if none is
 * pending, and the base pipeline is built.
 */
static void 
farsight_rtp_stream_prepare_transports (FarsightStream *stream)
{
  FarsightRTPStream *self = (FarsightRTPStream *) stream;
  guint stream_media_type;

  /* ensure_local_codecs() is evaluated first; the prepared flag is only
   * consulted when it succeeded. */
  if (!ensure_local_codecs (self) || self->priv->prepared)
    return;

  if (self->priv->transmitter == NULL)
    {
      g_warning ("%s: no transmitter created yet, exiting", G_STRFUNC);
      farsight_stream_signal_error (stream, FARSIGHT_STREAM_ERROR_UNKNOWN,
          "Create transmitter prior to calling prepare_transports()");
      return;
    }

  g_message ("Preparing transmitter");

  /* Hand our media type over to the transmitter */
  g_object_get (G_OBJECT (self), "media-type", &stream_media_type, NULL);
  g_object_set (G_OBJECT (self->priv->transmitter),
      "media-type", stream_media_type,
      NULL);

  /* Hook our handlers up to the transmitter's signals */
  g_signal_connect (G_OBJECT (self->priv->transmitter),
      "new-native-candidate",
      G_CALLBACK (farsight_rtp_stream_new_native_candidate), self);
  g_signal_connect (G_OBJECT (self->priv->transmitter),
      "native-candidates-prepared",
      G_CALLBACK (farsight_rtp_stream_native_candidates_prepared), self);
  g_signal_connect (G_OBJECT (self->priv->transmitter),
      "new-active-candidate-pair",
      G_CALLBACK (farsight_rtp_stream_new_active_candidate_pair), self);
  g_signal_connect (G_OBJECT (self->priv->transmitter),
      "connection-state-changed",
      G_CALLBACK (farsight_rtp_stream_transmitter_state_changed), self);
  g_signal_connect (G_OBJECT (self->priv->transmitter),
      "error",
      G_CALLBACK (farsight_rtp_stream_transmitter_error), self);

  farsight_transmitter_prepare (self->priv->transmitter);

  /* Arm the connection-establishment timeout unless one is already pending */
  if (!self->priv->timeout_src)
    {
      self->priv->timeout_src = g_timeout_add (
          self->priv->conn_timeout * 1000,
          farsight_rtp_stream_connection_timed_out,
          self);
    }

  /* The core pipeline does not depend on the candidates, build it now */
  farsight_rtp_stream_build_base_pipeline (self);
}

/* Points rtpbin at the RTP component of the given remote candidate.
 *
 * self:                the RTP stream.
 * remote_candidate_id: id of the remote candidate to look up in the stream's
 *                      remote candidate list.
 *
 * The candidate's transports are searched for the one whose proto_subtype is
 * "RTP" (case-insensitive); its "ip:port" is then set as rtpbin's
 * "destinations" property.
 *
 * Returns: TRUE if the destination was set; FALSE if rtpbin does not exist
 * yet, the candidate id is unknown, or the candidate has no RTP transport. */
static gboolean 
farsight_rtp_stream_add_remote_candidate_to_rtpbin (FarsightRTPStream *self, 
                                                    const gchar *remote_candidate_id)
{
  GList *remote_candidate;
  FarsightTransportInfo *rtp_trans = NULL;
  const GList *lp;
  gchar *addr;

  /* Nothing to configure until rtpbin has been created */
  if (!self->priv->rtpbin)
    return FALSE;

  remote_candidate = farsight_transport_get_list_for_candidate_id
      (self->priv->remote_candidates, remote_candidate_id);
  if (remote_candidate == NULL)
    return FALSE;

  /* Find remote candidate that is of subtype RTP. The match is kept in a
   * dedicated variable so that "no RTP transport found" is distinguishable
   * from "loop ended on the last (non-RTP) entry". */
  for (lp = remote_candidate; lp; lp = g_list_next (lp)) {
    FarsightTransportInfo *trans = (FarsightTransportInfo *) lp->data;

    if (g_ascii_strcasecmp (trans->proto_subtype, "RTP") == 0)
    {
      rtp_trans = trans;
      break;
    }
  }
  if (rtp_trans == NULL)
    return FALSE;

  /* Set the ip:port of that transport to rtpbin */
  /* TODO there should be a way to set alternative RTCP ip:port that are
   * not simply ip:port+1 */
  addr = g_strdup_printf ("%s:%d", rtp_trans->ip, rtp_trans->port);
  g_object_set (G_OBJECT (self->priv->rtpbin),
          "destinations", addr, NULL);
  g_free (addr);

  return TRUE;
}

/* Records the active native/remote candidate pair and applies the remote
 * candidate to rtpbin.
 *
 * stream:              the stream.
 * native_candidate_id: id of the native candidate; copied.
 * remote_candidate_id: id of the remote candidate; copied.
 *
 * The stored ids are replaced even when applying the remote candidate to
 * rtpbin fails.
 *
 * Returns: TRUE if the remote candidate could be set on rtpbin, FALSE
 * otherwise (this will not work if the pipeline has not yet been created). */
static gboolean 
farsight_rtp_stream_set_active_candidate_pair (FarsightStream *stream,
                                               const gchar *native_candidate_id, 
                                               const gchar *remote_candidate_id)
{
  FarsightRTPStream *self = (FarsightRTPStream *) stream;
  gchar *new_native;
  gchar *new_remote;

  /* Duplicate before freeing: the caller may be handing back the very
   * strings that are currently stored. */
  new_native = g_strdup (native_candidate_id);
  new_remote = g_strdup (remote_candidate_id);

  /* Release the ids from a previous call instead of leaking them */
  g_free (self->priv->active_native_candidate);
  g_free (self->priv->active_remote_candidate);

  self->priv->active_native_candidate = new_native;
  self->priv->active_remote_candidate = new_remote;

  /* this will not work if the pipeline has not yet been created */
  return farsight_rtp_stream_add_remote_candidate_to_rtpbin (self,
      remote_candidate_id);
}

/* Looks up a native (local) candidate by id.
 *
 * stream:       the stream.
 * candidate_id: id of the candidate to search for in the local candidates.
 *
 * Returns: whatever farsight_transport_get_list_for_candidate_id() yields
 * for the stream's local candidate list and the given id. */
static GList *
farsight_rtp_stream_get_native_candidate (FarsightStream *stream, 
                                          const gchar *candidate_id)
{
  FarsightRTPStream *rtp_stream = (FarsightRTPStream *) stream;

  return farsight_transport_get_list_for_candidate_id
      (rtp_stream->priv->local_candidates, candidate_id);
}

/* Returns the stream's list of local candidates. The stored list itself is
 * returned, not a copy. */
static G_CONST_RETURN GList *
farsight_rtp_stream_get_native_candidate_list (FarsightStream *stream)
{
  return ((FarsightRTPStream *) stream)->priv->local_candidates;
}

/* Replaces the stream's remote candidate list with a copy of the given one.
 *
 * stream:            the stream.
 * remote_candidates: list to copy; the caller keeps its own list.
 *
 * NOTE(review): the previously stored list is overwritten without being
 * released here -- looks like a leak on repeated calls; confirm against the
 * transport list API and the object's cleanup code. */
static void 
farsight_rtp_stream_set_remote_candidate_list (FarsightStream *stream, 
                                               const GList *remote_candidates)
{
  FarsightRTPStream *rtp_stream = (FarsightRTPStream *) stream;
  GList *copied = farsight_transport_list_copy (remote_candidates);

  rtp_stream->priv->remote_candidates = copied;
}

/* Adds one remote candidate to the stream and forwards it to the
 * transmitter.
 *
 * stream:           the stream.
 * remote_candidate: list of transports making up the candidate. It is
 *                   copied; the caller keeps its own list.
 *
 * An empty candidate is ignored. A candidate already present in the stream's
 * remote candidate list is not added again. Otherwise a copy is appended to
 * the list and, if a transmitter exists, handed to it as well. */
static void 
farsight_rtp_stream_add_remote_candidate (FarsightStream *stream,
                                          const GList *remote_candidate)
{
  FarsightRTPStream *self = (FarsightRTPStream *) stream;
  FarsightTransportInfo *info;
  GList *rc_copy;

  /* An empty list (NULL) or an entry without data has nothing to add and
   * must not be dereferenced below. */
  if (remote_candidate == NULL || remote_candidate->data == NULL)
  {
    g_message ("%s (%d): ignoring empty remote candidate",
            __FUNCTION__, __LINE__);
    return;
  }
  info = (FarsightTransportInfo *) remote_candidate->data;

  g_debug ("%s (%d): adding remote candidate %s %d",
          __FUNCTION__, __LINE__, info->ip, info->port);

  /* Check for duplicates on the caller's list, before copying, so that no
   * copy is allocated (and then leaked) for a candidate we will not keep.
   * NOTE(review): assumes farsight_rtp_stream_candidate_exists() compares
   * candidates by content, as the previous check against a fresh copy
   * implied -- confirm. */
  if (self->priv->remote_candidates != NULL &&
      farsight_rtp_stream_candidate_exists (stream,
                self->priv->remote_candidates, remote_candidate))
  {
    g_message ("Remote candidate already in list, not adding");
    return;
  }

  rc_copy = farsight_transport_list_copy (remote_candidate);

  if (self->priv->remote_candidates == NULL)
  {
    self->priv->remote_candidates = rc_copy;
  }
  else
  {
    self->priv->remote_candidates = g_list_concat(self->priv->remote_candidates, 
              rc_copy);
    g_debug ("%s (%d): Added remote candidate",
              __FUNCTION__, __LINE__);
  }

  if (self->priv->transmitter)
  {
    farsight_transmitter_add_remote_candidates (self->priv->transmitter, 
        (const GList *)rc_copy);
  }
}


/* Starts a telephony (DTMF) event by sending a custom upstream "dtmf-event"
 * to the pipeline.
 *
 * self:   the stream; its pipeline must already exist.
 * ev:     event number, sent as the "number" field.
 * volume: event volume, sent as the "volume" field.
 * method: requested DTMF method. With FARSIGHT_DTMF_METHOD_AUTO, RFC4733 is
 *         chosen if the pipeline contains an element named "rtpdtmfsrc",
 *         sound otherwise.
 *
 * Returns: the result of gst_element_send_event(), or FALSE if there is no
 * pipeline. */
static gboolean
farsight_rtp_stream_start_telephony_event (FarsightStream *self,
                                           guint8 ev,
                                           guint8 volume,
                                           FarsightStreamDTMFMethod method)
{
  FarsightRTPStream *rtpself = FARSIGHT_RTP_STREAM (self);
  GstStructure *dtmf_info;
  const gchar *method_name;
  int wire_method;

  g_return_val_if_fail (rtpself->priv->pipeline != NULL, FALSE);

  /* Resolve the method that actually goes into the event */
  if (method != FARSIGHT_DTMF_METHOD_AUTO) {
    wire_method = method;
  } else {
    GstElement *dtmfsrc = gst_bin_get_by_name (
        GST_BIN (rtpself->priv->pipeline), "rtpdtmfsrc");

    if (dtmfsrc == NULL) {
      wire_method = FARSIGHT_DTMF_METHOD_SOUND;
    } else {
      wire_method = FARSIGHT_DTMF_METHOD_RTP_RFC4733;
      gst_object_unref (dtmfsrc);
    }
  }

  dtmf_info = gst_structure_new ("dtmf-event",
                                 "type", G_TYPE_INT, 1,
                                 "number", G_TYPE_INT, ev,
                                 "volume", G_TYPE_INT, volume,
                                 "start", G_TYPE_BOOLEAN, TRUE,
                                 "method", G_TYPE_INT, wire_method, NULL);

  /* The log line names the *requested* method, not the resolved one */
  switch (method) {
    case FARSIGHT_DTMF_METHOD_AUTO:
      method_name = "default";
      break;
    case FARSIGHT_DTMF_METHOD_SOUND:
      method_name = "sound";
      break;
    case FARSIGHT_DTMF_METHOD_RTP_RFC4733:
      method_name = "RFC4733";
      break;
    default:
      method_name = "other";
      break;
  }
  g_debug ("%s: sending telephony event %d using method=%s",
      G_STRFUNC, ev, method_name);

  return gst_element_send_event (rtpself->priv->pipeline,
      gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, dtmf_info));
}

/* Stops a telephony (DTMF) event by sending a custom upstream "dtmf-event"
 * with "start" set to FALSE to the pipeline.
 *
 * self:   the stream; its pipeline must already exist.
 * method: DTMF method to stop. It is added to the event as the "method"
 *         field unless it is FARSIGHT_DTMF_METHOD_AUTO, in which case the
 *         field is left out.
 *
 * Returns: the result of gst_element_send_event(), or FALSE if there is no
 * pipeline. */
static gboolean
farsight_rtp_stream_stop_telephony_event (FarsightStream *self,
                                          FarsightStreamDTMFMethod method)
{
  FarsightRTPStream *rtpself = FARSIGHT_RTP_STREAM (self);
  GstStructure *dtmf_info;

  g_return_val_if_fail (rtpself->priv->pipeline != NULL, FALSE);

  dtmf_info = gst_structure_new ("dtmf-event",
                                 "type", G_TYPE_INT, 1,
                                 "start", G_TYPE_BOOLEAN, FALSE, NULL);

  if (method != FARSIGHT_DTMF_METHOD_AUTO) {
    gst_structure_set (dtmf_info, "method", G_TYPE_INT, method, NULL);
  }

  return gst_element_send_event (rtpself->priv->pipeline,
      gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, dtmf_info));
}


/* Requests that the receive pipeline for a payload type be set up ahead of
 * time.
 *
 * stream:       the stream.
 * payload_type: payload type whose receive pipeline should be preloaded.
 *
 * The payload type is remembered as the preload codec id. If the pipeline
 * does not exist yet nothing more is done here. If the pipeline already
 * contains an element named "recv<payload_type>" there is nothing to do
 * either. Otherwise the payload type must be among the negotiated codec
 * associations, and a new receive pipeline is created for it.
 *
 * Returns: FALSE if a receive codec is already loaded or the payload type is
 * not a negotiated codec; TRUE otherwise. */
static gboolean
farsight_rtp_stream_preload_receive_pipeline (FarsightStream *stream,
    gint payload_type)
{
  FarsightRTPStream *self = FARSIGHT_RTP_STREAM (stream);
  GstElement *existing_bin;
  gchar *bin_name;

  g_debug ("Trying to preload codec %d", payload_type);

  if (self->priv->recv_codec_id >= 0)
    {
      g_warning ("Tried to preload codec while receive codec already loaded");
      return FALSE;
    }

  self->priv->preload_recv_codec_id = payload_type;

  if (self->priv->pipeline == NULL)
    {
      g_debug ("Pipeline not created yet, will preload later");
      return TRUE;
    }

  /* Already have a receive bin for this payload type? */
  bin_name = g_strdup_printf ("recv%d", payload_type);
  existing_bin = gst_bin_get_by_name (GST_BIN (self->priv->pipeline),
      bin_name);
  g_free (bin_name);

  if (existing_bin != NULL)
    {
      gst_object_unref (existing_bin);
      return TRUE;
    }

  /* Only negotiated codecs can be preloaded */
  if (lookup_codec_by_pt (self->priv->negotiated_codec_associations,
          payload_type) == NULL)
    {
      g_warning ("Tried to preload Codec that does not exist");
      return FALSE;
    }

  farsight_rtp_stream_create_new_pt_recv_pipeline (stream, payload_type, NULL);

  return TRUE;
}

/* Generated by Doxygen 1.6.0 */