diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index fc151e0d46c2f..2e80aca2db6b5 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -96,17 +96,23 @@ impl KvLogStoreMetrics { Self::new_inner(metrics, actor_id, id, name, connector) } + /// `id`: refers to a unique way to identify the logstore. This can be the sink id, + /// or for joins, it can be the `fragment_id`. + /// `name`: refers to the MV / Sink that the log store is associated with. + /// `target`: refers to the target of the log store, + /// for instance `MySql` Sink, PG sink, etc... + /// or unaligned join. pub(crate) fn new_inner( metrics: &StreamingMetrics, actor_id: ActorId, id: u32, name: &str, - upstream: &'static str, + target: &'static str, ) -> Self { let actor_id_str = actor_id.to_string(); let id_str = id.to_string(); - let labels = &[&actor_id_str, upstream, &id_str, name]; + let labels = &[&actor_id_str, target, &id_str, name]; let storage_write_size = metrics .kv_log_store_storage_write_size .with_guarded_label_values(labels); @@ -121,7 +127,7 @@ impl KvLogStoreMetrics { .kv_log_store_storage_read_size .with_guarded_label_values(&[ &actor_id_str, - upstream, + target, &id_str, name, READ_PERSISTENT_LOG, @@ -130,7 +136,7 @@ impl KvLogStoreMetrics { .kv_log_store_storage_read_count .with_guarded_label_values(&[ &actor_id_str, - upstream, + target, &id_str, name, READ_PERSISTENT_LOG, @@ -140,7 +146,7 @@ impl KvLogStoreMetrics { .kv_log_store_storage_read_size .with_guarded_label_values(&[ &actor_id_str, - upstream, + target, &id_str, name, READ_FLUSHED_BUFFER, @@ -149,7 +155,7 @@ impl KvLogStoreMetrics { .kv_log_store_storage_read_count .with_guarded_label_values(&[ &actor_id_str, - upstream, + target, &id_str, name, READ_FLUSHED_BUFFER,