Skip to content

Commit 9f1eba4

Browse files
committed
Create Cluster TypeDBOptions when connecting to Cluster
1 parent b62d67e commit 9f1eba4

File tree

5 files changed

+17
-13
lines changed

5 files changed

+17
-13
lines changed

state/connection/ClientState.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class ClientState constructor(private val notificationMgr: NotificationManager)
8282
private var runningClosingCommands = AtomicIntegerState(0)
8383
private var databaseListRefreshedTime = System.currentTimeMillis()
8484
private val asyncDepth = AtomicInteger(0)
85+
internal val isCluster get() = _client is TypeDBClient.Cluster
8586

8687
private val coroutineScope = CoroutineScope(EmptyCoroutineContext)
8788

state/connection/QueryRunner.kt

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package com.vaticle.typedb.studio.state.connection
2020

21-
import com.vaticle.typedb.client.api.TypeDBOptions
22-
import com.vaticle.typedb.client.api.TypeDBTransaction
2321
import com.vaticle.typedb.client.api.answer.ConceptMap
2422
import com.vaticle.typedb.client.api.answer.ConceptMapGroup
2523
import com.vaticle.typedb.client.api.answer.NumericGroup
@@ -52,9 +50,8 @@ import kotlinx.coroutines.launch
5250

5351
@OptIn(ExperimentalTime::class)
5452
class QueryRunner constructor(
55-
val transaction: TypeDBTransaction, // TODO: restrict in the future, TypeDB 3.0 answers return complete info
53+
private val transactionState: TransactionState, // TODO: restrict in the future, TypeDB 3.0 answers return complete info
5654
private val queries: String,
57-
private val hasStopSignal: AtomicBoolean,
5855
private val onComplete: () -> Unit
5956
) {
6057

@@ -118,6 +115,8 @@ class QueryRunner constructor(
118115
private val lastResponse = AtomicLong(0)
119116
private val consumerLatch = CountDownLatch(1)
120117
private val coroutineScope = CoroutineScope(EmptyCoroutineContext)
118+
private val hasStopSignal get() = transactionState.hasStopSignalAtomic.atomic
119+
val transaction get() = transactionState.transaction
121120

122121
fun launch() {
123122
isRunning.set(true)
@@ -218,7 +217,7 @@ class QueryRunner constructor(
218217
noResultMsg = INSERT_QUERY_NO_RESULT,
219218
queryStr = query.toString(),
220219
stream = Response.Stream.ConceptMaps()
221-
) { transaction.query().insert(query, TypeDBOptions.core().prefetch(true)) }
220+
) { transaction.query().insert(query, transactionState.typeDBOptions().prefetch(true)) }
222221
}
223222

224223
private fun runUpdateQuery(query: TypeQLUpdate) {
@@ -228,7 +227,7 @@ class QueryRunner constructor(
228227
noResultMsg = UPDATE_QUERY_NO_RESULT,
229228
queryStr = query.toString(),
230229
stream = Response.Stream.ConceptMaps()
231-
) { transaction.query().update(query, TypeDBOptions.core().prefetch(true)) }
230+
) { transaction.query().update(query, transactionState.typeDBOptions().prefetch(true)) }
232231
}
233232

234233
private fun runMatchQuery(query: TypeQLMatch) {

state/connection/SessionState.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicReference
3636
import mu.KotlinLogging
3737

3838
class SessionState constructor(
39-
private val client: ClientState,
39+
internal val client: ClientState,
4040
internal val notificationMgr: NotificationManager
4141
) {
4242

state/connection/TransactionState.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,11 @@ class TransactionState constructor(
6767
val hasStopSignal get() = hasStopSignalAtomic.state
6868
val hasRunningQuery get() = hasRunningQueryAtomic.state
6969
private val onSchemaWrite = LinkedBlockingQueue<() -> Unit>()
70-
private val hasStopSignalAtomic = AtomicBooleanState(false)
70+
internal val hasStopSignalAtomic = AtomicBooleanState(false)
7171
private var hasRunningQueryAtomic = AtomicBooleanState(false)
7272
private val isOpenAtomic = AtomicBooleanState(false)
7373
private var _transaction: TypeDBTransaction? by mutableStateOf(null)
74+
internal val transaction get() = _transaction!!
7475

7576
val snapshot = ConfigState(
7677
activatedFn = { it || type.isWrite },
@@ -94,7 +95,7 @@ class TransactionState constructor(
9495
private fun tryOpen() {
9596
if (isOpen) return
9697
try {
97-
val options = TypeDBOptions.core().infer(infer.activated)
98+
val options = typeDBOptions().infer(infer.activated)
9899
.explain(infer.activated).transactionTimeoutMillis(ONE_HOUR_IN_MILLS)
99100
_transaction = session.transaction(type, options)!!.apply {
100101
onClose { close(TRANSACTION_CLOSED_ON_SERVER, it?.message ?: "Unknown") }
@@ -112,7 +113,7 @@ class TransactionState constructor(
112113
try {
113114
hasStopSignalAtomic.set(false)
114115
tryOpen()
115-
return if (isOpen) QueryRunner(_transaction!!, content, hasStopSignalAtomic.atomic) {
116+
return if (isOpen) QueryRunner(this, content) {
116117
if (!snapshot.activated) close()
117118
else if (!isOpen) close(TRANSACTION_CLOSED_IN_QUERY)
118119
hasStopSignalAtomic.set(false)
@@ -155,4 +156,8 @@ class TransactionState constructor(
155156
message?.let { notificationMgr.userError(LOGGER, it, *params) }
156157
}
157158
}
158-
}
159+
160+
internal fun typeDBOptions(): TypeDBOptions {
161+
return if (session.client.isCluster) TypeDBOptions.cluster() else TypeDBOptions.core()
162+
}
163+
}

state/schema/SchemaManager.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ class SchemaManager(
183183
synchronized(this) {
184184
lastTransactionUse.set(System.currentTimeMillis())
185185
if (readTx.get() != null) return readTx.get()
186-
val options = TypeDBOptions.core().transactionTimeoutMillis(ONE_HOUR_IN_MILLS)
187-
readTx.set(session.transaction(options = options).also { it?.onClose { closeReadTx() } })
186+
readTx.set(session.transaction().also { it?.onClose { closeReadTx() } })
188187
scheduleCloseReadTx()
189188
return readTx.get()
190189
}

0 commit comments

Comments
 (0)