source: python/lib/aubio/cmd.py @ c95062b

feature/autosinkfeature/cnnfeature/cnn_orgfeature/constantqfeature/crepefeature/crepe_orgfeature/pitchshiftfeature/pydocstringsfeature/timestretchfix/ffmpeg5
Last change on this file since c95062b was c95062b, checked in by Paul Brossier <piem@piem.org>, 7 years ago

python/lib/aubio/cmd.py: use custom parser class

  • Property mode set to 100644
File size: 19.0 KB
Line 
1#! /usr/bin/env python
2# -*- coding: utf-8 -*-
3
4"""aubio command line tool
5
6This file was written by Paul Brossier <piem@aubio.org> and is released under
7the GNU/GPL v3.
8
9Note: this script is mostly about parsing command line arguments. For more
10readable code examples, check out the `python/demos` folder."""
11
12import sys
13import argparse
14import aubio
15
16def aubio_parser():
17    epilog = 'use "%(prog)s <command> --help" for more info about each command'
18    parser = argparse.ArgumentParser(epilog=epilog)
19    parser.add_argument('-V', '--version', help="show version",
20            action="store_true", dest="show_version")
21
22    subparsers = parser.add_subparsers(title='commands', dest='command',
23            parser_class= AubioArgumentParser,
24            metavar="")
25
26    parser_add_subcommand_help(subparsers)
27
28    parser_add_subcommand_onset(subparsers)
29    parser_add_subcommand_pitch(subparsers)
30    parser_add_subcommand_beat(subparsers)
31    parser_add_subcommand_tempo(subparsers)
32    parser_add_subcommand_notes(subparsers)
33    parser_add_subcommand_mfcc(subparsers)
34    parser_add_subcommand_melbands(subparsers)
35    parser_add_subcommand_quiet(subparsers)
36
37    return parser
38
39def parser_add_subcommand_help(subparsers):
40    # global help subcommand
41    subparsers.add_parser('help',
42            help='show help message',
43            formatter_class = argparse.ArgumentDefaultsHelpFormatter)
44
45def parser_add_subcommand_onset(subparsers):
46    # onset subcommand
47    subparser = subparsers.add_parser('onset',
48            help='estimate time of onsets (beginning of sound event)',
49            formatter_class = argparse.ArgumentDefaultsHelpFormatter)
50    subparser.add_input()
51    subparser.add_buf_hop_size()
52    helpstr = "onset novelty function"
53    helpstr += " <default|energy|hfc|complex|phase|specdiff|kl|mkl|specflux>"
54    subparser.add_method(helpstr=helpstr)
55    subparser.add_threshold()
56    subparser.add_silence()
57    subparser.add_minioi()
58    subparser.add_time_format()
59    subparser.add_verbose_help()
60    subparser.set_defaults(process=process_onset)
61
62def parser_add_subcommand_pitch(subparsers):
63    # pitch subcommand
64    subparser = subparsers.add_parser('pitch',
65            help='estimate fundamental frequency (monophonic)')
66    subparser.add_input()
67    subparser.add_buf_hop_size(buf_size=2048)
68    helpstr = "pitch detection method <default|yinfft|yin|mcomb|fcomb|schmitt>"
69    subparser.add_method(helpstr=helpstr)
70    subparser.add_threshold()
71    subparser.add_pitch_unit()
72    subparser.add_silence()
73    subparser.add_time_format()
74    subparser.add_verbose_help()
75    subparser.set_defaults(process=process_pitch)
76
77def parser_add_subcommand_beat(subparsers):
78    # beat subcommand
79    subparser = subparsers.add_parser('beat',
80            help='estimate location of beats')
81    subparser.add_input()
82    subparser.add_buf_hop_size(buf_size=1024, hop_size=512)
83    subparser.add_time_format()
84    subparser.add_verbose_help()
85    subparser.set_defaults(process=process_beat)
86
87def parser_add_subcommand_tempo(subparsers):
88    # tempo subcommand
89    subparser = subparsers.add_parser('tempo',
90            help='estimate overall tempo in bpm')
91    subparser.add_input()
92    subparser.add_buf_hop_size(buf_size=1024, hop_size=512)
93    subparser.add_time_format()
94    subparser.add_verbose_help()
95    subparser.set_defaults(process=process_tempo)
96
97def parser_add_subcommand_notes(subparsers):
98    # notes subcommand
99    subparser = subparsers.add_parser('notes',
100            help='estimate midi-like notes (monophonic)')
101    subparser.add_input()
102    subparser.add_buf_hop_size()
103    subparser.add_time_format()
104    subparser.add_verbose_help()
105    subparser.set_defaults(process=process_notes)
106
107def parser_add_subcommand_mfcc(subparsers):
108    # mfcc subcommand
109    subparser = subparsers.add_parser('mfcc',
110            help='extract Mel-Frequency Cepstrum Coefficients')
111    subparser.add_input()
112    subparser.add_buf_hop_size()
113    subparser.add_time_format()
114    subparser.add_verbose_help()
115    subparser.set_defaults(process=process_mfcc)
116
117def parser_add_subcommand_melbands(subparsers):
118    # melbands subcommand
119    subparser = subparsers.add_parser('melbands',
120            help='extract energies in Mel-frequency bands')
121    subparser.add_input()
122    subparser.add_buf_hop_size()
123    subparser.add_time_format()
124    subparser.add_verbose_help()
125    subparser.set_defaults(process=process_melbands)
126
127def parser_add_subcommand_quiet(subparsers):
128    # quiet subcommand
129    subparser = subparsers.add_parser('quiet',
130            help='extract timestamps of quiet and loud regions')
131    subparser.add_input()
132    subparser.add_hop_size()
133    subparser.add_silence()
134    subparser.add_time_format()
135    subparser.add_verbose_help()
136    subparser.set_defaults(process=process_quiet)
137
138class AubioArgumentParser(argparse.ArgumentParser):
139
140    def add_input(self):
141        self.add_argument("source_uri", default=None, nargs='?',
142                help="input sound file to analyse", metavar = "<source_uri>")
143        self.add_argument("-i", "--input", dest = "source_uri2",
144                help="input sound file to analyse", metavar = "<source_uri>")
145        self.add_argument("-r", "--samplerate",
146                metavar = "<freq>", type=int,
147                action="store", dest="samplerate", default=0,
148                help="samplerate at which the file should be represented")
149
150    def add_verbose_help(self):
151        self.add_argument("-v","--verbose",
152                action="count", dest="verbose", default=1,
153                help="make lots of noise [default]")
154        self.add_argument("-q","--quiet",
155                action="store_const", dest="verbose", const=0,
156                help="be quiet")
157
158    def add_buf_hop_size(self, buf_size=512, hop_size=256):
159        self.add_buf_size(buf_size=buf_size)
160        self.add_hop_size(hop_size=hop_size)
161
162    def add_buf_size(self, buf_size=512):
163        self.add_argument("-B","--bufsize",
164                action="store", dest="buf_size", default=buf_size,
165                metavar = "<size>", type=int,
166                help="buffer size [default=%d]" % buf_size)
167
168    def add_hop_size(self, hop_size=256):
169        self.add_argument("-H","--hopsize",
170                metavar = "<size>", type=int,
171                action="store", dest="hop_size", default=hop_size,
172                help="overlap size [default=%d]" % hop_size)
173
174    def add_method(self, method='default', helpstr='method'):
175        self.add_argument("-m","--method",
176                metavar = "<method>", type=str,
177                action="store", dest="method", default=method,
178                help="%s [default=%s]" % (helpstr, method))
179
180    def add_threshold(self, default=None):
181        self.add_argument("-t","--threshold",
182                metavar = "<threshold>", type=float,
183                action="store", dest="threshold", default=default,
184                help="threshold [default=%s]" % default)
185
186    def add_silence(self):
187        self.add_argument("-s", "--silence",
188                metavar = "<value>", type=float,
189                action="store", dest="silence", default=-70,
190                help="silence threshold")
191
192    def add_minioi(self, default="12ms"):
193        self.add_argument("-M", "--minioi",
194                metavar = "<value>", type=str,
195                action="store", dest="minioi", default=default,
196                help="minimum Inter-Onset Interval [default=%s]" % default)
197
198    def add_pitch_unit(self, default="Hz"):
199        help_str = "frequency unit, should be one of Hz, midi, bin, cent"
200        help_str += " [default=%s]" % default
201        self.add_argument("-u", "--pitch-unit",
202                metavar = "<value>", type=str,
203                action="store", dest="pitch_unit", default=default,
204                help=help_str)
205
206    def add_time_format(self):
207        helpstr = "select time values output format (samples, ms, seconds)"
208        helpstr += " [default=seconds]"
209        self.add_argument("-T", "--time-format",
210                 metavar='format',
211                 dest="time_format",
212                 default=None,
213                 help=helpstr)
214
215# some utilities
216
217def samples2seconds(n_frames, samplerate):
218    return "%f\t" % (n_frames / float(samplerate))
219
220def samples2milliseconds(n_frames, samplerate):
221    return "%f\t" % (1000. * n_frames / float(samplerate))
222
223def samples2samples(n_frames, _samplerate):
224    return "%d\t" % n_frames
225
226def timefunc(mode):
227    if mode is None or mode == 'seconds' or mode == 's':
228        return samples2seconds
229    elif mode == 'ms' or mode == 'milliseconds':
230        return samples2milliseconds
231    elif mode == 'samples':
232        return samples2samples
233    else:
234        raise ValueError("invalid time format '%s'" % mode)
235
236# definition of processing classes
237
238class default_process(object):
239    def __init__(self, args):
240        if 'time_format' in args:
241            self.time2string = timefunc(args.time_format)
242        if args.verbose > 2 and hasattr(self, 'options'):
243            name = type(self).__name__.split('_')[1]
244            optstr = ' '.join(['running', name, 'with options', repr(self.options), '\n'])
245            sys.stderr.write(optstr)
246    def flush(self, frames_read, samplerate):
247        # optionally called at the end of process
248        pass
249
250    def parse_options(self, args, valid_opts):
251        # get any valid options found in a dictionnary of arguments
252        options = {k :v for k,v in vars(args).items() if k in valid_opts}
253        self.options = options
254
255    def remap_pvoc_options(self, options):
256        # FIXME: we need to remap buf_size to win_s, hop_size to hop_s
257        # adjust python/ext/py-phasevoc.c to understand buf_size/hop_size
258        if 'buf_size' in options:
259            options['win_s'] = options['buf_size']
260            del options['buf_size']
261        if 'hop_size' in options:
262            options['hop_s'] = options['hop_size']
263            del options['hop_size']
264        self.options = options
265
266class process_onset(default_process):
267    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
268    def __init__(self, args):
269        self.parse_options(args, self.valid_opts)
270        self.onset = aubio.onset(**self.options)
271        if args.threshold is not None:
272            self.onset.set_threshold(args.threshold)
273        if args.minioi:
274            if args.minioi.endswith('ms'):
275                self.onset.set_minioi_ms(float(args.minioi[:-2]))
276            elif args.minioi.endswith('s'):
277                self.onset.set_minioi_s(float(args.minioi[:-1]))
278            else:
279                self.onset.set_minioi(int(args.minioi))
280        if args.silence:
281            self.onset.set_silence(args.silence)
282        super(process_onset, self).__init__(args)
283    def __call__(self, block):
284        return self.onset(block)
285    def repr_res(self, res, _frames_read, samplerate):
286        if res[0] != 0:
287            outstr = self.time2string(self.onset.get_last(), samplerate)
288            sys.stdout.write(outstr + '\n')
289
290class process_pitch(default_process):
291    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
292    def __init__(self, args):
293        self.parse_options(args, self.valid_opts)
294        self.pitch = aubio.pitch(**self.options)
295        if args.pitch_unit is not None:
296            self.pitch.set_unit(args.pitch_unit)
297        if args.threshold is not None:
298            self.pitch.set_tolerance(args.threshold)
299        if args.silence is not None:
300            self.pitch.set_silence(args.silence)
301        super(process_pitch, self).__init__(args)
302    def __call__(self, block):
303        return self.pitch(block)
304    def repr_res(self, res, frames_read, samplerate):
305        fmt_out = self.time2string(frames_read, samplerate)
306        sys.stdout.write(fmt_out + "%.6f\n" % res[0])
307
308class process_beat(default_process):
309    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
310    def __init__(self, args):
311        self.parse_options(args, self.valid_opts)
312        self.tempo = aubio.tempo(**self.options)
313        super(process_beat, self).__init__(args)
314    def __call__(self, block):
315        return self.tempo(block)
316    def repr_res(self, res, _frames_read, samplerate):
317        if res[0] != 0:
318            outstr = self.time2string(self.tempo.get_last(), samplerate)
319            sys.stdout.write(outstr + '\n')
320
321class process_tempo(process_beat):
322    def __init__(self, args):
323        super(process_tempo, self).__init__(args)
324        self.beat_locations = []
325    def repr_res(self, res, _frames_read, samplerate):
326        if res[0] != 0:
327            self.beat_locations.append(self.tempo.get_last_s())
328    def flush(self, frames_read, samplerate):
329        import numpy as np
330        if len(self.beat_locations) < 2:
331            outstr = "unknown bpm"
332        else:
333            bpms = 60./ np.diff(self.beat_locations)
334            median_bpm = np.mean(bpms)
335            if len(self.beat_locations) < 10:
336                outstr = "%.2f bpm (uncertain)" % median_bpm
337            else:
338                outstr = "%.2f bpm" % median_bpm
339        sys.stdout.write(outstr + '\n')
340
341class process_notes(default_process):
342    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
343    def __init__(self, args):
344        self.parse_options(args, self.valid_opts)
345        self.notes = aubio.notes(**self.options)
346        super(process_notes, self).__init__(args)
347    def __call__(self, block):
348        return self.notes(block)
349    def repr_res(self, res, frames_read, samplerate):
350        if res[2] != 0: # note off
351            fmt_out = self.time2string(frames_read, samplerate)
352            sys.stdout.write(fmt_out + '\n')
353        if res[0] != 0: # note on
354            lastmidi = res[0]
355            fmt_out = "%f\t" % lastmidi
356            fmt_out += self.time2string(frames_read, samplerate)
357            sys.stdout.write(fmt_out) # + '\t')
358    def flush(self, frames_read, samplerate):
359        eof = self.time2string(frames_read, samplerate)
360        sys.stdout.write(eof + '\n')
361
362class process_mfcc(default_process):
363    def __init__(self, args):
364        valid_opts1 = ['hop_size', 'buf_size']
365        self.parse_options(args, valid_opts1)
366        self.remap_pvoc_options(self.options)
367        self.pv = aubio.pvoc(**self.options)
368
369        valid_opts2 = ['buf_size', 'n_filters', 'n_coeffs', 'samplerate']
370        self.parse_options(args, valid_opts2)
371        self.mfcc = aubio.mfcc(**self.options)
372
373        # remember all options
374        self.parse_options(args, list(set(valid_opts1 + valid_opts2)))
375
376        super(process_mfcc, self).__init__(args)
377
378    def __call__(self, block):
379        fftgrain = self.pv(block)
380        return self.mfcc(fftgrain)
381    def repr_res(self, res, frames_read, samplerate):
382        fmt_out = self.time2string(frames_read, samplerate)
383        fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()])
384        sys.stdout.write(fmt_out + '\n')
385
386class process_melbands(default_process):
387    def __init__(self, args):
388        self.args = args
389        valid_opts = ['hop_size', 'buf_size']
390        self.parse_options(args, valid_opts)
391        self.remap_pvoc_options(self.options)
392        self.pv = aubio.pvoc(**self.options)
393
394        valid_opts = ['buf_size', 'n_filters']
395        self.parse_options(args, valid_opts)
396        self.remap_pvoc_options(self.options)
397        self.filterbank = aubio.filterbank(**self.options)
398        self.filterbank.set_mel_coeffs_slaney(args.samplerate)
399
400        super(process_melbands, self).__init__(args)
401    def __call__(self, block):
402        fftgrain = self.pv(block)
403        return self.filterbank(fftgrain)
404    def repr_res(self, res, frames_read, samplerate):
405        fmt_out = self.time2string(frames_read, samplerate)
406        fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()])
407        sys.stdout.write(fmt_out + '\n')
408
409class process_quiet(default_process):
410    def __init__(self, args):
411        self.args = args
412        valid_opts = ['hop_size', 'silence']
413        self.parse_options(args, valid_opts)
414        self.wassilence = 1
415
416        if args.silence is not None:
417            self.silence = args.silence
418        super(process_quiet, self).__init__(args)
419
420    def __call__(self, block):
421        if aubio.silence_detection(block, self.silence) == 1:
422            if self.wassilence != 1:
423                self.wassilence = 1
424                return 2 # newly found silence
425            return 1 # silence again
426        else:
427            if self.wassilence != 0:
428                self.wassilence = 0
429                return -1 # newly found noise
430            return 0 # noise again
431
432    def repr_res(self, res, frames_read, samplerate):
433        fmt_out = None
434        if res == -1:
435            fmt_out = "NOISY: "
436        if res == 2:
437            fmt_out = "QUIET: "
438        if fmt_out is not None:
439            fmt_out += self.time2string(frames_read, samplerate)
440            sys.stdout.write(fmt_out + '\n')
441
442def main():
443    parser = aubio_parser()
444    args = parser.parse_args()
445    if 'show_version' in args and args.show_version:
446        sys.stdout.write('aubio version ' + aubio.version + '\n')
447        sys.exit(0)
448    elif 'verbose' in args and args.verbose > 3:
449        sys.stderr.write('aubio version ' + aubio.version + '\n')
450    if 'command' not in args or args.command is None or args.command in ['help']:
451        # no command given, print help and return 1
452        parser.print_help()
453        if args.command and args.command in ['help']:
454            sys.exit(0)
455        else:
456            sys.exit(1)
457    elif not args.source_uri and not args.source_uri2:
458        sys.stderr.write("Error: a source is required\n")
459        parser.print_help()
460        sys.exit(1)
461    elif args.source_uri2 is not None:
462        args.source_uri = args.source_uri2
463    try:
464        # open source_uri
465        with aubio.source(args.source_uri, hop_size=args.hop_size,
466                samplerate=args.samplerate) as a_source:
467            # always update args.samplerate to native samplerate, in case
468            # source was opened with args.samplerate=0
469            args.samplerate = a_source.samplerate
470            # create the processor for this subcommand
471            processor = args.process(args)
472            frames_read = 0
473            while True:
474                # read new block from source
475                block, read = a_source()
476                # execute processor on this block
477                res = processor(block)
478                # print results for this block
479                if args.verbose > 0:
480                    processor.repr_res(res, frames_read, a_source.samplerate)
481                # increment total number of frames read
482                frames_read += read
483                # exit loop at end of file
484                if read < a_source.hop_size: break
485            # flush the processor if needed
486            processor.flush(frames_read, a_source.samplerate)
487            if args.verbose > 1:
488                fmt_string = "read {:.2f}s"
489                fmt_string += " ({:d} samples in {:d} blocks of {:d})"
490                fmt_string += " from {:s} at {:d}Hz\n"
491                sys.stderr.write(fmt_string.format(
492                        frames_read/float(a_source.samplerate),
493                        frames_read,
494                        frames_read // a_source.hop_size + 1,
495                        a_source.hop_size,
496                        a_source.uri,
497                        a_source.samplerate))
498    except KeyboardInterrupt:
499        sys.exit(1)
Note: See TracBrowser for help on using the repository browser.