-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathazure_ai_foundry.py
161 lines (143 loc) · 6.16 KB
/
azure_ai_foundry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
title: Azure AI Foundry Pipeline
author: owndev
author_url: https://github.com/owndev
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/owndev/Open-WebUI-Functions
version: 1.0.2
license: MIT
description: A Python-based pipeline for interacting with Azure AI services, enabling seamless communication with various AI models via configurable headers and robust error handling. This includes support for Azure OpenAI models as well as other Azure AI models by dynamically managing headers and request configurations.
features:
- Supports dynamic model specification via headers.
- Filters valid parameters to ensure clean requests.
- Handles streaming and non-streaming responses.
- Provides flexible timeout and error handling mechanisms.
- Compatible with Azure OpenAI and other Azure AI models.
"""
from typing import Union, Generator, Iterator
from pydantic import BaseModel, Field
import requests
import os
class Pipe:
    """
    Open WebUI pipe for Azure AI services.

    Sends chat-completion requests to a configurable Azure AI endpoint,
    authenticating with an API key. The target model can be supplied either
    as the `x-ms-model-mesh-model-name` header (default) or in the request
    body, depending on the AZURE_AI_MODEL_IN_BODY valve. Supports both
    streaming and non-streaming responses.
    """

    class Valves(BaseModel):
        """User-configurable settings, seeded from environment variables."""

        # API key for Azure AI
        AZURE_AI_API_KEY: str = Field(
            default=os.getenv("AZURE_AI_API_KEY", "API_KEY"),
            description="API key for Azure AI"
        )
        # Endpoint for Azure AI (e.g. "https://<your-endpoint>/chat/completions?api-version=2024-05-01-preview" or "https://<your-endpoint>/openai/deployments/gpt-4o/chat/completions?api-version=2024-08-01-preview")
        AZURE_AI_ENDPOINT: str = Field(
            default=os.getenv(
                "AZURE_AI_ENDPOINT",
                "https://<your-endpoint>/chat/completions?api-version=2024-05-01-preview"
            ),
            description="Endpoint for Azure AI"
        )
        # Optional model name, only necessary if not Azure OpenAI or if model name not in URL (e.g. "https://<your-endpoint>/openai/deployments/<model-name>/chat/completions")
        AZURE_AI_MODEL: str = Field(
            default=os.getenv("AZURE_AI_MODEL", ""),
            description="Optional model name for Azure AI"
        )
        # Switch for sending model name in request body instead of as a header
        AZURE_AI_MODEL_IN_BODY: bool = Field(
            default=False,
            description="If True, include the model name in the request body instead of as a header.",
        )

    def __init__(self):
        self.name = "Azure AI"
        self.valves = self.Valves()
        self.validate_environment()

    def validate_environment(self):
        """
        Validates that required environment variables are set.

        Raises:
            ValueError: If the API key or endpoint valve is empty.
        """
        if not self.valves.AZURE_AI_API_KEY:
            raise ValueError("AZURE_AI_API_KEY is not set!")
        if not self.valves.AZURE_AI_ENDPOINT:
            raise ValueError("AZURE_AI_ENDPOINT is not set!")

    def get_headers(self) -> dict:
        """
        Constructs the headers for the API request, including the model name
        as the `x-ms-model-mesh-model-name` header if defined.

        Returns:
            dict: Headers containing the API key, content type, and
            optionally the model-routing header.
        """
        headers = {
            "api-key": self.valves.AZURE_AI_API_KEY,
            "Content-Type": "application/json",
        }
        # Route by header unless the valve says the model name belongs in the
        # request body (that case is handled in pipe()).
        if self.valves.AZURE_AI_MODEL and not self.valves.AZURE_AI_MODEL_IN_BODY:
            headers["x-ms-model-mesh-model-name"] = self.valves.AZURE_AI_MODEL
        return headers

    def validate_body(self, body: dict):
        """
        Validates the request body to ensure required fields are present.

        Raises:
            ValueError: If 'messages' is missing or not a list.
        """
        if "messages" not in body or not isinstance(body["messages"], list):
            raise ValueError("The 'messages' field is required and must be a list.")

    def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
        """
        Main method for sending requests to the Azure AI endpoint.

        Args:
            body: Open WebUI request body; must contain a 'messages' list.

        Returns:
            The parsed JSON response (dict) for non-streaming requests, a
            generator of raw response lines for streaming requests, or an
            error-description string if the request fails.
        """
        # Validate the request body
        self.validate_body(body)
        # Construct headers
        headers = self.get_headers()

        # Forward only parameters the Azure chat-completions API accepts.
        allowed_params = {
            "model",
            "messages",
            "frequency_penalty",
            "max_tokens",
            "presence_penalty",
            "response_format",
            "seed",
            "stop",
            "stream",
            "temperature",
            "tool_choice",
            "tools",
            "top_p",
        }
        filtered_body = {k: v for k, v in body.items() if k in allowed_params}

        # If the valve indicates that the model name should be in the body,
        # add it to the filtered body (overriding any caller-supplied model).
        if self.valves.AZURE_AI_MODEL and self.valves.AZURE_AI_MODEL_IN_BODY:
            filtered_body["model"] = self.valves.AZURE_AI_MODEL

        response = None
        try:
            # Check for streaming support
            do_stream = filtered_body.get("stream", False)
            # Send POST request to the endpoint
            response = requests.post(
                url=self.valves.AZURE_AI_ENDPOINT,
                json=filtered_body,
                headers=headers,
                stream=do_stream,
                timeout=600 if do_stream else 300,  # Longer timeout for streaming
            )
            response.raise_for_status()
            # Return streaming or full response
            if do_stream:
                return self.stream_response(response)
            return response.json()
        except requests.exceptions.Timeout:
            return "Error: Request timed out. The server did not respond in time."
        except requests.exceptions.ConnectionError:
            return "Error: Failed to connect to the server. Check your network or endpoint."
        except requests.exceptions.HTTPError as http_err:
            # Read the error details before closing, then release the
            # connection (streamed error responses would otherwise leak).
            detail = f"HTTP Error: {http_err.response.status_code} - {http_err.response.text}"
            if response is not None:
                response.close()
            return detail
        except Exception as e:
            if response is not None:
                response.close()  # release the connection on unexpected failures
            return f"Unexpected error: {e}"

    def stream_response(self, response):
        """
        Handles streaming responses line by line.

        Args:
            response: A `requests.Response` opened with stream=True.

        Yields:
            Non-empty raw response lines (bytes), or an error string if
            iteration fails.
        """
        try:
            for line in response.iter_lines():
                if line:  # Skip empty keep-alive lines
                    yield line
        except Exception as e:
            yield f"Error while streaming: {e}"
        finally:
            # Always release the underlying connection, even if the
            # consumer abandons the generator early.
            response.close()