Skip to content

Commit

Permalink
Extract python types
Browse files Browse the repository at this point in the history
  • Loading branch information
soapy1 committed Nov 18, 2024
1 parent a5cf560 commit 762477b
Showing 1 changed file with 107 additions and 105 deletions.
212 changes: 107 additions & 105 deletions conda-store-server/conda_store_server/_internal/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import sys
from functools import partial

from typing import List, Optional

from sqlalchemy import (
JSON,
BigInteger,
Expand All @@ -33,6 +35,7 @@
from sqlalchemy.orm import (
backref,
DeclarativeBase,
Mapped,
mapped_column,
relationship,
sessionmaker,
Expand All @@ -54,44 +57,45 @@ class Worker(Base):

__tablename__ = "worker"

id = mapped_column(Integer, primary_key=True)
id: Mapped[int] = mapped_column(primary_key=True)

# For checking whether the worker is initialized
initialized = mapped_column(Boolean, default=False)
initialized: Mapped[bool] = mapped_column(Boolean, default=False)


class Namespace(Base):
"""Namespace for resources"""

__tablename__ = "namespace"

id = mapped_column(Integer, primary_key=True)
name = mapped_column(Unicode(255), unique=True)
id: Mapped[int] = mapped_column(primary_key=True)

name: Mapped[str] = mapped_column(Unicode(255), unique=True)

environments = relationship("Environment", back_populates="namespace")
environments: Mapped[List["Environment"]] = relationship(back_populates="namespace")

deleted_on = mapped_column(DateTime, default=None)
deleted_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=None)

metadata_ = mapped_column(JSON, default=dict, nullable=True)
metadata_: Mapped[Optional[dict]] = mapped_column(JSON, default=dict, nullable=True)

role_mappings = relationship("NamespaceRoleMapping", back_populates="namespace")
role_mappings: Mapped[List["NamespaceRoleMapping"]] = relationship("NamespaceRoleMapping", back_populates="namespace")


class NamespaceRoleMapping(Base):
"""Mapping between roles and namespaces"""

__tablename__ = "namespace_role_mapping"

id = mapped_column(Integer, primary_key=True)
namespace_id = mapped_column(Integer, ForeignKey("namespace.id"), nullable=False)
namespace = relationship(Namespace, back_populates="role_mappings")
id: Mapped[int] = mapped_column(primary_key=True)
namespace_id: Mapped[int] = mapped_column(ForeignKey("namespace.id"), nullable=False)
namespace: Mapped["Namespace"] = relationship(back_populates="role_mappings")

# arn e.g. <namespace>/<name> like `quansight-*/*` or `quansight-devops/*`
# The entity must match with ARN_ALLOWED defined in schema.py
entity = mapped_column(Unicode(255), nullable=False)
entity: Mapped[str] = mapped_column(Unicode(255), nullable=False)

# e.g. viewer
role = mapped_column(Unicode(255), nullable=False)
role: Mapped[str] = mapped_column(Unicode(255), nullable=False)

@validates("entity")
def validate_entity(self, key, entity):
Expand All @@ -115,17 +119,17 @@ class NamespaceRoleMappingV2(Base):

__tablename__ = "namespace_role_mapping_v2"

id = mapped_column(Integer, primary_key=True)
id: Mapped[int] = mapped_column(primary_key=True)
# Provides access to this namespace
namespace_id = mapped_column(Integer, ForeignKey("namespace.id"), nullable=False)
namespace = relationship(Namespace, foreign_keys=[namespace_id])
namespace_id: Mapped[int] = mapped_column(ForeignKey("namespace.id"), nullable=False)
namespace: Mapped["Namespace"] = relationship(foreign_keys=[namespace_id])

# ... for other namespace
other_namespace_id = mapped_column(Integer, ForeignKey("namespace.id"), nullable=False)
other_namespace = relationship(Namespace, foreign_keys=[other_namespace_id])
other_namespace_id: Mapped[int] = mapped_column(ForeignKey("namespace.id"), nullable=False)
other_namespace: Mapped["Namespace"] = relationship(foreign_keys=[other_namespace_id])

# ... with this role, like 'viewer'
role = mapped_column(Unicode(255), nullable=False)
role: Mapped[str] = mapped_column(Unicode(255), nullable=False)

@validates("role")
def validate_role(self, key, role):
Expand Down Expand Up @@ -158,15 +162,15 @@ def __init__(self, specification, is_lockfile: bool = False):
self.sha256 = utils.datastructure_hash(self.spec)
self.is_lockfile = is_lockfile

id = mapped_column(Integer, primary_key=True)
name = mapped_column(Unicode(255), nullable=False)
spec = mapped_column(JSON, nullable=False)
sha256 = mapped_column(Unicode(255), unique=True, nullable=False)
created_on = mapped_column(DateTime, default=datetime.datetime.utcnow)
is_lockfile = mapped_column(Boolean, nullable=False)
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(Unicode(255), nullable=False)
spec: Mapped[dict] = mapped_column(JSON, nullable=False)
sha256: Mapped[str] = mapped_column(Unicode(255), unique=True, nullable=False)
created_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=datetime.datetime.utcnow)
is_lockfile: Mapped[bool]

