seafile/daemon/sync-mgr.c
feiniks 8e0e9ce76b
Checkout file directly (#2781)
* Checkout file directly

Add download progress

Return error when cancel task

Check if usr is NULL when get notif server

Adjust code struct

Checkout file from offset

Don't use http task

Adjust function parameters

* Support set file_id attr to tmp file

* user error_id to check failed

---------

Co-authored-by: yangheran <heran.yang@seafile.com>
2024-07-03 16:46:58 +08:00

3269 lines
100 KiB
C

/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#include <pthread.h>
#include "db.h"
#include "seafile-session.h"
#include "seafile-config.h"
#include "sync-mgr.h"
#include "seafile-error-impl.h"
#include "mq-mgr.h"
#include "utils.h"
#include "vc-utils.h"
#include "sync-status-tree.h"
#include "diff-simple.h"
#ifdef WIN32
#include <shlobj.h>
#endif
#define DEBUG_FLAG SEAFILE_DEBUG_SYNC
#include "log.h"
#include "timer.h"
/* Timing and capacity knobs used by the sync manager. */
#define DEFAULT_SYNC_INTERVAL 30 /* 30s */
#define CHECK_SYNC_INTERVAL 1000 /* 1s */
#define UPDATE_TX_STATE_INTERVAL 1000 /* 1s */
#define MAX_RUNNING_SYNC_TASKS 5
#define CHECK_LOCKED_FILES_INTERVAL 10 /* 10s */
#define CHECK_FOLDER_PERMS_INTERVAL 30 /* 30s */
/* Parenthesized so the product stays one operand wherever the macro expands
 * (e.g. `x / JWT_TOKEN_EXPIRE_TIME` or `x % JWT_TOKEN_EXPIRE_TIME`). */
#define JWT_TOKEN_EXPIRE_TIME (3*24*3600) /* 3 days */
#define SYNC_PERM_ERROR_RETRY_TIME 2
/*
 * Per-server bookkeeping. Instances are stored in the manager's
 * http_server_states table and looked up by a repo's server_url.
 */
struct _HttpServerState {
/* Copied into SyncTask::http_version when a sync task is created. */
int http_version;
gboolean checking;
gint64 last_http_check_time;
char *testing_host;
/* Can be server_url or server_url:8082, depending on which one works. */
char *effective_host;
gboolean use_fileserver_port;
gboolean notif_server_checked;
gboolean notif_server_alive;
gboolean server_disconnected;
gboolean folder_perms_not_supported;
gint64 last_check_perms_time;
gboolean checking_folder_perms;
gboolean locked_files_not_supported;
gint64 last_check_locked_files_time;
gboolean checking_locked_files;
gboolean immediate_check_folder_perms;
gboolean immediate_check_locked_files;
/*
 * repo_id -> head commit id mapping.
 * Caches the head commit ids of synced repos.
 * Keys and values are released with g_free; accessed under
 * head_commit_map_lock.
 */
GHashTable *head_commit_map;
pthread_mutex_t head_commit_map_lock;
/* FALSE until the map is usable; while FALSE the server is always consulted. */
gboolean head_commit_map_init;
/* Compared against HEAD_COMMIT_MAP_TTL to decide whether the cache is stale. */
gint64 last_update_head_commit_map_time;
gint64 n_jwt_token_request;
};
typedef struct _HttpServerState HttpServerState;
/*
 * Answer recorded for a pending delete confirmation; stored in
 * del_confirmation_tasks keyed by confirmation id.
 */
typedef struct DelConfirmationResult {
/* Value supplied by the caller of seaf_sync_manager_add_del_confirmation(). */
gboolean resync;
} DelConfirmationResult;
/* Private state of the sync manager (SeafSyncManager::priv). */
struct _SeafSyncManagerPriv {
/* Fires auto_sync_pulse every CHECK_SYNC_INTERVAL. */
struct SeafTimer *check_sync_timer;
/* Fires update_tx_state every UPDATE_TX_STATE_INTERVAL. */
struct SeafTimer *update_tx_state_timer;
int pulse_count;
/* When FALSE, auto sync is globally disabled */
gboolean auto_sync_enabled;
/* Values are ActivePathsInfo, released by active_paths_info_free.
 * NOTE(review): presumably keyed by repo id and guarded by paths_lock — confirm. */
GHashTable *active_paths;
pthread_mutex_t paths_lock;
#ifdef WIN32
/* Queue handed to refresh_windows_explorer_thread. */
GAsyncQueue *refresh_paths;
/* Fires refresh_all_windows_on_startup after STARTUP_REFRESH_WINDOWS_DELAY. */
struct SeafTimer *refresh_windows_timer;
#endif
/* Protects del_confirmation_tasks. */
pthread_mutex_t del_confirmation_lock;
/* confirmation id -> DelConfirmationResult; both released with g_free. */
GHashTable *del_confirmation_tasks;
};
/*
 * Value type of the manager's active_paths table
 * (released through active_paths_info_free).
 */
struct _ActivePathsInfo {
GHashTable *paths;
/* NOTE(review): judging by the names, one tree tracks paths being synced and
 * the other paths already synced — confirm against sync-status-tree users. */
struct SyncStatusTree *syncing_tree;
struct SyncStatusTree *synced_tree;
};
typedef struct _ActivePathsInfo ActivePathsInfo;
/* Forward declarations of file-local functions used before their definitions. */

/* Timer callback registered in seaf_sync_manager_start(). */
static int auto_sync_pulse (void *vmanager);
/* Handler connected to the "repo-http-fetched" signal. */
static void on_repo_http_fetched (SeafileSession *seaf,
HttpTxTask *tx_task,
SeafSyncManager *manager);
/* Handler connected to the "repo-http-uploaded" signal. */
static void on_repo_http_uploaded (SeafileSession *seaf,
HttpTxTask *tx_task,
SeafSyncManager *manager);
static inline void
transition_sync_state (SyncTask *task, int new_state);
static void sync_task_free (SyncTask *task);
static int
sync_repo_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync);
static gboolean
check_http_protocol (SeafSyncManager *mgr, SeafRepo *repo);
static void
active_paths_info_free (ActivePathsInfo *info);
/*
 * Allocate a zeroed HttpServerState with an empty head-commit cache
 * (string keys and values, both owned by the table) and its mutex.
 */
static HttpServerState *
http_server_state_new ()
{
    HttpServerState *srv = g_new0 (HttpServerState, 1);

    pthread_mutex_init (&srv->head_commit_map_lock, NULL);
    srv->head_commit_map = g_hash_table_new_full (g_str_hash, g_str_equal,
                                                  g_free, g_free);

    return srv;
}
/*
 * Release a state created by http_server_state_new(): the head-commit cache,
 * its mutex and the struct itself. A NULL @state is ignored.
 */
static void
http_server_state_free (HttpServerState *state)
{
    if (state != NULL) {
        g_hash_table_destroy (state->head_commit_map);
        pthread_mutex_destroy (&state->head_commit_map_lock);
        g_free (state);
    }
}
/*
 * Create the sync manager for @seaf.
 *
 * Auto sync starts enabled and the sync interval is DEFAULT_SYNC_INTERVAL.
 * Download/upload limits are taken from the session config when the
 * corresponding keys exist. All lookup tables and locks are initialized here.
 */
SeafSyncManager*
seaf_sync_manager_new (SeafileSession *seaf)
{
    SeafSyncManager *mgr = g_new0 (SeafSyncManager, 1);
    gboolean exists;
    int limit;

    mgr->priv = g_new0 (SeafSyncManagerPriv, 1);
    mgr->priv->auto_sync_enabled = TRUE;
    mgr->seaf = seaf;
    mgr->sync_interval = DEFAULT_SYNC_INTERVAL;

    /* Keys of sync_infos point into the SyncInfo values themselves. */
    mgr->sync_infos = g_hash_table_new (g_str_hash, g_str_equal);
    mgr->http_server_states =
        g_hash_table_new_full (g_str_hash, g_str_equal,
                               g_free,
                               (GDestroyNotify)http_server_state_free);

    limit = seafile_session_config_get_int (seaf, KEY_DOWNLOAD_LIMIT, &exists);
    if (exists)
        mgr->download_limit = limit;

    limit = seafile_session_config_get_int (seaf, KEY_UPLOAD_LIMIT, &exists);
    if (exists)
        mgr->upload_limit = limit;

    mgr->priv->active_paths =
        g_hash_table_new_full (g_str_hash, g_str_equal,
                               g_free,
                               (GDestroyNotify)active_paths_info_free);
    pthread_mutex_init (&mgr->priv->paths_lock, NULL);

#ifdef WIN32
    mgr->priv->refresh_paths = g_async_queue_new ();
#endif

    mgr->priv->del_confirmation_tasks =
        g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
    pthread_mutex_init (&mgr->priv->del_confirmation_lock, NULL);

    return mgr;
}
/*
 * Return the SyncInfo for @repo_id, creating and registering a zeroed one on
 * first use. The first 36 bytes of @repo_id are copied into the new record,
 * and that embedded copy doubles as the hash table key.
 */
static SyncInfo*
get_sync_info (SeafSyncManager *manager, const char *repo_id)
{
    SyncInfo *found = g_hash_table_lookup (manager->sync_infos, repo_id);

    if (!found) {
        found = g_new0 (SyncInfo, 1);
        memcpy (found->repo_id, repo_id, 36);
        g_hash_table_insert (manager->sync_infos, found->repo_id, found);
    }

    return found;
}
/*
 * Look up the SyncInfo registered for @repo_id.
 * Unlike get_sync_info() this never creates one; returns NULL when absent.
 */
SyncInfo *
seaf_sync_manager_get_sync_info (SeafSyncManager *mgr,
                                 const char *repo_id)
{
    SyncInfo *existing = g_hash_table_lookup (mgr->sync_infos, repo_id);

    return existing;
}
/*
 * Initialization hook of the sync manager. Currently there is nothing to do;
 * always returns 0.
 */
int
seaf_sync_manager_init (SeafSyncManager *mgr)
{
    (void) mgr;     /* intentionally unused */

    return 0;
}
/*
 * Append one line describing an in-progress transfer to @buf:
 *
 *   "<upload|download>\t<rate> <repo-name>\n"
 *
 * Nothing is appended when the task is not in the normal state, has not
 * started yet or has already finished, or when neither the repo nor (for
 * clone transfers) the clone task can be found.
 */
static void
format_http_task_detail (HttpTxTask *task, GString *buf)
{
    if (task->state != HTTP_TASK_STATE_NORMAL ||
        task->runtime_state == HTTP_TASK_RT_STATE_INIT ||
        task->runtime_state == HTTP_TASK_RT_STATE_FINISHED)
        return;

    SeafRepo *repo = seaf_repo_manager_get_repo (seaf->repo_mgr,
                                                 task->repo_id);
    const char *repo_name;
    const char *type;

    if (repo) {
        repo_name = repo->name;
        type = (task->type == HTTP_TASK_TYPE_UPLOAD) ? "upload" : "download";
    } else if (task->is_clone) {
        CloneTask *ctask;

        ctask = seaf_clone_manager_get_task (seaf->clone_mgr, task->repo_id);
        /* The lookup can come back empty; without this check the
         * dereference below would crash. */
        if (!ctask)
            return;
        repo_name = ctask->repo_name;
        type = "download";
    } else {
        return;
    }

    int rate = http_tx_task_get_rate(task);

    g_string_append_printf (buf, "%s\t%d %s\n", type, rate, repo_name);
}
/*
* Publish a notification message to report :
*
* [uploading/downloading]\t[transfer-rate] [repo-name]\n
*/
static int
update_tx_state (void *vmanager)
{
SeafSyncManager *mgr = vmanager;
GString *buf = g_string_new (NULL);
GList *tasks, *ptr;
HttpTxTask *http_task;
mgr->last_sent_bytes = g_atomic_int_get (&mgr->sent_bytes);
g_atomic_int_set (&mgr->sent_bytes, 0);
mgr->last_recv_bytes = g_atomic_int_get (&mgr->recv_bytes);
g_atomic_int_set (&mgr->recv_bytes, 0);
tasks = http_tx_manager_get_upload_tasks (seaf->http_tx_mgr);
for (ptr = tasks; ptr; ptr = ptr->next) {
http_task = ptr->data;
format_http_task_detail (http_task, buf);
}
g_list_free (tasks);
tasks = http_tx_manager_get_download_tasks (seaf->http_tx_mgr);
for (ptr = tasks; ptr; ptr = ptr->next) {
http_task = ptr->data;
format_http_task_detail (http_task, buf);
}
g_list_free (tasks);
if (buf->len != 0)
seaf_mq_manager_publish_notification (seaf->mq_mgr, "transfer",
buf->str);
g_string_free (buf, TRUE);
return TRUE;
}
#ifdef WIN32
/* Job scheduled from seaf_sync_manager_start(); defined later in this file. */
static void *
refresh_windows_explorer_thread (void *vdata);
/* Timer interval for the startup refresh; same unit as CHECK_SYNC_INTERVAL
 * (where 1000 is documented as 1s). */
#define STARTUP_REFRESH_WINDOWS_DELAY 10000
/*
 * Timer callback that asks Windows Explorer to refresh all open windows.
 * @vdata is unused. Returns 0; per the note below the callback is meant to
 * run only once.
 */
static int
refresh_all_windows_on_startup (void *vdata)
{
/* This is a hack to tell Windows Explorer to refresh all open windows.
 * On startup, if there is one big library, its events may dominate the
 * explorer refresh queue. Other libraries don't get refreshed until
 * the big library's events are consumed. So we refresh the open windows
 * to reduce the delay.
 */
SHChangeNotify (SHCNE_ASSOCCHANGED, SHCNF_IDLIST, NULL, NULL);
/* One time */
return 0;
}
#endif
static void *update_cached_head_commit_ids (void *arg);

/*
 * Start the sync manager: install the periodic timers, connect the
 * transfer-finished signal handlers, (on Windows) start the explorer refresh
 * job and timer, and spawn the detached thread that keeps the cached head
 * commit ids up to date.
 *
 * Returns 0 on success, -1 if the background thread cannot be created.
 */
int
seaf_sync_manager_start (SeafSyncManager *mgr)
{
    mgr->priv->check_sync_timer = seaf_timer_new (
        auto_sync_pulse, mgr, CHECK_SYNC_INTERVAL);

    mgr->priv->update_tx_state_timer = seaf_timer_new (
        update_tx_state, mgr, UPDATE_TX_STATE_INTERVAL);

    g_signal_connect (seaf, "repo-http-fetched",
                      (GCallback)on_repo_http_fetched, mgr);
    g_signal_connect (seaf, "repo-http-uploaded",
                      (GCallback)on_repo_http_uploaded, mgr);

#ifdef WIN32
    seaf_job_manager_schedule_job (seaf->job_mgr,
                                   refresh_windows_explorer_thread,
                                   NULL,
                                   mgr->priv->refresh_paths);

    mgr->priv->refresh_windows_timer = seaf_timer_new (
        refresh_all_windows_on_startup, mgr, STARTUP_REFRESH_WINDOWS_DELAY);
#endif

    pthread_t tid;
    pthread_attr_t attr;
    int rc;

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    rc = pthread_create (&tid, &attr, update_cached_head_commit_ids, mgr);
    /* The attribute object is no longer needed once the thread is created. */
    pthread_attr_destroy(&attr);

    /* pthread_create() reports failure with a positive error number, never a
     * negative value, so the result must be compared against 0. */
    if (rc != 0) {
        seaf_warning ("Failed to create update cached head commit id thread.\n");
        return -1;
    }

    return 0;
}
/*
 * Request a manual sync of the repo identified by @repo_id.
 *
 * Returns -1 when the session is not started, the repo is unknown, its
 * worktree check fails, or (GPL crypto builds) the library format is too
 * old; @error is set for all of these except the "not started" case.
 * Returns 0 otherwise, including when a sync is already running or the repo
 * is a version 0 library (only a warning is logged).
 */
int
seaf_sync_manager_add_sync_task (SeafSyncManager *mgr,
                                 const char *repo_id,
                                 GError **error)
{
    SeafRepo *repo;
    SyncInfo *info;

    if (!seaf->started) {
        seaf_message ("sync manager is not started, skip sync request.\n");
        return -1;
    }

    repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
    if (repo == NULL) {
        seaf_warning ("[sync mgr] cannot find repo %s.\n", repo_id);
        g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_REPO, "Invalid repo");
        return -1;
    }

    if (seaf_repo_check_worktree (repo) < 0) {
        g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_NO_WORKTREE,
                     "Worktree doesn't exist");
        return -1;
    }

