[client] Fix bugs when cloning a large library.

This commit is contained in:
Jiaqiang Xu 2014-08-12 17:59:13 +08:00
parent 631fd43be9
commit 085c7f14ae
6 changed files with 185 additions and 62 deletions

View File

@ -50,6 +50,7 @@ enum {
RECV_STATE_AUTH,
RECV_STATE_HEADER,
RECV_STATE_CONTENT,
RECV_STATE_DONE,
};
struct _BlockTxClient {
@ -74,7 +75,6 @@ struct _BlockTxClient {
FrameParser parser;
gboolean break_loop;
gboolean canceled;
int version;
};
@ -305,7 +305,10 @@ handle_auth_rsp_content_cb (char *content, int clen, void *cbarg)
if (rsp->status == STATUS_OK) {
seaf_debug ("Auth OK.\n");
g_atomic_int_set (&client->info->ready_for_transfer, 1);
if (!client->info->transfer_once) {
int rsp = BLOCK_CLIENT_READY;
pipewrite (client->info->done_pipe[1], &rsp, sizeof(rsp));
}
/* If in interactive mode, wait for TRANSFER command to start transfer. */
if (client->info->transfer_once)
@ -753,7 +756,7 @@ recv_data_cb (BlockTxClient *client)
if (n == 0) {
seaf_warning ("Data connection is closed by the server.\n");
client->break_loop = TRUE;
client->info->result = BLOCK_CLIENT_SERVER_ERROR;
client->info->result = BLOCK_CLIENT_NET_ERROR;
return;
} else if (n < 0) {
seaf_warning ("Read data connection error: %s.\n",
@ -789,38 +792,81 @@ recv_data_cb (BlockTxClient *client)
client->break_loop = TRUE;
}
static void
shutdown_client (BlockTxClient *client)
{
if (client->block) {
seaf_block_manager_close_block (seaf->block_mgr, client->block);
seaf_block_manager_block_handle_free (seaf->block_mgr, client->block);
client->block = NULL;
}
if (client->parser.enc_init)
EVP_CIPHER_CTX_cleanup (&client->parser.ctx);
evbuffer_free (client->recv_buf);
evutil_closesocket (client->data_fd);
client->recv_state = RECV_STATE_DONE;
}
static gboolean
handle_command (BlockTxClient *client, int command)
{
gboolean ret = FALSE;
int rsp;
switch (command) {
case BLOCK_CLIENT_CMD_TRANSFER:
/* Ignore TRANSFER command if transfer has been canceled. */
if (client->canceled) {
seaf_debug ("Task canceled, ignore transfer command.\n");
/* Ignore TRANSFER command if client has been shutdown. */
if (client->recv_state == RECV_STATE_DONE) {
seaf_debug ("Client was shutdown, ignore transfer command.\n");
break;
}
if (transfer_next_block (client) < 0)
ret = TRUE;
if (transfer_next_block (client) < 0) {
rsp = client->info->result;
pipewrite (client->info->done_pipe[1], &rsp, sizeof(rsp));
shutdown_client (client);
client->break_loop = FALSE;
}
break;
case BLOCK_CLIENT_CMD_CANCEL:
if (client->recv_state == RECV_STATE_DONE) {
seaf_debug ("Client was shutdown, ignore cancel command.\n");
break;
}
seaf_debug ("Canceled command received.\n");
client->info->result = BLOCK_CLIENT_CANCELED;
/* If in interactive mode, just set a canceled flag and send CANCELED
* response.
*/
if (!client->info->transfer_once) {
seaf_debug ("Canceled command received.\n");
int rsp = client->info->result;
pipewrite (client->info->done_pipe[1], &rsp, sizeof(rsp));
client->canceled = TRUE;
} else
if (client->info->transfer_once) {
shutdown_client (client);
ret = TRUE;
} else {
rsp = client->info->result;
pipewrite (client->info->done_pipe[1], &rsp, sizeof(rsp));
shutdown_client (client);
client->break_loop = FALSE;
ret = FALSE;
}
break;
case BLOCK_CLIENT_CMD_END:
client->info->result = BLOCK_CLIENT_SUCCESS;
client->info->result = BLOCK_CLIENT_ENDED;
rsp = client->info->result;
pipewrite (client->info->done_pipe[1], &rsp, sizeof(rsp));
/* Don't need to shutdown_client() here, since it's already called. */
client->break_loop = FALSE;
ret = TRUE;
break;
}
@ -828,18 +874,44 @@ handle_command (BlockTxClient *client, int command)
return ret;
}
static void
static gboolean
do_break_loop (BlockTxClient *client)
{
if (client->info->transfer_once) {
shutdown_client (client);
return TRUE;
} else {
int rsp = client->info->result;
pipewrite (client->info->done_pipe[1], &rsp, sizeof(rsp));
if (client->info->result != BLOCK_CLIENT_SUCCESS)
shutdown_client (client);
client->break_loop = FALSE;
return FALSE;
}
}
static gboolean
client_thread_loop (BlockTxClient *client)
{
BlockTxInfo *info = client->info;
fd_set fds;
int max_fd = MAX (info->cmd_pipe[0], client->data_fd);
int rc;
gboolean restart = FALSE;
while (1) {
FD_ZERO (&fds);
FD_SET (info->cmd_pipe[0], &fds);
FD_SET (client->data_fd, &fds);
/* Stop receiving any data after the client was shutdown. */
if (client->recv_state != RECV_STATE_DONE) {
FD_SET (client->data_fd, &fds);
max_fd = MAX (info->cmd_pipe[0], client->data_fd);
} else
max_fd = info->cmd_pipe[0];
rc = select (max_fd + 1, &fds, NULL, NULL, NULL);
if (rc < 0 && errno == EINTR) {
@ -850,27 +922,28 @@ client_thread_loop (BlockTxClient *client)
break;
}
/* Stopping receiving any data after canceled. */
if (FD_ISSET (client->data_fd, &fds) && !client->canceled) {
if (client->recv_state != RECV_STATE_DONE &&
FD_ISSET (client->data_fd, &fds)) {
recv_data_cb (client);
if (client->break_loop) {
if (client->info->transfer_once)
break;
else {
int rsp = client->info->result;
pipewrite (client->info->done_pipe[1], &rsp, sizeof(rsp));
client->break_loop = FALSE;
}
}
if (client->break_loop && do_break_loop (client))
break;
}
if (FD_ISSET (info->cmd_pipe[0], &fds)) {
int cmd;
piperead (info->cmd_pipe[0], &cmd, sizeof(int));
if (cmd == BLOCK_CLIENT_CMD_RESTART) {
restart = TRUE;
break;
}
if (handle_command (client, cmd))
break;
}
}
return restart;
}
static void *
@ -878,33 +951,46 @@ block_tx_client_thread (void *vdata)
{
BlockTxClient *client = vdata;
BlockTxInfo *info = client->info;
BlockTxClientDoneCB cb = client->cb;
evutil_socket_t data_fd;
gboolean restart;
retry:
data_fd = connect_chunk_server (info->cs);
if (data_fd < 0) {
info->result = BLOCK_CLIENT_NET_ERROR;
if (!info->transfer_once) {
pipewrite (info->done_pipe[1], &info->result, sizeof(info->result));
/* Transfer manager always expects an ENDED response. */
int rsp = BLOCK_CLIENT_ENDED;
pipewrite (info->done_pipe[1], &rsp, sizeof(rsp));
}
return vdata;
}
client->data_fd = data_fd;
if (send_handshake (data_fd, info) < 0)
if (send_handshake (data_fd, info) < 0) {
if (!info->transfer_once) {
pipewrite (info->done_pipe[1], &info->result, sizeof(info->result));
int rsp = BLOCK_CLIENT_ENDED;
pipewrite (info->done_pipe[1], &rsp, sizeof(rsp));
}
return vdata;
}
client->recv_buf = evbuffer_new ();
client_thread_loop (client);
restart = client_thread_loop (client);
if (client->block) {
seaf_block_manager_close_block (seaf->block_mgr, client->block);
seaf_block_manager_block_handle_free (seaf->block_mgr, client->block);
if (restart) {
seaf_message ("Restarting block tx client.\n");
memset (client, 0, sizeof(BlockTxClient));
client->info = info;
client->cb = cb;
client->info->result = BLOCK_CLIENT_UNKNOWN;
goto retry;
}
if (client->parser.enc_init)
EVP_CIPHER_CTX_cleanup (&client->parser.ctx);
evbuffer_free (client->recv_buf);
evutil_closesocket (data_fd);
return vdata;
}

View File

@ -12,11 +12,14 @@ typedef void (*BlockTxClientDoneCB) (BlockTxInfo *);
* After all blocks are uploaded, the client done callback is called.
*
* 2. In download, the client is set to interactive mode.
* Transfer manager initiates multiple batches of blocks for download.
* The block tx client first has to connect to the server and do authentication.
* After authentication is done, block-tx-client writes a READY reply to done_pipe.
* Transfer manager waits on the done_pipe for the READY reply.
* Transfer manager then initiates multiple batches of blocks for download.
* After each batch is downloaded, the block client writes to info->done_pipe
* to notify transfer manager.
* After all blocks are downloaded, transfer manager send a END command to
* block client. The block client exits and calls the client done callback.
* block client. The block client exits and returns an ENDED response code.
*/
int
@ -26,6 +29,7 @@ enum {
BLOCK_CLIENT_CMD_TRANSFER = 0,
BLOCK_CLIENT_CMD_CANCEL,
BLOCK_CLIENT_CMD_END,
BLOCK_CLIENT_CMD_RESTART,
};
void

View File

@ -1790,7 +1790,7 @@ checkout_file (const char *repo_id,
case BLOCK_CLIENT_NET_ERROR:
case BLOCK_CLIENT_SERVER_ERROR:
g_free (path);
return FETCH_CHECKOUT_FAILED;
return FETCH_CHECKOUT_TRANSFER_ERROR;
case BLOCK_CLIENT_CANCELED:
g_free (path);
return FETCH_CHECKOUT_CANCELED;
@ -2165,13 +2165,21 @@ seaf_repo_fetch_and_checkout (TransferTask *task,
remote_head_id,
conflict_hash,
no_conflict_hash);
/* Even if the file failed to check out, still need to update index. */
/* Even if the file failed to check out, still need to update index.
* But we have to stop after transfer errors.
*/
if (rc == FETCH_CHECKOUT_CANCELED) {
seaf_debug ("Transfer canceled.\n");
ret = FETCH_CHECKOUT_CANCELED;
if (add_ce)
cache_entry_free (ce);
goto out;
} else if (rc == FETCH_CHECKOUT_TRANSFER_ERROR) {
seaf_warning ("Transfer failed.\n");
ret = FETCH_CHECKOUT_TRANSFER_ERROR;
if (add_ce)
cache_entry_free (ce);
goto out;
}
cleanup_file_blocks (task->repo_id, task->repo_version, file_id);

View File

@ -390,6 +390,7 @@ enum {
FETCH_CHECKOUT_SUCCESS = 0,
FETCH_CHECKOUT_CANCELED,
FETCH_CHECKOUT_FAILED,
FETCH_CHECKOUT_TRANSFER_ERROR,
};
struct _TransferTask;

View File

@ -827,16 +827,29 @@ seaf_transfer_manager_download_file_blocks (SeafTransferManager *manager,
seafile_unref (file);
BlockTxInfo *info = task->tx_info;
int rsp;
retry:
if (g_queue_get_length (task->block_ids) == 0)
return BLOCK_CLIENT_SUCCESS;
BlockTxInfo *info = task->tx_info;
block_tx_client_run_command (info, BLOCK_CLIENT_CMD_TRANSFER);
/* Wait until block download is done. */
int rsp;
piperead (info->done_pipe[0], &rsp, sizeof(rsp));
/* The server closes the socket after 30 seconds without data,
* so just retry if we encounter network error.
*/
if (rsp == BLOCK_CLIENT_NET_ERROR) {
block_tx_client_run_command (info, BLOCK_CLIENT_CMD_RESTART);
piperead (info->done_pipe[0], &rsp, sizeof(rsp));
if (rsp == BLOCK_CLIENT_READY)
goto retry;
}
while ((block_id = g_queue_pop_head(task->block_ids)) != NULL)
g_free (block_id);
@ -1318,13 +1331,7 @@ start_block_tx_client_run_once (TransferTask *task)
static void
block_tx_client_interactive_done_cb (BlockTxInfo *info)
{
g_free (info->enc_session_key);
pipeclose (info->cmd_pipe[0]);
pipeclose (info->cmd_pipe[1]);
pipeclose (info->done_pipe[0]);
pipeclose (info->done_pipe[1]);
g_queue_free (info->task->block_ids);
g_free (info);
}
static void
@ -1378,16 +1385,19 @@ download_and_checkout_files_thread (void *vdata)
{
DownloadFilesData *data = vdata;
TransferTask *task = data->task;
gint ready;
int rsp;
do {
g_usleep (1000000);
ready = g_atomic_int_get (&task->tx_info->ready_for_transfer);
} while (ready == 0);
piperead (task->tx_info->done_pipe[0], &rsp, sizeof(rsp));
data->status = seaf_repo_fetch_and_checkout (task, task->head);
if (rsp == BLOCK_CLIENT_READY) {
data->status = seaf_repo_fetch_and_checkout (task, task->head);
block_tx_client_run_command (task->tx_info, BLOCK_CLIENT_CMD_END);
block_tx_client_run_command (task->tx_info, BLOCK_CLIENT_CMD_END);
piperead (task->tx_info->done_pipe[0], &rsp, sizeof(rsp));
} else
/* block-tx-client thread should have exited. */
data->status = FETCH_CHECKOUT_FAILED;
return vdata;
}
@ -1397,6 +1407,15 @@ download_and_checkout_files_done (void *vdata)
{
DownloadFilesData *data = vdata;
TransferTask *task = data->task;
BlockTxInfo *info = task->tx_info;
g_free (info->enc_session_key);
pipeclose (info->cmd_pipe[0]);
pipeclose (info->cmd_pipe[1]);
pipeclose (info->done_pipe[0]);
pipeclose (info->done_pipe[1]);
g_queue_free (info->task->block_ids);
g_free (info);
switch (data->status) {
case FETCH_CHECKOUT_SUCCESS:
@ -1404,6 +1423,7 @@ download_and_checkout_files_done (void *vdata)
transition_state (task, TASK_STATE_FINISHED, TASK_RT_STATE_FINISHED);
break;
case FETCH_CHECKOUT_FAILED:
case FETCH_CHECKOUT_TRANSFER_ERROR:
transition_state_to_error (task, TASK_ERR_DOWNLOAD_BLOCKS);
break;
case FETCH_CHECKOUT_CANCELED:

View File

@ -96,6 +96,10 @@ enum {
BLOCK_CLIENT_NET_ERROR,
BLOCK_CLIENT_SERVER_ERROR,
BLOCK_CLIENT_CANCELED,
/* result codes only used in interactive mode. */
BLOCK_CLIENT_READY,
BLOCK_CLIENT_ENDED,
};
#define BLOCK_TX_SESSION_KEY_LEN 32