Skip to content

Commit

Permalink
ADD get_rate, suspend using threading.Event
Browse files Browse the repository at this point in the history
  • Loading branch information
ehdgua01 committed May 23, 2020
1 parent c6ffe00 commit fdec5c3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 33 deletions.
38 changes: 22 additions & 16 deletions blocksync/syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ def __init__(self) -> None:

# internal attributes or properties
self._lock = threading.Lock()
self._event = threading.Event()
self._create = threading.Event()
self._blocks: Dict[str, int] = {
"size": -1,
"same": 0,
"diff": 0,
"done": 0,
}
self._worker_threads: List[threading.Thread] = []
self._suspend = False
self._suspend = threading.Event()
self._cancel = False
self._started = False
self._logger = blocksync_logger
Expand Down Expand Up @@ -100,12 +100,12 @@ def set_logger(self, logger: logging.Logger) -> "Syncer":
return self

def suspend(self) -> "Syncer":
self._suspend = True
self._suspend.clear()
self._logger.info("Suspending...")
return self

def resume(self) -> "Syncer":
self._suspend = False
self._suspend.set()
self._logger.info("Resuming...")
return self

Expand Down Expand Up @@ -142,7 +142,8 @@ def start_sync(
raise TypeError("Interval and pause requires float or int type")

self.reset_blocks()
self._event.clear()
self._create.clear()
self._suspend.set()
self._worker_threads = [
threading.Thread(
target=self._sync,
Expand Down Expand Up @@ -195,15 +196,17 @@ def _sync(
if create:
if worker_id == 1:
self.destination.do_create(self.source.size)
self._event.set()
self._create.set()
else:
self._event.wait()
self._create.wait()
self.destination.do_close().do_open()
else:
raise FileNotFoundError
self._logger.error("Not exists destination file")
return

if self.source.size != self.destination.size:
raise ValueError("size not same")
self._logger.error("size not same")
return
elif self._blocks["size"] == -1:
self._blocks["size"] = self.source.size

Expand All @@ -230,11 +233,11 @@ def _sync(
self.source.get_blocks(block_size),
self.destination.get_blocks(block_size),
):
while self._suspend:
if not self._suspend.is_set():
self._logger.info(
"[Worker {}]: Suspending...".format(worker_id)
)
time.sleep(1)
self._suspend.wait()

if self._cancel:
raise CancelSync(
Expand Down Expand Up @@ -276,11 +279,18 @@ def _sync(
self._logger.info(e)
except Exception as e:
if self.on_error:
self.on_error(e, self._blocks)
return self.on_error(e, self._blocks)
finally:
self.source.do_close()
self.destination.do_close()

def get_rate(self, block_size: int = UNITS["MiB"]) -> float:
if self._blocks["done"] < 1:
return 0.00

rate = (self._blocks["done"] / (self._blocks["size"] // block_size or 1)) * 100
return 100.00 if 100 <= rate else rate

@property
def source(self) -> File:
return self._source
Expand All @@ -293,10 +303,6 @@ def destination(self) -> File:
def blocks(self) -> Dict[str, int]:
return self._blocks

@property
def rate(self) -> float:
return (self._blocks["done"] / self._blocks["size"]) * 100

@property
def started(self) -> bool:
return self._started
Expand Down
45 changes: 28 additions & 17 deletions tests/test_syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def test_start_sync(self) -> None:
.started,
True,
)
self.assertFalse(self.syncer.finished)
self.assertEqual(self.syncer.wait(), self.syncer)
self.assertEqual(self.syncer.finished, True)
self.assertEqual(
Expand All @@ -165,6 +166,16 @@ def test_start_sync(self) -> None:
self.source.do_open().execute_with_result("read"),
self.destination.do_open().execute_with_result("read"),
)

self.assertEqual(
self.syncer.set_hash_algorithms(["sha256"])
.start_sync(workers=5, block_size=UNITS["MiB"], interval=0, wait=True)
.started,
True,
)
self.assertEqual(
self.syncer.blocks, {"size": size, "same": 10, "diff": 0, "done": 10}
)
self.source.do_close()
self.destination.do_close()

Expand All @@ -180,6 +191,14 @@ def test_cancel_sync(self) -> None:
self.syncer.start_sync(wait=True, create=True)
self.syncer._add_block.assert_not_called()

def test_get_rate(self) -> None:
self.create_source_file(10)
self.assertEqual(self.syncer.get_rate(), 0.00)
self.syncer.set_source(self.source).set_destination(
self.destination
).start_sync(wait=True, create=True)
self.assertEqual(self.syncer.get_rate(), 100.00)

def test_dryrun(self) -> None:
self.create_source_file(10)
self.syncer.set_source(self.source).set_destination(
Expand All @@ -195,30 +214,22 @@ def test_dryrun(self) -> None:

def test_suspend_and_resume(self) -> None:
self.create_source_file(10)
self.assertEqual(
self.syncer.set_source(self.source)
.set_destination(self.destination)
.suspend(),
self.syncer,
)
self.assertTrue(self.syncer._suspend)

self.syncer._logger.info = unittest.mock.MagicMock()

with self.timeout(3):
self.syncer.start_sync(wait=True, create=True)
self.syncer.set_source(self.source).set_destination(
self.destination
).start_sync(create=True)
self.assertEqual(
self.syncer.suspend(), self.syncer,
)
self.syncer.wait()

self.syncer._logger.info.assert_called_with(
"[Worker {}]: Suspending...".format(1)
)
self.assertEqual(
self.syncer.set_source(self.source)
.set_destination(self.destination)
.resume(),
self.syncer,
)
self.assertFalse(self.syncer._suspend)
self.syncer.start_sync(wait=True, create=True)
self.assertEqual(self.syncer.resume(), self.syncer)
self.syncer.wait()
self.assertEqual(
self.syncer.blocks, {"size": 10, "same": 0, "diff": 1, "done": 1}
)

0 comments on commit fdec5c3

Please sign in to comment.