#ifdef USE_GPL_CRYPTO
    if (repo->version == 0 || (repo->encrypted && repo->enc_version < 2)) {
        seaf_warning ("Don't support syncing old version libraries.\n");
        g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_BAD_ARGS,
                     "Don't support syncing old version libraries");
        return -1;
    }
#endif

    info = get_sync_info (mgr, repo->id);
    if (info->in_sync)
        return 0;

    if (repo->version <= 0) {
        seaf_warning ("Repo %s(%s) is version 0 library. Syncing is no longer supported.\n",
                      repo->name, repo->id);
        return 0;
    }

    /* Only start the sync once the HTTP protocol check allows it. */
    if (check_http_protocol (mgr, repo))
        sync_repo_v2 (mgr, repo, TRUE);

    return 0;
}
/*
 * Cancel the running sync task of @repo_id, if any.
 *
 * When no sync is running but the last task ended in error, the error
 * counters of the repo are reset instead. A running task is moved to
 * SYNC_STATE_CANCEL_PENDING; for fetch/upload states the matching HTTP
 * transfer is cancelled first.
 */
void
seaf_sync_manager_cancel_sync_task (SeafSyncManager *mgr,
                                    const char *repo_id)
{
    SyncInfo *info;
    SyncTask *task;

    if (!seaf->started) {
        seaf_message ("sync manager is not started, skip cancel request.\n");
        return;
    }

    /* Cancel running task. */
    info = g_hash_table_lookup (mgr->sync_infos, repo_id);
    if (!info)
        return;

    if (!info->in_sync) {
        SyncTask *last = info->current_task;

        if (last && last->state == SYNC_STATE_ERROR) {
            info->err_cnt = 0;
            info->in_error = FALSE;
            info->sync_perm_err_cnt = 0;
        }
        return;
    }

    g_return_if_fail (info->current_task != NULL);
    task = info->current_task;

    switch (task->state) {
    case SYNC_STATE_CANCEL_PENDING:
        /* Cancellation already requested. */
        return;
    case SYNC_STATE_FETCH:
        http_tx_manager_cancel_task (seaf->http_tx_mgr,
                                     repo_id,
                                     HTTP_TASK_TYPE_DOWNLOAD);
        break;
    case SYNC_STATE_UPLOAD:
        http_tx_manager_cancel_task (seaf->http_tx_mgr,
                                     repo_id,
                                     HTTP_TASK_TYPE_UPLOAD);
        break;
    case SYNC_STATE_COMMIT:
    case SYNC_STATE_INIT:
    case SYNC_STATE_MERGE:
        break;
    default:
        g_return_if_reached ();
    }

    transition_sync_state (task, SYNC_STATE_CANCEL_PENDING);
}
/*
 * Check the user's "notify_sync" setting.
 * A missing setting is initialized to "on" and treated as enabled; otherwise
 * notifications are enabled only when the stored value is exactly "on".
 */
static gboolean
need_notify_sync (SeafRepo *repo)
{
    char *setting = seafile_session_config_get_string(seaf, "notify_sync");
    gboolean enabled;

    if (setting != NULL) {
        enabled = (g_strcmp0(setting, "on") == 0);
        g_free (setting);
    } else {
        seafile_session_config_set_string(seaf, "notify_sync", "on");
        enabled = TRUE;
    }

    return enabled;
}
/*
 * Human-readable names of sync states, used in log messages.
 * Indexed directly by a task's state value (sync_state_str[task->state]).
 * NOTE(review): the order presumably mirrors the SYNC_STATE_* enum declared
 * in the header — keep the two in sync.
 */
static const char *sync_state_str[] = {
"synchronized",
"committing",
"initializing",
"downloading",
"merging",
"uploading",
"error",
"canceled",
"cancel pending"
};
/*
 * Commit traversal callback used by notify_sync().
 *
 * Skips commits that have a second parent, are flagged new_merge and carry
 * no conflict. The first commit that does not match stops the traversal and
 * is stored, with an extra reference, in the SeafCommit* that @data points
 * to. Always returns TRUE.
 */
static gboolean
find_meaningful_commit (SeafCommit *commit, void *data, gboolean *stop)
{
    SeafCommit **out_head = data;
    gboolean is_clean_merge = (commit->second_parent_id &&
                               commit->new_merge &&
                               !commit->conflict);

    if (!is_clean_merge) {
        *stop = TRUE;
        seaf_commit_ref (commit);
        *out_head = commit;
    }

    return TRUE;
}
/*
 * Publish a notification describing the commit that was just synced.
 *
 * The message is "name\tid\tcommit_id\tparent_id\tdesc" of the first commit
 * selected by find_meaningful_commit(), and is sent on "sync.done", or on
 * "sync.multipart_upload" when @is_multipart_upload is set.
 */
static void
notify_sync (SeafRepo *repo, gboolean is_multipart_upload)
{
    SeafCommit *head = NULL;
    gboolean traversed;

    traversed = seaf_commit_manager_traverse_commit_tree_truncated (seaf->commit_mgr,
                                                                    repo->id, repo->version,
                                                                    repo->head->commit_id,
                                                                    find_meaningful_commit,
                                                                    &head, FALSE);
    if (!traversed) {
        seaf_warning ("Failed to traverse commit tree of %.8s.\n", repo->id);
        return;
    }

    if (head == NULL)
        return;

    const char *topic = is_multipart_upload ? "sync.multipart_upload"
                                            : "sync.done";
    GString *msg = g_string_new (NULL);

    g_string_append_printf (msg, "%s\t%s\t%s\t%s\t%s",
                            repo->name,
                            repo->id,
                            head->commit_id,
                            head->parent_id,
                            head->desc);
    seaf_mq_manager_publish_notification (seaf->mq_mgr, topic, msg->str);

    g_string_free (msg, TRUE);
    seaf_commit_unref (head);
}
#define IN_ERROR_THRESHOLD 3
static gboolean
is_perm_error (int error)
{
return (error == SYNC_ERROR_ID_ACCESS_DENIED ||
error == SYNC_ERROR_ID_NO_WRITE_PERMISSION ||
error == SYNC_ERROR_ID_PERM_NOT_SYNCABLE ||
error == SYNC_ERROR_ID_FOLDER_PERM_DENIED ||
error == SYNC_ERROR_ID_TOO_MANY_FILES ||
error == SYNC_ERROR_ID_QUOTA_FULL);
}
/*
 * Maintain the per-repo error counters after a task reached a final state.
 *
 * On SYNC_STATE_ERROR the error count grows (the repo is flagged in_error
 * when it hits IN_ERROR_THRESHOLD) and permission-class errors are counted
 * separately. Any other state clears the counters if errors were recorded.
 */
static void
update_sync_info_error_state (SyncTask *task, int new_state)
{
    SyncInfo *info = task->info;

    if (new_state != SYNC_STATE_ERROR) {
        if (info->err_cnt > 0) {
            info->err_cnt = 0;
            info->in_error = FALSE;
            info->sync_perm_err_cnt = 0;
        }
        return;
    }

    info->err_cnt++;
    if (info->err_cnt == IN_ERROR_THRESHOLD)
        info->in_error = TRUE;

    if (is_perm_error(task->error))
        info->sync_perm_err_cnt++;
}
static void commit_repo (SyncTask *task);
/*
 * Move @task to @new_state and perform the bookkeeping tied to the change.
 * Nothing happens when the task is already in @new_state.
 *
 * - Reaching DONE from FETCH, or from INIT after an upload, publishes a sync
 *   notification if the user enabled it.
 * - Reaching DONE in the middle of a multipart upload does NOT change the
 *   state: the next batch is committed right away and the function returns.
 * - Final states (DONE, CANCELED, ERROR) clear in_sync, release a running
 *   task slot and update the repo's error counters.
 */
static void
transition_sync_state (SyncTask *task, int new_state)
{
g_return_if_fail (new_state >= 0 && new_state < SYNC_STATE_NUM);
SyncInfo *info = task->info;
if (task->state != new_state) {
if (((task->state == SYNC_STATE_INIT && task->uploaded) ||
task->state == SYNC_STATE_FETCH) &&
new_state == SYNC_STATE_DONE &&
need_notify_sync(task->repo))
notify_sync (task->repo, (info->multipart_upload && !info->end_multipart_upload));
/* If we're in the process of uploading a large set of files, they are split
 * into multiple batches for upload. We want to start the next batch
 * immediately after the previous one is done.
 */
if (new_state == SYNC_STATE_DONE &&
info->multipart_upload &&
!info->end_multipart_upload) {
commit_repo (task);
return;
}
/* If file-level errors occurred during sync, the whole sync process can still
 * finish with DONE state. But we need to notify the user about the error in
 * the interface.
 * Such file-level errors are set with seaf_sync_manager_set_task_error_code().
 */
if (new_state != SYNC_STATE_ERROR && task->error != SYNC_ERROR_ID_NO_ERROR) {
seaf_message ("Repo '%s' sync is finished but with error: %s\n",
task->repo->name,
sync_error_id_to_str(task->error));
}
/* The DONE <-> INIT transitions are not logged. */
if (!(task->state == SYNC_STATE_DONE && new_state == SYNC_STATE_INIT) &&
!(task->state == SYNC_STATE_INIT && new_state == SYNC_STATE_DONE)) {
seaf_message ("Repo '%s' sync state transition from '%s' to '%s'.\n",
task->repo->name,
sync_state_str[task->state],
sync_state_str[new_state]);
}
task->state = new_state;
if (new_state == SYNC_STATE_DONE ||
new_state == SYNC_STATE_CANCELED ||
new_state == SYNC_STATE_ERROR) {
info->in_sync = FALSE;
--(task->mgr->n_running_tasks);
update_sync_info_error_state (task, new_state);
/* Keep previous upload progress if sync task is canceled or failed. */
if (new_state == SYNC_STATE_DONE) {
info->multipart_upload = FALSE;
info->end_multipart_upload = FALSE;
info->total_bytes = 0;
info->uploaded_bytes = 0;
}
}
#ifdef WIN32
seaf_sync_manager_add_refresh_path (seaf->sync_mgr, task->repo->worktree);
#endif
}
}
/*
 * Put @task into SYNC_STATE_ERROR with error id @error.
 *
 * Does nothing when the task is already in the error state. Otherwise the
 * transition is logged, the task stops counting as running, the repo's error
 * counters are updated and, for repo-level errors, the error is reported via
 * send_file_sync_error_notification() without a file path.
 */
static void
set_task_error (SyncTask *task, int error)
{
    g_return_if_fail (error >= 0 && error < N_SYNC_ERROR_ID);

    const char *err_str = sync_error_id_to_str(error);
    int err_level = sync_error_level(error);

    if (task->state == SYNC_STATE_ERROR)
        return;

    seaf_message ("Repo '%s' sync state transition from %s to '%s': '%s'.\n",
                  task->repo->name,
                  sync_state_str[task->state],
                  sync_state_str[SYNC_STATE_ERROR],
                  err_str);

    task->state = SYNC_STATE_ERROR;
    task->error = error;

    task->info->in_sync = FALSE;
    --(task->mgr->n_running_tasks);
    update_sync_info_error_state (task, SYNC_STATE_ERROR);

    /* Repo-level errors are reported from here. File-level errors are
     * recorded and notified at the place where they happen, not here.
     */
    if (err_level == SYNC_ERROR_LEVEL_REPO) {
        send_file_sync_error_notification (task->repo->id, task->repo->name,
                                           NULL, error);
    }

#ifdef WIN32
    seaf_sync_manager_add_refresh_path (seaf->sync_mgr, task->repo->worktree);
#endif
}
/*
 * Record a (file-level) error id on the current sync task of @repo_id
 * without changing the task's state.
 *
 * Silently ignored when the repo has no sync info or no task yet: a
 * SyncInfo can exist before any task was created for it.
 */
void
seaf_sync_manager_set_task_error_code (SeafSyncManager *mgr,
                                       const char *repo_id,
                                       int error)
{
    SyncInfo *info = g_hash_table_lookup (mgr->sync_infos, repo_id);

    if (!info || !info->current_task)
        return;

    info->current_task->error = error;
}
/*
 * Release @task together with the strings it owns (token, dest_id, tx_id).
 * @task must not be NULL.
 */
static void
sync_task_free (SyncTask *task)
{
    g_free (task->token);
    g_free (task->dest_id);
    g_free (task->tx_id);

    g_free (task);
}
/*
 * Queue an HTTP upload for the task's repo and move the task to
 * SYNC_STATE_UPLOAD. If the upload cannot be queued the task is put into
 * the error state instead.
 */
static void
start_upload_if_necessary (SyncTask *task)
{
    GError *error = NULL;
    SeafRepo *repo = task->repo;

    if (http_tx_manager_add_upload (seaf->http_tx_mgr,
                                    repo->id,
                                    repo->version,
                                    repo->effective_host,
                                    repo->token,
                                    task->http_version,
                                    repo->use_fileserver_port,
                                    &error) < 0) {
        /* The callee may fail without filling in the GError. */
        seaf_warning ("Failed to start http upload: %s\n",
                      error ? error->message : "unknown error");
        g_clear_error (&error);
        set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
        return;
    }

    /* The same task uploads several times during a multipart upload;
     * release the previous id before storing the new one. */
    g_free (task->tx_id);
    task->tx_id = g_strdup(repo->id);

    transition_sync_state (task, SYNC_STATE_UPLOAD);
}
/*
 * Queue an HTTP download of @remote_head for the task's repo and move the
 * task to SYNC_STATE_FETCH. If the download cannot be queued the task is put
 * into the error state instead.
 */
static void
start_fetch_if_necessary (SyncTask *task, const char *remote_head)
{
    GError *error = NULL;
    SeafRepo *repo = task->repo;

    if (http_tx_manager_add_download (seaf->http_tx_mgr,
                                      repo->id,
                                      repo->version,
                                      repo->effective_host,
                                      repo->token,
                                      remote_head,
                                      FALSE,
                                      NULL,
                                      NULL,
                                      task->http_version,
                                      repo->email,
                                      repo->username,
                                      repo->use_fileserver_port,
                                      repo->name,
                                      &error) < 0) {
        /* The callee may fail without filling in the GError. */
        seaf_warning ("Failed to start http download: %s.\n",
                      error ? error->message : "unknown error");
        g_clear_error (&error);
        set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
        return;
    }

    /* Do not leak an id stored by an earlier transfer of the same task. */
    g_free (task->tx_id);
    task->tx_id = g_strdup(repo->id);

    transition_sync_state (task, SYNC_STATE_FETCH);
}
/*
 * Tell whether the directory <seaf_dir>/storage/blocks/<repo-id> exists.
 */
static gboolean
repo_block_store_exists (SeafRepo *repo)
{
    char *blocks_dir = g_build_filename (seaf->seaf_dir, "storage", "blocks",
                                         repo->id, NULL);
    gboolean found = g_file_test (blocks_dir, G_FILE_TEST_IS_DIR) ? TRUE : FALSE;

    g_free (blocks_dir);

    return found;
}
#if defined WIN32 || defined __APPLE__
/*
 * Collect the ids of all blocks that belong to locked files of @repo_id
 * whose pending operation is LOCKED_OP_UPDATE.
 *
 * Returns a new hash table used as a set (each key is its own value; keys
 * are released with g_free). The caller destroys the table.
 */
