From 44f75a19556a0270d2e8bca2c8f8a846e216424a Mon Sep 17 00:00:00 2001 From: Ilim Ugur Date: Sun, 15 Jul 2012 00:48:28 +0300 Subject: [PATCH] Separate segmented download and fallbacking. --- src/retr.c | 263 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 150 insertions(+), 113 deletions(-) diff --git a/src/retr.c b/src/retr.c index 11e23aee..736bacb2 100644 --- a/src/retr.c +++ b/src/retr.c @@ -1009,105 +1009,109 @@ retrieve_from_file (const char *file, bool html, int *count) else if(metalink = metalink_context(url)) { /*GSoC wget*/ - int i,j,k,index,dt,url_err,error_severity; + int i, j, k, index, dt, url_err, error_severity; metalink_file_t* file; metalink_resource_t* resource; struct url *url_parsed; uerr_t status, status_least_severe; - sem_t retr_sem; - pthread_t thread; - int N_THREADS = 3; - int free_threads = N_THREADS, range_start, chunk_size,file_extension; - struct s_thread_ctx *thread_ctx = NULL; - char *temp_name; + int ret, N_THREADS = opt.jobs > 0 ? opt.jobs : 1; + int free_threads, range_start, chunk_size, file_extension; + struct s_thread_ctx *thread_ctx; - i = 0; - while((file = metalink->files[i]) != NULL) + if(N_THREADS>1) { - if(1) + pthread_t thread; + sem_t retr_sem; + char *command; + + thread_ctx = calloc (N_THREADS, sizeof *thread_ctx); + i = 0; + while((file = metalink->files[i]) != NULL) { - thread_ctx = calloc (N_THREADS, sizeof *thread_ctx); + memset(thread_ctx, '\0', N_THREADS * (sizeof *thread_ctx)); + for(k = 0; k < N_THREADS; ++k) + { + thread_ctx[k].file = malloc(7 + (N_THREADS/10 + 1) + + strlen(file->name)); + thread_ctx[k].range = malloc(sizeof(struct range)); + } sem_init (&retr_sem, 0, 0); range_start = 0; chunk_size = (file->size) / N_THREADS; file_extension = 0; - } - j = 0; - while((resource = file->resources[j]) != NULL) - { - url = resource->url; - url_parsed = url_parse (url, &url_err, iri, true); - if (!url_parsed) + free_threads = N_THREADS; + + j = 0; + while(1) { - char *error = url_error (url, url_err); - logprintf (LOG_NOTQUIET, "%s: %s.\n", url, error); - xfree (error); - return URLERROR; - } - - if (!opt.base_href) - opt.base_href = xstrdup (url); - - if(1) - { - if (url && free_threads && range_start < (file->size - 1)) + resource = file->resources[j]; + + if (range_start < file->size) { - for (k = 0; k < N_THREADS; k++) - if (! thread_ctx[k].used) - { - index = k; - free_threads--; - thread_ctx[k].used = 1; - thread_ctx[k].terminated = 0; - break; - } - - temp_name = malloc(7 + (N_THREADS/10 + 1) + strlen(file->name)); - sprintf(temp_name, "temp_%s.%d", file->name, file_extension++); - thread_ctx[index].file = temp_name; - thread_ctx[index].referer = NULL; - thread_ctx[index].dt = dt; - thread_ctx[index].i = iri; - thread_ctx[index].redirected = NULL; - thread_ctx[index].url = url; - thread_ctx[index].retr_sem = &retr_sem; - thread_ctx[index].url_parsed = url_parsed; - if(!thread_ctx[index].range) - thread_ctx[index].range = malloc(sizeof(struct range)); - (thread_ctx[index].range)->first_byte = range_start; - range_start += chunk_size; - (thread_ctx[index].range)->last_byte = range_start -1; - - pthread_create (&thread, NULL, segmented_retrieve_url, - &thread_ctx[index]); - ++j; - /* GSoC TODO: Replace this with something better. */ - /* When the resources are traversed once, return to - the first resource to start re-traversing URLs - (and assigning them to the threads).*/ - if(!(file->resources[j])) - j=0; - continue; - } - - /* If file range is not covered yet, wait until a thread is - available. If the file is downloaded, then destroy all - the threads. */ - if(range_start < file->size) - { - if(!free_threads) + if (free_threads) { - int ret; + if (!resource) + j = 0; + if (url = resource->url) + { + for (k = 0; k < N_THREADS; k++) + if (! thread_ctx[k].used) + { + index = k; + free_threads--; + thread_ctx[k].used = 1; + thread_ctx[k].terminated = 0; + break; + } + + url_parsed = url_parse (url, &url_err, iri, true); + if (!url_parsed) + { + char *error = url_error (url, url_err); + logprintf (LOG_NOTQUIET, "%s: %s.\n", url, + error); + xfree (error); + return URLERROR; + } + + if (!opt.base_href) + opt.base_href = xstrdup (url); + + sprintf(thread_ctx[index].file, "temp_%s.%d", + file->name, file_extension++); + thread_ctx[index].referer = NULL; + thread_ctx[index].dt = dt; + thread_ctx[index].i = iri; + thread_ctx[index].redirected = NULL; + thread_ctx[index].url = url; + thread_ctx[index].retr_sem = &retr_sem; + thread_ctx[index].url_parsed = url_parsed; + (thread_ctx[index].range)->first_byte = range_start; + range_start += chunk_size; + (thread_ctx[index].range)->last_byte = range_start - 1; + + pthread_create (&thread, NULL, segmented_retrieve_url, + &thread_ctx[index]); + + ++j; + continue; + } + } + else + { + /* This part seems redundant FOR NOW, as the file is + initially divided into size/threads many segments, + which means the file size will already be reached + once the initial assignment to threads is made. */ do ret = sem_wait (&retr_sem); while (ret < 0 && errno == EINTR); - index = -1; for (k = 0; k < N_THREADS; k++) if (thread_ctx[k].used && thread_ctx[k].terminated) { - index = k; thread_ctx[k].used = 0; + url_free (thread_ctx[k].url_parsed); free_threads++; break; } @@ -1115,22 +1119,76 @@ retrieve_from_file (const char *file, bool html, int *count) } else { - int ret; while(free_threads < N_THREADS) { - ret = sem_wait (&retr_sem); - if(ret >= 0 || errno != EINTR) - free_threads++; + do + ret = sem_wait (&retr_sem); + while (ret < 0 && errno == EINTR); + + for (k = 0; k < N_THREADS; k++) + if (thread_ctx[k].used && thread_ctx[k].terminated) + { + thread_ctx[k].used = 0; + url_free (thread_ctx[k].url_parsed); + free_threads++; + break; + } } /*TODO: Find a way to check the success of downloads.*/ status = RETROK; break; } } - else + + for(k = 0; k < N_THREADS; ++k) { + free(thread_ctx[k].file); + free(thread_ctx[k].range); + } + sem_destroy(&retr_sem); + + /* Either all resources are exhausted and the least severe error + * among all is returned, or an error irrelevant to the server is. + * MUST be reconsidered for multiple files!!! + * Can't just exit after 1 failure, if metalink has multiple files.*/ + if (status != RETROK) + return status; + + command = malloc(15 + (N_THREADS) * (strlen(file->name) + + (N_THREADS/10 + 1) + 2) + strlen(file->name)); + sprintf(command, "cat temp_%s.* > %s",file->name , file->name); + system(command); + sprintf(command, "rm -f temp_%s.*", file->name); + system(command); + free(command); + + ++i; + } + free (thread_ctx); + } + else + { + i = 0; + while((file = metalink->files[i]) != NULL) + { + j = 0; + while((resource = file->resources[j]) != NULL) + { + url = resource->url; + url_parsed = url_parse (url, &url_err, iri, true); + if (!url_parsed) + { + char *error = url_error (url, url_err); + logprintf (LOG_NOTQUIET, "%s: %s.\n", url, error); + xfree (error); + return URLERROR; + } + + if (!opt.base_href) + opt.base_href = xstrdup (url); + status = retrieve_url (url_parsed, url, &(file->name), NULL, - NULL, &dt, false, iri, true, NULL); + NULL, &dt, false, iri, true, NULL); url_free (url_parsed); /* 3 indicates WGET_EXIT_IO_FAIL. Used the integral value to @@ -1147,39 +1205,18 @@ retrieve_from_file (const char *file, bool html, int *count) ++j; } - } - /* Either all resources are exhausted and the least severe error - * among all is returned, or an error irrelevant to the server is. - * MUST be reconsidered for multiple files!!! - * Can't just exit after 1 failure, if metalink has multiple files.*/ - if (status != RETROK) - return status; - if(thread_ctx) - { - char *command; - *(strrchr(temp_name, '.')) = '\0'; - command = malloc(9 + (N_THREADS) * (strlen(temp_name) + (N_THREADS/10 + 1) + 2) + strlen(file->name)); - sprintf(command, "cat %s* > %s",temp_name , file->name); - system(command); - sprintf(command, "rm -f %s.*", temp_name); - system(command); - free(command); - } - for(k = 0; k < N_THREADS; ++k) - { - xfree (thread_ctx[k].range); - free(thread_ctx[k].file); - url_free (thread_ctx[k].url_parsed); - } + if (status != RETROK) + return status; - ++i; + ++i; + } } - xfree (thread_ctx); - iri_free (iri); - /* delete metalink_t */ - metalink_delete(metalink); - return status; + + iri_free (iri); + /* delete metalink_t */ + metalink_delete(metalink); + return status; } else input_file = (char *) file;