/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ #include "common.h" #include #include "db.h" #include "seafile-session.h" #include "seafile-config.h" #include "sync-mgr.h" #include "seafile-error-impl.h" #include "mq-mgr.h" #include "utils.h" #include "vc-utils.h" #include "sync-status-tree.h" #include "diff-simple.h" #ifdef WIN32 #include #endif #define DEBUG_FLAG SEAFILE_DEBUG_SYNC #include "log.h" #include "timer.h" #define DEFAULT_SYNC_INTERVAL 30 /* 30s */ #define CHECK_SYNC_INTERVAL 1000 /* 1s */ #define UPDATE_TX_STATE_INTERVAL 1000 /* 1s */ #define MAX_RUNNING_SYNC_TASKS 5 #define CHECK_LOCKED_FILES_INTERVAL 10 /* 10s */ #define CHECK_FOLDER_PERMS_INTERVAL 30 /* 30s */ #define JWT_TOKEN_EXPIRE_TIME 3*24*3600 /* 3 days */ #define SYNC_PERM_ERROR_RETRY_TIME 2 struct _HttpServerState { int http_version; gboolean checking; gint64 last_http_check_time; char *testing_host; /* Can be server_url or server_url:8082, depends on which one works. */ char *effective_host; gboolean use_fileserver_port; gboolean notif_server_checked; gboolean notif_server_alive; gboolean server_disconnected; gboolean folder_perms_not_supported; gint64 last_check_perms_time; gboolean checking_folder_perms; gboolean locked_files_not_supported; gint64 last_check_locked_files_time; gboolean checking_locked_files; gboolean immediate_check_folder_perms; gboolean immediate_check_locked_files; /* * repo_id -> head commit id mapping. * Caches the head commit ids of synced repos. */ GHashTable *head_commit_map; pthread_mutex_t head_commit_map_lock; gboolean head_commit_map_init; gint64 last_update_head_commit_map_time; gint64 n_jwt_token_request; }; typedef struct _HttpServerState HttpServerState; typedef struct DelConfirmationResult { gboolean resync; } DelConfirmationResult; struct _SeafSyncManagerPriv { struct SeafTimer *check_sync_timer; struct SeafTimer *update_tx_state_timer; int pulse_count; /* When FALSE, auto sync is globally disabled */ gboolean auto_sync_enabled; GHashTable *active_paths; pthread_mutex_t paths_lock; #ifdef WIN32 GAsyncQueue *refresh_paths; struct SeafTimer *refresh_windows_timer; #endif pthread_mutex_t del_confirmation_lock; GHashTable *del_confirmation_tasks; }; struct _ActivePathsInfo { GHashTable *paths; struct SyncStatusTree *syncing_tree; struct SyncStatusTree *synced_tree; }; typedef struct _ActivePathsInfo ActivePathsInfo; static int auto_sync_pulse (void *vmanager); static void on_repo_http_fetched (SeafileSession *seaf, HttpTxTask *tx_task, SeafSyncManager *manager); static void on_repo_http_uploaded (SeafileSession *seaf, HttpTxTask *tx_task, SeafSyncManager *manager); static inline void transition_sync_state (SyncTask *task, int new_state); static void sync_task_free (SyncTask *task); static int sync_repo_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync); static gboolean check_http_protocol (SeafSyncManager *mgr, SeafRepo *repo); static void active_paths_info_free (ActivePathsInfo *info); static HttpServerState * http_server_state_new () { HttpServerState *state = g_new0 (HttpServerState, 1); state->head_commit_map = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free); pthread_mutex_init (&state->head_commit_map_lock, NULL); return state; } static void http_server_state_free (HttpServerState *state) { if (!state) return; g_hash_table_destroy (state->head_commit_map); pthread_mutex_destroy (&state->head_commit_map_lock); g_free (state); } SeafSyncManager* seaf_sync_manager_new (SeafileSession *seaf) { SeafSyncManager *mgr = g_new0 (SeafSyncManager, 1); mgr->priv = g_new0 (SeafSyncManagerPriv, 1); mgr->priv->auto_sync_enabled = TRUE; mgr->seaf = seaf; mgr->sync_interval = DEFAULT_SYNC_INTERVAL; mgr->sync_infos = g_hash_table_new (g_str_hash, g_str_equal); mgr->http_server_states = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, (GDestroyNotify)http_server_state_free); gboolean exists; int download_limit = seafile_session_config_get_int (seaf, KEY_DOWNLOAD_LIMIT, &exists); if (exists) mgr->download_limit = download_limit; int upload_limit = seafile_session_config_get_int (seaf, KEY_UPLOAD_LIMIT, &exists); if (exists) mgr->upload_limit = upload_limit; mgr->priv->active_paths = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, (GDestroyNotify)active_paths_info_free); pthread_mutex_init (&mgr->priv->paths_lock, NULL); #ifdef WIN32 mgr->priv->refresh_paths = g_async_queue_new (); #endif mgr->priv->del_confirmation_tasks = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free); pthread_mutex_init (&mgr->priv->del_confirmation_lock, NULL); return mgr; } static SyncInfo* get_sync_info (SeafSyncManager *manager, const char *repo_id) { SyncInfo *info = g_hash_table_lookup (manager->sync_infos, repo_id); if (info) return info; info = g_new0 (SyncInfo, 1); memcpy (info->repo_id, repo_id, 36); g_hash_table_insert (manager->sync_infos, info->repo_id, info); return info; } SyncInfo * seaf_sync_manager_get_sync_info (SeafSyncManager *mgr, const char *repo_id) { return g_hash_table_lookup (mgr->sync_infos, repo_id); } int seaf_sync_manager_init (SeafSyncManager *mgr) { return 0; } static void format_http_task_detail (HttpTxTask *task, GString *buf) { if (task->state != HTTP_TASK_STATE_NORMAL || task->runtime_state == HTTP_TASK_RT_STATE_INIT || task->runtime_state == HTTP_TASK_RT_STATE_FINISHED) return; SeafRepo *repo = seaf_repo_manager_get_repo (seaf->repo_mgr, task->repo_id); char *repo_name; char *type; if (repo) { repo_name = repo->name; type = (task->type == HTTP_TASK_TYPE_UPLOAD) ? "upload" : "download"; } else if (task->is_clone) { CloneTask *ctask; ctask = seaf_clone_manager_get_task (seaf->clone_mgr, task->repo_id); repo_name = ctask->repo_name; type = "download"; } else { return; } int rate = http_tx_task_get_rate(task); g_string_append_printf (buf, "%s\t%d %s\n", type, rate, repo_name); } /* * Publish a notification message to report : * * [uploading/downloading]\t[transfer-rate] [repo-name]\n */ static int update_tx_state (void *vmanager) { SeafSyncManager *mgr = vmanager; GString *buf = g_string_new (NULL); GList *tasks, *ptr; HttpTxTask *http_task; mgr->last_sent_bytes = g_atomic_int_get (&mgr->sent_bytes); g_atomic_int_set (&mgr->sent_bytes, 0); mgr->last_recv_bytes = g_atomic_int_get (&mgr->recv_bytes); g_atomic_int_set (&mgr->recv_bytes, 0); tasks = http_tx_manager_get_upload_tasks (seaf->http_tx_mgr); for (ptr = tasks; ptr; ptr = ptr->next) { http_task = ptr->data; format_http_task_detail (http_task, buf); } g_list_free (tasks); tasks = http_tx_manager_get_download_tasks (seaf->http_tx_mgr); for (ptr = tasks; ptr; ptr = ptr->next) { http_task = ptr->data; format_http_task_detail (http_task, buf); } g_list_free (tasks); if (buf->len != 0) seaf_mq_manager_publish_notification (seaf->mq_mgr, "transfer", buf->str); g_string_free (buf, TRUE); return TRUE; } #ifdef WIN32 static void * refresh_windows_explorer_thread (void *vdata); #define STARTUP_REFRESH_WINDOWS_DELAY 10000 static int refresh_all_windows_on_startup (void *vdata) { /* This is a hack to tell Windows Explorer to refresh all open windows. * On startup, if there is one big library, its events may dominate the * explorer refresh queue. Other libraries don't get refreshed until * the big library's events are consumed. So we refresh the open windows * to reduce the delay. */ SHChangeNotify (SHCNE_ASSOCCHANGED, SHCNF_IDLIST, NULL, NULL); /* One time */ return 0; } #endif static void *update_cached_head_commit_ids (void *arg); int seaf_sync_manager_start (SeafSyncManager *mgr) { mgr->priv->check_sync_timer = seaf_timer_new ( auto_sync_pulse, mgr, CHECK_SYNC_INTERVAL); mgr->priv->update_tx_state_timer = seaf_timer_new ( update_tx_state, mgr, UPDATE_TX_STATE_INTERVAL); g_signal_connect (seaf, "repo-http-fetched", (GCallback)on_repo_http_fetched, mgr); g_signal_connect (seaf, "repo-http-uploaded", (GCallback)on_repo_http_uploaded, mgr); #ifdef WIN32 seaf_job_manager_schedule_job (seaf->job_mgr, refresh_windows_explorer_thread, NULL, mgr->priv->refresh_paths); mgr->priv->refresh_windows_timer = seaf_timer_new ( refresh_all_windows_on_startup, mgr, STARTUP_REFRESH_WINDOWS_DELAY); #endif pthread_t tid; pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (pthread_create (&tid, &attr, update_cached_head_commit_ids, mgr) < 0) { seaf_warning ("Failed to create update cached head commit id thread.\n"); return -1; } return 0; } int seaf_sync_manager_add_sync_task (SeafSyncManager *mgr, const char *repo_id, GError **error) { if (!seaf->started) { seaf_message ("sync manager is not started, skip sync request.\n"); return -1; } SeafRepo *repo; repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); if (!repo) { seaf_warning ("[sync mgr] cannot find repo %s.\n", repo_id); g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_REPO, "Invalid repo"); return -1; } if (seaf_repo_check_worktree (repo) < 0) { g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_NO_WORKTREE, "Worktree doesn't exist"); return -1; } #ifdef USE_GPL_CRYPTO if (repo->version == 0 || (repo->encrypted && repo->enc_version < 2)) { seaf_warning ("Don't support syncing old version libraries.\n"); g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS, "Don't support syncing old version libraries"); return -1; } #endif SyncInfo *info = get_sync_info (mgr, repo->id); if (info->in_sync) return 0; if (repo->version > 0) { if (check_http_protocol (mgr, repo)) { sync_repo_v2 (mgr, repo, TRUE); return 0; } } else { seaf_warning ("Repo %s(%s) is version 0 library. Syncing is no longer supported.\n", repo->name, repo->id); } return 0; } void seaf_sync_manager_cancel_sync_task (SeafSyncManager *mgr, const char *repo_id) { SyncInfo *info; SyncTask *task; if (!seaf->started) { seaf_message ("sync manager is not started, skip cancel request.\n"); return; } /* Cancel running task. */ info = g_hash_table_lookup (mgr->sync_infos, repo_id); if (!info) return; else if (!info->in_sync) { if (info->current_task && info->current_task->state == SYNC_STATE_ERROR) { info->err_cnt = 0; info->in_error = FALSE; info->sync_perm_err_cnt = 0; } return; } g_return_if_fail (info->current_task != NULL); task = info->current_task; switch (task->state) { case SYNC_STATE_FETCH: http_tx_manager_cancel_task (seaf->http_tx_mgr, repo_id, HTTP_TASK_TYPE_DOWNLOAD); transition_sync_state (task, SYNC_STATE_CANCEL_PENDING); break; case SYNC_STATE_UPLOAD: http_tx_manager_cancel_task (seaf->http_tx_mgr, repo_id, HTTP_TASK_TYPE_UPLOAD); transition_sync_state (task, SYNC_STATE_CANCEL_PENDING); break; case SYNC_STATE_COMMIT: case SYNC_STATE_INIT: case SYNC_STATE_MERGE: transition_sync_state (task, SYNC_STATE_CANCEL_PENDING); break; case SYNC_STATE_CANCEL_PENDING: break; default: g_return_if_reached (); } } /* Check the notify setting by user. */ static gboolean need_notify_sync (SeafRepo *repo) { char *notify_setting = seafile_session_config_get_string(seaf, "notify_sync"); if (notify_setting == NULL) { seafile_session_config_set_string(seaf, "notify_sync", "on"); return TRUE; } gboolean result = (g_strcmp0(notify_setting, "on") == 0); g_free (notify_setting); return result; } static const char *sync_state_str[] = { "synchronized", "committing", "initializing", "downloading", "merging", "uploading", "error", "canceled", "cancel pending" }; static gboolean find_meaningful_commit (SeafCommit *commit, void *data, gboolean *stop) { SeafCommit **p_head = data; if (commit->second_parent_id && commit->new_merge && !commit->conflict) return TRUE; *stop = TRUE; seaf_commit_ref (commit); *p_head = commit; return TRUE; } static void notify_sync (SeafRepo *repo, gboolean is_multipart_upload) { SeafCommit *head = NULL; if (!seaf_commit_manager_traverse_commit_tree_truncated (seaf->commit_mgr, repo->id, repo->version, repo->head->commit_id, find_meaningful_commit, &head, FALSE)) { seaf_warning ("Failed to traverse commit tree of %.8s.\n", repo->id); return; } if (!head) return; GString *buf = g_string_new (NULL); g_string_append_printf (buf, "%s\t%s\t%s\t%s\t%s", repo->name, repo->id, head->commit_id, head->parent_id, head->desc); if (!is_multipart_upload) seaf_mq_manager_publish_notification (seaf->mq_mgr, "sync.done", buf->str); else seaf_mq_manager_publish_notification (seaf->mq_mgr, "sync.multipart_upload", buf->str); g_string_free (buf, TRUE); seaf_commit_unref (head); } #define IN_ERROR_THRESHOLD 3 static gboolean is_perm_error (int error) { return (error == SYNC_ERROR_ID_ACCESS_DENIED || error == SYNC_ERROR_ID_NO_WRITE_PERMISSION || error == SYNC_ERROR_ID_PERM_NOT_SYNCABLE || error == SYNC_ERROR_ID_FOLDER_PERM_DENIED || error == SYNC_ERROR_ID_TOO_MANY_FILES || error == SYNC_ERROR_ID_QUOTA_FULL); } static void update_sync_info_error_state (SyncTask *task, int new_state) { SyncInfo *info = task->info; if (new_state == SYNC_STATE_ERROR) { info->err_cnt++; if (info->err_cnt == IN_ERROR_THRESHOLD) info->in_error = TRUE; if (is_perm_error(task->error)) info->sync_perm_err_cnt++; } else if (info->err_cnt > 0) { info->err_cnt = 0; info->in_error = FALSE; info->sync_perm_err_cnt = 0; } } static void commit_repo (SyncTask *task); static void transition_sync_state (SyncTask *task, int new_state) { g_return_if_fail (new_state >= 0 && new_state < SYNC_STATE_NUM); SyncInfo *info = task->info; if (task->state != new_state) { if (((task->state == SYNC_STATE_INIT && task->uploaded) || task->state == SYNC_STATE_FETCH) && new_state == SYNC_STATE_DONE && need_notify_sync(task->repo)) notify_sync (task->repo, (info->multipart_upload && !info->end_multipart_upload)); /* If we're in the process of uploading large set of files, they'll be splitted * into multiple batches for upload. We want to immediately start the next batch * after previous one is done. */ if (new_state == SYNC_STATE_DONE && info->multipart_upload && !info->end_multipart_upload) { commit_repo (task); return; } /* If file error levels occured during sync, the whole sync process can still finish * with DONE state. But we need to notify the user about this error in the interface. * Such file level errors are set with seaf_sync_manager_set_task_error_code(). */ if (new_state != SYNC_STATE_ERROR && task->error != SYNC_ERROR_ID_NO_ERROR) { seaf_message ("Repo '%s' sync is finished but with error: %s\n", task->repo->name, sync_error_id_to_str(task->error)); } if (!(task->state == SYNC_STATE_DONE && new_state == SYNC_STATE_INIT) && !(task->state == SYNC_STATE_INIT && new_state == SYNC_STATE_DONE)) { seaf_message ("Repo '%s' sync state transition from '%s' to '%s'.\n", task->repo->name, sync_state_str[task->state], sync_state_str[new_state]); } task->state = new_state; if (new_state == SYNC_STATE_DONE || new_state == SYNC_STATE_CANCELED || new_state == SYNC_STATE_ERROR) { info->in_sync = FALSE; --(task->mgr->n_running_tasks); update_sync_info_error_state (task, new_state); /* Keep previous upload progress if sync task is canceled or failed. */ if (new_state == SYNC_STATE_DONE) { info->multipart_upload = FALSE; info->end_multipart_upload = FALSE; info->total_bytes = 0; info->uploaded_bytes = 0; } } #ifdef WIN32 seaf_sync_manager_add_refresh_path (seaf->sync_mgr, task->repo->worktree); #endif } } static void set_task_error (SyncTask *task, int error) { g_return_if_fail (error >= 0 && error < N_SYNC_ERROR_ID); const char *err_str = sync_error_id_to_str(error); int err_level = sync_error_level(error); if (task->state != SYNC_STATE_ERROR) { seaf_message ("Repo '%s' sync state transition from %s to '%s': '%s'.\n", task->repo->name, sync_state_str[task->state], sync_state_str[SYNC_STATE_ERROR], err_str); task->state = SYNC_STATE_ERROR; task->error = error; task->info->in_sync = FALSE; --(task->mgr->n_running_tasks); update_sync_info_error_state (task, SYNC_STATE_ERROR); /* For repo-level errors, only need to record in database, but not send notifications. * File-level errors are recorded and notified in the location they happens, not here. */ if (err_level == SYNC_ERROR_LEVEL_REPO) send_file_sync_error_notification (task->repo->id, task->repo->name, NULL, error); #ifdef WIN32 seaf_sync_manager_add_refresh_path (seaf->sync_mgr, task->repo->worktree); #endif } } void seaf_sync_manager_set_task_error_code (SeafSyncManager *mgr, const char *repo_id, int error) { SyncInfo *info = g_hash_table_lookup (mgr->sync_infos, repo_id); if (!info) return; info->current_task->error = error; } static void sync_task_free (SyncTask *task) { g_free (task->tx_id); g_free (task->dest_id); g_free (task->token); g_free (task); } static void start_upload_if_necessary (SyncTask *task) { GError *error = NULL; SeafRepo *repo = task->repo; if (http_tx_manager_add_upload (seaf->http_tx_mgr, repo->id, repo->version, repo->effective_host, repo->token, task->http_version, repo->use_fileserver_port, &error) < 0) { seaf_warning ("Failed to start http upload: %s\n", error->message); set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY); return; } task->tx_id = g_strdup(repo->id); transition_sync_state (task, SYNC_STATE_UPLOAD); } static void start_fetch_if_necessary (SyncTask *task, const char *remote_head) { GError *error = NULL; SeafRepo *repo = task->repo; if (http_tx_manager_add_download (seaf->http_tx_mgr, repo->id, repo->version, repo->effective_host, repo->token, remote_head, FALSE, NULL, NULL, task->http_version, repo->email, repo->username, repo->use_fileserver_port, repo->name, &error) < 0) { seaf_warning ("Failed to start http download: %s.\n", error->message); set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY); return; } task->tx_id = g_strdup(repo->id); transition_sync_state (task, SYNC_STATE_FETCH); } static gboolean repo_block_store_exists (SeafRepo *repo) { gboolean ret; char *store_path = g_build_filename (seaf->seaf_dir, "storage", "blocks", repo->id, NULL); if (g_file_test (store_path, G_FILE_TEST_IS_DIR)) ret = TRUE; else ret = FALSE; g_free (store_path); return ret; } #if defined WIN32 || defined __APPLE__ static GHashTable * load_locked_files_blocks (const char *repo_id) { LockedFileSet *fset; GHashTable *block_id_hash; GHashTableIter iter; gpointer key, value; LockedFile *locked; Seafile *file; int i; char *blk_id; fset = seaf_repo_manager_get_locked_file_set (seaf->repo_mgr, repo_id); block_id_hash = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); g_hash_table_iter_init (&iter, fset->locked_files); while (g_hash_table_iter_next (&iter, &key, &value)) { locked = value; if (strcmp (locked->operation, LOCKED_OP_UPDATE) == 0) { file = seaf_fs_manager_get_seafile (seaf->fs_mgr, fset->repo_id, 1, locked->file_id); if (!file) { seaf_warning ("Failed to find file %s in repo %.8s.\n", locked->file_id, fset->repo_id); continue; } for (i = 0; i < file->n_blocks; ++i) { blk_id = g_strdup (file->blk_sha1s[i]); g_hash_table_replace (block_id_hash, blk_id, blk_id); } seafile_unref (file); } } locked_file_set_free (fset); return block_id_hash; } static gboolean remove_block_cb (const char *store_id, int version, const char *block_id, void *user_data) { GHashTable *block_hash = user_data; if (!g_hash_table_lookup (block_hash, block_id)) seaf_block_manager_remove_block (seaf->block_mgr, store_id, version, block_id); return TRUE; } #endif static void * remove_repo_blocks (void *vtask) { SyncTask *task = vtask; #if defined WIN32 || defined __APPLE__ GHashTable *block_hash; block_hash = load_locked_files_blocks (task->repo->id); if (g_hash_table_size (block_hash) == 0) { g_hash_table_destroy (block_hash); seaf_block_manager_remove_store (seaf->block_mgr, task->repo->id); return vtask; } seaf_block_manager_foreach_block (seaf->block_mgr, task->repo->id, task->repo->version, remove_block_cb, block_hash); g_hash_table_destroy (block_hash); #else seaf_block_manager_remove_store (seaf->block_mgr, task->repo->id); #endif return vtask; } static void remove_blocks_done (void *vtask) { SyncTask *task = vtask; transition_sync_state (task, SYNC_STATE_DONE); } static void on_repo_deleted_on_server (SyncTask *task, SeafRepo *repo) { set_task_error (task, SYNC_ERROR_ID_SERVER_REPO_DELETED); seaf_warning ("repo %s(%.8s) not found on server\n", repo->name, repo->id); if (!seafile_session_config_get_allow_repo_not_found_on_server(seaf)) { seaf_message ("remove repo %s(%.8s) since it's deleted on relay\n", repo->name, repo->id); /* seaf_mq_manager_publish_notification (seaf->mq_mgr, */ /* "repo.deleted_on_relay", */ /* repo->name); */ seaf_repo_manager_del_repo (seaf->repo_mgr, repo); } } static void update_sync_status_v2 (SyncTask *task) { SyncInfo *info = task->info; SeafRepo *repo = task->repo; SeafBranch *master = NULL, *local = NULL; local = seaf_branch_manager_get_branch ( seaf->branch_mgr, info->repo_id, "local"); if (!local) { seaf_warning ("[sync-mgr] Branch local not found for repo %s(%.8s).\n", repo->name, repo->id); set_task_error (task, SYNC_ERROR_ID_LOCAL_DATA_CORRUPT); return; } master = seaf_branch_manager_get_branch ( seaf->branch_mgr, info->repo_id, "master"); if (!master) { seaf_warning ("[sync-mgr] Branch master not found for repo %s(%.8s).\n", repo->name, repo->id); set_task_error (task, SYNC_ERROR_ID_LOCAL_DATA_CORRUPT); return; } if (info->repo_corrupted) { set_task_error (task, SYNC_ERROR_ID_SERVER_REPO_CORRUPT); } else if (info->deleted_on_relay) { on_repo_deleted_on_server (task, repo); } else { /* If local head is the same as remote head, already in sync. */ if (strcmp (local->commit_id, info->head_commit) == 0) { /* As long as the repo is synced with the server. All the local * blocks are not useful any more. */ if (repo_block_store_exists (repo)) { seaf_message ("Removing blocks for repo %s(%.8s).\n", repo->name, repo->id); seaf_job_manager_schedule_job (seaf->job_mgr, remove_repo_blocks, remove_blocks_done, task); } else transition_sync_state (task, SYNC_STATE_DONE); } else start_fetch_if_necessary (task, task->info->head_commit); } seaf_branch_unref (local); seaf_branch_unref (master); } static void check_head_commit_done (HttpHeadCommit *result, void *user_data) { SyncTask *task = user_data; SyncInfo *info = task->info; if (!result->check_success) { set_task_error (task, result->error_code); return; } info->deleted_on_relay = result->is_deleted; info->repo_corrupted = result->is_corrupt; memcpy (info->head_commit, result->head_commit, 40); update_sync_status_v2 (task); } static int check_head_commit_http (SyncTask *task) { SeafRepo *repo = task->repo; int ret = http_tx_manager_check_head_commit (seaf->http_tx_mgr, repo->id, repo->version, repo->effective_host, repo->token, repo->use_fileserver_port, check_head_commit_done, task); if (ret == 0) transition_sync_state (task, SYNC_STATE_INIT); else if (ret < 0) set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY); return ret; } struct CommitResult { SyncTask *task; gboolean changed; gboolean success; }; static void * commit_job (void *vtask) { SyncTask *task = vtask; SeafRepo *repo = task->repo; struct CommitResult *res = g_new0 (struct CommitResult, 1); GError *error = NULL; res->task = task; if (repo->delete_pending) return res; res->changed = TRUE; res->success = TRUE; char *commit_id = seaf_repo_index_commit (repo, task->is_manual_sync, task->is_initial_commit, &error); if (commit_id == NULL && error != NULL) { seaf_warning ("[Sync mgr] Failed to commit to repo %s(%.8s).\n", repo->name, repo->id); res->success = FALSE; } else if (commit_id == NULL) { res->changed = FALSE; } g_free (commit_id); return res; } static char * exceed_max_deleted_files (SeafRepo *repo); static void notify_delete_confirmation (const char *repo_name, const char *desc, const char *confirmation_id); static void commit_job_done (void *vres) { struct CommitResult *res = vres; SeafRepo *repo = res->task->repo; SyncTask *task = res->task; res->task->mgr->commit_job_running = FALSE; if (repo->delete_pending) { transition_sync_state (res->task, SYNC_STATE_CANCELED); seaf_repo_manager_del_repo (seaf->repo_mgr, repo); g_free (res); return; } if (res->task->state == SYNC_STATE_CANCEL_PENDING) { transition_sync_state (res->task, SYNC_STATE_CANCELED); g_free (res); return; } if (!res->success) { set_task_error (res->task, SYNC_ERROR_ID_INDEX_ERROR); g_free (res); return; } if (res->changed) { char *desc = NULL; desc = exceed_max_deleted_files (repo); if (desc) { notify_delete_confirmation (repo->name, desc, repo->head->commit_id); seaf_warning ("Delete more than %d files, add delete confirmation.\n", seaf->delete_confirm_threshold); task->info->del_confirmation_pending = TRUE; set_task_error (res->task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING); g_free (desc); g_free (res); return; } start_upload_if_necessary (res->task); } else if (task->is_manual_sync || task->is_initial_commit) check_head_commit_http (task); else transition_sync_state (task, SYNC_STATE_DONE); g_free (res); } static int check_commit_state (void *data); static void commit_repo (SyncTask *task) { /* In order not to eat too much CPU power, only one commit job can be run * at the same time. Other sync tasks have to check every 1 second. */ if (task->mgr->commit_job_running) { task->commit_timer = seaf_timer_new (check_commit_state, task, 1000); return; } task->mgr->commit_job_running = TRUE; transition_sync_state (task, SYNC_STATE_COMMIT); if (seaf_job_manager_schedule_job (seaf->job_mgr, commit_job, commit_job_done, task) < 0) set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY); } static int check_commit_state (void *data) { SyncTask *task = data; if (!task->mgr->commit_job_running) { seaf_timer_free (&task->commit_timer); commit_repo (task); return 0; } return 1; } static SyncTask * create_sync_task_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync, gboolean is_initial_commit) { SyncTask *task = g_new0 (SyncTask, 1); SyncInfo *info; info = get_sync_info (manager, repo->id); task->info = info; task->mgr = manager; task->dest_id = g_strdup (repo->relay_id); task->token = g_strdup(repo->token); task->is_manual_sync = is_manual_sync; task->is_initial_commit = is_initial_commit; task->error = SYNC_ERROR_ID_NO_ERROR; repo->last_sync_time = time(NULL); ++(manager->n_running_tasks); /* Free the last task when a new task is started. * This way we can always get the state of the last task even * after it's done. */ if (task->info->current_task) sync_task_free (task->info->current_task); task->info->current_task = task; task->info->in_sync = TRUE; task->repo = repo; if (repo->server_url) { HttpServerState *state = g_hash_table_lookup (manager->http_server_states, repo->server_url); if (state) { task->http_version = state->http_version; } } return task; } static gboolean create_commit_from_event_queue (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync) { WTStatus *status; SyncTask *task; gboolean ret = FALSE; gint now = (gint)time(NULL); gint last_changed; status = seaf_wt_monitor_get_worktree_status (manager->seaf->wt_monitor, repo->id); if (status) { last_changed = g_atomic_int_get (&status->last_changed); if (status->last_check == 0) { /* Force commit and sync after a new repo is added. */ task = create_sync_task_v2 (manager, repo, is_manual_sync, TRUE); repo->create_partial_commit = TRUE; commit_repo (task); status->last_check = now; ret = TRUE; } else if (status->partial_commit) { task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE); repo->create_partial_commit = TRUE; commit_repo (task); ret = TRUE; } else if (last_changed != 0 && status->last_check <= last_changed) { /* Commit and sync if the repo has been updated after the * last check and is not updated for the last 2 seconds. */ if (now - last_changed >= 2) { task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE); repo->create_partial_commit = TRUE; commit_repo (task); status->last_check = now; ret = TRUE; } } wt_status_unref (status); } return ret; } static gboolean can_schedule_repo (SeafSyncManager *manager, SeafRepo *repo) { int now = (int)time(NULL); return ((repo->last_sync_time == 0 || repo->last_sync_time < now - manager->sync_interval) && manager->n_running_tasks < MAX_RUNNING_SYNC_TASKS); } static gboolean need_check_on_server (SeafSyncManager *manager, SeafRepo *repo, const char *master_head_id) { #define HEAD_COMMIT_MAP_TTL 90 HttpServerState *state; gboolean ret = FALSE; SyncInfo *info; /* If sync state is in error, always retry. */ info = get_sync_info (manager, repo->id); if (info && info->current_task && info->current_task->state == SYNC_STATE_ERROR) return TRUE; state = g_hash_table_lookup (manager->http_server_states, repo->server_url); if (!state) return TRUE; pthread_mutex_lock (&state->head_commit_map_lock); if (!state->head_commit_map_init) { ret = TRUE; goto out; } gint64 now = (gint64)time(NULL); if (now - state->last_update_head_commit_map_time >= HEAD_COMMIT_MAP_TTL) { ret = TRUE; goto out; } char *server_head = g_hash_table_lookup (state->head_commit_map, repo->id); if (!server_head) { /* Repo was removed on server. Just return "changed on server". */ ret = TRUE; goto out; } if (g_strcmp0 (server_head, master_head_id) != 0) ret = TRUE; out: pthread_mutex_unlock (&state->head_commit_map_lock); return ret; } #define MAX_DELETED_FILES_NUM 100 inline static char * get_basename (char *path) { char *slash; slash = strrchr (path, '/'); if (!slash) return path; return (slash + 1); } static char * exceed_max_deleted_files (SeafRepo *repo) { SeafBranch *master = NULL, *local = NULL; SeafCommit *local_head = NULL, *master_head = NULL; GList *diff_results = NULL; char *deleted_file = NULL; GString *desc = NULL; char *ret = NULL; local = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "local"); if (!local) { seaf_warning ("No local branch found for repo %s(%.8s).\n", repo->name, repo->id); goto out; } master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master"); if (!master) { seaf_warning ("No master branch found for repo %s(%.8s).\n", repo->name, repo->id); goto out; } local_head = seaf_commit_manager_get_commit (seaf->commit_mgr, repo->id, repo->version, local->commit_id); if (!local_head) { seaf_warning ("Failed to get head of local branch for repo %s.\n", repo->id); goto out; } master_head = seaf_commit_manager_get_commit (seaf->commit_mgr, repo->id, repo->version, master->commit_id); if (!master_head) { seaf_warning ("Failed to get head of master branch for repo %s.\n", repo->id); goto out; } diff_commit_roots (repo->id, repo->version, master_head->root_id, local_head->root_id, &diff_results, FALSE); if (!diff_results) { goto out; } GList *p; DiffEntry *de; int n_deleted = 0; for (p = diff_results; p != NULL; p = p->next) { de = p->data; switch (de->status) { case DIFF_STATUS_DELETED: if (n_deleted == 0) deleted_file = get_basename(de->name); n_deleted++; break; } } if (n_deleted >= seaf->delete_confirm_threshold) { desc = g_string_new (""); g_string_append_printf (desc, "Deleted \"%s\" and %d more files.\n", deleted_file, n_deleted - 1); ret = g_string_free (desc, FALSE); } out: seaf_branch_unref (local); seaf_branch_unref (master); seaf_commit_unref (local_head); seaf_commit_unref (master_head); g_list_free_full (diff_results, (GDestroyNotify)diff_entry_free); return ret; } static void notify_delete_confirmation (const char *repo_name, const char *desc, const char *confirmation_id) { json_t *obj = json_object (); json_object_set_string_member (obj, "repo_name", repo_name); json_object_set_string_member (obj, "delete_files", desc); json_object_set_string_member (obj, "confirmation_id", confirmation_id); char *msg = json_dumps (obj, JSON_COMPACT); seaf_mq_manager_publish_notification (seaf->mq_mgr, "sync.del_confirmation", msg); json_decref (obj); g_free (msg); } int seaf_sync_manager_add_del_confirmation (SeafSyncManager *mgr, const char *confirmation_id, gboolean resync) { SeafSyncManagerPriv *priv = seaf->sync_mgr->priv; DelConfirmationResult *result = NULL; result = g_new0 (DelConfirmationResult, 1); result->resync = resync; pthread_mutex_lock (&priv->del_confirmation_lock); g_hash_table_insert (priv->del_confirmation_tasks, g_strdup (confirmation_id), result); pthread_mutex_unlock (&priv->del_confirmation_lock); return 0; } static DelConfirmationResult * get_del_confirmation_result (const char *confirmation_id) { SeafSyncManagerPriv *priv = seaf->sync_mgr->priv; DelConfirmationResult *result, *copy = NULL; pthread_mutex_lock (&priv->del_confirmation_lock); result = g_hash_table_lookup (priv->del_confirmation_tasks, confirmation_id); if (result) { copy = g_new0 (DelConfirmationResult, 1); copy->resync = result->resync; g_hash_table_remove (priv->del_confirmation_tasks, confirmation_id); } pthread_mutex_unlock (&priv->del_confirmation_lock); return copy; } static void resync_repo (SeafRepo *repo) { GError *error = NULL; char *repo_id = g_strdup (repo->id); int repo_version = repo->version; char *repo_name = g_strdup (repo->name); char *token = g_strdup (repo->token); char *magic = g_strdup (repo->magic); int enc_version = repo->enc_version; char *random_key = g_strdup (repo->random_key); char *worktree = g_strdup (repo->worktree); char *email = g_strdup (repo->email); char *more_info = NULL; gboolean is_encrypted = FALSE; char key[65], iv[33]; json_t *obj = json_object (); json_object_set_int_member (obj, "is_readonly", repo->is_readonly); json_object_set_string_member (obj, "repo_salt", repo->salt); json_object_set_string_member (obj, "server_url", repo->server_url); if (repo->username) json_object_set_string_member (obj, "username", repo->username); if (repo->encrypted) { is_encrypted = TRUE; if (repo->enc_version == 1) { rawdata_to_hex (repo->enc_key, key, 16); rawdata_to_hex (repo->enc_iv, iv, 16); } else if (repo->enc_version >= 2){ rawdata_to_hex (repo->enc_key, key, 32); rawdata_to_hex (repo->enc_iv, iv, 16); } json_object_set_int_member (obj, "resync_enc_repo", TRUE); if (repo->pwd_hash_algo) { json_object_set_string_member (obj, "pwd_hash_algo", repo->pwd_hash_algo); json_object_set_string_member (obj, "pwd_hash_params", repo->pwd_hash_params); json_object_set_string_member (obj, "pwd_hash", repo->pwd_hash); } } more_info = json_dumps (obj, 0); if (repo->auto_sync && (repo->sync_interval == 0)) seaf_wt_monitor_unwatch_repo (seaf->wt_monitor, repo->id); seaf_repo_manager_del_repo (seaf->repo_mgr, repo); if (is_encrypted) { seaf_repo_manager_save_repo_enc_info (seaf->repo_mgr, repo_id, key, iv); } char *ret = seaf_clone_manager_add_task (seaf->clone_mgr, repo_id, repo_version, repo_name, token, NULL, magic, enc_version, random_key, worktree, email, more_info, &error); if (error) { seaf_warning ("Failed to clone repo %s: %s\n", repo_id, error->message); g_clear_error (&error); } g_free (ret); g_free (repo_id); g_free (repo_name); g_free (token); g_free (magic); g_free (random_key); g_free (worktree); g_free (email); json_decref (obj); g_free (more_info); } static int sync_repo_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync) { SeafBranch *master, *local; SyncTask *task; int ret = 0; char *last_download = NULL; SyncInfo *info = NULL; master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master"); if (!master) { seaf_warning ("No master branch found for repo %s(%.8s).\n", repo->name, repo->id); return -1; } local = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "local"); if (!local) { seaf_warning ("No local branch found for repo %s(%.8s).\n", repo->name, repo->id); return -1; } /* If last download was interrupted in the fetch and download stage, * need to resume it at exactly the same remote commit. */ last_download = seaf_repo_manager_get_repo_property (seaf->repo_mgr, repo->id, REPO_PROP_DOWNLOAD_HEAD); if (last_download && strcmp (last_download, EMPTY_SHA1) != 0) { if (is_manual_sync || can_schedule_repo (manager, repo)) { task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE); start_fetch_if_necessary (task, last_download); } goto out; } info = get_sync_info (manager, repo->id); if (strcmp (master->commit_id, local->commit_id) != 0) { if (is_manual_sync || can_schedule_repo (manager, repo) || info->del_confirmation_pending) { task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE); if (!task->info->del_confirmation_pending) { char *desc = NULL; desc = exceed_max_deleted_files (repo); if (desc) { notify_delete_confirmation (repo->name, desc, local->commit_id); seaf_warning ("Delete more than %d files, add delete confirmation.\n", seaf->delete_confirm_threshold); task->info->del_confirmation_pending = TRUE; set_task_error (task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING); g_free (desc); goto out; } } else { DelConfirmationResult *result = get_del_confirmation_result (local->commit_id); if (!result) { // User has not confirmed whether to continue syncing. set_task_error (task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING); goto out; } else if (result->resync) { // User chooses to resync. g_free (result); task->info->del_confirmation_pending = FALSE; set_task_error (task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING); // Delete this repo and resync this repo by adding clone task. resync_repo (repo); ret = -1; goto out; } // User chooes to continue syncing. g_free (result); task->info->del_confirmation_pending = FALSE; } start_upload_if_necessary (task); } /* Do nothing if the client still has something to upload * but it's before 30-second schedule. */ goto out; } else if (is_manual_sync) { task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE); commit_repo (task); goto out; } else if (create_commit_from_event_queue (manager, repo, is_manual_sync)) goto out; if (is_manual_sync || can_schedule_repo (manager, repo)) { /* If file syncing protocol version is higher than 2, we check for all head commit ids * for synced repos regularly. */ if (!is_manual_sync && !need_check_on_server (manager, repo, master->commit_id)) { seaf_debug ("Repo %s is not changed on server %s.\n", repo->name, repo->server_url); repo->last_sync_time = time(NULL); goto out; } task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE); check_head_commit_http (task); } out: g_free (last_download); seaf_branch_unref (master); seaf_branch_unref (local); return ret; } void seaf_sync_manager_update_repo (SeafSyncManager *manager, SeafRepo *repo, const char *head_commit) { HttpServerState *state; SeafBranch *master = NULL; state = g_hash_table_lookup (manager->http_server_states, repo->server_url); if (!state) { return; } master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master"); if (!master) { return; } if (g_strcmp0(head_commit, master->commit_id) != 0) { pthread_mutex_lock (&state->head_commit_map_lock); g_hash_table_replace (state->head_commit_map, g_strdup (repo->id), g_strdup (head_commit)); pthread_mutex_unlock (&state->head_commit_map_lock); // Set last_sync_time to 0 to allow the repo to be sync immediately. // Otherwise it only gets synced after 30 seconds since the last sync. repo->last_sync_time = 0; } seaf_branch_unref (master); return; } void seaf_sync_manager_check_locks_and_folder_perms (SeafSyncManager *manager, const char *server_url) { HttpServerState *state; state = g_hash_table_lookup (manager->http_server_states, server_url); if (!state) { return; } if (!seaf_repo_manager_server_is_pro (seaf->repo_mgr, server_url)) return; state->immediate_check_folder_perms = TRUE; state->immediate_check_locked_files = TRUE; return; } static void auto_delete_repo (SeafSyncManager *manager, SeafRepo *repo) { SyncInfo *info = seaf_sync_manager_get_sync_info (manager, repo->id); char *name = g_strdup (repo->name); seaf_message ("Auto deleted repo '%s'.\n", repo->name); seaf_sync_manager_cancel_sync_task (seaf->sync_mgr, repo->id); if (info != NULL && info->in_sync) { seaf_repo_manager_mark_repo_deleted (seaf->repo_mgr, repo); } else { seaf_repo_manager_del_repo (seaf->repo_mgr, repo); } /* Publish a message, for applet to notify in the system tray */ seaf_mq_manager_publish_notification (seaf->mq_mgr, "repo.removed", name); g_free (name); } static char * http_fileserver_url (const char *url) { const char *host; char *colon; char *url_no_port; char *ret = NULL; /* Just return the url itself if it's invalid. */ if (strlen(url) <= strlen("http://")) return g_strdup(url); /* Skip protocol schem. */ host = url + strlen("http://"); colon = strrchr (host, ':'); if (colon) { url_no_port = g_strndup(url, colon - url); ret = g_strconcat(url_no_port, ":8082", NULL); g_free (url_no_port); } else { ret = g_strconcat(url, ":8082", NULL); } return ret; } static void check_http_fileserver_protocol_done (HttpProtocolVersion *result, void *user_data) { HttpServerState *state = user_data; state->checking = FALSE; if (result->check_success && !result->not_supported) { state->http_version = MIN(result->version, CURRENT_SYNC_PROTO_VERSION); state->effective_host = http_fileserver_url(state->testing_host); state->use_fileserver_port = TRUE; seaf_message ("File syncing protocol version on server %s is %d. " "Client file syncing protocol version is %d. Use version %d.\n", state->effective_host, result->version, CURRENT_SYNC_PROTO_VERSION, state->http_version); } } static void check_http_protocol_done (HttpProtocolVersion *result, void *user_data) { HttpServerState *state = user_data; if (result->check_success && !result->not_supported) { state->http_version = MIN(result->version, CURRENT_SYNC_PROTO_VERSION); state->effective_host = g_strdup(state->testing_host); state->checking = FALSE; seaf_message ("File syncing protocol version on server %s is %d. " "Client file syncing protocol version is %d. Use version %d.\n", state->effective_host, result->version, CURRENT_SYNC_PROTO_VERSION, state->http_version); } else if (strncmp(state->testing_host, "https", 5) != 0) { char *host_fileserver = http_fileserver_url(state->testing_host); if (http_tx_manager_check_protocol_version (seaf->http_tx_mgr, host_fileserver, TRUE, check_http_fileserver_protocol_done, state) < 0) state->checking = FALSE; g_free (host_fileserver); } else { state->checking = FALSE; } } #define CHECK_HTTP_INTERVAL 10 /* * Returns TRUE if we're ready to use http-sync; otherwise FALSE. */ static gboolean check_http_protocol (SeafSyncManager *mgr, SeafRepo *repo) { /* If a repo was cloned before 4.0, server-url is not set. */ if (!repo->server_url) return FALSE; HttpServerState *state = g_hash_table_lookup (mgr->http_server_states, repo->server_url); if (!state) { state = http_server_state_new (); g_hash_table_insert (mgr->http_server_states, g_strdup(repo->server_url), state); } if (state->checking) { return FALSE; } if (state->http_version > 0) { if (!repo->effective_host) { repo->effective_host = g_strdup(state->effective_host); repo->use_fileserver_port = state->use_fileserver_port; } return TRUE; } /* If we haven't detected the server url successfully, retry every 10 seconds. */ gint64 now = time(NULL); if (now - state->last_http_check_time < CHECK_HTTP_INTERVAL) return FALSE; /* First try repo->server_url. * If it fails and https is not used, try server_url:8082 instead. */ g_free (state->testing_host); state->testing_host = g_strdup(repo->server_url); state->last_http_check_time = (gint64)time(NULL); if (http_tx_manager_check_protocol_version (seaf->http_tx_mgr, repo->server_url, FALSE, check_http_protocol_done, state) < 0) return FALSE; state->checking = TRUE; return FALSE; } static void check_notif_server_done (gboolean is_alive, void *user_data) { HttpServerState *state = user_data; if (is_alive) { state->notif_server_alive = TRUE; seaf_message ("Notification server is enabled on the remote server %s.\n", state->effective_host); } } static char * http_notification_url (const char *url) { const char *host; char *colon; char *url_no_port; char *ret = NULL; /* Just return the url itself if it's invalid. */ if (strlen(url) <= strlen("http://")) return g_strdup(url); /* Skip protocol schem. */ host = url + strlen("http://"); colon = strrchr (host, ':'); if (colon) { url_no_port = g_strndup(url, colon - url); ret = g_strconcat(url_no_port, ":8083", NULL); g_free (url_no_port); } else { ret = g_strconcat(url, ":8083", NULL); } return ret; } #if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS // Returns TRUE if notification server is alive; otherwise FALSE. // We only check notification server once. static gboolean check_notif_server (SeafSyncManager *mgr, SeafRepo *repo) { if (!repo->server_url) return FALSE; HttpServerState *state = g_hash_table_lookup (mgr->http_server_states, repo->server_url); if (!state) { return FALSE; } if (state->notif_server_alive) { return TRUE; } if (state->notif_server_checked) { return FALSE; } char *notif_url = NULL; if (state->use_fileserver_port) { notif_url = http_notification_url (repo->server_url); } else { notif_url = g_strdup (repo->server_url); } if (http_tx_manager_check_notif_server (seaf->http_tx_mgr, notif_url, state->use_fileserver_port, check_notif_server_done, state) < 0) { g_free (notif_url); return FALSE; } state->notif_server_checked = TRUE; g_free (notif_url); return FALSE; } #endif gint cmp_repos_by_sync_time (gconstpointer a, gconstpointer b, gpointer user_data) { const SeafRepo *repo_a = a; const SeafRepo *repo_b = b; return (repo_a->last_sync_time - repo_b->last_sync_time); } #if defined WIN32 || defined __APPLE__ static void cleanup_file_blocks (const char *repo_id, int version, const char *file_id) { Seafile *file; int i; file = seaf_fs_manager_get_seafile (seaf->fs_mgr, repo_id, version, file_id); for (i = 0; i < file->n_blocks; ++i) seaf_block_manager_remove_block (seaf->block_mgr, repo_id, version, file->blk_sha1s[i]); seafile_unref (file); } static gboolean handle_locked_file_update (SeafRepo *repo, struct index_state *istate, LockedFileSet *fset, const char *path, LockedFile *locked, CheckoutBlockAux *aux) { gboolean locked_on_server = FALSE; struct cache_entry *ce; char file_id[41]; char *fullpath = NULL; SeafStat st; gboolean file_exists = TRUE; SeafileCrypt *crypt = NULL; SeafBranch *master = NULL; gboolean ret = TRUE; locked_on_server = seaf_filelock_manager_is_file_locked (seaf->filelock_mgr, repo->id, path); /* File is still locked, do nothing. */ if (do_check_file_locked (path, repo->worktree, locked_on_server)) return FALSE; seaf_debug ("Update previously locked file %s in repo %.8s.\n", path, repo->id); /* If the file was locked on the last checkout, the worktree file was not * updated, but the index has been updated. So the ce in the index should * contain the information for the file to be updated. */ ce = index_name_exists (istate, path, strlen(path), 0); if (!ce) { seaf_warning ("Cache entry for %s in repo %s(%.8s) is not found " "when update locked file.", path, repo->name, repo->id); goto remove_from_db; } rawdata_to_hex (ce->sha1, file_id, 20); fullpath = g_build_filename (repo->worktree, path, NULL); file_exists = seaf_util_exists (fullpath); if (file_exists && seaf_stat (fullpath, &st) < 0) { seaf_warning ("Failed to stat %s: %s.\n", fullpath, strerror(errno)); goto out; } if (repo->encrypted) crypt = seafile_crypt_new (repo->enc_version, repo->enc_key, repo->enc_iv); master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master"); if (!master) { seaf_warning ("No master branch found for repo %s(%.8s).\n", repo->name, repo->id); goto out; } gboolean conflicted; gboolean force_conflict = (file_exists && st.st_mtime != locked->old_mtime); char *username = repo->username; if (!username) { username = repo->email; } if (seaf_repo_manager_checkout_file (repo, file_id, fullpath, ce->ce_mode, ce->ce_mtime.sec, crypt, path, master->commit_id, force_conflict, &conflicted, username, aux) < 0) { seaf_warning ("Failed to checkout previously locked file %s in repo " "%s(%.8s).\n", path, repo->name, repo->id); } seaf_sync_manager_update_active_path (seaf->sync_mgr, repo->id, path, S_IFREG, SYNC_STATUS_SYNCED, TRUE); /* In checkout, the file was overwritten by rename, so the file attributes are gone. We have to set read-only state again. */ if (locked_on_server) seaf_filelock_manager_lock_wt_file (seaf->filelock_mgr, repo->id, path); out: cleanup_file_blocks (repo->id, repo->version, file_id); remove_from_db: /* Remove the locked file record from db. */ locked_file_set_remove (fset, path, TRUE); g_free (fullpath); g_free (crypt); seaf_branch_unref (master); return ret; } static gboolean handle_locked_file_delete (SeafRepo *repo, struct index_state *istate, LockedFileSet *fset, const char *path, LockedFile *locked) { gboolean locked_on_server = FALSE; char *fullpath = NULL; SeafStat st; gboolean file_exists = TRUE; gboolean ret = TRUE; locked_on_server = seaf_filelock_manager_is_file_locked (seaf->filelock_mgr, repo->id, path); /* File is still locked, do nothing. */ if (do_check_file_locked (path, repo->worktree, locked_on_server)) return FALSE; seaf_debug ("Delete previously locked file %s in repo %.8s.\n", path, repo->id); fullpath = g_build_filename (repo->worktree, path, NULL); file_exists = seaf_util_exists (fullpath); if (file_exists && seaf_stat (fullpath, &st) < 0) { seaf_warning ("Failed to stat %s: %s.\n", fullpath, strerror(errno)); goto out; } if (file_exists && st.st_mtime == locked->old_mtime) seaf_util_unlink (fullpath); out: /* Remove the locked file record from db. */ locked_file_set_remove (fset, path, TRUE); g_free (fullpath); return ret; } static void * check_locked_files (void *vdata) { SeafRepo *repo = vdata; LockedFileSet *fset; GHashTableIter iter; gpointer key, value; char *path; LockedFile *locked; char index_path[SEAF_PATH_MAX]; struct index_state istate; fset = seaf_repo_manager_get_locked_file_set (seaf->repo_mgr, repo->id); if (g_hash_table_size (fset->locked_files) == 0) { locked_file_set_free (fset); return vdata; } memset (&istate, 0, sizeof(istate)); snprintf (index_path, SEAF_PATH_MAX, "%s/%s", repo->manager->index_dir, repo->id); if (read_index_from (&istate, index_path, repo->version) < 0) { seaf_warning ("Failed to load index.\n"); return vdata; } CheckoutBlockAux *aux = g_new0 (CheckoutBlockAux, 1); aux->repo_id = g_strdup (repo->id); aux->host = g_strdup (repo->effective_host); aux->token = g_strdup (repo->token); aux->use_fileserver_port = repo->use_fileserver_port; gboolean success; g_hash_table_iter_init (&iter, fset->locked_files); while (g_hash_table_iter_next (&iter, &key, &value)) { path = key; locked = value; success = FALSE; if (strcmp (locked->operation, LOCKED_OP_UPDATE) == 0) success = handle_locked_file_update (repo, &istate, fset, path, locked, aux); else if (strcmp (locked->operation, LOCKED_OP_DELETE) == 0) success = handle_locked_file_delete (repo, &istate, fset, path, locked); if (success) g_hash_table_iter_remove (&iter); } free_checkout_block_aux (aux); discard_index (&istate); locked_file_set_free (fset); return vdata; } static void check_locked_files_done (void *vdata) { SeafRepo *repo = vdata; repo->checking_locked_files = FALSE; } #endif static void check_folder_perms_done (HttpFolderPerms *result, void *user_data) { HttpServerState *server_state = user_data; GList *ptr; HttpFolderPermRes *res; gint64 now = (gint64)time(NULL); server_state->checking_folder_perms = FALSE; if (!result->success) { /* If on star-up we find that checking folder perms fails, * we assume the server doesn't support it. */ if (server_state->last_check_perms_time == 0) server_state->folder_perms_not_supported = TRUE; server_state->last_check_perms_time = now; return; } SyncInfo *info; for (ptr = result->results; ptr; ptr = ptr->next) { res = ptr->data; info = get_sync_info (seaf->sync_mgr, res->repo_id); if (info->in_sync) continue; seaf_repo_manager_update_folder_perms (seaf->repo_mgr, res->repo_id, FOLDER_PERM_TYPE_USER, res->user_perms); seaf_repo_manager_update_folder_perms (seaf->repo_mgr, res->repo_id, FOLDER_PERM_TYPE_GROUP, res->group_perms); seaf_repo_manager_update_folder_perm_timestamp (seaf->repo_mgr, res->repo_id, res->timestamp); } server_state->last_check_perms_time = now; } static void check_folder_permissions_one_server_immediately (SeafSyncManager *mgr, const char *host, HttpServerState *server_state, GList *repos, gboolean force) { GList *ptr; SeafRepo *repo; char *token; gint64 timestamp; HttpFolderPermReq *req; GList *requests = NULL; for (ptr = repos; ptr; ptr = ptr->next) { repo = ptr->data; #if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS if (!force && seaf_notif_manager_is_repo_subscribed (seaf->notif_mgr, repo)) continue; #endif if (!repo->head) continue; if (g_strcmp0 (host, repo->server_url) != 0) continue; token = seaf_repo_manager_get_repo_property (seaf->repo_mgr, repo->id, REPO_PROP_TOKEN); if (!token) continue; timestamp = seaf_repo_manager_get_folder_perm_timestamp (seaf->repo_mgr, repo->id); if (timestamp < 0) timestamp = 0; req = g_new0 (HttpFolderPermReq, 1); memcpy (req->repo_id, repo->id, 36); req->token = g_strdup(token); req->timestamp = timestamp; requests = g_list_append (requests, req); g_free (token); } if (!requests) return; server_state->checking_folder_perms = TRUE; /* The requests list will be freed in http tx manager. */ if (http_tx_manager_get_folder_perms (seaf->http_tx_mgr, server_state->effective_host, server_state->use_fileserver_port, requests, check_folder_perms_done, server_state) < 0) { seaf_warning ("Failed to schedule check folder permissions\n"); server_state->checking_folder_perms = FALSE; } } static void check_folder_permissions_one_server (SeafSyncManager *mgr, const char *host, HttpServerState *server_state, GList *repos) { if (!seaf_repo_manager_server_is_pro (seaf->repo_mgr, host)) return; gint64 now = (gint64)time(NULL); if (server_state->http_version == 0 || server_state->folder_perms_not_supported || server_state->checking_folder_perms) return; if (server_state->immediate_check_folder_perms) { server_state->immediate_check_folder_perms = FALSE; check_folder_permissions_one_server_immediately (mgr, host, server_state, repos, TRUE); return; } if (server_state->last_check_perms_time > 0 && now - server_state->last_check_perms_time < CHECK_FOLDER_PERMS_INTERVAL) return; check_folder_permissions_one_server_immediately (mgr, host, server_state, repos, FALSE); } static void check_folder_permissions (SeafSyncManager *mgr, GList *repos) { GHashTableIter iter; gpointer key, value; char *host; HttpServerState *state; g_hash_table_iter_init (&iter, mgr->http_server_states); while (g_hash_table_iter_next (&iter, &key, &value)) { host = key; state = value; check_folder_permissions_one_server (mgr, host, state, repos); } } static void check_server_locked_files_done (HttpLockedFiles *result, void *user_data) { HttpServerState *server_state = user_data; GList *ptr; HttpLockedFilesRes *locked_res; gint64 now = (gint64)time(NULL); server_state->checking_locked_files = FALSE; if (!result->success) { /* If on star-up we find that checking locked files fails, * we assume the server doesn't support it. */ if (server_state->last_check_locked_files_time == 0) server_state->locked_files_not_supported = TRUE; server_state->last_check_locked_files_time = now; return; } SyncInfo *info; for (ptr = result->results; ptr; ptr = ptr->next) { locked_res = ptr->data; info = get_sync_info (seaf->sync_mgr, locked_res->repo_id); if (info->in_sync) continue; seaf_filelock_manager_update (seaf->filelock_mgr, locked_res->repo_id, locked_res->locked_files); seaf_filelock_manager_update_timestamp (seaf->filelock_mgr, locked_res->repo_id, locked_res->timestamp); } server_state->last_check_locked_files_time = now; } static void check_locked_files_one_server_immediately (SeafSyncManager *mgr, const char *host, HttpServerState *server_state, GList *repos, gboolean force) { GList *ptr; SeafRepo *repo; char *token; gint64 timestamp; HttpLockedFilesReq *req; GList *requests = NULL; for (ptr = repos; ptr; ptr = ptr->next) { repo = ptr->data; #if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS if (!force && seaf_notif_manager_is_repo_subscribed (seaf->notif_mgr, repo)) continue; #endif if (!repo->head) continue; if (g_strcmp0 (host, repo->server_url) != 0) continue; token = seaf_repo_manager_get_repo_property (seaf->repo_mgr, repo->id, REPO_PROP_TOKEN); if (!token) continue; timestamp = seaf_filelock_manager_get_timestamp (seaf->filelock_mgr, repo->id); if (timestamp < 0) timestamp = 0; req = g_new0 (HttpLockedFilesReq, 1); memcpy (req->repo_id, repo->id, 36); req->token = g_strdup(token); req->timestamp = timestamp; requests = g_list_append (requests, req); g_free (token); } if (!requests) return; server_state->checking_locked_files = TRUE; /* The requests list will be freed in http tx manager. */ if (http_tx_manager_get_locked_files (seaf->http_tx_mgr, server_state->effective_host, server_state->use_fileserver_port, requests, check_server_locked_files_done, server_state) < 0) { seaf_warning ("Failed to schedule check server locked files\n"); server_state->checking_locked_files = FALSE; } } static void check_locked_files_one_server (SeafSyncManager *mgr, const char *host, HttpServerState *server_state, GList *repos) { if (!seaf_repo_manager_server_is_pro (seaf->repo_mgr, host)) return; gint64 now = (gint64)time(NULL); if (server_state->http_version == 0 || server_state->locked_files_not_supported || server_state->checking_locked_files) return; if (server_state->immediate_check_locked_files) { server_state->immediate_check_locked_files = FALSE; check_locked_files_one_server_immediately (mgr, host, server_state, repos, TRUE); return; } if (server_state->last_check_locked_files_time > 0 && now - server_state->last_check_locked_files_time < CHECK_FOLDER_PERMS_INTERVAL) return; check_locked_files_one_server_immediately (mgr, host, server_state, repos, FALSE); } static void check_server_locked_files (SeafSyncManager *mgr, GList *repos) { GHashTableIter iter; gpointer key, value; char *host; HttpServerState *state; g_hash_table_iter_init (&iter, mgr->http_server_states); while (g_hash_table_iter_next (&iter, &key, &value)) { host = key; state = value; check_locked_files_one_server (mgr, host, state, repos); } } #if 0 static void print_active_paths (SeafSyncManager *mgr) { int n = seaf_sync_manager_active_paths_number(mgr); seaf_message ("%d active paths\n\n", n); if (n < 10) { char *paths_json = seaf_sync_manager_list_active_paths_json (mgr); seaf_message ("%s\n", paths_json); g_free (paths_json); } } #endif static char * parse_jwt_token (const char *rsp_content, gint64 rsp_size) { json_t *object = NULL; json_error_t jerror; const char *member = NULL; char *jwt_token = NULL; object = json_loadb (rsp_content, rsp_size, 0, &jerror); if (!object) { return NULL; } if (json_object_has_member (object, "jwt_token")) { member = json_object_get_string_member (object, "jwt_token"); if (member) jwt_token = g_strdup (member); } else { json_decref (object); return NULL; } json_decref (object); return jwt_token; } #define HTTP_FORBIDDEN 403 #define HTTP_NOT_FOUND 404 #define HTTP_SERVERR 500 #define HTTP_SERVERR_BAD_GATEWAY 502 #define HTTP_SERVERR_UNAVAILABLE 503 #define HTTP_SERVERR_TIMEOUT 504 typedef struct _GetJwtTokenAux { HttpServerState *state; char *repo_id; } GetJwtTokenAux; static void fileserver_get_jwt_token_cb (HttpAPIGetResult *result, void *user_data) { GetJwtTokenAux *aux = user_data; HttpServerState *state = aux->state; char *repo_id = aux->repo_id; SeafRepo *repo = NULL; char *jwt_token = NULL; state->n_jwt_token_request--; if (result->http_status == HTTP_NOT_FOUND || result->http_status == HTTP_FORBIDDEN || result->http_status == HTTP_SERVERR) { goto out; } if (!result->success) { goto out; } repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); if (!repo) goto out; jwt_token = parse_jwt_token (result->rsp_content,result->rsp_size); if (!jwt_token) { seaf_warning ("Failed to parse jwt token for repo %s\n", repo->id); goto out; } g_free (repo->jwt_token); repo->jwt_token = jwt_token; out: g_free (aux->repo_id); g_free (aux); return; } inline static gboolean periodic_sync_due (SeafRepo *repo) { int now = (int)time(NULL); return (now > (repo->last_sync_time + repo->sync_interval)); } #if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS static int check_and_subscribe_repo (SeafSyncManager *mgr, SeafRepo *repo) { char *url = NULL; HttpServerState *state = g_hash_table_lookup (mgr->http_server_states, repo->server_url); if (!state || !state->notif_server_alive) { return 0; } if (state->n_jwt_token_request > 10) { return 0; } gint64 now = (gint64)time(NULL); if (now - repo->last_check_jwt_token > JWT_TOKEN_EXPIRE_TIME) { repo->last_check_jwt_token = now; if (!repo->use_fileserver_port) url = g_strdup_printf ("%s/seafhttp/repo/%s/jwt-token", repo->effective_host, repo->id); else url = g_strdup_printf ("%s/repo/%s/jwt-token", repo->effective_host, repo->id); state->n_jwt_token_request++; GetJwtTokenAux *aux = g_new0 (GetJwtTokenAux, 1); aux->repo_id = g_strdup (repo->id); aux->state = state; if (http_tx_manager_fileserver_api_get (seaf->http_tx_mgr, repo->effective_host, url, repo->token, fileserver_get_jwt_token_cb, aux) < 0) { g_free (aux->repo_id); g_free (aux); state->n_jwt_token_request--; } g_free (url); return 0; } if (!seaf_notif_manager_is_repo_subscribed (seaf->notif_mgr, repo)) { if (repo->jwt_token && repo->server_url) seaf_notif_manager_subscribe_repo (seaf->notif_mgr, repo); } return 0; } #endif static int auto_sync_pulse (void *vmanager) { SeafSyncManager *manager = vmanager; GList *repos, *ptr; SeafRepo *repo; repos = seaf_repo_manager_get_repo_list (manager->seaf->repo_mgr, -1, -1); check_folder_permissions (manager, repos); check_server_locked_files (manager, repos); /* Sort repos by last_sync_time, so that we don't "starve" any repo. */ repos = g_list_sort_with_data (repos, cmp_repos_by_sync_time, NULL); for (ptr = repos; ptr != NULL; ptr = ptr->next) { repo = ptr->data; if (!manager->priv->auto_sync_enabled || !repo->auto_sync) continue; /* Every second, we'll check the worktree to see if it still exists. * We'll invalidate worktree if it gets moved or deleted. * But there is a hole here: If the user delete the worktree dir and * recreate a dir with the same name within a second, we'll falsely * see the worktree as valid. What's worse, the new worktree dir won't * be monitored. * This problem can only be solved by restart. */ /* If repo has been checked out and the worktree doesn't exist, * we'll delete the repo automatically. */ if (repo->head != NULL) { if (seaf_repo_check_worktree (repo) < 0) { if (!repo->worktree_invalid) { // The repo worktree was valid, but now it's invalid seaf_repo_manager_invalidate_repo_worktree (seaf->repo_mgr, repo); if (!seafile_session_config_get_allow_invalid_worktree(seaf)) { auto_delete_repo (manager, repo); } } continue; } else { if (repo->worktree_invalid) { // The repo worktree was invalid, but now it's valid again, // so we start watch it seaf_repo_manager_validate_repo_worktree (seaf->repo_mgr, repo); continue; } } } repo->worktree_invalid = FALSE; #ifdef USE_GPL_CRYPTO if (repo->version == 0 || (repo->encrypted && repo->enc_version < 2)) { continue; } #endif if (!repo->token) { /* If the user has logged out of the account, the repo token would * be null */ seaf_debug ("repo token of %s (%.8s) is null, would not sync it\n", repo->name, repo->id); continue; } /* Don't sync repos not checked out yet. */ if (!repo->head) continue; gint64 now = (gint64)time(NULL); #if defined WIN32 || defined __APPLE__ if (repo->version > 0) { if (repo->checking_locked_files) continue; // Since delayed file updates requires blocks from the server, we need to check the protocol information before updating. if (check_http_protocol (manager, repo)) { if (repo->last_check_locked_time == 0 || now - repo->last_check_locked_time >= CHECK_LOCKED_FILES_INTERVAL) { repo->checking_locked_files = TRUE; if (seaf_job_manager_schedule_job (seaf->job_mgr, check_locked_files, check_locked_files_done, repo) < 0) { seaf_warning ("Failed to schedule check local locked files\n"); repo->checking_locked_files = FALSE; } else { repo->last_check_locked_time = now; } } } } #endif SyncInfo *info = get_sync_info (manager, repo->id); if (info->in_sync) continue; if (info->sync_perm_err_cnt > SYNC_PERM_ERROR_RETRY_TIME) continue; if (repo->version > 0) { /* For repo version > 0, only use http sync. */ if (check_http_protocol (manager, repo)) { #if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS if (check_notif_server (manager, repo)) { seaf_notif_manager_connect_server (seaf->notif_mgr, repo->server_url, repo->use_fileserver_port); } #endif if (repo->sync_interval == 0) { if (sync_repo_v2 (manager, repo, FALSE) < 0) continue; #if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS check_and_subscribe_repo (manager, repo); #endif } else if (periodic_sync_due (repo)) { if (sync_repo_v2 (manager, repo, TRUE) < 0) continue; #if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS check_and_subscribe_repo (manager, repo); #endif } } } else { seaf_warning ("Repo %s(%s) is version 0 library. Syncing is no longer supported.\n", repo->name, repo->id); } } g_list_free (repos); return TRUE; } static void on_repo_http_fetched (SeafileSession *seaf, HttpTxTask *tx_task, SeafSyncManager *manager) { SyncInfo *info = get_sync_info (manager, tx_task->repo_id); SyncTask *task = info->current_task; /* Clone tasks are handled by clone manager. */ if (tx_task->is_clone) return; if (task->repo->delete_pending) { transition_sync_state (task, SYNC_STATE_CANCELED); seaf_repo_manager_del_repo (seaf->repo_mgr, task->repo); return; } if (tx_task->state == HTTP_TASK_STATE_FINISHED) { memcpy (info->head_commit, tx_task->head, 41); transition_sync_state (task, SYNC_STATE_DONE); } else if (tx_task->state == HTTP_TASK_STATE_CANCELED) { transition_sync_state (task, SYNC_STATE_CANCELED); } else if (tx_task->state == HTTP_TASK_STATE_ERROR) { if (tx_task->error == SYNC_ERROR_ID_SERVER_REPO_DELETED) { on_repo_deleted_on_server (task, task->repo); } else { set_task_error (task, tx_task->error); } } } static void on_repo_http_uploaded (SeafileSession *seaf, HttpTxTask *tx_task, SeafSyncManager *manager) { SyncInfo *info = get_sync_info (manager, tx_task->repo_id); SyncTask *task = info->current_task; g_return_if_fail (task != NULL && info->in_sync); if (task->repo->delete_pending) { transition_sync_state (task, SYNC_STATE_CANCELED); seaf_repo_manager_del_repo (seaf->repo_mgr, task->repo); return; } if (tx_task->state == HTTP_TASK_STATE_FINISHED) { memcpy (info->head_commit, tx_task->head, 41); /* Save current head commit id for GC. */ seaf_repo_manager_set_repo_property (seaf->repo_mgr, task->repo->id, REPO_LOCAL_HEAD, task->repo->head->commit_id); task->uploaded = TRUE; check_head_commit_http (task); } else if (tx_task->state == HTTP_TASK_STATE_CANCELED) { transition_sync_state (task, SYNC_STATE_CANCELED); } else if (tx_task->state == HTTP_TASK_STATE_ERROR) { if (tx_task->error == SYNC_ERROR_ID_SERVER_REPO_DELETED) { on_repo_deleted_on_server (task, task->repo); } else { set_task_error (task, tx_task->error); } } } const char * sync_state_to_str (int state) { if (state < 0 || state >= SYNC_STATE_NUM) { seaf_warning ("illegal sync state: %d\n", state); return NULL; } return sync_state_str[state]; } static void disable_auto_sync_for_repos (SeafSyncManager *mgr) { GList *repos; GList *ptr; SeafRepo *repo; repos = seaf_repo_manager_get_repo_list (seaf->repo_mgr, -1, -1); for (ptr = repos; ptr; ptr = ptr->next) { repo = ptr->data; if (repo->sync_interval == 0) seaf_wt_monitor_unwatch_repo (seaf->wt_monitor, repo->id); seaf_sync_manager_cancel_sync_task (mgr, repo->id); seaf_sync_manager_remove_active_path_info (mgr, repo->id); } g_list_free (repos); } int seaf_sync_manager_disable_auto_sync (SeafSyncManager *mgr) { if (!seaf->started) { seaf_message ("sync manager is not started, skip disable auto sync.\n"); return -1; } disable_auto_sync_for_repos (mgr); mgr->priv->auto_sync_enabled = FALSE; g_debug ("[sync mgr] auto sync is disabled\n"); return 0; } static void enable_auto_sync_for_repos (SeafSyncManager *mgr) { GList *repos; GList *ptr; SeafRepo *repo; repos = seaf_repo_manager_get_repo_list (seaf->repo_mgr, -1, -1); for (ptr = repos; ptr; ptr = ptr->next) { repo = ptr->data; if (repo->sync_interval == 0) seaf_wt_monitor_watch_repo (seaf->wt_monitor, repo->id, repo->worktree); } g_list_free (repos); } int seaf_sync_manager_enable_auto_sync (SeafSyncManager *mgr) { if (!seaf->started) { seaf_message ("sync manager is not started, skip enable auto sync.\n"); return -1; } enable_auto_sync_for_repos (mgr); mgr->priv->auto_sync_enabled = TRUE; g_debug ("[sync mgr] auto sync is enabled\n"); return 0; } int seaf_sync_manager_is_auto_sync_enabled (SeafSyncManager *mgr) { if (mgr->priv->auto_sync_enabled) return 1; else return 0; } static ActivePathsInfo * active_paths_info_new (SeafRepo *repo) { ActivePathsInfo *info = g_new0 (ActivePathsInfo, 1); info->paths = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); info->syncing_tree = sync_status_tree_new (repo->worktree); info->synced_tree = sync_status_tree_new (repo->worktree); return info; } static void active_paths_info_free (ActivePathsInfo *info) { if (!info) return; g_hash_table_destroy (info->paths); sync_status_tree_free (info->syncing_tree); sync_status_tree_free (info->synced_tree); g_free (info); } void seaf_sync_manager_update_active_path (SeafSyncManager *mgr, const char *repo_id, const char *path, int mode, SyncStatus status, gboolean refresh) { ActivePathsInfo *info; SeafRepo *repo; if (!repo_id || !path) { seaf_warning ("BUG: empty repo_id or path.\n"); return; } if (status <= SYNC_STATUS_NONE || status >= N_SYNC_STATUS) { seaf_warning ("BUG: invalid sync status %d.\n", status); return; } pthread_mutex_lock (&mgr->priv->paths_lock); info = g_hash_table_lookup (mgr->priv->active_paths, repo_id); if (!info) { repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); if (!repo) { pthread_mutex_unlock (&mgr->priv->paths_lock); return; } info = active_paths_info_new (repo); g_hash_table_insert (mgr->priv->active_paths, g_strdup(repo_id), info); } SyncStatus existing = (SyncStatus) g_hash_table_lookup (info->paths, path); if (!existing) { g_hash_table_insert (info->paths, g_strdup(path), (void*)status); if (status == SYNC_STATUS_SYNCING) sync_status_tree_add (info->syncing_tree, path, mode, refresh); else if (status == SYNC_STATUS_SYNCED) sync_status_tree_add (info->synced_tree, path, mode, refresh); #ifdef WIN32 else if (refresh) seaf_sync_manager_add_refresh_path (mgr, path); #endif } else if (existing != status) { g_hash_table_replace (info->paths, g_strdup(path), (void*)status); if (existing == SYNC_STATUS_SYNCING) sync_status_tree_del (info->syncing_tree, path); else if (existing == SYNC_STATUS_SYNCED) sync_status_tree_del (info->synced_tree, path); if (status == SYNC_STATUS_SYNCING) sync_status_tree_add (info->syncing_tree, path, mode, refresh); else if (status == SYNC_STATUS_SYNCED) sync_status_tree_add (info->synced_tree, path, mode, refresh); #ifdef WIN32 else if (refresh) seaf_sync_manager_add_refresh_path (mgr, path); #endif } pthread_mutex_unlock (&mgr->priv->paths_lock); } void seaf_sync_manager_delete_active_path (SeafSyncManager *mgr, const char *repo_id, const char *path) { ActivePathsInfo *info; if (!repo_id || !path) { seaf_warning ("BUG: empty repo_id or path.\n"); return; } pthread_mutex_lock (&mgr->priv->paths_lock); info = g_hash_table_lookup (mgr->priv->active_paths, repo_id); if (!info) { pthread_mutex_unlock (&mgr->priv->paths_lock); return; } g_hash_table_remove (info->paths, path); sync_status_tree_del (info->syncing_tree, path); sync_status_tree_del (info->synced_tree, path); pthread_mutex_unlock (&mgr->priv->paths_lock); } static char *path_status_tbl[] = { "none", "syncing", "error", "ignored", "synced", "paused", "readonly", "locked", "locked_by_me", NULL, }; static char * get_repo_sync_status (SeafSyncManager *mgr, const char *repo_id) { SyncInfo *info = get_sync_info (mgr, repo_id); SeafRepo *repo; if (info->in_error) return g_strdup(path_status_tbl[SYNC_STATUS_ERROR]); repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); if (!repo) return g_strdup(path_status_tbl[SYNC_STATUS_NONE]); if (!repo->auto_sync || !mgr->priv->auto_sync_enabled) return g_strdup(path_status_tbl[SYNC_STATUS_PAUSED]); char allzeros[41] = {0}; if (!info->in_sync && memcmp(allzeros, info->head_commit, 41) == 0) return g_strdup(path_status_tbl[SYNC_STATUS_NONE]); if (info->in_sync && (info->current_task->state == SYNC_STATE_COMMIT || info->current_task->state == SYNC_STATE_FETCH || info->current_task->state == SYNC_STATE_UPLOAD || info->current_task->state == SYNC_STATE_MERGE)) return g_strdup(path_status_tbl[SYNC_STATUS_SYNCING]); else if (!repo->is_readonly) return g_strdup(path_status_tbl[SYNC_STATUS_SYNCED]); else return g_strdup(path_status_tbl[SYNC_STATUS_READONLY]); } char * seaf_sync_manager_get_path_sync_status (SeafSyncManager *mgr, const char *repo_id, const char *path, gboolean is_dir) { ActivePathsInfo *info; SyncInfo *sync_info; SyncStatus ret = SYNC_STATUS_NONE; if (!repo_id || !path) { seaf_warning ("BUG: empty repo_id or path.\n"); return NULL; } if (path[0] == 0) { return get_repo_sync_status (mgr, repo_id); } /* If the repo is in error, all files in it should show no sync status. */ sync_info = get_sync_info (mgr, repo_id); if (sync_info && sync_info->in_error) { ret = SYNC_STATUS_NONE; goto out; } pthread_mutex_lock (&mgr->priv->paths_lock); info = g_hash_table_lookup (mgr->priv->active_paths, repo_id); if (!info) { pthread_mutex_unlock (&mgr->priv->paths_lock); ret = SYNC_STATUS_NONE; goto out; } ret = (SyncStatus) g_hash_table_lookup (info->paths, path); if (is_dir && (ret == SYNC_STATUS_NONE)) { /* If a dir is not in the syncing tree but in the synced tree, * it's synced. Otherwise if it's in the syncing tree, some files * under it must be syncing, so it should be in syncing status too. */ if (sync_status_tree_exists (info->syncing_tree, path)) ret = SYNC_STATUS_SYNCING; else if (sync_status_tree_exists (info->synced_tree, path)) ret = SYNC_STATUS_SYNCED; } pthread_mutex_unlock (&mgr->priv->paths_lock); if (ret == SYNC_STATUS_SYNCED) { if (!seaf_repo_manager_is_path_writable(seaf->repo_mgr, repo_id, path)) ret = SYNC_STATUS_READONLY; else if (seaf_filelock_manager_is_file_locked_by_me (seaf->filelock_mgr, repo_id, path)) ret = SYNC_STATUS_LOCKED_BY_ME; else if (seaf_filelock_manager_is_file_locked (seaf->filelock_mgr, repo_id, path)) ret = SYNC_STATUS_LOCKED; } out: return g_strdup(path_status_tbl[ret]); } static json_t * active_paths_to_json (GHashTable *paths) { json_t *array = NULL, *obj = NULL; GHashTableIter iter; gpointer key, value; char *path; SyncStatus status; array = json_array (); g_hash_table_iter_init (&iter, paths); while (g_hash_table_iter_next (&iter, &key, &value)) { path = key; status = (SyncStatus)value; obj = json_object (); json_object_set (obj, "path", json_string(path)); json_object_set (obj, "status", json_string(path_status_tbl[status])); json_array_append (array, obj); } return array; } char * seaf_sync_manager_list_active_paths_json (SeafSyncManager *mgr) { json_t *array = NULL, *obj = NULL, *path_array = NULL; GHashTableIter iter; gpointer key, value; char *repo_id; ActivePathsInfo *info; char *ret = NULL; pthread_mutex_lock (&mgr->priv->paths_lock); array = json_array (); g_hash_table_iter_init (&iter, mgr->priv->active_paths); while (g_hash_table_iter_next (&iter, &key, &value)) { repo_id = key; info = value; obj = json_object(); path_array = active_paths_to_json (info->paths); json_object_set (obj, "repo_id", json_string(repo_id)); json_object_set (obj, "paths", path_array); json_array_append (array, obj); } pthread_mutex_unlock (&mgr->priv->paths_lock); ret = json_dumps (array, JSON_INDENT(4)); if (!ret) { seaf_warning ("Failed to convert active paths to json\n"); } json_decref (array); return ret; } int seaf_sync_manager_active_paths_number (SeafSyncManager *mgr) { GHashTableIter iter; gpointer key, value; ActivePathsInfo *info; int ret = 0; g_hash_table_iter_init (&iter, mgr->priv->active_paths); while (g_hash_table_iter_next (&iter, &key, &value)) { info = value; ret += g_hash_table_size(info->paths); } return ret; } void seaf_sync_manager_remove_active_path_info (SeafSyncManager *mgr, const char *repo_id) { pthread_mutex_lock (&mgr->priv->paths_lock); g_hash_table_remove (mgr->priv->active_paths, repo_id); pthread_mutex_unlock (&mgr->priv->paths_lock); #ifdef WIN32 /* This is a hack to tell Windows Explorer to refresh all open windows. */ SHChangeNotify (SHCNE_ASSOCCHANGED, SHCNF_IDLIST, NULL, NULL); #endif } #ifdef WIN32 static wchar_t * win_path (const char *path) { char *ret = g_strdup(path); wchar_t *ret_w; char *p; for (p = ret; *p != 0; ++p) if (*p == '/') *p = '\\'; ret_w = g_utf8_to_utf16 (ret, -1, NULL, NULL, NULL); g_free (ret); return ret_w; } static void * refresh_windows_explorer_thread (void *vdata) { GAsyncQueue *q = vdata; char *path; wchar_t *wpath; int count = 0; while (1) { path = g_async_queue_pop (q); wpath = win_path (path); SHChangeNotify (SHCNE_ATTRIBUTES, SHCNF_PATHW, wpath, NULL); g_free (path); g_free (wpath); if (++count >= 100) { g_usleep (G_USEC_PER_SEC); count = 0; } } return NULL; } void seaf_sync_manager_add_refresh_path (SeafSyncManager *mgr, const char *path) { g_async_queue_push (mgr->priv->refresh_paths, g_strdup(path)); } void seaf_sync_manager_refresh_path (SeafSyncManager *mgr, const char *path) { wchar_t *wpath; wpath = win_path (path); SHChangeNotify (SHCNE_ATTRIBUTES, SHCNF_PATHW, wpath, NULL); g_free (wpath); } #endif static void update_head_commit_ids_for_server (gpointer key, gpointer value, gpointer user_data) { char *server_url = key; HttpServerState *state = value; int status = 200; /* Only get head commit ids from server if: * 1. syncing protocol version has been checked, and * 2. protocol version is at least 2. */ if (state->http_version >= 2) { seaf_debug ("Updating repo head commit ids for server %s.\n", server_url); GList *repo_id_list = seaf_repo_manager_get_repo_id_list_by_server (seaf->repo_mgr, server_url); if (!repo_id_list) { return; } GHashTable *new_map = http_tx_manager_get_head_commit_ids (seaf->http_tx_mgr, state->effective_host, state->use_fileserver_port, repo_id_list, &status); if (new_map) { //If the fileserver hangs up before sending events to the notification server, // the client will not receive the updates, and some updates will be lost. // Therefore, after the fileserver recovers, immediately checking locks and folder perms. if (state->server_disconnected) { seaf_sync_manager_check_locks_and_folder_perms (seaf->sync_mgr, server_url); } state->server_disconnected = FALSE; pthread_mutex_lock (&state->head_commit_map_lock); g_hash_table_destroy (state->head_commit_map); state->head_commit_map = new_map; if (!state->head_commit_map_init) state->head_commit_map_init = TRUE; state->last_update_head_commit_map_time = (gint64)time(NULL); pthread_mutex_unlock (&state->head_commit_map_lock); } else { if (status == HTTP_SERVERR_BAD_GATEWAY || status == HTTP_SERVERR_UNAVAILABLE || status == HTTP_SERVERR_TIMEOUT) { state->server_disconnected = TRUE; } } g_list_free_full (repo_id_list, g_free); } } static void * update_cached_head_commit_ids (void *arg) { SeafSyncManager *mgr = (SeafSyncManager *)arg; while (1) { g_usleep (30 * G_USEC_PER_SEC); g_hash_table_foreach (mgr->http_server_states, update_head_commit_ids_for_server, mgr); } return NULL; }