[client] Split huge data set to multiple batches for upload.

This commit is contained in:
Jiaqiang Xu 2014-07-08 10:51:34 +08:00
parent 100f13eb9b
commit abda0be437
7 changed files with 160 additions and 23 deletions

View File

@ -965,7 +965,8 @@ int add_to_index(const char *repo_id,
int flags,
SeafileCrypt *crypt,
IndexCB index_cb,
const char *modifier)
const char *modifier,
gboolean *added)
{
int size, namelen;
mode_t st_mode = st->st_mode;
@ -974,6 +975,8 @@ int add_to_index(const char *repo_id,
unsigned ce_option = CE_MATCH_IGNORE_VALID|CE_MATCH_IGNORE_SKIP_WORKTREE|CE_MATCH_RACY_IS_DIRTY;
int add_option = (ADD_CACHE_OK_TO_ADD|ADD_CACHE_OK_TO_REPLACE);
*added = FALSE;
if (!S_ISREG(st_mode) && !S_ISLNK(st_mode) && !S_ISDIR(st_mode)) {
g_warning("%s: can only add regular files, symbolic links or git-directories\n", path);
return -1;
@ -1036,6 +1039,8 @@ update_index:
g_warning("unable to add %s to index\n",path);
return -1;
}
*added = TRUE;
return 0;
}

View File

@ -444,7 +444,8 @@ int add_to_index(const char *repo_id,
int flags,
struct SeafileCrypt *crypt,
IndexCB index_cb,
const char *modifier);
const char *modifier,
gboolean *added);
int
add_empty_dir_to_index (struct index_state *istate,

View File

@ -413,6 +413,12 @@ index_cb (const char *repo_id,
return 0;
}
#define MAX_COMMIT_SIZE 100 * (1 << 20) /* 100MB */
/*
* @remain_files: returns the files haven't been added under this path.
* If it's set to NULL, no partial commit will be created.
*/
static int
add_recursive (const char *repo_id,
int version,
@ -422,7 +428,9 @@ add_recursive (const char *repo_id,
const char *path,
SeafileCrypt *crypt,
gboolean ignore_empty_dir,
GList *ignore_list)
GList *ignore_list,
gint64 *total_size,
GQueue **remain_files)
{
char *full_path;
GDir *dir;
@ -447,8 +455,21 @@ add_recursive (const char *repo_id,
}
if (S_ISREG(st.st_mode)) {
ret = add_to_index (repo_id, version, istate, path, full_path,
&st, 0, crypt, index_cb, modifier);
gboolean added = FALSE;
if (!remain_files) {
ret = add_to_index (repo_id, version, istate, path, full_path,
&st, 0, crypt, index_cb, modifier, &added);
} else if (*remain_files == NULL) {
ret = add_to_index (repo_id, version, istate, path, full_path,
&st, 0, crypt, index_cb, modifier, &added);
if (added) {
*total_size += (gint64)(st.st_size);
if (*total_size >= MAX_COMMIT_SIZE)
*remain_files = g_queue_new ();
}
} else
g_queue_push_tail (*remain_files, g_strdup(path));
g_free (full_path);
return ret;
}
@ -476,7 +497,8 @@ add_recursive (const char *repo_id,
#endif
ret = add_recursive (repo_id, version, modifier,
istate, worktree, subpath,
crypt, ignore_empty_dir, ignore_list);
crypt, ignore_empty_dir, ignore_list,
total_size, remain_files);
g_free (subpath);
if (ret < 0)
break;
@ -486,7 +508,10 @@ add_recursive (const char *repo_id,
goto bad;
if (n == 0 && path[0] != 0 && !ignore_empty_dir) {
add_empty_dir_to_index (istate, path, &st);
if (!remain_files || *remain_files == NULL)
add_empty_dir_to_index (istate, path, &st);
else
g_queue_push_tail (*remain_files, g_strdup(path));
}
}
@ -561,7 +586,8 @@ scan_worktree_for_changes (struct index_state *istate, SeafRepo *repo,
SeafileCrypt *crypt, GList *ignore_list)
{
if (add_recursive (repo->id, repo->version, repo->email,
istate, repo->worktree, "", crypt, FALSE, ignore_list) < 0)
istate, repo->worktree, "", crypt, FALSE, ignore_list,
NULL, NULL) < 0)
return -1;
remove_deleted (istate, repo->worktree, ignore_list);
@ -600,7 +626,7 @@ out:
static int
add_path_to_index (SeafRepo *repo, struct index_state *istate,
SeafileCrypt *crypt, const char *path, GList *ignore_list,
GList **scanned_dirs)
GList **scanned_dirs, gint64 *total_size, GQueue **remain_files)
{
char *full_path;
SeafStat st;
@ -611,7 +637,8 @@ add_path_to_index (SeafRepo *repo, struct index_state *istate,
if (path[0] == 0) {
add_recursive (repo->id, repo->version, repo->email, istate,
repo->worktree, path,
crypt, FALSE, ignore_list);
crypt, FALSE, ignore_list,
total_size, remain_files);
return 0;
}
@ -654,12 +681,53 @@ add_path_to_index (SeafRepo *repo, struct index_state *istate,
/* Add is always recursive */
add_recursive (repo->id, repo->version, repo->email, istate, repo->worktree, path,
crypt, FALSE, ignore_list);
crypt, FALSE, ignore_list, total_size, remain_files);
g_free (full_path);
return 0;
}
static int
add_remain_files (SeafRepo *repo, struct index_state *istate,
SeafileCrypt *crypt, GQueue *remain_files,
GList *ignore_list, gint64 *total_size)
{
char *path;
char *full_path;
SeafStat st;
while ((path = g_queue_pop_head (remain_files)) != NULL) {
full_path = g_build_filename (repo->worktree, path, NULL);
if (seaf_stat (full_path, &st) < 0) {
seaf_warning ("Failed to stat %s: %s.\n", full_path, strerror(errno));
g_free (path);
g_free (full_path);
continue;
}
if (S_ISREG(st.st_mode)) {
gboolean added = FALSE;
add_to_index (repo->id, repo->version, istate, path, full_path,
&st, 0, crypt, index_cb, repo->email, &added);
if (added) {
*total_size += (gint64)(st.st_size);
if (*total_size >= MAX_COMMIT_SIZE) {
g_free (path);
g_free (full_path);
break;
}
}
} else if (S_ISDIR(st.st_mode)) {
if (is_empty_dir (full_path, ignore_list))
add_empty_dir_to_index (istate, path, &st);
}
g_free (path);
g_free (full_path);
}
return 0;
}
static void
try_add_empty_parent_dir_entry (SeafRepo *repo, struct index_state *istate,
GList *ignore_list, const char *path)
@ -701,12 +769,21 @@ apply_worktree_changes_to_index (SeafRepo *repo, struct index_state *istate,
GList *scanned_dirs = NULL;
WTEvent *last_event;
pthread_mutex_lock (&status->q_lock);
last_event = g_queue_peek_tail (status->event_q);
pthread_mutex_unlock (&status->q_lock);
if (repo->create_partial_commit && status->last_event != NULL)
last_event = status->last_event;
else {
if (!repo->create_partial_commit)
status->last_event = NULL;
pthread_mutex_lock (&status->q_lock);
last_event = g_queue_peek_tail (status->event_q);
pthread_mutex_unlock (&status->q_lock);
}
if (!last_event)
goto out;
gint64 total_size = 0;
while (1) {
pthread_mutex_lock (&status->q_lock);
event = g_queue_pop_head (status->event_q);
@ -716,8 +793,37 @@ apply_worktree_changes_to_index (SeafRepo *repo, struct index_state *istate,
switch (event->ev_type) {
case WT_EVENT_CREATE_OR_UPDATE:
add_path_to_index (repo, istate, crypt, event->path,
ignore_list, &scanned_dirs);
if (!repo->create_partial_commit) {
add_path_to_index (repo, istate, crypt, event->path,
ignore_list, &scanned_dirs,
&total_size, NULL);
} else if (!event->remain_files) {
GQueue *remain_files = NULL;
add_path_to_index (repo, istate, crypt, event->path,
ignore_list, &scanned_dirs,
&total_size, &remain_files);
if (remain_files) {
/* Cache remaining files in the event structure. */
event->remain_files = remain_files;
pthread_mutex_lock (&status->q_lock);
g_queue_push_head (status->event_q, event);
pthread_mutex_unlock (&status->q_lock);
/* Set status->last_event to signify partial commit. */
status->last_event = last_event;
goto out;
}
} else {
add_remain_files (repo, istate, crypt, event->remain_files,
ignore_list, &total_size);
if (g_queue_get_length (event->remain_files) != 0) {
pthread_mutex_lock (&status->q_lock);
g_queue_push_head (status->event_q, event);
pthread_mutex_unlock (&status->q_lock);
goto out;
}
}
break;
case WT_EVENT_DELETE:
remove_from_index_with_prefix (istate, event->path);
@ -747,17 +853,19 @@ apply_worktree_changes_to_index (SeafRepo *repo, struct index_state *istate,
*/
add_recursive (repo->id, repo->version, repo->email,
istate, repo->worktree, event->new_path,
crypt, FALSE, ignore_list);
crypt, FALSE, ignore_list,
NULL, NULL);
break;
case WT_EVENT_OVERFLOW:
/* Linux only */
seaf_warning ("Inotify kernel event queue overflowed, fall back to scan.\n");
seaf_warning ("Kernel event queue overflowed, fall back to scan.\n");
scan_worktree_for_changes (istate, repo, crypt, ignore_list);
break;
}
if (event == last_event) {
wt_event_free (event);
if (status->last_event != NULL)
status->last_event = NULL;
break;
} else
wt_event_free (event);
@ -830,8 +938,9 @@ retry:
if (is_empty_dir (path, ignore_list))
add_empty_dir_to_index (istate, ce_name, &st);
} else {
gboolean added;
add_to_index (repo->id, repo->version, istate, ce_name, path,
&st, 0, crypt, index_cb, repo->email);
&st, 0, crypt, index_cb, repo->email, &added);
}
}
@ -926,7 +1035,8 @@ seaf_repo_index_worktree_files (const char *repo_id,
* dir, we'll fail to detect fast-forward relationship later.
*/
if (add_recursive (repo_id, repo_version, modifier,
&istate, worktree, "", crypt, FALSE, ignore_list) < 0)
&istate, worktree, "", crypt, FALSE, ignore_list,
NULL, NULL) < 0)
goto error;
remove_deleted (&istate, worktree, ignore_list);

