зеркало из https://github.com/mozilla/gecko-dev.git
252 строки
8.2 KiB
Python
252 строки
8.2 KiB
Python
try:
|
|
import unittest2 as unittest # python < 2.7 compat
|
|
except ImportError:
|
|
import unittest
|
|
import tempfile
|
|
import shutil
|
|
import os
|
|
import time
|
|
import six
|
|
from mock import Mock
|
|
|
|
from dlmanager import manager as download_manager
|
|
|
|
|
|
def mock_session():
|
|
response = Mock()
|
|
session = Mock(get=Mock(return_value=response))
|
|
return session, response
|
|
|
|
|
|
def mock_response(response, data, wait=0):
|
|
data = six.b(data)
|
|
|
|
def iter_content(chunk_size=4):
|
|
rest = data
|
|
while rest:
|
|
time.sleep(wait)
|
|
chunk = rest[:chunk_size]
|
|
rest = rest[chunk_size:]
|
|
yield chunk
|
|
|
|
response.headers = {'Content-length': str(len(data))}
|
|
response.iter_content = iter_content
|
|
|
|
|
|
class TestDownload(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tempdir = tempfile.mkdtemp()
|
|
self.addCleanup(shutil.rmtree, self.tempdir)
|
|
self.finished = Mock()
|
|
self.session, self.session_response = mock_session()
|
|
self.tempfile = os.path.join(self.tempdir, 'dest')
|
|
self.dl = download_manager.Download('http://url', self.tempfile,
|
|
finished_callback=self.finished,
|
|
chunk_size=4,
|
|
session=self.session)
|
|
|
|
def test_creation(self):
|
|
self.assertFalse(self.dl.is_canceled())
|
|
self.assertFalse(self.dl.is_running())
|
|
self.assertIsNone(self.dl.error())
|
|
self.assertEquals(self.dl.get_url(), 'http://url')
|
|
self.assertEquals(self.dl.get_dest(), self.tempfile)
|
|
|
|
def create_response(self, data, wait=0):
|
|
mock_response(self.session_response, data, wait)
|
|
|
|
def test_download(self):
|
|
self.create_response('1234' * 4, 0.01)
|
|
|
|
# no file present yet
|
|
self.assertFalse(os.path.exists(self.tempfile))
|
|
|
|
self.dl.start()
|
|
self.assertTrue(self.dl.is_running())
|
|
self.dl.wait()
|
|
|
|
self.assertFalse(self.dl.is_running())
|
|
self.finished.assert_called_with(self.dl)
|
|
# file has been downloaded
|
|
with open(self.tempfile) as f:
|
|
self.assertEquals(f.read(), '1234' * 4)
|
|
|
|
def test_download_cancel(self):
|
|
self.create_response('1234' * 1000, wait=0.01)
|
|
|
|
start = time.time()
|
|
self.dl.start()
|
|
time.sleep(0.1)
|
|
self.dl.cancel()
|
|
|
|
with self.assertRaises(download_manager.DownloadInterrupt):
|
|
self.dl.wait()
|
|
|
|
self.assertTrue(self.dl.is_canceled())
|
|
|
|
# response generation should have taken 1000 * 0.01 = 10 seconds.
|
|
# since we canceled, this must be lower.
|
|
self.assertTrue((time.time() - start) < 1.0)
|
|
|
|
# file was deleted
|
|
self.assertFalse(os.path.exists(self.tempfile))
|
|
# finished callback was called
|
|
self.finished.assert_called_with(self.dl)
|
|
|
|
def test_download_with_progress(self):
|
|
data = []
|
|
|
|
def update_progress(_dl, current, total):
|
|
data.append((_dl, current, total))
|
|
|
|
self.create_response('1234' * 4)
|
|
|
|
self.dl.set_progress(update_progress)
|
|
self.dl.start()
|
|
self.dl.wait()
|
|
|
|
self.assertEquals(data, [
|
|
(self.dl, 0, 16),
|
|
(self.dl, 4, 16),
|
|
(self.dl, 8, 16),
|
|
(self.dl, 12, 16),
|
|
(self.dl, 16, 16),
|
|
])
|
|
# file has been downloaded
|
|
with open(self.tempfile) as f:
|
|
self.assertEquals(f.read(), '1234' * 4)
|
|
# finished callback was called
|
|
self.finished.assert_called_with(self.dl)
|
|
|
|
def test_download_error_in_thread(self):
|
|
self.session_response.headers = {'Content-length': '24'}
|
|
self.session_response.iter_content.side_effect = IOError
|
|
|
|
self.dl.start()
|
|
with self.assertRaises(IOError):
|
|
self.dl.wait()
|
|
|
|
self.assertEquals(self.dl.error()[0], IOError)
|
|
# finished callback was called
|
|
self.finished.assert_called_with(self.dl)
|
|
|
|
def test_wait_does_not_block_on_exception(self):
|
|
# this test the case when a user may hit CTRL-C for example
|
|
# during a dl.wait() call.
|
|
self.create_response('1234' * 1000, wait=0.01)
|
|
|
|
original_join = self.dl.thread.join
|
|
it = iter('123')
|
|
|
|
def join(timeout=None):
|
|
next(it) # will throw StopIteration after a few calls
|
|
original_join(timeout)
|
|
|
|
self.dl.thread.join = join
|
|
|
|
start = time.time()
|
|
self.dl.start()
|
|
|
|
with self.assertRaises(StopIteration):
|
|
self.dl.wait()
|
|
|
|
self.assertTrue(self.dl.is_canceled())
|
|
# wait for the thread to finish
|
|
original_join()
|
|
|
|
# response generation should have taken 1000 * 0.01 = 10 seconds.
|
|
# since we got an error, this must be lower.
|
|
self.assertTrue((time.time() - start) < 1.0)
|
|
|
|
# file was deleted
|
|
self.assertFalse(os.path.exists(self.tempfile))
|
|
# finished callback was called
|
|
self.finished.assert_called_with(self.dl)
|
|
|
|
|
|
class TestDownloadManager(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tempdir = tempfile.mkdtemp()
|
|
self.addCleanup(shutil.rmtree, self.tempdir)
|
|
|
|
self.dl_manager = download_manager.DownloadManager(self.tempdir)
|
|
|
|
def do_download(self, url, fname, data, wait=0):
|
|
session, response = mock_session()
|
|
mock_response(response, data, wait)
|
|
# patch the session, so the download will use that
|
|
self.dl_manager.session = session
|
|
return self.dl_manager.download(url, fname)
|
|
|
|
def test_download(self):
|
|
dl1 = self.do_download('http://foo', 'foo', 'hello' * 4, wait=0.02)
|
|
self.assertIsInstance(dl1, download_manager.Download)
|
|
self.assertTrue(dl1.is_running())
|
|
|
|
# with the same fname, no new download is started. The same instance
|
|
# is returned since the download is running.
|
|
dl2 = self.do_download('http://bar', 'foo', 'hello2' * 4, wait=0.02)
|
|
self.assertEquals(dl1, dl2)
|
|
|
|
# starting a download with another fname will trigger a new download
|
|
dl3 = self.do_download('http://bar', 'foo2', 'hello you' * 4)
|
|
self.assertIsInstance(dl3, download_manager.Download)
|
|
self.assertNotEquals(dl3, dl1)
|
|
|
|
# let's wait for the downloads to finish
|
|
dl3.wait()
|
|
dl1.wait()
|
|
|
|
# now if we try to download a fname that exists, None is returned
|
|
dl4 = self.do_download('http://bar', 'foo', 'hello2' * 4, wait=0.02)
|
|
self.assertIsNone(dl4)
|
|
|
|
# downloaded files are what is expected
|
|
def content(fname):
|
|
with open(os.path.join(self.tempdir, fname)) as f:
|
|
return f.read()
|
|
self.assertEquals(content('foo'), 'hello' * 4)
|
|
self.assertEquals(content('foo2'), 'hello you' * 4)
|
|
|
|
# download instances are removed from the manager (internal test)
|
|
self.assertEquals(self.dl_manager._downloads, {})
|
|
|
|
def test_cancel(self):
|
|
dl1 = self.do_download('http://foo', 'foo', 'foo' * 50000, wait=0.02)
|
|
dl2 = self.do_download('http://foo', 'bar', 'bar' * 50000, wait=0.02)
|
|
dl3 = self.do_download('http://foo', 'foobar', 'foobar' * 4)
|
|
|
|
# let's cancel only one
|
|
def cancel_if(dl):
|
|
if os.path.basename(dl.get_dest()) == 'foo':
|
|
return True
|
|
self.dl_manager.cancel(cancel_if=cancel_if)
|
|
|
|
self.assertTrue(dl1.is_canceled())
|
|
self.assertFalse(dl2.is_canceled())
|
|
self.assertFalse(dl3.is_canceled())
|
|
|
|
# wait for dl3
|
|
dl3.wait()
|
|
|
|
# cancel everything
|
|
self.dl_manager.cancel()
|
|
|
|
self.assertTrue(dl1.is_canceled())
|
|
self.assertTrue(dl2.is_canceled())
|
|
# dl3 is not canceled since it finished before
|
|
self.assertFalse(dl3.is_canceled())
|
|
|
|
# wait for the completion of dl1 and dl2 threads
|
|
dl1.wait(raise_if_error=False)
|
|
dl2.wait(raise_if_error=False)
|
|
|
|
# at the end, only dl3 has been downloaded
|
|
self.assertEquals(os.listdir(self.tempdir), ["foobar"])
|
|
|
|
with open(os.path.join(self.tempdir, 'foobar')) as f:
|
|
self.assertEquals(f.read(), 'foobar' * 4)
|
|
|
|
# download instances are removed from the manager (internal test)
|
|
self.assertEquals(self.dl_manager._downloads, {})
|