source: python/tests/test_sink.py @ df2de24

Last change on this file since df2de24 was b4445fb, checked in by Paul Brossier <piem@piem.org>, 6 years ago

[tests] also capture expected source warnings in test_sink

  • Property mode set to 100755
File size: 4.5 KB
Line 
1#! /usr/bin/env python
2
3from numpy.testing import TestCase
4from aubio import fvec, source, sink
5from utils import list_all_sounds, get_tmp_sink_path, del_tmp_sink_path
6from utils import parse_file_samplerate
7from _tools import parametrize, skipTest, assert_raises, assert_warns
8
9list_of_sounds = list_all_sounds('sounds')
10samplerates = [0, 44100, 8000, 32000]
11hop_sizes = [512, 1024, 64]
12
13path = None
14
15many_files = 300 # 256 opened files is too much
16
17all_params = []
18for soundfile in list_of_sounds:
19    for hop_size in hop_sizes:
20        for samplerate in samplerates:
21            all_params.append((hop_size, samplerate, soundfile))
22
23class Test_aubio_sink(object):
24
25    def test_wrong_filename(self):
26        with assert_raises(RuntimeError):
27            sink('')
28
29    def test_wrong_samplerate(self):
30        with assert_raises(RuntimeError):
31            sink(get_tmp_sink_path(), -1)
32
33    def test_wrong_samplerate_too_large(self):
34        with assert_raises(RuntimeError):
35            sink(get_tmp_sink_path(), 1536001, 2)
36
37    def test_wrong_channels(self):
38        with assert_raises(RuntimeError):
39            sink(get_tmp_sink_path(), 44100, -1)
40
41    def test_wrong_channels_too_large(self):
42        with assert_raises(RuntimeError):
43            sink(get_tmp_sink_path(), 44100, 202020)
44
45    def test_many_sinks(self):
46        from tempfile import mkdtemp
47        import os.path
48        import shutil
49        tmpdir = mkdtemp()
50        sink_list = []
51        for i in range(many_files):
52            path = os.path.join(tmpdir, 'f-' + str(i) + '.wav')
53            g = sink(path, 0)
54            sink_list.append(g)
55            write = 32
56            for _ in range(200):
57                vec = fvec(write)
58                g(vec, write)
59            g.close()
60        shutil.rmtree(tmpdir)
61
62    @parametrize('hop_size, samplerate, path', all_params)
63    def test_read_and_write(self, hop_size, samplerate, path):
64        orig_samplerate = parse_file_samplerate(soundfile)
65        try:
66            if orig_samplerate is not None and orig_samplerate < samplerate:
67                # upsampling should emit a warning
68                with assert_warns(UserWarning):
69                    f = source(soundfile, samplerate, hop_size)
70            else:
71                f = source(soundfile, samplerate, hop_size)
72        except RuntimeError as e:
73            err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
74            skipTest(err_msg.format(str(e), hop_size, samplerate))
75        if samplerate == 0: samplerate = f.samplerate
76        sink_path = get_tmp_sink_path()
77        g = sink(sink_path, samplerate)
78        total_frames = 0
79        while True:
80            vec, read = f()
81            g(vec, read)
82            total_frames += read
83            if read < f.hop_size: break
84        del_tmp_sink_path(sink_path)
85
86    @parametrize('hop_size, samplerate, path', all_params)
87    def test_read_and_write_multi(self, hop_size, samplerate, path):
88        orig_samplerate = parse_file_samplerate(soundfile)
89        try:
90            if orig_samplerate is not None and orig_samplerate < samplerate:
91                # upsampling should emit a warning
92                with assert_warns(UserWarning):
93                    f = source(soundfile, samplerate, hop_size)
94            else:
95                f = source(soundfile, samplerate, hop_size)
96        except RuntimeError as e:
97            err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
98            skipTest(err_msg.format(str(e), hop_size, samplerate))
99        if samplerate == 0: samplerate = f.samplerate
100        sink_path = get_tmp_sink_path()
101        g = sink(sink_path, samplerate, channels = f.channels)
102        total_frames = 0
103        while True:
104            vec, read = f.do_multi()
105            g.do_multi(vec, read)
106            total_frames += read
107            if read < f.hop_size: break
108        del_tmp_sink_path(sink_path)
109
110    def test_close_file(self):
111        samplerate = 44100
112        sink_path = get_tmp_sink_path()
113        g = sink(sink_path, samplerate)
114        g.close()
115        del_tmp_sink_path(sink_path)
116
117    def test_close_file_twice(self):
118        samplerate = 44100
119        sink_path = get_tmp_sink_path()
120        g = sink(sink_path, samplerate)
121        g.close()
122        g.close()
123        del_tmp_sink_path(sink_path)
124
125    def test_read_with(self):
126        samplerate = 44100
127        sink_path = get_tmp_sink_path()
128        vec = fvec(128)
129        with sink(sink_path, samplerate) as g:
130            for _ in range(10):
131                g(vec, 128)
132
133if __name__ == '__main__':
134    from _tools import run_module_suite
135    run_module_suite()
Note: See TracBrowser for help on using the repository browser.