Ignore:
Timestamp:
Dec 7, 2018, 1:28:55 AM (5 years ago)
Author:
Paul Brossier <piem@piem.org>
Branches:
feature/cnn, feature/crepe, feature/timestretch, fix/ffmpeg5, master
Children:
e7d4aa0
Parents:
4559863
git-author:
Paul Brossier <piem@piem.org> (11/28/16 14:30:40)
git-committer:
Paul Brossier <piem@piem.org> (12/07/18 01:28:55)
Message:

src/effects/timestretch*: move out threaded file reading stuff, add _push and _available

File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/effects/timestretch_rubberband.c

    r4559863 r284fe8a  
    5151  smpl_t pitchscale;              /**< pitch scale */
    5252
    53   aubio_source_t *source;
    54   uint_t source_hopsize;          /**< hop size at which the source is read */
    55   fvec_t *in;
    56   uint_t eof;
    57 
    5853  RubberBandState rb;
    5954  RubberBandOptions rboptions;
    60 
    61   uint_t opened;
    62   const char_t *uri;
    63 #ifdef HAVE_THREADS
    64   pthread_t read_thread;
    65   pthread_mutex_t read_mutex;
    66   pthread_cond_t read_avail;
    67   pthread_cond_t read_request;
    68   pthread_t open_thread;
    69   pthread_mutex_t open_mutex;
    70   uint_t open_thread_running;
    71   sint_t available;
    72   uint_t started;
    73   uint_t finish;
    74 #endif
    7555};
    7656
     
    7858
    7959static void aubio_timestretch_warmup (aubio_timestretch_t * p);
    80 static sint_t aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t fetch);
    81 #ifdef HAVE_THREADS
    82 static void *aubio_timestretch_readfn(void *p);
    83 static void *aubio_timestretch_openfn(void *z);
    84 #endif
    8560
    8661aubio_timestretch_t *
    87 new_aubio_timestretch (const char_t * uri, const char_t * mode,
    88     smpl_t stretchratio, uint_t hopsize, uint_t samplerate)
     62new_aubio_timestretch (const char_t * mode, smpl_t stretchratio, uint_t hopsize,
     63    uint_t samplerate)
    8964{
    9065  aubio_timestretch_t *p = AUBIO_NEW (aubio_timestretch_t);
    9166  p->hopsize = hopsize;
    92   //p->source_hopsize = 2048;
    93   p->source_hopsize = hopsize;
    9467  p->pitchscale = 1.;
    9568
     
    10881  }
    10982
    110   p->in = new_fvec(p->source_hopsize);
    111 
    112 #ifndef HAVE_THREADS
    113   if (aubio_timestretch_queue(p, uri, samplerate)) goto beach;
    114   aubio_timestretch_warmup(p);
    115 #else
    116   p->started = 0;
    117   p->finish = 0;
    118   p->open_thread_running = 0;
    119   //p->uri = uri;
    120   p->eof = 0;
    121   //p->samplerate = samplerate;
    122   //if (aubio_timestretch_open(p, uri, samplerate)) goto beach;
    123   pthread_mutex_init(&p->open_mutex, 0);
    124   pthread_mutex_init(&p->read_mutex, 0);
    125   pthread_cond_init (&p->read_avail, 0);
    126   pthread_cond_init (&p->read_request, 0);
    127   //AUBIO_WRN("timestretch: creating thread\n");
    128   pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p);
    129   //AUBIO_DBG("timestretch: new_ waiting for warmup, got %d available\n", p->available);
    130   pthread_mutex_lock(&p->read_mutex);
    131   aubio_timestretch_queue(p, uri, samplerate);
    132 #if 0
    133   pthread_cond_wait(&p->read_avail, &p->read_mutex);
    134   if (!p->opened) {
    135     goto beach;
    136   }
    137 #endif
    138   pthread_mutex_unlock(&p->read_mutex);
    139   //AUBIO_DBG("timestretch: new_ warm up success, got %d available\n", p->available);
    140 #endif
     83  p->rb = rubberband_new(samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
     84  if (!p->rb) goto beach;
     85
     86  p->samplerate = samplerate;
     87
     88  //aubio_timestretch_warmup(p);
    14189
    14290  return p;
     
    14795}
    14896
    149 #define HAVE_OPENTHREAD 1
    150 //#undef HAVE_OPENTHREAD
    151 
    152 uint_t
    153 aubio_timestretch_queue(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
    154 {
    155 #ifdef HAVE_THREADS
    156 #ifdef HAVE_OPENTHREAD
    157   if (p->open_thread_running) {
    158 #if 1
    159     if (pthread_cancel(p->open_thread)) {
    160       AUBIO_WRN("timestretch: cancelling open thread failed\n");
    161       return AUBIO_FAIL;
    162     } else {
    163       AUBIO_WRN("timestretch: previous open of '%s' cancelled\n", p->uri);
    164     }
    165     p->open_thread_running = 0;
    166 #else
    167     void *threadfn;
    168     if (pthread_join(p->open_thread, &threadfn)) {
    169       AUBIO_WRN("timestretch: failed joining existing open thread\n");
    170       return AUBIO_FAIL;
    171     }
    172 #endif
    173   }
    174   //AUBIO_WRN("timestretch: queueing %s\n", uri);
    175   //pthread_mutex_lock(&p->read_mutex);
    176   p->opened = 0;
    177   p->started = 0;
    178   p->available = 0;
    179   p->uri = uri;
    180   p->samplerate = samplerate;
    181   //AUBIO_WRN("timestretch: creating thread\n");
    182   pthread_create(&p->open_thread, 0, aubio_timestretch_openfn, p);
    183 #endif
    184   //pthread_mutex_unlock(&p->read_mutex);
    185   return AUBIO_OK;
    186 }
    187 
    188 uint_t
    189 aubio_timestretch_open(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
    190 {
    191   uint_t err = AUBIO_FAIL;
    192   p->available = 0;
    193   pthread_mutex_lock(&p->open_mutex);
    194   p->open_thread_running = 1;
    195 #else
    196   uint_t err = AUBIO_FAIL;
    197 #endif
    198   p->opened = 0;
    199   if (p->source) del_aubio_source(p->source);
    200   p->source = new_aubio_source(uri, samplerate, p->source_hopsize);
    201   if (!p->source) goto fail;
    202   p->uri = uri;
    203   p->samplerate = aubio_source_get_samplerate(p->source);
    204   p->eof = 0;
    205 
    206   if (p->rb == NULL) {
    207     AUBIO_WRN("timestretch: creating with stretch: %.2f pitchscale: %.2f\n",
    208         p->stretchratio, p->pitchscale);
    209     p->rb = rubberband_new(p->samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
    210     //rubberband_set_debug_level(p->rb, 10);
    211     rubberband_set_max_process_size(p->rb, p->source_hopsize);
    212   } else {
    213     if (samplerate != p->samplerate) {
    214       AUBIO_WRN("timestretch: samplerate change requested, but not implemented\n");
    215     }
    216     rubberband_reset(p->rb);
    217   }
    218   p->opened = 1;
    219   err = AUBIO_OK;
    220   goto unlock;
    221 fail:
    222   p->opened = 2;
    223   AUBIO_ERR("timestretch: opening %s failed\n", uri);
    224 unlock:
    225 #ifdef HAVE_THREADS
    226   p->open_thread_running = 0;
    227   pthread_mutex_unlock(&p->open_mutex);
    228   //AUBIO_WRN("timestretch: failed opening %s at %dHz\n", uri, samplerate);
    229 #endif
    230   return err;
    231 }
    232 
    233 #ifdef HAVE_THREADS
    234 void *
    235 aubio_timestretch_openfn(void *z) {
    236   aubio_timestretch_t *p = z;
    237   int oldtype;
    238   pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
    239   //AUBIO_WRN("timestretch: creating thread\n");
    240   void *ret;
    241   uint_t err = aubio_timestretch_open(p, p->uri, p->samplerate);
    242   ret = &err;
    243   pthread_exit(ret);
    244 }
    245 #endif
    246 
    247 uint_t
    248 aubio_timestretch_get_opened(aubio_timestretch_t *p)
    249 {
    250   if (p == NULL) return 0;
    251   else return p->opened;
    252 }
    253 
    254 #ifdef HAVE_THREADS
    255 void *
    256 aubio_timestretch_readfn(void *z)
    257 {
    258   aubio_timestretch_t *p = z;
    259   //AUBIO_WRN("timestretch: entering thread with %s at %dHz\n", p->uri, p->samplerate);
    260   while(1) { //p->available < (int)p->hopsize && p->eof != 1) {
    261     //AUBIO_WRN("timestretch: locking in readfn\n");
    262     pthread_mutex_lock(&p->read_mutex);
    263 #if 1
    264     if (p->opened == 2) {
    265       pthread_cond_signal(&p->read_avail);
    266     } else
    267     if (p->opened == 0) {
    268 #ifdef HAVE_OPENTHREAD
    269       //(!aubio_timestretch_open(p, p->uri, p->samplerate)) {
    270       void * threadfn;
    271       if (p->open_thread_running && pthread_join(p->open_thread, &threadfn)) {
    272         AUBIO_WRN("timestretch: failed to join opening thread %s at %dHz in thread "
    273             "(opened: %d, playing: %d, eof: %d)\n",
    274             p->uri, p->samplerate, p->opened, p->started, p->eof);
    275       }
    276 #else
    277       //AUBIO_WRN("timestretch: opening source %s\n", p->uri);
    278       if (!aubio_timestretch_open(p, p->uri, p->samplerate)) {
    279         AUBIO_WRN("timestretch: opened %s at %dHz in thread "
    280             "(opened: %d, playing: %d, eof: %d)\n",
    281             p->uri, p->samplerate, p->opened, p->started, p->eof);
    282         //pthread_cond_signal(&p->read_avail);
    283       } else {
    284         AUBIO_WRN("timestretch: failed opening %s, exiting thread\n", p->uri);
    285         //pthread_cond_signal(&p->read_avail);
    286         //pthread_mutex_unlock(&p->read_mutex);
    287         //goto end;
    288       }
    289 #endif
    290     } else
    291     if (!p->started && !p->eof) {
    292 #endif
    293       // fetch the first few samples and mark as started
    294       aubio_timestretch_warmup(p);
    295       pthread_cond_signal(&p->read_avail);
    296       //pthread_cond_wait(&p->read_request, &p->read_mutex);
    297       p->started = 1;
    298     } else if (!p->eof) {
    299       // fetch at least p->hopsize stretched samples
    300       p->available = aubio_timestretch_fetch(p, p->hopsize);
    301       // signal available frames
    302       pthread_cond_signal(&p->read_avail);
    303       if (p->eof != 1) {
    304         // the end of file was not reached yet, wait for the next read_request
    305         pthread_cond_wait(&p->read_request, &p->read_mutex);
    306       } else {
    307         // eof was reached, do not wait for a read request and mark as stopped
    308         p->started = 0;
    309       }
    310     } else {
    311       //pthread_cond_signal(&p->read_avail);
    312       pthread_cond_wait(&p->read_request, &p->read_mutex);
    313       //AUBIO_WRN("timestretch: finished idle in readfn\n");
    314       if (p->finish) pthread_exit(NULL);
    315     }
    316     //AUBIO_WRN("timestretch: unlocking in readfn\n");
    317     pthread_mutex_unlock(&p->read_mutex);
    318   }
    319 end:
    320   //AUBIO_WRN("timestretch: exiting readfn\n");
    321   pthread_exit(NULL);
    322 }
    323 #endif
    324 
    32597static void
    32698aubio_timestretch_warmup (aubio_timestretch_t * p)
     
    329101  //AUBIO_WRN("timestretch: warming-up\n");
    330102  unsigned int latency = MAX(p->hopsize, rubberband_get_latency(p->rb));
    331 #ifdef HAVE_THREADS
    332   p->available = aubio_timestretch_fetch(p, latency);
    333 #else
    334   aubio_timestretch_fetch(p, latency);
    335 #endif
    336   //AUBIO_WRN("timestretch: warmup got %d\n", latency);
     103  fvec_t *input = new_fvec(p->hopsize);
     104  while (aubio_timestretch_push(p, input, input->length) < (int)latency) {
     105    //sint_t available = aubio_timestretch_get_available(p);
     106    //AUBIO_WRN("timestretch: warmup got %d, latency: %d\n", available, latency);
     107  }
     108  del_fvec(input);
    337109}
    338110
     
    340112del_aubio_timestretch (aubio_timestretch_t * p)
    341113{
    342 #ifdef HAVE_THREADS
    343   void *threadfn;
    344   //AUBIO_WRN("timestretch: entering delete\n");
    345   if (p->open_thread_running) {
    346     if (pthread_cancel(p->open_thread)) {
    347       AUBIO_WRN("timestretch: cancelling open thread failed\n");
    348     }
    349     if (pthread_join(p->open_thread, &threadfn)) {
    350       AUBIO_WRN("timestretch: joining open thread failed\n");
    351     }
    352   }
    353   if (!p->opened) goto cleanup;
    354   pthread_mutex_lock(&p->read_mutex);
    355   p->finish = 1;
    356   pthread_cond_signal(&p->read_request);
    357   //pthread_cond_wait(&p->read_avail, &p->read_mutex);
    358   pthread_mutex_unlock(&p->read_mutex);
    359   if ((p->eof == 0) && (pthread_cancel(p->read_thread))) {
    360     AUBIO_WRN("timestretch: cancelling thread failed\n");
    361   }
    362   if (pthread_join(p->read_thread, &threadfn)) {
    363     AUBIO_WRN("timestretch: joining thread failed\n");
    364   }
    365   pthread_mutex_destroy(&p->read_mutex);
    366   pthread_cond_destroy(&p->read_avail);
    367   pthread_cond_destroy(&p->read_request);
    368 cleanup:
    369 #endif
    370   if (p->in) del_fvec(p->in);
    371   if (p->source) del_aubio_source(p->source);
    372114  if (p->rb) {
    373115    rubberband_delete(p->rb);
     
    451193
    452194sint_t
    453 aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t length)
    454 {
    455   uint_t source_read = p->source_hopsize;
    456   if (p->source == NULL) {
    457     AUBIO_ERR("timestretch: trying to fetch on NULL source\n");
    458     return 0;
    459   }
    460   // read more samples from source until we have enough available or eof is reached
     195aubio_timestretch_push(aubio_timestretch_t *p, fvec_t *input, uint_t length)
     196{
     197  // push new samples to rubberband, return available
     198  int available;
     199  int eof = (input->length != length) ? 1 : 0;
     200  rubberband_process(p->rb, (const float* const*)&(input->data), length, eof);
     201  available = rubberband_available(p->rb);
     202  //AUBIO_WRN("timestretch: processed %d, %d available, eof: %d\n",
     203  //    length, available, eof);
     204  return available;
     205}
     206
     207sint_t
     208aubio_timestretch_get_available(aubio_timestretch_t *p) {
     209  return rubberband_available(p->rb);
     210}
     211
     212void
     213aubio_timestretch_do(aubio_timestretch_t * p, fvec_t * out, uint_t * read)
     214{
     215  // now retrieve the samples and write them into out->data
    461216  int available = rubberband_available(p->rb);
    462   while ((available < (int)length) && (p->eof == 0)) {
    463     aubio_source_do(p->source, p->in, &source_read);
    464     if (source_read < p->source_hopsize) {
    465       p->eof = 1;
    466     }
    467     rubberband_process(p->rb, (const float* const*)&(p->in->data), source_read, p->eof);
    468     available = rubberband_available(p->rb);
    469   }
    470   return available;
    471 }
    472 
    473 void
    474 aubio_timestretch_do (aubio_timestretch_t * p, fvec_t * out, uint_t * read)
    475 {
    476 #ifndef HAVE_THREADS
    477   int available = aubio_timestretch_fetch(p, p->hopsize);
    478 #else /* HAVE_THREADS */
    479   int available;
    480   pthread_mutex_lock(&p->read_mutex);
    481 #if 1
    482   if (!p->opened) {
    483     // this may occur if _do was was called while being opened
    484     //AUBIO_WRN("timestretch: calling _do before opening a file\n");
    485     pthread_cond_signal(&p->read_request);
    486     //available = 0;
    487     //pthread_cond_wait(&p->read_avail, &p->read_mutex);
    488     available = 0; //p->available;
    489   } else
    490 #endif
    491   if (p->eof != 1) {
    492     //AUBIO_WRN("timestretch: calling _do after opening a file\n");
    493     // signal a read request
    494     pthread_cond_signal(&p->read_request);
    495     // wait for an available signal
    496     pthread_cond_wait(&p->read_avail, &p->read_mutex);
    497     available = p->available;
    498   } else {
    499     available = rubberband_available(p->rb);
    500     //AUBIO_WRN("timestretch: reached eof (%d/%d)\n", p->hopsize, available);
    501   }
    502   pthread_mutex_unlock(&p->read_mutex);
    503 #endif /* HAVE_THREADS */
    504   // now retrieve the samples and write them into out->data
    505   if (available >= (int)p->hopsize) {
    506     rubberband_retrieve(p->rb, (float* const*)&(out->data), p->hopsize);
    507     *read = p->hopsize;
     217  if (available >= (int)out->length) {
     218    rubberband_retrieve(p->rb, (float* const*)&(out->data), out->length);
     219    *read = out->length;
    508220  } else if (available > 0) {
    509221    // this occurs each time the end of file is reached
    510222    //AUBIO_WRN("timestretch: short read\n");
    511223    rubberband_retrieve(p->rb, (float* const*)&(out->data), available);
     224    fvec_t zeros; zeros.length = out->length - available; zeros.data = out->data + available;
     225    fvec_zeros(&zeros);
    512226    *read = available;
    513227  } else {
     
    516230    *read = 0;
    517231  }
    518 #ifdef HAVE_THREADS
    519   //pthread_mutex_unlock(&p->read_mutex);
    520 #endif
    521 }
    522 
    523 uint_t
    524 aubio_timestretch_seek (aubio_timestretch_t *p, uint_t pos)
     232}
     233
     234uint_t
     235aubio_timestretch_reset(aubio_timestretch_t *p)
    525236{
    526237  uint_t err = AUBIO_OK;
    527 #if HAVE_THREADS
    528   if (p == NULL) {
    529     AUBIO_WRN("seeking but object not set yet (ignoring)\n");
    530     return AUBIO_FAIL;
    531   }
    532   pthread_mutex_lock(&p->read_mutex);
    533   if (p->open_thread_running) {
    534     //AUBIO_WRN("seeking but opening thread not completed yet (ignoring)\n");
    535     err = AUBIO_OK;
    536     goto beach;
    537   }
    538   if (!p->opened || !p->source) {
    539     //AUBIO_WRN("timestretch: seeking but source not opened yet (ignoring)\n");
    540     err = AUBIO_OK;
    541     goto beach;
    542   }
    543 #endif
    544   p->eof = 0;
    545238  if (p->rb) {
    546239    rubberband_reset(p->rb);
    547240  }
    548 #ifdef HAVE_THREADS
    549 #ifdef HAVE_OPENTHREAD
    550   pthread_mutex_lock(&p->open_mutex);
     241  return err;
     242}
     243
    551244#endif
    552 #endif
    553   if (p->source) {
    554     err = aubio_source_seek(p->source, pos);
    555   } else {
    556     AUBIO_WRN("timestretch: seeking but p->source not created?!\n");
    557     err = AUBIO_FAIL;
    558     goto beach;
    559   }
    560 #if HAVE_THREADS
    561   pthread_mutex_unlock(&p->open_mutex);
    562   p->available = 0;
    563   p->started = 1;
    564 beach:
    565   pthread_mutex_unlock(&p->read_mutex);
    566 #else
    567 beach:
    568 #endif
    569   return err;
    570 }
    571 
    572 #endif
Note: See TracChangeset for help on using the changeset viewer.