static GHashTable *
load_locked_files_blocks (const char *repo_id)
{
    LockedFileSet *locked_set =
        seaf_repo_manager_get_locked_file_set (seaf->repo_mgr, repo_id);
    GHashTable *blocks =
        g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
    GHashTableIter it;
    gpointer k, v;

    g_hash_table_iter_init (&it, locked_set->locked_files);
    while (g_hash_table_iter_next (&it, &k, &v)) {
        LockedFile *entry = v;
        Seafile *file;
        int idx;

        if (strcmp (entry->operation, LOCKED_OP_UPDATE) != 0)
            continue;

        file = seaf_fs_manager_get_seafile (seaf->fs_mgr,
                                            locked_set->repo_id, 1,
                                            entry->file_id);
        if (!file) {
            seaf_warning ("Failed to find file %s in repo %.8s.\n",
                          entry->file_id, locked_set->repo_id);
            continue;
        }

        for (idx = 0; idx < file->n_blocks; ++idx) {
            char *id_copy = g_strdup (file->blk_sha1s[idx]);

            g_hash_table_replace (blocks, id_copy, id_copy);
        }

        seafile_unref (file);
    }

    locked_file_set_free (locked_set);

    return blocks;
}
static gboolean
remove_block_cb (const char *store_id,
int version,
const char *block_id,
void *user_data)
{
GHashTable *block_hash = user_data;
if (!g_hash_table_lookup (block_hash, block_id))
seaf_block_manager_remove_block (seaf->block_mgr, store_id, version, block_id);
return TRUE;
}
#endif
/*
 * Job function: remove the local blocks of the task's repo.
 *
 * On Windows and macOS, blocks referenced by locked files waiting for an
 * update are kept; if there are none the whole store is removed. On other
 * platforms the whole store is always removed. Returns @vtask unchanged so
 * the completion callback receives the task.
 */
static void *
remove_repo_blocks (void *vtask)
{
    SyncTask *task = vtask;

#if defined WIN32 || defined __APPLE__
    GHashTable *keep = load_locked_files_blocks (task->repo->id);

    if (g_hash_table_size (keep) != 0) {
        seaf_block_manager_foreach_block (seaf->block_mgr,
                                          task->repo->id,
                                          task->repo->version,
                                          remove_block_cb,
                                          keep);
    } else {
        seaf_block_manager_remove_store (seaf->block_mgr, task->repo->id);
    }

    g_hash_table_destroy (keep);
#else
    seaf_block_manager_remove_store (seaf->block_mgr, task->repo->id);
#endif

    return vtask;
}
static void
remove_blocks_done (void *vtask)
{
SyncTask *task = vtask;
transition_sync_state (task, SYNC_STATE_DONE);
}
/*
 * Handle a repo that no longer exists on the server: fail the task with
 * SYNC_ERROR_ID_SERVER_REPO_DELETED and, unless the session is configured
 * to tolerate missing repos, delete the local repo as well.
 */
static void
on_repo_deleted_on_server (SyncTask *task, SeafRepo *repo)
{
    set_task_error (task, SYNC_ERROR_ID_SERVER_REPO_DELETED);

    seaf_warning ("repo %s(%.8s) not found on server\n",
                  repo->name, repo->id);

    if (seafile_session_config_get_allow_repo_not_found_on_server(seaf))
        return;

    seaf_message ("remove repo %s(%.8s) since it's deleted on relay\n",
                  repo->name, repo->id);
    seaf_repo_manager_del_repo (seaf->repo_mgr, repo);
}
/*
 * Decide what to do after the server's head commit is known (stored in the
 * task's SyncInfo):
 *
 * - missing "local" or "master" branch -> local data corrupt error;
 * - repo corrupted on the server       -> error;
 * - repo deleted on the server         -> on_repo_deleted_on_server();
 * - local head equals the server head  -> done (after removing the repo's
 *                                         local block store, if one exists);
 * - otherwise                          -> start fetching the server head.
 */
static void
update_sync_status_v2 (SyncTask *task)
{
    SyncInfo *info = task->info;
    SeafRepo *repo = task->repo;
    SeafBranch *master = NULL, *local = NULL;

    local = seaf_branch_manager_get_branch (
        seaf->branch_mgr, info->repo_id, "local");
    if (!local) {
        seaf_warning ("[sync-mgr] Branch local not found for repo %s(%.8s).\n",
                      repo->name, repo->id);
        set_task_error (task, SYNC_ERROR_ID_LOCAL_DATA_CORRUPT);
        return;
    }

    master = seaf_branch_manager_get_branch (
        seaf->branch_mgr, info->repo_id, "master");
    if (!master) {
        seaf_warning ("[sync-mgr] Branch master not found for repo %s(%.8s).\n",
                      repo->name, repo->id);
        set_task_error (task, SYNC_ERROR_ID_LOCAL_DATA_CORRUPT);
        /* Drop the reference taken on the local branch above. */
        seaf_branch_unref (local);
        return;
    }

    if (info->repo_corrupted) {
        set_task_error (task, SYNC_ERROR_ID_SERVER_REPO_CORRUPT);
    } else if (info->deleted_on_relay) {
        on_repo_deleted_on_server (task, repo);
    } else {
        /* If local head is the same as remote head, already in sync. */
        if (strcmp (local->commit_id, info->head_commit) == 0) {
            /* As long as the repo is synced with the server, the local
             * blocks are not useful any more.
             */
            if (repo_block_store_exists (repo)) {
                seaf_message ("Removing blocks for repo %s(%.8s).\n",
                              repo->name, repo->id);
                seaf_job_manager_schedule_job (seaf->job_mgr,
                                               remove_repo_blocks,
                                               remove_blocks_done,
                                               task);
            } else
                transition_sync_state (task, SYNC_STATE_DONE);
        } else
            start_fetch_if_necessary (task, task->info->head_commit);
    }

    seaf_branch_unref (local);
    seaf_branch_unref (master);
}
/*
 * Callback of http_tx_manager_check_head_commit().
 * On failure the task gets the reported error code; on success the server's
 * view of the repo (deleted / corrupt flags and the 40-byte head commit id)
 * is copied into the SyncInfo and the sync continues.
 */
static void
check_head_commit_done (HttpHeadCommit *result, void *user_data)
{
    SyncTask *task = user_data;

    if (result->check_success) {
        SyncInfo *info = task->info;

        info->deleted_on_relay = result->is_deleted;
        info->repo_corrupted = result->is_corrupt;
        memcpy (info->head_commit, result->head_commit, 40);

        update_sync_status_v2 (task);
    } else {
        set_task_error (task, result->error_code);
    }
}
/*
 * Ask the server for the repo's head commit; the answer arrives in
 * check_head_commit_done().
 *
 * The task moves to SYNC_STATE_INIT when the request call returns 0 and to
 * the error state when it returns a negative value. The return value of the
 * request call is passed through to the caller.
 */
static int
check_head_commit_http (SyncTask *task)
{
    SeafRepo *repo = task->repo;
    int rc;

    rc = http_tx_manager_check_head_commit (seaf->http_tx_mgr,
                                            repo->id, repo->version,
                                            repo->effective_host,
                                            repo->token,
                                            repo->use_fileserver_port,
                                            check_head_commit_done,
                                            task);
    if (rc < 0)
        set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
    else if (rc == 0)
        transition_sync_state (task, SYNC_STATE_INIT);

    return rc;
}
/* Result handed from commit_job() to commit_job_done(). */
struct CommitResult {
/* The sync task the commit was made for. */
SyncTask *task;
/* TRUE when a commit was created, or when committing failed. */
gboolean changed;
/* FALSE when the commit attempt reported an error. */
gboolean success;
};
static void *
commit_job (void *vtask)
{
SyncTask *task = vtask;
SeafRepo *repo = task->repo;
struct CommitResult *res = g_new0 (struct CommitResult, 1);
GError *error = NULL;
res->task = task;
if (repo->delete_pending)
return res;
res->changed = TRUE;
res->success = TRUE;
char *commit_id = seaf_repo_index_commit (repo,
task->is_manual_sync,
task->is_initial_commit,
&error);
if (commit_id == NULL && error != NULL) {
seaf_warning ("[Sync mgr] Failed to commit to repo %s(%.8s).\n",
repo->name, repo->id);
res->success = FALSE;
} else if (commit_id == NULL) {
res->changed = FALSE;
}
g_free (commit_id);
return res;
}
static char *
exceed_max_deleted_files (SeafRepo *repo);
static void
notify_delete_confirmation (const char *repo_name, const char *desc, const char *confirmation_id);
static void
commit_job_done (void *vres)
{
struct CommitResult *res = vres;
SeafRepo *repo = res->task->repo;
SyncTask *task = res->task;
res->task->mgr->commit_job_running = FALSE;
if (repo->delete_pending) {
transition_sync_state (res->task, SYNC_STATE_CANCELED);
seaf_repo_manager_del_repo (seaf->repo_mgr, repo);
g_free (res);
return;
}
if (res->task->state == SYNC_STATE_CANCEL_PENDING) {
transition_sync_state (res->task, SYNC_STATE_CANCELED);
g_free (res);
return;
}
if (!res->success) {
set_task_error (res->task, SYNC_ERROR_ID_INDEX_ERROR);
g_free (res);
return;
}
if (res->changed) {
char *desc = NULL;
desc = exceed_max_deleted_files (repo);
if (desc) {
notify_delete_confirmation (repo->name, desc, repo->head->commit_id);
seaf_warning ("Delete more than %d files, add delete confirmation.\n", seaf->delete_confirm_threshold);
task->info->del_confirmation_pending = TRUE;
set_task_error (res->task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING);
g_free (desc);
g_free (res);
return;
}
start_upload_if_necessary (res->task);
}
else if (task->is_manual_sync || task->is_initial_commit)
check_head_commit_http (task);
else
transition_sync_state (task, SYNC_STATE_DONE);
g_free (res);
}
static int check_commit_state (void *data);

/*
 * Start the commit step of @task.
 *
 * Only one commit job runs at a time. If another one is in progress a
 * 1-second timer (check_commit_state) is installed to retry; otherwise the
 * task moves to SYNC_STATE_COMMIT and commit_job is scheduled.
 */
static void
commit_repo (SyncTask *task)
{
    /* In order not to eat too much CPU power, only one commit job can be run
     * at the same time. Other sync tasks have to check every 1 second.
     */
    if (task->mgr->commit_job_running) {
        task->commit_timer = seaf_timer_new (check_commit_state, task, 1000);
        return;
    }

    task->mgr->commit_job_running = TRUE;

    transition_sync_state (task, SYNC_STATE_COMMIT);

    if (seaf_job_manager_schedule_job (seaf->job_mgr,
                                       commit_job,
                                       commit_job_done,
                                       task) < 0) {
        /* No job was started, so commit_job_done() will never clear the
         * flag; clear it here or every later commit would wait forever. */
        task->mgr->commit_job_running = FALSE;
        set_task_error (task, SYNC_ERROR_ID_NOT_ENOUGH_MEMORY);
    }
}
/*
 * Timer callback installed by commit_repo() while another commit job runs.
 * Returns 1 while the other job is still running; once it is finished the
 * timer is freed, the commit is retried and 0 is returned.
 */
static int
check_commit_state (void *data)
{
    SyncTask *task = data;

    if (task->mgr->commit_job_running)
        return 1;

    seaf_timer_free (&task->commit_timer);
    commit_repo (task);

    return 0;
}
/*
 * Create a sync task for @repo and make it the repo's current task.
 *
 * The repo's last sync time is set to now, the running task count grows by
 * one and the repo is marked in_sync. The previous task of the repo (kept
 * around so its final state stays queryable) is freed at this point. The
 * HTTP version is taken from the server state of repo->server_url, if known.
 */
static SyncTask *
create_sync_task_v2 (SeafSyncManager *manager, SeafRepo *repo,
                     gboolean is_manual_sync, gboolean is_initial_commit)
{
    SyncInfo *info = get_sync_info (manager, repo->id);
    SyncTask *task = g_new0 (SyncTask, 1);
    HttpServerState *state = NULL;

    task->info = info;
    task->mgr = manager;
    task->repo = repo;
    task->dest_id = g_strdup (repo->relay_id);
    task->token = g_strdup(repo->token);
    task->is_manual_sync = is_manual_sync;
    task->is_initial_commit = is_initial_commit;
    task->error = SYNC_ERROR_ID_NO_ERROR;

    repo->last_sync_time = time(NULL);
    ++(manager->n_running_tasks);

    /* Free the last task when a new task is started.
     * This way we can always get the state of the last task even
     * after it's done.
     */
    if (info->current_task)
        sync_task_free (info->current_task);
    info->current_task = task;
    info->in_sync = TRUE;

    if (repo->server_url)
        state = g_hash_table_lookup (manager->http_server_states,
                                     repo->server_url);
    if (state)
        task->http_version = state->http_version;

    return task;
}
/*
 * Start a commit for @repo if its worktree status calls for one.
 *
 * A task is created and committed when:
 *  1. the repo was never checked (new repo; counts as initial commit);
 *  2. a partial commit is pending; or
 *  3. the worktree changed since the last check and has been quiet for at
 *     least 2 seconds.
 * In cases 1 and 3 the last check time is set to now.
 *
 * Returns TRUE when a task was started, FALSE otherwise (including when no
 * worktree status is available).
 */
static gboolean
create_commit_from_event_queue (SeafSyncManager *manager, SeafRepo *repo,
                                gboolean is_manual_sync)
{
    gint now = (gint)time(NULL);
    gboolean start = FALSE;
    gboolean is_initial = FALSE;
    gboolean record_check = FALSE;
    gint last_changed;
    WTStatus *status;

    status = seaf_wt_monitor_get_worktree_status (manager->seaf->wt_monitor,
                                                  repo->id);
    if (!status)
        return FALSE;

    last_changed = g_atomic_int_get (&status->last_changed);

    if (status->last_check == 0) {
        /* Force commit and sync after a new repo is added. */
        start = TRUE;
        is_initial = TRUE;
        record_check = TRUE;
    } else if (status->partial_commit) {
        start = TRUE;
    } else if (last_changed != 0 &&
               status->last_check <= last_changed &&
               now - last_changed >= 2) {
        /* Commit and sync if the repo has been updated after the
         * last check and is not updated for the last 2 seconds.
         */
        start = TRUE;
        record_check = TRUE;
    }

    if (start) {
        SyncTask *task = create_sync_task_v2 (manager, repo,
                                              is_manual_sync, is_initial);

        repo->create_partial_commit = TRUE;
        commit_repo (task);
        if (record_check)
            status->last_check = now;
    }

    wt_status_unref (status);

    return start;
}
/*
 * A repo may be scheduled when it was never synced or its last sync is older
 * than the manager's sync interval, and fewer than MAX_RUNNING_SYNC_TASKS
 * tasks are running.
 */
static gboolean
can_schedule_repo (SeafSyncManager *manager, SeafRepo *repo)
{
    int now = (int)time(NULL);
    gboolean interval_elapsed = (repo->last_sync_time == 0 ||
                                 repo->last_sync_time < now - manager->sync_interval);
    gboolean has_free_slot = (manager->n_running_tasks < MAX_RUNNING_SYNC_TASKS);

    return (interval_elapsed && has_free_slot);
}
/*
 * Decide whether the server must be asked for the repo's head commit.
 *
 * Returns TRUE when the last task ended in error, when no usable cached head
 * commit is available (no server URL, unknown server, cache not initialized,
 * cache older than HEAD_COMMIT_MAP_TTL seconds, repo missing from the
 * cache), or when the cached server head differs from @master_head_id.
 * Returns FALSE only when the cache is fresh and matches.
 */
static gboolean
need_check_on_server (SeafSyncManager *manager, SeafRepo *repo, const char *master_head_id)
{
#define HEAD_COMMIT_MAP_TTL 90
    HttpServerState *state;
    gboolean ret = FALSE;
    SyncInfo *info;

    /* If sync state is in error, always retry. */
    info = get_sync_info (manager, repo->id);
    if (info && info->current_task && info->current_task->state == SYNC_STATE_ERROR)
        return TRUE;

    /* The table hashes its keys as strings, so a NULL URL must not reach the
     * lookup. Treat it like an unknown server. */
    if (!repo->server_url)
        return TRUE;

    state = g_hash_table_lookup (manager->http_server_states, repo->server_url);
    if (!state)
        return TRUE;

    pthread_mutex_lock (&state->head_commit_map_lock);

    if (!state->head_commit_map_init) {
        ret = TRUE;
        goto out;
    }

    gint64 now = (gint64)time(NULL);
    if (now - state->last_update_head_commit_map_time >= HEAD_COMMIT_MAP_TTL) {
        ret = TRUE;
        goto out;
    }

    char *server_head = g_hash_table_lookup (state->head_commit_map, repo->id);
    if (!server_head) {
        /* Repo was removed on server. Just return "changed on server". */
        ret = TRUE;
        goto out;
    }
    if (g_strcmp0 (server_head, master_head_id) != 0)
        ret = TRUE;

out:
    pthread_mutex_unlock (&state->head_commit_map_lock);
    return ret;
}
#define MAX_DELETED_FILES_NUM 100
/*
 * Return the part of @path after the last '/', or @path itself when it
 * contains no '/'. The result points into @path; nothing is allocated.
 */
