Changeset 4559863 for src/effects


Ignore:
Timestamp:
Dec 7, 2018, 1:28:55 AM (6 years ago)
Author:
Paul Brossier <piem@piem.org>
Branches:
feature/cnn, feature/crepe, feature/timestretch, fix/ffmpeg5, master
Children:
284fe8a
Parents:
bdb249b
git-author:
Paul Brossier <piem@piem.org> (10/03/16 11:20:57)
git-committer:
Paul Brossier <piem@piem.org> (12/07/18 01:28:55)
Message:

src/effects/timestretch_rubberband.c: improve threading

Location:
src/effects
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • src/effects/timestretch.h

    rbdb249b r4559863  
    167167uint_t aubio_timestretch_seek(aubio_timestretch_t * o, uint_t pos);
    168168
     169uint_t aubio_timestretch_queue (aubio_timestretch_t *p, const char_t *uri, uint_t samplerate);
     170
     171uint_t aubio_timestretch_get_opened (aubio_timestretch_t *p);
     172
    169173#ifdef __cplusplus
    170174}
  • src/effects/timestretch_rubberband.c

    rbdb249b r4559863  
    5959  RubberBandOptions rboptions;
    6060
     61  uint_t opened;
     62  const char_t *uri;
    6163#ifdef HAVE_THREADS
    6264  pthread_t read_thread;
     
    6466  pthread_cond_t read_avail;
    6567  pthread_cond_t read_request;
     68  pthread_t open_thread;
     69  pthread_mutex_t open_mutex;
     70  uint_t open_thread_running;
    6671  sint_t available;
    6772  uint_t started;
     
    7681#ifdef HAVE_THREADS
    7782static void *aubio_timestretch_readfn(void *p);
     83static void *aubio_timestretch_openfn(void *z);
    7884#endif
    7985
     
    8389{
    8490  aubio_timestretch_t *p = AUBIO_NEW (aubio_timestretch_t);
    85   p->samplerate = samplerate;
    8691  p->hopsize = hopsize;
    8792  //p->source_hopsize = 2048;
    8893  p->source_hopsize = hopsize;
    8994  p->pitchscale = 1.;
    90   p->eof = 0;
    91 
    92   p->source = new_aubio_source(uri, samplerate, p->source_hopsize);
    93   if (!p->source) goto beach;
    94   if (samplerate == 0 ) p->samplerate = aubio_source_get_samplerate(p->source);
    95 
    96   p->in = new_fvec(p->source_hopsize);
    9795
    9896  if (stretchratio <= MAX_STRETCH_RATIO && stretchratio >= MIN_STRETCH_RATIO) {
     
    110108  }
    111109
    112   p->rb = rubberband_new(p->samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
    113   rubberband_set_max_process_size(p->rb, p->source_hopsize);
    114   //rubberband_set_debug_level(p->rb, 10);
    115 
    116 #ifdef HAVE_THREADS
     110  p->in = new_fvec(p->source_hopsize);
     111
     112#ifndef HAVE_THREADS
     113  if (aubio_timestretch_queue(p, uri, samplerate)) goto beach;
     114  aubio_timestretch_warmup(p);
     115#else
    117116  p->started = 0;
    118117  p->finish = 0;
     118  p->open_thread_running = 0;
     119  //p->uri = uri;
     120  p->eof = 0;
     121  //p->samplerate = samplerate;
     122  //if (aubio_timestretch_open(p, uri, samplerate)) goto beach;
     123  pthread_mutex_init(&p->open_mutex, 0);
    119124  pthread_mutex_init(&p->read_mutex, 0);
    120125  pthread_cond_init (&p->read_avail, 0);
     
    124129  //AUBIO_DBG("timestretch: new_ waiting for warmup, got %d available\n", p->available);
    125130  pthread_mutex_lock(&p->read_mutex);
     131  aubio_timestretch_queue(p, uri, samplerate);
     132#if 0
    126133  pthread_cond_wait(&p->read_avail, &p->read_mutex);
     134  if (!p->opened) {
     135    goto beach;
     136  }
     137#endif
    127138  pthread_mutex_unlock(&p->read_mutex);
    128139  //AUBIO_DBG("timestretch: new_ warm up success, got %d available\n", p->available);
    129 #else
    130   aubio_timestretch_warmup(p);
    131140#endif
    132141
     
    138147}
    139148
     149#define HAVE_OPENTHREAD 1
     150//#undef HAVE_OPENTHREAD
     151
     152uint_t
     153aubio_timestretch_queue(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
     154{
     155#ifdef HAVE_THREADS
     156#ifdef HAVE_OPENTHREAD
     157  if (p->open_thread_running) {
     158#if 1
     159    if (pthread_cancel(p->open_thread)) {
     160      AUBIO_WRN("timestretch: cancelling open thread failed\n");
     161      return AUBIO_FAIL;
     162    } else {
     163      AUBIO_WRN("timestretch: previous open of '%s' cancelled\n", p->uri);
     164    }
     165    p->open_thread_running = 0;
     166#else
     167    void *threadfn;
     168    if (pthread_join(p->open_thread, &threadfn)) {
     169      AUBIO_WRN("timestretch: failed joining existing open thread\n");
     170      return AUBIO_FAIL;
     171    }
     172#endif
     173  }
     174  //AUBIO_WRN("timestretch: queueing %s\n", uri);
     175  //pthread_mutex_lock(&p->read_mutex);
     176  p->opened = 0;
     177  p->started = 0;
     178  p->available = 0;
     179  p->uri = uri;
     180  p->samplerate = samplerate;
     181  //AUBIO_WRN("timestretch: creating thread\n");
     182  pthread_create(&p->open_thread, 0, aubio_timestretch_openfn, p);
     183#endif
     184  //pthread_mutex_unlock(&p->read_mutex);
     185  return AUBIO_OK;
     186}
     187
     188uint_t
     189aubio_timestretch_open(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
     190{
     191  uint_t err = AUBIO_FAIL;
     192  p->available = 0;
     193  pthread_mutex_lock(&p->open_mutex);
     194  p->open_thread_running = 1;
     195#else
     196  uint_t err = AUBIO_FAIL;
     197#endif
     198  p->opened = 0;
     199  if (p->source) del_aubio_source(p->source);
     200  p->source = new_aubio_source(uri, samplerate, p->source_hopsize);
     201  if (!p->source) goto fail;
     202  p->uri = uri;
     203  p->samplerate = aubio_source_get_samplerate(p->source);
     204  p->eof = 0;
     205
     206  if (p->rb == NULL) {
     207    AUBIO_WRN("timestretch: creating with stretch: %.2f pitchscale: %.2f\n",
     208        p->stretchratio, p->pitchscale);
     209    p->rb = rubberband_new(p->samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
     210    //rubberband_set_debug_level(p->rb, 10);
     211    rubberband_set_max_process_size(p->rb, p->source_hopsize);
     212  } else {
     213    if (samplerate != p->samplerate) {
     214      AUBIO_WRN("timestretch: samplerate change requested, but not implemented\n");
     215    }
     216    rubberband_reset(p->rb);
     217  }
     218  p->opened = 1;
     219  err = AUBIO_OK;
     220  goto unlock;
     221fail:
     222  p->opened = 2;
     223  AUBIO_ERR("timestretch: opening %s failed\n", uri);
     224unlock:
     225#ifdef HAVE_THREADS
     226  p->open_thread_running = 0;
     227  pthread_mutex_unlock(&p->open_mutex);
     228  //AUBIO_WRN("timestretch: failed opening %s at %dHz\n", uri, samplerate);
     229#endif
     230  return err;
     231}
     232
     233#ifdef HAVE_THREADS
     234void *
     235aubio_timestretch_openfn(void *z) {
     236  aubio_timestretch_t *p = z;
     237  int oldtype;
     238  pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
     239  //AUBIO_WRN("timestretch: creating thread\n");
     240  void *ret;
     241  uint_t err = aubio_timestretch_open(p, p->uri, p->samplerate);
     242  ret = &err;
     243  pthread_exit(ret);
     244}
     245#endif
     246
     247uint_t
     248aubio_timestretch_get_opened(aubio_timestretch_t *p)
     249{
     250  if (p == NULL) return 0;
     251  else return p->opened;
     252}
     253
    140254#ifdef HAVE_THREADS
    141255void *
     
    143257{
    144258  aubio_timestretch_t *p = z;
     259  //AUBIO_WRN("timestretch: entering thread with %s at %dHz\n", p->uri, p->samplerate);
    145260  while(1) { //p->available < (int)p->hopsize && p->eof != 1) {
    146261    //AUBIO_WRN("timestretch: locking in readfn\n");
    147262    pthread_mutex_lock(&p->read_mutex);
     263#if 1
     264    if (p->opened == 2) {
     265      pthread_cond_signal(&p->read_avail);
     266    } else
     267    if (p->opened == 0) {
     268#ifdef HAVE_OPENTHREAD
     269      //(!aubio_timestretch_open(p, p->uri, p->samplerate)) {
     270      void * threadfn;
     271      if (p->open_thread_running && pthread_join(p->open_thread, &threadfn)) {
     272        AUBIO_WRN("timestretch: failed to join opening thread %s at %dHz in thread "
     273            "(opened: %d, playing: %d, eof: %d)\n",
     274            p->uri, p->samplerate, p->opened, p->started, p->eof);
     275      }
     276#else
     277      //AUBIO_WRN("timestretch: opening source %s\n", p->uri);
     278      if (!aubio_timestretch_open(p, p->uri, p->samplerate)) {
     279        AUBIO_WRN("timestretch: opened %s at %dHz in thread "
     280            "(opened: %d, playing: %d, eof: %d)\n",
     281            p->uri, p->samplerate, p->opened, p->started, p->eof);
     282        //pthread_cond_signal(&p->read_avail);
     283      } else {
     284        AUBIO_WRN("timestretch: failed opening %s, exiting thread\n", p->uri);
     285        //pthread_cond_signal(&p->read_avail);
     286        //pthread_mutex_unlock(&p->read_mutex);
     287        //goto end;
     288      }
     289#endif
     290    } else
    148291    if (!p->started && !p->eof) {
     292#endif
    149293      // fetch the first few samples and mark as started
    150       //AUBIO_WRN("timestretch: fetching first samples\n");
    151294      aubio_timestretch_warmup(p);
     295      pthread_cond_signal(&p->read_avail);
     296      //pthread_cond_wait(&p->read_request, &p->read_mutex);
    152297      p->started = 1;
    153298    } else if (!p->eof) {
     
    172317    pthread_mutex_unlock(&p->read_mutex);
    173318  }
     319end:
    174320  //AUBIO_WRN("timestretch: exiting readfn\n");
    175321  pthread_exit(NULL);
     
    181327{
    182328  // warm up rubber band
     329  //AUBIO_WRN("timestretch: warming-up\n");
    183330  unsigned int latency = MAX(p->hopsize, rubberband_get_latency(p->rb));
    184331#ifdef HAVE_THREADS
     
    187334  aubio_timestretch_fetch(p, latency);
    188335#endif
     336  //AUBIO_WRN("timestretch: warmup got %d\n", latency);
    189337}
    190338
     
    195343  void *threadfn;
    196344  //AUBIO_WRN("timestretch: entering delete\n");
     345  if (p->open_thread_running) {
     346    if (pthread_cancel(p->open_thread)) {
     347      AUBIO_WRN("timestretch: cancelling open thread failed\n");
     348    }
     349    if (pthread_join(p->open_thread, &threadfn)) {
     350      AUBIO_WRN("timestretch: joining open thread failed\n");
     351    }
     352  }
     353  if (!p->opened) goto cleanup;
    197354  pthread_mutex_lock(&p->read_mutex);
    198355  p->finish = 1;
     
    201358  pthread_mutex_unlock(&p->read_mutex);
    202359  if ((p->eof == 0) && (pthread_cancel(p->read_thread))) {
    203       AUBIO_WRN("timestretch: cancelling thread failed\n");
     360    AUBIO_WRN("timestretch: cancelling thread failed\n");
    204361  }
    205362  if (pthread_join(p->read_thread, &threadfn)) {
    206       AUBIO_WRN("timestretch: joining thread failed\n");
     363    AUBIO_WRN("timestretch: joining thread failed\n");
    207364  }
    208365  pthread_mutex_destroy(&p->read_mutex);
    209366  pthread_cond_destroy(&p->read_avail);
    210367  pthread_cond_destroy(&p->read_request);
     368cleanup:
    211369#endif
    212370  if (p->in) del_fvec(p->in);
     
    231389aubio_timestretch_set_stretch (aubio_timestretch_t * p, smpl_t stretch)
    232390{
     391  if (!p->rb) {
     392    AUBIO_WRN("timestretch: could not set stretch ratio, rubberband not created\n");
     393    return AUBIO_FAIL;
     394  }
    233395  if (stretch >= MIN_STRETCH_RATIO && stretch <= MAX_STRETCH_RATIO) {
    234396    p->stretchratio = stretch;
     
    250412aubio_timestretch_set_pitchscale (aubio_timestretch_t * p, smpl_t pitchscale)
    251413{
     414  if (!p->rb) {
     415    AUBIO_WRN("timestretch: could not set pitch scale, rubberband not created\n");
     416    return AUBIO_FAIL;
     417  }
    252418  if (pitchscale >= 0.0625  && pitchscale <= 4.) {
    253419    p->pitchscale = pitchscale;
     
    288454{
    289455  uint_t source_read = p->source_hopsize;
     456  if (p->source == NULL) {
     457    AUBIO_ERR("timestretch: trying to fetch on NULL source\n");
     458    return 0;
     459  }
    290460  // read more samples from source until we have enough available or eof is reached
    291461  int available = rubberband_available(p->rb);
     
    309479  int available;
    310480  pthread_mutex_lock(&p->read_mutex);
     481#if 1
     482  if (!p->opened) {
     483    // this may occur if _do was was called while being opened
     484    //AUBIO_WRN("timestretch: calling _do before opening a file\n");
     485    pthread_cond_signal(&p->read_request);
     486    //available = 0;
     487    //pthread_cond_wait(&p->read_avail, &p->read_mutex);
     488    available = 0; //p->available;
     489  } else
     490#endif
    311491  if (p->eof != 1) {
     492    //AUBIO_WRN("timestretch: calling _do after opening a file\n");
    312493    // signal a read request
    313494    pthread_cond_signal(&p->read_request);
    314495    // wait for an available signal
    315496    pthread_cond_wait(&p->read_avail, &p->read_mutex);
     497    available = p->available;
    316498  } else {
    317499    available = rubberband_available(p->rb);
    318   }
     500    //AUBIO_WRN("timestretch: reached eof (%d/%d)\n", p->hopsize, available);
     501  }
     502  pthread_mutex_unlock(&p->read_mutex);
    319503#endif /* HAVE_THREADS */
    320504  // now retrieve the samples and write them into out->data
     
    323507    *read = p->hopsize;
    324508  } else if (available > 0) {
     509    // this occurs each time the end of file is reached
     510    //AUBIO_WRN("timestretch: short read\n");
    325511    rubberband_retrieve(p->rb, (float* const*)&(out->data), available);
    326512    *read = available;
    327513  } else {
     514    // this may occur if the previous was a short read available == hopsize
    328515    fvec_zeros(out);
    329516    *read = 0;
    330517  }
    331518#ifdef HAVE_THREADS
    332   pthread_mutex_unlock(&p->read_mutex);
     519  //pthread_mutex_unlock(&p->read_mutex);
    333520#endif
    334521}
     
    339526  uint_t err = AUBIO_OK;
    340527#if HAVE_THREADS
     528  if (p == NULL) {
     529    AUBIO_WRN("seeking but object not set yet (ignoring)\n");
     530    return AUBIO_FAIL;
     531  }
    341532  pthread_mutex_lock(&p->read_mutex);
     533  if (p->open_thread_running) {
     534    //AUBIO_WRN("seeking but opening thread not completed yet (ignoring)\n");
     535    err = AUBIO_OK;
     536    goto beach;
     537  }
     538  if (!p->opened || !p->source) {
     539    //AUBIO_WRN("timestretch: seeking but source not opened yet (ignoring)\n");
     540    err = AUBIO_OK;
     541    goto beach;
     542  }
    342543#endif
    343544  p->eof = 0;
    344   rubberband_reset(p->rb);
    345   err = aubio_source_seek(p->source, pos);
     545  if (p->rb) {
     546    rubberband_reset(p->rb);
     547  }
     548#ifdef HAVE_THREADS
     549#ifdef HAVE_OPENTHREAD
     550  pthread_mutex_lock(&p->open_mutex);
     551#endif
     552#endif
     553  if (p->source) {
     554    err = aubio_source_seek(p->source, pos);
     555  } else {
     556    AUBIO_WRN("timestretch: seeking but p->source not created?!\n");
     557    err = AUBIO_FAIL;
     558    goto beach;
     559  }
    346560#if HAVE_THREADS
     561  pthread_mutex_unlock(&p->open_mutex);
    347562  p->available = 0;
    348563  p->started = 1;
     564beach:
    349565  pthread_mutex_unlock(&p->read_mutex);
     566#else
     567beach:
    350568#endif
    351569  return err;
Note: See TracChangeset for help on using the changeset viewer.