[client] Get worktree event detail on Windows.

This commit is contained in:
Jiaqiang Xu 2014-06-18 20:48:18 +08:00
parent 3ff5a3a2dd
commit 567a592567
4 changed files with 619 additions and 211 deletions

View File

@ -1,4 +1,13 @@
#ifdef WIN32
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x500
#endif
#include <windows.h>
#endif /* WIN32 */
#include "common.h"
#include "seafile-session.h"

View File

@ -10,6 +10,7 @@
#include <ccnet.h>
#include "utils.h"
#include "avl/avl.h"
#define DEBUG_FLAG SEAFILE_DEBUG_SYNC
#include "log.h"
#include "status.h"
@ -598,11 +599,45 @@ out:
static int
add_path_to_index (SeafRepo *repo, struct index_state *istate,
SeafileCrypt *crypt, const char *path, GList *ignore_list)
SeafileCrypt *crypt, const char *path, GList *ignore_list,
GList **scanned_dirs)
{
char *full_path;
SeafStat st;
/* When a repo is initially added, a CREATE_OR_UPDATE event will be created
* for the worktree root "".
*/
if (path[0] == 0) {
add_recursive (repo->id, repo->version, repo->email, istate,
repo->worktree, path,
crypt, FALSE, ignore_list);
return 0;
}
/* If we've recursively scanned the parent directory, don't need to scan
* any files under it any more.
*/
GList *ptr;
char *dir, *full_dir;
for (ptr = *scanned_dirs; ptr; ptr = ptr->next) {
dir = ptr->data;
/* exact match */
if (strcmp (dir, path) == 0) {
seaf_debug ("%s has been scanned before, skip adding.\n", path);
return 0;
}
/* prefix match. */
full_dir = g_strconcat (dir, "/", NULL);
if (strncmp (full_dir, path, strlen(full_dir)) == 0) {
g_free (full_dir);
seaf_debug ("%s has been scanned before, skip adding.\n", path);
return 0;
}
g_free (full_dir);
}
if (check_full_path_ignore (repo->worktree, path, ignore_list))
return 0;
@ -614,13 +649,12 @@ add_path_to_index (SeafRepo *repo, struct index_state *istate,
return -1;
}
if (S_ISDIR(st.st_mode)) {
if (is_empty_dir (full_path, ignore_list))
add_empty_dir_to_index (istate, path, &st);
} else if (S_ISREG(st.st_mode)) {
add_to_index (repo->id, repo->version, istate, path, full_path,
&st, 0, crypt, index_cb, repo->email);
}
if (S_ISDIR(st.st_mode))
*scanned_dirs = g_list_prepend (*scanned_dirs, g_strdup(path));
/* Add is always recursive */
add_recursive (repo->id, repo->version, repo->email, istate, repo->worktree, path,
crypt, FALSE, ignore_list);
g_free (full_path);
return 0;
@ -664,6 +698,15 @@ apply_worktree_changes_to_index (SeafRepo *repo, struct index_state *istate,
return -1;
}
GList *scanned_dirs = NULL;
WTEvent *last_event;
pthread_mutex_lock (&status->q_lock);
last_event = g_queue_peek_tail (status->event_q);
pthread_mutex_unlock (&status->q_lock);
if (!last_event)
goto out;
while (1) {
pthread_mutex_lock (&status->q_lock);
event = g_queue_pop_head (status->event_q);
@ -673,7 +716,8 @@ apply_worktree_changes_to_index (SeafRepo *repo, struct index_state *istate,
switch (event->ev_type) {
case WT_EVENT_CREATE_OR_UPDATE:
add_path_to_index (repo, istate, crypt, event->path, ignore_list);
add_path_to_index (repo, istate, crypt, event->path,
ignore_list, &scanned_dirs);
break;
case WT_EVENT_DELETE:
remove_from_index_with_prefix (istate, event->path);
@ -712,10 +756,16 @@ apply_worktree_changes_to_index (SeafRepo *repo, struct index_state *istate,
break;
}
wt_event_free (event);
if (event == last_event) {
wt_event_free (event);
break;
} else
wt_event_free (event);
}
out:
wt_status_unref (status);
string_list_free (scanned_dirs);
return 0;
}

View File

@ -220,7 +220,8 @@ handle_rename (int in_fd,
} else if (event->mask & IN_MOVED_TO) {
/* A file/dir was moved into this repo. */
/* Add watch and produce events. */
add_watch_recursive (info, in_fd, worktree, filename, TRUE);
add_event_to_queue (status, WT_EVENT_CREATE_OR_UPDATE, filename, NULL);
add_watch_recursive (info, in_fd, worktree, filename, FALSE);
}
} else {
if (event->mask & IN_MOVED_FROM) {
@ -250,7 +251,9 @@ handle_rename (int in_fd,
*/
add_event_to_queue (status, WT_EVENT_DELETE,
rename_info->old_path, NULL);
add_watch_recursive (info, in_fd, worktree, filename, TRUE);
add_event_to_queue (status, WT_EVENT_CREATE_OR_UPDATE,
filename, NULL);
add_watch_recursive (info, in_fd, worktree, filename, FALSE);
}
unset_rename_processing_state (rename_info);
} else {
@ -351,7 +354,8 @@ process_one_event (int in_fd,
* watch it. So it's safer to scan this dir. At most time we don't
* have to scan recursively and very few new files will be found.
*/
add_watch_recursive (info, in_fd, worktree, filename, TRUE);
add_event_to_queue (status, WT_EVENT_CREATE_OR_UPDATE, filename, NULL);
add_watch_recursive (info, in_fd, worktree, filename, FALSE);
} else if (event->mask & IN_DELETE) {
seaf_debug ("Deleted %s.\n", filename);
add_event_to_queue (status, WT_EVENT_DELETE, filename, NULL);
@ -575,7 +579,7 @@ add_watch (SeafWTMonitorPriv *priv, const char *repo_id, const char *worktree)
g_hash_table_insert (priv->info_hash, (gpointer)(long)inotify_fd, info);
pthread_mutex_unlock (&priv->hash_lock);
if (add_watch_recursive (info, inotify_fd, worktree, "", TRUE) < 0) {
if (add_watch_recursive (info, inotify_fd, worktree, "", FALSE) < 0) {
close (inotify_fd);
pthread_mutex_lock (&priv->hash_lock);
g_hash_table_remove (priv->handle_hash, repo_id);
@ -584,13 +588,15 @@ add_watch (SeafWTMonitorPriv *priv, const char *repo_id, const char *worktree)
return -1;
}
/* An empty path indicates repo-mgr to scan the whole worktree. */
add_event_to_queue (info->status, WT_EVENT_CREATE_OR_UPDATE, "", NULL);
return inotify_fd;
}
static int handle_add_repo (SeafWTMonitorPriv *priv,
const char *repo_id,
const char *worktree,
long *handle)
const char *worktree)
{
int inotify_fd;
@ -601,7 +607,6 @@ static int handle_add_repo (SeafWTMonitorPriv *priv,
FD_SET (inotify_fd, &priv->read_fds);
priv->maxfd = MAX (inotify_fd, priv->maxfd);
*handle = (long)inotify_fd;
return 0;
}
@ -661,7 +666,6 @@ static void
handle_watch_command (SeafWTMonitor *monitor, WatchCommand *cmd)
{
SeafWTMonitorPriv *priv = monitor->priv;
long inotify_fd;
if (cmd->type == CMD_ADD_WATCH) {
if (g_hash_table_lookup_extended (priv->handle_hash, cmd->repo_id,
@ -670,7 +674,7 @@ handle_watch_command (SeafWTMonitor *monitor, WatchCommand *cmd)
return;
}
if (handle_add_repo(priv, cmd->repo_id, cmd->worktree, &inotify_fd) < 0) {
if (handle_add_repo(priv, cmd->repo_id, cmd->worktree) < 0) {
seaf_warning ("[wt mon] failed to watch worktree of repo %s.\n",
cmd->repo_id);
reply_watch_command (monitor, -1);

View File

@ -14,23 +14,12 @@
#define DEBUG_FLAG SEAFILE_DEBUG_WATCH
#include "log.h"
typedef enum CommandType {
CMD_ADD_WATCH,
CMD_DELETE_WATCH,
CMD_REFRESH_WATCH,
N_CMD_TYPES,
} CommandType;
typedef struct WatchCommand {
CommandType type;
char repo_id[37];
} WatchCommand;
#define DIR_WATCH_MASK \
FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE \
| FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_SIZE
#define DIR_WATCH_BUFSIZE (sizeof(FILE_NOTIFY_INFORMATION) + SEAF_PATH_MAX * 2)
/* Use large buffer to prevent events overflow. */
#define DIR_WATCH_BUFSIZE 1 << 20 /* 1MB */
/* Hold the OVERLAPPED struct for asynchronous ReadDirectoryChangesW(), and
the buf to receive dir change info. */
@ -40,20 +29,92 @@ typedef struct DirWatchAux {
gboolean unused;
} DirWatchAux;
typedef struct RenameInfo {
char *old_path;
gboolean processing; /* Are we processing a rename event? */
} RenameInfo;
typedef struct EventInfo {
DWORD action;
DWORD name_len;
char name[SEAF_PATH_MAX];
} EventInfo;
typedef struct RepoWatchInfo {
WTStatus *status;
RenameInfo *rename_info;
EventInfo last_event;
char *worktree;
} RepoWatchInfo;
struct SeafWTMonitorPriv {
pthread_mutex_t hash_lock;
GHashTable *handle_hash; /* repo_id -> dir handle */
GHashTable *status_hash; /* handle -> status */
GHashTable *info_hash; /* handle -> RepoWatchInfo */
GHashTable *buf_hash; /* handle -> aux buf */
int cmd_pipe[2];
int res_pipe[2];
HANDLE iocp_handle;
WatchCommand cmd; /* latest received command */
WatchCommand cmd;
};
static void handle_watch_command (SeafWTMonitorPriv *priv, WatchCommand *cmd);
static void *wt_monitor_job_win32 (void *vmonitor);
static void handle_watch_command (SeafWTMonitor *monitor, WatchCommand *cmd);
/* RenameInfo */
static RenameInfo *create_rename_info ()
{
RenameInfo *info = g_new0 (RenameInfo, 1);
return info;
}
static void free_rename_info (RenameInfo *info)
{
g_free (info->old_path);
g_free (info);
}
inline static void
set_rename_processing_state (RenameInfo *info, const char *path)
{
info->old_path = g_strdup(path);
info->processing = TRUE;
}
inline static void
unset_rename_processing_state (RenameInfo *info)
{
g_free (info->old_path);
info->old_path = NULL;
info->processing = FALSE;
}
/* RepoWatchInfo */
static RepoWatchInfo *
create_repo_watch_info (const char *repo_id, const char *worktree)
{
WTStatus *status = create_wt_status (repo_id);
RenameInfo *rename_info = create_rename_info ();
RepoWatchInfo *info = g_new0 (RepoWatchInfo, 1);
info->status = status;
info->rename_info = rename_info;
info->worktree = g_strdup(worktree);
return info;
}
static void
free_repo_watch_info (RepoWatchInfo *info)
{
wt_status_unref (info->status);
free_rename_info (info->rename_info);
g_free (info->worktree);
g_free (info);
}
static inline void
init_overlapped(OVERLAPPED *ol)
@ -68,6 +129,36 @@ reset_overlapped(OVERLAPPED *ol)
ol->Offset = ol->OffsetHigh = 0;
}
static void
add_event_to_queue (WTStatus *status,
int type, const char *path, const char *new_path)
{
WTEvent *event = wt_event_new (type, path, new_path);
char *name;
switch (type) {
case WT_EVENT_CREATE_OR_UPDATE:
name = "create/update";
break;
case WT_EVENT_DELETE:
name = "delete";
break;
case WT_EVENT_RENAME:
name = "rename";
break;
case WT_EVENT_OVERFLOW:
name = "overflow";
break;
default:
name = "unknown";
}
seaf_debug ("Adding event: %s, %s %s\n", name, path, new_path?new_path:"");
pthread_mutex_lock (&status->q_lock);
g_queue_push_tail (status->event_q, event);
pthread_mutex_unlock (&status->q_lock);
}
/* Every time after a read event is processed, we should call
* ReadDirectoryChangesW() on the dir handle asynchronously for the IOCP to
@ -93,10 +184,13 @@ start_watch_dir_change(SeafWTMonitorPriv *priv, HANDLE dir_handle)
/* The ending W of this function indicates that the info recevied about
the change would be in Unicode(specifically, the name of the file that
is changed would be encoded in wide char), but we don't care it right
now. Maybe in the future.
is changed would be encoded in wide char).
*/
BOOL ret = ReadDirectoryChangesW
BOOL ret;
DWORD code;
RepoWatchInfo *info;
retry:
ret = ReadDirectoryChangesW
(dir_handle, /* dir handle */
&aux->buf, /* buf to hold change info */
DIR_WATCH_BUFSIZE, /* buf size */
@ -107,12 +201,21 @@ start_watch_dir_change(SeafWTMonitorPriv *priv, HANDLE dir_handle)
NULL); /* completion routine */
if (!ret) {
code = GetLastError();
seaf_warning("Failed to ReadDirectoryChangesW, "
"error code %lu", code);
if (first_alloc)
/* if failed at the first watch, free the aux buffer */
g_free(aux);
seaf_warning("Failed to ReadDirectoryChangesW, "
"error code %lu", GetLastError());
else if (code == ERROR_NOTIFY_ENUM_DIR) {
/* If buffer overflowed after the last call,
* add an overflow event and retry watch.
*/
info = g_hash_table_lookup (priv->info_hash, dir_handle);
add_event_to_queue (info->status, WT_EVENT_OVERFLOW, NULL, NULL);
goto retry;
}
} else {
if (first_alloc)
/* insert the aux buffer into hash table at the first watch */
@ -128,8 +231,9 @@ start_watch_dir_change(SeafWTMonitorPriv *priv, HANDLE dir_handle)
* the pipe handle asynchronously for the IOCP to detect when it's readable.
*/
static BOOL
start_watch_cmd_pipe (SeafWTMonitorPriv *priv, OVERLAPPED *ol_in)
start_watch_cmd_pipe (SeafWTMonitor *monitor, OVERLAPPED *ol_in)
{
SeafWTMonitorPriv *priv = monitor->priv;
OVERLAPPED *ol = ol_in;
if (!ol) {
@ -137,7 +241,7 @@ start_watch_cmd_pipe (SeafWTMonitorPriv *priv, OVERLAPPED *ol_in)
init_overlapped(ol);
}
HANDLE hPipe = (HANDLE)priv->cmd_pipe[0];
HANDLE hPipe = (HANDLE)monitor->cmd_pipe[0];
BOOL sts = ReadFile
(hPipe, /* file handle */
@ -165,8 +269,9 @@ start_watch_cmd_pipe (SeafWTMonitorPriv *priv, OVERLAPPED *ol_in)
* ReadDirectoryChangesW() on it.
*/
static BOOL
add_handle_to_iocp (SeafWTMonitorPriv *priv, HANDLE hAdd)
add_handle_to_iocp (SeafWTMonitor *monitor, HANDLE hAdd)
{
SeafWTMonitorPriv *priv = monitor->priv;
if (!priv || !hAdd)
return FALSE;
@ -194,9 +299,9 @@ add_handle_to_iocp (SeafWTMonitorPriv *priv, HANDLE hAdd)
return FALSE;
}
if (hAdd == (HANDLE)priv->cmd_pipe[0]) {
if (hAdd == (HANDLE)monitor->cmd_pipe[0]) {
/* HANDLE is cmd_pipe */
return start_watch_cmd_pipe (priv, NULL);
return start_watch_cmd_pipe (monitor, NULL);
} else {
/* HANDLE is a dir handle */
return start_watch_dir_change (priv, hAdd);
@ -204,12 +309,13 @@ add_handle_to_iocp (SeafWTMonitorPriv *priv, HANDLE hAdd)
}
/* Add the pipe handle and all repo wt handles to IO Completion Port. */
static BOOL
add_all_to_iocp (SeafWTMonitorPriv *priv)
add_all_to_iocp (SeafWTMonitor *monitor)
{
if (!add_handle_to_iocp(priv, (HANDLE)priv->cmd_pipe[0])) {
SeafWTMonitorPriv *priv = monitor->priv;
if (!add_handle_to_iocp(monitor, (HANDLE)monitor->cmd_pipe[0])) {
seaf_warning("Failed to add cmd_pipe to iocp, "
"error code %lu", GetLastError());
@ -222,7 +328,7 @@ add_all_to_iocp (SeafWTMonitorPriv *priv)
g_hash_table_iter_init (&iter, priv->handle_hash);
while (g_hash_table_iter_next (&iter, &key, &value)) {
if (!add_handle_to_iocp(priv, (HANDLE)value)) {
if (!add_handle_to_iocp(monitor, (HANDLE)value)) {
seaf_warning("Failed to add dir handle to iocp, "
"repo %s, error code %lu", (char *)key,
GetLastError());
@ -234,6 +340,245 @@ add_all_to_iocp (SeafWTMonitorPriv *priv)
return TRUE;
}
/*
* On Windows, RENAMED_OLD_NAME and RENAMED_NEW_NAME always comes in pairs.
* If a file or dir is moved in/out of the worktree, ADDED or REMOVED event
* will be emitted by the kernel.
*
* This is a two-state state machine. The states are 'not processing rename' and
* 'processing rename'.
*/
static void
handle_rename (RepoWatchInfo *info,
PFILE_NOTIFY_INFORMATION event,
const char *worktree,
const char *filename,
gboolean last_event)
{
WTStatus *status = info->status;
RenameInfo *rename_info = info->rename_info;
if (event->Action == FILE_ACTION_RENAMED_OLD_NAME)
seaf_debug ("Move %s ->\n", filename);
else if (event->Action == FILE_ACTION_RENAMED_NEW_NAME)
seaf_debug ("Move -> %s.\n", filename);
if (!rename_info->processing) {
if (event->Action == FILE_ACTION_RENAMED_OLD_NAME) {
if (!last_event) {
set_rename_processing_state (rename_info, filename);
} else {
/* RENAMED_OLD_NAME should not be the last event,
just ignore it.
*/
}
}
} else {
if (event->Action == FILE_ACTION_RENAMED_NEW_NAME) {
/* Rename pair detected. */
add_event_to_queue (status, WT_EVENT_RENAME,
rename_info->old_path, filename);
unset_rename_processing_state (rename_info);
}
}
}
static gboolean
handle_consecutive_duplicate_event (RepoWatchInfo *info,
PFILE_NOTIFY_INFORMATION event)
{
gboolean duplicate;
/* Initially last_event is zero so it's not duplicate with any real events. */
duplicate = (info->last_event.action == event->Action &&
info->last_event.name_len == event->FileNameLength &&
memcmp (info->last_event.name, event->FileName, event->FileNameLength) == 0);
info->last_event.action = event->Action;
info->last_event.name_len = event->FileNameLength;
memcpy (info->last_event.name, event->FileName, event->FileNameLength);
return duplicate;
}
static char *
convert_to_unix_path (const wchar_t *path, int path_len)
{
char *utf8_path = g_utf16_to_utf8 (path, path_len/sizeof(wchar_t),
NULL, NULL, NULL);
char *p;
for (p = utf8_path; *p != 0; ++p)
if (*p == '\\')
*p = '/';
return utf8_path;
}
static void
process_one_event (RepoWatchInfo *info,
const char *worktree,
PFILE_NOTIFY_INFORMATION event,
gboolean last_event)
{
WTStatus *status = info->status;
char *filename;
gboolean add_to_queue = TRUE;
if (handle_consecutive_duplicate_event (info, event))
add_to_queue = FALSE;
filename = convert_to_unix_path (event->FileName, event->FileNameLength);
handle_rename (info, event, worktree, filename, last_event);
if (event->Action == FILE_ACTION_MODIFIED) {
seaf_debug ("Modified %s.\n", filename);
/* Ignore modified event for directories. */
char *full_path = g_build_filename (worktree, filename, NULL);
if (g_file_test(full_path, G_FILE_TEST_IS_DIR)) {
g_free (full_path);
goto out;
}
g_free (full_path);
if (add_to_queue)
add_event_to_queue (status, WT_EVENT_CREATE_OR_UPDATE, filename, NULL);
} else if (event->Action == FILE_ACTION_ADDED) {
seaf_debug ("Created %s.\n", filename);
add_event_to_queue (status, WT_EVENT_CREATE_OR_UPDATE, filename, NULL);
} else if (event->Action == FILE_ACTION_REMOVED) {
seaf_debug ("Deleted %s.\n", filename);
add_event_to_queue (status, WT_EVENT_DELETE, filename, NULL);
}
out:
g_free (filename);
g_atomic_int_set (&info->status->last_changed, (gint)time(NULL));
}
static gboolean
process_events (const char *repo_id, RepoWatchInfo *info,
char *event_buf, unsigned int buf_size)
{
PFILE_NOTIFY_INFORMATION event;
int offset = 0;
while (1) {
event = (PFILE_NOTIFY_INFORMATION)&event_buf[offset];
offset += event->NextEntryOffset;
process_one_event (info, info->worktree,
event, (event->NextEntryOffset == 0));
if (!event->NextEntryOffset)
break;
}
return TRUE;
}
static void *
wt_monitor_job_win32 (void *vmonitor)
{
SeafWTMonitor *monitor = vmonitor;
SeafWTMonitorPriv *priv = monitor->priv;
/* 2 * sizeof(inotify_event) + 256, should be large enough for one event.*/
RepoWatchInfo *info;
DWORD bytesRead = 0;
ULONG_PTR key = 0;
OVERLAPPED *ol = NULL;
/* Use I/O Completion Port to watch asynchronous events on:
* 1) dir watch handles(events created by ReadDirectoryChangesW)
* 2) the cmd pipe (which is a socket indeed)
*/
if (!add_all_to_iocp(monitor)) {
seaf_warning("Failed to add all to iocp\n");
return NULL;
}
while (1) {
BOOL ret = GetQueuedCompletionStatus
(priv->iocp_handle, /* iocp handle */
&bytesRead, /* length of info */
&key, /* completion key */
&ol, /* OVERLAPPED */
INFINITE); /* timeout */
static int retry;
if (!ret) {
seaf_warning ("GetQueuedCompletionStatus failed, "
"error code %lu", GetLastError());
if (retry++ < 3)
continue;
else
break;
} else {
/* clear the retry counter on success */
retry = 0;
}
if (key == (ULONG_PTR)monitor->cmd_pipe[0]) {
/* Triggered by a cmd pipe event */
if (bytesRead != sizeof(WatchCommand)) {
seaf_warning ("broken cmd from pipe: get"
" %d(expected: %d) bytes\n",
(int)bytesRead, sizeof(WatchCommand));
continue;
}
seaf_debug ("recevied a pipe cmd, type %d for repo %s\n",
priv->cmd.type, priv->cmd.repo_id);
handle_watch_command (monitor, &priv->cmd);
reset_overlapped(ol);
start_watch_cmd_pipe (monitor, ol);
} else {
/* Trigger by one of the dir watch handles */
HANDLE hTriggered = (HANDLE)key;
info = (RepoWatchInfo *)g_hash_table_lookup
(priv->info_hash, (gconstpointer)hTriggered);
if (info) {
DirWatchAux *aux = g_hash_table_lookup (priv->buf_hash,
(gconstpointer)hTriggered);
process_events (info->status->repo_id, info, aux->buf, bytesRead);
reset_overlapped(ol);
if (!start_watch_dir_change(priv, hTriggered)) {
seaf_warning ("start_watch_dir_change failed"
"for repo %s, error code %lu\n",
info->status->repo_id, GetLastError());
}
} else {
/* A previously unwatched dir_handle's DirWatchAux buf was
scheduled to be freed. */
DirWatchAux *aux = g_hash_table_lookup (priv->buf_hash, (gconstpointer)hTriggered);
if (aux && aux->unused)
g_free (aux);
g_hash_table_remove (priv->buf_hash, (gconstpointer)hTriggered);
}
}
}
return NULL;
}
/* Get the HANDLE of a repo directory, for latter use in
* ReadDirectoryChangesW(). This handle should be closed when the repo is
@ -266,6 +611,54 @@ get_handle_of_path(const wchar_t *path)
return dir_handle;
}
static HANDLE add_watch (SeafWTMonitorPriv *priv,
const char *repo_id,
const char *worktree)
{
HANDLE dir_handle = NULL;
wchar_t *path = NULL;
RepoWatchInfo *info;
/* worktree is in utf8, need to convert to wchar in win32 */
path = wchar_from_utf8 (worktree);
dir_handle = get_handle_of_path (path);
if (!dir_handle) {
seaf_warning ("failed to open handle for worktree "
"of repo %s\n", repo_id);
g_free (path);
return NULL;
}
g_free (path);
pthread_mutex_lock (&priv->hash_lock);
g_hash_table_insert (priv->handle_hash,
g_strdup(repo_id), (gpointer)(long)dir_handle);
info = create_repo_watch_info (repo_id, worktree);
g_hash_table_insert (priv->info_hash, (gpointer)(long)dir_handle, info);
pthread_mutex_unlock (&priv->hash_lock);
add_event_to_queue (info->status, WT_EVENT_CREATE_OR_UPDATE, "", NULL);
return dir_handle;
}
static int handle_add_repo (SeafWTMonitor *monitor,
const char *repo_id,
const char *worktree)
{
HANDLE handle;
handle = add_watch (monitor->priv, repo_id, worktree);
if (handle == NULL ||
!add_handle_to_iocp(monitor, handle)) {
return -1;
}
return 0;
}
/* Free the aux buffer when a repo is unwatched. */
static void
rm_from_buf_hash (SeafWTMonitorPriv *priv, HANDLE dir_handle)
@ -276,8 +669,6 @@ rm_from_buf_hash (SeafWTMonitorPriv *priv, HANDLE dir_handle)
if (!aux)
return;
g_hash_table_remove(priv->buf_hash, dir_handle);
/* `aux' can't be freed here. Once we we close the dir_handle, its
* outstanding io would cause GetQueuedCompletionStatus() to return some
* information in aux->buf. If we free it here, it would cause seg fault.
@ -288,162 +679,15 @@ rm_from_buf_hash (SeafWTMonitorPriv *priv, HANDLE dir_handle)
CloseHandle(dir_handle);
}
static HANDLE add_watch (const char* repo_id)
static int handle_rm_repo (SeafWTMonitorPriv *priv, const char *repo_id, gpointer handle)
{
SeafRepo *repo = NULL;
HANDLE dir_handle = NULL;
wchar_t *path = NULL;
HANDLE h = (HANDLE)handle;
repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id);
if (!repo) {
seaf_warning ("[wt mon] cannot find repo %s.\n", repo_id);
return NULL;
}
/* repo->worktree is in utf8, need to convert to wchar in win32 */
path = wchar_from_utf8 (repo->worktree);
dir_handle = get_handle_of_path (path);
if (!dir_handle) {
seaf_warning ("failed to open handle for worktree "
"of repo %s\n", repo_id);
} else {
seaf_debug ("opened handle for worktree %s\n", path);
}
g_free (path);
return dir_handle;
}
static void *
wt_monitor_job (void *vmonitor)
{
SeafWTMonitor *monitor = vmonitor;
SeafWTMonitorPriv *priv = monitor->priv;
/* 2 * sizeof(inotify_event) + 256, should be large enough for one event.*/
WTStatus *status;
DWORD bytesRead = 0;
ULONG_PTR key = 0;
OVERLAPPED *ol = NULL;
/* Use I/O Completion Port to watch asynchronous events on:
* 1) dir watch handles(events created by ReadDirectoryChangesW)
* 2) the cmd pipe (which is a socket indeed)
*/
if (!add_all_to_iocp(priv)) {
seaf_warning("Failed to add all to iocp\n");
return NULL;
}
while (1) {
BOOL ret = GetQueuedCompletionStatus
(priv->iocp_handle, /* iocp handle */
&bytesRead, /* length of info */
&key, /* completion key */
&ol, /* OVERLAPPED */
INFINITE); /* timeout */
static int retry;
if (!ret) {
seaf_warning ("GetQueuedCompletionStatus failed, "
"error code %lu", GetLastError());
if (retry++ < 3)
continue;
else
break;
} else {
/* clear the retry counter on success */
retry = 0;
}
if (key == (ULONG_PTR)priv->cmd_pipe[0]) {
/* Triggered by a cmd pipe event */
if (bytesRead != sizeof(WatchCommand)) {
seaf_warning ("broken cmd from pipe: get"
" %d(expected: %d) bytes\n",
(int)bytesRead, sizeof(WatchCommand));
continue;
}
seaf_debug ("recevied a pipe cmd, type %d for repo %s\n",
priv->cmd.type, priv->cmd.repo_id);
handle_watch_command (priv, &priv->cmd);
reset_overlapped(ol);
start_watch_cmd_pipe (priv, ol);
} else {
/* Trigger by one of the dir watch handles */
HANDLE hTriggered = (HANDLE)key;
status = (WTStatus *)g_hash_table_lookup
(priv->status_hash, (gconstpointer)hTriggered);
char *repo_id = NULL;
if (status && status->repo_id)
repo_id = status->repo_id;
else
repo_id = "Unknown-repo-id";
if (status) {
g_atomic_int_set (&status->last_changed, (gint)time(NULL));
seaf_debug("worktree change detected, repo %s\n", repo_id);
reset_overlapped(ol);
if (!start_watch_dir_change(priv, hTriggered)) {
seaf_warning ("start_watch_dir_change failed"
"for repo %s, error code %lu\n",
repo_id, GetLastError());
}
} else {
/* A previously unwatched dir_handle's DirWatchAux buf was
scheduled to be freed. */
DirWatchAux *aux = g_hash_table_lookup (priv->buf_hash, (gconstpointer)hTriggered);
if (aux && aux->unused)
g_free (aux);
}
}
}
return NULL;
}
static int handle_add_repo (SeafWTMonitorPriv *priv, const char *repo_id, long *handle)
{
HANDLE inotify_fd;
g_return_val_if_fail (handle != NULL, -1);
inotify_fd = add_watch (repo_id);
if (inotify_fd == NULL ||
!add_handle_to_iocp(priv, inotify_fd)) {
return -1;
}
*handle = (long)inotify_fd;
return 0;
}
static int handle_rm_repo (SeafWTMonitorPriv *priv, gpointer handle)
{
HANDLE inotify_fd = (HANDLE)handle;
rm_from_buf_hash(priv, inotify_fd);
pthread_mutex_lock (&priv->hash_lock);
g_hash_table_remove (priv->handle_hash, repo_id);
g_hash_table_remove (priv->info_hash, handle);
rm_from_buf_hash(priv, h);
pthread_mutex_unlock (&priv->hash_lock);
return 0;
}
@ -453,4 +697,105 @@ static int handle_refresh_repo (SeafWTMonitorPriv *priv, const char *repo_id)
return 0;
}
#include "wt-monitor-common.h"
static void
reply_watch_command (SeafWTMonitor *monitor, int result)
{
int n;
n = pipewriten (monitor->res_pipe[1], &result, sizeof(int));
if (n != sizeof(int))
seaf_warning ("[wt mon] fail to write command result.\n");
}
static void
handle_watch_command (SeafWTMonitor *monitor, WatchCommand *cmd)
{
SeafWTMonitorPriv *priv = monitor->priv;
if (cmd->type == CMD_ADD_WATCH) {
if (g_hash_table_lookup_extended (priv->handle_hash, cmd->repo_id,
NULL, NULL)) {
reply_watch_command (monitor, 0);
return;
}
if (handle_add_repo(monitor, cmd->repo_id, cmd->worktree) < 0) {
seaf_warning ("[wt mon] failed to watch worktree of repo %s.\n",
cmd->repo_id);
reply_watch_command (monitor, -1);
return;
}
seaf_debug ("[wt mon] add watch for repo %s\n", cmd->repo_id);
reply_watch_command (monitor, 0);
} else if (cmd->type == CMD_DELETE_WATCH) {
gpointer key, value;
if (!g_hash_table_lookup_extended (priv->handle_hash, cmd->repo_id,
&key, &value)) {
reply_watch_command (monitor, 0);
return;
}
handle_rm_repo (priv, cmd->repo_id, value);
reply_watch_command (monitor, 0);
} else if (cmd->type == CMD_REFRESH_WATCH) {
if (handle_refresh_repo (priv, cmd->repo_id) < 0) {
seaf_warning ("[wt mon] failed to refresh watch of repo %s.\n",
cmd->repo_id);
reply_watch_command (monitor, -1);
return;
}
reply_watch_command (monitor, 0);
}
}
/* Public interface functions. */
SeafWTMonitor *
seaf_wt_monitor_new (SeafileSession *seaf)
{
SeafWTMonitor *monitor = g_new0 (SeafWTMonitor, 1);
SeafWTMonitorPriv *priv = g_new0 (SeafWTMonitorPriv, 1);
pthread_mutex_init (&priv->hash_lock, NULL);
priv->handle_hash = g_hash_table_new_full
(g_str_hash, g_str_equal, g_free, NULL);
priv->info_hash = g_hash_table_new_full
(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify)free_repo_watch_info);
priv->buf_hash = g_hash_table_new_full
(g_direct_hash, g_direct_equal, NULL, g_free);
monitor->priv = priv;
monitor->seaf = seaf;
monitor->job_func = wt_monitor_job_win32;
return monitor;
}
WTStatus *
seaf_wt_monitor_get_worktree_status (SeafWTMonitor *monitor,
const char *repo_id)
{
SeafWTMonitorPriv *priv = monitor->priv;
gpointer key, value;
RepoWatchInfo *info;
pthread_mutex_lock (&priv->hash_lock);
if (!g_hash_table_lookup_extended (priv->handle_hash, repo_id,
&key, &value)) {
pthread_mutex_unlock (&priv->hash_lock);
return NULL;
}
info = g_hash_table_lookup(priv->info_hash, value);
wt_status_ref (info->status);
pthread_mutex_unlock (&priv->hash_lock);
return info->status;
}