From 98ecdb82857ce467292a196e5100366e53288361 Mon Sep 17 00:00:00 2001 From: Laurent Mutricy Date: Tue, 27 Aug 2024 09:31:49 +0200 Subject: [PATCH 1/3] Remove TSchema and add __call__ method to resolve #322 --- schema/__init__.py | 428 +++++++++++++++++++++------------------------ 1 file changed, 201 insertions(+), 227 deletions(-) diff --git a/schema/__init__.py b/schema/__init__.py index 31bd71b..abf9c03 100644 --- a/schema/__init__.py +++ b/schema/__init__.py @@ -120,233 +120,6 @@ class SchemaUnexpectedTypeError(SchemaError): pass -# Type variable to represent a Schema-like type -TSchema = TypeVar("TSchema", bound="Schema") - - -class And(Generic[TSchema]): - """ - Utility function to combine validation directives in AND Boolean fashion. - """ - - def __init__( - self, - *args: Union[TSchema, Callable[..., Any]], - error: Union[str, None] = None, - ignore_extra_keys: bool = False, - schema: Union[Type[TSchema], None] = None, - ) -> None: - self._args: Tuple[Union[TSchema, Callable[..., Any]], ...] = args - self._error: Union[str, None] = error - self._ignore_extra_keys: bool = ignore_extra_keys - self._schema_class: Type[TSchema] = schema if schema is not None else Schema - - def __repr__(self) -> str: - return f"{self.__class__.__name__}({', '.join(repr(a) for a in self._args)})" - - @property - def args(self) -> Tuple[Union[TSchema, Callable[..., Any]], ...]: - """The provided parameters""" - return self._args - - def validate(self, data: Any, **kwargs: Any) -> Any: - """ - Validate data using defined sub schema/expressions ensuring all - values are valid. - :param data: Data to be validated with sub defined schemas. - :return: Returns validated data. - """ - # Annotate sub_schema with the type returned by _build_schema - for sub_schema in self._build_schemas(): # type: TSchema - data = sub_schema.validate(data, **kwargs) - return data - - def _build_schemas(self) -> List[TSchema]: - return [self._build_schema(s) for s in self._args] - - def _build_schema(self, arg: Any) -> TSchema: - # Assume self._schema_class(arg, ...) returns an instance of TSchema - return self._schema_class( - arg, error=self._error, ignore_extra_keys=self._ignore_extra_keys - ) - - -class Or(And[TSchema]): - """Utility function to combine validation directives in a OR Boolean - fashion. - - If one wants to make an xor, one can provide only_one=True optional argument - to the constructor of this object. When a validation was performed for an - xor-ish Or instance and one wants to use it another time, one needs to call - reset() to put the match_count back to 0.""" - - def __init__( - self, - *args: Union[TSchema, Callable[..., Any]], - only_one: bool = False, - **kwargs: Any, - ) -> None: - self.only_one: bool = only_one - self.match_count: int = 0 - super().__init__(*args, **kwargs) - - def reset(self) -> None: - failed: bool = self.match_count > 1 and self.only_one - self.match_count = 0 - if failed: - raise SchemaOnlyOneAllowedError( - ["There are multiple keys present from the %r condition" % self] - ) - - def validate(self, data: Any, **kwargs: Any) -> Any: - """ - Validate data using sub defined schema/expressions ensuring at least - one value is valid. - :param data: data to be validated by provided schema. - :return: return validated data if not validation - """ - autos: List[str] = [] - errors: List[Union[str, None]] = [] - for sub_schema in self._build_schemas(): - try: - validation: Any = sub_schema.validate(data, **kwargs) - self.match_count += 1 - if self.match_count > 1 and self.only_one: - break - return validation - except SchemaError as _x: - autos += _x.autos - errors += _x.errors - raise SchemaError( - ["%r did not validate %r" % (self, data)] + autos, - [self._error.format(data) if self._error else None] + errors, - ) - - -class Regex: - """ - Enables schema.py to validate string using regular expressions. - """ - - # Map all flags bits to a more readable description - NAMES = [ - "re.ASCII", - "re.DEBUG", - "re.VERBOSE", - "re.UNICODE", - "re.DOTALL", - "re.MULTILINE", - "re.LOCALE", - "re.IGNORECASE", - "re.TEMPLATE", - ] - - def __init__( - self, pattern_str: str, flags: int = 0, error: Union[str, None] = None - ) -> None: - self._pattern_str: str = pattern_str - flags_list = [ - Regex.NAMES[i] for i, f in enumerate(f"{flags:09b}") if f != "0" - ] # Name for each bit - - self._flags_names: str = ", flags=" + "|".join(flags_list) if flags_list else "" - self._pattern: re.Pattern = re.compile(pattern_str, flags=flags) - self._error: Union[str, None] = error - - def __repr__(self) -> str: - return f"{self.__class__.__name__}({self._pattern_str!r}{self._flags_names})" - - @property - def pattern_str(self) -> str: - """The pattern string for the represented regular expression""" - return self._pattern_str - - def validate(self, data: str, **kwargs: Any) -> str: - """ - Validates data using the defined regex. - :param data: Data to be validated. - :return: Returns validated data. - """ - e = self._error - - try: - if self._pattern.search(data): - return data - else: - error_message = ( - e.format(data) - if e - else f"{data!r} does not match {self._pattern_str!r}" - ) - raise SchemaError(error_message) - except TypeError: - error_message = ( - e.format(data) if e else f"{data!r} is not string nor buffer" - ) - raise SchemaError(error_message) - - -class Use: - """ - For more general use cases, you can use the Use class to transform - the data while it is being validated. - """ - - def __init__( - self, callable_: Callable[[Any], Any], error: Union[str, None] = None - ) -> None: - if not callable(callable_): - raise TypeError(f"Expected a callable, not {callable_!r}") - self._callable: Callable[[Any], Any] = callable_ - self._error: Union[str, None] = error - - def __repr__(self) -> str: - return f"{self.__class__.__name__}({self._callable!r})" - - def validate(self, data: Any, **kwargs: Any) -> Any: - try: - return self._callable(data) - except SchemaError as x: - raise SchemaError( - [None] + x.autos, - [self._error.format(data) if self._error else None] + x.errors, - ) - except BaseException as x: - f = _callable_str(self._callable) - raise SchemaError( - "%s(%r) raised %r" % (f, data, x), - self._error.format(data) if self._error else None, - ) - - -COMPARABLE, CALLABLE, VALIDATOR, TYPE, DICT, ITERABLE = range(6) - - -def _priority(s: Any) -> int: - """Return priority for a given object.""" - if type(s) in (list, tuple, set, frozenset): - return ITERABLE - if isinstance(s, dict): - return DICT - if issubclass(type(s), type): - return TYPE - if isinstance(s, Literal): - return COMPARABLE - if hasattr(s, "validate"): - return VALIDATOR - if callable(s): - return CALLABLE - else: - return COMPARABLE - - -def _invoke_with_optional_kwargs(f: Callable[..., Any], **kwargs: Any) -> Any: - s = inspect.signature(f) - if len(s.parameters) == 0: - return f() - return f(**kwargs) - - class Schema(object): """ Entry point of the library, use this class to instantiate validation @@ -834,6 +607,207 @@ def _get_key_name(key: Any) -> Any: return _json_schema(self, True) +class And(Schema): + """ + Utility function to combine validation directives in AND Boolean fashion. + """ + + def __init__( + self, + *args: Union[Schema, Callable[..., Any]], + error: Union[str, None] = None, + ignore_extra_keys: bool = False, + schema: Union[Type[Schema], None] = None, + ) -> None: + self._args: Tuple[Union[Schema, Callable[..., Any]], ...] = args + self._error: Union[str, None] = error + self._ignore_extra_keys: bool = ignore_extra_keys + self._schema_class: Type[Schema] = schema if schema is not None else Schema + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({', '.join(repr(a) for a in self._args)})" + + @property + def args(self) -> Tuple[Union[Schema, Callable[..., Any]], ...]: + """The provided parameters""" + return self._args + + def validate(self, data: Any, **kwargs: Any) -> Any: + """ + Validate data using defined sub schema/expressions ensuring all + values are valid. + :param data: Data to be validated with sub defined schemas. + :return: Returns validated data. + """ + # Annotate sub_schema with the type returned by _build_schema + for sub_schema in self._build_schemas(): # type: Schema + data = sub_schema.validate(data, **kwargs) + return data + + def _build_schemas(self) -> List[Schema]: + return [self._build_schema(s) for s in self._args] + + def _build_schema(self, arg: Any) -> Schema: + # Assume self._schema_class(arg, ...) returns an instance of Schema + return self._schema_class(arg, error=self._error, ignore_extra_keys=self._ignore_extra_keys) + + +class Or(And): + """Utility function to combine validation directives in a OR Boolean + fashion. + + If one wants to make an xor, one can provide only_one=True optional argument + to the constructor of this object. When a validation was performed for an + xor-ish Or instance and one wants to use it another time, one needs to call + reset() to put the match_count back to 0.""" + + def __init__(self, *args: Union[Schema, Callable[..., Any]], only_one: bool = False, **kwargs: Any) -> None: + self.only_one: bool = only_one + self.match_count: int = 0 + super().__init__(*args, **kwargs) + + def reset(self) -> None: + failed: bool = self.match_count > 1 and self.only_one + self.match_count = 0 + if failed: + raise SchemaOnlyOneAllowedError(["There are multiple keys present from the %r condition" % self]) + + def validate(self, data: Any, **kwargs: Any) -> Any: + """ + Validate data using sub defined schema/expressions ensuring at least + one value is valid. + :param data: data to be validated by provided schema. + :return: return validated data if not validation + """ + autos: List[str] = [] + errors: List[Union[str, None]] = [] + for sub_schema in self._build_schemas(): + try: + validation: Any = sub_schema.validate(data, **kwargs) + self.match_count += 1 + if self.match_count > 1 and self.only_one: + break + return validation + except SchemaError as _x: + autos += _x.autos + errors += _x.errors + raise SchemaError( + ["%r did not validate %r" % (self, data)] + autos, + [self._error.format(data) if self._error else None] + errors, + ) + + +class Regex: + """ + Enables schema.py to validate string using regular expressions. + """ + + # Map all flags bits to a more readable description + NAMES = [ + "re.ASCII", + "re.DEBUG", + "re.VERBOSE", + "re.UNICODE", + "re.DOTALL", + "re.MULTILINE", + "re.LOCALE", + "re.IGNORECASE", + "re.TEMPLATE", + ] + + def __init__(self, pattern_str: str, flags: int = 0, error: Union[str, None] = None) -> None: + self._pattern_str: str = pattern_str + flags_list = [Regex.NAMES[i] for i, f in enumerate(f"{flags:09b}") if f != "0"] # Name for each bit + + self._flags_names: str = ", flags=" + "|".join(flags_list) if flags_list else "" + self._pattern: re.Pattern = re.compile(pattern_str, flags=flags) + self._error: Union[str, None] = error + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._pattern_str!r}{self._flags_names})" + + @property + def pattern_str(self) -> str: + """The pattern string for the represented regular expression""" + return self._pattern_str + + def validate(self, data: str, **kwargs: Any) -> str: + """ + Validates data using the defined regex. + :param data: Data to be validated. + :return: Returns validated data. + """ + e = self._error + + try: + if self._pattern.search(data): + return data + else: + error_message = e.format(data) if e else f"{data!r} does not match {self._pattern_str!r}" + raise SchemaError(error_message) + except TypeError: + error_message = e.format(data) if e else f"{data!r} is not string nor buffer" + raise SchemaError(error_message) + + +class Use: + """ + For more general use cases, you can use the Use class to transform + the data while it is being validated. + """ + + def __init__(self, callable_: Callable[[Any], Any], error: Union[str, None] = None) -> None: + if not callable(callable_): + raise TypeError(f"Expected a callable, not {callable_!r}") + self._callable: Callable[[Any], Any] = callable_ + self._error: Union[str, None] = error + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._callable!r})" + + def validate(self, data: Any, **kwargs: Any) -> Any: + try: + return self._callable(data) + except SchemaError as x: + raise SchemaError([None] + x.autos, [self._error.format(data) if self._error else None] + x.errors) + except BaseException as x: + f = _callable_str(self._callable) + raise SchemaError("%s(%r) raised %r" % (f, data, x), self._error.format(data) if self._error else None) + + def __call__(self, *args: Any, **kwds: Any) -> Any: + pass + + +COMPARABLE, CALLABLE, VALIDATOR, TYPE, DICT, ITERABLE = range(6) + + +def _priority(s: Any) -> int: + """Return priority for a given object.""" + if type(s) in (list, tuple, set, frozenset): + return ITERABLE + if type(s) is dict: + return DICT + if issubclass(type(s), type): + return TYPE + if isinstance(s, Literal): + return COMPARABLE + if hasattr(s, "validate"): + return VALIDATOR + if callable(s): + return CALLABLE + else: + return COMPARABLE + + +def _invoke_with_optional_kwargs(f: Callable[..., Any], **kwargs: Any) -> Any: + s = inspect.signature(f) + if len(s.parameters) == 0: + return f() + return f(**kwargs) + + + + class Optional(Schema): """Marker for an optional part of the validation Schema.""" From e8f02060aa42e4576447d263388653b187e5bbff Mon Sep 17 00:00:00 2001 From: Laurent Mutricy Date: Tue, 27 Aug 2024 09:47:00 +0200 Subject: [PATCH 2/3] also update Regex --- schema/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/schema/__init__.py b/schema/__init__.py index abf9c03..4ca4916 100644 --- a/schema/__init__.py +++ b/schema/__init__.py @@ -749,6 +749,9 @@ def validate(self, data: str, **kwargs: Any) -> str: error_message = e.format(data) if e else f"{data!r} is not string nor buffer" raise SchemaError(error_message) + def __call__(self, *args: Any, **kwds: Any) -> Any: + pass + class Use: """ From 82528aa5c385fb1b05cdf2dc9b345210b60be4e9 Mon Sep 17 00:00:00 2001 From: Laurent Mutricy Date: Tue, 27 Aug 2024 10:00:33 +0200 Subject: [PATCH 3/3] some cleaning --- schema/__init__.py | 52 +++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/schema/__init__.py b/schema/__init__.py index 4ca4916..31b936c 100644 --- a/schema/__init__.py +++ b/schema/__init__.py @@ -9,7 +9,6 @@ Any, Callable, Dict, - Generic, Iterable, List, NoReturn, @@ -649,7 +648,9 @@ def _build_schemas(self) -> List[Schema]: def _build_schema(self, arg: Any) -> Schema: # Assume self._schema_class(arg, ...) returns an instance of Schema - return self._schema_class(arg, error=self._error, ignore_extra_keys=self._ignore_extra_keys) + return self._schema_class( + arg, error=self._error, ignore_extra_keys=self._ignore_extra_keys + ) class Or(And): @@ -661,7 +662,12 @@ class Or(And): xor-ish Or instance and one wants to use it another time, one needs to call reset() to put the match_count back to 0.""" - def __init__(self, *args: Union[Schema, Callable[..., Any]], only_one: bool = False, **kwargs: Any) -> None: + def __init__( + self, + *args: Union[Schema, Callable[..., Any]], + only_one: bool = False, + **kwargs: Any, + ) -> None: self.only_one: bool = only_one self.match_count: int = 0 super().__init__(*args, **kwargs) @@ -670,7 +676,9 @@ def reset(self) -> None: failed: bool = self.match_count > 1 and self.only_one self.match_count = 0 if failed: - raise SchemaOnlyOneAllowedError(["There are multiple keys present from the %r condition" % self]) + raise SchemaOnlyOneAllowedError( + ["There are multiple keys present from the %r condition" % self] + ) def validate(self, data: Any, **kwargs: Any) -> Any: """ @@ -715,9 +723,13 @@ class Regex: "re.TEMPLATE", ] - def __init__(self, pattern_str: str, flags: int = 0, error: Union[str, None] = None) -> None: + def __init__( + self, pattern_str: str, flags: int = 0, error: Union[str, None] = None + ) -> None: self._pattern_str: str = pattern_str - flags_list = [Regex.NAMES[i] for i, f in enumerate(f"{flags:09b}") if f != "0"] # Name for each bit + flags_list = [ + Regex.NAMES[i] for i, f in enumerate(f"{flags:09b}") if f != "0" + ] # Name for each bit self._flags_names: str = ", flags=" + "|".join(flags_list) if flags_list else "" self._pattern: re.Pattern = re.compile(pattern_str, flags=flags) @@ -743,10 +755,16 @@ def validate(self, data: str, **kwargs: Any) -> str: if self._pattern.search(data): return data else: - error_message = e.format(data) if e else f"{data!r} does not match {self._pattern_str!r}" + error_message = ( + e.format(data) + if e + else f"{data!r} does not match {self._pattern_str!r}" + ) raise SchemaError(error_message) except TypeError: - error_message = e.format(data) if e else f"{data!r} is not string nor buffer" + error_message = ( + e.format(data) if e else f"{data!r} is not string nor buffer" + ) raise SchemaError(error_message) def __call__(self, *args: Any, **kwds: Any) -> Any: @@ -759,7 +777,9 @@ class Use: the data while it is being validated. """ - def __init__(self, callable_: Callable[[Any], Any], error: Union[str, None] = None) -> None: + def __init__( + self, callable_: Callable[[Any], Any], error: Union[str, None] = None + ) -> None: if not callable(callable_): raise TypeError(f"Expected a callable, not {callable_!r}") self._callable: Callable[[Any], Any] = callable_ @@ -772,10 +792,16 @@ def validate(self, data: Any, **kwargs: Any) -> Any: try: return self._callable(data) except SchemaError as x: - raise SchemaError([None] + x.autos, [self._error.format(data) if self._error else None] + x.errors) + raise SchemaError( + [None] + x.autos, + [self._error.format(data) if self._error else None] + x.errors, + ) except BaseException as x: f = _callable_str(self._callable) - raise SchemaError("%s(%r) raised %r" % (f, data, x), self._error.format(data) if self._error else None) + raise SchemaError( + "%s(%r) raised %r" % (f, data, x), + self._error.format(data) if self._error else None, + ) def __call__(self, *args: Any, **kwds: Any) -> Any: pass @@ -788,7 +814,7 @@ def _priority(s: Any) -> int: """Return priority for a given object.""" if type(s) in (list, tuple, set, frozenset): return ITERABLE - if type(s) is dict: + if isinstance(s, dict): return DICT if issubclass(type(s), type): return TYPE @@ -809,8 +835,6 @@ def _invoke_with_optional_kwargs(f: Callable[..., Any], **kwargs: Any) -> Any: return f(**kwargs) - - class Optional(Schema): """Marker for an optional part of the validation Schema."""