inline static char *
get_basename (char *path)
{
    char *last_sep = strrchr (path, '/');

    return last_sep ? last_sep + 1 : path;
}
/*
 * Check whether the local head deletes too many files relative to master.
 *
 * Diffs the root of the "master" head against the root of the "local" head
 * and counts entries with status DIFF_STATUS_DELETED. When the count reaches
 * seaf->delete_confirm_threshold, returns a newly allocated description
 * naming the first deleted file (caller frees with g_free). Returns NULL
 * otherwise, or when branches/commits cannot be loaded or the diff is empty.
 */
static char *
exceed_max_deleted_files (SeafRepo *repo)
{
    SeafBranch *local = NULL, *master = NULL;
    SeafCommit *local_head = NULL, *master_head = NULL;
    GList *diff_results = NULL;
    GList *node;
    char *first_deleted = NULL;
    char *ret = NULL;
    int n_deleted = 0;

    local = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "local");
    if (!local) {
        seaf_warning ("No local branch found for repo %s(%.8s).\n",
                      repo->name, repo->id);
        goto out;
    }

    master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
    if (!master) {
        seaf_warning ("No master branch found for repo %s(%.8s).\n",
                      repo->name, repo->id);
        goto out;
    }

    local_head = seaf_commit_manager_get_commit (seaf->commit_mgr, repo->id, repo->version,
                                                 local->commit_id);
    if (!local_head) {
        seaf_warning ("Failed to get head of local branch for repo %s.\n", repo->id);
        goto out;
    }

    master_head = seaf_commit_manager_get_commit (seaf->commit_mgr, repo->id, repo->version,
                                                  master->commit_id);
    if (!master_head) {
        seaf_warning ("Failed to get head of master branch for repo %s.\n", repo->id);
        goto out;
    }

    diff_commit_roots (repo->id, repo->version, master_head->root_id, local_head->root_id, &diff_results, FALSE);
    if (!diff_results)
        goto out;

    for (node = diff_results; node != NULL; node = node->next) {
        DiffEntry *entry = node->data;

        if (entry->status != DIFF_STATUS_DELETED)
            continue;

        /* Remember the first deleted file for the description. */
        if (n_deleted == 0)
            first_deleted = get_basename(entry->name);
        n_deleted++;
    }

    if (n_deleted >= seaf->delete_confirm_threshold) {
        ret = g_strdup_printf ("Deleted \"%s\" and %d more files.\n",
                               first_deleted, n_deleted - 1);
    }

out:
    /* The pointers may still be NULL here when an early step failed. */
    seaf_branch_unref (local);
    seaf_branch_unref (master);
    seaf_commit_unref (local_head);
    seaf_commit_unref (master_head);
    g_list_free_full (diff_results, (GDestroyNotify)diff_entry_free);

    return ret;
}
/*
 * Publish a "sync.del_confirmation" notification whose payload is a compact
 * JSON object with the members "repo_name", "delete_files" (@desc) and
 * "confirmation_id".
 */
static void
notify_delete_confirmation (const char *repo_name, const char *desc, const char *confirmation_id)
{
    json_t *payload = json_object ();
    char *text;

    json_object_set_string_member (payload, "repo_name", repo_name);
    json_object_set_string_member (payload, "delete_files", desc);
    json_object_set_string_member (payload, "confirmation_id", confirmation_id);

    text = json_dumps (payload, JSON_COMPACT);
    json_decref (payload);

    seaf_mq_manager_publish_notification (seaf->mq_mgr, "sync.del_confirmation", text);

    g_free (text);
}
/*
 * Record the user's answer to a delete confirmation.
 *
 * @confirmation_id: id of the confirmation being answered; it is copied.
 * @resync: TRUE if the user chose to resync the library.
 *
 * The answer is stored in the del_confirmation_tasks table of the global
 * sync manager (the @mgr argument is not consulted) under
 * del_confirmation_lock. Always returns 0.
 */
int
seaf_sync_manager_add_del_confirmation (SeafSyncManager *mgr,
                                        const char *confirmation_id,
                                        gboolean resync)
{
    SeafSyncManagerPriv *priv = seaf->sync_mgr->priv;
    DelConfirmationResult *answer = g_new0 (DelConfirmationResult, 1);
    char *key = g_strdup (confirmation_id);

    answer->resync = resync;

    pthread_mutex_lock (&priv->del_confirmation_lock);
    g_hash_table_insert (priv->del_confirmation_tasks, key, answer);
    pthread_mutex_unlock (&priv->del_confirmation_lock);

    return 0;
}
/*
 * Fetch and consume the user's answer for @confirmation_id.
 *
 * Returns a newly allocated copy of the stored answer (the caller frees it
 * with g_free) and removes the entry from the table, or NULL if no answer
 * has been recorded for that id.
 */
static DelConfirmationResult *
get_del_confirmation_result (const char *confirmation_id)
{
    SeafSyncManagerPriv *priv = seaf->sync_mgr->priv;
    DelConfirmationResult *snapshot = NULL;
    DelConfirmationResult *stored;

    pthread_mutex_lock (&priv->del_confirmation_lock);

    stored = g_hash_table_lookup (priv->del_confirmation_tasks, confirmation_id);
    if (stored != NULL) {
        /* Copy before removing: the entry is gone from the table afterwards. */
        snapshot = g_new0 (DelConfirmationResult, 1);
        snapshot->resync = stored->resync;
        g_hash_table_remove (priv->del_confirmation_tasks, confirmation_id);
    }

    pthread_mutex_unlock (&priv->del_confirmation_lock);

    return snapshot;
}
/*
 * Delete @repo locally and schedule a fresh clone of it with the same
 * identity (id, name, token, worktree, email, encryption parameters).
 *
 * Every field needed for the clone task is copied first, because
 * seaf_repo_manager_del_repo() is called on @repo before the clone task is
 * added. For encrypted repos the hex-encoded key and iv are saved again
 * after the deletion, and "resync_enc_repo" is set in the extra-info JSON
 * handed to the clone manager.
 *
 * A failure to add the clone task is only logged.
 */
static void
resync_repo (SeafRepo *repo)
{
    GError *error = NULL;
    char *repo_id = g_strdup (repo->id);
    int repo_version = repo->version;
    char *repo_name = g_strdup (repo->name);
    char *token = g_strdup (repo->token);
    char *magic = g_strdup (repo->magic);
    int enc_version = repo->enc_version;
    char *random_key = g_strdup (repo->random_key);
    char *worktree = g_strdup (repo->worktree);
    char *email = g_strdup (repo->email);
    char *more_info = NULL;
    gboolean is_encrypted = FALSE;
    /* Zero-initialized so that an encrypted repo with an unexpected
     * enc_version (neither 1 nor >= 2) never hands uninitialized stack
     * bytes to seaf_repo_manager_save_repo_enc_info() below. */
    char key[65] = {0}, iv[33] = {0};

    json_t *obj = json_object ();

    json_object_set_int_member (obj, "is_readonly", repo->is_readonly);
    json_object_set_string_member (obj, "repo_salt", repo->salt);
    json_object_set_string_member (obj, "server_url", repo->server_url);

    if (repo->username)
        json_object_set_string_member (obj, "username", repo->username);

    if (repo->encrypted) {
        is_encrypted = TRUE;
        if (repo->enc_version == 1) {
            rawdata_to_hex (repo->enc_key, key, 16);
            rawdata_to_hex (repo->enc_iv, iv, 16);
        } else if (repo->enc_version >= 2) {
            rawdata_to_hex (repo->enc_key, key, 32);
            rawdata_to_hex (repo->enc_iv, iv, 16);
        }
        json_object_set_int_member (obj, "resync_enc_repo", TRUE);
    }

    more_info = json_dumps (obj, 0);

    /* Stop watching the worktree of auto-synced repos without an interval. */
    if (repo->auto_sync && (repo->sync_interval == 0))
        seaf_wt_monitor_unwatch_repo (seaf->wt_monitor, repo->id);

    /* From here on only the copies made above are used, not @repo. */
    seaf_repo_manager_del_repo (seaf->repo_mgr, repo);

    if (is_encrypted) {
        seaf_repo_manager_save_repo_enc_info (seaf->repo_mgr, repo_id, key, iv);
    }

    char *ret = seaf_clone_manager_add_task (seaf->clone_mgr, repo_id,
                                             repo_version, repo_name,
                                             token, NULL,
                                             magic, enc_version,
                                             random_key, worktree,
                                             email, more_info, &error);
    if (error) {
        seaf_warning ("Failed to clone repo %s: %s\n", repo_id, error->message);
        g_clear_error (&error);
    }

    g_free (ret);
    g_free (repo_id);
    g_free (repo_name);
    g_free (token);
    g_free (magic);
    g_free (random_key);
    g_free (worktree);
    g_free (email);
    json_decref (obj);
    g_free (more_info);
}
/*
 * Decide which sync action, if any, to start for @repo and start it.
 *
 * In order of priority:
 *  1. Resume an interrupted download at the recorded remote commit.
 *  2. If local and master heads differ, upload the local changes; this is
 *     gated by the delete-confirmation handshake when too many files were
 *     deleted.
 *  3. On manual sync, commit the worktree.
 *  4. Otherwise create a commit from queued worktree events, if any.
 *  5. Otherwise check the head commit on the server.
 *
 * Returns 0 in the normal case. Returns -1 if the master or local branch
 * cannot be found, or if the user chose to resync and the repo was handed
 * to resync_repo().
 */
static int
sync_repo_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync)
{
    SeafBranch *master, *local;
    SyncTask *task;
    int ret = 0;
    char *last_download = NULL;
    SyncInfo *info = NULL;

    master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
    if (!master) {
        seaf_warning ("No master branch found for repo %s(%.8s).\n",
                      repo->name, repo->id);
        return -1;
    }

    local = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "local");
    if (!local) {
        seaf_warning ("No local branch found for repo %s(%.8s).\n",
                      repo->name, repo->id);
        /* Release the reference obtained for master above; returning
         * without this leaked the branch object. */
        seaf_branch_unref (master);
        return -1;
    }

    /* If last download was interrupted in the fetch and download stage,
     * need to resume it at exactly the same remote commit.
     */
    last_download = seaf_repo_manager_get_repo_property (seaf->repo_mgr,
                                                         repo->id,
                                                         REPO_PROP_DOWNLOAD_HEAD);
    if (last_download && strcmp (last_download, EMPTY_SHA1) != 0) {
        if (is_manual_sync || can_schedule_repo (manager, repo)) {
            task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
            start_fetch_if_necessary (task, last_download);
        }
        goto out;
    }

    info = get_sync_info (manager, repo->id);

    if (strcmp (master->commit_id, local->commit_id) != 0) {
        if (is_manual_sync || can_schedule_repo (manager, repo) || info->del_confirmation_pending) {
            task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
            if (!task->info->del_confirmation_pending) {
                char *desc = NULL;
                desc = exceed_max_deleted_files (repo);
                if (desc) {
                    /* The local head commit id doubles as the confirmation id. */
                    notify_delete_confirmation (repo->name, desc, local->commit_id);
                    seaf_warning ("Delete more than %d files, add delete confirmation.\n", seaf->delete_confirm_threshold);
                    task->info->del_confirmation_pending = TRUE;
                    set_task_error (task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING);
                    g_free (desc);
                    goto out;
                }
            } else {
                DelConfirmationResult *result = get_del_confirmation_result (local->commit_id);
                if (!result) {
                    // User has not confirmed whether to continue syncing.
                    set_task_error (task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING);
                    goto out;
                } else if (result->resync) {
                    // User chooses to resync.
                    g_free (result);
                    task->info->del_confirmation_pending = FALSE;
                    set_task_error (task, SYNC_ERROR_ID_DEL_CONFIRMATION_PENDING);
                    // Delete this repo and resync this repo by adding clone task.
                    resync_repo (repo);
                    ret = -1;
                    goto out;
                }
                // User chooses to continue syncing.
                g_free (result);
                task->info->del_confirmation_pending = FALSE;
            }
            start_upload_if_necessary (task);
        }
        /* Do nothing if the client still has something to upload
         * but it's before 30-second schedule.
         */
        goto out;
    } else if (is_manual_sync) {
        task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
        commit_repo (task);
        goto out;
    } else if (create_commit_from_event_queue (manager, repo, is_manual_sync))
        goto out;

    if (is_manual_sync || can_schedule_repo (manager, repo)) {
        /* If file syncing protocol version is higher than 2, we check for all head commit ids
         * for synced repos regularly.
         */
        if (!is_manual_sync && !need_check_on_server (manager, repo, master->commit_id)) {
            seaf_debug ("Repo %s is not changed on server %s.\n", repo->name, repo->server_url);
            repo->last_sync_time = time(NULL);
            goto out;
        }

        task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
        check_head_commit_http (task);
    }

out:
    g_free (last_download);
    seaf_branch_unref (master);
    seaf_branch_unref (local);
    return ret;
}
/*
 * Record a new server-side head commit for @repo.
 *
 * If @head_commit differs from the commit id of the repo's master branch,
 * it is stored in the per-server head commit map and the repo's
 * last_sync_time is reset so it can be synced right away. Nothing happens
 * when the server state or the master branch cannot be found.
 */
void
seaf_sync_manager_update_repo (SeafSyncManager *manager, SeafRepo *repo,
                               const char *head_commit)
{
    HttpServerState *server;
    SeafBranch *master_branch;

    server = g_hash_table_lookup (manager->http_server_states, repo->server_url);
    if (server == NULL)
        return;

    master_branch = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
    if (master_branch == NULL)
        return;

    if (g_strcmp0 (head_commit, master_branch->commit_id) != 0) {
        pthread_mutex_lock (&server->head_commit_map_lock);
        g_hash_table_replace (server->head_commit_map,
                              g_strdup (repo->id),
                              g_strdup (head_commit));
        pthread_mutex_unlock (&server->head_commit_map_lock);

        /* A zero last_sync_time lets the repo sync immediately instead of
         * waiting 30 seconds after the previous sync. */
        repo->last_sync_time = 0;
    }

    seaf_branch_unref (master_branch);
}
/*
 * Request an immediate check of folder permissions and locked files for
 * @server_url. The flags are only raised when a server state exists for
 * the URL and the server is a pro edition.
 */
void
seaf_sync_manager_check_locks_and_folder_perms (SeafSyncManager *manager, const char *server_url)
{
    HttpServerState *server = g_hash_table_lookup (manager->http_server_states, server_url);

    if (server != NULL &&
        seaf_repo_manager_server_is_pro (seaf->repo_mgr, server_url)) {
        server->immediate_check_folder_perms = TRUE;
        server->immediate_check_locked_files = TRUE;
    }
}
/*
 * Remove @repo automatically and announce it with a "repo.removed"
 * notification carrying the repo name.
 *
 * Any running sync task is cancelled first. If the repo is still marked
 * as in sync it is only marked deleted; otherwise it is deleted directly.
 */
static void
auto_delete_repo (SeafSyncManager *manager, SeafRepo *repo)
{
    SyncInfo *sync_info = seaf_sync_manager_get_sync_info (manager, repo->id);
    /* Keep a copy: the name is still needed after the repo is removed. */
    char *removed_name = g_strdup (repo->name);

    seaf_message ("Auto deleted repo '%s'.\n", repo->name);

    seaf_sync_manager_cancel_sync_task (seaf->sync_mgr, repo->id);

    if (sync_info == NULL || !sync_info->in_sync)
        seaf_repo_manager_del_repo (seaf->repo_mgr, repo);
    else
        seaf_repo_manager_mark_repo_deleted (seaf->repo_mgr, repo);

    /* Publish a message, for applet to notify in the system tray */
    seaf_mq_manager_publish_notification (seaf->mq_mgr, "repo.removed", removed_name);

    g_free (removed_name);
}
/*
 * Build the file server URL for @url by forcing port 8082.
 *
 * Everything from the last ':' found after the first strlen("http://")
 * characters is replaced with ":8082"; when there is no such colon,
 * ":8082" is appended. A URL no longer than "http://" is returned as an
 * unchanged copy. The result is newly allocated; free it with g_free.
 */
