Skip to content

Commit d4359d0

Browse files
pcmanus and djatnieks
authored and committed
Add support for a per-operation context (#1619)
The goal of this commit is to allow low-level code, typically at the level of file reads (say, a custom `FileChannel` implementation), to obtain context on which higher-level operation it corresponds to. A typical use case may be tiered storage, where `FileChannel` implementations may forward reads to remote storage, and where having context on the overall operation could help to, say, take prior work into account when setting timeouts, or aggregate metrics per "user-level" operation (rather than per `FileChannel` operation). To do this, we reuse the existing `ExecutorLocals` mechanism, and simply have the top-level operation set up the proper context as an `ExecutorLocal`, allowing it to be accessed by any low-level operation operating on behalf of that operation. This is, in many ways, similar to tracing, but instead of a `TraceState` that collects what happens during the operation, it is a relatively flexible notion of `OperationContext`. As of this patch, this feature is fairly bare-bones and mostly exists for extensions in the following sense: 1. only user reads (`ReadCommand` execution) currently set up such a context. The code is written in such a way that adding support for other operations should be easy, but this is not done. 2. the context set by reads by default has barely any information: it merely has a `toString()` method that can roughly tell what the operation itself is, and so could be of use for debugging, but not much more. Further, that context is not read by anything in this patch. However, said contexts are created through a "factory" and the factory class is configurable through a system property. So extensions can override the factory in order to create contexts with more information/state, and fetch/use those contexts where they see fit.
1 parent af414a9 commit d4359d0

File tree

11 files changed

+222
-12
lines changed

11 files changed

+222
-12
lines changed

src/java/org/apache/cassandra/concurrent/ExecutorLocals.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.util.concurrent.FastThreadLocal;
2222
import org.apache.cassandra.sensors.RequestSensors;
2323
import org.apache.cassandra.service.ClientWarn;
24+
import org.apache.cassandra.service.context.OperationContext;
2425
import org.apache.cassandra.tracing.TraceState;
2526
import org.apache.cassandra.utils.Closeable;
2627
import org.apache.cassandra.utils.WithResources;
@@ -33,7 +34,7 @@
3334
*/
3435
public class ExecutorLocals implements WithResources, Closeable
3536
{
36-
private static final ExecutorLocals none = new ExecutorLocals(null, null, null);
37+
private static final ExecutorLocals none = new ExecutorLocals(null, null, null, null);
3738
private static final FastThreadLocal<ExecutorLocals> locals = new FastThreadLocal<ExecutorLocals>()
3839
{
3940
@Override
@@ -45,22 +46,24 @@ protected ExecutorLocals initialValue()
4546

4647
public static class Impl
4748
{
48-
public static void set(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors)
49+
public static void set(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext)
4950
{
50-
if (traceState == null && clientWarnState == null && sensors == null) locals.set(none);
51-
else locals.set(new ExecutorLocals(traceState, clientWarnState, sensors));
51+
if (traceState == null && clientWarnState == null && sensors == null && operationContext == null) locals.set(none);
52+
else locals.set(new ExecutorLocals(traceState, clientWarnState, sensors, operationContext));
5253
}
5354
}
5455

5556
public final TraceState traceState;
5657
public final ClientWarn.State clientWarnState;
5758
public final RequestSensors sensors;
59+
public final OperationContext operationContext;
5860

59-
protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors)
61+
protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, RequestSensors sensors, OperationContext operationContext)
6062
{
6163
this.traceState = traceState;
6264
this.clientWarnState = clientWarnState;
6365
this.sensors = sensors;
66+
this.operationContext = operationContext;
6467
}
6568

6669
/**
@@ -84,13 +87,13 @@ public static WithResources propagate()
8487
public static ExecutorLocals create(TraceState traceState)
8588
{
8689
ExecutorLocals current = locals.get();
87-
return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.sensors);
90+
return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.sensors, current.operationContext);
8891
}
8992

9093
public static ExecutorLocals create(RequestSensors sensors)
9194
{
9295
ExecutorLocals current = locals.get();
93-
return current.sensors == sensors ? current : new ExecutorLocals(current.traceState, current.clientWarnState, sensors);
96+
return current.sensors == sensors ? current : new ExecutorLocals(current.traceState, current.clientWarnState, sensors, current.operationContext);
9497
}
9598

9699
public static void clear()

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.cassandra.net.MessagingService;
3636
import org.apache.cassandra.sensors.SensorsFactory;
3737
import org.apache.cassandra.service.FileSystemOwnershipCheck;
38+
import org.apache.cassandra.service.context.OperationContext;
3839
import org.apache.cassandra.service.reads.range.EndpointGroupingRangeCommandIterator;
3940
import org.apache.cassandra.utils.FBUtilities;
4041
import org.apache.cassandra.utils.StorageCompatibilityMode;
@@ -574,6 +575,11 @@ public enum CassandraRelevantProperties
574575
* when the JVM terminates. Therefore, we can use such optimization and not wait unnecessarily. */
575576
NON_GRACEFUL_CLOSE("cassandra.messagingService.nonGracefulClose"),
576577
NON_GRACEFUL_SHUTDOWN("cassandra.test.messagingService.nonGracefulShutdown"),
578+
/**
579+
* Allows custom implementation of {@link OperationContext.Factory} to optionally create and configure custom
580+
* {@link OperationContext} instances.
581+
*/
582+
OPERATION_CONTEXT_FACTORY("cassandra.operation_context_factory_class"),
577583
/** for specific tests */
578584
/** This property indicates whether disable_mbean_registration is true */
579585
ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION("org.apache.cassandra.disable_mbean_registration"),

src/java/org/apache/cassandra/db/ReadExecutionController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.cassandra.index.Index;
2828
import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
2929
import org.apache.cassandra.schema.TableMetadata;
30+
import org.apache.cassandra.service.context.OperationContext;
31+
import org.apache.cassandra.service.context.OperationContextTracker;
3032
import org.apache.cassandra.tracing.Tracing;
3133
import org.apache.cassandra.utils.MonotonicClock;
3234
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -143,6 +145,8 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa
143145

144146
long createdAtNanos = baseCfs.metric.topLocalReadQueryTime.isEnabled() ? clock.now() : NO_SAMPLING;
145147

148+
OperationContextTracker.start(OperationContext.FACTORY.forRead(command, baseCfs));
149+
146150
if (indexCfs == null)
147151
return new ReadExecutionController(command, baseCfs.readOrdering.start(), baseCfs.metadata(), null, null, createdAtNanos, trackRepairedStatus);
148152

@@ -176,6 +180,7 @@ static ReadExecutionController forCommand(ReadCommand command, boolean trackRepa
176180
if (indexController != null)
177181
indexController.close();
178182
}
183+
OperationContextTracker.endCurrent();
179184
throw e;
180185
}
181186
}
@@ -217,6 +222,8 @@ public void close()
217222
}
218223
}
219224

