servo: Perform the image prefetch in a different task

Having the prefetch task send back the results immediately will
provide an appropriate event to signel clients that want to know
when images become available.

Source-Repo: https://github.com/servo/servo
Source-Revision: fe4b1c92dd37ff7abe0d05721145cfa2c7baa55f
This commit is contained in:
Brian Anderson 2012-08-12 21:41:53 -07:00
Родитель eea05c5a76
Коммит 3f8b293e92
2 изменённых файлов: 248 добавлений и 97 удалений

Просмотреть файл

@ -8,7 +8,7 @@ import resource::resource_task;
import resource::resource_task::{ResourceTask};
import std::net::url::url;
import resource::image_cache_task;
import image_cache_task::{ImageCacheTask, image_cache_task};
import image_cache_task::{ImageCacheTask, image_cache_task, ImageCacheTaskClient};
import pipes::{port, chan};
@ -67,7 +67,7 @@ class Engine<S:Sink send copy> {
self.renderer.send(renderer::ExitMsg(response_chan));
response_port.recv();
self.image_cache_task.send(image_cache_task::Exit);
self.image_cache_task.exit();
self.resource_task.send(resource_task::Exit);
sender.send(());

Просмотреть файл

@ -2,6 +2,7 @@ export Msg, Prefetch, GetImage, Exit;
export ImageResponseMsg, ImageReady, ImageNotReady;
export ImageCacheTask;
export image_cache_task;
export ImageCacheTaskClient;
import image::base::{Image, load_from_memory, test_image_bin};
import std::net::url::url;
@ -13,16 +14,28 @@ import resource_task::ResourceTask;
import std::arc::arc;
import clone_arc = std::arc::clone;
import std::cell::Cell;
import result::{result, ok, err};
import to_str::to_str;
enum Msg {
/// Tell the cache that we may need a particular image soon. Must be posted
/// before GetImage
Prefetch(url),
/// Used be the prefetch tasks to post back image binaries
/*priv*/ StorePrefetchedImageData(url, result<Cell<~[u8]>, ()>),
/// Request an Image object for a URL
GetImage(url, chan<ImageResponseMsg>),
/// Used by the decoder tasks to post decoded images back to the cache
StoreImage(url, arc<~Image>),
Exit
/*priv*/ StoreImage(url, arc<~Image>),
/// For testing
/*priv*/ OnMsg(fn~(msg: &Msg)),
/// Clients must wait for a response before shutting down the ResourceTask
Exit(chan<()>)
}
enum ImageResponseMsg {
@ -37,7 +50,8 @@ fn image_cache_task(resource_task: ResourceTask) -> ImageCacheTask {
ImageCache {
resource_task: resource_task,
from_client: from_client,
state_map: url_map()
state_map: url_map(),
need_exit: none
}.run();
}
}
@ -49,21 +63,18 @@ struct ImageCache {
from_client: port<Msg>;
/// The state of processsing an image for a URL
state_map: UrlMap<ImageState>;
mut need_exit: option<chan<()>>;
}
enum ImageState {
Init,
Prefetching(@PrefetchData),
Prefetching,
Prefetched(@Cell<~[u8]>),
Decoding(@FutureData),
Decoded(@arc<~Image>),
Failed
}
struct PrefetchData {
response_port: port<resource_task::ProgressMsg>;
mut data: ~[u8];
}
struct FutureData {
mut waiters: ~[chan<ImageResponseMsg>];
}
@ -73,13 +84,51 @@ impl ImageCache {
fn run() {
let mut msg_handlers: ~[fn~(msg: &Msg)] = ~[];
loop {
let msg = self.from_client.recv();
for msg_handlers.each |handler| { handler(&msg) }
#debug("image_cache_task: received: %?", msg);
// FIXME: Need to move out the urls
match self.from_client.recv() {
match msg {
Prefetch(url) => self.prefetch(copy url),
StorePrefetchedImageData(url, data) => self.store_prefetched_image_data(copy url, &data),
GetImage(url, response) => self.get_image(copy url, response),
StoreImage(url, image) => self.store_image(copy url, &image),
Exit => break
OnMsg(handler) => msg_handlers += [copy handler],
Exit(response) => {
assert self.need_exit.is_none();
self.need_exit = some(response);
}
}
match copy self.need_exit {
some(response) => {
// Wait until we have no outstanding requests and subtasks
// before exiting
let mut can_exit = true;
for self.state_map.each_value |state| {
match state {
Prefetching => can_exit = false,
Decoding(*) => can_exit = false,
Init
| Prefetched(*)
| Decoded(*)
| Failed => ()
}
}
if can_exit {
response.send(());
break;
}
}
none => ()
}
}
}
@ -98,18 +147,30 @@ impl ImageCache {
/*priv*/ fn prefetch(+url: url) {
match self.get_state(copy url) {
Init => {
let response_port = port();
self.resource_task.send(resource_task::Load(copy url, response_port.chan()));
let to_cache = self.from_client.chan();
let resource_task = self.resource_task;
let url_cell = Cell(copy url);
let prefetch_data = @PrefetchData {
response_port: response_port,
data: ~[]
};
do spawn |move url_cell| {
let url = url_cell.take();
#debug("image_cache_task: started fetch for %s", url.to_str());
self.set_state(url, Prefetching(prefetch_data));
let image = load_image_data(copy url, resource_task);
let result = if image.is_ok() {
ok(Cell(result::unwrap(image)))
} else {
err(())
};
to_cache.send(StorePrefetchedImageData(copy url, result));
#debug("image_cache_task: ended fetch for %s", (copy url).to_str());
}
self.set_state(url, Prefetching);
}
Prefetching(*)
Prefetching
| Prefetched(*)
| Decoding(*)
| Decoded(*)
| Failed => {
@ -118,58 +179,61 @@ impl ImageCache {
}
}
/*priv*/ fn store_prefetched_image_data(+url: url, data: &result<Cell<~[u8]>, ()>) {
match self.get_state(copy url) {
Prefetching => {
match *data {
ok(data_cell) => {
let data = data_cell.take();
self.set_state(url, Prefetched(@Cell(data)));
}
err(*) => {
self.set_state(url, Failed);
}
}
}
Init
| Prefetched(*)
| Decoding(*)
| Decoded(*)
| Failed => {
fail ~"wrong state for storing prefetched image"
}
}
}
/*priv*/ fn get_image(+url: url, response: chan<ImageResponseMsg>) {
match self.get_state(copy url) {
Init => fail ~"Request for image before prefetch",
Prefetching(prefetch_data) => {
Prefetching => {
response.send(ImageNotReady);
}
let mut image_sent = false;
Prefetched(data_cell) => {
assert !data_cell.is_empty();
while prefetch_data.response_port.peek() {
match prefetch_data.response_port.recv() {
resource_task::Payload(data) => {
prefetch_data.data += data;
}
resource_task::Done(result::ok(*)) => {
// We've got the entire image binary
let mut data = ~[];
data <-> prefetch_data.data;
let data <- data; // freeze for capture
let data = data_cell.take();
let to_cache = self.from_client.chan();
let url_cell = Cell(copy url);
let to_cache = self.from_client.chan();
let url_cell = Cell(copy url);
do spawn |move url_cell| {
let image = arc(~load_from_memory(data));
// Send the image to the original requester
response.send(ImageReady(clone_arc(&image)));
to_cache.send(StoreImage(url_cell.take(), clone_arc(&image)));
}
let future_data = @FutureData {
waiters: ~[]
};
self.set_state(url, Decoding(future_data));
image_sent = true;
break;
}
resource_task::Done(result::err(*)) => {
// There was an error loading the image binary. Put it
// in the error map so we remember the error for future
// requests.
self.set_state(url, Failed);
break;
}
}
do spawn |move url_cell| {
let url = url_cell.take();
#debug("image_cache_task: started image decode for %s", url.to_str());
let image = arc(~load_from_memory(data));
// Send the image to the original requester
response.send(ImageReady(clone_arc(&image)));
to_cache.send(StoreImage(copy url, clone_arc(&image)));
#debug("image_cache_task: ended image decode for %s", url.to_str());
}
if !image_sent {
response.send(ImageNotReady);
}
let future_data = @FutureData {
waiters: ~[]
};
self.set_state(url, Decoding(future_data));
}
Decoding(future_data) => {
@ -206,7 +270,8 @@ impl ImageCache {
}
Init
| Prefetching(*)
| Prefetching
| Prefetched(*)
| Decoded(*)
| Failed => {
fail ~"incorrect state in store_image"
@ -216,6 +281,42 @@ impl ImageCache {
}
}
trait ImageCacheTaskClient {
fn exit();
}
impl ImageCacheTask: ImageCacheTaskClient {
fn exit() {
let response = port();
self.send(Exit(response.chan()));
response.recv();
}
}
fn load_image_data(+url: url, resource_task: ResourceTask) -> result<~[u8], ()> {
let response_port = port();
resource_task.send(resource_task::Load(url, response_port.chan()));
let mut image_data = ~[];
loop {
match response_port.recv() {
resource_task::Payload(data) => {
image_data += data;
}
resource_task::Done(result::ok(*)) => {
return ok(image_data);
}
resource_task::Done(result::err(*)) => {
return err(());
}
}
}
}
#[test]
fn should_exit_on_request() {
@ -235,7 +336,7 @@ fn should_exit_on_request() {
let image_cache_task = image_cache_task(mock_resource_task);
let _url = make_url(~"file", none);
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
@ -266,7 +367,10 @@ fn should_request_url_from_resource_task_on_prefetch() {
loop {
match from_client.recv() {
resource_task::Load(url, _) => url_requested_chan.send(()),
resource_task::Load(url, response) => {
url_requested_chan.send(());
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break
}
}
@ -277,7 +381,7 @@ fn should_request_url_from_resource_task_on_prefetch() {
image_cache_task.send(Prefetch(url));
url_requested.recv();
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
@ -293,7 +397,10 @@ fn should_not_request_url_from_resource_task_on_multiple_prefetches() {
loop {
match from_client.recv() {
resource_task::Load(url, _) => url_requested_chan.send(()),
resource_task::Load(url, response) => {
url_requested_chan.send(());
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break
}
}
@ -305,13 +412,16 @@ fn should_not_request_url_from_resource_task_on_multiple_prefetches() {
image_cache_task.send(Prefetch(copy url));
image_cache_task.send(Prefetch(url));
url_requested.recv();
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
assert !url_requested.peek()
}
#[test]
fn should_return_image_not_ready_if_data_has_not_arrived() {
let (wait_chan, wait_port) = pipes::stream();
let mock_resource_task = do spawn_listener |from_client| {
// infer me
@ -319,8 +429,13 @@ fn should_return_image_not_ready_if_data_has_not_arrived() {
loop {
match from_client.recv() {
resource_task::Load(url, response) => {
// Don't send the data until after the client requests
// the image
wait_port.recv();
response.send(resource_task::Done(result::ok(())));
}
resource_task::Exit => break,
_ => ()
}
}
};
@ -332,16 +447,14 @@ fn should_return_image_not_ready_if_data_has_not_arrived() {
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
assert response_port.recv() == ImageNotReady;
image_cache_task.send(Exit);
wait_chan.send(());
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
#[test]
fn should_return_decoded_image_data_if_data_has_arrived() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| {
// infer me
@ -352,7 +465,6 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
resource_task::Load(_, response) => {
response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::ok(())));
image_bin_sent_chan.send(());
}
resource_task::Exit => break
}
@ -362,10 +474,20 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv();
wait_for_prefetech_chan.recv();
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
@ -374,16 +496,13 @@ fn should_return_decoded_image_data_if_data_has_arrived() {
_ => fail
}
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
#[test]
fn should_return_decoded_image_data_for_multiple_requests() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| {
// infer me
@ -394,7 +513,6 @@ fn should_return_decoded_image_data_for_multiple_requests() {
resource_task::Load(_, response) => {
response.send(resource_task::Payload(test_image_bin()));
response.send(resource_task::Done(result::ok(())));
image_bin_sent_chan.send(());
}
resource_task::Exit => break
}
@ -404,10 +522,20 @@ fn should_return_decoded_image_data_for_multiple_requests() {
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv();
wait_for_prefetech.recv();
for iter::repeat(2) {
let response_port = port();
@ -418,7 +546,7 @@ fn should_return_decoded_image_data_for_multiple_requests() {
}
}
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
@ -454,10 +582,21 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() {
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv();
wait_for_prefetech.recv();
let response_port = port();
image_cache_task.send(GetImage(copy url, response_port.chan()));
@ -472,7 +611,7 @@ fn should_not_request_image_from_resource_task_if_image_is_already_available() {
image_cache_task.send(GetImage(url, response_port.chan()));
response_port.recv();
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
resource_task_exited.recv();
@ -529,7 +668,7 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() {
image_cache_task.send(GetImage(url, response_port.chan()));
response_port.recv();
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
resource_task_exited.recv();
@ -542,9 +681,6 @@ fn should_not_request_image_from_resource_task_if_image_fetch_already_failed() {
#[test]
fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| {
// infer me
@ -556,7 +692,6 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
response.send(resource_task::Payload(test_image_bin()));
// ERROR fetching image
response.send(resource_task::Done(result::err(())));
image_bin_sent_chan.send(());
}
resource_task::Exit => break
}
@ -566,10 +701,20 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv();
wait_for_prefetech.recv();
let response_port = port();
image_cache_task.send(GetImage(url, response_port.chan()));
@ -578,16 +723,13 @@ fn should_return_not_ready_if_image_bin_cannot_be_fetched() {
_ => fail
}
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}
#[test]
fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_be_fetched() {
let image_bin_sent = port();
let image_bin_sent_chan = image_bin_sent.chan();
let mock_resource_task = do spawn_listener |from_client| {
// infer me
@ -599,7 +741,6 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
response.send(resource_task::Payload(test_image_bin()));
// ERROR fetching image
response.send(resource_task::Done(result::err(())));
image_bin_sent_chan.send(());
}
resource_task::Exit => break
}
@ -609,10 +750,20 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
let image_cache_task = image_cache_task(mock_resource_task);
let url = make_url(~"file", none);
let wait_for_prefetech = port();
let wait_for_prefetech_chan = wait_for_prefetech.chan();
image_cache_task.send(OnMsg(|msg| {
match *msg {
StorePrefetchedImageData(*) => wait_for_prefetech_chan.send(()),
_ => ()
}
}));
image_cache_task.send(Prefetch(copy url));
// Wait until our mock resource task has sent the image to the image cache
image_bin_sent.recv();
wait_for_prefetech.recv();
let response_port = port();
image_cache_task.send(GetImage(copy url, response_port.chan()));
@ -629,6 +780,6 @@ fn should_return_not_ready_for_multiple_get_image_requests_if_image_bin_cannot_b
_ => fail
}
image_cache_task.send(Exit);
image_cache_task.exit();
mock_resource_task.send(resource_task::Exit);
}