[test] Update client auto sync tests.

- Make each test case "unit test" by separating the test directories.
- Complete upload/download tests.
This commit is contained in:
Jiaqiang Xu 2015-10-21 17:24:04 +08:00
parent 9fb60e16e4
commit 8685d82d05
2 changed files with 263 additions and 113 deletions

View File

@ -7,155 +7,304 @@ from threading import Thread
from seaf_op import get_token, urlopen
from . import test_util
'''echo 111 > 1.txt; sleep 3; mv 1.txt 2.txt
'''
Test add-delete-add sequence.
'''
def test_add_delete_add():
test_util.set_test_root('test_add_delete_add')
test_util.mkfile(1, 'test/1.txt', 'aaaaaaaa')
test_util.rmdir(1, 'test')
test_util.mkfile(1, 'test/1.txt', 'aaaaaaaa')
test_util.verify_result()
'''
Rename test:
echo 111 > 1.txt; sleep 3; mv 1.txt 2.txt
echo 222 > 3.txt; sleep 3; mv 2.txt 3.txt
echo test > test.txt
mkdir test; echo 444 > 4.txt; sleep 3; mv *.txt test
mkdir test2; mv test test2
mv test2/test .
echo 555 >> test/4.txt; mv test test2
mv test2 test3
'''
# note this case can't pass on windows
def test_rename():
test_util.set_test_root('test_rename')
test_util.mkfile(1, '1.txt', '111')
time.sleep(3)
test_util.move(1, '1.txt', '2.txt')
time.sleep(3)
test_util.mkfile(1, '3.txt', '222')
time.sleep(3)
test_util.move(1, '2.txt', '3.txt')
time.sleep(3)
test_util.mkfile(1, 'test.txt', 'test')
time.sleep(3)
test_util.mkdir(1, 'test')
test_util.mkfile(1, '4.txt', '444')
time.sleep(3)
test_util.batchmove(1, '*.txt', 'test')
time.sleep(3)
test_util.mkdir(1, 'test2')
test_util.move(1, 'test', 'test2')
time.sleep(3)
test_util.move(1, 'test2/test', '')
time.sleep(3)
test_util.modfile(1, 'test/4.txt', '555')
test_util.move(1, 'test', 'test2')
time.sleep(3)
test_util.move(1, 'test2', 'test3')
test_util.verify_result()
'''mkdir test
'''
Create and update test:
echo 111 > test/1.txt
echo 222 >> test/1.txt
copy a dir with multiple levels into test dir
move a dir with multiple levels into test dir
在一个空目录下添加文件变成非空目录
修改一个文件的时间戳
create an empty dir
add file into an empty folder
'''
def test_create_update():
test_util.mkdir(2, 'test')
test_util.mkfile(2, 'test/1.txt', '111')
test_util.modfile(2, 'test/1.txt', '222')
test_util.set_test_root('test_create_update')
test_util.mkdir(1, 'test')
test_util.mkfile(1, 'test/1.txt', '111')
time.sleep(3)
test_util.mkdir(2, '1/2/3/4/5')
test_util.copy(2, '1', '1_cp')
test_util.mkdir(2, '6/7/8/9/10')
test_util.move(2, '6', 'test')
test_util.modfile(1, 'test/1.txt', '222')
time.sleep(3)
test_util.mkfile(2, '1/1.md', 'dddddddddddddddddddddd')
time.sleep(1)
test_util.touch(2, '1/1.md')
test_util.mkdir(1, '1/2/3/4/5')
test_util.mkfile(1, '1/1.txt', '111')
test_util.mkfile(1, '1/2/2.txt', '222')
test_util.copy(1, '1', 'test/1')
time.sleep(3)
test_util.mkdir(1, 'empty')
time.sleep(3)
test_util.mkfile(1, 'empty/test.md', 'dddddddddddddddddddddd')
test_util.verify_result()
'''rm test/1.txt
'''
Delete test:
echo 222 > 2.txt
rm 2.txt
delete a dir with multiple levels
move a dir with multiple levels out of test dir
把一个目录里面的文件全部删除变成空目录
delete all files under a dir, make it empty.
delete empty dir
'''
def test_delete():
test_util.rmfile(2, 'test/1.txt')
test_util.mkfile(2, '2.txt', '2222')
test_util.set_test_root('test_delete')
test_util.mkfile(1, '2.txt', '2222')
time.sleep(3)
test_util.rmfile(2, '2.txt')
test_util.rmdir(2, '1')
test_util.rmfile(1, '2.txt')
test_util.mkdir(1, '1/2/3/4/5')
test_util.mkfile(1, '1/1.txt', '111')
test_util.mkfile(1, '1/2/2.txt', '222')
test_util.copy(1, '1', 'test/1')
time.sleep(3)
test_util.move(2, '1_cp/2', '')
test_util.mkdir(1, 'empty')
for i in xrange(3):
test_util.mkfile(1, 'empty/%d.txt' % i, 'dddddddddd')
test_util.rmdir(1, '1')
time.sleep(3)
for i in xrange(3):
test_util.rmfile(1, 'empty/%d.txt' % i)
test_util.rmdir(1, 'test/1')
time.sleep(3)
test_util.rmdir(1, 'test')
test_util.verify_result()
'''修改文件或者目录的名字大小写,如从 test 到 TEST
关闭客户端执行上述操作然后启动客户端
'''
def test_rename_case():
test_util.mkdir(1, 'UPPER')
test_util.mkfile(2, 'lower.md', 'lowerfffff')
Case rename test:
rename a dir from 'test' to 'TEST'
disalbe auto sync; rename the dir from 'TEST' to 'test'; enable auto sync.
'''
def test_case_rename():
test_util.set_test_root('test_case_rename')
test_util.mkdir(1, 'test')
test_util.mkfile(1, 'a.txt', 'aaaa')
time.sleep(3)
test_util.move(1, 'UPPER', 'upper')
test_util.move(2, 'lower.md', 'LOWER.md')
test_util.move(1, 'test', 'TEST')
test_util.verify_result()
test_util.desync_cli1()
test_util.move(1, 'upper', 'UPPER')
test_util.move(1, 'LOWER.md', 'lower.md')
test_util.move(1, 'TEST', 'test')
test_util.sync_cli1()
test_util.verify_result()
'''在 web 上把一个有内容的目录 abc 重命名为 ABC
假设有非空目录 abc/test/先关闭自动同步 web 上重命名为 ABC/TEST/再打开自动同步
假设有非空目录 test以及文件 a.txt先关闭自动同步 web 上把 test 重命名为 TEST然后把 a.txt 移动到 TEST 下面打开自动同步
'''
def test_rename_download():
test_util.mkdir(1, 'abc/test')
test_util.mkfile(2, 'a.txt', 'aaaaaaaaaaaaaaaaaaa')
time.sleep(6)
A test set for downloads. The updates are done on cli1 and downloaded on cli2.
Note that these tests are different from upload tests. In upload tests, we deliberately
combine multiple operations into one test; in download tests, we must ensure each
operation be carried out individually on cli2.
'''
token = get_token(test_util.setting.server_url,
test_util.setting.user,
test_util.setting.password)
headers = {'Authorization': 'Token %s' % token}
# Create a new file
def test_download_1():
test_util.set_test_root('test_download')
def rename_on_web(path, newname):
data = {'operation': 'rename', 'newname': newname}
urlopen('%s/api2/repos/%s/dir/?p=%s' % \
(test_util.setting.server_url, test_util.repo_id, path),
data, headers)
test_util.mkfile(1, '1.txt', '11111')
def move_on_web(dest_path, fname):
data = {'operation': 'move', 'dst_repo': test_util.repo_id, 'dst_dir': dest_path}
urlopen('%s/api2/repos/%s/file/?p=%s' % \
(test_util.setting.server_url, test_util.repo_id, fname),
data, headers, False)
rename_on_web('/abc', 'ABC')
test_util.verify_result()
if not os.path.exists(test_util.getpath(1, 'ABC')):
assert False, 'ABC dir should exist in two worktrees'
test_util.desync_cli2()
rename_on_web('/ABC/test', 'TEST')
test_util.sync_cli2()
# Update a file
def test_download_2():
test_util.set_test_root('test_download')
test_util.modfile(1, '1.txt', '22222')
test_util.verify_result()
if not os.path.exists(test_util.getpath(2, 'ABC/TEST')):
assert False, 'ABC/TEST dir should exit in worktree2'
test_util.mkdir(1, 'bcd')
test_util.mkfile(1, 'bcd.md', 'bcddddd')
time.sleep(6)
test_util.desync_cli1()
rename_on_web('/bcd', 'BCD')
move_on_web('/BCD', '/bcd.md')
test_util.sync_cli1()
# Create empty dir
def test_download_3():
test_util.set_test_root('test_download')
test_util.mkdir(1, 'dir1')
test_util.verify_result()
# Rename a file
def test_download_4():
test_util.set_test_root('test_download')
test_util.move(1, '1.txt', '2.txt')
test_util.verify_result()
# Rename empty dir
def test_download_5():
test_util.set_test_root('test_download')
test_util.move(1, 'dir1', 'dir2')
test_util.verify_result()
# Create file in empty dir
def test_download_6():
test_util.set_test_root('test_download')
test_util.mkfile(1, 'dir2/1.txt', '1111111')
test_util.verify_result()
# Rename a non-empty dir
def test_download_7():
test_util.set_test_root('test_download')
test_util.move(1, 'dir2', 'dir3')
test_util.verify_result()
# Remove all files in a non-empty dir
def test_download_8():
test_util.set_test_root('test_download')
test_util.rmfile(1, 'dir3/1.txt')
test_util.verify_result()
# Move a non-empty dir into an empty dir
def test_download_9():
test_util.set_test_root('test_download')
test_util.mkfile(1, 'dir4/2.txt', '2222222')
test_util.verify_result()
test_util.move(1, 'dir4', 'dir3')
test_util.verify_result()
# Delete file
def test_download_10():
test_util.set_test_root('test_download')
test_util.rmfile(1, '2.txt')
test_util.verify_result()
# Delete a non-empty dir
def test_download_11():
test_util.set_test_root('test_download')
test_util.rmdir(1, 'dir3/dir4')
test_util.verify_result()
# Delete empty dir
def test_download_12():
test_util.set_test_root('test_download')
test_util.rmdir(1, 'dir3')
test_util.verify_result()
'''
Test cases for download case rename
'''
def test_download_case_rename_1():
test_util.set_test_root('test_download_case_rename')
test_util.mkfile(1, 'abc/test/test.txt', 'testtset')
test_util.verify_result()
test_util.move(1, 'abc/test', 'abc/TEST')
test_util.move(1, 'abc', 'ABC')
test_util.verify_result()
def test_download_case_rename_2():
test_util.set_test_root('test_download_case_rename')
test_util.mkfile(1, 'a.txt', 'aaaaaaaaaaaaaaaaaaa')
test_util.mkfile(1, 'test/b.txt', 'bbbbbbbbbb')
test_util.verify_result()
test_util.move(1, 'test', 'TEST')
test_util.move(1, 'a.txt', 'TEST')
test_util.verify_result()
'''create cur.md, wait for synced then concurrent modify cur.md
'''
def mod_file(worktree, fname):
test_util.modfile(worktree, fname, test_util.getpath(worktree, fname))
# def mod_file(worktree, fname):
# test_util.modfile(worktree, fname, test_util.getpath(worktree, fname))
def test_concurrent_mod():
test_util.mkfile(1, 'cur.md', 'curcurcurrrrrrrrrrrrrrrrrrrrr')
test_util.verify_result()
thread1 = Thread(target=mod_file, args=(1, 'cur.md'))
thread2 = Thread(target=mod_file, args=(2, 'cur.md'))
thread1.start()
thread2.start()
test_util.verify_result()
files = glob.glob(test_util.getpath(1, 'cur*.md'))
assert len(files) == 2, 'Should generate conflict file'
# def test_concurrent_mod():
# test_util.mkfile(1, 'cur.md', 'curcurcurrrrrrrrrrrrrrrrrrrrr')
# test_util.verify_result()
# thread1 = Thread(target=mod_file, args=(1, 'cur.md'))
# thread2 = Thread(target=mod_file, args=(2, 'cur.md'))
# thread1.start()
# thread2.start()
# test_util.verify_result()
# files = glob.glob(test_util.getpath(1, 'cur*.md'))
# assert len(files) == 2, 'Should generate conflict file'

View File

@ -26,6 +26,10 @@ class TestUtil():
except Exception:
pass
self.repo_id = None
self.test_root = ''
def set_test_root(self, root):
self.test_root = root
@staticmethod
def clean_sync_data(conf):
@ -110,11 +114,8 @@ class TestUtil():
pass
def wait_sync(self):
max_retry = 12
cur_retry = 0
while cur_retry < max_retry:
while True:
time.sleep(5)
cur_retry += 1
repo1 = seaf_op.seaf_get_repo(self.cli1_dir, self.repo_id)
if repo1 is None:
continue
@ -162,15 +163,15 @@ class TestUtil():
# worktree: 1(worktree1), 2(worktree2)
def mkdir(self, worktree, path):
if worktree == 1:
os.makedirs(os.path.join(self.worktree1, path))
os.makedirs(os.path.join(self.worktree1, self.test_root, path))
elif worktree == 2:
os.makedirs(os.path.join(self.worktree2, path))
os.makedirs(os.path.join(self.worktree2, self.test_root, path))
def rmdir(self, worktree, path):
if worktree == 1:
shutil.rmtree(os.path.join(self.worktree1, path))
shutil.rmtree(os.path.join(self.worktree1, self.test_root, path))
elif worktree == 2:
shutil.rmtree(os.path.join(self.worktree2, path))
shutil.rmtree(os.path.join(self.worktree2, self.test_root, path))
def mkfile(self, worktree, fpath, con=''):
if worktree == 1:
@ -179,7 +180,7 @@ class TestUtil():
pdir = self.worktree2
else:
return
abs_path = os.path.join(pdir, fpath)
abs_path = os.path.join(pdir, self.test_root, fpath)
dirname = os.path.dirname(abs_path)
if not os.path.exists(dirname):
os.makedirs(dirname)
@ -188,9 +189,9 @@ class TestUtil():
def rmfile(self, worktree, fpath):
if worktree == 1:
os.remove(os.path.join(self.worktree1, fpath))
os.remove(os.path.join(self.worktree1, self.test_root, fpath))
elif worktree == 2:
os.remove(os.path.join(self.worktree2, fpath))
os.remove(os.path.join(self.worktree2, self.test_root, fpath))
def modfile(self, worktree, fpath, con=''):
if worktree == 1:
@ -199,17 +200,17 @@ class TestUtil():
pdir = self.worktree2
else:
return
abs_path = os.path.join(pdir, fpath)
abs_path = os.path.join(pdir, self.test_root, fpath)
with open(abs_path, 'a') as fd:
fd.write(con)
def move(self, worktree, org_path, dest_path):
if worktree == 1:
shutil.move(os.path.join(self.worktree1, org_path),
os.path.join(self.worktree1, dest_path))
shutil.move(os.path.join(self.worktree1, self.test_root, org_path),
os.path.join(self.worktree1, self.test_root, dest_path))
elif worktree == 2:
shutil.move(os.path.join(self.worktree2, org_path),
os.path.join(self.worktree2, dest_path))
shutil.move(os.path.join(self.worktree2, self.test_root, org_path),
os.path.join(self.worktree2, self.test_root, dest_path))
def batchmove(self, worktree, regex, dest_path):
if worktree == 1:
@ -218,28 +219,28 @@ class TestUtil():
pdir = self.worktree2
else:
return
files = glob.glob(os.path.join(pdir, regex))
dest = os.path.join(pdir, dest_path)
files = glob.glob(os.path.join(pdir, self.test_root, regex))
dest = os.path.join(pdir, self.test_root, dest_path)
for f in files:
shutil.move(f, dest)
def copy(self, worktree, org_path, dest_path):
if worktree == 1:
shutil.copytree(os.path.join(self.worktree1, org_path),
os.path.join(self.worktree1, dest_path))
shutil.copytree(os.path.join(self.worktree1, self.test_root, org_path),
os.path.join(self.worktree1, self.test_root, dest_path))
elif worktree == 2:
shutil.copytree(os.path.join(self.worktree2, org_path),
os.path.join(self.worktree2, dest_path))
shutil.copytree(os.path.join(self.worktree2, self.test_root, org_path),
os.path.join(self.worktree2, self.test_root, dest_path))
def touch(self, worktree, path, time=None):
if worktree == 1:
os.utime(os.path.join(self.worktree1, path), time)
os.utime(os.path.join(self.worktree1, self.test_root, path), time)
if worktree == 2:
os.utime(os.path.join(self.worktree2, path), time)
os.utime(os.path.join(self.worktree2, self.test_root, path), time)
def getpath(self, worktree, path):
if worktree == 1:
return os.path.join(self.worktree1, path)
return os.path.join(self.worktree1, self.test_root, path)
elif worktree == 2:
return os.path.join(self.worktree2, path)
return os.path.join(self.worktree2, self.test_root, path)
raise Exception('Invalid worktree')