static char *
http_fileserver_url (const char *url)
{
    const size_t scheme_len = strlen ("http://");
    const char *port_sep;
    char *base;
    char *result;

    /* Too short to contain a host: hand back a plain copy. */
    if (strlen (url) <= scheme_len)
        return g_strdup (url);

    /* Search after the scheme so its own ':' is never matched. */
    port_sep = strrchr (url + scheme_len, ':');
    if (port_sep == NULL)
        return g_strconcat (url, ":8082", NULL);

    base = g_strndup (url, port_sep - url);
    result = g_strconcat (base, ":8082", NULL);
    g_free (base);

    return result;
}
/*
 * Completion callback for the protocol check against the file server port.
 *
 * @user_data is the HttpServerState being probed. The check is always
 * marked as finished; on success the negotiated version, the effective
 * host (file server URL) and the use_fileserver_port flag are recorded.
 */
static void
check_http_fileserver_protocol_done (HttpProtocolVersion *result, void *user_data)
{
    HttpServerState *server = user_data;

    server->checking = FALSE;

    if (!result->check_success || result->not_supported)
        return;

    server->http_version = MIN(result->version, CURRENT_SYNC_PROTO_VERSION);
    server->effective_host = http_fileserver_url (server->testing_host);
    server->use_fileserver_port = TRUE;

    seaf_message ("File syncing protocol version on server %s is %d. "
                  "Client file syncing protocol version is %d. Use version %d.\n",
                  server->effective_host, result->version, CURRENT_SYNC_PROTO_VERSION,
                  server->http_version);
}
/*
 * Completion callback for the protocol check against the plain server URL.
 *
 * On success the negotiated version and effective host are stored and the
 * check is finished. On failure, a non-https host gets a second probe on
 * the file server port (the check stays in progress unless scheduling that
 * probe fails); an https host simply ends the check.
 */
static void
check_http_protocol_done (HttpProtocolVersion *result, void *user_data)
{
    HttpServerState *server = user_data;
    char *fallback_host;
    int rc;

    if (result->check_success && !result->not_supported) {
        server->http_version = MIN(result->version, CURRENT_SYNC_PROTO_VERSION);
        server->effective_host = g_strdup (server->testing_host);
        server->checking = FALSE;
        seaf_message ("File syncing protocol version on server %s is %d. "
                      "Client file syncing protocol version is %d. Use version %d.\n",
                      server->effective_host, result->version, CURRENT_SYNC_PROTO_VERSION,
                      server->http_version);
        return;
    }

    /* No file server port fallback for https hosts. */
    if (strncmp (server->testing_host, "https", 5) == 0) {
        server->checking = FALSE;
        return;
    }

    fallback_host = http_fileserver_url (server->testing_host);
    rc = http_tx_manager_check_protocol_version (seaf->http_tx_mgr,
                                                 fallback_host,
                                                 TRUE,
                                                 check_http_fileserver_protocol_done,
                                                 server);
    if (rc < 0)
        server->checking = FALSE;
    g_free (fallback_host);
}
/* Minimum number of seconds between two protocol probes of one server. */
#define CHECK_HTTP_INTERVAL 10

/*
 * Make sure the sync protocol of the repo's server has been detected.
 *
 * Returns TRUE if we're ready to use http-sync; otherwise FALSE. FALSE is
 * also returned while a probe is in flight, right after a probe has been
 * started, and for repos without a server_url.
 *
 * When the server is already known, the repo's effective_host and
 * use_fileserver_port are filled in from the server state if the repo has
 * no effective host yet.
 */
static gboolean
check_http_protocol (SeafSyncManager *mgr, SeafRepo *repo)
{
    HttpServerState *server;
    gint64 now;

    /* If a repo was cloned before 4.0, server-url is not set. */
    if (!repo->server_url)
        return FALSE;

    server = g_hash_table_lookup (mgr->http_server_states, repo->server_url);
    if (server == NULL) {
        server = http_server_state_new ();
        g_hash_table_insert (mgr->http_server_states,
                             g_strdup (repo->server_url), server);
    }

    if (server->checking)
        return FALSE;

    if (server->http_version > 0) {
        if (!repo->effective_host) {
            repo->effective_host = g_strdup (server->effective_host);
            repo->use_fileserver_port = server->use_fileserver_port;
        }
        return TRUE;
    }

    /* If we haven't detected the server url successfully, retry every 10 seconds. */
    now = time(NULL);
    if (now - server->last_http_check_time < CHECK_HTTP_INTERVAL)
        return FALSE;

    /* First try repo->server_url.
     * If it fails and https is not used, try server_url:8082 instead.
     */
    g_free (server->testing_host);
    server->testing_host = g_strdup (repo->server_url);
    server->last_http_check_time = (gint64)time(NULL);

    if (http_tx_manager_check_protocol_version (seaf->http_tx_mgr,
                                                repo->server_url,
                                                FALSE,
                                                check_http_protocol_done,
                                                server) >= 0)
        server->checking = TRUE;

    return FALSE;
}
/*
 * Completion callback of the notification server probe. @user_data is the
 * HttpServerState of the probed server; it is marked alive only when the
 * probe reports success.
 */
static void
check_notif_server_done (gboolean is_alive, void *user_data)
{
    HttpServerState *server = user_data;

    if (!is_alive)
        return;

    server->notif_server_alive = TRUE;
    seaf_message ("Notification server is enabled on the remote server %s.\n", server->effective_host);
}
/*
 * Build the notification server URL for @url by forcing port 8083.
 *
 * Everything from the last ':' found after the first strlen("http://")
 * characters is replaced with ":8083"; when there is no such colon,
 * ":8083" is appended. A URL no longer than "http://" is returned as an
 * unchanged copy. The result is newly allocated; free it with g_free.
 */
static char *
http_notification_url (const char *url)
{
    const size_t scheme_len = strlen ("http://");
    const char *port_sep;
    char *base;
    char *result;

    /* Too short to contain a host: hand back a plain copy. */
    if (strlen (url) <= scheme_len)
        return g_strdup (url);

    /* Search after the scheme so its own ':' is never matched. */
    port_sep = strrchr (url + scheme_len, ':');
    if (port_sep == NULL)
        return g_strconcat (url, ":8083", NULL);

    base = g_strndup (url, port_sep - url);
    result = g_strconcat (base, ":8083", NULL);
    g_free (base);

    return result;
}
#if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS
// Returns TRUE if the notification server of the repo's server is already
// known to be alive; otherwise FALSE.
// The probe is started at most once per server: the first call that
// manages to schedule it marks the server as checked, and the result
// arrives later through check_notif_server_done().
static gboolean
check_notif_server (SeafSyncManager *mgr, SeafRepo *repo)
{
    HttpServerState *server;
    char *probe_url;
    int rc;

    if (!repo->server_url)
        return FALSE;

    server = g_hash_table_lookup (mgr->http_server_states, repo->server_url);
    if (server == NULL)
        return FALSE;

    if (server->notif_server_alive)
        return TRUE;

    if (server->notif_server_checked)
        return FALSE;

    // When the file server port is in use, the notification server is
    // addressed through its own port as well.
    probe_url = server->use_fileserver_port
        ? http_notification_url (repo->server_url)
        : g_strdup (repo->server_url);

    rc = http_tx_manager_check_notif_server (seaf->http_tx_mgr,
                                             probe_url,
                                             server->use_fileserver_port,
                                             check_notif_server_done,
                                             server);
    // Only remember the probe when it was actually scheduled.
    if (rc >= 0)
        server->notif_server_checked = TRUE;

    g_free (probe_url);
    return FALSE;
}
#endif
/*
 * GCompareDataFunc ordering repos by ascending last_sync_time.
 *
 * @a, @b: the two SeafRepo objects to compare.
 * @user_data: unused.
 *
 * Returns a negative value, zero or a positive value when @a was synced
 * before, at the same time as, or after @b.
 *
 * The values are compared explicitly instead of returning their
 * difference: a difference narrowed to gint can truncate or overflow and
 * report the wrong sign, whatever integer width last_sync_time has.
 */
gint
cmp_repos_by_sync_time (gconstpointer a, gconstpointer b, gpointer user_data)
{
    const SeafRepo *repo_a = a;
    const SeafRepo *repo_b = b;

    if (repo_a->last_sync_time < repo_b->last_sync_time)
        return -1;
    if (repo_a->last_sync_time > repo_b->last_sync_time)
        return 1;
    return 0;
}
#if defined WIN32 || defined __APPLE__
/*
 * Remove every block referenced by the file object @file_id of the given
 * repo from the block manager.
 *
 * If the file object cannot be loaded there is nothing to remove; a
 * warning is logged and the function returns. (Assumes
 * seaf_fs_manager_get_seafile() returns NULL on failure - the previous
 * code dereferenced the result unconditionally.)
 */
static void
cleanup_file_blocks (const char *repo_id, int version, const char *file_id)
{
    Seafile *file;
    int i;

    file = seaf_fs_manager_get_seafile (seaf->fs_mgr,
                                        repo_id, version,
                                        file_id);
    if (!file) {
        seaf_warning ("Failed to get file %s in repo %.8s when cleaning up blocks.\n",
                      file_id, repo_id);
        return;
    }

    for (i = 0; i < file->n_blocks; ++i)
        seaf_block_manager_remove_block (seaf->block_mgr,
                                         repo_id, version,
                                         file->blk_sha1s[i]);

    seafile_unref (file);
}
/*
 * Apply a delayed update to a worktree file that was locked during an
 * earlier checkout.
 *
 * Returns FALSE without doing anything when the file is still locked.
 * In every other case the locked-file record for @path is removed from
 * the db and TRUE is returned, whether or not the checkout succeeded.
 *
 * The cleanup labels fall through into each other on purpose:
 *   out:            remove the blocks of the file, then continue below;
 *   remove_from_db: drop the locked-file record and free local resources.
 */
static gboolean
handle_locked_file_update (SeafRepo *repo, struct index_state *istate,
LockedFileSet *fset, const char *path, LockedFile *locked,
CheckoutBlockAux *aux)
{
gboolean locked_on_server = FALSE;
struct cache_entry *ce;
char file_id[41];
char *fullpath = NULL;
SeafStat st;
gboolean file_exists = TRUE;
SeafileCrypt *crypt = NULL;
SeafBranch *master = NULL;
gboolean ret = TRUE;
locked_on_server = seaf_filelock_manager_is_file_locked (seaf->filelock_mgr,
repo->id,
path);
/* File is still locked, do nothing. */
if (do_check_file_locked (path, repo->worktree, locked_on_server))
return FALSE;
seaf_debug ("Update previously locked file %s in repo %.8s.\n",
path, repo->id);
/* If the file was locked on the last checkout, the worktree file was not
* updated, but the index has been updated. So the ce in the index should
* contain the information for the file to be updated.
*/
ce = index_name_exists (istate, path, strlen(path), 0);
if (!ce) {
seaf_warning ("Cache entry for %s in repo %s(%.8s) is not found "
"when update locked file.",
path, repo->name, repo->id);
/* file_id is not known here, so the block cleanup under "out" is skipped. */
goto remove_from_db;
}
rawdata_to_hex (ce->sha1, file_id, 20);
fullpath = g_build_filename (repo->worktree, path, NULL);
file_exists = seaf_util_exists (fullpath);
if (file_exists && seaf_stat (fullpath, &st) < 0) {
seaf_warning ("Failed to stat %s: %s.\n", fullpath, strerror(errno));
goto out;
}
if (repo->encrypted)
crypt = seafile_crypt_new (repo->enc_version,
repo->enc_key,
repo->enc_iv);
master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
if (!master) {
seaf_warning ("No master branch found for repo %s(%.8s).\n",
repo->name, repo->id);
goto out;
}
gboolean conflicted;
/* Force a conflict when the existing worktree file's mtime differs from
 * the mtime recorded in the locked-file entry. st is only read when
 * file_exists is TRUE, i.e. after the successful seaf_stat() above. */
gboolean force_conflict = (file_exists && st.st_mtime != locked->old_mtime);
/* Fall back to the account email when the repo has no username. */
char *username = repo->username;
if (!username) {
username = repo->email;
}
/* A failed checkout is only logged; processing continues below. */
if (seaf_repo_manager_checkout_file (repo,
file_id, fullpath,
ce->ce_mode, ce->ce_mtime.sec,
crypt,
path,
master->commit_id,
force_conflict,
&conflicted,
username,
aux) < 0) {
seaf_warning ("Failed to checkout previously locked file %s in repo "
"%s(%.8s).\n",
path, repo->name, repo->id);
}
seaf_sync_manager_update_active_path (seaf->sync_mgr,
repo->id,
path,
S_IFREG,
SYNC_STATUS_SYNCED,
TRUE);
/* In checkout, the file was overwritten by rename, so the file attributes
are gone. We have to set read-only state again.
*/
if (locked_on_server)
seaf_filelock_manager_lock_wt_file (seaf->filelock_mgr,
repo->id,
path);
out:
cleanup_file_blocks (repo->id, repo->version, file_id);
remove_from_db:
/* Remove the locked file record from db. */
locked_file_set_remove (fset, path, TRUE);
g_free (fullpath);
g_free (crypt);
/* master is still NULL on the early-exit paths. */
seaf_branch_unref (master);
return ret;
}
/*
 * Apply a delayed delete to a worktree file that was locked earlier.
 *
 * Returns FALSE without doing anything when the file is still locked.
 * Otherwise the worktree file is unlinked if it exists and its mtime
 * still equals the mtime recorded in @locked; the locked-file record is
 * then removed from the db and TRUE is returned. @istate is not used.
 */
static gboolean
handle_locked_file_delete (SeafRepo *repo, struct index_state *istate,
                           LockedFileSet *fset, const char *path, LockedFile *locked)
{
    gboolean server_locked;
    gboolean present;
    char *abs_path;
    SeafStat file_stat;

    server_locked = seaf_filelock_manager_is_file_locked (seaf->filelock_mgr,
                                                          repo->id,
                                                          path);

    /* File is still locked, do nothing. */
    if (do_check_file_locked (path, repo->worktree, server_locked))
        return FALSE;

    seaf_debug ("Delete previously locked file %s in repo %.8s.\n",
                path, repo->id);

    abs_path = g_build_filename (repo->worktree, path, NULL);

    present = seaf_util_exists (abs_path);
    if (present) {
        if (seaf_stat (abs_path, &file_stat) < 0) {
            seaf_warning ("Failed to stat %s: %s.\n", abs_path, strerror(errno));
        } else if (file_stat.st_mtime == locked->old_mtime) {
            /* Unchanged since it was recorded as locked: delete it. */
            seaf_util_unlink (abs_path);
        }
    }

    /* Remove the locked file record from db. */
    locked_file_set_remove (fset, path, TRUE);

    g_free (abs_path);
    return TRUE;
}
/*
 * Job function: process the delayed updates and deletes recorded for
 * files of a repo that were locked when they should have been changed.
 *
 * @vdata is the SeafRepo to process; the same pointer is returned so it
 * reaches check_locked_files_done().
 *
 * For every recorded locked file the matching handler is called; entries
 * that were handled are removed from the in-memory set. The locked file
 * set is released on every exit path, including a failure to load the
 * index (that path used to leak it).
 */
