Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/que
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ if options.metrics_port
port: options.metrics_port,
)

health_check = ->(_) { [200, {}, ["healthy"]] }
health_check = Que::Middleware::WorkerHealthCheck.new(worker_group)

if defined?(Prometheus::MemoryStats)
Prometheus::MemoryStats.
Expand Down
1 change: 1 addition & 0 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
require_relative "que/worker_group"
require_relative "que/middleware/worker_collector"
require_relative "que/middleware/queue_collector"
require_relative "que/middleware/worker_health_check"

module Que
begin
Expand Down
26 changes: 26 additions & 0 deletions lib/que/middleware/worker_health_check.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module Que
  module Middleware
    # Rack-compatible endpoint that summarises the health of a worker group.
    #
    # Answers 200 "healthy" when every worker in the group reports healthy?
    # (a group with no workers counts as healthy) and 503 "unhealthy" as soon
    # as one worker does not. A worker reports unhealthy when its last work
    # cycle ended in a postgres error.
    #
    # Such a worker is still sleeping and retrying on its own; the endpoint is
    # meant for a Kubernetes liveness probe, so that a postgres_error state
    # which persists across probe checks gets the pod marked unhealthy and
    # restarted with a fresh database connection.
    class WorkerHealthCheck
      # worker_group - object exposing #workers, each responding to #healthy?.
      def initialize(worker_group)
        @worker_group = worker_group
      end

      # Rack entry point. The request env is ignored.
      #
      # Returns the usual Rack triple: [status, headers, body].
      def call(_env)
        status, message =
          if @worker_group.workers.any? { |worker| !worker.healthy? }
            [503, "unhealthy"]
          else
            [200, "healthy"]
          end

        [status, { "Content-Type" => "text/plain" }, [message]]
      end
    end
  end
