Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/concurrent/ExecutorLocal.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
package org.apache.cassandra.concurrent;

import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.context.OperationContextTracker;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.sensors.RequestTracker;

public interface ExecutorLocal<T>
{
ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance, RequestTracker.instance };
ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance, RequestTracker.instance, OperationContextTracker.instance };

/**
* This is called when scheduling the task, and also before calling {@link #set(Object)} when running on a
Expand Down
24 changes: 16 additions & 8 deletions src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.context.OperationContext;
import org.apache.cassandra.service.context.OperationContextTracker;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;

Expand All @@ -37,21 +39,24 @@ public class ExecutorLocals
private static final ExecutorLocal<TraceState> tracing = Tracing.instance;
private static final ExecutorLocal<ClientWarn.State> clientWarn = ClientWarn.instance;
private static final ExecutorLocal<RequestSensors> requestTracker = RequestTracker.instance;
private static final ExecutorLocal<OperationContext> operationContextTracker = OperationContextTracker.instance;

public final TraceState traceState;
public final ClientWarn.State clientWarnState;
public final RequestSensors sensors;
public final OperationContext operationContext;

private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors)
private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext)
{
this.traceState = traceState;
this.clientWarnState = clientWarnState;
this.sensors = sensors;
this.operationContext = operationContext;
}

static
{
assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn, requestTracker })
assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn, requestTracker, operationContextTracker })
: "ExecutorLocals has not been updated to reflect new ExecutorLocal.all";
}

Expand All @@ -68,29 +73,32 @@ public static ExecutorLocals create()
TraceState traceState = tracing.get();
ClientWarn.State clientWarnState = clientWarn.get();
RequestSensors sensors = requestTracker.get();
if (traceState == null && clientWarnState == null && sensors == null)
OperationContext operationContext = operationContextTracker.get();
if (traceState == null && clientWarnState == null && sensors == null && operationContext == null)
return null;
else
return new ExecutorLocals(traceState, clientWarnState, sensors);
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
}

public static ExecutorLocals create(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors)
public static ExecutorLocals create(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext)
{
return new ExecutorLocals(traceState, clientWarnState, sensors);
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
}

public static ExecutorLocals create(TraceState traceState)
{
ClientWarn.State clientWarnState = clientWarn.get();
RequestSensors sensors = requestTracker.get();
return new ExecutorLocals(traceState, clientWarnState, sensors);
OperationContext operationContext = operationContextTracker.get();
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
}

public static ExecutorLocals create(RequestSensors sensors)
{
TraceState traceState = tracing.get();
ClientWarn.State clientWarnState = clientWarn.get();
return new ExecutorLocals(traceState, clientWarnState, sensors);
OperationContext operationContext = operationContextTracker.get();
return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext);
}

public static void set(ExecutorLocals locals)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.sensors.SensorsFactory;
import org.apache.cassandra.service.context.OperationContext;
import org.apache.cassandra.service.reads.range.EndpointGroupingRangeCommandIterator;

/** A class that extracts system properties for the cassandra node it runs within. */
Expand Down Expand Up @@ -600,7 +601,13 @@ public enum CassandraRelevantProperties
* Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs specially
* with vnodes.
*/
SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false");
SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"),

/**
* Allows custom implementation of {@link OperationContext.Factory} to optionally create and configure custom
* {@link OperationContext} instances.
*/
OPERATION_CONTEXT_FACTORY("cassandra.operation_context_factory_class");

CassandraRelevantProperties(String key, String defaultVal)
{
Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/db/ReadExecutionController.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.cassandra.index.Index;
import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.context.OperationContext;
import org.apache.cassandra.service.context.OperationContextTracker;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.OpOrder;
Expand Down Expand Up @@ -143,6 +145,8 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa

long createdAtNanos = baseCfs.metric.topLocalReadQueryTime.isEnabled() ? clock.now() : NO_SAMPLING;

OperationContextTracker.start(OperationContext.FACTORY.forRead(command, baseCfs));

if (indexCfs == null)
return new ReadExecutionController(command, baseCfs.readOrdering.start(), baseCfs.metadata(), null, null, createdAtNanos, trackRepairedStatus);

Expand Down Expand Up @@ -176,6 +180,7 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa
if (indexController != null)
indexController.close();
}
OperationContextTracker.endCurrent();
throw e;
}
}
Expand Down Expand Up @@ -217,6 +222,8 @@ public void close()
}
}

OperationContextTracker.endCurrent();

if (createdAtNanos != NO_SAMPLING)
addSample();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service.context;

