-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload_test_email.py
54 lines (47 loc) · 1.46 KB
/
load_test_email.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import os
import time
import uuid
from os.path import join, dirname

import jwt
from dotenv import load_dotenv
from locust import HttpUser, task
# Read the .env file that lives next to this script so the variables it
# defines are available through os.environ before any user starts.
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)
class SendEmail(HttpUser):
    """
    Sends requests to POST v2/notifications/email

    Each simulated user reads its settings from the environment when it
    starts, then repeatedly posts an email notification request that is
    authenticated with a freshly signed, short-lived JWT.
    """

    # Lifetime, in seconds, of every token produced by _get_jwt.
    JWT_TTL_SECONDS = 30

    def on_start(self) -> None:
        """Load the API credentials and request parameters from the environment.

        Raises:
            RuntimeError: if API_KEY, EMAIL_ADDRESS, EMAIL_TEMPLATE_ID or
                SERVICE_ID is unset or empty.
        """
        self.api_key = os.environ.get('API_KEY')
        self.email_address = os.environ.get('EMAIL_ADDRESS')
        self.email_template_id = os.environ.get('EMAIL_TEMPLATE_ID')
        self.service_id = os.environ.get('SERVICE_ID')

        # Fail fast with a readable message instead of letting a None value
        # surface later as a signing error or a stream of rejected requests.
        settings = {
            'API_KEY': self.api_key,
            'EMAIL_ADDRESS': self.email_address,
            'EMAIL_TEMPLATE_ID': self.email_template_id,
            'SERVICE_ID': self.service_id,
        }
        missing = [name for name, value in settings.items() if not value]
        if missing:
            raise RuntimeError(
                f"Missing required environment variables: {', '.join(missing)}"
            )

    def _get_jwt(self) -> str:
        """Build an HS256-signed JWT for the configured service.

        The claims carry the service id as issuer, the current time as
        issued-at, an expiry JWT_TTL_SECONDS later, and a unique token id.

        Returns:
            The encoded token as text, ready for an Authorization header.
        """
        # 'typ' and 'alg' describe the token itself, so they go in the JOSE
        # header rather than in the claims.
        header = {'typ': 'JWT', 'alg': 'HS256'}
        issued_at = int(time.time())
        claims = {
            'iss': self.service_id,
            'iat': issued_at,
            'exp': issued_at + self.JWT_TTL_SECONDS,
            # A fresh id per token; a constant value would make every token
            # issued within the same second identical.
            'jti': str(uuid.uuid4()),
        }
        encoded_jwt = jwt.encode(
            claims,
            self.api_key,
            algorithm='HS256',
            headers=header,
        )
        # PyJWT 1.x returns bytes while 2.x returns str; normalise so the
        # Authorization header never ends up as "Bearer b'...'".
        if isinstance(encoded_jwt, bytes):
            encoded_jwt = encoded_jwt.decode('ascii')
        return encoded_jwt

    @task
    def send(self) -> None:
        """POST one email notification request using a newly signed token.

        The response is not inspected here.
        """
        encoded_jwt = self._get_jwt()
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f"Bearer {encoded_jwt}"
        }
        payload = {
            'template_id': self.email_template_id,
            'email_address': self.email_address
        }
        self.client.post(
            '/v2/notifications/email',
            json=payload,
            headers=headers
        )