Implement new sync loop, upload part.

This commit is contained in:
Jiaqiang Xu 2014-06-26 16:27:36 +08:00
parent e326eaefa1
commit 100f13eb9b
28 changed files with 2813 additions and 1092 deletions

View File

@ -25,7 +25,7 @@
#define CURRENT_ENC_VERSION 2
#define DEFAULT_PROTO_VERSION 1
#define CURRENT_PROTO_VERSION 6
#define CURRENT_PROTO_VERSION 7
#define CURRENT_REPO_VERSION 1

View File

@ -149,28 +149,6 @@ int diff_index(const char *repo_id, int version,
#endif /* not SEAFILE_SERVER */
#ifdef SEAFILE_SERVER
typedef int (*DiffFileCB) (int n,
const char *basedir,
SeafDirent *files[],
void *data);
typedef int (*DiffDirCB) (int n,
const char *basedir,
SeafDirent *dirs[],
void *data,
gboolean *recurse);
typedef struct DiffOptions {
char store_id[37];
int version;
DiffFileCB file_cb;
DiffDirCB dir_cb;
void *data;
} DiffOptions;
inline static gboolean
dirent_same (SeafDirent *denta, SeafDirent *dentb)
{
@ -334,7 +312,7 @@ diff_trees_recursive (int n, SeafDir *trees[],
return ret;
}
static int
int
diff_trees (int n, const char *roots[], DiffOptions *opt)
{
SeafDir **trees, *root;
@ -365,6 +343,8 @@ diff_trees (int n, const char *roots[], DiffOptions *opt)
return ret;
}
#ifdef SEAFILE_SERVER
static int
twoway_diff_files (int n, const char *basedir, SeafDirent *files[], void *data)
{

View File

@ -92,4 +92,27 @@ format_diff_results(GList *results);
char *
diff_results_to_description (GList *results);
typedef int (*DiffFileCB) (int n,
const char *basedir,
SeafDirent *files[],
void *data);
typedef int (*DiffDirCB) (int n,
const char *basedir,
SeafDirent *dirs[],
void *data,
gboolean *recurse);
typedef struct DiffOptions {
char store_id[37];
int version;
DiffFileCB file_cb;
DiffDirCB dir_cb;
void *data;
} DiffOptions;
int
diff_trees (int n, const char *roots[], DiffOptions *opt);
#endif

View File

@ -19,6 +19,11 @@
#define SC_OBJ_SEG_END "307"
#define SS_OBJ_SEG_END "Object Segment End"
#define SC_OBJ_LIST_SEG "308"
#define SS_OBJ_LIST_SEG "Object List Segment"
#define SC_OBJ_LIST_SEG_END "309"
#define SS_OBJ_LIST_SEG_END "Object List Segment End"
#define SC_NOT_FOUND "401"
#define SS_NOT_FOUND "Object not found"
#define SC_BAD_OL "402"

View File

@ -34,7 +34,10 @@ proc_headers = $(addprefix processors/, \
checkbl-proc.h \
getcommit-v3-proc.h \
checkff-proc.h \
getca-proc.h)
getca-proc.h \
check-protocol-proc.h \
sendcommit-v4-proc.h \
sendfs-v2-proc.h)
proc_headers += ../common/processors/objecttx-common.h \
../common/processors/blocktx-common.h \
@ -53,7 +56,6 @@ noinst_HEADERS = \
../common/sync-repo-common.h \
block-tx-client.h \
seafile-config.h \
gc-core.h \
client-migrate.h \
$(proc_headers)
@ -81,7 +83,7 @@ common_src = \
repo-mgr.c ../common/commit-mgr.c \
../common/log.c ../common/avl/avl.c ../common/object-list.c \
../common/rpc-service.c \
gc-core.c ../common/vc-common.c \
../common/vc-common.c \
../common/seaf-utils.c \
../common/obj-store.c \
../common/obj-backend-fs.c \
@ -108,7 +110,10 @@ common_src = \
processors/checkbl-proc.c \
processors/getcommit-v3-proc.c \
processors/checkff-proc.c \
processors/getca-proc.c
processors/getca-proc.c \
processors/check-protocol-proc.c \
processors/sendcommit-v4-proc.c \
processors/sendfs-v2-proc.c
seaf_daemon_SOURCES = seaf-daemon.c $(common_src)

View File

@ -464,6 +464,7 @@ add_transfer_task (CloneTask *task, GError **error)
"fetch_head",
"master",
task->token,
FALSE,
error);
if (!task->tx_id)
return -1;

View File

@ -0,0 +1,75 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#include "check-protocol-proc.h"
#define DEBUG_FLAG SEAFILE_DEBUG_SYNC
#include "log.h"
G_DEFINE_TYPE (SeafileCheckProtocolProc, seafile_check_protocol_proc, CCNET_TYPE_PROCESSOR)
static int
check_protocol_start (CcnetProcessor *processor, int argc, char **argv);
static void
handle_response (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen);
static void
seafile_check_protocol_proc_class_init (SeafileCheckProtocolProcClass *klass)
{
CcnetProcessorClass *proc_class = CCNET_PROCESSOR_CLASS (klass);
proc_class->name = "seafile-check-protocol";
proc_class->start = check_protocol_start;
proc_class->handle_response = handle_response;
}
static void
seafile_check_protocol_proc_init (SeafileCheckProtocolProc *processor)
{
}
static int
check_protocol_start (CcnetProcessor *processor, int argc, char **argv)
{
if (argc != 0) {
seaf_warning ("[sync-repo] argc should be 0.\n");
ccnet_processor_done (processor, FALSE);
return 0;
}
char buf[256];
snprintf (buf, 256, "remote %s seafile-check-protocol-slave", processor->peer_id);
ccnet_processor_send_request (processor, buf);
return 0;
}
static void handle_response (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen)
{
SeafileCheckProtocolProc *proc = (SeafileCheckProtocolProc *)processor;
if (memcmp (code, SC_OK, 3) == 0) {
if (content[clen-1] != '\0') {
seaf_warning ("[check-protocol] Response not end with NULL\n");
ccnet_processor_done (processor, FALSE);
return;
}
proc->protocol_version = atoi(content);
ccnet_processor_done (processor, TRUE);
} else
ccnet_processor_done (processor, FALSE);
}

View File

@ -0,0 +1,31 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#ifndef SEAFILE_CHECK_PROTOCOL_PROC_H
#define SEAFILE_CHECK_PROTOCOL_PROC_H
#include <glib-object.h>
#include <ccnet.h>
#define SEAFILE_TYPE_CHECK_PROTOCOL_PROC (seafile_check_protocol_proc_get_type ())
#define SEAFILE_CHECK_PROTOCOL_PROC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SEAFILE_TYPE_CHECK_PROTOCOL_PROC, SeafileCheckProtocolProc))
#define SEAFILE_IS_CHECK_PROTOCOL_PROC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SEAFILE_TYPE_CHECK_PROTOCOL_PROC))
#define SEAFILE_CHECK_PROTOCOL_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SEAFILE_TYPE_CHECK_PROTOCOL_PROC, SeafileCheckProtocolProcClass))
#define IS_SEAFILE_CHECK_PROTOCOL_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SEAFILE_TYPE_CHECK_PROTOCOL_PROC))
#define SEAFILE_CHECK_PROTOCOL_PROC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SEAFILE_TYPE_CHECK_PROTOCOL_PROC, SeafileCheckProtocolProcClass))
typedef struct _SeafileCheckProtocolProc SeafileCheckProtocolProc;
typedef struct _SeafileCheckProtocolProcClass SeafileCheckProtocolProcClass;
struct _SeafileCheckProtocolProc {
CcnetProcessor parent_instance;
int protocol_version;
};
struct _SeafileCheckProtocolProcClass {
CcnetProcessorClass parent_class;
};
GType seafile_check_protocol_proc_get_type ();
#endif

View File

@ -288,6 +288,9 @@ handle_response (CcnetProcessor *processor,
return;
}
if (task->protocol_version >= 7 && !task->server_side_merge)
task->protocol_version = 6;
seaf_message ("repo version is %d, protocol version is %d.\n",
task->repo_version, task->protocol_version);
ccnet_processor_done (processor, TRUE);

View File

@ -62,9 +62,14 @@ start (CcnetProcessor *processor, int argc, char **argv)
new_head = argv[2];
buf = g_string_new (NULL);
g_string_printf (buf, "remote %s seafile-recvbranch %s %s %s %s",
processor->peer_id, repo_id, branch, new_head,
task->session_token);
if (task->protocol_version <= 6)
g_string_printf (buf, "remote %s seafile-recvbranch %s %s %s %s",
processor->peer_id, repo_id, branch, new_head,
task->session_token);
else
g_string_printf (buf, "remote %s seafile-recvbranch-v2 %s %s %s %s",
processor->peer_id, repo_id, branch, new_head,
task->session_token);
ccnet_processor_send_request (processor, buf->str);
g_string_free (buf, TRUE);

View File

@ -0,0 +1,131 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#define DEBUG_FLAG SEAFILE_DEBUG_TRANSFER
#include "log.h"
#include <fcntl.h>
#include <ccnet.h>
#include "net.h"
#include "utils.h"
#include "seafile-session.h"
#include "sendcommit-v4-proc.h"
#include "processors/objecttx-common.h"
#include "vc-common.h"
enum {
INIT,
SEND_OBJECT
};
static int send_commit_start (CcnetProcessor *processor, int argc, char **argv);
static void handle_response (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen);
G_DEFINE_TYPE (SeafileSendcommitV4Proc, seafile_sendcommit_v4_proc, CCNET_TYPE_PROCESSOR)
static void
seafile_sendcommit_v4_proc_class_init (SeafileSendcommitV4ProcClass *klass)
{
CcnetProcessorClass *proc_class = CCNET_PROCESSOR_CLASS (klass);
proc_class->name = "sendcommit-v4-proc";
proc_class->start = send_commit_start;
proc_class->handle_response = handle_response;
}
static void
seafile_sendcommit_v4_proc_init (SeafileSendcommitV4Proc *processor)
{
}
static int
send_commit_start (CcnetProcessor *processor, int argc, char **argv)
{
TransferTask *task = ((SeafileSendcommitV4Proc *)processor)->tx_task;
GString *buf;
buf = g_string_new (NULL);
g_string_printf (buf, "remote %s seafile-recvcommit-v3 master %s",
processor->peer_id, task->session_token);
ccnet_processor_send_request (processor, buf->str);
g_string_free (buf, TRUE);
return 0;
}
static void
send_commit (CcnetProcessor *processor, const char *object_id)
{
TransferTask *task = ((SeafileSendcommitV4Proc *)processor)->tx_task;
char *data;
int len;
ObjectPack *pack = NULL;
int pack_size;
if (seaf_obj_store_read_obj (seaf->commit_mgr->obj_store,
task->repo_id, task->repo_version,
object_id, (void**)&data, &len) < 0) {
g_warning ("Failed to read commit %s.\n", object_id);
goto fail;
}
pack_size = sizeof(ObjectPack) + len;
pack = malloc (pack_size);
memcpy (pack->id, object_id, 41);
memcpy (pack->object, data, len);
ccnet_processor_send_update (processor, SC_OBJECT, SS_OBJECT,
(char *)pack, pack_size);
seaf_debug ("Send commit %.8s.\n", object_id);
g_free (data);
free (pack);
return;
fail:
ccnet_processor_send_update (processor, SC_NOT_FOUND, SS_NOT_FOUND,
object_id, 41);
ccnet_processor_done (processor, FALSE);
}
static void handle_response (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen)
{
SeafileSendcommitV4Proc *proc = (SeafileSendcommitV4Proc *)processor;
TransferTask *task = proc->tx_task;
if (task->state != TASK_STATE_NORMAL) {
ccnet_processor_done (processor, TRUE);
return;
}
switch (processor->state) {
case INIT:
if (memcmp (code, SC_OK, 3) == 0) {
processor->state = SEND_OBJECT;
send_commit (processor, task->head);
return;
}
break;
case SEND_OBJECT:
if (memcmp (code, SC_ACK, 3) == 0) {
ccnet_processor_done (processor, TRUE);
return;
}
break;
default:
g_return_if_reached ();
}
g_warning ("Bad response: %s %s.\n", code, code_msg);
if (memcmp (code, SC_ACCESS_DENIED, 3) == 0)
transfer_task_set_error (task, TASK_ERR_ACCESS_DENIED);
ccnet_processor_done (processor, FALSE);
}

View File

@ -0,0 +1,31 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#ifndef SEAFILE_SENDCOMMIT_V4_PROC_H
#define SEAFILE_SENDCOMMIT_V4_PROC_H
#include <glib-object.h>
#define SEAFILE_TYPE_SENDCOMMIT_V4_PROC (seafile_sendcommit_v4_proc_get_type ())
#define SEAFILE_SENDCOMMIT_V4_PROC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SEAFILE_TYPE_SENDCOMMIT_V4_PROC, SeafileSendcommitV4Proc))
#define SEAFILE_IS_SENDCOMMIT_V4_PROC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SEAFILE_TYPE_SENDCOMMIT_V4_PROC))
#define SEAFILE_SENDCOMMIT_V4_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SEAFILE_TYPE_SENDCOMMIT_V4_PROC, SeafileSendcommitV4ProcClass))
#define IS_SEAFILE_SENDCOMMIT_V4_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SEAFILE_TYPE_SENDCOMMIT_V4_PROC))
#define SEAFILE_SENDCOMMIT_V4_PROC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SEAFILE_TYPE_SENDCOMMIT_V4_PROC, SeafileSendcommitV4ProcClass))
typedef struct _SeafileSendcommitV4Proc SeafileSendcommitV4Proc;
typedef struct _SeafileSendcommitV4ProcClass SeafileSendcommitV4ProcClass;
struct _SeafileSendcommitV4Proc {
CcnetProcessor parent_instance;
TransferTask *tx_task;
};
struct _SeafileSendcommitV4ProcClass {
CcnetProcessorClass parent_class;
};
GType seafile_sendcommit_v4_proc_get_type ();
#endif

View File

@ -0,0 +1,478 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#define DEBUG_FLAG SEAFILE_DEBUG_TRANSFER
#include "log.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <ccnet.h>
#include "utils.h"
#include "seafile-session.h"
#include "commit-mgr.h"
#include "fs-mgr.h"
#include "processors/objecttx-common.h"
#include "sendfs-v2-proc.h"
#include "diff-simple.h"
/*
* recvfs-v2-proc
* ------------------------------>
*
* OK
* <-----------------------------
*
* Calculate send object list
*
* SC_OBJ_LIST_SEG
* ----------------------------->
* SC_OBJ_LIST_SEG
* <----------------------------
* ......
* SC_OBJ_LIST_SEG_END
* ----------------------------->
*
* SC_OBJ
* ----------------------------->
* ......
* After all objects are saved to disk, the server ends the protocol.
* SC_END
* <----------------------------
*/
enum {
INIT = 0,
CHECK_OBJECT_LIST,
SEND_OBJECTS,
};
typedef struct {
GList *send_obj_list;
GList *recv_obj_list;
guint32 reader_id;
gboolean calc_success;
} SeafileSendfsProcPriv;
#define GET_PRIV(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), SEAFILE_TYPE_SENDFS_V2_PROC, SeafileSendfsProcPriv))
#define USE_PRIV \
SeafileSendfsProcPriv *priv = GET_PRIV(processor);
G_DEFINE_TYPE (SeafileSendfsV2Proc, seafile_sendfs_v2_proc, CCNET_TYPE_PROCESSOR)
static int start (CcnetProcessor *processor, int argc, char **argv);
static void handle_response (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen);
static void
release_resource(CcnetProcessor *processor)
{
USE_PRIV;
string_list_free (priv->send_obj_list);
string_list_free (priv->recv_obj_list);
seaf_obj_store_unregister_async_read (seaf->fs_mgr->obj_store,
priv->reader_id);
CCNET_PROCESSOR_CLASS (seafile_sendfs_v2_proc_parent_class)->release_resource (processor);
}
static void
seafile_sendfs_v2_proc_class_init (SeafileSendfsV2ProcClass *klass)
{
CcnetProcessorClass *proc_class = CCNET_PROCESSOR_CLASS (klass);
proc_class->name = "sendfs-v2-proc";
proc_class->start = start;
proc_class->handle_response = handle_response;
proc_class->release_resource = release_resource;
g_type_class_add_private (klass, sizeof(SeafileSendfsProcPriv));
}
static void
seafile_sendfs_v2_proc_init (SeafileSendfsV2Proc *processor)
{
}
static void
fs_object_read_cb (OSAsyncResult *res, void *data);
static int
start (CcnetProcessor *processor, int argc, char **argv)
{
USE_PRIV;
GString *buf;
SeafileSendfsV2Proc *proc = (SeafileSendfsV2Proc *)processor;
TransferTask *task = proc->tx_task;
buf = g_string_new (NULL);
g_string_printf (buf, "remote %s seafile-recvfs-v2 %s",
processor->peer_id, task->session_token);
ccnet_processor_send_request (processor, buf->str);
g_string_free (buf, TRUE);
priv->reader_id = seaf_obj_store_register_async_read (seaf->fs_mgr->obj_store,
task->repo_id,
task->repo_version,
fs_object_read_cb,
processor);
return 0;
}
/* Calculate send object list */
inline static gboolean
dirent_same (SeafDirent *denta, SeafDirent *dentb)
{
return (strcmp (dentb->id, denta->id) == 0 && denta->mode == dentb->mode);
}
static int
collect_file_ids (int n, const char *basedir, SeafDirent *files[], void *data)
{
SeafDirent *file1 = files[0];
SeafDirent *file2 = files[1];
GList **pret = data;
if (file1 && (!file2 || !dirent_same (file1, file2)) &&
strcmp (file1->id, EMPTY_SHA1) != 0)
*pret = g_list_prepend (*pret, g_strdup(file1->id));
return 0;
}
static int
collect_dir_ids (int n, const char *basedir, SeafDirent *dirs[], void *data,
gboolean *recurse)
{
SeafDirent *dir1 = dirs[0];
SeafDirent *dir2 = dirs[1];
GList **pret = data;
if (dir1 && (!dir2 || !dirent_same (dir1, dir2)) &&
strcmp (dir1->id, EMPTY_SHA1) != 0)
*pret = g_list_prepend (*pret, g_strdup(dir1->id));
return 0;
}
static void *
calculate_send_object_list (void *vdata)
{
CcnetProcessor *processor = vdata;
USE_PRIV;
SeafileSendfsV2Proc *proc = (SeafileSendfsV2Proc *)processor;
TransferTask *task = proc->tx_task;
SeafBranch *local = NULL, *master = NULL;
local = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "local");
if (!local) {
seaf_warning ("Branch local not found for repo %.8s.\n", task->repo_id);
priv->calc_success = FALSE;
goto out;
}
master = seaf_branch_manager_get_branch (seaf->branch_mgr, task->repo_id, "master");
if (!master) {
seaf_warning ("Branch master not found for repo %.8s.\n", task->repo_id);
priv->calc_success = FALSE;
goto out;
}
SeafCommit *local_head = NULL, *master_head = NULL;
local_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
task->repo_id, task->repo_version,
local->commit_id);
if (!local_head) {
seaf_warning ("Local head commit not found for repo %.8s.\n",
task->repo_id);
priv->calc_success = FALSE;
goto out;
}
master_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
task->repo_id, task->repo_version,
master->commit_id);
if (!master_head) {
seaf_warning ("Master head commit not found for repo %.8s.\n",
task->repo_id);
priv->calc_success = FALSE;
goto out;
}
/* Diff won't traverse the root object itself. */
if (strcmp (local_head->root_id, master_head->root_id) != 0)
priv->send_obj_list = g_list_prepend (priv->send_obj_list,
g_strdup(local_head->root_id));
DiffOptions opts;
memset (&opts, 0, sizeof(opts));
memcpy (opts.store_id, task->repo_id, 36);
opts.version = task->repo_version;
opts.file_cb = collect_file_ids;
opts.dir_cb = collect_dir_ids;
opts.data = &priv->send_obj_list;
const char *trees[2];
trees[0] = local_head->root_id;
trees[1] = master_head->root_id;
if (diff_trees (2, trees, &opts) < 0) {
seaf_warning ("Failed to diff local and master head for repo %.8s.\n",
task->repo_id);
priv->calc_success = FALSE;
}
priv->calc_success = TRUE;
out:
seaf_branch_unref (local);
seaf_branch_unref (master);
seaf_commit_unref (local_head);
seaf_commit_unref (master_head);
return vdata;
}
static void
send_object_list_segment (CcnetProcessor *processor);
static void
calculate_send_object_list_done (void *vdata)
{
CcnetProcessor *processor = vdata;
USE_PRIV;
if (!priv->calc_success) {
ccnet_processor_send_update (processor, SC_SHUTDOWN, SS_SHUTDOWN, NULL, 0);
ccnet_processor_done (processor, FALSE);
return;
}
if (priv->send_obj_list == NULL) {
seaf_message ("No fs objects to upload. Done.\n");
ccnet_processor_send_update (processor, SC_END, SS_END, NULL, 0);
ccnet_processor_done (processor, TRUE);
return;
}
send_object_list_segment (processor);
}
/* Check object list. */
#define OBJECT_LIST_SEGMENT_N 1000
#define OBJECT_LIST_SEGMENT_LEN 40 * 1000
static void
send_next_object (CcnetProcessor *processor);
static void
send_object_list_segment (CcnetProcessor *processor)
{
USE_PRIV;
char buf[OBJECT_LIST_SEGMENT_LEN];
if (priv->send_obj_list == NULL) {
seaf_debug ("Check object list end.\n");
ccnet_processor_send_update (processor,
SC_OBJ_LIST_SEG_END, SS_OBJ_LIST_SEG_END,
NULL, 0);
send_next_object (processor);
processor->state = SEND_OBJECTS;
return;
}
int i = 0;
char *p = buf;
char *obj_id;
while (priv->send_obj_list != NULL) {
obj_id = priv->send_obj_list->data;
priv->send_obj_list = g_list_delete_link (priv->send_obj_list,
priv->send_obj_list);
memcpy (p, obj_id, 40);
p += 40;
g_free (obj_id);
if (++i == OBJECT_LIST_SEGMENT_N)
break;
}
if (i > 0) {
seaf_debug ("Send %d object ids.\n", i);
ccnet_processor_send_update (processor,
SC_OBJ_LIST_SEG, SS_OBJ_LIST_SEG,
buf, i * 40);
}
}
static void
process_object_list_segment (CcnetProcessor *processor, char *content, int clen)
{
USE_PRIV;
int n, i;
char *p;
if (clen % 40 != 0) {
seaf_warning ("Invalid object list segment length %d.\n", clen);
ccnet_processor_send_update (processor, SC_SHUTDOWN, SS_SHUTDOWN, NULL, 0);
ccnet_processor_done (processor, FALSE);
return;
}
n = clen/40;
p = content;
seaf_debug ("%d objects are needed by the server.\n", n);
for (i = 0; i < n; ++i) {
priv->recv_obj_list = g_list_prepend (priv->recv_obj_list, g_strndup (p, 40));
p += 40;
}
}
/* Send objects */
static void
send_fs_object (CcnetProcessor *processor,
const char *object_id, char *data, int len)
{
ObjectPack *pack = NULL;
int pack_size;
pack_size = sizeof(ObjectPack) + len;
pack = malloc (pack_size);
memcpy (pack->id, object_id, 41);
memcpy (pack->object, data, len);
if (pack_size <= MAX_OBJ_SEG_SIZE) {
ccnet_processor_send_update (processor, SC_OBJECT, SS_OBJECT,
(char *)pack, pack_size);
} else {
int offset, n;
offset = 0;
while (offset < pack_size) {
n = MIN(pack_size - offset, MAX_OBJ_SEG_SIZE);
if (offset + n < pack_size) {
ccnet_processor_send_update (processor,
SC_OBJ_SEG, SS_OBJ_SEG,
(char *)pack + offset, n);
} else {
ccnet_processor_send_update (processor,
SC_OBJ_SEG_END, SS_OBJ_SEG_END,
(char *)pack + offset, n);
}
seaf_debug ("Sent object %s segment<total = %d, offset = %d, n = %d>\n",
object_id, pack_size, offset, n);
offset += n;
}
}
seaf_debug ("Send fs object %.8s.\n", object_id);
free (pack);
}
static void
fs_object_read_cb (OSAsyncResult *res, void *data)
{
CcnetProcessor *processor = data;
if (!res->success) {
seaf_warning ("Failed to read fs object %.8s.\n", res->obj_id);
ccnet_processor_send_update (processor, SC_NOT_FOUND, SS_NOT_FOUND,
res->obj_id, 41);
ccnet_processor_done (processor, FALSE);
return;
}
send_fs_object (processor, res->obj_id, res->data, res->len);
send_next_object (processor);
}
static void
read_fs_object (CcnetProcessor *processor, const char *obj_id)
{
USE_PRIV;
seaf_obj_store_async_read (seaf->fs_mgr->obj_store,
priv->reader_id,
obj_id);
}
static void
send_next_object (CcnetProcessor *processor)
{
USE_PRIV;
char *object_id;
if (priv->recv_obj_list == NULL) {
seaf_debug ("Send fs objects end.\n");
return;
}
object_id = priv->recv_obj_list->data;
priv->recv_obj_list = g_list_delete_link (priv->recv_obj_list,
priv->recv_obj_list);
read_fs_object (processor, object_id);
g_free (object_id);
}
static void
handle_response (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen)
{
SeafileSendfsV2Proc *proc = (SeafileSendfsV2Proc *)processor;
TransferTask *task = proc->tx_task;
switch (processor->state) {
case INIT:
if (strncmp(code, SC_OK, 3) == 0) {
ccnet_processor_thread_create (processor, seaf->job_mgr,
calculate_send_object_list,
calculate_send_object_list_done,
processor);
processor->state = CHECK_OBJECT_LIST;
return;
}
break;
case CHECK_OBJECT_LIST:
if (strncmp (code, SC_OBJ_LIST_SEG, 3) == 0) {
process_object_list_segment (processor, content, clen);
send_object_list_segment (processor);
return;
}
break;
case SEND_OBJECTS:
if (strncmp (code, SC_END, 3) == 0) {
seaf_debug ("All objects received. Done.\n");
ccnet_processor_done (processor, TRUE);
return;
}
break;
default:
g_return_if_reached ();
}
g_warning ("Bad response: %s %s.\n", code, code_msg);
if (memcmp (code, SC_ACCESS_DENIED, 3) == 0)
transfer_task_set_error (task, TASK_ERR_ACCESS_DENIED);
ccnet_processor_done (processor, FALSE);
}

