diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java index 53d48cc8872f..d88d3d4defce 100644 --- a/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java +++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java @@ -19,12 +19,13 @@ package org.apache.cassandra.concurrent; import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.context.OperationContextTracker; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.sensors.RequestTracker; public interface ExecutorLocal { - ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance, RequestTracker.instance }; + ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance, RequestTracker.instance, OperationContextTracker.instance }; /** * This is called when scheduling the task, and also before calling {@link #set(Object)} when running on a diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java index 29cc662ef73f..5ebcccddf0f2 100644 --- a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java +++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java @@ -23,6 +23,8 @@ import org.apache.cassandra.sensors.RequestSensors; import org.apache.cassandra.sensors.RequestTracker; import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.context.OperationContext; +import org.apache.cassandra.service.context.OperationContextTracker; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; @@ -37,21 +39,24 @@ public class ExecutorLocals private static final ExecutorLocal<TraceState> tracing = Tracing.instance; private static final ExecutorLocal<ClientWarn.State> clientWarn = ClientWarn.instance; private static final ExecutorLocal<RequestSensors> requestTracker = RequestTracker.instance; + private static final ExecutorLocal<OperationContext> operationContextTracker = 
OperationContextTracker.instance; public final TraceState traceState; public final ClientWarn.State clientWarnState; public final RequestSensors sensors; + public final OperationContext operationContext; - private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors) + private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext) { this.traceState = traceState; this.clientWarnState = clientWarnState; this.sensors = sensors; + this.operationContext = operationContext; } static { - assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn, requestTracker }) + assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn, requestTracker, operationContextTracker }) : "ExecutorLocals has not been updated to reflect new ExecutorLocal.all"; } @@ -68,29 +73,32 @@ public static ExecutorLocals create() TraceState traceState = tracing.get(); ClientWarn.State clientWarnState = clientWarn.get(); RequestSensors sensors = requestTracker.get(); - if (traceState == null && clientWarnState == null && sensors == null) + OperationContext operationContext = operationContextTracker.get(); + if (traceState == null && clientWarnState == null && sensors == null && operationContext == null) return null; else - return new ExecutorLocals(traceState, clientWarnState, sensors); + return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext); } - public static ExecutorLocals create(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors) + public static ExecutorLocals create(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext) { - return new ExecutorLocals(traceState, clientWarnState, sensors); + return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext); } public static ExecutorLocals create(TraceState 
traceState) { ClientWarn.State clientWarnState = clientWarn.get(); RequestSensors sensors = requestTracker.get(); - return new ExecutorLocals(traceState, clientWarnState, sensors); + OperationContext operationContext = operationContextTracker.get(); + return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext); } public static ExecutorLocals create(RequestSensors sensors) { TraceState traceState = tracing.get(); ClientWarn.State clientWarnState = clientWarn.get(); - return new ExecutorLocals(traceState, clientWarnState, sensors); + OperationContext operationContext = operationContextTracker.get(); + return new ExecutorLocals(traceState, clientWarnState, sensors, operationContext); } public static void set(ExecutorLocals locals) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 5d30723d656f..921370ba530c 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -27,6 +27,7 @@ import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.sensors.SensorsFactory; +import org.apache.cassandra.service.context.OperationContext; import org.apache.cassandra.service.reads.range.EndpointGroupingRangeCommandIterator; /** A class that extracts system properties for the cassandra node it runs within. */ @@ -600,7 +601,13 @@ public enum CassandraRelevantProperties * Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs specially * with vnodes. 
*/ - SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"); + SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"), + + /** + * Name of a custom {@link OperationContext.Factory} implementation class, used to create and configure + * {@link OperationContext} instances; when unset, a default factory is used. + */ + OPERATION_CONTEXT_FACTORY("cassandra.operation_context_factory_class"); CassandraRelevantProperties(String key, String defaultVal) { diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java index 753e9f9138ad..9968942da038 100644 --- a/src/java/org/apache/cassandra/db/ReadExecutionController.java +++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java @@ -27,6 +27,8 @@ import org.apache.cassandra.index.Index; import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.context.OperationContext; +import org.apache.cassandra.service.context.OperationContextTracker; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -143,6 +145,8 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa long createdAtNanos = baseCfs.metric.topLocalReadQueryTime.isEnabled() ? 
clock.now() : NO_SAMPLING; + OperationContextTracker.start(OperationContext.FACTORY.forRead(command, baseCfs)); + if (indexCfs == null) return new ReadExecutionController(command, baseCfs.readOrdering.start(), baseCfs.metadata(), null, null, createdAtNanos, trackRepairedStatus); @@ -176,6 +180,7 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa if (indexController != null) indexController.close(); } + OperationContextTracker.endCurrent(); throw e; } } @@ -217,6 +222,8 @@ public void close() } } + OperationContextTracker.endCurrent(); + if (createdAtNanos != NO_SAMPLING) addSample(); diff --git a/src/java/org/apache/cassandra/service/context/DefaultOperationContext.java b/src/java/org/apache/cassandra/service/context/DefaultOperationContext.java new file mode 100644 index 000000000000..f22d8e9dd6fc --- /dev/null +++ b/src/java/org/apache/cassandra/service/context/DefaultOperationContext.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.context; + +import java.util.function.Supplier; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ReadCommand; + +/** + * Default implementation of {@link OperationContext}. + * <p> + * This default implementation is mostly only useful for debugging as the only concrete method it provides is a + * {@link #toString()} method giving details on the operation the context corresponds to (though the context object + * also identifies the operation, so it could also theoretically be used from 2 separate places in the code to decide + * if they execute as part of the same operation). + */ +public class DefaultOperationContext implements OperationContext +{ + private final Supplier<String> toDebugString; /* only evaluated when toString() is called */ + + private DefaultOperationContext(Supplier<String> toDebugString) + { + this.toDebugString = toDebugString; + } + + @Override + public void close() + { + } + + @Override + public String toString() + { + return String.format("[%d] %s", System.identityHashCode(this), toDebugString.get()); + } + + /** + * Simple default implementation of {@link OperationContext.Factory} that creates {@link DefaultOperationContext}. + */ + static class Factory implements OperationContext.Factory + { + @Override + public OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs) + { + return new DefaultOperationContext(command::toCQLString); + } + } +} diff --git a/src/java/org/apache/cassandra/service/context/OperationContext.java b/src/java/org/apache/cassandra/service/context/OperationContext.java new file mode 100644 index 000000000000..8ebbbd3a5146 --- /dev/null +++ b/src/java/org/apache/cassandra/service/context/OperationContext.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.context; + +import org.apache.cassandra.concurrent.ExecutorLocal; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.config.CassandraRelevantProperties.OPERATION_CONTEXT_FACTORY; + +/** + * Represents some context about a "top-level" operation. + * <p> + * This interface is fairly open on purpose, as implementations for different operations could look fairly different. + * But it is also open-ended as it is an extension point: the {@link #FACTORY} used to create the context instances + * is configurable, and meant to allow extensions to add whatever information they need to the context. + * <p> + * Also note that what constitutes a "top-level" operation is not strictly defined. At the time of this writing, those + * contexts are not serialized across nodes, so "top-level" is understood as "for a node", and so corresponds to + * operations like "a `ReadCommand` execution on a replica". + * <p> + * The context of the executing operation is tracked by {@link OperationContextTracker} which uses the {@link ExecutorLocal} + * concept to make that context available to any methods that execute as part of the operation. Basically, this is a way + * to make the context available everywhere along the path of execution of the operation, without needing to pass that + * context as argument of every single method that could be involved in the operation execution (which in most cases + * would be a lot of methods). +*/ +public interface OperationContext extends AutoCloseable +{ + Factory FACTORY = OPERATION_CONTEXT_FACTORY.getString() == null + ? new DefaultOperationContext.Factory() + : FBUtilities.construct(OPERATION_CONTEXT_FACTORY.getString(), "operation context factory"); + + + /** + * Called when the operation this is a context of terminates, and thus when the context will not be used/retrieved + * anymore. + */ + @Override + void close(); + + /** + * Factory used to create {@link OperationContext} instances. + * <p> + * The intent is that every operation that wants to set a context should have its own method in this interface, but + * operations are added as needed (instead of trying to cover every possible operation upfront). + * <p> + * Do note however that there can only be one operation context "active" at any given time (meaning, any executing + * thread can see at most one context), so the context should be set at the highest level that makes sense + * (and if necessary, sub-operations can enrich the context of their parent, assuming the parent context makes room + * for this). + */ + interface Factory + { + OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs); + } +} diff --git a/src/java/org/apache/cassandra/service/context/OperationContextTracker.java b/src/java/org/apache/cassandra/service/context/OperationContextTracker.java new file mode 100644 index 000000000000..b992eeac78ec --- /dev/null +++ b/src/java/org/apache/cassandra/service/context/OperationContextTracker.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.context; + +import io.netty.util.concurrent.FastThreadLocal; +import org.apache.cassandra.concurrent.ExecutorLocal; + +public class OperationContextTracker implements ExecutorLocal<OperationContext> +{ + public static final OperationContextTracker instance = new OperationContextTracker(); + private final FastThreadLocal<OperationContext> current = new FastThreadLocal<>(); + + @Override + public OperationContext get() + { + return current.get(); + } + + @Override + public void set(OperationContext value) + { + current.set(value); + } + + public static void start(OperationContext context) + { + instance.set(context); + } + + public static void endCurrent() + { + OperationContext ctx = instance.get(); + if (ctx != null) + { + try { ctx.close(); } + finally { instance.current.remove(); } /* clear the thread's slot even when close() throws */ + } + } +} diff --git a/src/java/org/apache/cassandra/utils/concurrent/Timer.java b/src/java/org/apache/cassandra/utils/concurrent/Timer.java index 164f6be7c60e..81439b56bc03 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Timer.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Timer.java @@ -31,6 +31,7 @@ import org.apache.cassandra.concurrent.ExecutorLocals; import org.apache.cassandra.sensors.RequestSensors; import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.context.OperationContext; import org.apache.cassandra.tracing.TraceState; /** @@ -70,13 +71,12 @@ public Future onTimeout(Runnable task, long timeout, TimeUnit unit) */ public Future onTimeout(Runnable task, long timeout, TimeUnit unit, ExecutorLocals executorLocals) { - ClientWarn.State clientWarnState = executorLocals == null ? null : executorLocals.clientWarnState; - TraceState traceState = executorLocals == null ? null : executorLocals.traceState; - RequestSensors sensors = executorLocals == null ? 
null : executorLocals.sensors; + CompletableFuture result = new CompletableFuture<>(); Timeout handle = timer.newTimeout(ignored -> { - ExecutorLocals.set(ExecutorLocals.create(traceState, clientWarnState, sensors)); + if (executorLocals != null) + ExecutorLocals.set(executorLocals); try { task.run();