-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtime_token_tracker.py
153 lines (133 loc) · 6.62 KB
/
time_token_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
title: Time Token Tracker
author: owndev
author_url: https://github.com/owndev
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/owndev/Open-WebUI-Functions
version: 2.3.0
license: MIT
description: A filter for tracking the response time and token usage of a request.
features:
- Tracks the response time of a request.
- Tracks Token Usage.
- Calculates the average tokens per message.
- Calculates the tokens per second.
"""
import time
from typing import Optional
import tiktoken
from pydantic import BaseModel, Field
# Module-level state shared between Filter.inlet() and Filter.outlet().
# NOTE: a bare ``global`` statement at module scope is a no-op in Python,
# so the variables are initialized explicitly here; this prevents a
# NameError if outlet() ever runs before inlet() (e.g. after a reload).
start_time = 0.0
request_token_count = 0
response_token_count = 0
class Filter:
    """Tracks response time and token usage (request/response) per chat turn.

    Timing and token counts are kept in module-level globals, preserving the
    original design.  NOTE(review): globals are shared across concurrent
    requests, so overlapping requests can clobber each other's timings —
    confirm whether Open WebUI serializes filter invocations per instance.
    """

    class Valves(BaseModel):
        priority: int = Field(
            default=0, description="Priority level for the filter operations."
        )
        CALCULATE_ALL_MESSAGES: bool = Field(
            default=True,
            description="If true, calculate tokens for all messages. If false, only use the last user and assistant messages."
        )
        SHOW_AVERAGE_TOKENS: bool = Field(
            default=True,
            description="Show average tokens per message (only used if CALCULATE_ALL_MESSAGES is true)."
        )
        SHOW_RESPONSE_TIME: bool = Field(
            default=True,
            description="Show the response time."
        )
        SHOW_TOKEN_COUNT: bool = Field(
            default=True,
            description="Show the token count."
        )
        SHOW_TOKENS_PER_SECOND: bool = Field(
            default=True,
            description="Show tokens per second for the response."
        )

    def __init__(self):
        self.name = "Time Token Tracker"
        self.valves = self.Valves()

    @staticmethod
    def _get_encoding(model: str):
        """Return the tiktoken encoding for *model*, falling back to
        cl100k_base for model names tiktoken does not recognize."""
        try:
            return tiktoken.encoding_for_model(model)
        except KeyError:
            return tiktoken.get_encoding("cl100k_base")

    @staticmethod
    def _count_tokens(encoding, messages) -> int:
        """Sum token counts over the given messages' ``content`` fields.

        Non-string content (e.g. multimodal list payloads) and missing
        ``content`` keys are skipped instead of raising, which the original
        code did via ``m["content"]``.
        """
        total = 0
        for message in messages:
            content = message.get("content")
            if isinstance(content, str):
                total += len(encoding.encode(content))
        return total

    def _select_request_messages(self, all_messages: list) -> list:
        """Pick the user/system messages to count, per CALCULATE_ALL_MESSAGES.

        With the valve off, the original special case is kept: exactly one
        user + one system message -> count both; otherwise only the last
        user/system message (if any) is counted.
        """
        user_system = [
            m for m in all_messages if m.get("role") in ("user", "system")
        ]
        if self.valves.CALCULATE_ALL_MESSAGES:
            return user_system
        if len(user_system) == 2:
            return user_system
        return user_system[-1:]  # last user/system message, or [] if none

    async def inlet(self, body: dict, __user__: Optional[dict] = None, __event_emitter__=None) -> dict:
        """Record the request start time and count the request's tokens.

        Returns *body* unchanged; state is stored in module globals for
        outlet() to consume.
        """
        global start_time, request_token_count
        start_time = time.time()
        model = body.get("model", "default-model")
        all_messages = body.get("messages", [])
        encoding = self._get_encoding(model)
        request_messages = self._select_request_messages(all_messages)
        request_token_count = self._count_tokens(encoding, request_messages)
        return body

    async def outlet(self, body: dict, __user__: Optional[dict] = None, __event_emitter__=None) -> dict:
        """Count response tokens, compute stats, and emit a status event.

        Returns *body* unchanged.  Defensive against outlet() running
        without a preceding inlet() (unbound globals) and against a missing
        event emitter (the parameter defaults to None, but the original
        awaited it unconditionally).
        """
        global start_time, request_token_count, response_token_count
        end_time = time.time()
        try:
            response_time = end_time - start_time
        except NameError:  # inlet() never ran; no timing available
            response_time = 0.0
        if "request_token_count" not in globals():
            request_token_count = 0

        model = body.get("model", "default-model")
        all_messages = body.get("messages", [])
        encoding = self._get_encoding(model)

        assistant_messages = [
            m for m in all_messages if m.get("role") == "assistant"
        ]
        if not self.valves.CALCULATE_ALL_MESSAGES:
            # Only the last assistant message (if any).
            assistant_messages = assistant_messages[-1:]
        response_token_count = self._count_tokens(encoding, assistant_messages)

        # Tokens/second is always based on the last assistant response only.
        resp_tokens_per_sec = 0.0
        if self.valves.SHOW_TOKENS_PER_SECOND:
            last_tokens = self._count_tokens(encoding, assistant_messages[-1:])
            resp_tokens_per_sec = (
                last_tokens / response_time if response_time else 0
            )

        # Averages are only meaningful when every message was counted.
        avg_request_tokens = avg_response_tokens = 0
        show_averages = (
            self.valves.SHOW_AVERAGE_TOKENS and self.valves.CALCULATE_ALL_MESSAGES
        )
        if show_averages:
            req_count = len(
                [m for m in all_messages if m.get("role") in ("user", "system")]
            )
            resp_count = len(assistant_messages)
            avg_request_tokens = request_token_count / req_count if req_count else 0
            avg_response_tokens = (
                response_token_count / resp_count if resp_count else 0
            )

        # Short status line, e.g.:
        # "10.90s | Req: 175 (Ø 87.50) | Resp: 439 (Ø 219.50) | 40.18 T/s"
        description_parts = []
        if self.valves.SHOW_RESPONSE_TIME:
            description_parts.append(f"{response_time:.2f}s")
        if self.valves.SHOW_TOKEN_COUNT:
            if show_averages:
                description_parts.append(
                    f"Req: {request_token_count} (Ø {avg_request_tokens:.2f}) | "
                    f"Resp: {response_token_count} (Ø {avg_response_tokens:.2f})"
                )
            else:
                description_parts.append(
                    f"Req: {request_token_count} | Resp: {response_token_count}"
                )
        if self.valves.SHOW_TOKENS_PER_SECOND:
            description_parts.append(f"{resp_tokens_per_sec:.2f} T/s")

        # Bug fix: guard against the default None emitter before awaiting.
        if __event_emitter__ is not None:
            await __event_emitter__({
                "type": "status",
                "data": {"description": " | ".join(description_parts), "done": True},
            })
        return body