View File

@ -0,0 +1,33 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#ifndef SEAFILE_SENDFS_V2_PROC_H
#define SEAFILE_SENDFS_V2_PROC_H
#include <glib-object.h>
#include <ccnet/processor.h>
#include "transfer-mgr.h"
#define SEAFILE_TYPE_SENDFS_V2_PROC (seafile_sendfs_v2_proc_get_type ())
#define SEAFILE_SENDFS_V2_PROC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SEAFILE_TYPE_SENDFS_V2_PROC, SeafileSendfsV2Proc))
#define SEAFILE_IS_SENDFS_V2_PROC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SEAFILE_TYPE_SENDFS_V2_PROC))
#define SEAFILE_SENDFS_V2_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SEAFILE_TYPE_SENDFS_V2_PROC, SeafileSendfsV2ProcClass))
#define IS_SEAFILE_SENDFS_V2_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SEAFILE_TYPE_SENDFS_V2_PROC))
#define SEAFILE_SENDFS_V2_PROC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SEAFILE_TYPE_SENDFS_V2_PROC, SeafileSendfsV2ProcClass))
typedef struct _SeafileSendfsV2Proc SeafileSendfsV2Proc;
typedef struct _SeafileSendfsV2ProcClass SeafileSendfsV2ProcClass;
struct _SeafileSendfsV2Proc {
CcnetProcessor parent_instance;
TransferTask *tx_task;
};
struct _SeafileSendfsV2ProcClass {
CcnetProcessorClass parent_class;
};
GType seafile_sendfs_v2_proc_get_type ();
#endif

