Redesign the code block in retrieve_from_file.

This commit is contained in:
Ilim Ugur 2012-07-24 18:56:38 +03:00
parent 6f102902ad
commit 40493a76fe
4 changed files with 245 additions and 284 deletions

View File

@ -49,14 +49,14 @@ wget_SOURCES = cmpt.c connect.c convert.c cookies.c ftp.c \
css_.c css-url.c \
ftp-basic.c ftp-ls.c hash.c host.c html-parse.c html-url.c \
http.c init.c log.c main.c netrc.c progress.c ptimer.c \
recur.c res.c retr.c spider.c url.c \
recur.c res.c retr.c spider.c url.c multi.c \
utils.c exits.c build_info.c $(IRI_OBJ) $(METALINK_OBJ) \
css-url.h css-tokens.h connect.h convert.h cookies.h \
ftp.h hash.h host.h html-parse.h html-url.h \
http.h http-ntlm.h init.h log.h mswindows.h netrc.h \
options.h progress.h ptimer.h recur.h res.h retr.h \
spider.h ssl.h sysdep.h url.h utils.h wget.h iri.h \
exits.h gettext.h metalink.h
exits.h gettext.h metalink.h multi.h
nodist_wget_SOURCES = version.c
EXTRA_wget_SOURCES = iri.c metalink.c
LDADD = $(LIBOBJS) ../lib/libgnu.a

77
src/multi.c Normal file
View File

@ -0,0 +1,77 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>
#include "multi.h"
#include "url.h"
#include "wget.h"
int
spawn_thread (struct s_thread_ctx *thread_ctx, char * name, int index, int resource)
{
static pthread_t *thread;
char *command;
sprintf(thread_ctx[index].file, TEMP_PREFIX "%s.%d", name, index);
thread_ctx[index].url_parsed = url_parse (thread_ctx[index].url,
&(thread_ctx[index].url_err), thread_ctx[index].i, true);
if(!thread_ctx[index].url_parsed)
return 1;
/* TODO: Update this when configuring fallbacking code so that downloading
goes on from where the previous resource failed.
TODO: size of the bytes allocated by malloc should be recalculated and
the string assignment to that space should be readjusted to contain only
the files created by wget.(ONLY the files[] elements) */
if(file_exists_p(thread_ctx[index].file))
{
command = malloc(sizeof("rm -f ") + sizeof(TEMP_PREFIX) + strlen(name)
+ (sizeof(".")-1) + (opt.jobs/10 + 1) + sizeof(""));
sprintf(command, "rm -f " TEMP_PREFIX "%s.%d", name, index);
system(command);
free(command);
}
(thread_ctx[index].range)->is_assigned = 1;
(thread_ctx[index].range)->resources[resource] = true;
thread_ctx[index].used = 1;
thread_ctx[index].terminated = 0;
return pthread_create (&thread, NULL, segmented_retrieve_url, &thread_ctx[index]);
}
int
collect_thread (sem_t *retr_sem, struct s_thread_ctx *thread_ctx)
{
int k, ret;
do
ret = sem_wait (retr_sem);
while (ret < 0 && errno == EINTR);
for (k = 0; k < opt.jobs; k++)
if (thread_ctx[k].used && thread_ctx[k].terminated)
{
url_free (thread_ctx[k].url_parsed);
thread_ctx[k].used = 0;
thread_ctx[k].terminated = 0;
(thread_ctx[k].range)->is_assigned = 0;
return k;
}
}
static void *
segmented_retrieve_url (void *arg)
{
struct s_thread_ctx *ctx = (struct s_thread_ctx *) arg;
ctx->status = retrieve_url (ctx->url_parsed, ctx->url,
&ctx->file, &ctx->redirected,
ctx->referer, &ctx->dt,
false, ctx->i, true, ctx->range);
ctx->terminated = 1;
sem_post (ctx->retr_sem);
}

View File

@ -35,6 +35,12 @@ as that of the covered work. */
#include "iri.h"
#include "url.h"
#include "wget.h"
#define TEMP_PREFIX "temp_"
#define FILENAME_SIZE strlen("temp_") + strlen(file->name) + (sizeof ".")-1 \
+ (N_THREADS/10 + 1) + sizeof ""
struct range {
int first_byte;
@ -42,6 +48,7 @@ struct range {
int is_covered;
int is_assigned;
int resources_tried;
bool *resources;
uerr_t status_least_severe;
};
@ -61,4 +68,13 @@ struct s_thread_ctx
uerr_t status;
};
int
spawn_thread (struct s_thread_ctx*, char *, int, int);
int
collect_thread (sem_t *, struct s_thread_ctx *);
static void *
segmented_retrieve_url (void *);
#endif /* MULTI_H */

View File

