From 291c8bb5bc0773ee3029a045373adef4c47472e5 Mon Sep 17 00:00:00 2001 From: vladsaveliev Date: Wed, 14 Feb 2024 15:57:29 +0100 Subject: [PATCH 1/4] Buffer visits in memory and in csv before writing to database --- .gitignore | 2 + app/__init__.py | 4 ++ app/db.py | 37 +++++----- app/main.py | 180 +++++++++++++++++++++++++++++++++++------------ pyproject.toml | 21 ++---- requirements.txt | 1 + 6 files changed, 168 insertions(+), 77 deletions(-) diff --git a/.gitignore b/.gitignore index 8d4ceba..1d9083e 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,5 @@ dmypy.json .idea/ .DS_Store + +visits.csv diff --git a/app/__init__.py b/app/__init__.py index 0288a96..ed8d1c5 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,3 +1,7 @@ """api.multiqc.info: Providing run-time information about available updates.""" __version__ = "0.1.0.dev0" + +from dotenv import load_dotenv + +load_dotenv() diff --git a/app/db.py b/app/db.py index b19e6ee..70f0161 100644 --- a/app/db.py +++ b/app/db.py @@ -1,24 +1,29 @@ """Functions to interact with the database.""" +import os import datetime -from os import getenv from sqlmodel import create_engine, Field, select, Session, SQLModel -sql_url = getenv("DATABASE_URL") +sql_url = os.getenv("DATABASE_URL") +assert sql_url is not None, sql_url engine = create_engine(sql_url) -class Visit(SQLModel, table=True): # type: ignore # mypy doesn't like this, not sure why - """Table to record raw individual visits to the version endpoint.""" +class Visits(SQLModel, table=True): # type: ignore # mypy doesn't like this, not sure why + """ + Table to record per-minute visit summaries + """ id: int | None = Field(default=None, primary_key=True) + start: datetime.datetime | None = None + end: datetime.datetime | None = None + count: int = 0 version_multiqc: str | None = None version_python: str | None = None operating_system: str | None = None installation_method: str | None = None ci_environment: str | None = None - called_at: datetime.datetime | None = Field(default_factory=datetime.datetime.utcnow, nullable=False) def create_db_and_tables() -> None: @@ -26,27 +31,27 @@ def create_db_and_tables() -> None: SQLModel.metadata.create_all(engine) -def add_visit(visit: Visit) -> None: - """Add a visit to the database.""" - with Session(engine) as session: - session.add(visit) - session.commit() +# def add_visit(visit: Visits) -> None: +# """Add a visit to the database.""" +# with Session(engine) as session: +# session.add(visit) +# session.commit() def get_visits( start: datetime.datetime | None = None, end: datetime.datetime | None = None, limit: int | None = None, -) -> list[Visit]: - """Return list of raw visits from the DB.""" +) -> list[Visits]: + """Return list of per-minute visit summary from the DB.""" with Session(engine) as session: - statement = select(Visit) + statement = select(Visits) if start: # Ignore type because Visit.called_at can be None for default value - statement.where(Visit.called_at > start) # type: ignore + statement.where(Visits.start >= start) # type: ignore if end: - statement.where(Visit.called_at < end) # type: ignore + statement.where(Visits.end <= end) # type: ignore if limit: statement.limit(limit) - statement.order_by(Visit.called_at.desc()) # type: ignore + statement.order_by(Visits.start.desc()) # type: ignore return session.exec(statement).all() diff --git a/app/main.py b/app/main.py index 0493a9f..f08fa2f 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,12 @@ +import csv import datetime +import logging +import sys +from contextlib import asynccontextmanager +from pathlib import Path +from threading import Lock + +import uvicorn from enum import Enum from os import getenv @@ -6,10 +14,23 @@ import plotly.express as px from fastapi import BackgroundTasks, FastAPI from fastapi.responses import HTMLResponse, PlainTextResponse, Response +from fastapi_utilities import repeat_every from github import Github from plotly.graph_objs import Layout -from . import __version__, db, models +from app import __version__, db, models +from app.db import engine + + +app = FastAPI( + title="MultiQC API", + description="MultiQC API service, providing run-time information about available updates.", + version=__version__, + license_info={ + "name": "Source code available under the MIT Licence", + "url": "https://github.com/MultiQC/api.multiqc.info/blob/main/LICENSE", + }, +) def get_latest_release() -> models.LatestRelease: @@ -24,22 +45,102 @@ def get_latest_release() -> models.LatestRelease: ) -app = FastAPI( - title="MultiQC API", - description="MultiQC API service, providing run-time information about available updates.", - version=__version__, - license_info={ - "name": "Source code available under the MIT Licence", - "url": "https://github.com/MultiQC/api.multiqc.info/blob/main/LICENSE", - }, -) +app.latest_release = get_latest_release() +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logger.addHandler(logging.StreamHandler(sys.stdout)) -@app.on_event("startup") -def on_startup(): - """Initialise the DB and tables on server startup.""" + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Initialise the DB and tables on server startup db.create_db_and_tables() + # Sync latest version tag using GitHub API app.latest_release = get_latest_release() + yield + + +@repeat_every(seconds=15 * 60) # every 15 minutes +def update_version(): + """Sync latest version tag using GitHub API""" + app.latest_release = get_latest_release() + + +# Fields to store per visit +fieldnames = [ + "version_multiqc", + "version_python", + "operating_system", + "installation_method", + "ci_environment", +] + +# Thread-safe in-memory buffer to accumulate recent visits before writing to the CSV file +visit_data = [] +visit_data_lock = Lock() + + +@app.get("/version") # log a visit +async def version( + version_multiqc: str | None = None, + version_python: str | None = None, + operating_system: str | None = None, + installation_method: str | None = None, + ci_environment: str | None = None, +): + """ + Endpoint for MultiQC that returns the latest release, and logs + the visit along with basic user environment detail. + """ + with visit_data_lock: + visit_data.append( + { + "timestamp": datetime.datetime.now().isoformat(), + "version_multiqc": version_multiqc, + "version_python": version_python, + "operating_system": operating_system, + "installation_method": installation_method, + "ci_environment": ci_environment, + } + ) + return models.VersionResponse(latest_release=app.latest_release) + + +# Path to a buffer CSV file to persist recent visits before dumping to the database +# In the same folder as this script +CSV_FILE_PATH = Path(__file__).parent / "visits.csv" + + +@repeat_every(seconds=10) # every 10 seconds +async def persist_visits(): + """Write in-memory visits to a CSV file""" + global visit_data + with visit_data_lock: + if visit_data: + logger.debug(f"Persisting {len(visit_data)} visits to CSV {CSV_FILE_PATH}") + with open(CSV_FILE_PATH, mode="a") as file: + writer = csv.DictWriter(file, fieldnames=["timestamp"] + fieldnames) + for row in visit_data: + writer.writerow(row) + visit_data = [] + + +@repeat_every(seconds=60 * 60 * 1) # every hour +async def summarize_visits(): + with visit_data_lock: + df = pd.read_csv(CSV_FILE_PATH, sep=",", names=["timestamp"] + fieldnames) + df["start"] = pd.to_datetime(df["timestamp"]) + df["end"] = df["start"] + pd.to_timedelta("1min") + df["start"] = df["start"].dt.strftime("%Y-%m-%d %H:%M") + df["end"] = df["end"].dt.strftime("%Y-%m-%d %H:%M") + df = df.drop(columns=["timestamp"]) + # replace nan with "Unknown" + df = df.fillna("Unknown") # df.groupby will fail if there are NaNs + # Summarize visits per user per minute + minute_summary = df.groupby(["start", "end"] + fieldnames).size().reset_index(name="count") + logger.debug(f"Summarizing {len(df)} visits in {CSV_FILE_PATH} and writing {len(minute_summary)} rows to DB") + minute_summary.to_sql("visits", con=engine, if_exists="append", index=False) @app.get("/") @@ -69,29 +170,6 @@ async def downloads(): return {} -@app.get("/version") -async def version( - background_tasks: BackgroundTasks, - version_multiqc: str | None = None, - version_python: str | None = None, - operating_system: str | None = None, - installation_method: str | None = None, - ci_environment: str | None = None, -): - """Endpoint for MultiQC that returns the latest release, plus bonus info.""" - background_tasks.add_task( - db.add_visit, - db.Visit( - version_multiqc=version_multiqc, - version_python=version_python, - operating_system=operating_system, - installation_method=installation_method, - ci_environment=ci_environment, - ), - ) - return models.VersionResponse(latest_release=app.latest_release) - - @app.get("/version.php", response_class=PlainTextResponse) async def version_legacy(background_tasks: BackgroundTasks, v: str | None = None): """ @@ -100,7 +178,13 @@ async def version_legacy(background_tasks: BackgroundTasks, v: str | None = None Accessed by MultiQC versions 1.14 and earlier, after being redirected to by https://multiqc.info/version.php """ - background_tasks.add_task(db.add_visit, db.Visit(version_multiqc=v)) + with visit_data_lock: + visit_data.append( + { + "timestamp": datetime.datetime.now().isoformat(), + "version_multiqc": v, + } + ) return app.latest_release.version @@ -127,7 +211,7 @@ class PlotlyTemplates(str, Enum): @app.get("/plot_usage") -async def usage_raw( +async def plot_usage( categories: models.UsageCategory | None = None, interval: models.IntervalTypes = models.IntervalTypes.D, start: datetime.datetime | None = None, @@ -139,6 +223,8 @@ async def usage_raw( """Plot usage metrics.""" # Get visit data visits = db.get_visits(start=start, end=end, limit=limit) + if not visits: + return Response(status_code=204) df = pd.DataFrame.from_records([i.dict() for i in visits]) df.fillna("Unknown", inplace=True) legend_title_text = models.usage_category_nicenames[categories] if categories else None @@ -148,15 +234,17 @@ async def usage_raw( categories = models.UsageCategory[categories.name.replace("_simple", "")] df[categories.name] = df[categories.name].str.replace(r"^v?(\d+\.\d+).+", lambda m: m.group(1), regex=True) - # Plot + # Plot histogram of df.count per interval from df.start fig = px.histogram( df, - x=df["called_at"].dt.to_period(interval.name).astype("datetime64[M]"), - color=categories, - title="MultiQC usage", + x="start", + y="count", + color=categories.name if categories else None, + title="Usage per version per week", + ) + fig.update_layout( + legend_title_text=legend_title_text, ) - fig.update_traces(xbins_size=models.interval_types_plotly[interval]) - fig.update_layout(legend_title_text=legend_title_text) return plotly_image_response(plotly_to_image(fig, format, template), format) @@ -202,3 +290,7 @@ def plotly_image_response(plot, format: PlotlyImageFormats = PlotlyImageFormats. elif format == "png": return Response(content=plot, media_type="image/png") return Response(content=plot) + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/pyproject.toml b/pyproject.toml index e7825ec..50676eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,22 +1,9 @@ -[tool.black] +[tool.ruff] line-length = 120 -target-version = ['py310'] - -[tool.isort] -line_length = 120 -multi_line_output = 3 -force_alphabetical_sort_within_sections = "True" -force_sort_within_sections = "False" -sections = [ - "FUTURE", - "STDLIB", - "THIRDPARTY", - "FIRSTPARTY", - "LOCALFOLDER", -] -profile = "black" +target-version = "py312" +ignore-init-module-imports = true [tool.mypy] -python_version = "3.10" +python_version = "3.12" ignore_missing_imports = "True" scripts_are_modules = "True" diff --git a/requirements.txt b/requirements.txt index efe93f8..fca56dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi +fastapi-utilities kaleido pandas plotly From 71f2b8b64d17403a2ee6664a9f704c0fe355db00 Mon Sep 17 00:00:00 2001 From: vladsaveliev Date: Wed, 14 Feb 2024 18:15:18 +0100 Subject: [PATCH 2/4] Clean up CSV, handle errors, make start and end primary keys --- app/db.py | 27 ++++++++------------ app/main.py | 72 +++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 63 insertions(+), 36 deletions(-) diff --git a/app/db.py b/app/db.py index 70f0161..2197214 100644 --- a/app/db.py +++ b/app/db.py @@ -12,18 +12,18 @@ class Visits(SQLModel, table=True): # type: ignore # mypy doesn't like this, not sure why """ - Table to record per-minute visit summaries + Table to record per-minute visit summaries. Start is a primary key, + and start and end are both indexed. """ - id: int | None = Field(default=None, primary_key=True) - start: datetime.datetime | None = None - end: datetime.datetime | None = None - count: int = 0 - version_multiqc: str | None = None - version_python: str | None = None - operating_system: str | None = None - installation_method: str | None = None - ci_environment: str | None = None + start: datetime.datetime = Field(primary_key=True) + end: datetime.datetime = Field(primary_key=True) + count: int + version_multiqc: str = Field(index=True) + version_python: str = Field(default=None, index=True) + operating_system: str = Field(default=None, index=True) + installation_method: str = Field(default=None, index=True) + ci_environment: str = Field(default=None, index=True) def create_db_and_tables() -> None: @@ -31,13 +31,6 @@ def create_db_and_tables() -> None: SQLModel.metadata.create_all(engine) -# def add_visit(visit: Visits) -> None: -# """Add a visit to the database.""" -# with Session(engine) as session: -# session.add(visit) -# session.commit() - - def get_visits( start: datetime.datetime | None = None, end: datetime.datetime | None = None, diff --git a/app/main.py b/app/main.py index f08fa2f..ccb80e6 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,7 @@ import csv import datetime import logging +import os import sys from contextlib import asynccontextmanager from pathlib import Path @@ -12,11 +13,12 @@ import pandas as pd import plotly.express as px -from fastapi import BackgroundTasks, FastAPI +from fastapi import BackgroundTasks, FastAPI, HTTPException from fastapi.responses import HTMLResponse, PlainTextResponse, Response from fastapi_utilities import repeat_every from github import Github from plotly.graph_objs import Layout +from sqlalchemy.exc import IntegrityError from app import __version__, db, models from app.db import engine @@ -49,6 +51,7 @@ def get_latest_release() -> models.LatestRelease: logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) +# Make sure logs are printed to stdout: logger.addHandler(logging.StreamHandler(sys.stdout)) @@ -61,6 +64,7 @@ async def lifespan(app: FastAPI): yield +@app.on_event("startup") @repeat_every(seconds=15 * 60) # every 15 minutes def update_version(): """Sync latest version tag using GitHub API""" @@ -77,8 +81,8 @@ def update_version(): ] # Thread-safe in-memory buffer to accumulate recent visits before writing to the CSV file -visit_data = [] -visit_data_lock = Lock() +visit_buffer = [] +visit_buffer_lock = Lock() @app.get("/version") # log a visit @@ -93,8 +97,8 @@ async def version( Endpoint for MultiQC that returns the latest release, and logs the visit along with basic user environment detail. """ - with visit_data_lock: - visit_data.append( + with visit_buffer_lock: + visit_buffer.append( { "timestamp": datetime.datetime.now().isoformat(), "version_multiqc": version_multiqc, @@ -112,35 +116,65 @@ async def version( CSV_FILE_PATH = Path(__file__).parent / "visits.csv" +@app.on_event("startup") @repeat_every(seconds=10) # every 10 seconds async def persist_visits(): """Write in-memory visits to a CSV file""" - global visit_data - with visit_data_lock: - if visit_data: - logger.debug(f"Persisting {len(visit_data)} visits to CSV {CSV_FILE_PATH}") + global visit_buffer + with visit_buffer_lock: + if visit_buffer: + logger.debug(f"Persisting {len(visit_buffer)} visits to CSV {CSV_FILE_PATH}") with open(CSV_FILE_PATH, mode="a") as file: - writer = csv.DictWriter(file, fieldnames=["timestamp"] + fieldnames) - for row in visit_data: - writer.writerow(row) - visit_data = [] + writer: csv.DictWriter = csv.DictWriter(file, fieldnames=["timestamp"] + fieldnames) + writer.writerows(visit_buffer) + visit_buffer = [] +@app.on_event("startup") @repeat_every(seconds=60 * 60 * 1) # every hour async def summarize_visits(): - with visit_data_lock: + with visit_buffer_lock: df = pd.read_csv(CSV_FILE_PATH, sep=",", names=["timestamp"] + fieldnames) df["start"] = pd.to_datetime(df["timestamp"]) df["end"] = df["start"] + pd.to_timedelta("1min") df["start"] = df["start"].dt.strftime("%Y-%m-%d %H:%M") df["end"] = df["end"].dt.strftime("%Y-%m-%d %H:%M") df = df.drop(columns=["timestamp"]) - # replace nan with "Unknown" df = df.fillna("Unknown") # df.groupby will fail if there are NaNs # Summarize visits per user per minute minute_summary = df.groupby(["start", "end"] + fieldnames).size().reset_index(name="count") - logger.debug(f"Summarizing {len(df)} visits in {CSV_FILE_PATH} and writing {len(minute_summary)} rows to DB") - minute_summary.to_sql("visits", con=engine, if_exists="append", index=False) + logger.debug( + f"Summarizing {len(df)} visits in {CSV_FILE_PATH} and writing {len(minute_summary)} rows to the DB" + ) + try: + minute_summary.to_sql("visits", con=engine, if_exists="append", index=False) + except IntegrityError as e: + logger.error( + f"Failed to write to the database due to a primary key violation, " + f"probably these entries were already added: {e}" + ) + # Cleaning the file to avoid duplicates + open(CSV_FILE_PATH, "w").close() + except Exception as e: + logger.error(f"Failed to write to the database: {e}") + raise HTTPException(status_code=500, detail=str(e)) + else: + logger.debug(f"Successfully wrote {len(minute_summary)} rows to the DB") + # Clear the CSV file on successful write + open(CSV_FILE_PATH, "w").close() + return Response(status_code=200) + + +if os.getenv("ENVIRONMENT") == "DEV": + + @app.get("/summarize") + async def summarize_visits_endpoint(): + try: + # Call the summarize logic here if possible or replicate the logic + await summarize_visits() + return Response(status_code=200) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) @app.get("/") @@ -178,8 +212,8 @@ async def version_legacy(background_tasks: BackgroundTasks, v: str | None = None Accessed by MultiQC versions 1.14 and earlier, after being redirected to by https://multiqc.info/version.php """ - with visit_data_lock: - visit_data.append( + with visit_buffer_lock: + visit_buffer.append( { "timestamp": datetime.datetime.now().isoformat(), "version_multiqc": v, From f1ff546b6d72b331c6c1b662e4b528ca34934086 Mon Sep 17 00:00:00 2001 From: vladsaveliev Date: Wed, 14 Feb 2024 18:24:20 +0100 Subject: [PATCH 3/4] Comments --- app/main.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/app/main.py b/app/main.py index ccb80e6..b72b722 100644 --- a/app/main.py +++ b/app/main.py @@ -165,6 +165,7 @@ async def summarize_visits(): return Response(status_code=200) +# Add a /summarize endpoint to trigger the summarize logic manually available only when developing if os.getenv("ENVIRONMENT") == "DEV": @app.get("/summarize") @@ -177,21 +178,8 @@ async def summarize_visits_endpoint(): raise HTTPException(status_code=500, detail=str(e)) -@app.get("/") -async def index(background_tasks: BackgroundTasks): - """ - Root endpoint for the API. - - Returns a list of available endpoints. - """ - return { - "message": "Welcome to the MultiQC service API", - "available_endpoints": [ - {"path": route.path, "name": route.name} for route in app.routes if route.name != "swagger_ui_redirect" - ], - } - - +@app.on_event("startup") +@repeat_every(seconds=60 * 60 * 1) # daily @app.get("/downloads") async def downloads(): """ @@ -222,6 +210,21 @@ async def version_legacy(background_tasks: BackgroundTasks, v: str | None = None return app.latest_release.version +@app.get("/") +async def index(background_tasks: BackgroundTasks): + """ + Root endpoint for the API. + + Returns a list of available endpoints. + """ + return { + "message": "Welcome to the MultiQC service API", + "available_endpoints": [ + {"path": route.path, "name": route.name} for route in app.routes if route.name != "swagger_ui_redirect" + ], + } + + class PlotlyImageFormats(str, Enum): """Available Plotly image export formats.""" From 50d3d1aeeb54d7b981e8d7f0e9cc2051b8b7d0b6 Mon Sep 17 00:00:00 2001 From: Phil Ewels Date: Wed, 14 Feb 2024 23:05:57 +0100 Subject: [PATCH 4/4] minor docstring tweak --- app/db.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/db.py b/app/db.py index 2197214..34d2d0e 100644 --- a/app/db.py +++ b/app/db.py @@ -12,8 +12,9 @@ class Visits(SQLModel, table=True): # type: ignore # mypy doesn't like this, not sure why """ - Table to record per-minute visit summaries. Start is a primary key, - and start and end are both indexed. + Table to record per-minute visit summaries. + + Start is a primary key, and start and end are both indexed. """ start: datetime.datetime = Field(primary_key=True)