View File

@ -71,6 +71,8 @@ struct _SeafRepo {
unsigned int access_denied_notified : 1;
int version;
gboolean create_partial_commit;
};

View File

@ -1262,9 +1262,10 @@ start_sync (SeafSyncManager *manager, SeafRepo *repo,
task->info->in_sync = TRUE;
task->repo = repo;
if (need_commit)
if (need_commit) {
repo->create_partial_commit = FALSE;
commit_repo (task);
else
} else
start_sync_repo_proc (manager, task);
}
@ -1361,6 +1362,7 @@ create_commit_from_event_queue (SeafSyncManager *manager, SeafRepo *repo,
if (status->last_check == 0) {
/* Force commit and sync after a new repo is added. */
task = create_sync_task_v2 (manager, repo, is_manual_sync, TRUE);
repo->create_partial_commit = TRUE;
commit_repo (task);
status->last_check = now;
ret = TRUE;
@ -1370,12 +1372,14 @@ create_commit_from_event_queue (SeafSyncManager *manager, SeafRepo *repo,
*/
if (now - last_changed >= 2) {
task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
repo->create_partial_commit = TRUE;
commit_repo (task);
status->last_check = now;
ret = TRUE;
}
} else if (status->last_event != NULL) {
task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
repo->create_partial_commit = TRUE;
commit_repo (task);
ret = TRUE;
}

View File

@ -17,11 +17,20 @@ WTEvent *wt_event_new (int type, const char *path, const char *new_path)
return event;
}
static void free_path (gpointer data, gpointer user_data)
{
g_free (data);
}
void wt_event_free (WTEvent *event)
{
g_free (event->path);
g_free (event->new_path);
g_free (event);
if (event->remain_files) {
g_queue_foreach (event->remain_files, free_path, NULL);
g_queue_free (event->remain_files);
}
}
/* WTStatus */

View File

@ -15,6 +15,12 @@ typedef struct WTEvent {
int ev_type;
char *path;
char *new_path; /* only used by rename event */
/* For CREATE_OR_UPDATE events, if a partial commit was created when
* adding files recursively, the remaining files will be cached in
* this queue so that we don't have to rescan the dir from beginning.
*/
GQueue *remain_files;
} WTEvent;
WTEvent *wt_event_new (int type, const char *path, const char *new_path);