Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.spark.sql.redis

import java.nio.charset.StandardCharsets.UTF_8

import redis.clients.jedis.Pipeline
import redis.clients.jedis.params.SetParams

class BinaryNXRedisPersistence extends BinaryRedisPersistence {

override def save(pipeline: Pipeline, key: String, value: Array[Byte], ttl: Int): Unit = {
val keyBytes = key.getBytes(UTF_8)
val setParameters = SetParams.setParams()
.nx()
.ex(ttl)
pipeline.set(keyBytes, value, setParameters)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.spark.sql.redis

import redis.clients.jedis.Pipeline

class HashNXRedisPersistence extends HashRedisPersistence {

override def save(pipeline: Pipeline, key: String, value: Any, ttl: Int): Unit = {
val javaValue = value.asInstanceOf[Map[String, String]]
javaValue.keySet.foreach( field => pipeline.hsetnx(key, field, javaValue(field)))
if (ttl > 0) {
pipeline.expire(key, ttl)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ object RedisPersistence {

private val providers =
Map(SqlOptionModelBinary -> new BinaryRedisPersistence(),
SqlOptionModelHash -> new HashRedisPersistence())
SqlOptionModelHash -> new HashRedisPersistence(),
SqlOptionModelHashNoOpIfExists -> new HashNXRedisPersistence(),
SqlOptionModelBinaryNoOpIfExists -> new BinaryNXRedisPersistence())

def apply(model: String): RedisPersistence[Any] = {
// use hash model by default
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/org/apache/spark/sql/redis/redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package object redis {
val SqlOptionModel = "model"
val SqlOptionModelBinary = "binary"
val SqlOptionModelHash = "hash"
val SqlOptionModelHashNoOpIfExists = "hashnx"
val SqlOptionModelBinaryNoOpIfExists = "binarynx"
val SqlOptionInferSchema = "infer.schema"
val SqlOptionKeyColumn = "key.column"
val SqlOptionTTL = "ttl"
Expand Down