Skip to content

Commit

Permalink
Merge branch 'main' into working
Browse files Browse the repository at this point in the history
  • Loading branch information
bibhu107 authored Feb 19, 2025
2 parents 32b1c21 + 087d96b commit a3387d9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 42 deletions.
5 changes: 5 additions & 0 deletions e2e_test/batch/catalog/sysinfo.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ create table tab(num int, name varchar);
statement ok
create index tab_idx on tab(num desc);

query T
select pg_get_indexdef(('"' || current_database() || '".' || 'public.tab_idx')::regclass);
----
CREATE INDEX tab_idx ON tab(num DESC)

query T
select pg_get_indexdef('tab_idx'::regclass);
----
Expand Down
16 changes: 0 additions & 16 deletions src/common/src/util/cluster_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};

use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
use risingwave_pb::meta::cluster_limit::PbLimit;
Expand Down Expand Up @@ -117,18 +116,3 @@ impl ActorCountPerParallelism {
self.exceed_soft_limit() || self.exceed_hard_limit()
}
}

/// Human-readable rendering of the actor-count-per-parallelism limits,
/// listing the hard (critical) limit, the soft (recommended) limit, and
/// the observed actor count for each worker.
impl Display for ActorCountPerParallelism {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Render each worker's entry as "worker_id -> count" up front so the
        // final write! can debug-print them as a single list.
        let mut per_worker_entries = Vec::with_capacity(self.worker_id_to_actor_count.len());
        for (worker_id, actor_count) in &self.worker_id_to_actor_count {
            per_worker_entries.push(format!("{} -> {:?}", worker_id, actor_count));
        }
        write!(
            f,
            "ActorCountPerParallelism {{ critical limit: {:?}, recommended limit: {:?}. worker_id_to_actor_count: {:?} }}",
            self.hard_limit, self.soft_limit, per_worker_entries
        )
    }
}
27 changes: 13 additions & 14 deletions src/frontend/src/expr/function_impl/cast_regclass.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ use thiserror::Error;
use thiserror_ext::AsReport;

use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH};
use crate::binder::ResolveQualifiedNameError;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::{CatalogError, CatalogReader};
use crate::session::AuthContext;
use crate::Binder;

/// Errors that can arise while resolving a `regclass` input string
/// (a possibly schema-qualified object name) to a catalog id.
/// Converted into `ExprError::InvalidParam` at the expression boundary.
#[derive(Error, Debug)]
enum ResolveRegclassError {
    /// The input could not be parsed as an object name.
    #[error("parse object name failed: {0}")]
    Parser(#[from] ParserError),
    /// The parsed name was not found (or otherwise failed) during catalog lookup.
    #[error("catalog error: {0}")]
    Catalog(#[from] CatalogError),
    /// The qualified name could not be resolved against the current
    /// database/schema (see `Binder::resolve_schema_qualified_name`).
    #[error("resolve qualified name error: {0}")]
    ResolveQualifiedName(#[from] ResolveQualifiedNameError),
}

impl From<ResolveRegclassError> for ExprError {
Expand All @@ -39,6 +43,10 @@ impl From<ResolveRegclassError> for ExprError {
name: "name",
reason: e.to_report_string().into(),
},
ResolveRegclassError::ResolveQualifiedName(e) => ExprError::InvalidParam {
name: "name",
reason: e.to_report_string().into(),
},
}
}
}
Expand Down Expand Up @@ -68,20 +76,11 @@ fn resolve_regclass_inner(
// '"my schema".foo' must all work as values passed pg_table_size.
let obj = Parser::parse_object_name_str(class_name)?;

if obj.0.len() == 1 {
let class_name = obj.0[0].real_value();
let schema_path = SchemaPath::Path(search_path, &auth_context.user_name);
Ok(catalog
.read_guard()
.get_id_by_class_name(db_name, schema_path, &class_name)?)
} else {
let schema = obj.0[0].real_value();
let class_name = obj.0[1].real_value();
let schema_path = SchemaPath::Name(&schema);
Ok(catalog
.read_guard()
.get_id_by_class_name(db_name, schema_path, &class_name)?)
}
let (schema_name, class_name) = Binder::resolve_schema_qualified_name(db_name, obj)?;
let schema_path = SchemaPath::new(schema_name.as_deref(), search_path, &auth_context.user_name);
Ok(catalog
.read_guard()
.get_id_by_class_name(db_name, schema_path, &class_name)?)
}

#[function("cast_regclass(varchar) -> int4")]
Expand Down
38 changes: 26 additions & 12 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,22 +1249,36 @@ impl SessionImpl {
return Ok(());
}

let gen_message = |violated_limit: &ActorCountPerParallelism,
let gen_message = |ActorCountPerParallelism {
worker_id_to_actor_count,
hard_limit,
soft_limit,
}: ActorCountPerParallelism,
exceed_hard_limit: bool|
-> String {
let (limit_type, action) = if exceed_hard_limit {
("critical", "Please scale the cluster before proceeding!")
("critical", "Scale the cluster immediately to proceed.")
} else {
("recommended", "Scaling the cluster is recommended.")
(
"recommended",
"Consider scaling the cluster for optimal performance.",
)
};
format!(
"\n- {}\n- {}\n- {}\n- {}\n- {}\n{}",
format_args!("Actor count per parallelism exceeds the {} limit.", limit_type),
format_args!("Depending on your workload, this may overload the cluster and cause performance/stability issues. {}", action),
"Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.",
"You can bypass this check via SQL `SET bypass_cluster_limits TO true`.",
"You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.",
violated_limit,
r#"Actor count per parallelism exceeds the {limit_type} limit.
Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
HINT:
- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
DETAILS:
- hard limit: {hard_limit}
- soft limit: {soft_limit}
- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
)
};

Expand All @@ -1274,10 +1288,10 @@ impl SessionImpl {
cluster_limit::ClusterLimit::ActorCount(l) => {
if l.exceed_hard_limit() {
return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
&l, true,
l, true,
))));
} else if l.exceed_soft_limit() {
self.notice_to_user(gen_message(&l, false));
self.notice_to_user(gen_message(l, false));
}
}
}
Expand Down

0 comments on commit a3387d9

Please sign in to comment.