builds = relationship("Build", back_populates="specification")
solves = relationship("Solve", back_populates="specification")
builds: Mapped[List["Build"]] = relationship(back_populates="specification")
solves: Mapped[List["Solve"]] = relationship(back_populates="specification")


solve_conda_package_build = Table(
Expand All @@ -186,17 +190,15 @@ class Solve(Base):

__tablename__ = "solve"

id = mapped_column(Integer, primary_key=True)
specification_id = mapped_column(Integer, ForeignKey("specification.id"), nullable=False)
specification = relationship(Specification, back_populates="solves")
id: Mapped[int] = mapped_column(primary_key=True)
specification_id: Mapped[str] = mapped_column(ForeignKey("specification.id"), nullable=False)
specification: Mapped["Specification"] = relationship(back_populates="solves")

scheduled_on = mapped_column(DateTime, default=datetime.datetime.utcnow)
started_on = mapped_column(DateTime, default=None)
ended_on = mapped_column(DateTime, default=None)
scheduled_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=datetime.datetime.utcnow)
started_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=None)
ended_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=None)

package_builds = relationship(
"CondaPackageBuild", secondary=solve_conda_package_build
)
package_builds: Mapped[List["CondaPackageBuild"]] = relationship(secondary=solve_conda_package_build)


build_conda_package = Table(
Expand All @@ -216,31 +218,30 @@ class Build(Base):

__tablename__ = "build"

id = mapped_column(Integer, primary_key=True)
specification_id = mapped_column(Integer, ForeignKey("specification.id"), nullable=False)
specification = relationship(Specification, back_populates="builds")
id: Mapped[int] = mapped_column(primary_key=True)
specification_id: Mapped[int] = mapped_column(ForeignKey("specification.id"), nullable=False)
specification: Mapped["Specification"] = relationship(back_populates="builds")

environment_id = mapped_column(Integer, ForeignKey("environment.id"), nullable=False)
environment = relationship(
"Environment",
environment_id: Mapped[int] = mapped_column(ForeignKey("environment.id"), nullable=False)
environment: Mapped["Environment"] = relationship(
backref=backref("builds", cascade="all, delete-orphan"),
foreign_keys=[environment_id],
)

package_builds = relationship("CondaPackageBuild", secondary=build_conda_package)
package_builds: Mapped[List["CondaPackageBuild"]] = relationship(secondary=build_conda_package)

status = mapped_column(Enum(schema.BuildStatus), default=schema.BuildStatus.QUEUED)
status: Mapped[schema.BuildStatus] = mapped_column(default=schema.BuildStatus.QUEUED)
# Additional status info that will be provided to the user. DO NOT put
# sensitive data here
status_info = mapped_column(UnicodeText, default=None)
size = mapped_column(BigInteger, default=0)
scheduled_on = mapped_column(DateTime, default=datetime.datetime.utcnow)
started_on = mapped_column(DateTime, default=None)
ended_on = mapped_column(DateTime, default=None)
deleted_on = mapped_column(DateTime, default=None)
status_info: Mapped[str] = mapped_column(UnicodeText, default=None)
size: Mapped[int] = mapped_column(BigInteger, default=0)
scheduled_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=datetime.datetime.utcnow)
started_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=None)
ended_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=None)
deleted_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=None)

# Only used by build_key_version 3, not necessary for earlier versions
hash = mapped_column(Unicode(32), default=None)
hash: Mapped[str] = mapped_column(Unicode(32), default=None)