static void *
check_locked_files (void *vdata)
{
    SeafRepo *repo = vdata;
    LockedFileSet *fset;
    GHashTableIter iter;
    gpointer key, value;
    char *path;
    LockedFile *locked;
    char index_path[SEAF_PATH_MAX];
    struct index_state istate;

    fset = seaf_repo_manager_get_locked_file_set (seaf->repo_mgr, repo->id);

    /* Nothing recorded for this repo. */
    if (g_hash_table_size (fset->locked_files) == 0) {
        locked_file_set_free (fset);
        return vdata;
    }

    memset (&istate, 0, sizeof(istate));
    snprintf (index_path, SEAF_PATH_MAX, "%s/%s", repo->manager->index_dir, repo->id);

    if (read_index_from (&istate, index_path, repo->version) < 0) {
        seaf_warning ("Failed to load index.\n");
        /* Release the set before bailing out. */
        locked_file_set_free (fset);
        return vdata;
    }

    /* Connection parameters used when checking out file content. */
    CheckoutBlockAux *aux = g_new0 (CheckoutBlockAux, 1);
    aux->repo_id = g_strdup (repo->id);
    aux->host = g_strdup (repo->effective_host);
    aux->token = g_strdup (repo->token);
    aux->use_fileserver_port = repo->use_fileserver_port;

    gboolean success;
    g_hash_table_iter_init (&iter, fset->locked_files);
    while (g_hash_table_iter_next (&iter, &key, &value)) {
        path = key;
        locked = value;

        success = FALSE;
        if (strcmp (locked->operation, LOCKED_OP_UPDATE) == 0)
            success = handle_locked_file_update (repo, &istate, fset, path, locked, aux);
        else if (strcmp (locked->operation, LOCKED_OP_DELETE) == 0)
            success = handle_locked_file_delete (repo, &istate, fset, path, locked);

        /* Handled entries leave the in-memory set; files that are still
         * locked stay for the next run. */
        if (success)
            g_hash_table_iter_remove (&iter);
    }

    free_checkout_block_aux (aux);
    discard_index (&istate);
    locked_file_set_free (fset);

    return vdata;
}
static void
check_locked_files_done (void *vdata)
{
SeafRepo *repo = vdata;
repo->checking_locked_files = FALSE;
}
#endif
/*
 * Completion callback of a folder permission query.
 *
 * @user_data is the HttpServerState that was queried. On success the user
 * and group folder permissions and the permission timestamp of every
 * returned repo that is not currently syncing are updated. On failure,
 * if no check has ever completed for this server, folder permissions are
 * marked as not supported. The last check time is refreshed either way.
 */
static void
check_folder_perms_done (HttpFolderPerms *result, void *user_data)
{
    HttpServerState *server_state = user_data;
    gint64 now = (gint64)time(NULL);
    GList *node;

    server_state->checking_folder_perms = FALSE;

    if (result->success) {
        for (node = result->results; node != NULL; node = node->next) {
            HttpFolderPermRes *entry = node->data;
            SyncInfo *sync_info = get_sync_info (seaf->sync_mgr, entry->repo_id);

            /* Leave repos alone while a sync is running. */
            if (sync_info->in_sync)
                continue;

            seaf_repo_manager_update_folder_perms (seaf->repo_mgr, entry->repo_id,
                                                   FOLDER_PERM_TYPE_USER,
                                                   entry->user_perms);
            seaf_repo_manager_update_folder_perms (seaf->repo_mgr, entry->repo_id,
                                                   FOLDER_PERM_TYPE_GROUP,
                                                   entry->group_perms);
            seaf_repo_manager_update_folder_perm_timestamp (seaf->repo_mgr,
                                                            entry->repo_id,
                                                            entry->timestamp);
        }
    } else if (server_state->last_check_perms_time == 0) {
        /* If on start-up we find that checking folder perms fails,
         * we assume the server doesn't support it.
         */
        server_state->folder_perms_not_supported = TRUE;
    }

    server_state->last_check_perms_time = now;
}
/*
 * Send one folder permission query to the server @host.
 *
 * A request is built for every repo in @repos that is checked out,
 * belongs to @host and has a stored token. On platforms with notification
 * support, repos subscribed to the notification server are skipped unless
 * @force is TRUE. Nothing is sent when no repo qualifies. The reply is
 * handled by check_folder_perms_done().
 */
static void
check_folder_permissions_one_server_immediately (SeafSyncManager *mgr,
                                                 const char *host,
                                                 HttpServerState *server_state,
                                                 GList *repos,
                                                 gboolean force)
{
    GList *pending = NULL;
    GList *node;
    int rc;

    for (node = repos; node != NULL; node = node->next) {
        SeafRepo *candidate = node->data;
        HttpFolderPermReq *request;
        char *repo_token;
        gint64 since;

#if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS
        if (!force && seaf_notif_manager_is_repo_subscribed (seaf->notif_mgr, candidate))
            continue;
#endif

        if (!candidate->head)
            continue;

        if (g_strcmp0 (host, candidate->server_url) != 0)
            continue;

        repo_token = seaf_repo_manager_get_repo_property (seaf->repo_mgr,
                                                          candidate->id, REPO_PROP_TOKEN);
        if (!repo_token)
            continue;

        since = seaf_repo_manager_get_folder_perm_timestamp (seaf->repo_mgr,
                                                             candidate->id);
        if (since < 0)
            since = 0;

        /* g_new0 zero-fills the request, so the 36 copied id bytes stay
         * NUL-terminated. */
        request = g_new0 (HttpFolderPermReq, 1);
        memcpy (request->repo_id, candidate->id, 36);
        request->token = g_strdup (repo_token);
        request->timestamp = since;

        pending = g_list_append (pending, request);

        g_free (repo_token);
    }

    if (pending == NULL)
        return;

    server_state->checking_folder_perms = TRUE;

    /* The requests list will be freed in http tx manager. */
    rc = http_tx_manager_get_folder_perms (seaf->http_tx_mgr,
                                           server_state->effective_host,
                                           server_state->use_fileserver_port,
                                           pending,
                                           check_folder_perms_done,
                                           server_state);
    if (rc < 0) {
        seaf_warning ("Failed to schedule check folder permissions\n");
        server_state->checking_folder_perms = FALSE;
    }
}
/*
 * Check folder permissions on one server if a check is due.
 *
 * Nothing is done for non-pro servers, servers whose protocol version is
 * not known yet, servers that do not support folder permissions, or while
 * a check is in flight. A pending immediate request is served at once
 * (forced); otherwise a check is started only when none has completed yet
 * or CHECK_FOLDER_PERMS_INTERVAL seconds have passed since the last one.
 */
static void
check_folder_permissions_one_server (SeafSyncManager *mgr,
                                     const char *host,
                                     HttpServerState *server_state,
                                     GList *repos)
{
    gint64 now;
    gboolean forced;

    if (!seaf_repo_manager_server_is_pro (seaf->repo_mgr, host))
        return;

    now = (gint64)time(NULL);

    if (server_state->http_version == 0)
        return;
    if (server_state->folder_perms_not_supported)
        return;
    if (server_state->checking_folder_perms)
        return;

    forced = server_state->immediate_check_folder_perms;
    if (forced) {
        server_state->immediate_check_folder_perms = FALSE;
    } else if (server_state->last_check_perms_time > 0 &&
               now - server_state->last_check_perms_time < CHECK_FOLDER_PERMS_INTERVAL) {
        /* Checked recently enough. */
        return;
    }

    check_folder_permissions_one_server_immediately (mgr, host, server_state, repos, forced);
}
/*
 * Run the folder permission check for every known server, offering each
 * one the full repo list.
 */
static void
check_folder_permissions (SeafSyncManager *mgr, GList *repos)
{
    GHashTableIter servers;
    gpointer url, server;

    g_hash_table_iter_init (&servers, mgr->http_server_states);
    while (g_hash_table_iter_next (&servers, &url, &server))
        check_folder_permissions_one_server (mgr, url, server, repos);
}
/*
 * Completion callback of a server locked files query.
 *
 * @user_data is the HttpServerState that was queried. On success the
 * locked file list and its timestamp are updated for every returned repo
 * that is not currently syncing. On failure, if no check has ever
 * completed for this server, locked files are marked as not supported.
 * The last check time is refreshed either way.
 */
static void
check_server_locked_files_done (HttpLockedFiles *result, void *user_data)
{
    HttpServerState *server_state = user_data;
    gint64 now = (gint64)time(NULL);
    GList *node;

    server_state->checking_locked_files = FALSE;

    if (result->success) {
        for (node = result->results; node != NULL; node = node->next) {
            HttpLockedFilesRes *entry = node->data;
            SyncInfo *sync_info = get_sync_info (seaf->sync_mgr, entry->repo_id);

            /* Leave repos alone while a sync is running. */
            if (sync_info->in_sync)
                continue;

            seaf_filelock_manager_update (seaf->filelock_mgr,
                                          entry->repo_id,
                                          entry->locked_files);
            seaf_filelock_manager_update_timestamp (seaf->filelock_mgr,
                                                    entry->repo_id,
                                                    entry->timestamp);
        }
    } else if (server_state->last_check_locked_files_time == 0) {
        /* If on start-up we find that checking locked files fails,
         * we assume the server doesn't support it.
         */
        server_state->locked_files_not_supported = TRUE;
    }

    server_state->last_check_locked_files_time = now;
}
/*
 * Send one locked files query to the server @host.
 *
 * A request is built for every repo in @repos that is checked out,
 * belongs to @host and has a stored token. On platforms with notification
 * support, repos subscribed to the notification server are skipped unless
 * @force is TRUE. Nothing is sent when no repo qualifies. The reply is
 * handled by check_server_locked_files_done().
 */
static void
check_locked_files_one_server_immediately (SeafSyncManager *mgr,
                                           const char *host,
                                           HttpServerState *server_state,
                                           GList *repos,
                                           gboolean force)
{
    GList *pending = NULL;
    GList *node;
    int rc;

    for (node = repos; node != NULL; node = node->next) {
        SeafRepo *candidate = node->data;
        HttpLockedFilesReq *request;
        char *repo_token;
        gint64 since;

#if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS
        if (!force && seaf_notif_manager_is_repo_subscribed (seaf->notif_mgr, candidate))
            continue;
#endif

        if (!candidate->head)
            continue;

        if (g_strcmp0 (host, candidate->server_url) != 0)
            continue;

        repo_token = seaf_repo_manager_get_repo_property (seaf->repo_mgr,
                                                          candidate->id, REPO_PROP_TOKEN);
        if (!repo_token)
            continue;

        since = seaf_filelock_manager_get_timestamp (seaf->filelock_mgr,
                                                     candidate->id);
        if (since < 0)
            since = 0;

        /* g_new0 zero-fills the request, so the 36 copied id bytes stay
         * NUL-terminated. */
        request = g_new0 (HttpLockedFilesReq, 1);
        memcpy (request->repo_id, candidate->id, 36);
        request->token = g_strdup (repo_token);
        request->timestamp = since;

        pending = g_list_append (pending, request);

        g_free (repo_token);
    }

    if (pending == NULL)
        return;

    server_state->checking_locked_files = TRUE;

    /* The requests list will be freed in http tx manager. */
    rc = http_tx_manager_get_locked_files (seaf->http_tx_mgr,
                                           server_state->effective_host,
                                           server_state->use_fileserver_port,
                                           pending,
                                           check_server_locked_files_done,
                                           server_state);
    if (rc < 0) {
        seaf_warning ("Failed to schedule check server locked files\n");
        server_state->checking_locked_files = FALSE;
    }
}
/*
 * Check server-side locked files on one server if a check is due.
 *
 * Nothing is done for non-pro servers, servers whose protocol version is
 * not known yet, servers that do not support locked files, or while a
 * check is in flight. A pending immediate request is served at once
 * (forced); otherwise a check is started only when none has completed yet
 * or the throttle interval has passed since the last one.
 *
 * NOTE(review): the throttle uses CHECK_FOLDER_PERMS_INTERVAL, not a
 * locked-files specific interval - confirm this is intended.
 */
static void
check_locked_files_one_server (SeafSyncManager *mgr,
                               const char *host,
                               HttpServerState *server_state,
                               GList *repos)
{
    gint64 now;
    gboolean forced;

    if (!seaf_repo_manager_server_is_pro (seaf->repo_mgr, host))
        return;

    now = (gint64)time(NULL);

    if (server_state->http_version == 0)
        return;
    if (server_state->locked_files_not_supported)
        return;
    if (server_state->checking_locked_files)
        return;

    forced = server_state->immediate_check_locked_files;
    if (forced) {
        server_state->immediate_check_locked_files = FALSE;
    } else if (server_state->last_check_locked_files_time > 0 &&
               now - server_state->last_check_locked_files_time < CHECK_FOLDER_PERMS_INTERVAL) {
        /* Checked recently enough. */
        return;
    }

    check_locked_files_one_server_immediately (mgr, host, server_state, repos, forced);
}
/*
 * Run the server locked files check for every known server, offering each
 * one the full repo list.
 */
static void
check_server_locked_files (SeafSyncManager *mgr, GList *repos)
{
    GHashTableIter servers;
    gpointer url, server;

    g_hash_table_iter_init (&servers, mgr->http_server_states);
    while (g_hash_table_iter_next (&servers, &url, &server))
        check_locked_files_one_server (mgr, url, server, repos);
}
#if 0
/*
 * Debug helper, compiled out by the surrounding "#if 0".
 * Logs the number of active paths and, when there are fewer than 10,
 * the JSON listing of those paths as well.
 */
static void
print_active_paths (SeafSyncManager *mgr)
{
int n = seaf_sync_manager_active_paths_number(mgr);
seaf_message ("%d active paths\n\n", n);
/* Only dump the listing when it is short. */
if (n < 10) {
char *paths_json = seaf_sync_manager_list_active_paths_json (mgr);
seaf_message ("%s\n", paths_json);
g_free (paths_json);
}
}
#endif
/*
 * Extract the "jwt_token" string member from a JSON response body.
 *
 * @rsp_content: response bytes; @rsp_size: their length.
 *
 * Returns a newly allocated copy of the token (free with g_free), or NULL
 * when the body is not valid JSON, has no "jwt_token" member, or the
 * member yields no string.
 */
static char *
parse_jwt_token (const char *rsp_content, gint64 rsp_size)
{
    json_error_t parse_err;
    json_t *root;
    char *token = NULL;

    root = json_loadb (rsp_content, rsp_size, 0, &parse_err);
    if (root == NULL)
        return NULL;

    if (json_object_has_member (root, "jwt_token")) {
        const char *value = json_object_get_string_member (root, "jwt_token");
        if (value)
            token = g_strdup (value);
    }

    json_decref (root);
    return token;
}
/* Numeric HTTP response status codes, named after the status they stand for. */
#define HTTP_FORBIDDEN 403
#define HTTP_NOT_FOUND 404
#define HTTP_SERVERR 500
#define HTTP_SERVERR_BAD_GATEWAY 502
#define HTTP_SERVERR_UNAVAILABLE 503
#define HTTP_SERVERR_TIMEOUT 504
/*
 * Context passed to fileserver_get_jwt_token_cb() for one jwt-token
 * request. The callback frees both repo_id and the struct itself.
 */
typedef struct _GetJwtTokenAux {
/* State of the server the request was sent to (holds n_jwt_token_request). */
HttpServerState *state;
/* Heap-allocated id of the repo the token is requested for. */
char *repo_id;
} GetJwtTokenAux;
/*
 * Completion callback of a jwt-token request.
 *
 * @user_data is the GetJwtTokenAux of the request; it is always freed
 * here. The outstanding request counter of the server is decremented.
 * When the request succeeded (and did not end with 403, 404 or 500), the
 * repo still exists and the response contains a token, the token replaces
 * the repo's current jwt_token.
 */
static void
fileserver_get_jwt_token_cb (HttpAPIGetResult *result, void *user_data)
{
    GetJwtTokenAux *aux = user_data;
    HttpServerState *server = aux->state;
    gboolean rejected;

    server->n_jwt_token_request--;

    rejected = (result->http_status == HTTP_NOT_FOUND ||
                result->http_status == HTTP_FORBIDDEN ||
                result->http_status == HTTP_SERVERR);

    if (!rejected && result->success) {
        SeafRepo *repo = seaf_repo_manager_get_repo (seaf->repo_mgr, aux->repo_id);

        if (repo != NULL) {
            char *fresh_token = parse_jwt_token (result->rsp_content,result->rsp_size);

            if (fresh_token == NULL) {
                seaf_warning ("Failed to parse jwt token for repo %s\n", repo->id);
            } else {
                /* The repo takes ownership of the new token. */
                g_free (repo->jwt_token);
                repo->jwt_token = fresh_token;
            }
        }
    }

    g_free (aux->repo_id);
    g_free (aux);
}
/*
 * Returns TRUE when the current time is past the repo's last sync time
 * plus its sync interval.
 */
inline static gboolean
periodic_sync_due (SeafRepo *repo)
{
    const int current = (int)time(NULL);

    return (current > (repo->last_sync_time + repo->sync_interval));
}
#if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS
/*
 * Keep the repo's notification subscription going.
 *
 * Nothing happens unless the notification server of the repo's server is
 * alive and no more than 10 jwt-token requests are outstanding. When the
 * last token check is older than JWT_TOKEN_EXPIRE_TIME, a new token is
 * requested from the file server (the result is handled by
 * fileserver_get_jwt_token_cb()). Otherwise, a repo that is not yet
 * subscribed and already has a token is subscribed. Always returns 0.
 */