225+
OperationContextTracker.endCurrent();
226+
220227
if (createdAtNanos != NO_SAMPLING)
221228
addSample();
222229

src/java/org/apache/cassandra/sensors/RequestTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,6 @@ public RequestSensors get()
3838
public void set(RequestSensors sensors)
3939
{
4040
ExecutorLocals current = ExecutorLocals.current();
41-
ExecutorLocals.Impl.set(current.traceState, current.clientWarnState, sensors);
41+
ExecutorLocals.Impl.set(current.traceState, current.clientWarnState, sensors, current.operationContext);
4242
}
4343
}

src/java/org/apache/cassandra/service/ClientWarn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public State get()
4242
public void set(State value)
4343
{
4444
ExecutorLocals current = ExecutorLocals.current();
45-
ExecutorLocals.Impl.set(current.traceState, value, current.sensors);
45+
ExecutorLocals.Impl.set(current.traceState, value, current.sensors, current.operationContext);
4646
}
4747

4848
public void warn(String text)
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.context;
20+
21+
import java.util.function.Supplier;
22+
import org.apache.cassandra.db.ColumnFamilyStore;
23+
import org.apache.cassandra.db.ReadCommand;
24+
25+
/**
26+
* Default implementation of {@link OperationContext}.
27+
* <p>
28+
* This default implementation is mostly only useful for debugging, as the only concrete method it provides is a
29+
* {@link #toString()} method giving details on the operation the context corresponds to (though the context object
30+
* also identifies the operation, so it could also theoretically be used from 2 separate places in the code to decide
31+
* if they execute as part of the same operation).
32+
*/
33+
public class DefaultOperationContext implements OperationContext
34+
{
35+
private final Supplier<String> toDebugString;
36+
37+
private DefaultOperationContext(Supplier<String> toDebugString)
38+
{
39+
this.toDebugString = toDebugString;
40+
}
41+
42+
@Override
43+
public void close()
44+
{
45+
}
46+
47+
@Override
48+
public String toString()
49+
{
50+
return String.format("[%d] %s", System.identityHashCode(this), toDebugString.get());
51+
}
52+
53+
/**
54+
* Simple default implementation of {@link OperationContext.Factory} that creates {@link DefaultOperationContext}.
55+
*/
56+
static class Factory implements OperationContext.Factory
57+
{
58+
@Override
59+
public OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs)
60+
{
61+
return new DefaultOperationContext(command::toCQLString);
62+
}
63+
}
64+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.context;
20+
21+
import org.apache.cassandra.db.ColumnFamilyStore;
22+
import org.apache.cassandra.db.ReadCommand;
23+
import org.apache.cassandra.utils.FBUtilities;
24+
25+
import static org.apache.cassandra.config.CassandraRelevantProperties.OPERATION_CONTEXT_FACTORY;
26+
27+
/**
28+
* Represents some context about a "top-level" operation.
29+
* <p>
30+
* This interface is fairly open on purpose, as implementations for different operations could look fairly different.
31+
* But it is also open-ended as it is an extension point: the {@link #FACTORY} used to create the context instances
32+
* is configurable, and meant to allow extensions to add whatever information they need to the context.
33+
* <p>
34+
* Also note that what constitutes a "top-level" operation is not strictly defined. At the time of this writing, those
35+
* contexts are not serialized across nodes, so "top-level" is understood as "for a node", and so corresponds to
36+
* operations like "a `ReadCommand` execution on a replica".
37+
* <p>
38+
* The context of the executing operation is tracked by {@link OperationContextTracker}, which uses the {@link org.apache.cassandra.concurrent.ExecutorLocals}
39+
* concept to make that context available to any methods that execute as part of the operation. Basically, this is a way
40+
* to make the context available everwhere along the path of execution of the operation, without needing to pass that
41+
* context as argument of every single method that could be involved by the operation execution (which in most cases
42+
* would be <b>a lot of methods</b>).
43+
*/
44+
public interface OperationContext extends AutoCloseable
45+
{
46+
Factory FACTORY = OPERATION_CONTEXT_FACTORY.getString() == null
47+
? new DefaultOperationContext.Factory()
48+
: FBUtilities.construct(OPERATION_CONTEXT_FACTORY.getString(), "operation context factory");
49+
50+
51+
/**
52+
* Called when the operation this is a context of terminates, and thus when the context will not be used/retrieved
53+
* anymore.
54+
*/
55+
@Override
56+
void close();
57+
58+
/**
59+
* Factory used to create {@link OperationContext} instances.
60+
* <p>
61+
* The intent is that every operation that wants to set a context should have its own method in this interface, but
62+
* operations are added as needed (instead of trying to cover every possible operation upfront).
63+
* <p>
64+
* Do note however that there can only be one operation context "active" at any given time (meaning, any thread
65+
* execute can only see at most one context), so the context should be set at the higher level that make sense
66+
* (and if necessary, sub-operations can enrich the context of their parent, assuming the parent context make room
67+
* for this).
68+
*/
69+
interface Factory
70+
{
71+
OperationContext forRead(ReadCommand command, ColumnFamilyStore cfs);
72+
}
73+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.context;
20+
21+
import org.apache.cassandra.concurrent.ExecutorLocals;
22+
23+
public class OperationContextTracker extends ExecutorLocals.Impl
24+
{
25+
public static final OperationContextTracker instance = new OperationContextTracker();
26+
27+
private OperationContextTracker()
28+
{}
29+
30+
public OperationContext get()
31+
{
32+
return ExecutorLocals.current().operationContext;
33+
}
34+
35+
public void set(OperationContext operationContext)
36+
{
37+
ExecutorLocals current = ExecutorLocals.current();
38+
ExecutorLocals.Impl.set(current.traceState, current.clientWarnState, current.sensors, operationContext);
39+
}
40+
41+
public static void start(OperationContext context)
42+
{
43+
instance.set(context);
44+
}
45+
46+
public static void endCurrent()
47+
{
48+
OperationContext ctx = instance.get();
49+
if (ctx != null)
50+
{
51+
ctx.close();
52+
instance.set(null);
53+
}
54+
}
55+
}

src/java/org/apache/cassandra/tracing/Tracing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public TraceState get(TimeUUID sessionId)
277277
public void set(TraceState tls)
278278
{
279279
ExecutorLocals current = ExecutorLocals.current();
280-
ExecutorLocals.Impl.set(tls, current.clientWarnState, current.sensors);
280+
ExecutorLocals.Impl.set(tls, current.clientWarnState, current.sensors, current.operationContext);
281281
}
282282

283283
public TraceState begin(final String request, final Map<String, String> parameters)

src/java/org/apache/cassandra/utils/concurrent/Timer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.cassandra.concurrent.ExecutorLocals;
3131
import org.apache.cassandra.sensors.RequestSensors;
3232
import org.apache.cassandra.service.ClientWarn;
33+
import org.apache.cassandra.service.context.OperationContext;
3334
import org.apache.cassandra.tracing.TraceState;
3435

3536
/**
@@ -72,11 +73,12 @@ public Future<Void> onTimeout(Runnable task, long timeout, TimeUnit unit, Execut
7273
ClientWarn.State clientWarnState = executorLocals == null ? null : executorLocals.clientWarnState;
7374
TraceState traceState = executorLocals == null ? null : executorLocals.traceState;
7475
RequestSensors sensors = executorLocals == null ? null : executorLocals.sensors;
76+
OperationContext operationContext = executorLocals == null ? null : executorLocals.operationContext;
7577
AsyncPromise<Void> result = new AsyncPromise<>();
7678
Timeout handle = timer.newTimeout(ignored ->
7779
{
7880

79-
ExecutorLocals.Impl.set(traceState, clientWarnState, sensors);
81+
ExecutorLocals.Impl.set(traceState, clientWarnState, sensors, operationContext);
8082
try
8183
{
8284
task.run();

0 commit comments

Comments
 (0)