@staticmethod
def _get_build_key_version():
Expand All @@ -249,7 +250,7 @@ def _get_build_key_version():

return BuildKey.current_version()

build_key_version = mapped_column(Integer, default=_get_build_key_version, nullable=False)
build_key_version: Mapped[int] = mapped_column(default=_get_build_key_version, nullable=False)

@validates("build_key_version")
def validate_build_key_version(self, key, build_key_version):
Expand All @@ -258,8 +259,8 @@ def validate_build_key_version(self, key, build_key_version):

return BuildKey.set_current_version(build_key_version)

build_artifacts = relationship(
"BuildArtifact", back_populates="build", cascade="all, delete-orphan"
build_artifacts: Mapped[List["BuildArtifact"]] = relationship(
back_populates="build", cascade="all, delete-orphan"
)

def build_path(self, conda_store):
Expand Down Expand Up @@ -423,14 +424,15 @@ class BuildArtifact(Base):

__tablename__ = "build_artifact"

id = mapped_column(Integer, primary_key=True)
id: Mapped[int] = mapped_column(primary_key=True)

build_id: Mapped[int] = mapped_column(ForeignKey("build.id"))

build_id = mapped_column(Integer, ForeignKey("build.id"))
build = relationship(Build, back_populates="build_artifacts")
build: Mapped["Build"] = relationship(back_populates="build_artifacts")

artifact_type = mapped_column(Enum(schema.BuildArtifactType), nullable=False)
artifact_type: Mapped[schema.BuildArtifactType]

key = mapped_column(Unicode(255))
key: Mapped[str] = mapped_column(Unicode(255))


class Environment(Base):
Expand All @@ -445,29 +447,29 @@ class Environment(Base):
UniqueConstraint("namespace_id", "name", name="_namespace_name_uc"),
)

id = mapped_column(Integer, primary_key=True)
id: Mapped[int] = mapped_column(primary_key=True)

namespace_id = mapped_column(Integer, ForeignKey("namespace.id"), nullable=False)
namespace = relationship(Namespace)
namespace_id: Mapped[int] = mapped_column(ForeignKey("namespace.id"), nullable=False)
namespace: Mapped["Namespace"] = relationship(Namespace)

name = mapped_column(Unicode(255), nullable=False)
name: Mapped[str] = mapped_column(Unicode(255), nullable=False)

current_build_id = mapped_column(Integer, ForeignKey("build.id"))
current_build = relationship(
Build, foreign_keys=[current_build_id], post_update=True
current_build_id: Mapped[int] = mapped_column(ForeignKey("build.id"))
current_build: Mapped["Build"] = relationship(
foreign_keys=[current_build_id], post_update=True
)

deleted_on = mapped_column(DateTime, default=None)
deleted_on: Mapped[datetime.datetime] = mapped_column(DateTime, default=None)

description = mapped_column(UnicodeText, default=None)
description: Mapped[str] = mapped_column(UnicodeText, default=None)


class CondaChannel(Base):
__tablename__ = "conda_channel"

id = mapped_column(Integer, primary_key=True)
name = mapped_column(Unicode(255), unique=True, nullable=False)
last_update = mapped_column(DateTime)
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(Unicode(255), unique=True, nullable=False)
last_update: Mapped[datetime.datetime] = mapped_column(DateTime)

def update_packages(self, db, subdirs=None):
logger.info(f"update packages {self.name} ")
Expand Down Expand Up @@ -706,21 +708,21 @@ class CondaPackage(Base):
),
)

id = mapped_column(Integer, primary_key=True)
id: Mapped[int] = mapped_column(primary_key=True)

channel_id = mapped_column(Integer, ForeignKey("conda_channel.id"), index=True)
channel = relationship(CondaChannel)
channel_id: Mapped[int] = mapped_column(ForeignKey("conda_channel.id"), index=True)
channel: Mapped["CondaChannel"] = relationship(CondaChannel)

