Commit 84e3330

feat(instrumentation-aws-sdk): add Bedrock InvokeModelWithResponseStream instrumentation (#2845)
Co-authored-by: Trent Mick <[email protected]>
1 parent c302e35 commit 84e3330

8 files changed: +998 −2 lines changed
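As a usage illustration (not part of this commit), the sketch below shows the call path the new instrumentation covers: an AWS SDK v3 Bedrock Runtime client issuing InvokeModelWithResponseStream while @opentelemetry/instrumentation-aws-sdk is registered. The model ID, region, and prompt body are placeholders, and exporter setup is omitted; per the diff below, the Bedrock span ends only once the stream has been fully consumed.

// Consumer-side sketch (illustrative only). Assumes AWS credentials are
// configured and a span processor/exporter is added in real usage.
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { AwsInstrumentation } from '@opentelemetry/instrumentation-aws-sdk';
import {
  BedrockRuntimeClient,
  InvokeModelWithResponseStreamCommand,
} from '@aws-sdk/client-bedrock-runtime';

new NodeTracerProvider().register();
registerInstrumentations({ instrumentations: [new AwsInstrumentation()] });

async function main() {
  const client = new BedrockRuntimeClient({ region: 'us-east-1' });
  const response = await client.send(
    new InvokeModelWithResponseStreamCommand({
      modelId: 'amazon.titan-text-express-v1', // illustrative model ID
      contentType: 'application/json',
      accept: 'application/json',
      body: JSON.stringify({ inputText: 'Say hello' }),
    })
  );

  // Draining the stream lets the instrumentation observe each chunk and
  // record token usage / finish reasons before it ends the span.
  for await (const event of response.body ?? []) {
    if (event.chunk?.bytes) {
      console.log(Buffer.from(event.chunk.bytes).toString('utf-8'));
    }
  }
}

main().catch(console.error);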

packages/instrumentation-aws-sdk/src/services/bedrock-runtime.ts

Lines changed: 226 additions & 2 deletions
@@ -16,6 +16,7 @@
 import {
   Attributes,
   DiagLogger,
+  diag,
   Histogram,
   HrTime,
   Meter,
@@ -61,6 +62,7 @@ import {
 export class BedrockRuntimeServiceExtension implements ServiceExtension {
   private tokenUsage!: Histogram;
   private operationDuration!: Histogram;
+  private _diag: DiagLogger = diag;
 
   updateMetricInstruments(meter: Meter) {
     // https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
@@ -103,7 +105,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
       case 'ConverseStream':
         return this.requestPreSpanHookConverse(request, config, diag, true);
       case 'InvokeModel':
-        return this.requestPreSpanHookInvokeModel(request, config, diag);
+        return this.requestPreSpanHookInvokeModel(request, config, diag, false);
+      case 'InvokeModelWithResponseStream':
+        return this.requestPreSpanHookInvokeModel(request, config, diag, true);
     }
 
     return {
@@ -159,7 +163,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
   private requestPreSpanHookInvokeModel(
     request: NormalizedRequest,
     config: AwsSdkInstrumentationConfig,
-    diag: DiagLogger
+    diag: DiagLogger,
+    isStream: boolean
   ): RequestMetadata {
     let spanName: string | undefined;
     const spanAttributes: Attributes = {
@@ -314,6 +319,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
     return {
       spanName,
       isIncoming: false,
+      isStream,
       spanAttributes,
     };
   }
@@ -348,6 +354,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
         );
       case 'InvokeModel':
         return this.responseHookInvokeModel(response, span, tracer, config);
+      case 'InvokeModelWithResponseStream':
+        return this.responseHookInvokeModelWithResponseStream(
+          response,
+          span,
+          tracer,
+          config
+        );
     }
   }
 
@@ -581,4 +594,215 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
       }
     }
   }
+
+  private async responseHookInvokeModelWithResponseStream(
+    response: NormalizedResponse,
+    span: Span,
+    tracer: Tracer,
+    config: AwsSdkInstrumentationConfig
+  ): Promise<any> {
+    const stream = response.data?.body;
+    const modelId = response.request.commandInput?.modelId;
+    if (!stream || !modelId) return;
+
+    // Replace the original response body with our instrumented stream.
+    // - Defers span.end() until the entire stream is consumed
+    // This ensures downstream consumers still receive the full stream correctly,
+    // while OpenTelemetry can record span attributes from streamed data.
+    response.data.body = async function* (
+      this: BedrockRuntimeServiceExtension
+    ) {
+      try {
+        for await (const chunk of stream) {
+          const parsedChunk = this.parseChunk(chunk?.chunk?.bytes);
+
+          if (!parsedChunk) {
+            // pass through
+          } else if (modelId.includes('amazon.titan')) {
+            BedrockRuntimeServiceExtension.recordTitanAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('anthropic.claude')) {
+            BedrockRuntimeServiceExtension.recordClaudeAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('amazon.nova')) {
+            BedrockRuntimeServiceExtension.recordNovaAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('meta.llama')) {
+            BedrockRuntimeServiceExtension.recordLlamaAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('cohere.command-r')) {
+            BedrockRuntimeServiceExtension.recordCohereRAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('cohere.command')) {
+            BedrockRuntimeServiceExtension.recordCohereAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('mistral')) {
+            BedrockRuntimeServiceExtension.recordMistralAttributes(
+              parsedChunk,
+              span
+            );
+          }
+          yield chunk;
+        }
+      } finally {
+        span.end();
+      }
+    }.bind(this)();
+    return response.data;
+  }
+
+  private parseChunk(bytes?: Uint8Array): any {
+    if (!bytes || !(bytes instanceof Uint8Array)) return null;
+    try {
+      const str = Buffer.from(bytes).toString('utf-8');
+      return JSON.parse(str);
+    } catch (err) {
+      this._diag.warn('Failed to parse streamed chunk', err);
+      return null;
+    }
+  }
+
+  private static recordNovaAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.metadata?.usage !== undefined) {
+      if (parsedChunk.metadata?.usage.inputTokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+          parsedChunk.metadata.usage.inputTokens
+        );
+      }
+      if (parsedChunk.metadata?.usage.outputTokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+          parsedChunk.metadata.usage.outputTokens
+        );
+      }
+    }
+    if (parsedChunk.messageStop?.stopReason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.messageStop.stopReason,
+      ]);
+    }
+  }
+
+  private static recordClaudeAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.message?.usage?.input_tokens !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+        parsedChunk.message.usage.input_tokens
+      );
+    }
+    if (parsedChunk.message?.usage?.output_tokens !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        parsedChunk.message.usage.output_tokens
+      );
+    }
+    if (parsedChunk.delta?.stop_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.delta.stop_reason,
+      ]);
+    }
+  }
+
+  private static recordTitanAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.inputTextTokenCount !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+        parsedChunk.inputTextTokenCount
+      );
+    }
+    if (parsedChunk.totalOutputTextTokenCount !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        parsedChunk.totalOutputTextTokenCount
+      );
+    }
+    if (parsedChunk.completionReason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.completionReason,
+      ]);
+    }
+  }
+  private static recordLlamaAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.prompt_token_count !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+        parsedChunk.prompt_token_count
+      );
+    }
+    if (parsedChunk.generation_token_count !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        parsedChunk.generation_token_count
+      );
+    }
+    if (parsedChunk.stop_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.stop_reason,
+      ]);
+    }
+  }
+
+  private static recordMistralAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.outputs?.[0]?.text !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        // NOTE: We approximate the token count since this value is not directly available in the body
+        // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
+        // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
+        Math.ceil(parsedChunk.outputs[0].text.length / 6)
+      );
+    }
+    if (parsedChunk.outputs?.[0]?.stop_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.outputs[0].stop_reason,
+      ]);
+    }
+  }
+
+  private static recordCohereAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.generations?.[0]?.text !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        // NOTE: We approximate the token count since this value is not directly available in the body
+        // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
+        // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
+        Math.ceil(parsedChunk.generations[0].text.length / 6)
+      );
+    }
+    if (parsedChunk.generations?.[0]?.finish_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.generations[0].finish_reason,
+      ]);
+    }
+  }
+
+  private static recordCohereRAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.text !== undefined) {
+      // NOTE: We approximate the token count since this value is not directly available in the body
+      // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
+      // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        Math.ceil(parsedChunk.text.length / 6)
+      );
+    }
+    if (parsedChunk.finish_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.finish_reason,
+      ]);
+    }
+  }
 }
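The Cohere and Mistral branches above estimate output tokens from the streamed text length because those payloads do not report usage directly. A small worked example of the (total_chars / 6) approximation, using a hypothetical chunk:

// Hypothetical streamed text, 45 characters long.
const streamedText = 'OpenTelemetry makes distributed tracing easy.';
const approxOutputTokens = Math.ceil(streamedText.length / 6); // ceil(45 / 6) = 8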