static int
check_and_subscribe_repo (SeafSyncManager *mgr, SeafRepo *repo)
{
    HttpServerState *server;
    GetJwtTokenAux *aux;
    char *token_url;
    gint64 now;

    server = g_hash_table_lookup (mgr->http_server_states, repo->server_url);
    if (server == NULL || !server->notif_server_alive)
        return 0;

    if (server->n_jwt_token_request > 10)
        return 0;

    now = (gint64)time(NULL);

    if (now - repo->last_check_jwt_token <= JWT_TOKEN_EXPIRE_TIME) {
        /* Token check is recent enough: just make sure we are subscribed. */
        if (!seaf_notif_manager_is_repo_subscribed (seaf->notif_mgr, repo) &&
            repo->jwt_token && repo->server_url)
            seaf_notif_manager_subscribe_repo (seaf->notif_mgr, repo);
        return 0;
    }

    repo->last_check_jwt_token = now;

    /* The "/seafhttp" prefix is only used when not talking to the file
     * server port directly. */
    if (repo->use_fileserver_port)
        token_url = g_strdup_printf ("%s/repo/%s/jwt-token", repo->effective_host, repo->id);
    else
        token_url = g_strdup_printf ("%s/seafhttp/repo/%s/jwt-token", repo->effective_host, repo->id);

    server->n_jwt_token_request++;

    aux = g_new0 (GetJwtTokenAux, 1);
    aux->repo_id = g_strdup (repo->id);
    aux->state = server;

    if (http_tx_manager_fileserver_api_get (seaf->http_tx_mgr,
                                            repo->effective_host,
                                            token_url,
                                            repo->token,
                                            fileserver_get_jwt_token_cb,
                                            aux) < 0) {
        /* The callback will never run: undo the bookkeeping here. */
        g_free (aux->repo_id);
        g_free (aux);
        server->n_jwt_token_request--;
    }

    g_free (token_url);
    return 0;
}
#endif
static int
auto_sync_pulse (void *vmanager)
{
SeafSyncManager *manager = vmanager;
GList *repos, *ptr;
SeafRepo *repo;
repos = seaf_repo_manager_get_repo_list (manager->seaf->repo_mgr, -1, -1);
check_folder_permissions (manager, repos);
check_server_locked_files (manager, repos);
/* Sort repos by last_sync_time, so that we don't "starve" any repo. */
repos = g_list_sort_with_data (repos, cmp_repos_by_sync_time, NULL);
for (ptr = repos; ptr != NULL; ptr = ptr->next) {
repo = ptr->data;
if (!manager->priv->auto_sync_enabled || !repo->auto_sync)
continue;
/* Every second, we'll check the worktree to see if it still exists.
* We'll invalidate worktree if it gets moved or deleted.
* But there is a hole here: If the user delete the worktree dir and
* recreate a dir with the same name within a second, we'll falsely
* see the worktree as valid. What's worse, the new worktree dir won't
* be monitored.
* This problem can only be solved by restart.
*/
/* If repo has been checked out and the worktree doesn't exist,
* we'll delete the repo automatically.
*/
if (repo->head != NULL) {
if (seaf_repo_check_worktree (repo) < 0) {
if (!repo->worktree_invalid) {
// The repo worktree was valid, but now it's invalid
seaf_repo_manager_invalidate_repo_worktree (seaf->repo_mgr, repo);
if (!seafile_session_config_get_allow_invalid_worktree(seaf)) {
auto_delete_repo (manager, repo);
}
}
continue;
} else {
if (repo->worktree_invalid) {
// The repo worktree was invalid, but now it's valid again,
// so we start watch it
seaf_repo_manager_validate_repo_worktree (seaf->repo_mgr, repo);
continue;
}
}
}
repo->worktree_invalid = FALSE;
#ifdef USE_GPL_CRYPTO
if (repo->version == 0 || (repo->encrypted && repo->enc_version < 2)) {
continue;
}
#endif
if (!repo->token) {
/* If the user has logged out of the account, the repo token would
* be null */
seaf_debug ("repo token of %s (%.8s) is null, would not sync it\n", repo->name, repo->id);
continue;
}
/* Don't sync repos not checked out yet. */
if (!repo->head)
continue;
gint64 now = (gint64)time(NULL);
#if defined WIN32 || defined __APPLE__
if (repo->version > 0) {
if (repo->checking_locked_files)
continue;
// Since delayed file updates require blocks from the server, we need to check the protocol information before updating.
if (check_http_protocol (manager, repo)) {
if (repo->last_check_locked_time == 0 ||
now - repo->last_check_locked_time >= CHECK_LOCKED_FILES_INTERVAL)
{
repo->checking_locked_files = TRUE;
if (seaf_job_manager_schedule_job (seaf->job_mgr,
check_locked_files,
check_locked_files_done,
repo) < 0) {
seaf_warning ("Failed to schedule check local locked files\n");
repo->checking_locked_files = FALSE;
} else {
repo->last_check_locked_time = now;
}
}
}
}
#endif
SyncInfo *info = get_sync_info (manager, repo->id);
if (info->in_sync)
continue;
if (info->sync_perm_err_cnt > SYNC_PERM_ERROR_RETRY_TIME)
continue;
if (repo->version > 0) {
/* For repo version > 0, only use http sync. */
if (check_http_protocol (manager, repo)) {
#if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS
if (check_notif_server (manager, repo)) {
seaf_notif_manager_connect_server (seaf->notif_mgr, repo->server_url, repo->use_fileserver_port);
}
#endif
if (repo->sync_interval == 0) {
if (sync_repo_v2 (manager, repo, FALSE) < 0)
continue;
#if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS
check_and_subscribe_repo (manager, repo);
#endif
}
else if (periodic_sync_due (repo)) {
if (sync_repo_v2 (manager, repo, TRUE) < 0)
continue;
#if defined WIN32 || defined __APPLE__ || defined COMPILE_LINUX_WS
check_and_subscribe_repo (manager, repo);
#endif
}
}
} else {
seaf_warning ("Repo %s(%s) is version 0 library. Syncing is no longer supported.\n",
repo->name, repo->id);
}
}
g_list_free (repos);
return TRUE;
}
/*
 * Callback run when an HTTP fetch task for a repo has ended.
 *
 * Translates the final state of the transfer task into a state of the sync
 * task currently recorded for the repo:
 *   - finished: remember the fetched head commit id, mark the sync as done;
 *   - canceled: mark the sync as canceled;
 *   - error:    a "repo deleted on server" error gets dedicated handling,
 *               any other error is recorded on the sync task.
 *
 * If the repo is pending deletion, the sync is canceled and the repo is
 * removed instead.
 *
 * @seaf:     session owning the repo manager.
 * @tx_task:  the HTTP transfer task that ended.
 * @manager:  sync manager used to look up the repo's sync info.
 */
static void
on_repo_http_fetched (SeafileSession *seaf,
                      HttpTxTask *tx_task,
                      SeafSyncManager *manager)
{
    SyncInfo *info = get_sync_info (manager, tx_task->repo_id);
    SyncTask *task = info->current_task;

    /* Clone tasks are handled by clone manager. */
    if (tx_task->is_clone)
        return;

    /* Everything below dereferences the current sync task. Bail out (with a
     * GLib critical message) instead of crashing when none is recorded,
     * in line with the guard used by on_repo_http_uploaded().
     */
    g_return_if_fail (task != NULL);

    if (task->repo->delete_pending) {
        transition_sync_state (task, SYNC_STATE_CANCELED);
        seaf_repo_manager_del_repo (seaf->repo_mgr, task->repo);
        return;
    }

    if (tx_task->state == HTTP_TASK_STATE_FINISHED) {
        /* 40 hex characters plus the terminating NUL. */
        memcpy (info->head_commit, tx_task->head, 41);
        transition_sync_state (task, SYNC_STATE_DONE);
    } else if (tx_task->state == HTTP_TASK_STATE_CANCELED) {
        transition_sync_state (task, SYNC_STATE_CANCELED);
    } else if (tx_task->state == HTTP_TASK_STATE_ERROR) {
        if (tx_task->error == SYNC_ERROR_ID_SERVER_REPO_DELETED) {
            on_repo_deleted_on_server (task, task->repo);
        } else {
            set_task_error (task, tx_task->error);
        }
    }
}
/*
 * Callback run when an HTTP upload task for a repo has ended.
 *
 * On success the uploaded head commit id is remembered, the repo's local
 * head is persisted as a repo property, the task is flagged as uploaded and
 * the server head commit is checked. Cancellation and errors are mapped onto
 * the sync task state; a "repo deleted on server" error gets dedicated
 * handling. A repo pending deletion is removed and its sync canceled.
 */
static void
on_repo_http_uploaded (SeafileSession *seaf,
                       HttpTxTask *tx_task,
                       SeafSyncManager *manager)
{
    SyncInfo *sync_info = get_sync_info (manager, tx_task->repo_id);
    SyncTask *sync_task = sync_info->current_task;

    g_return_if_fail (sync_task != NULL && sync_info->in_sync);

    if (sync_task->repo->delete_pending) {
        transition_sync_state (sync_task, SYNC_STATE_CANCELED);
        seaf_repo_manager_del_repo (seaf->repo_mgr, sync_task->repo);
        return;
    }

    switch (tx_task->state) {
    case HTTP_TASK_STATE_FINISHED:
        memcpy (sync_info->head_commit, tx_task->head, 41);

        /* Save current head commit id for GC. */
        seaf_repo_manager_set_repo_property (seaf->repo_mgr,
                                             sync_task->repo->id,
                                             REPO_LOCAL_HEAD,
                                             sync_task->repo->head->commit_id);
        sync_task->uploaded = TRUE;
        check_head_commit_http (sync_task);
        break;

    case HTTP_TASK_STATE_CANCELED:
        transition_sync_state (sync_task, SYNC_STATE_CANCELED);
        break;

    case HTTP_TASK_STATE_ERROR:
        if (tx_task->error == SYNC_ERROR_ID_SERVER_REPO_DELETED)
            on_repo_deleted_on_server (sync_task, sync_task->repo);
        else
            set_task_error (sync_task, tx_task->error);
        break;

    default:
        /* Any other state is ignored. */
        break;
    }
}
const char *
sync_state_to_str (int state)
{
if (state < 0 || state >= SYNC_STATE_NUM) {
seaf_warning ("illegal sync state: %d\n", state);
return NULL;
}
return sync_state_str[state];
}
/*
 * Stop automatic syncing activity for every repo known to the repo manager:
 * repos with a zero sync_interval are removed from the worktree monitor, and
 * for all repos the running sync task is canceled and the active path info
 * is dropped.
 */
static void
disable_auto_sync_for_repos (SeafSyncManager *mgr)
{
    GList *repo_list = seaf_repo_manager_get_repo_list (seaf->repo_mgr, -1, -1);
    GList *node = repo_list;

    while (node != NULL) {
        SeafRepo *current = node->data;

        if (current->sync_interval == 0)
            seaf_wt_monitor_unwatch_repo (seaf->wt_monitor, current->id);

        seaf_sync_manager_cancel_sync_task (mgr, current->id);
        seaf_sync_manager_remove_active_path_info (mgr, current->id);

        node = node->next;
    }

    /* Only the list cells are released here, not the repos themselves. */
    g_list_free (repo_list);
}
/*
 * Turn off auto sync globally.
 *
 * Returns 0 on success, or -1 if the session has not been started yet
 * (in which case nothing is changed).
 */
int
seaf_sync_manager_disable_auto_sync (SeafSyncManager *mgr)
{
    if (seaf->started) {
        disable_auto_sync_for_repos (mgr);
        mgr->priv->auto_sync_enabled = FALSE;
        g_debug ("[sync mgr] auto sync is disabled\n");
        return 0;
    }

    seaf_message ("sync manager is not started, skip disable auto sync.\n");
    return -1;
}
/*
 * Register the worktree of every repo with a zero sync_interval with the
 * worktree monitor. Repos with a non-zero sync_interval are left alone.
 */
static void
enable_auto_sync_for_repos (SeafSyncManager *mgr)
{
    GList *repo_list = seaf_repo_manager_get_repo_list (seaf->repo_mgr, -1, -1);
    GList *node = repo_list;

    while (node != NULL) {
        SeafRepo *current = node->data;

        if (current->sync_interval == 0)
            seaf_wt_monitor_watch_repo (seaf->wt_monitor,
                                        current->id,
                                        current->worktree);
        node = node->next;
    }

    /* Only the list cells are released here, not the repos themselves. */
    g_list_free (repo_list);
}
/*
 * Turn on auto sync globally.
 *
 * Returns 0 on success, or -1 if the session has not been started yet
 * (in which case nothing is changed).
 */
int
seaf_sync_manager_enable_auto_sync (SeafSyncManager *mgr)
{
    if (seaf->started) {
        enable_auto_sync_for_repos (mgr);
        mgr->priv->auto_sync_enabled = TRUE;
        g_debug ("[sync mgr] auto sync is enabled\n");
        return 0;
    }

    seaf_message ("sync manager is not started, skip enable auto sync.\n");
    return -1;
}
/*
 * Report whether auto sync is globally enabled.
 *
 * Returns 1 when enabled, 0 otherwise.
 */
int
seaf_sync_manager_is_auto_sync_enabled (SeafSyncManager *mgr)
{
    return mgr->priv->auto_sync_enabled ? 1 : 0;
}
/*
 * Allocate the per-repo bookkeeping for active paths: a path -> status hash
 * table (keys are owned and freed with g_free, values are not freed) and two
 * sync status trees, both created from the repo's worktree.
 *
 * Release the result with active_paths_info_free().
 */
static ActivePathsInfo *
active_paths_info_new (SeafRepo *repo)
{
    ActivePathsInfo *fresh = g_new0 (ActivePathsInfo, 1);

    fresh->paths = g_hash_table_new_full (g_str_hash, g_str_equal,
                                          g_free, NULL);
    fresh->syncing_tree = sync_status_tree_new (repo->worktree);
    fresh->synced_tree = sync_status_tree_new (repo->worktree);

    return fresh;
}
/*
 * Release an ActivePathsInfo together with its hash table and both status
 * trees. Passing NULL is a no-op.
 */
static void
active_paths_info_free (ActivePathsInfo *info)
{
    if (info != NULL) {
        g_hash_table_destroy (info->paths);
        sync_status_tree_free (info->syncing_tree);
        sync_status_tree_free (info->synced_tree);
        g_free (info);
    }
}
/*
 * Record (or change) the sync status of a path inside a repo.
 *
 * @mgr:      sync manager.
 * @repo_id:  id of the repo containing the path; must not be NULL.
 * @path:     path whose status is updated; must not be NULL.
 * @mode:     passed through to sync_status_tree_add().
 * @status:   new status; must be strictly between SYNC_STATUS_NONE and
 *            N_SYNC_STATUS.
 * @refresh:  passed through to sync_status_tree_add(); on Windows it also
 *            queues an Explorer refresh for statuses that are kept in
 *            neither status tree.
 *
 * The per-repo ActivePathsInfo is created on demand. If the repo cannot be
 * found, or the path already has the requested status, nothing changes.
 * All table and tree updates happen under paths_lock.
 */
