-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
148 lines (123 loc) · 4.65 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import os
import pathlib
import secrets
import time
from typing import Optional
import socketio
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.param_functions import Cookie, Depends
from fastapi.params import Form
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse, Response
SECRET_KEY = os.environ.get("SECRET_KEY", "ef4ac4e2a33e4d9e0bb34200349e3544")
templates = Jinja2Templates(directory=pathlib.Path(__file__).parent / "templates")
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "[email protected]",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
class RequiresLoginException(Exception):
pass
app = FastAPI()
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
app.mount("/ws", socketio.ASGIApp(sio))
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount(
"/static",
StaticFiles(directory=pathlib.Path(__file__).parent / "templates"),
name="static",
)
# This is not really required for simple use case, but if we have a lot views
# and want to protect them, a common redirect logic is convenient.
@app.exception_handler(RequiresLoginException)
async def exception_handler(*args, **kwargs) -> Response:
return RedirectResponse(url="/", status_code=303)
def verify_session_id(request: Request, session_id: Optional[str] = Cookie(...)):
"""Verify the session_id in the fake db.
If it doesn't exist raise an exception to redirect to Login page"""
username = request.session.get(session_id)
if username not in fake_users_db:
# raise an exception so that we can redirect to the login
# if there's no `session_id`` passed or wrong `session_id`` is given
raise RequiresLoginException
return username
@app.get("/view")
async def view(request: Request, username: str = Depends(verify_session_id)):
await sio.emit("message", "hello universe")
return templates.TemplateResponse(
"view.html",
{
"request": request,
"current_user": username,
"start_time": request.session.get("start_time", int(time.time())),
"PORT": os.environ.get("PORT", 8000),
},
)
@app.get("/")
def index(request: Request):
# if there's some session, the user may likely be logged in
# try redirecting to the /view
if request.session:
return RedirectResponse(url="/view", status_code=303)
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/login")
async def login(request: Request, username: str = Form(...), password: str = Form(...)):
"""Get `username` and `password` from form data and authenticate the user
If username doesn't exist, redirect to Login page.
Else continue to `/view` page
"""
# for simplicity we will only check the `username`` exists
# we can add a `password` check if required
if username not in fake_users_db:
response = RedirectResponse(url="/", status_code=303)
return response
# why we need to set the status_code to `303` can be seen in the below git issue comment
# https://github.com/encode/starlette/issues/632#issuecomment-527258349
response = RedirectResponse(url="/view", status_code=303)
session_id = secrets.token_hex(16)
request.session.update(
{
session_id: username,
"start_time": int(time.time()),
"username": username,
}
)
response.set_cookie("session_id", session_id)
return response
@app.get("/logout", name="logout")
async def logout(request: Request, username: str = Depends(verify_session_id)):
"""Logout and redirect to Login screen"""
request.session.clear()
response = RedirectResponse(url="/", status_code=303)
response.set_cookie("session_id", None)
await sio.emit("logout", username)
return response
@sio.event
async def connect(sid, environ):
session = environ["asgi.scope"]["session"]
await sio.emit("new user", session)
@sio.event
async def message(sid, data):
await sio.emit("message", data, room=sid)