Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions packages/dd-trace/src/llmobs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const telemetry = require('./telemetry')
const LLMObsSpanProcessor = require('./span_processor')

const { channel } = require('dc-polyfill')
const spanProcessCh = channel('dd-trace:span:process')
const spanFinishCh = channel('dd-trace:span:finish')
const evalMetricAppendCh = channel('llmobs:eval-metric:append')
const flushCh = channel('llmobs:writers:flush')
const injectCh = channel('dd-trace:span:inject')
Expand Down Expand Up @@ -62,7 +62,7 @@ function enable (config) {
// span processing
spanProcessor = new LLMObsSpanProcessor(config)
spanProcessor.setWriter(spanWriter)
spanProcessCh.subscribe(handleSpanProcess)
spanFinishCh.subscribe(handleSpanProcess)

// distributed tracing for llmobs
injectCh.subscribe(handleLLMObsParentIdInjection)
Expand All @@ -86,7 +86,7 @@ function enable (config) {
function disable () {
if (evalMetricAppendCh.hasSubscribers) evalMetricAppendCh.unsubscribe(handleEvalMetricAppend)
if (flushCh.hasSubscribers) flushCh.unsubscribe(handleFlush)
if (spanProcessCh.hasSubscribers) spanProcessCh.unsubscribe(handleSpanProcess)
if (spanFinishCh.hasSubscribers) spanFinishCh.unsubscribe(handleSpanProcess)
if (injectCh.hasSubscribers) injectCh.unsubscribe(handleLLMObsParentIdInjection)
if (registerUserSpanProcessorCh.hasSubscribers) registerUserSpanProcessorCh.unsubscribe(handleRegisterProcessor)

Expand Down Expand Up @@ -133,8 +133,8 @@ function handleRegisterProcessor (userSpanProcessor) {
spanProcessor.setUserSpanProcessor(userSpanProcessor)
}

function handleSpanProcess (data) {
spanProcessor.process(data)
function handleSpanProcess (span) {
spanProcessor.process(span)
}

function handleEvalMetricAppend (payload) {
Expand Down
2 changes: 1 addition & 1 deletion packages/dd-trace/src/llmobs/span_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class LLMObsSpanProcessor {
}

// TODO: instead of relying on the tagger's weakmap registry, can we use some namespaced storage correlation?
process ({ span }) {
process (span) {
if (!this.#config.llmobs.enabled) return
// if the span is not in our private tagger map, it is not an llmobs span
if (!LLMObsTagger.tagMap.has(span)) return
Expand Down
5 changes: 0 additions & 5 deletions packages/dd-trace/src/span_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ const { getEnvironmentVariable } = require('./config-helper')
const startedSpans = new WeakSet()
const finishedSpans = new WeakSet()

const { channel } = require('dc-polyfill')
const spanProcessCh = channel('dd-trace:span:process')

class SpanProcessor {
constructor (exporter, prioritySampler, config) {
this._exporter = exporter
Expand Down Expand Up @@ -57,8 +54,6 @@ class SpanProcessor {
isChunkRoot = false
this._stats?.onSpanFinished(formattedSpan)
formatted.push(formattedSpan)

spanProcessCh.publish({ span })
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/dd-trace/test/llmobs/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const proxyquire = require('proxyquire')

const AgentInfoExporter = require('../../src/exporters/common/agent-info-exporter')

const spanProcessCh = channel('dd-trace:span:process')
const spanFinishCh = channel('dd-trace:span:finish')
const evalMetricAppendCh = channel('llmobs:eval-metric:append')
const flushCh = channel('llmobs:writers:flush')
const injectCh = channel('dd-trace:span:inject')
Expand Down Expand Up @@ -260,7 +260,7 @@ describe('module', () => {

expect(injectCh.hasSubscribers).to.be.false
expect(evalMetricAppendCh.hasSubscribers).to.be.false
expect(spanProcessCh.hasSubscribers).to.be.false
expect(spanFinishCh.hasSubscribers).to.be.false
expect(flushCh.hasSubscribers).to.be.false
})
})
4 changes: 2 additions & 2 deletions packages/dd-trace/test/llmobs/plugins/ai/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ describe('Plugin', () => {

const toolCallId = result.steps[0].toolCalls[0].toolCallId

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(4)

let expectedFinalOutput

Expand Down Expand Up @@ -565,7 +565,7 @@ describe('Plugin', () => {
const steps = stepsPromise.status.value
const toolCallId = steps[0].toolCalls[0].toolCallId

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(4)

let expectedFinalOutput

Expand Down
14 changes: 7 additions & 7 deletions packages/dd-trace/test/llmobs/plugins/langchain/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ describe('integrations', () => {

await chain.invoke({ input: 'Can you tell me about LangSmith?' })

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(2)

const workflowSpan = apmSpans[0]
const llmSpan = apmSpans[1]
Expand Down Expand Up @@ -497,7 +497,7 @@ describe('integrations', () => {
})
assert.ok(result)

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(5)

const topLevelWorkflow = apmSpans[0]
const firstSubWorkflow = apmSpans[1]
Expand Down Expand Up @@ -604,7 +604,7 @@ describe('integrations', () => {

await chain.batch(['chickens', 'dogs'])

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(3)

const workflowSpan = apmSpans[0]
const firstLLMSpan = apmSpans[1]
Expand Down Expand Up @@ -677,7 +677,7 @@ describe('integrations', () => {
input: 'What is the powerhouse of the cell?'
})

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(2)

const workflowSpan = apmSpans[0]
const llmSpan = apmSpans[1]
Expand Down Expand Up @@ -760,7 +760,7 @@ describe('integrations', () => {

await chain.invoke({ foo: 'bar' })

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(3)

const workflowSpan = apmSpans[0]
const taskSpan = apmSpans[1]
Expand Down Expand Up @@ -893,7 +893,7 @@ describe('integrations', () => {
it('submits a retrieval span with a child embedding span for similaritySearch', async () => {
await vectorstore.similaritySearch('Biology')

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(2)

// first call was for the embedding span in the beforeEach
const retrievalSpanEvent = llmobsSpans[0]
Expand All @@ -918,7 +918,7 @@ describe('integrations', () => {
it('submits a retrieval span with a child embedding span for similaritySearchWithScore', async () => {
await vectorstore.similaritySearchWithScore('Biology')

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(2)

// first call was for the embedding span in the beforeEach
const retrievalSpanEvent = llmobsSpans[0]
Expand Down
16 changes: 9 additions & 7 deletions packages/dd-trace/test/llmobs/sdk/integration.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const sinon = require('sinon')
const { useLlmObs, assertLlmObsSpanEvent } = require('../util')

const assert = require('node:assert')
const agent = require('../../plugins/agent')

function getTag (llmobsSpan, tagName) {
const tag = llmobsSpan.tags.find(tag => tag.split(':')[0] === tagName)
Expand Down Expand Up @@ -37,7 +38,7 @@ describe('end to end sdk integration tests', () => {

assert.equal(result, 'boom')

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(2)
assert.equal(apmSpans.length, 3)
assert.equal(llmobsSpans.length, 2)

Expand Down Expand Up @@ -86,7 +87,7 @@ describe('end to end sdk integration tests', () => {

agent('my custom input')

const { apmSpans, llmobsSpans } = await getEvents()
const { apmSpans, llmobsSpans } = await getEvents(2)
assert.equal(apmSpans.length, 3)
assert.equal(llmobsSpans.length, 2)

Expand Down Expand Up @@ -167,7 +168,7 @@ describe('end to end sdk integration tests', () => {
llmobs.trace({ kind: 'workflow', name: 'child' }, () => {})
})

const { llmobsSpans } = await getEvents()
const { llmobsSpans } = await getEvents(2)
assert.equal(llmobsSpans.length, 2)

assert.equal(getTag(llmobsSpans[0], 'ml_app'), 'test')
Expand All @@ -185,7 +186,7 @@ describe('end to end sdk integration tests', () => {
llmobs.trace({ kind: 'workflow', name: 'child' }, () => {})
})

const { llmobsSpans } = await getEvents()
const { llmobsSpans } = await getEvents(2)
assert.equal(llmobsSpans.length, 2)

assert.equal(getTag(llmobsSpans[0], 'ml_app'), 'span-level-ml-app')
Expand Down Expand Up @@ -213,7 +214,7 @@ describe('end to end sdk integration tests', () => {
llmobs.trace({ kind: 'workflow', name: 'child-2' }, () => {})
})

const { llmobsSpans } = await getEvents()
const { llmobsSpans } = await getEvents(3)
assert.equal(llmobsSpans.length, 3)

assert.equal(getTag(llmobsSpans[0], 'ml_app'), 'test')
Expand Down Expand Up @@ -291,6 +292,7 @@ describe('end to end sdk integration tests', () => {

beforeEach(() => {
llmobs.registerProcessor(processor)
agent.reset() // make sure llmobs requests are cleared
})

it('does not submit the span', async () => {
Expand Down Expand Up @@ -333,7 +335,7 @@ describe('end to end sdk integration tests', () => {
})
})

const { llmobsSpans } = await getEvents()
const { llmobsSpans } = await getEvents(2)
assert.equal(llmobsSpans.length, 2)

assert.equal(llmobsSpans[0].meta.input.value, 'REDACTED')
Expand All @@ -357,7 +359,7 @@ describe('end to end sdk integration tests', () => {
llmobs.trace({ kind: 'workflow', name: 'afterAnnotationContext' }, () => {})
})

const { llmobsSpans } = await getEvents()
const { llmobsSpans } = await getEvents(6)
assert.equal(llmobsSpans.length, 6)

assert.equal(getTag(llmobsSpans[0], 'foo'), undefined)
Expand Down
22 changes: 11 additions & 11 deletions packages/dd-trace/test/llmobs/span_processor.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe('span processor', () => {
it('should do nothing if llmobs is not enabled', () => {
processor = new LLMObsSpanProcessor({ llmobs: { enabled: false } })

expect(() => processor.process({ span })).not.to.throw()
expect(() => processor.process(span)).not.to.throw()
})

it('should do nothing if the span is not an llm obs span', () => {
Expand Down Expand Up @@ -71,7 +71,7 @@ describe('span processor', () => {
'_ml_obs.llmobs_parent_id': '1234'
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload).to.deep.equal({
Expand Down Expand Up @@ -140,7 +140,7 @@ describe('span processor', () => {
'_ml_obs.meta.metadata': metadata
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.meta.metadata).to.deep.equal({
Expand All @@ -167,7 +167,7 @@ describe('span processor', () => {
'_ml_obs.meta.output.documents': [{ text: 'hello', name: 'myDoc', id: '1', score: 0.6 }]
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.meta.output.documents).to.deep.equal([{
Expand All @@ -194,7 +194,7 @@ describe('span processor', () => {
'_ml_obs.meta.input.documents': [{ text: 'hello', name: 'myDoc', id: '1', score: 0.6 }]
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.meta.input.documents).to.deep.equal([{
Expand All @@ -221,7 +221,7 @@ describe('span processor', () => {
'_ml_obs.meta.model_name': 'myModel'
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.meta.model_provider).to.equal('custom')
Expand All @@ -246,7 +246,7 @@ describe('span processor', () => {
'_ml_obs.meta.span.kind': 'llm'
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.meta['error.message']).to.equal('error message')
Expand Down Expand Up @@ -274,7 +274,7 @@ describe('span processor', () => {
'_ml_obs.meta.span.kind': 'llm'
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.meta['error.message']).to.equal('error message')
Expand Down Expand Up @@ -302,7 +302,7 @@ describe('span processor', () => {
'_ml_obs.name': 'mySpan'
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.name).to.equal('mySpan')
Expand All @@ -324,7 +324,7 @@ describe('span processor', () => {
'_ml_obs.session_id': '1234'
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.session_id).to.equal('1234')
Expand All @@ -347,7 +347,7 @@ describe('span processor', () => {
'_ml_obs.tags': { hostname: 'localhost', foo: 'bar', source: 'mySource' }
})

processor.process({ span })
processor.process(span)
const payload = writer.append.getCall(0).firstArg

expect(payload.tags).to.include('foo:bar')
Expand Down
Loading