Skip to content

Commit df5eebe

Browse files
feat: send flamegraph from an unhealthy worker (#119)
1 parent e258cd8 commit df5eebe

File tree

4 files changed

+152
-76
lines changed

4 files changed

+152
-76
lines changed

plugins/alerts.js

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ async function alerts (app, _opts) {
4040
}
4141

4242
const timestamp = Date.now()
43+
const workerId = healthInfo.id
4344
const serviceId = healthInfo.application
4445
const healthWithTimestamp = { ...healthInfo, timestamp, service: serviceId }
4546
delete healthWithTimestamp.healthConfig // we don't need to store this
@@ -112,7 +113,7 @@ async function alerts (app, _opts) {
112113
const alert = await body.json()
113114

114115
app.sendFlamegraphs({
115-
serviceIds: [serviceId],
116+
workerIds: [workerId],
116117
alertId: alert.id
117118
}).catch(err => {
118119
app.log.error({ err }, 'Failed to send a flamegraph')

plugins/flamegraphs.js

Lines changed: 52 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -128,88 +128,98 @@ async function flamegraphs (app, _opts) {
128128
}
129129
}
130130

131-
/**
 * Fetches the last recorded profile of the given type by sending the
 * `getLastProfile` command through `runtime.sendCommandToApplication`.
 *
 * Error handling, by `err.code`:
 * - `PLT_PPROF_NO_PROFILE_AVAILABLE`: sleeps `attemptTimeout` and retries
 *   with the same id while `attempt <= maxAttempts`.
 * - `PLT_PPROF_NOT_ENOUGH_ELU`: logged at info level, no retry.
 * - anything else: logged as a warning; if the id has the form
 *   `<serviceId>:<workerIndex>`, one more lookup is made with the bare
 *   `serviceId` (starting again from attempt 1).
 *
 * This function does not throw for failed lookups; it resolves to
 * `undefined` instead, so callers must validate the result.
 *
 * @param {string} workerId - Target id passed to the runtime. Either a bare
 *   service id or `<serviceId>:<workerIndex>`.
 * @param {string} profileType - Forwarded as `type` to `getLastProfile`.
 * @param {number} [attempt=1] - Current attempt number, used for retries.
 * @returns {Promise<*>} Whatever `getLastProfile` returned, or `undefined`
 *   when no profile could be obtained.
 */
async function getServiceFlamegraph (workerId, profileType, attempt = 1) {
  const runtime = app.watt.runtime

  try {
    const profile = await runtime.sendCommandToApplication(workerId, 'getLastProfile', { type: profileType })
    return profile
  } catch (err) {
    if (err.code === 'PLT_PPROF_NO_PROFILE_AVAILABLE') {
      app.log.info(
        { workerId, attempt, maxAttempts, attemptTimeout },
        'No profile available for the service. Waiting for profiling to complete.'
      )
      // NOTE(review): `<=` still retries when attempt === maxAttempts, so up
      // to maxAttempts + 1 calls are made in total. Confirm this is intended.
      if (attempt <= maxAttempts) {
        await sleep(attemptTimeout)
        return getServiceFlamegraph(workerId, profileType, attempt + 1)
      }
    } else if (err.code === 'PLT_PPROF_NOT_ENOUGH_ELU') {
      app.log.info({ workerId }, 'ELU low, CPU profiling not active')
    } else {
      app.log.warn({ err, workerId }, 'Failed to get profile from a worker')

      // Only a string id can carry a `:<workerIndex>` suffix. Guarding here
      // keeps a missing or non-string id from throwing inside this handler,
      // which would reject the caller's whole batch instead of yielding
      // `undefined` for this one entry.
      if (typeof workerId === 'string') {
        const [serviceId, workerIndex] = workerId.split(':')
        if (workerIndex) {
          app.log.warn(
            { workerId, serviceId },
            'Worker not available, trying to get profile from another worker'
          )
          // The bare service id has no suffix, so this fallback cannot recurse again.
          return getServiceFlamegraph(serviceId, profileType)
        }
      }
    }
  }
}
154160

161+
/**
 * Uploads one profile to the scaler with a POST to
 * `<scalerUrl>/pods/<podId>/services/<serviceId>/flamegraph`, where `podId`
 * is `app.instanceId`.
 *
 * Best-effort: any failure (auth header lookup, transport error, non-200
 * status) is logged and swallowed, so this function never rejects.
 *
 * @param {string} scalerUrl - Base URL of the scaler service.
 * @param {string} serviceId - Service id used in the URL path.
 * @param {*} profile - Request body, sent as `application/octet-stream`.
 * @param {string} profileType - Sent as the `profileType` query parameter.
 * @param {string} [alertId] - When truthy, sent as the `alertId` query parameter.
 * @returns {Promise<void>}
 */
async function sendServiceFlamegraph (scalerUrl, serviceId, profile, profileType, alertId) {
  const podId = app.instanceId
  const url = `${scalerUrl}/pods/${podId}/services/${serviceId}/flamegraph`
  app.log.info({ serviceId, podId, profileType }, 'Sending flamegraph')

  const query = { profileType }
  if (alertId) {
    query.alertId = alertId
  }

  try {
    const authHeaders = await app.getAuthorizationHeader()
    const { statusCode, body } = await request(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/octet-stream',
        ...authHeaders
      },
      query,
      body: profile
    })

    if (statusCode !== 200) {
      const error = await body.text()
      app.log.error({ error, statusCode }, 'Failed to send flamegraph')
      throw new Error(`Failed to send flamegraph: ${error}`)
    }

    // The success payload is not needed, but the body must still be consumed
    // so the underlying connection is released for reuse.
    // NOTE(review): assumes `request` is undici's `request` (the `query`
    // option and `{ statusCode, body }` result match it) - confirm the import.
    await body.dump()
  } catch (err) {
    app.log.warn({ err, serviceId, podId }, 'Failed to send flamegraph from service')
  }
}
192+
155193
app.sendFlamegraphs = async (options = {}) => {
156194
if (isFlamegraphsDisabled) {
157195
app.log.info('PLT_DISABLE_FLAMEGRAPHS is set, flamegraphs are disabled')
158196
return
159197
}
160198

161-
let { serviceIds, alertId, profileType = 'cpu' } = options
199+
let { workerIds, alertId, profileType = 'cpu' } = options
162200

163201
const scalerUrl = app.instanceConfig?.iccServices?.scaler?.url
164202
if (!scalerUrl) {
165203
app.log.error('No scaler URL found in ICC services, cannot send flamegraph')
166204
throw new Error('No scaler URL found in ICC services, cannot send flamegraph')
167205
}
168206

169-
const podId = app.instanceId
170207
const runtime = app.watt.runtime
171208

172-
if (!serviceIds) {
209+
if (!workerIds) {
173210
const { applications } = await runtime.getApplications()
174-
serviceIds = applications.map(app => app.id)
211+
workerIds = applications.map(app => app.id)
175212
}
176213

177-
const authHeaders = await app.getAuthorizationHeader()
178-
179-
const uploadPromises = serviceIds.map(async (serviceId) => {
180-
const profile = await getServiceFlamegraph(serviceId, profileType)
214+
const uploadPromises = workerIds.map(async (workerId) => {
215+
const profile = await getServiceFlamegraph(workerId, profileType)
181216
if (!profile || !(profile instanceof Uint8Array)) {
182-
app.log.error({ serviceId }, 'Failed to get profile from service')
217+
app.log.error({ workerId }, 'Failed to get profile from service')
183218
return
184219
}
185220

186-
const url = `${scalerUrl}/pods/${podId}/services/${serviceId}/flamegraph`
187-
app.log.info({ serviceId, podId, profileType }, 'Sending flamegraph')
188-
189-
const query = { profileType }
190-
if (alertId) {
191-
query.alertId = alertId
192-
}
193-
194-
try {
195-
const { statusCode, body } = await request(url, {
196-
method: 'POST',
197-
headers: {
198-
'Content-Type': 'application/octet-stream',
199-
...authHeaders
200-
},
201-
query,
202-
body: profile
203-
})
204-
205-
if (statusCode !== 200) {
206-
const error = await body.text()
207-
app.log.error({ error }, 'Failed to send flamegraph')
208-
throw new Error(`Failed to send flamegraph: ${error}`)
209-
}
210-
} catch (err) {
211-
app.log.warn({ err, serviceId, podId }, 'Failed to send flamegraph from service')
212-
}
221+
const serviceId = workerId.split(':')[0]
222+
await sendServiceFlamegraph(scalerUrl, serviceId, profile, profileType, alertId)
213223
})
214224

215225
await Promise.all(uploadPromises)

plugins/health-signals.js

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,7 @@ async function healthSignals (app, _opts) {
8080
}
8181

8282
const {
83+
id: workerId,
8384
application: serviceId,
8485
currentHealth,
8586
healthSignals
@@ -125,13 +126,13 @@ async function healthSignals (app, _opts) {
125126
}
126127

127128
if (healthSignals.length > 0) {
128-
await sendHealthSignalsWithTimeout(serviceId, healthSignals)
129+
await sendHealthSignalsWithTimeout(serviceId, workerId, healthSignals)
129130
}
130131
})
131132
}
132133
app.setupHealthSignals = setupHealthSignals
133134

134-
async function sendHealthSignalsWithTimeout (serviceId, signals) {
135+
async function sendHealthSignalsWithTimeout (serviceId, workerId, signals) {
135136
signalsCaches[serviceId] ??= new HealthSignalsCache()
136137
servicesSendingStatuses[serviceId] ??= false
137138

@@ -148,15 +149,15 @@ async function healthSignals (app, _opts) {
148149

149150
try {
150151
const signals = signalsCache.getAll()
151-
await sendHealthSignals(serviceId, signals, metrics)
152+
await sendHealthSignals(serviceId, workerId, signals, metrics)
152153
} catch (err) {
153154
app.log.error({ err }, 'Failed to send health signals to scaler')
154155
}
155156
}, 5000).unref()
156157
}
157158
}
158159

159-
async function sendHealthSignals (serviceId, signals, metrics) {
160+
async function sendHealthSignals (serviceId, workerId, signals, metrics) {
160161
const scalerUrl = app.instanceConfig?.iccServices?.scaler?.url
161162
const applicationId = app.instanceConfig?.applicationId
162163
const authHeaders = await app.getAuthorizationHeader()
@@ -186,6 +187,7 @@ async function healthSignals (app, _opts) {
186187

187188
app.sendFlamegraphs({
188189
serviceIds: [serviceId],
190+
workerIds: [workerId],
189191
alertId: alert.id
190192
}).catch(err => {
191193
app.log.error({ err }, 'Failed to send a flamegraph')

0 commit comments

Comments
 (0)