Use unified progress for multipart uploads.

This commit is contained in:
Jonathan Xu 2016-12-16 14:37:03 +08:00
parent f7f8356283
commit 656666a9af
6 changed files with 93 additions and 27 deletions

View File

@ -508,9 +508,16 @@ convert_http_task (HttpTxTask *task)
} else {
g_object_set (t, "ttype", "upload", NULL);
if (task->runtime_state == HTTP_TASK_RT_STATE_BLOCK) {
g_object_set (t, "block_total", task->n_blocks,
"block_done", task->done_blocks,
NULL);
SyncInfo *info = seaf_sync_manager_get_sync_info (seaf->sync_mgr, task->repo_id);
if (info && info->multipart_upload) {
g_object_set (t, "block_total", info->total_bytes,
"block_done", info->uploaded_bytes,
NULL);
} else {
g_object_set (t, "block_total", task->n_blocks,
"block_done", task->done_blocks,
NULL);
}
g_object_set (t, "rate", http_tx_task_get_rate(task), NULL);
}
}

View File

@ -3024,7 +3024,7 @@ send_block_callback (void *ptr, size_t size, size_t nmemb, void *userp)
}
static int
send_block (HttpTxTask *task, Connection *conn, const char *block_id)
send_block (HttpTxTask *task, Connection *conn, const char *block_id, guint32 *psize)
{
CURL *curl;
char *url;
@ -3088,8 +3088,12 @@ send_block (HttpTxTask *task, Connection *conn, const char *block_id)
seaf_warning ("Bad response code for PUT %s: %d.\n", url, status);
handle_http_errors (task, status);
ret = -1;
goto out;
}
if (psize)
*psize = bmd->size;
out:
g_free (url);
curl_easy_reset (curl);
@ -3109,6 +3113,7 @@ typedef struct BlockUploadData {
typedef struct BlockUploadTask {
char block_id[41];
int result;
guint32 block_size;
} BlockUploadTask;
static void
@ -3134,7 +3139,7 @@ upload_block_thread_func (gpointer data, gpointer user_data)
goto out;
}
ret = send_block (http_task, conn, task->block_id);
ret = send_block (http_task, conn, task->block_id, &task->block_size);
connection_pool_return_connection (tx_data->cpool, conn);
@ -3156,6 +3161,7 @@ multi_threaded_send_blocks (HttpTxTask *http_task, GList *block_list)
GList *ptr;
char *block_id;
BlockUploadTask *task;
SyncInfo *info;
int ret = 0;
if (block_list == NULL)
@ -3182,6 +3188,8 @@ multi_threaded_send_blocks (HttpTxTask *http_task, GList *block_list)
g_free,
(GDestroyNotify)block_upload_task_free);
info = seaf_sync_manager_get_sync_info (seaf->sync_mgr, http_task->repo_id);
for (ptr = block_list; ptr; ptr = ptr->next) {
block_id = ptr->data;
@ -3205,6 +3213,10 @@ multi_threaded_send_blocks (HttpTxTask *http_task, GList *block_list)
++(http_task->done_blocks);
if (info && info->multipart_upload) {
info->uploaded_bytes += (gint64)task->block_size;
}
g_hash_table_remove (pending_tasks, task->block_id);
if (g_hash_table_size(pending_tasks) == 0)
break;

View File

@ -1342,8 +1342,10 @@ add_file (const char *repo_id,
path,
NULL);
}
} else
} else {
*total_size += (gint64)(st->st_size);
g_queue_push_tail (*remain_files, g_strdup(path));
}
if (ret < 0) {
seaf_sync_manager_update_active_path (seaf->sync_mgr,
@ -2532,6 +2534,8 @@ handle_add_files (SeafRepo *repo, struct index_state *istate,
WTStatus *status, WTEvent *event,
GList **scanned_dirs, gint64 *total_size)
{
SyncInfo *info;
if (!repo->create_partial_commit) {
/* XXX: We now use remain_files = NULL to signify not creating
* partial commits. It's better to use total_size = NULL for
@ -2571,6 +2575,12 @@ handle_add_files (SeafRepo *repo, struct index_state *istate,
pthread_mutex_lock (&status->q_lock);
g_queue_push_head (status->event_q, event);
pthread_mutex_unlock (&status->q_lock);
info = seaf_sync_manager_get_sync_info (seaf->sync_mgr, repo->id);
if (!info->multipart_upload) {
info->multipart_upload = TRUE;
info->total_bytes = *total_size;
}
}
return TRUE;
@ -2585,6 +2595,9 @@ handle_add_files (SeafRepo *repo, struct index_state *istate,
g_queue_push_head (status->event_q, event);
pthread_mutex_unlock (&status->q_lock);
return TRUE;
} else {
info = seaf_sync_manager_get_sync_info (seaf->sync_mgr, repo->id);
info->end_multipart_upload = TRUE;
}
if (*total_size >= MAX_COMMIT_SIZE)
return TRUE;

View File

@ -599,7 +599,7 @@ find_meaningful_commit (SeafCommit *commit, void *data, gboolean *stop)
}
static void
notify_sync (SeafRepo *repo)
notify_sync (SeafRepo *repo, gboolean is_multipart_upload)
{
SeafCommit *head = NULL;
@ -621,9 +621,14 @@ notify_sync (SeafRepo *repo)
head->commit_id,
head->parent_id,
head->desc);
seaf_mq_manager_publish_notification (seaf->mq_mgr,
"sync.done",
buf->str);
if (!is_multipart_upload)
seaf_mq_manager_publish_notification (seaf->mq_mgr,
"sync.done",
buf->str);
else
seaf_mq_manager_publish_notification (seaf->mq_mgr,
"sync.multipart_upload",
buf->str);
g_string_free (buf, TRUE);
seaf_commit_unref (head);
}
@ -645,12 +650,41 @@ update_sync_info_error_state (SyncTask *task, int new_state)
}
}
static void commit_repo (SyncTask *task);
static inline void
transition_sync_state (SyncTask *task, int new_state)
{
g_return_if_fail (new_state >= 0 && new_state < SYNC_STATE_NUM);
SyncInfo *info = task->info;
if (task->state != new_state) {
if (!task->server_side_merge) {
if ((task->state == SYNC_STATE_MERGE ||
task->state == SYNC_STATE_UPLOAD) &&
new_state == SYNC_STATE_DONE &&
need_notify_sync(task->repo))
notify_sync (task->repo, FALSE);
} else {
if (((task->state == SYNC_STATE_INIT && task->uploaded) ||
task->state == SYNC_STATE_FETCH) &&
new_state == SYNC_STATE_DONE &&
need_notify_sync(task->repo))
notify_sync (task->repo, (info->multipart_upload && !info->end_multipart_upload));
}
/* If we're in the process of uploading large set of files, they'll be splitted
* into multiple batches for upload. We want to immediately start the next batch
* after previous one is done.
*/
if (new_state == SYNC_STATE_DONE &&
info->multipart_upload &&
!info->end_multipart_upload) {
commit_repo (task);
return;
}
if (!(task->state == SYNC_STATE_DONE && new_state == SYNC_STATE_INIT) &&
!(task->state == SYNC_STATE_INIT && new_state == SYNC_STATE_DONE)) {
seaf_message ("Repo '%s' sync state transition from '%s' to '%s'.\n",
@ -659,27 +693,21 @@ transition_sync_state (SyncTask *task, int new_state)
sync_state_str[new_state]);
}
if (!task->server_side_merge) {
if ((task->state == SYNC_STATE_MERGE ||
task->state == SYNC_STATE_UPLOAD) &&
new_state == SYNC_STATE_DONE &&
need_notify_sync(task->repo))
notify_sync (task->repo);
} else {
if (((task->state == SYNC_STATE_INIT && task->uploaded) ||
task->state == SYNC_STATE_FETCH) &&
new_state == SYNC_STATE_DONE &&
need_notify_sync(task->repo))
notify_sync (task->repo);
}
task->state = new_state;
if (new_state == SYNC_STATE_DONE ||
new_state == SYNC_STATE_CANCELED ||
new_state == SYNC_STATE_ERROR) {
task->info->in_sync = FALSE;
info->in_sync = FALSE;
--(task->mgr->n_running_tasks);
update_sync_info_error_state (task, new_state);
/* Keep previous upload progress if sync task is canceled or failed. */
if (new_state == SYNC_STATE_DONE) {
info->multipart_upload = FALSE;
info->end_multipart_upload = FALSE;
info->total_bytes = 0;
info->uploaded_bytes = 0;
}
}
#ifdef WIN32

View File

@ -27,6 +27,12 @@ struct _SyncInfo {
gboolean need_fetch;
gboolean need_upload;
gboolean need_merge;
/* Used by multipart upload. */
gboolean multipart_upload;
gint64 total_bytes;
gint64 uploaded_bytes;
gboolean end_multipart_upload;
};
enum {

View File

@ -23,8 +23,8 @@ public class Task : Object {
public string error_str { get; set; }
public int block_total { get; set; }
public int block_done { get; set; } // the number of blocks sent or received
public int64 block_total { get; set; }
public int64 block_done { get; set; } // the number of blocks sent or received
public int fs_objects_total { get; set; }
public int fs_objects_done { get; set; }