builds = relationship(
"CondaPackageBuild", back_populates="package", cascade="all, delete-orphan"
builds: Mapped[List["CondaPackageBuild"]] = relationship(
back_populates="package", cascade="all, delete-orphan"
)

license = mapped_column(Text, nullable=True)
license_family = mapped_column(Unicode(64), nullable=True)
name = mapped_column(Unicode(255), nullable=False, index=True)
version = mapped_column(Unicode(64), nullable=False, index=True)
summary = mapped_column(Text, nullable=True)
description = mapped_column(Text, nullable=True)
license: Mapped[str] = mapped_column(Text, nullable=True)
license_family: Mapped[str] = mapped_column(Unicode(64), nullable=True)
name: Mapped[str] = mapped_column(Unicode(255), nullable=False, index=True)
version: Mapped[str] = mapped_column(Unicode(64), nullable=False, index=True)
summary: Mapped[str] = mapped_column(Text, nullable=True)
description: Mapped[str] = mapped_column(Text, nullable=True)

def __repr__(self):
return f"<CondaPackage (channel={self.channel} name={self.name} version={self.version})>"
Expand All @@ -741,28 +743,28 @@ class CondaPackageBuild(Base):
),
)

id = mapped_column(Integer, primary_key=True)
id: Mapped[int] = mapped_column(primary_key=True)

package_id = mapped_column(Integer, ForeignKey("conda_package.id"))
package = relationship(CondaPackage, back_populates="builds")
package_id: Mapped[int] = mapped_column(ForeignKey("conda_package.id"))
package: Mapped["CondaPackage"] = relationship(back_populates="builds")

"""
Some package builds have the exact same data from different channels.
Thus, when adding a channel, populating CondaPackageBuild can encounter
duplicate keys errors. That's why we need to distinguish them by channel_id.
"""
channel_id = mapped_column(Integer, ForeignKey("conda_channel.id"))
channel = relationship(CondaChannel)

build = mapped_column(Unicode(64), nullable=False, index=True)
build_number = mapped_column(Integer, nullable=False)
constrains = mapped_column(JSON, nullable=True)
depends = mapped_column(JSON, nullable=False)
md5 = mapped_column(Unicode(255), nullable=False)
sha256 = mapped_column(Unicode(64), nullable=False)
size = mapped_column(BigInteger, nullable=False)
subdir = mapped_column(Unicode(64), nullable=True)
timestamp = mapped_column(BigInteger, nullable=True)
channel_id: Mapped[int] = mapped_column(ForeignKey("conda_channel.id"))
channel: Mapped["CondaChannel"] = relationship(CondaChannel)

build: Mapped[int] = mapped_column(Unicode(64), index=True)
build_number: Mapped[int]
constrains: Mapped[dict] = mapped_column(JSON)
depends: Mapped[dict] = mapped_column(JSON)
md5: Mapped[str] = mapped_column(Unicode(255))
sha256: Mapped[str] = mapped_column(Unicode(64))
size: Mapped[int] = mapped_column(BigInteger)
subdir: Mapped[str] = mapped_column(Unicode(64))
timestamp: Mapped[int] = mapped_column(BigInteger)

def __repr__(self):
return f"<CondaPackageBuild (id={self.id} build={self.build} size={self.size} sha256={self.sha256})>"
Expand All @@ -771,11 +773,11 @@ def __repr__(self):
class CondaStoreConfiguration(Base):
__tablename__ = "conda_store_configuration"

id = mapped_column(Integer, primary_key=True)
id: Mapped[int] = mapped_column(primary_key=True)

disk_usage = mapped_column(BigInteger, default=0)
free_storage = mapped_column(BigInteger, default=0)
total_storage = mapped_column(BigInteger, default=0)
disk_usage: Mapped[int] = mapped_column(BigInteger, default=0)
free_storage: Mapped[int] = mapped_column(BigInteger, default=0)
total_storage: Mapped[int] = mapped_column(BigInteger, default=0)

@classmethod
def configuration(cls, db):
Expand All @@ -802,10 +804,10 @@ class KeyValueStore(Base):
__tablename__ = "keyvaluestore"
__table_args__ = (UniqueConstraint("prefix", "key", name="_prefix_key_uc"),)

id = mapped_column(Integer, primary_key=True)
prefix = mapped_column(Unicode)
key = mapped_column(Unicode)
value = mapped_column(JSON)
id: Mapped[int] = mapped_column(primary_key=True)
prefix: Mapped[str] = mapped_column(Unicode)
key: Mapped[str] = mapped_column(Unicode)
value: Mapped[dict] = mapped_column(JSON)


def new_session_factory(
Expand Down

0 comments on commit 762477b

Please sign in to comment.