void
seaf_sync_manager_update_active_path (SeafSyncManager *mgr,
                                      const char *repo_id,
                                      const char *path,
                                      int mode,
                                      SyncStatus status,
                                      gboolean refresh)
{
    ActivePathsInfo *entry;
    SyncStatus previous;

    if (!repo_id || !path) {
        seaf_warning ("BUG: empty repo_id or path.\n");
        return;
    }

    if (status <= SYNC_STATUS_NONE || status >= N_SYNC_STATUS) {
        seaf_warning ("BUG: invalid sync status %d.\n", status);
        return;
    }

    pthread_mutex_lock (&mgr->priv->paths_lock);

    entry = g_hash_table_lookup (mgr->priv->active_paths, repo_id);
    if (entry == NULL) {
        SeafRepo *repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
        if (repo == NULL)
            goto unlock;

        entry = active_paths_info_new (repo);
        g_hash_table_insert (mgr->priv->active_paths, g_strdup(repo_id), entry);
    }

    /* Statuses are stored directly in the pointer-sized hash table value. */
    previous = (SyncStatus) g_hash_table_lookup (entry->paths, path);

    if (!previous) {
        /* No status recorded for this path yet. */
        g_hash_table_insert (entry->paths, g_strdup(path), (void*)status);
    } else if (previous != status) {
        /* Status changed: drop the path from the tree of its old status. */
        g_hash_table_replace (entry->paths, g_strdup(path), (void*)status);

        if (previous == SYNC_STATUS_SYNCING)
            sync_status_tree_del (entry->syncing_tree, path);
        else if (previous == SYNC_STATUS_SYNCED)
            sync_status_tree_del (entry->synced_tree, path);
    } else {
        /* Same status as before: nothing to do. */
        goto unlock;
    }

    /* Register the path in the tree matching its new status, if any. */
    if (status == SYNC_STATUS_SYNCING) {
        sync_status_tree_add (entry->syncing_tree, path, mode, refresh);
    } else if (status == SYNC_STATUS_SYNCED) {
        sync_status_tree_add (entry->synced_tree, path, mode, refresh);
    }
#ifdef WIN32
    else if (refresh) {
        seaf_sync_manager_add_refresh_path (mgr, path);
    }
#endif

unlock:
    pthread_mutex_unlock (&mgr->priv->paths_lock);
}
/*
 * Forget the sync status of a path: remove it from the repo's path table and
 * from both status trees. Does nothing if the repo has no active path info.
 *
 * @repo_id and @path must not be NULL; otherwise a warning is logged and the
 * call returns without touching any state.
 */
void
seaf_sync_manager_delete_active_path (SeafSyncManager *mgr,
                                      const char *repo_id,
                                      const char *path)
{
    ActivePathsInfo *entry;

    if (!repo_id || !path) {
        seaf_warning ("BUG: empty repo_id or path.\n");
        return;
    }

    pthread_mutex_lock (&mgr->priv->paths_lock);

    entry = g_hash_table_lookup (mgr->priv->active_paths, repo_id);
    if (entry != NULL) {
        g_hash_table_remove (entry->paths, path);
        sync_status_tree_del (entry->syncing_tree, path);
        sync_status_tree_del (entry->synced_tree, path);
    }

    pthread_mutex_unlock (&mgr->priv->paths_lock);
}
/*
 * Human-readable names of path sync statuses, indexed by SyncStatus value
 * (e.g. path_status_tbl[SYNC_STATUS_ERROR]). The table ends with a NULL entry.
 * NOTE(review): the order presumably mirrors the SyncStatus enum declaration;
 * confirm against the enum before adding or reordering entries.
 */
static char *path_status_tbl[] = {
    "none",
    "syncing",
    "error",
    "ignored",
    "synced",
    "paused",
    "readonly",
    "locked",
    "locked_by_me",
    NULL,
};
/*
 * Compute the sync status string of a whole repo.
 *
 * Decision order:
 *   1. sync info is in error                          -> "error"
 *   2. repo not found                                 -> "none"
 *   3. auto sync off (for the repo or globally)       -> "paused"
 *   4. not syncing and no head commit recorded yet    -> "none"
 *   5. syncing and in commit/fetch/upload/merge state -> "syncing"
 *   6. otherwise "synced", or "readonly" for a read-only repo
 *
 * Returns a newly allocated string; the caller frees it with g_free().
 */
static char *
get_repo_sync_status (SeafSyncManager *mgr, const char *repo_id)
{
    SyncInfo *sync_info = get_sync_info (mgr, repo_id);
    SeafRepo *repo;
    int idle_status;
    int result;

    if (sync_info->in_error)
        return g_strdup(path_status_tbl[SYNC_STATUS_ERROR]);

    repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
    if (repo == NULL)
        return g_strdup(path_status_tbl[SYNC_STATUS_NONE]);

    /* Status reported when no transfer-like activity is going on. */
    idle_status = repo->is_readonly ? SYNC_STATUS_READONLY : SYNC_STATUS_SYNCED;

    if (!(repo->auto_sync && mgr->priv->auto_sync_enabled)) {
        result = SYNC_STATUS_PAUSED;
    } else if (!sync_info->in_sync) {
        /* An all-zero head commit buffer means no head commit was recorded. */
        char zero_commit[41] = {0};

        if (memcmp (zero_commit, sync_info->head_commit, 41) == 0)
            result = SYNC_STATUS_NONE;
        else
            result = idle_status;
    } else {
        switch (sync_info->current_task->state) {
        case SYNC_STATE_COMMIT:
        case SYNC_STATE_FETCH:
        case SYNC_STATE_UPLOAD:
        case SYNC_STATE_MERGE:
            result = SYNC_STATUS_SYNCING;
            break;
        default:
            result = idle_status;
            break;
        }
    }

    return g_strdup(path_status_tbl[result]);
}
/*
 * Return the sync status string of a path inside a repo.
 *
 * @mgr:      sync manager.
 * @repo_id:  id of the repo; must not be NULL.
 * @path:     path inside the repo; must not be NULL. An empty string asks
 *            for the status of the repo itself.
 * @is_dir:   whether @path is a directory; directories without a recorded
 *            status inherit one from the syncing/synced trees.
 *
 * Returns a newly allocated string (free with g_free()), or NULL when
 * @repo_id or @path is NULL.
 */
char *
seaf_sync_manager_get_path_sync_status (SeafSyncManager *mgr,
                                        const char *repo_id,
                                        const char *path,
                                        gboolean is_dir)
{
    ActivePathsInfo *entry;
    SyncInfo *sync_info;
    SyncStatus status = SYNC_STATUS_NONE;

    if (!repo_id || !path) {
        seaf_warning ("BUG: empty repo_id or path.\n");
        return NULL;
    }

    if (*path == '\0')
        return get_repo_sync_status (mgr, repo_id);

    /* If the repo is in error, all files in it should show no sync status. */
    sync_info = get_sync_info (mgr, repo_id);
    if (sync_info != NULL && sync_info->in_error)
        return g_strdup(path_status_tbl[SYNC_STATUS_NONE]);

    pthread_mutex_lock (&mgr->priv->paths_lock);

    entry = g_hash_table_lookup (mgr->priv->active_paths, repo_id);
    if (entry != NULL) {
        status = (SyncStatus) g_hash_table_lookup (entry->paths, path);

        if (is_dir && status == SYNC_STATUS_NONE) {
            /* A dir present in the syncing tree has files being synced
             * below it, so it is reported as syncing. A dir that is only
             * in the synced tree is reported as synced.
             */
            if (sync_status_tree_exists (entry->syncing_tree, path))
                status = SYNC_STATUS_SYNCING;
            else if (sync_status_tree_exists (entry->synced_tree, path))
                status = SYNC_STATUS_SYNCED;
        }
    }

    pthread_mutex_unlock (&mgr->priv->paths_lock);

    /* Refine "synced" with permission and file lock information. */
    if (status == SYNC_STATUS_SYNCED) {
        if (!seaf_repo_manager_is_path_writable(seaf->repo_mgr, repo_id, path)) {
            status = SYNC_STATUS_READONLY;
        } else if (seaf_filelock_manager_is_file_locked_by_me (seaf->filelock_mgr,
                                                               repo_id, path)) {
            status = SYNC_STATUS_LOCKED_BY_ME;
        } else if (seaf_filelock_manager_is_file_locked (seaf->filelock_mgr,
                                                         repo_id, path)) {
            status = SYNC_STATUS_LOCKED;
        }
    }

    return g_strdup(path_status_tbl[status]);
}
/*
 * Convert a path -> SyncStatus hash table into a JSON array of
 * {"path": ..., "status": ...} objects.
 *
 * @paths: hash table whose keys are path strings and whose values are
 *         SyncStatus values stored directly in the pointer.
 *
 * Returns a new reference to the array; the caller owns it.
 */
static json_t *
active_paths_to_json (GHashTable *paths)
{
    json_t *array = json_array ();
    GHashTableIter iter;
    gpointer key, value;

    g_hash_table_iter_init (&iter, paths);
    while (g_hash_table_iter_next (&iter, &key, &value)) {
        const char *path = key;
        SyncStatus status = (SyncStatus)value;
        json_t *obj = json_object ();

        /* Use the reference-stealing "_new" variants: the freshly created
         * strings and object are handed over to their container, so no
         * reference is left behind to leak.
         */
        json_object_set_new (obj, "path", json_string(path));
        json_object_set_new (obj, "status", json_string(path_status_tbl[status]));
        json_array_append_new (array, obj);
    }

    return array;
}
/*
 * Serialize all active paths of all repos to a JSON string.
 *
 * The result is an array with one object per repo:
 *   {"repo_id": "<id>", "paths": [{"path": ..., "status": ...}, ...]}
 *
 * The JSON tree is built under paths_lock; serialization happens after the
 * lock has been released.
 *
 * Returns the string produced by json_dumps() (owned by the caller), or NULL
 * if serialization fails, in which case a warning is logged.
 */
char *
seaf_sync_manager_list_active_paths_json (SeafSyncManager *mgr)
{
    json_t *array = NULL;
    GHashTableIter iter;
    gpointer key, value;
    char *ret = NULL;

    pthread_mutex_lock (&mgr->priv->paths_lock);

    array = json_array ();

    g_hash_table_iter_init (&iter, mgr->priv->active_paths);
    while (g_hash_table_iter_next (&iter, &key, &value)) {
        const char *repo_id = key;
        ActivePathsInfo *info = value;
        json_t *obj = json_object();
        json_t *path_array = active_paths_to_json (info->paths);

        /* The "_new" variants take over our references, so the repo_id
         * string, the path array and the per-repo object are all released
         * when the top-level array is decref'ed below.
         */
        json_object_set_new (obj, "repo_id", json_string(repo_id));
        json_object_set_new (obj, "paths", path_array);
        json_array_append_new (array, obj);
    }

    pthread_mutex_unlock (&mgr->priv->paths_lock);

    ret = json_dumps (array, JSON_INDENT(4));
    if (!ret) {
        seaf_warning ("Failed to convert active paths to json\n");
    }

    json_decref (array);

    return ret;
}
/*
 * Count the active paths recorded across all repos.
 *
 * NOTE(review): unlike the other accessors of active_paths in this file,
 * this function does not take paths_lock while iterating; confirm whether
 * callers guarantee exclusion or whether the lock should be taken here.
 */
int
seaf_sync_manager_active_paths_number (SeafSyncManager *mgr)
{
    GHashTableIter repo_iter;
    gpointer repo_key, repo_value;
    int total = 0;

    g_hash_table_iter_init (&repo_iter, mgr->priv->active_paths);
    while (g_hash_table_iter_next (&repo_iter, &repo_key, &repo_value)) {
        ActivePathsInfo *entry = repo_value;
        total += g_hash_table_size(entry->paths);
    }

    return total;
}
/*
 * Drop all active path bookkeeping of a repo. The removal from the
 * active_paths table happens under paths_lock.
 */
void
seaf_sync_manager_remove_active_path_info (SeafSyncManager *mgr, const char *repo_id)
{
    pthread_mutex_t *lock = &mgr->priv->paths_lock;

    pthread_mutex_lock (lock);
    g_hash_table_remove (mgr->priv->active_paths, repo_id);
    pthread_mutex_unlock (lock);

#ifdef WIN32
    /* This is a hack to tell Windows Explorer to refresh all open windows. */
    SHChangeNotify (SHCNE_ASSOCCHANGED, SHCNF_IDLIST, NULL, NULL);
#endif
}
#ifdef WIN32
/*
 * Convert a UTF-8 path with '/' separators into a UTF-16 path with '\\'
 * separators.
 *
 * Returns the buffer produced by g_utf8_to_utf16(); the caller frees it
 * with g_free().
 */
static wchar_t *
win_path (const char *path)
{
    char *native = g_strdup(path);
    char *cursor = native;
    wchar_t *wide;

    /* Rewrite separators in place on the private copy. */
    while (*cursor != 0) {
        if (*cursor == '/')
            *cursor = '\\';
        cursor++;
    }

    wide = g_utf8_to_utf16 (native, -1, NULL, NULL, NULL);
    g_free (native);

    return wide;
}
/*
 * Thread body: pop paths from the async queue passed in @vdata forever and
 * notify Windows Explorer that each path's attributes changed. After every
 * 100 notifications the thread sleeps for one second.
 *
 * Each popped path is freed here with g_free().
 */
static void *
refresh_windows_explorer_thread (void *vdata)
{
    GAsyncQueue *queue = vdata;
    int handled = 0;

    for (;;) {
        char *utf8_path = g_async_queue_pop (queue);
        wchar_t *wide_path = win_path (utf8_path);

        SHChangeNotify (SHCNE_ATTRIBUTES, SHCNF_PATHW, wide_path, NULL);

        g_free (utf8_path);
        g_free (wide_path);

        handled++;
        if (handled >= 100) {
            g_usleep (G_USEC_PER_SEC);
            handled = 0;
        }
    }

    return NULL;
}
/*
 * Queue a copy of @path on the manager's refresh_paths async queue.
 */
void
seaf_sync_manager_add_refresh_path (SeafSyncManager *mgr, const char *path)
{
    char *queued_copy = g_strdup(path);

    g_async_queue_push (mgr->priv->refresh_paths, queued_copy);
}
/*
 * Immediately notify Windows Explorer that the attributes of @path changed.
 */
void
seaf_sync_manager_refresh_path (SeafSyncManager *mgr, const char *path)
{
    wchar_t *wide_path = win_path (path);

    SHChangeNotify (SHCNE_ATTRIBUTES, SHCNF_PATHW, wide_path, NULL);
    g_free (wide_path);
}
#endif
/*
 * GHFunc callback: refresh the cached repo_id -> head commit id map of one
 * server.
 *
 * @key:        server URL (char *).
 * @value:      HttpServerState of that server.
 * @user_data:  unused.
 *
 * On success the old map is replaced under head_commit_map_lock. When the
 * request fails with a bad gateway / unavailable / timeout status, the
 * server is flagged as disconnected.
 */
static void
update_head_commit_ids_for_server (gpointer key, gpointer value, gpointer user_data)
{
    char *server_url = key;
    HttpServerState *state = value;
    GList *repo_ids;
    GHashTable *fresh_map;
    int http_status = 200;

    /* Only get head commit ids from server if:
     * 1. syncing protocol version has been checked, and
     * 2. protocol version is at least 2.
     */
    if (state->http_version < 2)
        return;

    seaf_debug ("Updating repo head commit ids for server %s.\n", server_url);

    repo_ids = seaf_repo_manager_get_repo_id_list_by_server (seaf->repo_mgr,
                                                             server_url);
    if (repo_ids == NULL)
        return;

    fresh_map = http_tx_manager_get_head_commit_ids (seaf->http_tx_mgr,
                                                     state->effective_host,
                                                     state->use_fileserver_port,
                                                     repo_ids, &http_status);
    if (fresh_map == NULL) {
        gboolean server_down = (http_status == HTTP_SERVERR_BAD_GATEWAY ||
                                http_status == HTTP_SERVERR_UNAVAILABLE ||
                                http_status == HTTP_SERVERR_TIMEOUT);
        if (server_down)
            state->server_disconnected = TRUE;
    } else {
        /* If the fileserver hangs up before sending events to the
         * notification server, the client will not receive the updates,
         * and some updates will be lost. Therefore, after the fileserver
         * recovers, immediately check locks and folder perms.
         */
        if (state->server_disconnected) {
            seaf_sync_manager_check_locks_and_folder_perms (seaf->sync_mgr, server_url);
        }
        state->server_disconnected = FALSE;

        pthread_mutex_lock (&state->head_commit_map_lock);

        g_hash_table_destroy (state->head_commit_map);
        state->head_commit_map = fresh_map;
        if (!state->head_commit_map_init)
            state->head_commit_map_init = TRUE;
        state->last_update_head_commit_map_time = (gint64)time(NULL);

        pthread_mutex_unlock (&state->head_commit_map_lock);
    }

    g_list_free_full (repo_ids, g_free);
}
/*
 * Thread body: every 30 seconds, refresh the cached head commit ids of every
 * server in the manager's http_server_states table. Never returns in
 * practice.
 *
 * @arg: the SeafSyncManager.
 */
static void *
update_cached_head_commit_ids (void *arg)
{
    SeafSyncManager *manager = (SeafSyncManager *)arg;

    for (;;) {
        g_usleep (30 * G_USEC_PER_SEC);
        g_hash_table_foreach (manager->http_server_states,
                              update_head_commit_ids_for_server,
                              manager);
    }

    return NULL;
}