Skip to content
This repository was archived by the owner on May 27, 2025. It is now read-only.

Commit 587cb92

Browse files
integrated opentelemetry into application_insights code (#191)
Co-authored-by: Josh Bradley <[email protected]>
1 parent fe3c103 commit 587cb92

File tree

10 files changed

+2775
-1158
lines changed

10 files changed

+2775
-1158
lines changed

backend/poetry.lock

Lines changed: 1281 additions & 1120 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ authors = [
1010
"Douglas Orbaker <[email protected]>",
1111
"Chris Sanchez <[email protected]>",
1212
"Shane Solomon <[email protected]>",
13+
"Kenny Zhang <[email protected]>"
1314
]
1415
readme = "README.md"
1516
license = "MIT"
@@ -51,9 +52,8 @@ httpx = ">=0.25.2"
5152
kubernetes = ">=29.0.0"
5253
networkx = ">=3.2.1"
5354
nltk = "*"
54-
opencensus = ">=0.11.4"
55-
opencensus-context = ">=0.1.3"
56-
opencensus-ext-azure = ">=1.1.13"
55+
azure-monitor-opentelemetry-exporter = "*"
56+
opentelemetry-sdk = ">=1.27.0"
5757
pandas = ">=2.2.1"
5858
pyaml-env = ">=1.2.1"
5959
pyarrow = ">=15.0.0"

backend/src/api/query_streaming.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ async def global_search_streaming(request: GraphRequest):
6969
else:
7070
# Current investigations show that community level 1 is the most useful for global search. Set this as the default value
7171
COMMUNITY_LEVEL = 1
72-
72+
7373
for index_name in sanitized_index_names:
7474
validate_index_file_exist(index_name, COMMUNITY_REPORT_TABLE)
7575
validate_index_file_exist(index_name, ENTITIES_TABLE)
@@ -249,7 +249,7 @@ async def local_search_streaming(request: GraphRequest):
249249
NODES_TABLE = "output/create_final_nodes.parquet"
250250
RELATIONSHIPS_TABLE = "output/create_final_relationships.parquet"
251251
TEXT_UNITS_TABLE = "output/create_final_text_units.parquet"
252-
252+
253253
if isinstance(request.community_level, int):
254254
COMMUNITY_LEVEL = request.community_level
255255
else:

backend/src/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class GraphRequest(BaseModel):
4646
class GraphResponse(BaseModel):
4747
result: Any
4848
context_data: Any
49-
49+
5050

5151
class GraphDataResponse(BaseModel):
5252
nodes: int

backend/src/reporting/application_insights_workflow_callbacks.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,17 @@
1212
Optional,
1313
)
1414

15+
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
1516
from datashaper.workflow.workflow_callbacks import NoopWorkflowCallbacks
16-
from opencensus.ext.azure.log_exporter import AzureLogHandler
17+
from opentelemetry._logs import (
18+
get_logger_provider,
19+
set_logger_provider,
20+
)
21+
from opentelemetry.sdk._logs import (
22+
LoggerProvider,
23+
LoggingHandler,
24+
)
25+
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
1726

1827

1928
class ApplicationInsightsWorkflowCallbacks(NoopWorkflowCallbacks):
@@ -74,15 +83,23 @@ def __init_logger(self, connection_string, max_logger_init_retries: int = 10):
7483
unique_hash = hashlib.sha256(current_time.encode()).hexdigest()
7584
self._logger_name = f"{self.__class__.__name__}-{unique_hash}"
7685
if self._logger_name not in logging.Logger.manager.loggerDict:
86+
# attach azure monitor log exporter to logger provider
87+
logger_provider = LoggerProvider()
88+
set_logger_provider(logger_provider)
89+
exporter = AzureMonitorLogExporter(connection_string=connection_string)
90+
get_logger_provider().add_log_record_processor(
91+
BatchLogRecordProcessor(
92+
exporter=exporter,
93+
schedule_delay_millis=60000,
94+
)
95+
)
7796
# instantiate new logger
7897
self._logger = logging.getLogger(self._logger_name)
7998
self._logger.propagate = False
8099
# remove any existing handlers
81100
self._logger.handlers.clear()
82-
# set up Azure Monitor
83-
self._logger.addHandler(
84-
AzureLogHandler(connection_string=connection_string)
85-
)
101+
# fetch handler from logger provider and attach to class
102+
self._logger.addHandler(LoggingHandler())
86103
# set logging level
87104
self._logger.setLevel(logging.DEBUG)
88105

