[client] Use single worker thread in getfs proc.

This commit is contained in:
Jiaqiang Xu 2014-01-08 17:30:44 +08:00
parent 7dc1fb1dee
commit a5ddb28f0f

View File

@ -55,16 +55,28 @@ enum {
FETCH_OBJECT
};
typedef struct {
gboolean worker_running;
GQueue *inspect_queue; /* objects to check exists */
typedef struct ThreadData {
gint refcnt;
CcnetProcessor *processor;
gboolean is_clone;
int cmd_pipe;
uint32_t cevent_id;
char root_id[41];
GHashTable *fs_objects;
GList *fetch_objs;
} ThreadData;
typedef struct {
gboolean worker_checking;
gboolean worker_started;
GQueue *inspect_queue; /* objects to check exists */
int pending_objects;
guint32 writer_id;
/* Used by worker thread */
char root_id[41];
GList *fetch_objs;
int cmd_pipe[2];
uint32_t cevent_id;
ThreadData *tdata;
char buf[4096];
char *bufptr;
@ -88,17 +100,40 @@ static void handle_response (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen);
static void
thread_data_ref (ThreadData *tdata)
{
g_atomic_int_inc (&tdata->refcnt);
}
static void
thread_data_unref (ThreadData *tdata)
{
if (g_atomic_int_dec_and_test (&tdata->refcnt)) {
if (tdata->fetch_objs)
string_list_free (tdata->fetch_objs);
if (tdata->fs_objects)
g_hash_table_destroy (tdata->fs_objects);
g_free (tdata);
}
}
static void
release_resource(CcnetProcessor *processor)
{
USE_PRIV;
g_queue_free (priv->inspect_queue);
g_hash_table_destroy (priv->fs_objects);
g_free (priv->obj_seg);
if (priv->fetch_objs)
string_list_free (priv->fetch_objs);
g_free (priv->obj_seg);
seaf_obj_store_unregister_async_write (seaf->fs_mgr->obj_store, priv->writer_id);
if (priv->worker_started) {
/* The worker thread will notice the command pipe has been closed and exits.
*/
pipeclose (priv->cmd_pipe[1]);
cevent_manager_unregister (seaf->ev_mgr, priv->cevent_id);
thread_data_unref (priv->tdata);
}
CCNET_PROCESSOR_CLASS (seafile_getfs_proc_parent_class)->release_resource (processor);
}
@ -164,23 +199,21 @@ request_object_batch (CcnetProcessor *processor,
* all non-existent or invalid objects have been put into data->fetch_objs.
*/
static void
check_seafdir (CcnetProcessor *processor, const char *dir_id)
check_seafdir (ThreadData *tdata, const char *dir_id)
{
SeafileGetfsProc *proc = (SeafileGetfsProc *)processor;
USE_PRIV;
SeafDir *dir = NULL;
GList *ptr;
SeafDirent *dent;
if (!seaf_fs_manager_object_exists(seaf->fs_mgr, dir_id)) {
priv->fetch_objs = g_list_prepend (priv->fetch_objs, g_strdup(dir_id));
tdata->fetch_objs = g_list_prepend (tdata->fetch_objs, g_strdup(dir_id));
return;
}
dir = seaf_fs_manager_get_seafdir (seaf->fs_mgr, dir_id);
if (!dir) {
/* corrupt dir object */
priv->fetch_objs = g_list_prepend (priv->fetch_objs, g_strdup(dir_id));
tdata->fetch_objs = g_list_prepend (tdata->fetch_objs, g_strdup(dir_id));
return;
}
@ -188,19 +221,19 @@ check_seafdir (CcnetProcessor *processor, const char *dir_id)
dent = ptr->data;
/* Don't check objects that have been checked before. */
if (g_hash_table_lookup (priv->fs_objects, dent->id))
if (g_hash_table_lookup (tdata->fs_objects, dent->id))
continue;
g_hash_table_insert (priv->fs_objects, g_strdup(dent->id), (gpointer)1);
g_hash_table_insert (tdata->fs_objects, g_strdup(dent->id), (gpointer)1);
if (!seaf_fs_manager_object_exists(seaf->fs_mgr, dent->id)) {
priv->fetch_objs = g_list_prepend (priv->fetch_objs, g_strdup(dent->id));
tdata->fetch_objs = g_list_prepend (tdata->fetch_objs, g_strdup(dent->id));
continue;
}
if (S_ISDIR(dent->mode)) {
check_seafdir (processor, dent->id);
} else if (S_ISREG (dent->mode) && proc->tx_task->is_clone) {
check_seafdir (tdata, dent->id);
} else if (S_ISREG (dent->mode) && tdata->is_clone) {
/* Only check seafile object integrity when clone.
* This is for the purpose of recovery.
* In ordinary sync, checking every file object's integrity would
@ -212,7 +245,7 @@ check_seafdir (CcnetProcessor *processor, const char *dir_id)
if (!ok && !err) {
seaf_warning ("File object %.8s is corrupt, recover from server.\n",
dent->id);
priv->fetch_objs = g_list_prepend (priv->fetch_objs, g_strdup(dent->id));
tdata->fetch_objs = g_list_prepend (tdata->fetch_objs, g_strdup(dent->id));
}
}
}
@ -225,11 +258,11 @@ check_end_condition (SeafileGetfsProcPriv *priv)
{
return (g_queue_get_length (priv->inspect_queue) == 0 &&
priv->pending_objects == 0 &&
!priv->worker_running);
!priv->worker_checking);
}
static int
check_fs_tree_from (CcnetProcessor *processor, const char *root_id);
check_fs_tree_from (ThreadData *tdata, const char *root_id);
static void
end_or_check_next_dir (CcnetProcessor *processor, SeafileGetfsProcPriv *priv)
@ -241,14 +274,14 @@ end_or_check_next_dir (CcnetProcessor *processor, SeafileGetfsProcPriv *priv)
return;
}
if (priv->worker_running) {
if (priv->worker_checking) {
return;
}
/* Trigger checking the next dir. */
char *next_dir_id = g_queue_pop_head (priv->inspect_queue);
if (next_dir_id) {
if (check_fs_tree_from (processor, next_dir_id) < 0) {
if (check_fs_tree_from (priv->tdata, next_dir_id) < 0) {
transfer_task_set_error (((SeafileGetfsProc *)processor)->tx_task,
TASK_ERR_DOWNLOAD_FS);
ccnet_processor_send_update (processor, SC_SHUTDOWN, SS_SHUTDOWN, NULL, 0);
@ -261,56 +294,71 @@ end_or_check_next_dir (CcnetProcessor *processor, SeafileGetfsProcPriv *priv)
static void *
check_objects_thread (void *vdata)
{
CcnetProcessor *processor = vdata;
USE_PRIV;
ThreadData *tdata = vdata;
int cmd;
check_seafdir (processor, priv->root_id);
/* Hold one reference for worker thread. */
thread_data_ref (tdata);
while (1) {
int n = piperead (tdata->cmd_pipe, &cmd, sizeof(cmd));
if (n < 0) {
seaf_warning ("Failed to read commnd pipe: %s.\n", strerror(errno));
goto out;
}
if (n == 0) {
seaf_message ("Getfs proc is done, worker thread exits.\n");
goto out;
}
check_seafdir (tdata, tdata->root_id);
cevent_manager_add_event (seaf->ev_mgr, tdata->cevent_id, tdata);
}
out:
pipeclose (tdata->cmd_pipe);
thread_data_unref (tdata);
return vdata;
}
static void
check_objects_done (void *vdata)
check_objects_done (CEvent *event, void *unused)
{
CcnetProcessor *processor = vdata;
ThreadData *tdata = event->data;
CcnetProcessor *processor = tdata->processor;
USE_PRIV;
GList *ptr;
char *obj_id;
priv->worker_running = FALSE;
priv->worker_checking = FALSE;
request_object_batch_begin (priv);
for (ptr = priv->fetch_objs; ptr; ptr = ptr->next) {
for (ptr = tdata->fetch_objs; ptr; ptr = ptr->next) {
obj_id = ptr->data;
request_object_batch (processor, priv, obj_id);
g_free (obj_id);
}
request_object_batch_flush (processor, priv);
g_list_free (priv->fetch_objs);
priv->fetch_objs = NULL;
g_list_free (tdata->fetch_objs);
tdata->fetch_objs = NULL;
end_or_check_next_dir (processor, priv);
}
static int
check_fs_tree_from (CcnetProcessor *processor, const char *root_id)
check_fs_tree_from (ThreadData *tdata, const char *root_id)
{
CcnetProcessor *processor = tdata->processor;
USE_PRIV;
memcpy (priv->root_id, root_id, 40);
priv->fetch_objs = NULL;
memcpy (tdata->root_id, root_id, 40);
tdata->fetch_objs = NULL;
int rc = ccnet_processor_thread_create (processor,
seaf->job_mgr,
check_objects_thread,
check_objects_done,
processor);
if (rc < 0) {
seaf_warning ("Failed to start worker thread.\n");
return -1;
}
int cmd = 1;
pipewrite (priv->cmd_pipe[1], &cmd, sizeof(cmd));
priv->worker_running = TRUE;
priv->worker_checking = TRUE;
return 0;
}
@ -429,6 +477,41 @@ process_fs_object_seg (CcnetProcessor *processor)
}
}
static int
start_worker_thread (CcnetProcessor *processor)
{
SeafileGetfsProc *proc = (SeafileGetfsProc *)processor;
USE_PRIV;
ThreadData *tdata;
if (ccnet_pipe (priv->cmd_pipe) < 0)
return -1;
priv->cevent_id = cevent_manager_register (seaf->ev_mgr,
check_objects_done,
processor);
tdata = g_new0 (ThreadData, 1);
tdata->cmd_pipe = priv->cmd_pipe[0];
tdata->cevent_id = priv->cevent_id;
tdata->processor = processor;
tdata->is_clone = proc->tx_task->is_clone;
tdata->fs_objects = g_hash_table_new_full (g_str_hash, g_str_equal,
g_free, NULL);
/* Hold one reference for the main thread. */
thread_data_ref (tdata);
priv->tdata = tdata;
ccnet_job_manager_schedule_job (seaf->job_mgr,
check_objects_thread,
NULL,
tdata);
priv->worker_started = TRUE;
return 0;
}
static void
load_fsroot_list (CcnetProcessor *processor)
{
@ -458,6 +541,12 @@ handle_response (CcnetProcessor *processor,
switch (processor->state) {
case REQUEST_SENT:
if (strncmp(code, SC_OK, 3) == 0) {
if (start_worker_thread (processor) < 0) {
ccnet_processor_send_update (processor, SC_SHUTDOWN, SS_SHUTDOWN,
NULL, 0);
ccnet_processor_done (processor, FALSE);
return;
}
load_fsroot_list (processor);
processor->state = FETCH_OBJECT;
return;
@ -506,8 +595,6 @@ start (CcnetProcessor *processor, int argc, char **argv)
processor->state = REQUEST_SENT;
priv->inspect_queue = g_queue_new ();
priv->fs_objects = g_hash_table_new_full (g_str_hash, g_str_equal,
g_free, NULL);
priv->writer_id = seaf_obj_store_register_async_write (seaf->fs_mgr->obj_store,
fs_object_write_cb,