View File

@ -12,6 +12,7 @@
#include "transfer-mgr.h"
#include "processors/sync-repo-proc.h"
#include "processors/getca-proc.h"
#include "processors/check-protocol-proc.h"
#include "vc-common.h"
#include "seafile-error.h"
#include "status.h"
@ -61,6 +62,9 @@ seaf_sync_manager_new (SeafileSession *seaf)
mgr->sync_interval = DEFAULT_SYNC_INTERVAL;
mgr->sync_infos = g_hash_table_new (g_str_hash, g_str_equal);
mgr->server_states = g_hash_table_new_full (g_str_hash, g_str_equal,
g_free, g_free);
return mgr;
}
@ -171,6 +175,9 @@ seaf_sync_manager_start (SeafSyncManager *mgr)
ccnet_proc_factory_register_processor (mgr->seaf->session->proc_factory,
"seafile-getca",
SEAFILE_TYPE_GETCA_PROC);
ccnet_proc_factory_register_processor (mgr->seaf->session->proc_factory,
"seafile-check-protocol",
SEAFILE_TYPE_CHECK_PROTOCOL_PROC);
g_signal_connect (seaf, "repo-fetched",
(GCallback)on_repo_fetched, mgr);
g_signal_connect (seaf, "repo-uploaded",
@ -441,6 +448,7 @@ start_upload_if_necessary (SyncTask *task)
"local",
"master",
task->token,
task->server_side_merge,
&error);
if (error != NULL) {
seaf_warning ("Failed to start upload: %s\n", error->message);
@ -465,6 +473,7 @@ start_fetch_if_necessary (SyncTask *task)
"fetch_head",
"master",
task->token,
task->server_side_merge,
&error);
if (error != NULL) {
@ -484,29 +493,6 @@ struct MergeResult {
gboolean worktree_dirty;
};
/* static int */
/* fix_dirty_worktree (SeafRepo *repo) */
/* { */
/* char *commit_id; */
/* GError *error = NULL; */
/* commit_id = seaf_repo_index_commit (repo, "", FALSE, &error); */
/* if (error != NULL) { */
/* seaf_warning ("Failed to commit unclean worktree.\n"); */
/* g_error_free (error); */
/* return -1; */
/* } */
/* g_free (commit_id); */
/* /\* After commit, the worktree should be clean. *\/ */
/* if (seaf_repo_is_worktree_changed (repo)) { */
/* seaf_warning ("Worktree is still dirty after commit.\n"); */
/* return -1; */
/* } */
/* return 0; */
/* } */
static void *
merge_job (void *vtask)
{
@ -522,15 +508,6 @@ merge_job (void *vtask)
return res;
}
/* Try to commit if worktree is not clean. */
/* if (seaf_repo_is_worktree_changed (repo) && fix_dirty_worktree (repo) < 0) { */
/* seaf_message ("[sync mgr] Worktree is not clean. Skip merging repo %s(%.8s).\n", */
/* repo->name, repo->id); */
/* res->success = FALSE; */
/* res->worktree_dirty = TRUE; */
/* return res; */
/* } */
/*
* 4 types of errors may occur:
* 1. merge conflicts;
@ -1001,6 +978,67 @@ out:
g_free (last_checkout);
}
static void
update_sync_status_v2 (SyncTask *task)
{
SyncInfo *info = task->info;
SeafRepo *repo = task->repo;
SeafBranch *master = NULL, *local = NULL;
local = seaf_branch_manager_get_branch (
seaf->branch_mgr, info->repo_id, "local");
if (!local) {
seaf_warning ("[sync-mgr] Branch local not found for repo %s(%.8s).\n",
repo->name, repo->id);
seaf_sync_manager_set_task_error (task, SYNC_ERROR_DATA_CORRUPT);
return;
}
master = seaf_branch_manager_get_branch (
seaf->branch_mgr, info->repo_id, "master");
if (!master) {
seaf_warning ("[sync-mgr] Branch master not found for repo %s(%.8s).\n",
repo->name, repo->id);
seaf_sync_manager_set_task_error (task, SYNC_ERROR_DATA_CORRUPT);
return;
}
if (info->repo_corrupted) {
seaf_sync_manager_set_task_error (task, SYNC_ERROR_REPO_CORRUPT);
} else if (info->deleted_on_relay) {
/* If repo doesn't exist on relay and we have "master",
* it was deleted on relay. In this case we remove this repo.
*/
seaf_sync_manager_set_task_error (task, SYNC_ERROR_NOREPO);
seaf_debug ("remove repo %s(%.8s) since it's deleted on relay\n",
repo->name, repo->id);
seaf_mq_manager_publish_notification (seaf->mq_mgr,
"repo.deleted_on_relay",
repo->name);
seaf_repo_manager_del_repo (seaf->repo_mgr, repo);
} else {
/* If local head is the same as remote head, already in sync. */
if (strcmp (local->commit_id, info->head_commit) == 0) {
/* As long as the repo is synced with the server. All the local
* blocks are not useful any more.
*/
if (repo_block_store_exists (repo)) {
seaf_message ("Removing blocks for repo %s(%.8s).\n",
repo->name, repo->id);
ccnet_job_manager_schedule_job (seaf->job_mgr,
remove_repo_blocks,
remove_blocks_done,
task);
} else
transition_sync_state (task, SYNC_STATE_DONE);
} else
start_fetch_if_necessary (task);
}
seaf_branch_unref (local);
seaf_branch_unref (master);
}
static void
sync_done_cb (CcnetProcessor *processor, gboolean success, void *data)
{
@ -1038,51 +1076,17 @@ sync_done_cb (CcnetProcessor *processor, gboolean success, void *data)
return;
}
update_sync_status (task);
}
static const char *
get_dest_id (SeafRepo *repo)
{
const char *dest_id;
if (repo->relay_id)
dest_id = repo->relay_id;
if (!task->server_side_merge)
update_sync_status (task);
else
return NULL;
if (!ccnet_peer_is_ready(seaf->ccnetrpc_client, dest_id))
return NULL;
return dest_id;
update_sync_status_v2 (task);
}
static int
check_net_state (void *data);
static int
start_sync_repo_proc (SeafSyncManager *manager, SyncTask *task)
{
CcnetProcessor *processor;
/* Set dest id before we talk to the dest.
* If it's a manual sync, it should have been set.
*/
if (!task->dest_id) {
if (!task->repo->relay_id) {
seaf_sync_manager_set_task_error (task, SYNC_ERROR_RELAY_OFFLINE);
return -1;
}
task->dest_id = g_strdup(task->repo->relay_id);
}
/* If relay is not ready, wait until it is. */
if (!ccnet_peer_is_ready (seaf->ccnetrpc_client, task->dest_id)) {
seaf_message ("[sync-mgr] Relay for %s is not ready, wait.\n",
task->repo->name);
task->conn_timer = ccnet_timer_new (check_net_state, task, 1000);
return 0;
}
processor = ccnet_proc_factory_create_remote_master_processor (
seaf->session->proc_factory, "seafile-sync-repo", task->dest_id);
if (!processor) {
@ -1105,20 +1109,6 @@ start_sync_repo_proc (SeafSyncManager *manager, SyncTask *task)
return 0;
}
static int
check_net_state (void *data)
{
SyncTask *task = data;
if (ccnet_peer_is_ready (seaf->ccnetrpc_client, task->dest_id)) {
ccnet_timer_free (&task->conn_timer);
start_sync_repo_proc (task->mgr, task);
return 0;
}
return 1;
}
struct CommitResult {
SyncTask *task;
gboolean changed;
@ -1160,6 +1150,7 @@ commit_job_done (void *vres)
{
struct CommitResult *res = vres;
SeafRepo *repo = res->task->repo;
SyncTask *task = res->task;
res->task->mgr->commit_job_running = FALSE;
@ -1182,24 +1173,24 @@ commit_job_done (void *vres)
return;
}
/* If this repo is downloaded by syncing with an existing folder, and
* the folder's contents are different from the server, clone manager
* will create a "index" branch. This branch is of no use after the
* first commit operation succeeds.
*/
if (seaf_branch_manager_branch_exists (seaf->branch_mgr, repo->id, "index"))
seaf_branch_manager_del_branch (seaf->branch_mgr, repo->id, "index");
/* If nothing committed and is not manual sync, no need to sync. */
if (!res->changed &&
!res->task->is_manual_sync && !res->task->is_initial_commit) {
transition_sync_state (res->task, SYNC_STATE_DONE);
g_free (res);
return;
if (!res->task->server_side_merge) {
/* If nothing committed and is not manual sync, no need to sync. */
if (!res->changed &&
!res->task->is_manual_sync && !res->task->is_initial_commit) {
transition_sync_state (res->task, SYNC_STATE_DONE);
g_free (res);
return;
}
start_sync_repo_proc (res->task->mgr, res->task);
} else {
if (res->changed)
start_upload_if_necessary (res->task);
else if (task->is_manual_sync || task->is_initial_commit)
start_sync_repo_proc (task->mgr, task);
else
transition_sync_state (task, SYNC_STATE_DONE);
}
start_sync_repo_proc (res->task->mgr, res->task);
g_free (res);
}
@ -1253,7 +1244,7 @@ start_sync (SeafSyncManager *manager, SeafRepo *repo,
task->info = info;
task->mgr = manager;
/* dest_id will be set later. */
task->dest_id = g_strdup(repo->relay_id);
task->token = g_strdup(repo->token);
task->is_manual_sync = is_manual_sync;
task->is_initial_commit = is_initial_commit;
@ -1292,6 +1283,7 @@ sync_repo (SeafSyncManager *manager, SeafRepo *repo)
/* Force commit and sync after a new repo is added. */
start_sync (manager, repo, TRUE, FALSE, TRUE);
status->last_check = now;
wt_status_unref (status);
return 0;
} else if (last_changed != 0 && status->last_check <= last_changed) {
/* Commit and sync if the repo has been updated after the
@ -1300,6 +1292,7 @@ sync_repo (SeafSyncManager *manager, SeafRepo *repo)
if (now - last_changed >= 2) {
start_sync (manager, repo, TRUE, FALSE, FALSE);
status->last_check = now;
wt_status_unref (status);
return 0;
}
}
@ -1317,6 +1310,129 @@ sync_repo (SeafSyncManager *manager, SeafRepo *repo)
return 0;
}
static SyncTask *
create_sync_task_v2 (SeafSyncManager *manager, SeafRepo *repo,
gboolean is_manual_sync, gboolean is_initial_commit)
{
SyncTask *task = g_new0 (SyncTask, 1);
SyncInfo *info;
info = get_sync_info (manager, repo->id);
task->info = info;
task->mgr = manager;
task->dest_id = g_strdup (repo->relay_id);
task->token = g_strdup(repo->token);
task->is_manual_sync = is_manual_sync;
task->is_initial_commit = is_initial_commit;
task->server_side_merge = TRUE;
repo->last_sync_time = time(NULL);
++(manager->n_running_tasks);
/* Free the last task when a new task is started.
* This way we can always get the state of the last task even
* after it's done.
*/
if (task->info->current_task)
sync_task_free (task->info->current_task);
task->info->current_task = task;
task->info->in_sync = TRUE;
task->repo = repo;
return task;
}
static gboolean
create_commit_from_event_queue (SeafSyncManager *manager, SeafRepo *repo,
gboolean is_manual_sync)
{
WTStatus *status;
SyncTask *task;
gboolean ret = FALSE;
gint now = (gint)time(NULL);
gint last_changed;
status = seaf_wt_monitor_get_worktree_status (manager->seaf->wt_monitor,
repo->id);
if (status) {
last_changed = g_atomic_int_get (&status->last_changed);
if (status->last_check == 0) {
/* Force commit and sync after a new repo is added. */
task = create_sync_task_v2 (manager, repo, is_manual_sync, TRUE);
commit_repo (task);
status->last_check = now;
ret = TRUE;
} else if (last_changed != 0 && status->last_check <= last_changed) {
/* Commit and sync if the repo has been updated after the
* last check and is not updated for the last 2 seconds.
*/
if (now - last_changed >= 2) {
task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
commit_repo (task);
status->last_check = now;
ret = TRUE;
}
} else if (status->last_event != NULL) {
task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
commit_repo (task);
ret = TRUE;
}
wt_status_unref (status);
}
return ret;
}
static int
sync_repo_v2 (SeafSyncManager *manager, SeafRepo *repo, gboolean is_manual_sync)
{
SeafBranch *master, *local;
int now = (int)time(NULL);
SyncTask *task;
int ret = 0;
master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
if (!master) {
seaf_warning ("No master branch found for repo %s(%.8s).\n",
repo->name, repo->id);
return -1;
}
local = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "local");
if (!local) {
seaf_warning ("No local branch found for repo %s(%.8s).\n",
repo->name, repo->id);
return -1;
}
if (strcmp (master->commit_id, local->commit_id) != 0) {
if ((repo->last_sync_time == 0 ||
repo->last_sync_time < now - manager->sync_interval) &&
manager->n_running_tasks < MAX_RUNNING_SYNC_TASKS) {
task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
start_upload_if_necessary (task);
}
/* Do nothing if the client still has something to upload
* but it's before 30-second schedule.
*/
goto out;
} else if (create_commit_from_event_queue (manager, repo, is_manual_sync))
goto out;
if ((repo->last_sync_time == 0 ||
repo->last_sync_time > now - manager->sync_interval) &&
manager->n_running_tasks < MAX_RUNNING_SYNC_TASKS) {
task = create_sync_task_v2 (manager, repo, is_manual_sync, FALSE);
start_sync_repo_proc (manager, task);
}
out:
seaf_branch_unref (master);
seaf_branch_unref (local);
return ret;
}
static void
auto_delete_repo (SeafSyncManager *manager, SeafRepo *repo)
{
@ -1338,6 +1454,125 @@ auto_delete_repo (SeafSyncManager *manager, SeafRepo *repo)
g_free (name);
}
static void
check_protocol_done_cb (CcnetProcessor *processor, gboolean success, void *data)
{
ServerState *state = data;
state->checking = FALSE;
if (success)
state->server_side_merge = SERVER_SIDE_MERGE_SUPPORTED;
else if (processor->failure == PROC_NO_SERVICE)
/* Talking to an old server. */
state->server_side_merge = SERVER_SIDE_MERGE_UNSUPPORTED;
}
static int
start_check_protocol_proc (SeafSyncManager *manager,
const char *peer_id, ServerState *state)
{
CcnetProcessor *processor;
processor = ccnet_proc_factory_create_remote_master_processor (
seaf->session->proc_factory, "seafile-check-protocol", peer_id);
if (!processor) {
seaf_warning ("[sync-mgr] failed to create get seafile-check-protocol proc.\n");
return -1;
}
if (ccnet_processor_startl (processor, NULL) < 0) {
seaf_warning ("[sync-mgr] failed to start seafile-check-protocol proc.\n");
return -1;
}
g_signal_connect (processor, "done", (GCallback)check_protocol_done_cb, state);
return 0;
}
static gboolean
check_relay_status (SeafSyncManager *mgr, SeafRepo *repo)
{
gboolean is_ready = ccnet_peer_is_ready (seaf->ccnetrpc_client, repo->relay_id);
ServerState *state = g_hash_table_lookup (mgr->server_states, repo->relay_id);
if (!state) {
state = g_new0 (ServerState, 1);
g_hash_table_insert (mgr->server_states, g_strdup(repo->relay_id), state);
}
if (is_ready) {
if (state->server_side_merge == SERVER_SIDE_MERGE_UNKNOWN) {
if (!state->checking) {
start_check_protocol_proc (mgr, repo->relay_id, state);
state->checking = TRUE;
}
return FALSE;
} else
return TRUE;
} else {
if (state->server_side_merge == SERVER_SIDE_MERGE_UNKNOWN)
return FALSE;
else {
/* Reset protocol_version to unknown so that we'll check it
* after the server is up again. */
state->server_side_merge = SERVER_SIDE_MERGE_UNKNOWN;
return FALSE;
}
}
}
/*
* If the user upgarde from 3.0.x, there may be more than one commit to upload
* on the local branch. The new syncing protocol can't handle more than one
* commit. So if we detect this case, fall back to old protocol.
* After the repo is synced this time, we can use new protocol in the future.
*/
static gboolean
has_old_commits_to_upload (SeafRepo *repo)
{
SeafBranch *master = NULL, *local = NULL;
SeafCommit *head = NULL;
gboolean ret = TRUE;
master = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master");
if (!master) {
seaf_warning ("No master branch found for repo %s(%.8s).\n",
repo->name, repo->id);
goto out;
}
local = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "local");
if (!local) {
seaf_warning ("No local branch found for repo %s(%.8s).\n",
repo->name, repo->id);
goto out;
}
if (strcmp (local->commit_id, master->commit_id) == 0) {
ret = FALSE;
goto out;
}
head = seaf_commit_manager_get_commit (seaf->commit_mgr,
repo->id, repo->version,
local->commit_id);
if (!head) {
seaf_warning ("Failed to get head commit of repo %s(%.8s).\n",
repo->name, repo->id);
goto out;
}
if (head->second_parent_id == NULL &&
g_strcmp0 (head->parent_id, master->commit_id) == 0)
ret = FALSE;
out:
seaf_branch_unref (master);
seaf_branch_unref (local);
seaf_commit_unref (head);
return ret;
}
gint
cmp_repos_by_sync_time (gconstpointer a, gconstpointer b, gpointer user_data)
{
@ -1404,8 +1639,10 @@ auto_sync_pulse (void *vmanager)
if (!manager->priv->auto_sync_enabled || !repo->auto_sync)
continue;
/* Don't sync repos without a relay-id */
if (!repo->relay_id)
/* If relay is not ready or protocol version is not determined,
* need to wait.
*/
if (!check_relay_status (manager, repo))
continue;
SyncInfo *info = get_sync_info (manager, repo->id);
@ -1413,18 +1650,15 @@ auto_sync_pulse (void *vmanager)
if (info->in_sync)
continue;
const char *dest_id = get_dest_id (repo);
if (!dest_id)
continue;
ServerState *state = g_hash_table_lookup (manager->server_states,
repo->relay_id);
CcnetPeer *peer = ccnet_get_peer (seaf->ccnetrpc_client, dest_id);
if (!peer->session_key) {
g_object_unref (peer);
continue;
}
g_object_unref (peer);
sync_repo (manager, repo);
if (repo->version == 0 ||
state->server_side_merge == SERVER_SIDE_MERGE_UNSUPPORTED ||
has_old_commits_to_upload (repo))
sync_repo (manager, repo);
else if (state->server_side_merge == SERVER_SIDE_MERGE_SUPPORTED)
sync_repo_v2 (manager, repo, FALSE);
}
g_list_free (repos);
@ -1462,7 +1696,9 @@ on_repo_fetched (SeafileSession *seaf,
if (tx_task->state == TASK_STATE_FINISHED) {
memcpy (info->head_commit, tx_task->head, 41);
merge_branches_if_necessary (task);
if (!task->server_side_merge)
merge_branches_if_necessary (task);
} else if (tx_task->state == TASK_STATE_CANCELED) {
transition_sync_state (task, SYNC_STATE_CANCELED);
} else if (tx_task->state == TASK_STATE_ERROR) {
@ -1501,7 +1737,10 @@ on_repo_uploaded (SeafileSession *seaf,
task->repo->id,
REPO_LOCAL_HEAD,
task->repo->head->commit_id);
transition_sync_state (task, SYNC_STATE_DONE);
if (!task->server_side_merge)
transition_sync_state (task, SYNC_STATE_DONE);
else
start_sync_repo_proc (manager, task);
} else if (tx_task->state == TASK_STATE_CANCELED) {
transition_sync_state (task, SYNC_STATE_CANCELED);
} else if (tx_task->state == TASK_STATE_ERROR) {
@ -1576,6 +1815,7 @@ seaf_sync_manager_disable_auto_sync (SeafSyncManager *mgr)
return 0;
}
#if 0
static void
add_sync_tasks_for_all (SeafSyncManager *mgr)
{
@ -1596,6 +1836,7 @@ add_sync_tasks_for_all (SeafSyncManager *mgr)
g_list_free (repos);
}
#endif
int
seaf_sync_manager_enable_auto_sync (SeafSyncManager *mgr)
@ -1605,7 +1846,7 @@ seaf_sync_manager_enable_auto_sync (SeafSyncManager *mgr)
return -1;
}
add_sync_tasks_for_all (mgr);
/* add_sync_tasks_for_all (mgr); */
mgr->priv->auto_sync_enabled = TRUE;
g_debug ("[sync mgr] auto sync is enabled\n");
return 0;