@@ -91,8 +108,7 @@ def __init_logger(self, connection_string, max_logger_init_retries: int = 10):
91108

92109
def _format_details(self, details: Dict[str, Any] | None = None) -> Dict[str, Any]:
93110
"""
94-
Format the details dictionary to comply with the Application Insights structured.
95-
111+
Format the details dictionary to comply with the Application Insights structured
96112
logging Property column standard.
97113
98114
Args:

infra/deploy.sh

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,23 +343,28 @@ deployAzureResources () {
343343
--parameters "enablePrivateEndpoints=$ENABLE_PRIVATE_ENDPOINTS" \
344344
--parameters "acrName=$CONTAINER_REGISTRY_NAME" \
345345
--output json)
346+
# errors in deployment may not be caught by exitIfCommandFailed function so we also check the output for errors
346347
exitIfCommandFailed $? "Error deploying Azure resources..."
348+
exitIfValueEmpty "$AZURE_DEPLOY_RESULTS" "Error deploying Azure resources..."
347349
AZURE_OUTPUTS=$(jq -r .properties.outputs <<< $AZURE_DEPLOY_RESULTS)
348-
exitIfCommandFailed $? "Error parsing outputs from Azure resource deployment..."
350+
exitIfCommandFailed $? "Error parsing outputs from Azure deployment..."
351+
exitIfValueEmpty "$AZURE_OUTPUTS" "Error parsing outputs from Azure deployment..."
349352
assignAOAIRoleToManagedIdentity
350353
}
351354

352355
validateSKUs() {
353356
# Run SKU validation functions unless skip flag is set
354-
if [ $2 = true ]; then
355-
checkSKUAvailability $1
356-
checkSKUQuotas $1
357+
local location=$1
358+
local validate_skus=$2
359+
if [ $validate_skus = true ]; then
360+
checkSKUAvailability $location
361+
checkSKUQuotas $location
357362
fi
358363
}
359364

360365
checkSKUAvailability() {
361366
# Function to validate that the required SKUs are not restricted for the given region
362-
printf "Checking Location for SKU Availability... "
367+
printf "Checking cloud region for VM sku availability... "
363368
local location=$1
364369
local sku_checklist=("standard_d4s_v5" "standard_d8s_v5" "standard_e8s_v5")
365370
for sku in ${sku_checklist[@]}; do
@@ -682,6 +687,9 @@ startBanner
682687
checkRequiredTools
683688
populateParams $PARAMS_FILE
684689

690+
# Check SKU availability and quotas
691+
validateSKUs $LOCATION $VALIDATE_SKUS_FLAG
692+
685693
# Create resource group
686694
createResourceGroupIfNotExists $LOCATION $RESOURCE_GROUP
687695

@@ -690,7 +698,6 @@ createSshkeyIfNotExists $RESOURCE_GROUP
690698

691699
# Deploy Azure resources
692700
checkForApimSoftDelete
693-
validateSKUs $LOCATION $VALIDATE_SKUS_FLAG
694701
deployAzureResources
695702

696703
# Deploy the graphrag backend docker image to ACR

infra/helm/graphrag/values.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ ingress:
3030
service.beta.kubernetes.io/azure-load-balancer-internal: "true"
3131

3232
graphragConfig:
33+
AI_SEARCH_AUDIENCE: ""
34+
AI_SEARCH_URL: ""
35+
APP_INSIGHTS_CONNECTION_STRING: ""
3336
COSMOS_URI_ENDPOINT: ""
3437
GRAPHRAG_API_BASE: ""
3538
GRAPHRAG_API_VERSION: ""
@@ -40,7 +43,6 @@ graphragConfig:
4043
GRAPHRAG_EMBEDDING_DEPLOYMENT_NAME: ""
4144
REPORTERS: "blob,console,app_insights"
4245
STORAGE_ACCOUNT_BLOB_URL: ""
43-
AI_SEARCH_URL: ""
4446

4547
master:
4648
name: "master"

notebooks/1-Quickstart.ipynb

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,20 +327,28 @@
327327
"%%time\n",
328328
"\n",
329329
"\n",
330-
"def global_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
330+
"def global_search(\n",
331+
" index_name: str | list[str], query: str, community_level: int\n",
332+
") -> requests.Response:\n",
331333
" \"\"\"Run a global query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
332334
" url = endpoint + \"/query/global\"\n",
333335
" # optional parameter: community level to query the graph at (default for global query = 1)\n",
334-
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
336+
" request = {\n",
337+
" \"index_name\": index_name,\n",
338+
" \"query\": query,\n",
339+
" \"community_level\": community_level,\n",
340+
" }\n",
335341
" return requests.post(url, json=request, headers=headers)\n",
336342
"\n",
337343
"\n",
338344
"# perform a global query\n",
339345
"global_response = global_search(\n",
340-
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=1\n",
346+
" index_name=index_name,\n",
347+
" query=\"Summarize the main topics found in this data\",\n",
348+
" community_level=1,\n",
341349
")\n",
342350
"global_response_data = parse_query_response(global_response, return_context_data=True)\n",
343-
"global_response_data\n"
351+
"global_response_data"
344352
]
345353
},
346354
{
@@ -361,17 +369,25 @@
361369
"%%time\n",
362370
"\n",
363371
"\n",
364-
"def local_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
372+
"def local_search(\n",
373+
" index_name: str | list[str], query: str, community_level: int\n",
374+
") -> requests.Response:\n",
365375
" \"\"\"Run a local query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
366376
" url = endpoint + \"/query/local\"\n",
367377
" # optional parameter: community level to query the graph at (default for local query = 2)\n",
368-
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
378+
" request = {\n",
379+
" \"index_name\": index_name,\n",
380+
" \"query\": query,\n",
381+
" \"community_level\": community_level,\n",
382+
" }\n",
369383
" return requests.post(url, json=request, headers=headers)\n",
370384
"\n",
371385
"\n",
372386
"# perform a local query\n",
373387
"local_response = local_search(\n",
374-
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=2\n",
388+
" index_name=index_name,\n",
389+
" query=\"Summarize the main topics found in this data\",\n",
390+
" community_level=2,\n",
375391
")\n",
376392
"local_response_data = parse_query_response(local_response, return_context_data=True)\n",
377393
"local_response_data"
@@ -380,7 +396,7 @@
380396
],
381397
"metadata": {
382398
"kernelspec": {
383-
"display_name": ".venv",
399+
"display_name": "graphrag-venv",
384400
"language": "python",
385401
"name": "python3"
386402
},

notebooks/2-Advanced_Getting_Started.ipynb

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,17 @@
324324
" return requests.get(url, headers=headers)\n",
325325
"\n",
326326
"\n",
327-
"def global_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
327+
"def global_search(\n",
328+
" index_name: str | list[str], query: str, community_level: int\n",
329+
") -> requests.Response:\n",
328330
" \"\"\"Run a global query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
329331
" url = endpoint + \"/query/global\"\n",
330332
" # optional parameter: community level to query the graph at (default for global query = 1)\n",
331-
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
333+
" request = {\n",
334+
" \"index_name\": index_name,\n",
335+
" \"query\": query,\n",
336+
" \"community_level\": community_level,\n",
337+
" }\n",
332338
" return requests.post(url, json=request, headers=headers)\n",
333339
"\n",
334340
"\n",
@@ -338,7 +344,11 @@
338344
" \"\"\"Run a global query across one or more indexes and stream back the response\"\"\"\n",
339345
" url = endpoint + \"/query/streaming/global\"\n",
340346
" # optional parameter: community level to query the graph at (default for global query = 1)\n",
341-
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
347+
" request = {\n",
348+
" \"index_name\": index_name,\n",
349+
" \"query\": query,\n",
350+
" \"community_level\": community_level,\n",
351+
" }\n",
342352
" context_list = []\n",
343353
" with requests.post(url, json=request, headers=headers, stream=True) as r:\n",
344354
" r.raise_for_status()\n",
@@ -358,11 +368,17 @@
358368
" display(pd.DataFrame.from_dict(context_list[0][\"reports\"]).head(10))\n",
359369
"\n",
360370
"\n",
361-
"def local_search(index_name: str | list[str], query: str, community_level: int) -> requests.Response:\n",
371+
"def local_search(\n",
372+
" index_name: str | list[str], query: str, community_level: int\n",
373+
") -> requests.Response:\n",
362374
" \"\"\"Run a local query over the knowledge graph(s) associated with one or more indexes\"\"\"\n",
363375
" url = endpoint + \"/query/local\"\n",
364376
" # optional parameter: community level to query the graph at (default for local query = 2)\n",
365-
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
377+
" request = {\n",
378+
" \"index_name\": index_name,\n",
379+
" \"query\": query,\n",
380+
" \"community_level\": community_level,\n",
381+
" }\n",
366382
" return requests.post(url, json=request, headers=headers)\n",
367383
"\n",
368384
"\n",
@@ -372,7 +388,11 @@
372388
" \"\"\"Run a global query across one or more indexes and stream back the response\"\"\"\n",
373389
" url = endpoint + \"/query/streaming/local\"\n",
374390
" # optional parameter: community level to query the graph at (default for local query = 2)\n",
375-
" request = {\"index_name\": index_name, \"query\": query, \"community_level\": community_level}\n",
391+
" request = {\n",
392+
" \"index_name\": index_name,\n",
393+
" \"query\": query,\n",
394+
" \"community_level\": community_level,\n",
395+
" }\n",
376396
" context_list = []\n",
377397
" with requests.post(url, json=request, headers=headers, stream=True) as r:\n",
378398
" r.raise_for_status()\n",
@@ -746,7 +766,9 @@
746766
"%%time\n",
747767
"# pass in a single index name as a string or to query across multiple indexes, set index_name=[myindex1, myindex2]\n",
748768
"global_response = global_search(\n",
749-
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=1\n",
769+
" index_name=index_name,\n",
770+
" query=\"Summarize the main topics found in this data\",\n",
771+
" community_level=1,\n",
750772
")\n",
751773
"# print the result and save context data in a variable\n",
752774
"global_response_data = parse_query_response(global_response, return_context_data=True)\n",
@@ -769,7 +791,9 @@
769791
"outputs": [],
770792
"source": [
771793
"global_search_streaming(\n",
772-
" index_name=index_name, query=\"Summarize the main topics found in this data\", community_level=1\n",
794+
" index_name=index_name,\n",
795+
" query=\"Summarize the main topics found in this data\",\n",
796+
" community_level=1,\n",
773797
")"
774798
]
775799
},
@@ -797,7 +821,7 @@
797821
"local_response = local_search(\n",
798822
" index_name=index_name,\n",
799823
" query=\"Who are the primary actors in these communities?\",\n",
800-
" community_level=2\n",
824+
" community_level=2,\n",
801825
")\n",
802826
"# print the result and save context data in a variable\n",
803827
"local_response_data = parse_query_response(local_response, return_context_data=True)\n",
@@ -822,7 +846,7 @@
822846
"local_search_streaming(\n",
823847
" index_name=index_name,\n",
824848
" query=\"Who are the primary actors in these communities?\",\n",
825-
" community_level=2\n",
849+
" community_level=2,\n",
826850
")"
827851
]
828852
},

0 commit comments

Comments (0)