Skip to content

Commit

Permalink
Merge pull request #8 from insight-platform/bugfix/7-block-offset-cal…
Browse files Browse the repository at this point in the history
…culation

Bugfix/7 block offset calculation
  • Loading branch information
bwsw authored Aug 5, 2024
2 parents c58d67a + a83820a commit ab4a2d3
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ tokio-timerfd = "0.2"
uuid = { version = "1", features = ["v7"] }

[workspace.package]
version = "0.9.1"
version = "0.9.2"
edition = "2021"
authors = ["Ivan Kudriavtsev <[email protected]>"]
description = "ReplayDB Service"
Expand Down
44 changes: 22 additions & 22 deletions replaydb/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ mod tests {
async fn test_read_message() -> Result<()> {
let (r, w) = get_channel().await?;

let f0 = gen_properly_filled_frame();
let f0 = gen_properly_filled_frame(true);
let f0_uuid = f0.get_uuid();
let store = MockStore {
messages: vec![
Expand All @@ -672,7 +672,7 @@ mod tests {
Duration::from_millis(10),
),
(
Some((gen_properly_filled_frame().to_message(), vec![], vec![])),
Some((gen_properly_filled_frame(true).to_message(), vec![], vec![])),
Duration::from_millis(100),
),
(None, Duration::from_millis(1)),
Expand Down Expand Up @@ -713,7 +713,7 @@ mod tests {
async fn test_read_no_data() -> Result<()> {
let (r, w) = get_channel().await?;

let f0 = gen_properly_filled_frame();
let f0 = gen_properly_filled_frame(true);
let f0_uuid = f0.get_uuid();
let store = MockStore {
messages: vec![
Expand Down Expand Up @@ -786,7 +786,7 @@ mod tests {
None,
)?;

let m = job.prepare_message(gen_properly_filled_frame().to_message())?;
let m = job.prepare_message(gen_properly_filled_frame(true).to_message())?;
assert!(m.is_some());
let m = m.unwrap();
assert_eq!(
Expand Down Expand Up @@ -839,7 +839,7 @@ mod tests {
None,
)?;

let m = job.prepare_message(gen_properly_filled_frame().to_message())?;
let m = job.prepare_message(gen_properly_filled_frame(true).to_message())?;
assert!(m.is_some());
let m = m.unwrap().as_video_frame().unwrap();
assert_eq!(m.get_source_id(), "resulting_id".to_string());
Expand Down Expand Up @@ -884,13 +884,13 @@ mod tests {
let res = job.check_ts_decrease(&eos)?;
assert_eq!(res, false);

let first = gen_properly_filled_frame().to_message();
let first = gen_properly_filled_frame(true).to_message();

tokio_timerfd::sleep(Duration::from_millis(1)).await?;
let second = gen_properly_filled_frame().to_message();
let second = gen_properly_filled_frame(true).to_message();

tokio_timerfd::sleep(Duration::from_millis(1)).await?;
let third = gen_properly_filled_frame().to_message();
let third = gen_properly_filled_frame(true).to_message();

let res = job.check_ts_decrease(&first)?;
assert_eq!(res, false);
Expand Down Expand Up @@ -932,10 +932,10 @@ mod tests {
None,
)?;

let first = gen_properly_filled_frame().to_message();
let first = gen_properly_filled_frame(true).to_message();

tokio_timerfd::sleep(Duration::from_millis(1)).await?;
let second = gen_properly_filled_frame().to_message();
let second = gen_properly_filled_frame(true).to_message();

let res = job.check_ts_decrease(&second)?;
assert_eq!(res, false);
Expand Down Expand Up @@ -1033,9 +1033,9 @@ mod tests {
let (r, w) = get_channel().await?;

let frames = vec![
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
];

let store = MockStore {
Expand Down Expand Up @@ -1095,7 +1095,7 @@ mod tests {
let mut frames = vec![];
let n = 20;
for _ in 0..n {
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
frames.push(f);
tokio_timerfd::sleep(Duration::from_millis(30)).await?;
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ mod tests {
messages: vec![
(
{
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
first_uuid = f.get_uuid();
Some((f.to_message(), vec![], vec![]))
},
Expand All @@ -1188,7 +1188,7 @@ mod tests {
Duration::from_millis(0),
),
(
Some((gen_properly_filled_frame().to_message(), vec![], vec![])),
Some((gen_properly_filled_frame(true).to_message(), vec![], vec![])),
Duration::from_millis(0),
),
(
Expand All @@ -1202,7 +1202,7 @@ mod tests {
(
Some((
{
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
last_uuid = f.get_uuid_u128();
f
}
Expand Down Expand Up @@ -1268,7 +1268,7 @@ mod tests {

let n = 20;
for _ in 0..n {
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
frames.push(f);
tokio_timerfd::sleep(Duration::from_millis(10)).await?;
}
Expand Down Expand Up @@ -1341,7 +1341,7 @@ mod tests {

let n = 20;
for _ in 0..n {
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
frames.push(f);
tokio_timerfd::sleep(Duration::from_millis(30)).await?;
}
Expand Down Expand Up @@ -1425,9 +1425,9 @@ mod tests {
let (r, w) = get_channel().await?;

let frames = vec![
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
];

let store = MockStore {
Expand Down
8 changes: 4 additions & 4 deletions replaydb/src/job/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ mod tests {

let mut factory =
RocksDbJobFactory::new(store.clone(), 1024u64.try_into()?, Duration::from_secs(30))?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
let source_id = f.get_source_id();
store
.lock()
.await
.add_message(&f.to_message(), &[], &[])
.await?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
store
.lock()
.await
Expand Down Expand Up @@ -153,14 +153,14 @@ mod tests {

let mut factory =
RocksDbJobFactory::new(store.clone(), 1024u64.try_into()?, Duration::from_secs(30))?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
let source_id = f.get_source_id();
store
.lock()
.await
.add_message(&f.to_message(), &[], &[])
.await?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);

let f_clone = f.clone();
tokio::spawn(async move {
Expand Down
14 changes: 7 additions & 7 deletions replaydb/src/job/stop_condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,19 @@ mod tests {

#[test]
fn test_last_frame_stop_condition() -> Result<()> {
let frame_before = gen_properly_filled_frame();
let frame_before = gen_properly_filled_frame(true);
thread::sleep(Duration::from_millis(1));
let mut stop_condition = JobStopCondition::last_frame(incremental_uuid_v7().as_u128());
assert!(!stop_condition.check(&frame_before.to_message())?);
thread::sleep(Duration::from_millis(1));
let frame_after = gen_properly_filled_frame();
let frame_after = gen_properly_filled_frame(true);
assert!(stop_condition.check(&frame_after.to_message())?);
Ok(())
}

#[test]
fn test_frame_count_stop_condition() -> Result<()> {
let frame = gen_properly_filled_frame();
let frame = gen_properly_filled_frame(true);
let mut stop_condition = JobStopCondition::frame_count(2);
assert!(!stop_condition.check(&frame.to_message())?);
assert!(stop_condition.check(&frame.to_message())?);
Expand All @@ -163,20 +163,20 @@ mod tests {

#[test]
fn test_key_frame_count_stop_condition() -> Result<()> {
let mut frame = gen_properly_filled_frame();
let mut frame = gen_properly_filled_frame(true);
frame.set_keyframe(Some(true));
let mut stop_condition = JobStopCondition::key_frame_count(2);
assert!(!stop_condition.check(&frame.to_message())?);
frame.set_keyframe(Some(false));
assert!(!stop_condition.check(&frame.to_message())?);
let key_frame = gen_properly_filled_frame();
let key_frame = gen_properly_filled_frame(true);
assert!(stop_condition.check(&key_frame.to_message())?);
Ok(())
}

#[test]
fn test_pts_delta_stop_condition() -> Result<()> {
let mut frame = gen_properly_filled_frame();
let mut frame = gen_properly_filled_frame(true);
frame.set_time_base((1, 1_000_000));
frame.set_pts(1_000_000);
let mut stop_condition = JobStopCondition::pts_delta_sec(1.0);
Expand All @@ -190,7 +190,7 @@ mod tests {

#[test]
fn test_real_time_delta_stop_condition() -> Result<()> {
let frame = gen_properly_filled_frame();
let frame = gen_properly_filled_frame(true);
let mut stop_condition = JobStopCondition::real_time_delta_ms(500);
assert!(!stop_condition.check(&frame.to_message())?);
thread::sleep(Duration::from_millis(600));
Expand Down
4 changes: 2 additions & 2 deletions replaydb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::SystemTime;
use tokio::sync::Mutex;
use uuid::Uuid;

pub fn gen_properly_filled_frame() -> VideoFrameProxy {
pub fn gen_properly_filled_frame(kf: bool) -> VideoFrameProxy {
let mut f = gen_frame();
let (tbn, tbd) = (1, 1_000_000);
let now_nanos = SystemTime::now()
Expand All @@ -22,7 +22,7 @@ pub fn gen_properly_filled_frame() -> VideoFrameProxy {
f.set_pts(pts);
f.set_creation_timestamp_ns(now_nanos);
f.set_time_base((tbn, tbd));
f.set_keyframe(Some(true));
f.set_keyframe(Some(kf));
f
}

Expand Down
Loading

0 comments on commit ab4a2d3

Please sign in to comment.