import java.util.function.Supplier;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ReadCommand;

/**
* Default implementation of {@link OperationContext}.
* <p>
* This default implementation is mostly only useful for debugging as the only concrete method is provices is a
* {@link #toString()} method giving details on the operation the context corresponds to (though the context object
* also identify the operation, so it could also theoretically be used from 2 separate place in the code to decide
* if they execute as part of the same operation).
*/
public class DefaultOperationContext implements OperationContext
{
private final Supplier<String> toDebugString;

private DefaultOperationContext(Supplier<String> toDebugString)
{
this.toDebugString = toDebugString;
}

@Override
public void close()
{
}

@Override
public String toString()
{
return String.format("[%d] %s", System.identityHashCode(this), toDebugString.get());
}

/**
* Simple default implementation of {@link OperationContext.Factory} that creates {@link DefaultOperationContext}.
*/
static class Factory implements OperationContext.Factory
{
@Override
public OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs)
{
return new DefaultOperationContext(command::toCQLString);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service.context;

import org.apache.cassandra.concurrent.ExecutorLocal;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.config.CassandraRelevantProperties.OPERATION_CONTEXT_FACTORY;

/**
* Represents some context about a "top-level" operation.
* <p>
* This interface is fairly open on purpose, as implementations for different operations could look fairly different.
* But it is also open-ended as it is an extension point: the {@link #FACTORY} used to create the context instances
* is configurable, and meant to allow extensions to add whatever information they need to the context.
* <p>
* Also note that what consistute a "top-level" operation is not strictly defined. At the time of this writing, those
* context are not serialized across nodes, so "top-level" is understood as "for a node", and so correspond to
* operations like "a `ReadCommand` execution on a replica".
* <p>
* The context of executing operation is tracked by {@link OperationContextTracker} which use the {@link ExecutorLocal}
* concept to make that context available to any methods that execute as part of the operation. Basically, this is a way
* to make the context available everwhere along the path of execution of the operation, without needing to pass that
* context as argument of every single method that could be involved by the operation execution (which in most cases
* would be <b>a lot of methods</b>).
*/
public interface OperationContext extends AutoCloseable
{
Factory FACTORY = OPERATION_CONTEXT_FACTORY.getString() == null
? new DefaultOperationContext.Factory()
: FBUtilities.construct(OPERATION_CONTEXT_FACTORY.getString(), "operation context factory");


/**
* Called when the operation this is a context of terminates, and thus when the context will not be used/retrieved
* anymore.
*/
@Override
void close();

/**
* Factory used to create {@link OperationContext} instances.
* <p>
* The intent is that every operation that wants to set a context should have its own method in this interface, but
* operations are added as needed (instead of trying to cover every possible operation upfront).
* <p>
* Do note however that there can only be one operation context "active" at any given time (meaning, any thread
* execute can only see at most one context), so the context should be set at the higher level that make sense
* (and if necessary, sub-operations can enrich the context of their parent, assuming the parent context make room
* for this).
*/
interface Factory
{
OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service.context;

import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.concurrent.ExecutorLocal;

public class OperationContextTracker implements ExecutorLocal<OperationContext>
{
public static final OperationContextTracker instance = new OperationContextTracker();
private final FastThreadLocal<OperationContext> current = new FastThreadLocal<>();

@Override
public OperationContext get()
{
return current.get();
}

@Override
public void set(OperationContext value)
{
current.set(value);
}

public static void start(OperationContext context)
{
instance.set(context);
}

public static void endCurrent()
{
OperationContext ctx = instance.get();
if (ctx != null)
{
ctx.close();
instance.current.remove();
}
}
}
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/utils/concurrent/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.context.OperationContext;
import org.apache.cassandra.tracing.TraceState;

/**
Expand Down Expand Up @@ -70,13 +71,12 @@ public Future<Void> onTimeout(Runnable task, long timeout, TimeUnit unit)
*/
public Future<Void> onTimeout(Runnable task, long timeout, TimeUnit unit, ExecutorLocals executorLocals)
{
ClientWarn.State clientWarnState = executorLocals == null ? null : executorLocals.clientWarnState;
TraceState traceState = executorLocals == null ? null : executorLocals.traceState;
RequestSensors sensors = executorLocals == null ? null : executorLocals.sensors;

CompletableFuture<Void> result = new CompletableFuture<>();
Timeout handle = timer.newTimeout(ignored ->
{
ExecutorLocals.set(ExecutorLocals.create(traceState, clientWarnState, sensors));
if (executorLocals != null)
ExecutorLocals.set(executorLocals);
try
{
task.run();
Expand Down