@ -66,8 +66,6 @@ static pthread_mutex_t pconn_mutex = PTHREAD_MUTEX_INITIALIZER;
#define PCONN_UNLOCK() pthread_mutex_unlock (&pconn_mutex)
#define TEMP_PREFIX "temp_"
/* Total size of downloaded files. Used to enforce quota. */
SUM_SIZE_INT total_downloaded_bytes;
@ -939,19 +937,6 @@ bail:
return result;
}
static void *
segmented_retrieve_url (void *arg)
{
struct s_thread_ctx *ctx = (struct s_thread_ctx *) arg;
ctx->status = retrieve_url (ctx->url_parsed, ctx->url,
&ctx->file, &ctx->redirected,
ctx->referer, &ctx->dt,
false, ctx->i, true, ctx->range);
ctx->terminated = 1;
sem_post (ctx->retr_sem);
}
/* Find the URLs in the file and call retrieve_url() for each of them.
If HTML is true, treat the file as HTML, and construct the URLs
accordingly.
@ -1018,305 +1003,188 @@ retrieve_from_file (const char *file, bool html, int *count)
else if(metalink = metalink_context(url))
{
/*GSoC wget*/
char *command, **files;
int i, j, k, r, index, dt, url_err, error_severity;
int ret, N_THREADS = opt.jobs > 0 ? opt.jobs : 1;
int ranges_covered, chunk_size, num_of_resources;
pthread_t thread;
sem_t retr_sem;
uerr_t status;
metalink_file_t* file;
metalink_resource_t* resource;
struct url *url_parsed;
uerr_t status;
int ret, N_THREADS = opt.jobs > 0 ? opt.jobs : 1;
int free_threads, chunk_size, num_of_resources;
struct s_thread_ctx *thread_ctx;
struct range *ranges;
if(N_THREADS>1)
thread_ctx = malloc (N_THREADS * (sizeof *thread_ctx));
ranges = malloc (N_THREADS * (sizeof *ranges));
files = malloc (N_THREADS * (sizeof *files));
i = 0;
while ((file = metalink->files[i]) != NULL)
{
pthread_t thread;
sem_t retr_sem;
char *command;
thread_ctx = malloc (N_THREADS* (sizeof *thread_ctx));
ranges = malloc (N_THREADS * (sizeof *ranges));
i = 0;
while((file = metalink->files[i]) != NULL)
memset(thread_ctx, '\0', N_THREADS * (sizeof *thread_ctx));
for (j = 0; j < N_THREADS; ++j)
files[j] = malloc(FILENAME_SIZE);
chunk_size = (file->size) / N_THREADS;
num_of_resources = 0;
while (file->resources[num_of_resources])
++num_of_resources;
for (j = 0; j < N_THREADS; ++j)
{
memset(thread_ctx, '\0', N_THREADS * (sizeof *thread_ctx));
for(k = 0; k < N_THREADS; ++k)
thread_ctx[k].file = malloc( strlen(TEMP_PREFIX)
+ strlen(file->name)
+ (sizeof ".")-1
+ (N_THREADS/10 + 1) /* simpler than log10,
and close enough */
+ sizeof "" );
ranges[j].first_byte = j * chunk_size;
ranges[j].last_byte = (j+1) * chunk_size - 1;
ranges[j].is_covered = ranges[j].is_assigned = 0;
ranges[j].resources_tried = 0;
ranges[j].resources = malloc(num_of_resources * sizeof(bool));
for (k = 0; k < num_of_resources; ++k)
ranges[j].resources[k] = false;
}
ranges[k-1].last_byte = file->size -1;
chunk_size = (file->size) / N_THREADS;
for(k = 0; k < N_THREADS; ++k)
sem_init (&retr_sem, 0, 0);
j = ranges_covered = 0;
for (r = 0; r < N_THREADS; ++r)
{
resource = file->resources[j];
if (!resource)
{
ranges[k].first_byte = k * chunk_size;
ranges[k].last_byte = (k+1) * chunk_size - 1;
ranges[k].is_covered = ranges[k].is_assigned = 0;
ranges[k].resources_tried = 0;
j = 0;
resource = file->resources[j];
}
ranges[k-1].last_byte = file->size -1;
sem_init (&retr_sem, 0, 0);
free_threads = N_THREADS;
thread_ctx[r].file = files[r];
thread_ctx[r].range = ranges + r;
thread_ctx[r].referer = NULL;
thread_ctx[r].redirected = NULL;
thread_ctx[r].dt = dt;
thread_ctx[r].i = iri;
thread_ctx[r].url = resource->url;
thread_ctx[r].retr_sem = &retr_sem;
j = num_of_resources = 0;
while(1)
ret = spawn_thread (thread_ctx, file->name, r, j);
if (ret)
{
attemptfail: resource = file->resources[j];
char *error = url_error (thread_ctx[r].url, thread_ctx[r].url_err);
logprintf (LOG_NOTQUIET, "%s: %s.\n", thread_ctx[r].url, error);
xfree (error);
for(r = 0; r < N_THREADS; ++r)
{
if (! (ranges[r].is_assigned || ranges[r].is_covered))
break;
}
if (r < N_THREADS)
{
if (free_threads)
{
if (!resource)
{
num_of_resources = j;
j = 0;
resource = file->resources[j];
}
if (url = resource->url)
{
for (k = 0; k < N_THREADS; k++)
if (! thread_ctx[k].used)
{
index = k;
free_threads--;
thread_ctx[k].used = 1;
thread_ctx[k].terminated = 0;
break;
}
url_parsed = url_parse (url, &url_err, iri, true);
if (!url_parsed)
{
char *error = url_error (url, url_err);
logprintf (LOG_NOTQUIET, "%s: %s.\n", url,
error);
xfree (error);
return URLERROR;
}
if (!opt.base_href)
opt.base_href = xstrdup (url);
sprintf(thread_ctx[index].file, TEMP_PREFIX "%s.%d",
file->name, r);
/* Update this when configuring fallbacking code
so that downloading goes on from where the
previous resource failed. */
if(file_exists_p(thread_ctx[index].file))
{
command = malloc(15 + (N_THREADS)*(strlen(file->name) +
(N_THREADS/10 + 1) + 2) + strlen(file->name));
sprintf(command, "rm -f " TEMP_PREFIX "%s.*", file->name);
system(command);
free(command);
}
thread_ctx[index].referer = NULL;
thread_ctx[index].dt = dt;
thread_ctx[index].i = iri;
thread_ctx[index].redirected = NULL;
thread_ctx[index].url = url;
thread_ctx[index].retr_sem = &retr_sem;
thread_ctx[index].url_parsed = url_parsed;
thread_ctx[index].range = ranges + r;
(thread_ctx[index].range)->is_assigned = 1;
pthread_create (&thread, NULL, segmented_retrieve_url,
&thread_ctx[index]);
++j;
continue;
}
}
else
{
do
ret = sem_wait (&retr_sem);
while (ret < 0 && errno == EINTR);
for (k = 0; k < N_THREADS; k++)
if (thread_ctx[k].used && thread_ctx[k].terminated)
{
status = thread_ctx[k].status;
/* Check return status of thread for errors. */
if (status == RETROK)
(thread_ctx[k].range)->is_covered = 1;
else
{
PCONN_LOCK ();
/* Pick the least severe error.*/
error_severity = get_exit_status();
inform_exit_status((thread_ctx[k].range)->status_least_severe);
if(get_exit_status() != error_severity)
(thread_ctx[k].range)->status_least_severe = status;
PCONN_UNLOCK ();
++(thread_ctx[k].range)->resources_tried;
}
thread_ctx[k].used = 0;
url_free (thread_ctx[k].url_parsed);
free_threads++;
break;
}
if(num_of_resources &&
ranges[k].resources_tried == num_of_resources)
goto segmentfail;
else if(ranges[k].resources_tried < num_of_resources)
{
(thread_ctx[k].range)->is_assigned = 0;
(thread_ctx[k].range)->is_covered = 0;
}
}
}
else
{
segmentfail: while(free_threads < N_THREADS)
{
do
ret = sem_wait (&retr_sem);
while (ret < 0 && errno == EINTR);
for (k = 0; k < N_THREADS; k++)
if (thread_ctx[k].used && thread_ctx[k].terminated)
{
status = thread_ctx[k].status;
/* Check return status of thread for errors. */
if (status == RETROK)
(thread_ctx[k].range)->is_covered = 1;
else
{
PCONN_LOCK ();
/* Pick the least severe error.*/
error_severity = get_exit_status();
inform_exit_status((thread_ctx[k].range)->status_least_severe);
if(get_exit_status() != error_severity)
(thread_ctx[k].range)->status_least_severe = status;
PCONN_UNLOCK ();
++(thread_ctx[k].range)->resources_tried;
}
thread_ctx[k].used = 0;
url_free (thread_ctx[k].url_parsed);
free_threads++;
break;
}
if(num_of_resources &&
ranges[k].resources_tried == num_of_resources)
{
(thread_ctx[k].range)->is_assigned = 0;
(thread_ctx[k].range)->is_covered = 0;
}
else if(ranges[k].resources_tried < num_of_resources)
goto attemptfail;
}
/* If true, goto segmentfail was used to come here.*/
if(r < N_THREADS)
status = ranges[r].status_least_severe;
else
status = RETROK;
break;
}
free(ranges[r].resources);
for (r = 0; r < N_THREADS; ++r)
free(files[r]);
free(thread_ctx);
free(ranges);
free(files);
return URLERROR;
}
for(k = 0; k < N_THREADS; ++k)
free(thread_ctx[k].file);
++j;
}
sem_destroy(&retr_sem);
command = malloc(15 + (N_THREADS) * (strlen(file->name) +
(N_THREADS/10 + 1) + 2) + strlen(file->name));
if (status != RETROK)
while(ranges_covered < N_THREADS)
{
r = collect_thread (&retr_sem, thread_ctx);
status = thread_ctx[r].status;
/* Check return status of thread for errors. */
if (status == RETROK)
{
/* Segment r could not be downloaded due to status.
Do not return, go on with the download. */
(thread_ctx[r].range)->is_covered = 1;
++ranges_covered;
}
else
{
sprintf(command, "cat temp_%s.* > %s",file->name , file->name);
system(command);
}
sprintf(command, "rm -f temp_%s.*", file->name);
system(command);
free(command);
++i;
}
free(ranges);
free (thread_ctx);
}
else
{
uerr_t status_least_severe;
i = 0;
while((file = metalink->files[i]) != NULL)
{
j = 0;
while((resource = file->resources[j]) != NULL)
{
url = resource->url;
url_parsed = url_parse (url, &url_err, iri, true);
if (!url_parsed)
{
char *error = url_error (url, url_err);
logprintf (LOG_NOTQUIET, "%s: %s.\n", url, error);
xfree (error);
return URLERROR;
}
if (!opt.base_href)
opt.base_href = xstrdup (url);
status = retrieve_url (url_parsed, url, &(file->name), NULL,
NULL, &dt, false, iri, true, NULL);
url_free (url_parsed);
/* 3 indicates WGET_EXIT_IO_FAIL. Used the integral value to
* avoid including a .c file (i.e. exits.c) in retr.c. */
if ((status == RETROK) ||
get_exit_status() == 3)
break;
PCONN_LOCK ();
/* Pick the least severe error.*/
error_severity = get_exit_status();
inform_exit_status(status_least_severe);
inform_exit_status((thread_ctx[r].range)->status_least_severe);
if(get_exit_status() != error_severity)
status_least_severe = status;
(thread_ctx[r].range)->status_least_severe = status;
PCONN_UNLOCK ();
++j;
++(thread_ctx[r].range)->resources_tried;
for (j = 0; j < num_of_resources; ++j)
if (!((thread_ctx[r].range)->resources)[j])
break;
if (j < num_of_resources)
{
thread_ctx[r].url = file->resources[j]->url;
ret = spawn_thread (thread_ctx, file->name, r, j);
if (ret)
{
char *error = url_error (thread_ctx[r].url, thread_ctx[r].url_err);
logprintf (LOG_NOTQUIET, "%s: %s.\n", thread_ctx[r].url, error);
xfree (error);
for(r = 0; r < N_THREADS; ++r)
free(ranges[r].resources);
for (r = 0; r < N_THREADS; ++r)
free(files[r]);
free(thread_ctx);
free(ranges);
free(files);
return URLERROR;
}
}
else
break;
}
if(! resource)
status = status_least_severe;
if (status != RETROK)
{
/* File file->name could not be downloaded due to status.
Do not return, go on with the download. */
}
++i;
}
sem_destroy(&retr_sem);
command = malloc(sizeof("cat ")
+ N_THREADS * (sizeof(TEMP_PREFIX) + strlen(file->name)
+ (sizeof(".") - 1) + (N_THREADS/10 + 1)
+ (sizeof(" ") - 1))
+ (sizeof("> ") - 1)
+ strlen(file->name)
+ sizeof(""));
if (status != RETROK)
{
/* Segment r could not be downloaded due to
ranges[r].status_least_severe. Going on with the download. */
}
else
{
sprintf(command, "cat ");
j=0;
r = 4;
while(j < opt.jobs)
{
sprintf(command+r, TEMP_PREFIX "%s.%d ", file->name, j++);
r = strlen(command);
}
sprintf(command+r, "> %s", file->name);
system(command);
}
sprintf(command, "rm -f ");
j=0;
r = 6;
while(j < opt.jobs)
{
sprintf(command+r, TEMP_PREFIX "%s.%d ", file->name, j++);
r = strlen(command);
}
system(command);
free(command);
for (j = 0; j < N_THREADS; ++j)
free(ranges[j].resources);
for (j = 0; j < N_THREADS; ++j)
free(files[j]);
}
free(files);
free(ranges);
free(thread_ctx);
*count = i;
iri_free (iri);
/* delete metalink_t */
metalink_delete(metalink);
/*What exactly to return MUST be reconsidered.*/
return status;
}
else