end
7 changes: 6 additions & 1 deletion lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def initialize(
@stop = false # instruct worker to stop
@stopped = false # mark worker as having stopped
@current_running_job = nil
@last_work_result = nil
@locker = Locker.new(
queue: queue,
cursor_expiry: lock_cursor_expiry,
Expand All @@ -145,7 +146,7 @@ def work_loop

@tracer.trace(RunningSecondsTotal, queue: @queue, primary_queue: @queue) do
loop do
case work
case @last_work_result = work
when :postgres_error
Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
Expand Down Expand Up @@ -277,6 +278,10 @@ def stopped?
@stopped
end

# Whether the most recent work cycle finished without a postgres error.
# Any other recorded result, including nil, counts as healthy.
def healthy?
  return false if @last_work_result == :postgres_error

  true
end

# Delegates to the worker's tracer and returns whatever @tracer.collect
# returns.
def collect_metrics
@tracer.collect
end
Expand Down
72 changes: 72 additions & 0 deletions spec/lib/que/middleware/worker_health_check_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# frozen_string_literal: true

require "spec_helper"

RSpec.describe Que::Middleware::WorkerHealthCheck do
  subject(:health_check) { described_class.new(worker_group) }

  let(:worker_group) { instance_double(Que::WorkerGroup, workers: workers) }

  describe "#call" do
    # Rack triple ([status, headers, body]) produced for an empty env; each
    # context only has to define the workers backing the group.
    let(:response) { health_check.call({}) }

    context "when all workers are healthy" do
      let(:workers) do
        Array.new(2) { instance_double(Que::Worker, healthy?: true) }
      end

      it "returns 200" do
        expect(response.first).to eq(200)
      end

      it "returns healthy in the body" do
        expect(response.last).to eq(["healthy"])
      end
    end

    context "when one worker is in a postgres_error state" do
      let(:workers) do
        [true, false].map { |state| instance_double(Que::Worker, healthy?: state) }
      end

      it "returns 503" do
        expect(response.first).to eq(503)
      end

      it "returns unhealthy in the body" do
        expect(response.last).to eq(["unhealthy"])
      end
    end

    context "when all workers are in a postgres_error state" do
      let(:workers) do
        Array.new(2) { instance_double(Que::Worker, healthy?: false) }
      end

      it "returns 503" do
        expect(response.first).to eq(503)
      end
    end

    # An empty group has nothing unhealthy in it, so it reports healthy.
    context "when there are no workers" do
      let(:workers) { [] }

      it "returns 200" do
        expect(response.first).to eq(200)
      end
    end
  end
end
38 changes: 38 additions & 0 deletions spec/lib/que/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,42 @@
end
end
end

# Exercises Worker#healthy? through real calls to #work. The examples expect
# it to be false only after a work cycle in which the :lock_job query raised
# PG::Error.
describe "#healthy?" do
subject(:worker) { described_class.new }

# No work cycle has run yet at this point.
it "is true before the worker has done any work" do
expect(worker).to be_healthy
end

# Nothing is enqueued before the work cycle here.
it "is true when no job is found" do
worker.work
expect(worker).to be_healthy
end

it "is true after successfully working a job" do
FakeJob.enqueue(1)
worker.work
expect(worker).to be_healthy
end

# The :lock_job query is stubbed to raise, so the work cycle hits a
# PG::Error before it can pick up the enqueued job.
it "is false after a postgres error" do
FakeJob.enqueue(1)
expect(Que).
to receive(:execute).with(:lock_job, ["default", 0]).and_raise(PG::Error)
worker.work
expect(worker).to_not be_healthy
end

# First cycle fails on the stubbed :lock_job query; a second cycle is then
# expected to leave the worker healthy again.
it "recovers once work succeeds after a postgres error" do
FakeJob.enqueue(1)
expect(Que).
to receive(:execute).with(:lock_job, ["default", 0]).and_raise(PG::Error)
worker.work
expect(worker).to_not be_healthy

# NOTE(review): the job enqueued above was presumably never locked, so it
# may still be queued here, and the message expectation above is set up for
# a single :lock_job call. Confirm what this second cycle actually executes
# and returns (:job_not_found vs. working the job).
worker.work
expect(worker).to be_healthy
end
end
end
53 changes: 53 additions & 0 deletions spec/smoke_tests/worker_health_check_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

require "English"
require "net/http"

RSpec.describe "Worker health check", :smoke_test do # rubocop:disable RSpec/DescribeClass
  # A short wake interval lets workers finish at least one cycle before the
  # probe is sent, instead of waiting out the default 5 seconds.
  let(:wake_interval) { 0.1 }
  let(:metrics_port) { 8081 }

  # Starts bin/que for the given task file with the metrics server enabled
  # and returns the pid of the spawned process.
  def spawn_que(task_file)
    command = [
      "bundle exec bin/que #{task_file}",
      "--metrics-port=#{metrics_port}",
      "--wake-interval=#{wake_interval}",
    ].join(" ")

    Process.spawn(command)
  end

  # Performs a GET against the root path of the metrics port.
  def health_check_response
    endpoint = URI("http://0.0.0.0:#{metrics_port}/")
    Net::HTTP.get_response(endpoint)
  end

  context "when workers are healthy" do
    it "returns 200" do
      que_pid = spawn_que("./tasks/smoke_test.rb")
      sleep 3 # give the process time to boot and run a work cycle

      reply = health_check_response

      expect(reply.code).to eq("200")
      expect(reply.body).to eq("healthy")
    ensure
      # Always interrupt and reap the child, even when an expectation fails.
      Process.kill("INT", que_pid)
      Process.wait(que_pid)
    end
  end

  context "when workers have a persistent postgres error" do
    it "returns 503" do
      que_pid = spawn_que("./tasks/smoke_test_unhealthy.rb")
      sleep 3 # give the process time to boot and run a work cycle

      reply = health_check_response

      expect(reply.code).to eq("503")
      expect(reply.body).to eq("unhealthy")
    ensure
      # Always interrupt and reap the child, even when an expectation fails.
      Process.kill("INT", que_pid)
      Process.wait(que_pid)
    end
  end
end
26 changes: 26 additions & 0 deletions tasks/smoke_test_unhealthy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

require "active_record"
require_relative "../lib/que"

# Connection settings are read from the PG* environment variables, falling
# back to a local "que-test" database owned by "postgres".
db_settings = {
  adapter: "postgresql",
  host: ENV.fetch("PGHOST", "localhost"),
  user: ENV.fetch("PGUSER", "postgres"),
  password: ENV.fetch("PGPASSWORD", ""),
  database: ENV.fetch("PGDATABASE", "que-test"),
}
ActiveRecord::Base.establish_connection(db_settings)

# Point Que at ActiveRecord and run its migrations over that connection,
# before the adapter is replaced further down.
Que.connection = ActiveRecord
Que.migrate!

# Adapter whose checkout always raises PG::Error, standing in for a database
# connection that is permanently broken. Installing it is meant to make every
# work attempt end in :postgres_error, so the health check is expected to
# answer 503 for as long as the process runs.
class AlwaysFailingAdapter < Que::Adapters::Base
  def checkout
    raise(PG::Error, "simulated persistent postgres error")
  end
end

Que.adapter = AlwaysFailingAdapter.new
Loading