View File

@ -60,8 +60,8 @@ enum {
SYNC_ERROR_COMMIT,
SYNC_ERROR_MERGE,
SYNC_ERROR_WORKTREE_DIRTY,
SYNC_ERROR_UNKNOWN,
SYNC_ERROR_DEPRECATED_SERVER,
SYNC_ERROR_UNKNOWN,
SYNC_ERROR_NUM,
};
@ -76,11 +76,25 @@ struct _SyncTask {
char *tx_id;
char *token;
struct CcnetTimer *commit_timer;
struct CcnetTimer *conn_timer;
gboolean server_side_merge;
SeafRepo *repo; /* for convenience, only valid when in_sync. */
};
enum {
SERVER_SIDE_MERGE_UNKNOWN = 0,
SERVER_SIDE_MERGE_SUPPORTED,
SERVER_SIDE_MERGE_UNSUPPORTED,
};
struct _ServerState {
int server_side_merge;
gboolean checking;
};
typedef struct _ServerState ServerState;
struct _SeafileSession;
struct _SeafSyncManager {
@ -91,6 +105,8 @@ struct _SeafSyncManager {
gboolean commit_job_running;
int sync_interval;
GHashTable *server_states;
SeafSyncManagerPriv *priv;
};

File diff suppressed because it is too large Load Diff

View File

@ -124,6 +124,7 @@ struct _TransferTask {
char *token;
char *session_token;
int protocol_version;
gboolean server_side_merge;
char *from_branch;
char *to_branch;
char head[41];
@ -221,6 +222,7 @@ seaf_transfer_manager_add_download (SeafTransferManager *manager,
const char *from_branch,
const char *to_branch,
const char *token,
gboolean server_side_merge,
GError **error);
char *
@ -231,6 +233,7 @@ seaf_transfer_manager_add_upload (SeafTransferManager *manager,
const char *from_branch,
const char *to_branch,
const char *token,
gboolean server_side_merge,
GError **error);
GList*

View File

@ -28,6 +28,11 @@ typedef struct WTStatus {
gint last_check;
gint last_changed;
/* If last_event is non-NULL, the last commit is partial.
* We need to produce another commit from the remaining events.
*/
WTEvent *last_event;
pthread_mutex_t q_lock;
GQueue *event_q;
} WTStatus;

View File

@ -32,7 +32,10 @@ proc_headers = $(addprefix processors/, \
putcs-v2-proc.h \
checkbl-proc.h \
checkff-proc.h \
putca-proc.h)
putca-proc.h \
check-protocol-slave-proc.h \
recvfs-v2-proc.h \
recvbranch-v2-proc.h)
noinst_HEADERS = web-accesstoken-mgr.h chunkserv-mgr.h seafile-session.h \
repo-mgr.h \
@ -95,7 +98,10 @@ seaf_server_SOURCES = \
processors/putcs-v2-proc.c \
processors/checkbl-proc.c \
processors/checkff-proc.c \
processors/putca-proc.c
processors/putca-proc.c \
processors/check-protocol-slave-proc.c \
processors/recvfs-v2-proc.c \
processors/recvbranch-v2-proc.c
seaf_server_LDADD = @CCNET_LIBS@ \
$(top_builddir)/lib/libseafile_common.la \

View File

@ -0,0 +1,37 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#include "check-protocol-slave-proc.h"
G_DEFINE_TYPE (SeafileCheckProtocolSlaveProc, seafile_check_protocol_slave_proc, CCNET_TYPE_PROCESSOR)
static int
check_protocol_slave_start (CcnetProcessor *processor, int argc, char **argv);
static void
seafile_check_protocol_slave_proc_class_init (SeafileCheckProtocolSlaveProcClass *klass)
{
CcnetProcessorClass *proc_class = CCNET_PROCESSOR_CLASS (klass);
proc_class->name = "seafile-check-protocol-slave-proc";
proc_class->start = check_protocol_slave_start;
}
static void
seafile_check_protocol_slave_proc_init (SeafileCheckProtocolSlaveProc *processor)
{
}
static int
check_protocol_slave_start (CcnetProcessor *processor, int argc, char **argv)
{
int n;
char buf[10];
n = snprintf (buf, sizeof(buf), "%d", CURRENT_PROTO_VERSION);
ccnet_processor_send_response (processor, SC_OK, SS_OK, buf, n+1);
ccnet_processor_done (processor, TRUE);
return 0;
}

View File

@ -0,0 +1,29 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#ifndef SEAFILE_CHECK_PROTOCOL_SLAVE_PROC_H
#define SEAFILE_CHECK_PROTOCOL_SLAVE_PROC_H
#include <glib-object.h>
#include <ccnet.h>
#define SEAFILE_TYPE_CHECK_PROTOCOL_SLAVE_PROC (seafile_check_protocol_slave_proc_get_type ())
#define SEAFILE_CHECK_PROTOCOL_SLAVE_PROC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SEAFILE_TYPE_CHECK_PROTOCOL_SLAVE_PROC, SeafileCheckProtocolSlaveProc))
#define SEAFILE_IS_CHECK_PROTOCOL_SLAVE_PROC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SEAFILE_TYPE_CHECK_PROTOCOL_SLAVE_PROC))
#define SEAFILE_CHECK_PROTOCOL_SLAVE_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SEAFILE_TYPE_CHECK_PROTOCOL_SLAVE_PROC, SeafileCheckProtocolSlaveProcClass))
#define IS_SEAFILE_CHECK_PROTOCOL_SLAVE_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SEAFILE_TYPE_CHECK_PROTOCOL_SLAVE_PROC))
#define SEAFILE_CHECK_PROTOCOL_SLAVE_PROC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SEAFILE_TYPE_CHECK_PROTOCOL_SLAVE_PROC, SeafileCheckProtocolSlaveProcClass))
typedef struct _SeafileCheckProtocolSlaveProc SeafileCheckProtocolSlaveProc;
typedef struct _SeafileCheckProtocolSlaveProcClass SeafileCheckProtocolSlaveProcClass;
struct _SeafileCheckProtocolSlaveProc {
CcnetProcessor parent_instance;
};
struct _SeafileCheckProtocolSlaveProcClass {
CcnetProcessorClass parent_class;
};
GType seafile_check_protocol_slave_proc_get_type ();
#endif

View File

@ -0,0 +1,377 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include <ccnet.h>
#include <string.h>
#include <ccnet/ccnet-object.h>
#include "seafile-session.h"
#include "recvbranch-v2-proc.h"
#include "vc-common.h"
#include "monitor-rpc-wrappers.h"
#include "merge-new.h"
#include "diff-simple.h"
#include "log.h"
#define SC_BAD_COMMIT "401"
#define SS_BAD_COMMIT "Commit does not exist"
#define SC_NOT_FF "402"
#define SS_NOT_FF "Not fast forward"
#define SC_QUOTA_ERROR "403"
#define SS_QUOTA_ERROR "Failed to get quota"
#define SC_QUOTA_FULL "404"
#define SS_QUOTA_FULL "storage for the repo's owner is full"
#define SC_SERVER_ERROR "405"
#define SS_SERVER_ERROR "Internal server error"
#define SC_BAD_REPO "406"
#define SS_BAD_REPO "Repo does not exist"
#define SC_BAD_BRANCH "407"
#define SS_BAD_BRANCH "Branch does not exist"
#define SC_ACCESS_DENIED "410"
#define SS_ACCESS_DENIED "Access denied"
typedef struct {
char repo_id[37];
char *branch_name;
char new_head[41];
char *rsp_code;
char *rsp_msg;
} SeafileRecvbranchProcPriv;
G_DEFINE_TYPE (SeafileRecvbranchV2Proc, seafile_recvbranch_v2_proc, CCNET_TYPE_PROCESSOR)
#define GET_PRIV(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), SEAFILE_TYPE_RECVBRANCH_V2_PROC, SeafileRecvbranchProcPriv))
#define USE_PRIV \
SeafileRecvbranchProcPriv *priv = GET_PRIV(processor);
static int start (CcnetProcessor *processor, int argc, char **argv);
static void handle_update (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen);
static void *update_repo (void *vprocessor);
static void thread_done (void *result);
static void
release_resource(CcnetProcessor *processor)
{
USE_PRIV;
g_free (priv->branch_name);
g_free (priv->rsp_code);
g_free (priv->rsp_msg);
CCNET_PROCESSOR_CLASS (seafile_recvbranch_v2_proc_parent_class)->release_resource (processor);
}
static void
seafile_recvbranch_v2_proc_class_init (SeafileRecvbranchV2ProcClass *klass)
{
CcnetProcessorClass *proc_class = CCNET_PROCESSOR_CLASS (klass);
proc_class->name = "recvbranch-v2-proc";
proc_class->start = start;
proc_class->handle_update = handle_update;
proc_class->release_resource = release_resource;
g_type_class_add_private (klass, sizeof (SeafileRecvbranchProcPriv));
}
static void
seafile_recvbranch_v2_proc_init (SeafileRecvbranchV2Proc *processor)
{
}
static int
start (CcnetProcessor *processor, int argc, char **argv)
{
USE_PRIV;
char *session_token;
if (argc != 4) {
ccnet_processor_send_response (processor, SC_BAD_ARGS, SS_BAD_ARGS, NULL, 0);
ccnet_processor_done (processor, FALSE);
return -1;
}
if (!is_uuid_valid(argv[0]) || strlen(argv[2]) != 40) {
ccnet_processor_send_response (processor, SC_BAD_ARGS, SS_BAD_ARGS, NULL, 0);
ccnet_processor_done (processor, FALSE);
return -1;
}
memcpy (priv->repo_id, argv[0], 36);
memcpy (priv->new_head, argv[2], 40);
priv->branch_name = g_strdup(argv[1]);
session_token = argv[3];
if (seaf_token_manager_verify_token (seaf->token_mgr,
NULL,
processor->peer_id,
session_token, NULL) < 0) {
ccnet_processor_send_response (processor,
SC_ACCESS_DENIED, SS_ACCESS_DENIED,
NULL, 0);
ccnet_processor_done (processor, FALSE);
return -1;
}
ccnet_processor_thread_create (processor,
seaf->job_mgr,
update_repo,
thread_done,
processor);
return 0;
}
static void
handle_update (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen)
{
}
static char *
gen_merge_description (SeafRepo *repo,
const char *merged_root,
const char *p1_root,
const char *p2_root)
{
GList *p;
GList *results = NULL;
char *desc;
diff_merge_roots (repo->store_id, repo->version,
merged_root, p1_root, p2_root, &results, TRUE);
desc = diff_results_to_description (results);
for (p = results; p; p = p->next) {
DiffEntry *de = p->data;
diff_entry_free (de);
}
g_list_free (results);
return desc;
}
static int
fast_forward_or_merge (const char *repo_id,
SeafCommit *base,
SeafCommit *new_commit)
{
#define MAX_RETRY_COUNT 3
SeafRepo *repo = NULL;
SeafCommit *current_head = NULL, *merged_commit = NULL;
int retry_cnt = 0;
int ret = 0;
repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
if (!repo) {
seaf_warning ("Repo %s doesn't exist.\n", repo_id);
ret = -1;
goto out;
}
retry:
current_head = seaf_commit_manager_get_commit (seaf->commit_mgr,
repo->id, repo->version,
repo->head->commit_id);
if (!current_head) {
seaf_warning ("Failed to find head commit of %s.\n", repo_id);
ret = -1;
goto out;
}
/* Merge if base and head are not the same. */
if (strcmp (base->commit_id, current_head->commit_id) != 0) {
MergeOptions opt;
const char *roots[3];
char *desc = NULL;
memset (&opt, 0, sizeof(opt));
opt.n_ways = 3;
memcpy (opt.remote_repo_id, repo_id, 36);
memcpy (opt.remote_head, new_commit->commit_id, 40);
opt.do_merge = TRUE;
roots[0] = base->root_id; /* base */
roots[1] = current_head->root_id; /* head */
roots[2] = new_commit->root_id; /* remote */
if (seaf_merge_trees (repo->store_id, repo->version, 3, roots, &opt) < 0) {
seaf_warning ("Failed to merge.\n");
ret = -1;
goto out;
}
if (!opt.conflict)
desc = g_strdup("Auto merge by system");
else {
desc = gen_merge_description (repo,
opt.merged_tree_root,
current_head->root_id,
new_commit->root_id);
if (!desc)
desc = g_strdup("Auto merge by system");
}
merged_commit = seaf_commit_new(NULL, repo->id, opt.merged_tree_root,
new_commit->creator_name, EMPTY_SHA1,
desc,
0);
g_free (desc);
merged_commit->parent_id = g_strdup (current_head->commit_id);
merged_commit->second_parent_id = g_strdup (new_commit->commit_id);
merged_commit->new_merge = TRUE;
if (opt.conflict)
merged_commit->conflict = TRUE;
seaf_repo_to_commit (repo, merged_commit);
if (seaf_commit_manager_add_commit (seaf->commit_mgr, merged_commit) < 0) {
seaf_warning ("Failed to add commit.\n");
ret = -1;
goto out;
}
} else {
seaf_commit_ref (new_commit);
merged_commit = new_commit;
}
seaf_branch_set_commit(repo->head, merged_commit->commit_id);
if (seaf_branch_manager_test_and_update_branch(seaf->branch_mgr,
repo->head,
current_head->commit_id) < 0)
{
seaf_repo_unref (repo);
repo = NULL;
seaf_commit_unref (current_head);
current_head = NULL;
seaf_commit_unref (merged_commit);
merged_commit = NULL;
repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
if (!repo) {
seaf_warning ("Repo %s doesn't exist.\n", repo_id);
ret = -1;
goto out;
}
if (++retry_cnt <= MAX_RETRY_COUNT) {
seaf_message ("Concurrent branch update, retry.\n");
/* Sleep random time between 100 and 1000 millisecs. */
usleep (g_random_int_range(1, 11) * 100 * 1000);
goto retry;
} else {
seaf_warning ("Stop retrying.\n");
ret = -1;
goto out;
}
}
out:
seaf_commit_unref (current_head);
seaf_commit_unref (merged_commit);
seaf_repo_unref (repo);
return ret;
}
static void *
update_repo (void *vprocessor)
{
CcnetProcessor *processor = vprocessor;
USE_PRIV;
char *repo_id, *new_head;
SeafRepo *repo = NULL;
SeafCommit *new_commit = NULL, *base = NULL;
repo_id = priv->repo_id;
new_head = priv->new_head;
repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
if (!repo) {
/* repo is deleted on server */
priv->rsp_code = g_strdup (SC_BAD_REPO);
priv->rsp_msg = g_strdup (SC_BAD_REPO);
goto out;
}
/* Since this is the last step of upload procedure, commit should exist. */
new_commit = seaf_commit_manager_get_commit (seaf->commit_mgr,
repo->id, repo->version,
new_head);
if (!new_commit) {
seaf_warning ("Failed to get commit %s for repo %s.\n",
new_head, repo->id);
priv->rsp_code = g_strdup (SC_BAD_COMMIT);
priv->rsp_msg = g_strdup (SS_BAD_COMMIT);
goto out;
}
base = seaf_commit_manager_get_commit (seaf->commit_mgr,
repo->id, repo->version,
new_commit->parent_id);
if (!base) {
seaf_warning ("Failed to get commit %s for repo %s.\n",
new_commit->parent_id, repo->id);
priv->rsp_code = g_strdup (SC_BAD_COMMIT);
priv->rsp_msg = g_strdup (SS_BAD_COMMIT);
goto out;
}
if (seaf_quota_manager_check_quota (seaf->quota_mgr, repo_id) < 0) {
priv->rsp_code = g_strdup(SC_QUOTA_FULL);
priv->rsp_msg = g_strdup(SS_QUOTA_FULL);
goto out;
}
if (fast_forward_or_merge (repo_id, base, new_commit) < 0) {
priv->rsp_code = g_strdup(SC_SERVER_ERROR);
priv->rsp_msg = g_strdup(SS_SERVER_ERROR);
goto out;
}
seaf_repo_manager_cleanup_virtual_repos (seaf->repo_mgr, repo_id);
seaf_repo_manager_merge_virtual_repo (seaf->repo_mgr, repo_id, NULL);
out:
seaf_repo_unref (repo);
seaf_commit_unref (new_commit);
seaf_commit_unref (base);
if (!priv->rsp_code) {
priv->rsp_code = g_strdup (SC_OK);
priv->rsp_msg = g_strdup (SS_OK);
}
return vprocessor;
}
static void
thread_done (void *result)
{
CcnetProcessor *processor = result;
USE_PRIV;
if (strcmp (priv->rsp_code, SC_OK) == 0) {
/* Repo is updated, schedule repo size computation. */
schedule_repo_size_computation (seaf->size_sched, priv->repo_id);
ccnet_processor_send_response (processor, SC_OK, SS_OK, NULL, 0);
ccnet_processor_done (processor, TRUE);
} else {
ccnet_processor_send_response (processor,
priv->rsp_code, priv->rsp_msg,
NULL, 0);
ccnet_processor_done (processor, FALSE);
}
}

View File

@ -0,0 +1,30 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#ifndef SEAFILE_RECVBRANCH_V2_PROC_H
#define SEAFILE_RECVBRANCH_V2_PROC_H
#include <glib-object.h>
#define SEAFILE_TYPE_RECVBRANCH_V2_PROC (seafile_recvbranch_v2_proc_get_type ())
#define SEAFILE_RECVBRANCH_V2_PROC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SEAFILE_TYPE_RECVBRANCH_V2_PROC, SeafileRecvbranchV2Proc))
#define SEAFILE_IS_RECVBRANCH_V2_PROC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SEAFILE_TYPE_RECVBRANCH_V2_PROC))
#define SEAFILE_RECVBRANCH_V2_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SEAFILE_TYPE_RECVBRANCH_V2_PROC, SeafileRecvbranchV2ProcClass))
#define IS_SEAFILE_RECVBRANCH_V2_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SEAFILE_TYPE_RECVBRANCH_V2_PROC))
#define SEAFILE_RECVBRANCH_V2_PROC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SEAFILE_TYPE_RECVBRANCH_V2_PROC, SeafileRecvbranchV2ProcClass))
typedef struct _SeafileRecvbranchV2Proc SeafileRecvbranchV2Proc;
typedef struct _SeafileRecvbranchV2ProcClass SeafileRecvbranchV2ProcClass;
struct _SeafileRecvbranchV2Proc {
CcnetProcessor parent_instance;
};
struct _SeafileRecvbranchV2ProcClass {
CcnetProcessorClass parent_class;
};
GType seafile_recvbranch_v2_proc_get_type ();
#endif

View File

@ -600,7 +600,7 @@ handle_update (CcnetProcessor *processor,
(TimerCB)check_end_condition, processor, CHECK_INTERVAL);
processor->state = FETCH_OBJECT;
} else {
g_warning ("Bad response: %s %s\n", code, code_msg);
g_warning ("Bad update: %s %s\n", code, code_msg);
ccnet_processor_send_response (processor,
SC_BAD_UPDATE_CODE, SS_BAD_UPDATE_CODE,
NULL, 0);
@ -617,7 +617,7 @@ handle_update (CcnetProcessor *processor,
} else if (strncmp(code, SC_OBJECT, 3) == 0) {
recv_fs_object (processor, content, clen);
} else {
g_warning ("Bad response: %s %s\n", code, code_msg);
g_warning ("Bad update: %s %s\n", code, code_msg);
ccnet_processor_send_response (processor,
SC_BAD_UPDATE_CODE, SS_BAD_UPDATE_CODE,
NULL, 0);

View File

@ -0,0 +1,416 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include "common.h"
#define DEBUG_FLAG SEAFILE_DEBUG_TRANSFER
#include "log.h"
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <ccnet.h>
#include "utils.h"
#include "seafile-session.h"
#include "fs-mgr.h"
#include "processors/objecttx-common.h"
#include "recvfs-v2-proc.h"
#include "seaf-utils.h"
enum {
CHECK_OBJECT_LIST = 0,
RECV_OBJECTS,
};
typedef struct {
char *obj_seg;
int obj_seg_len;
gboolean registered;
guint32 writer_id;
/* Used for getting repo info */
char repo_id[37];
char store_id[37];
int repo_version;
gboolean success;
/* Used to check object list */
char *recv_objs;
int recv_len;
GList *needed_objs;
int n_needed;
int total_needed;
int n_saved;
} SeafileRecvfsProcPriv;
#define GET_PRIV(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), SEAFILE_TYPE_RECVFS_V2_PROC, SeafileRecvfsProcPriv))
#define USE_PRIV \
SeafileRecvfsProcPriv *priv = GET_PRIV(processor);
G_DEFINE_TYPE (SeafileRecvfsV2Proc, seafile_recvfs_v2_proc, CCNET_TYPE_PROCESSOR)
static int start (CcnetProcessor *processor, int argc, char **argv);
static void handle_update (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen);
static void
release_resource(CcnetProcessor *processor)
{
USE_PRIV;
g_free (priv->obj_seg);
if (priv->registered) {
seaf_obj_store_unregister_async_write (seaf->fs_mgr->obj_store,
priv->writer_id);
}
string_list_free (priv->needed_objs);
CCNET_PROCESSOR_CLASS (seafile_recvfs_v2_proc_parent_class)->release_resource (processor);
}
static void
seafile_recvfs_v2_proc_class_init (SeafileRecvfsV2ProcClass *klass)
{
CcnetProcessorClass *proc_class = CCNET_PROCESSOR_CLASS (klass);
proc_class->name = "recvfs-v2-proc";
proc_class->start = start;
proc_class->handle_update = handle_update;
proc_class->release_resource = release_resource;
g_type_class_add_private (klass, sizeof (SeafileRecvfsProcPriv));
}
static void
seafile_recvfs_v2_proc_init (SeafileRecvfsV2Proc *processor)
{
}
static void
on_fs_write (OSAsyncResult *res, void *cb_data);
static void
register_async_io (CcnetProcessor *processor)
{
USE_PRIV;
priv->registered = TRUE;
priv->writer_id = seaf_obj_store_register_async_write (seaf->fs_mgr->obj_store,
priv->store_id,
priv->repo_version,
on_fs_write,
processor);
}
static void *
get_repo_info_thread (void *data)
{
CcnetProcessor *processor = data;
USE_PRIV;
SeafRepo *repo;
repo = seaf_repo_manager_get_repo (seaf->repo_mgr, priv->repo_id);
if (!repo) {
seaf_warning ("Failed to get repo %s.\n", priv->repo_id);
priv->success = FALSE;
return data;
}
memcpy (priv->store_id, repo->store_id, 36);
priv->repo_version = repo->version;
priv->success = TRUE;
seaf_repo_unref (repo);
return data;
}
static void
get_repo_info_done (void *data)
{
CcnetProcessor *processor = data;
USE_PRIV;
if (priv->success) {
ccnet_processor_send_response (processor, SC_OK, SS_OK, NULL, 0);
register_async_io (processor);
} else {
ccnet_processor_send_response (processor, SC_SHUTDOWN, SS_SHUTDOWN,
NULL, 0);
ccnet_processor_done (processor, FALSE);
}
}
static int
start (CcnetProcessor *processor, int argc, char **argv)
{
char *session_token;
USE_PRIV;
if (argc != 1) {
ccnet_processor_send_response (processor, SC_BAD_ARGS, SS_BAD_ARGS, NULL, 0);
ccnet_processor_done (processor, FALSE);
return -1;
}
session_token = argv[0];
if (seaf_token_manager_verify_token (seaf->token_mgr,
NULL,
processor->peer_id,
session_token, priv->repo_id) == 0) {
ccnet_processor_thread_create (processor,
seaf->job_mgr,
get_repo_info_thread,
get_repo_info_done,
processor);
return 0;
} else {
ccnet_processor_send_response (processor,
SC_ACCESS_DENIED, SS_ACCESS_DENIED,
NULL, 0);
ccnet_processor_done (processor, FALSE);
return -1;
}
}
static void
on_fs_write (OSAsyncResult *res, void *cb_data)
{
CcnetProcessor *processor = cb_data;
USE_PRIV;
if (!res->success) {
g_warning ("[recvfs] Failed to write %s.\n", res->obj_id);
ccnet_processor_send_response (processor, SC_BAD_OBJECT, SS_BAD_OBJECT,
NULL, 0);
ccnet_processor_done (processor, FALSE);
return;
}
seaf_debug ("[recvfs] Wrote fs object %s.\n", res->obj_id);
if (++(priv->n_saved) == priv->total_needed) {
seaf_debug ("All objects saved. Done.\n");
ccnet_processor_send_response (processor, SC_END, SS_END, NULL, 0);
ccnet_processor_done (processor, TRUE);
}
}
static int
save_fs_object (CcnetProcessor *processor, ObjectPack *pack, int len)
{
USE_PRIV;
return seaf_obj_store_async_write (seaf->fs_mgr->obj_store,
priv->writer_id,
pack->id,
pack->object,
len - 41,
FALSE);
}
static int
recv_fs_object (CcnetProcessor *processor, char *content, int clen)
{
ObjectPack *pack = (ObjectPack *)content;
/* SeafFSObject *fs_obj = NULL; */
if (clen < sizeof(ObjectPack)) {
g_warning ("invalid object id.\n");
goto bad;
}
seaf_debug ("[recvfs] Recv fs object %.8s.\n", pack->id);
/* Check object integrity by parsing it. */
/* fs_obj = seaf_fs_object_from_data(pack->id, */
/* pack->object, clen - sizeof(ObjectPack), */
/* (priv->repo_version > 0)); */
/* if (!fs_obj) { */
/* g_warning ("Bad fs object %s.\n", pack->id); */
/* goto bad; */
/* } */
/* seaf_fs_object_free (fs_obj); */
if (save_fs_object (processor, pack, clen) < 0) {
goto bad;
}
return 0;
bad:
ccnet_processor_send_response (processor, SC_BAD_OBJECT,
SS_BAD_OBJECT, NULL, 0);
g_warning ("[recvfs] Bad fs object received.\n");
ccnet_processor_done (processor, FALSE);
/* seaf_fs_object_free (fs_obj); */
return -1;
}
static void
recv_fs_object_seg (CcnetProcessor *processor, char *content, int clen)
{
USE_PRIV;
/* Append the received object segment to the end */
priv->obj_seg = g_realloc (priv->obj_seg, priv->obj_seg_len + clen);
memcpy (priv->obj_seg + priv->obj_seg_len, content, clen);
seaf_debug ("[recvfs] Get obj seg: <id= %40s, offset= %d, lenth= %d>\n",
priv->obj_seg, priv->obj_seg_len, clen);
priv->obj_seg_len += clen;
}
static void
process_fs_object_seg (CcnetProcessor *processor)
{
USE_PRIV;
if (recv_fs_object (processor, priv->obj_seg, priv->obj_seg_len) == 0) {
g_free (priv->obj_seg);
priv->obj_seg = NULL;
priv->obj_seg_len = 0;
}
}
static void *
process_object_list (void *data)
{
CcnetProcessor *processor = data;
USE_PRIV;
int n, i;
char *p;
char *obj_id;
n = priv->recv_len/40;
p = priv->recv_objs;
for (i = 0; i < n; ++i) {
obj_id = g_strndup (p, 40);
if (!seaf_obj_store_obj_exists (seaf->fs_mgr->obj_store,
priv->store_id, priv->repo_version,
obj_id))
{
priv->needed_objs = g_list_prepend (priv->needed_objs, obj_id);
++(priv->n_needed);
++(priv->total_needed);
} else
g_free (obj_id);
p += 40;
}
return data;
}
static void
process_object_list_done (void *data)
{
CcnetProcessor *processor = data;
USE_PRIV;
if (priv->n_needed == 0) {
ccnet_processor_send_response (processor,
SC_OBJ_LIST_SEG, SS_OBJ_LIST_SEG,
NULL, 0);
return;
}
char *buf = g_malloc (priv->n_needed * 40);
char *p;
char *obj_id;
GList *ptr;
p = buf;
for (ptr = priv->needed_objs; ptr; ptr = ptr->next) {
obj_id = ptr->data;
memcpy (p, obj_id, 40);
p += 40;
}
ccnet_processor_send_response (processor,
SC_OBJ_LIST_SEG, SS_OBJ_LIST_SEG,
buf, priv->n_needed * 40);
g_free (buf);
string_list_free (priv->needed_objs);
priv->needed_objs = NULL;
priv->n_needed = 0;
}
static void
handle_update (CcnetProcessor *processor,
char *code, char *code_msg,
char *content, int clen)
{
USE_PRIV;
switch (processor->state) {
case CHECK_OBJECT_LIST:
if (strncmp (code, SC_OBJ_LIST_SEG, 3) == 0) {
if (clen % 40 != 0) {
seaf_warning ("Invalid object list segment length %d.\n", clen);
ccnet_processor_send_response (processor,
SC_SHUTDOWN, SS_SHUTDOWN,
NULL, 0);
ccnet_processor_done (processor, FALSE);
return;
}
priv->recv_objs = content;
priv->recv_len = clen;
ccnet_processor_thread_create (processor, seaf->job_mgr,
process_object_list,
process_object_list_done,
processor);
} else if (strncmp (code, SC_OBJ_LIST_SEG_END, 3) == 0) {
if (priv->total_needed == 0) {
seaf_debug ("No objects are needed. Done.\n");
ccnet_processor_send_response (processor, SC_END, SS_END, NULL, 0);
ccnet_processor_done (processor, TRUE);
return;
}
processor->state = RECV_OBJECTS;
} else if (strncmp (code, SC_END, 3) == 0) {
/* The client finds nothing to upload. */
ccnet_processor_done (processor, TRUE);
} else {
seaf_warning ("Bad update: %s %s\n", code, code_msg);
ccnet_processor_send_response (processor,
SC_BAD_UPDATE_CODE, SS_BAD_UPDATE_CODE,
NULL, 0);
ccnet_processor_done (processor, FALSE);
}
break;
case RECV_OBJECTS:
if (strncmp(code, SC_OBJ_SEG, 3) == 0) {
recv_fs_object_seg (processor, content, clen);
} else if (strncmp(code, SC_OBJ_SEG_END, 3) == 0) {
recv_fs_object_seg (processor, content, clen);
process_fs_object_seg (processor);
} else if (strncmp(code, SC_OBJECT, 3) == 0) {
recv_fs_object (processor, content, clen);
} else {
seaf_warning ("Bad update: %s %s\n", code, code_msg);
ccnet_processor_send_response (processor,
SC_BAD_UPDATE_CODE, SS_BAD_UPDATE_CODE,
NULL, 0);
ccnet_processor_done (processor, FALSE);
}
break;
default:
g_return_if_reached ();
}
}

View File

@ -0,0 +1,30 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#ifndef SEAFILE_RECVFS_V2_PROC_H
#define SEAFILE_RECVFS_V2_PROC_H
#include <glib-object.h>
#define SEAFILE_TYPE_RECVFS_V2_PROC (seafile_recvfs_v2_proc_get_type ())
#define SEAFILE_RECVFS_V2_PROC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SEAFILE_TYPE_RECVFS_V2_PROC, SeafileRecvfsV2Proc))
#define SEAFILE_IS_RECVFS_V2_PROC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SEAFILE_TYPE_RECVFS_V2_PROC))
#define SEAFILE_RECVFS_V2_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SEAFILE_TYPE_RECVFS_V2_PROC, SeafileRecvfsV2ProcClass))
#define IS_SEAFILE_RECVFS_V2_PROC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SEAFILE_TYPE_RECVFS_V2_PROC))
#define SEAFILE_RECVFS_V2_PROC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SEAFILE_TYPE_RECVFS_V2_PROC, SeafileRecvfsV2ProcClass))
typedef struct _SeafileRecvfsV2Proc SeafileRecvfsV2Proc;
typedef struct _SeafileRecvfsV2ProcClass SeafileRecvfsV2ProcClass;
struct _SeafileRecvfsV2Proc {
CcnetProcessor parent_instance;
};
struct _SeafileRecvfsV2ProcClass {
CcnetProcessorClass parent_class;
};
GType seafile_recvfs_v2_proc_get_type ();
#endif

View File

@ -36,6 +36,9 @@
#include "processors/checkbl-proc.h"
#include "processors/checkff-proc.h"
#include "processors/putca-proc.h"
#include "processors/check-protocol-slave-proc.h"
#include "processors/recvfs-v2-proc.h"
#include "processors/recvbranch-v2-proc.h"
#include "cdc/cdc.h"
@ -97,6 +100,12 @@ static void register_processors (CcnetClient *client)
SEAFILE_TYPE_CHECKFF_PROC, NULL);
ccnet_register_service (client, "seafile-putca", "basic",
SEAFILE_TYPE_PUTCA_PROC, NULL);
ccnet_register_service (client, "seafile-check-protocol-slave", "basic",
SEAFILE_TYPE_CHECK_PROTOCOL_SLAVE_PROC, NULL);
ccnet_register_service (client, "seafile-recvfs-v2", "basic",
SEAFILE_TYPE_RECVFS_V2_PROC, NULL);
ccnet_register_service (client, "seafile-recvbranch-v2", "basic",
SEAFILE_TYPE_RECVBRANCH_V2_PROC, NULL);
}
#include <searpc.h>