Skip to content

Commit

Permalink
Fixed bug for azure delta source
Browse files Browse the repository at this point in the history
  • Loading branch information
MatsMoll committed Mar 25, 2024
1 parent c3e08b8 commit 6eaab9a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
16 changes: 12 additions & 4 deletions aligned/sources/azure_blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from aligned.feature_source import WritableFeatureSource
from aligned.local.job import FileDateJob, FileFactualJob, FileFullJob
from aligned.retrival_job import RetrivalJob, RetrivalRequest
from aligned.schemas.date_formatter import DateFormatter
from aligned.schemas.feature import FeatureType, EventTimestamp
from aligned.sources.local import (
CsvConfig,
Expand Down Expand Up @@ -382,6 +383,7 @@ class AzureBlobDeltaDataSource(
config: AzureBlobConfig
path: str
mapping_keys: dict[str, str] = field(default_factory=dict)
date_formatter: DateFormatter = field(default_factory=lambda: DateFormatter.unix_timestamp('ms'))
type_name: str = 'azure_blob_delta'

def job_group_key(self) -> str:
Expand Down Expand Up @@ -420,7 +422,7 @@ async def freshness(self, event_timestamp: EventTimestamp) -> datetime | None:
return None

def features_for(self, facts: RetrivalJob, request: RetrivalRequest) -> RetrivalJob:
return FileFactualJob(self, [request], facts)
return FileFactualJob(self, [request], facts, date_formatter=self.date_formatter)

async def schema(self) -> dict[str, FeatureType]:
try:
Expand All @@ -433,15 +435,21 @@ async def schema(self) -> dict[str, FeatureType]:
raise UnableToFindFileException() from error

def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob:
return FileFullJob(self, request, limit)
return FileFullJob(self, request, limit, date_formatter=self.date_formatter)

def all_between_dates(
self,
request: RetrivalRequest,
start_date: datetime,
end_date: datetime,
) -> RetrivalJob:
return FileDateJob(source=self, request=request, start_date=start_date, end_date=end_date)
return FileDateJob(
source=self,
request=request,
start_date=start_date,
end_date=end_date,
date_formatter=self.date_formatter,
)

async def write_pandas(self, df: pd.DataFrame) -> None:
await self.write_polars(pl.from_pandas(df).lazy())
Expand Down Expand Up @@ -516,7 +524,7 @@ def pa_field(feature: Feature) -> pa.Field:
if dtypes[feature.name] == pl.Null:
df = df.with_columns(pl.col(feature.name).cast(feature.dtype.polars_type))
elif feature.dtype.is_datetime:
df = df.with_columns(pl.col(feature.name).dt.timestamp('ms').cast(pl.Float64()))
df = df.with_columns(self.date_formatter.encode_polars(feature.name))
else:
df = df.with_columns(pl.col(feature.name).cast(feature.dtype.polars_type))

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aligned"
version = "0.0.87"
version = "0.0.88"
description = "A data managment and lineage tool for ML applications."
authors = ["Mats E. Mollestad <[email protected]>"]
license = "Apache-2.0"
Expand Down

0 comments on commit 6eaab9a

Please sign in to comment.