Skip to content

Commit 81ce125

Browse files
authored
Merge pull request #38 from SOFTNETWORK-APP/fix/tableType
- fix table type through SQL round trip
- invalidate schema cache after deletion of an index
- add support for SQL UPDATE with scripts
2 parents bcee5f2 + 97d5a6d commit 81ce125

12 files changed

Lines changed: 472 additions & 118 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ For programmatic access, add SoftClient4ES to your project:
205205
resolvers += "Softnetwork" at "https://softnetwork.jfrog.io/artifactory/releases/"
206206

207207
// Choose your Elasticsearch version
208-
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.2"
208+
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.3"
209209
// Add the community extensions for materialized views (optional)
210210
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-community-extensions" % "0.1.0"
211211
```

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ ThisBuild / organization := "app.softnetwork"
2020

2121
name := "softclient4es"
2222

23-
ThisBuild / version := "0.17.2"
23+
ThisBuild / version := "0.17.3"
2424

2525
ThisBuild / scalaVersion := scala213
2626

core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ trait IndicesApi extends ElasticClientHelpers {
443443

444444
executeDeleteIndex(index) match {
445445
case success @ ElasticSuccess(true) =>
446+
invalidateSchema(index)
446447
logger.info(s"✅ Index '$index' deleted successfully")
447448
success
448449
case success @ ElasticSuccess(_) =>

documentation/sql/dml_statements.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,13 @@ ON CONFLICT (uuid) DO UPDATE;
329329
`COPY INTO` transparently supports remote file systems by auto-detecting the URI scheme in the `FROM` path.
330330
No SQL syntax change is required — simply use the appropriate URI scheme.
331331

332-
| URI scheme | File system | Required JAR |
333-
| --- | --- | --- |
334-
| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` |
335-
| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` |
336-
| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` |
337-
| `hdfs://` | HDFS | _(bundled with hadoop-client)_ |
338-
| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ |
332+
| URI scheme | File system | Required JAR |
333+
|----------------------------------------------|--------------------------------|--------------------------------|
334+
| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` |
335+
| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` |
336+
| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` |
337+
| `hdfs://` | HDFS | _(bundled with hadoop-client)_ |
338+
| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ |
339339

340340
> **Important:** Cloud connector JARs are declared as `provided` dependencies and are **not bundled** in the library.
341341
> They must be present in the runtime classpath (e.g. added to the CLI assembly or the application's fat-jar).

sql/src/main/scala/app/softnetwork/elastic/sql/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ package object sql {
573573
override def sql: String = s"""'$value'"""
574574
override def baseType: SQLType = SQLTypes.Varchar
575575

576-
override def ddl: String = s""""$value""""
576+
override def ddl: String = s""""${value.replace("\\", "\\\\").replace("\"", "\\\"")}""""
577577
}
578578

579579
case object IdValue extends Value[String]("_id") with TokenRegex {

sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala

Lines changed: 14 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package app.softnetwork.elastic.sql.parser
1818

19-
import app.softnetwork.elastic.sql.PainlessContextType.Processor
2019
import app.softnetwork.elastic.sql._
2120
import app.softnetwork.elastic.sql.function.time.DateTimeFunction
2221
import app.softnetwork.elastic.sql.function._
@@ -230,42 +229,24 @@ object Parser
230229
case None => None
231230
}
232231

233-
def script: PackratParser[PainlessScript] =
234-
("SCRIPT" ~ "AS") ~ start ~ (identifierWithArithmeticExpression |
232+
def scriptValue: PackratParser[PainlessScript] = identifierWithArithmeticExpression |
235233
identifierWithTransformation |
236234
identifierWithIntervalFunction |
237-
identifierWithFunction) ~ end ^^ { case _ ~ _ ~ s ~ _ => s }
235+
identifierWithFunction
236+
237+
def script: PackratParser[PainlessScript] =
238+
("SCRIPT" ~ "AS") ~ start ~ scriptValue ~ end ^^ { case _ ~ _ ~ s ~ _ => s }
238239

239240
def column: PackratParser[Column] =
240241
ident ~ extension_type ~ (script | multiFields) ~ defaultVal ~ notNull ~ comment ~ (options | success(
241242
ListMap.empty[String, Value[_]]
242243
)) ^^ { case name ~ dt ~ mfs ~ dv ~ nn ~ ct ~ opts =>
243244
mfs match {
244245
case script: PainlessScript =>
245-
val ctx = PainlessContext(Processor)
246-
val scr = script.painless(Some(ctx))
247-
val temp = s"$ctx$scr"
248-
val ret =
249-
temp.split(";") match {
250-
case Array(single) if single.trim.startsWith("return ") =>
251-
val stripReturn = single.trim.stripPrefix("return ").trim
252-
s"ctx.$name = $stripReturn"
253-
case multiple =>
254-
val last = multiple.last.trim
255-
val temp = multiple.dropRight(1) :+ s" ctx.$name = $last"
256-
temp.mkString(";")
257-
}
258246
Column(
259247
name,
260248
dt,
261-
Some(
262-
ScriptProcessor(
263-
script = script.sql,
264-
column = name,
265-
dataType = dt,
266-
source = ret
267-
)
268-
),
249+
Some(ScriptProcessor.fromScript(name, script, Some(dt))),
269250
Nil,
270251
dv,
271252
nn,
@@ -521,27 +502,9 @@ object Parser
521502

522503
def alterColumnScript: PackratParser[AlterColumnScript] =
523504
alterColumnIfExists ~ ident ~ "SET" ~ script ^^ { case ie ~ name ~ _ ~ ns =>
524-
val ctx = PainlessContext(Processor)
525-
val scr = ns.painless(Some(ctx))
526-
val temp = s"$ctx$scr"
527-
val ret =
528-
temp.split(";") match {
529-
case Array(single) if single.trim.startsWith("return ") =>
530-
val stripReturn = single.trim.stripPrefix("return ").trim
531-
s"ctx.$name = $stripReturn"
532-
case multiple =>
533-
val last = multiple.last.trim
534-
val temp = multiple.dropRight(1) :+ s" ctx.$name = $last"
535-
temp.mkString(";")
536-
}
537505
AlterColumnScript(
538506
name,
539-
ScriptProcessor(
540-
script = ns.sql,
541-
column = name,
542-
dataType = ns.out,
543-
source = ret
544-
),
507+
ScriptProcessor.fromScript(name, ns, Some(ns.out)),
545508
ifExists = ie
546509
)
547510
}
@@ -1025,10 +988,12 @@ object Parser
1025988

1026989
/** UPDATE table SET col1 = v1, col2 = expr [WHERE ...], where each assigned value is either a literal value or a script expression */
1027990
def update: PackratParser[Update] =
1028-
("UPDATE" ~> ident) ~ ("SET" ~> repsep(ident ~ "=" ~ value, separator)) ~ where.? ^^ {
1029-
case table ~ assigns ~ w =>
1030-
val values = ListMap(assigns.map { case col ~ _ ~ v => col -> v }: _*)
1031-
Update(table, values, w)
991+
("UPDATE" ~> ident) ~ ("SET" ~> repsep(
992+
ident ~ "=" ~ (value | scriptValue),
993+
separator
994+
)) ~ where.? ^^ { case table ~ assigns ~ w =>
995+
val values = ListMap(assigns.map { case col ~ _ ~ v => col -> v }: _*)
996+
Update(table, values, w)
1032997
}
1033998

1034999
/** DELETE FROM table [WHERE ...] */
@@ -1094,7 +1059,7 @@ trait Parser
10941059
val endStruct: Parser[String] = "}"
10951060

10961061
def objectValue: PackratParser[ObjectValue] =
1097-
lparen ~> rep1sep(option, comma) <~ rparen ^^ { opts =>
1062+
lparen ~> repsep(option, comma) <~ rparen ^^ { opts =>
10981063
ObjectValue(ListMap(opts: _*))
10991064
}
11001065

sql/src/main/scala/app/softnetwork/elastic/sql/parser/type/package.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import app.softnetwork.elastic.sql.{
2424
Identifier,
2525
LongValue,
2626
LongValues,
27+
Null,
2728
ParamValue,
2829
PiValue,
2930
RandomValue,
@@ -38,9 +39,11 @@ package object `type` {
3839
trait TypeParser { self: Parser =>
3940

4041
def literal: PackratParser[StringValue] =
41-
(("\"" ~> """([^"\\]|\\.)*""".r <~ "\"") | ("'" ~> """([^'\\]|\\.)*""".r <~ "'")) ^^ { str =>
42-
StringValue(str)
43-
}
42+
(("\"" ~> """([^"\\]|\\.)*""".r <~ "\"") ^^ { str =>
43+
StringValue(str.replace("\\\"", "\"").replace("\\\\", "\\"))
44+
}) | (("'" ~> """([^'\\]|\\.)*""".r <~ "'") ^^ { str =>
45+
StringValue(str.replace("\\'", "'").replace("\\\\", "\\"))
46+
})
4447

4548
def long: PackratParser[LongValue] =
4649
"""(-)?(0|[1-9]\d*)""".r ^^ (str => LongValue(str.toLong))
@@ -59,6 +62,9 @@ package object `type` {
5962
def param: PackratParser[ParamValue.type] =
6063
"?" ^^ (_ => ParamValue)
6164

65+
def nullValue: PackratParser[Null.type] =
66+
"(?i)NULL\\b".r ^^ (_ => Null)
67+
6268
def literals: PackratParser[Value[_]] = "[" ~> repsep(literal, ",") <~ "]" ^^ { list =>
6369
StringValues(list)
6470
}
@@ -78,7 +84,7 @@ package object `type` {
7884
def array: PackratParser[Value[_]] = literals | longs | doubles | booleans
7985

8086
def value: PackratParser[Value[_]] =
81-
literal | pi | random | double | long | boolean | param | array
87+
literal | pi | random | double | long | boolean | nullValue | param | array
8288

8389
def identifierWithValue: Parser[Identifier] = (value ^^ functionAsIdentifier) >> cast
8490

sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -453,20 +453,31 @@ package object query {
453453
}
454454
}
455455

456-
case class Update(table: String, values: ListMap[String, Value[_]], where: Option[Where])
456+
case class Update(table: String, values: ListMap[String, PainlessScript], where: Option[Where])
457457
extends DmlStatement {
458458
override def sql: String = s"UPDATE $table SET ${values
459-
.map { case (k, v) => s"$k = ${v.value}" }
460-
.mkString(", ")}${where.map(w => s" ${w.sql}").getOrElse("")}"
459+
.map { case (k, v) =>
460+
v match {
461+
case value: Value[_] => s"$k = ${value.value}"
462+
case painlessScript => s"$k = ${painlessScript.sql}"
463+
}
464+
}
465+
.mkString(", ")}${where.map(w => s"${w.sql}").getOrElse("")}"
461466

462467
lazy val customPipeline: IngestPipeline = IngestPipeline(
463-
s"update-$table-${Instant.now}",
468+
s"update-$table-${Instant.now.toEpochMilli}",
464469
IngestPipelineType.Custom,
465470
values.map { case (k, v) =>
466-
SetProcessor(
467-
column = k,
468-
value = v
469-
)
471+
v match {
472+
case value: Value[_] =>
473+
SetProcessor(
474+
pipelineType = IngestPipelineType.Custom,
475+
column = k,
476+
value = value
477+
)
478+
case script =>
479+
ScriptProcessor.fromScript(k, script, pipelineType = IngestPipelineType.Custom)
480+
}
470481
}.toSeq
471482
)
472483

@@ -833,11 +844,15 @@ package object query {
833844
case None => Nil
834845
}
835846

836-
lazy val tableType: TableType = (options.get("type") match {
837-
case Some(value) =>
838-
value match {
839-
case s: StringValue => Some(TableType(s.value))
840-
case _ => None
847+
lazy val tableType: TableType = (mappings.get("_meta") match {
848+
case Some(meta) =>
849+
meta match {
850+
case o: ObjectValue =>
851+
o.value.get("type") match {
852+
case Some(s: StringValue) => Some(TableType(s.value))
853+
case _ => None
854+
}
855+
case _ => None
841856
}
842857
case None => None
843858
}).getOrElse(TableType.Regular)

sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala

Lines changed: 72 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,35 @@ package object schema {
436436

437437
}
438438

439+
object ScriptProcessor {
440+
def fromScript(
441+
column: String,
442+
script: PainlessScript,
443+
dataType: Option[SQLType] = None,
444+
pipelineType: IngestPipelineType = IngestPipelineType.Default
445+
): ScriptProcessor = {
446+
val ctx = PainlessContext(PainlessContextType.Processor)
447+
val scr = script.painless(Some(ctx))
448+
val painless = s"$ctx$scr"
449+
val source = painless.split(";") match {
450+
case Array(single) if single.trim.startsWith("return ") =>
451+
val stripped = single.trim.stripPrefix("return ").trim
452+
s"ctx.$column = $stripped"
453+
case parts =>
454+
val last = parts.last.trim
455+
val updated = parts.dropRight(1) :+ s" ctx.$column = $last"
456+
updated.mkString(";")
457+
}
458+
ScriptProcessor(
459+
pipelineType = pipelineType,
460+
script = script.sql,
461+
column = column,
462+
dataType = dataType.getOrElse(script.out),
463+
source = source
464+
)
465+
}
466+
}
467+
439468
case class RenameProcessor(
440469
pipelineType: IngestPipelineType = IngestPipelineType.Default,
441470
description: Option[String] = None,
@@ -908,29 +937,43 @@ package object schema {
908937
)
909938
)
910939
}
911-
.map("script" -> _) ++ ListMap(
912-
"multi_fields" -> ObjectValue(
913-
ListMap(multiFields.map(field => field.name -> ObjectValue(field._meta)): _*)
914-
)
915-
) ++ (if (lineage.nonEmpty) {
916-
// ✅ Lineage as map of paths
917-
ListMap(
918-
"lineage" -> ObjectValue(
919-
lineage.map { case (pathId, chain) =>
920-
pathId -> ObjectValues(
921-
chain.map { case (table, column) =>
922-
ObjectValue(
923-
ListMap(
924-
"table" -> StringValue(table),
925-
"column" -> StringValue(column)
926-
)
927-
)
928-
}
929-
)
930-
}
931-
)
932-
)
933-
} else ListMap.empty)
940+
.map("script" -> _) ++ (if (multiFields.nonEmpty)
941+
ListMap(
942+
"multi_fields" -> ObjectValue(
943+
ListMap(
944+
multiFields.map(field =>
945+
field.name -> ObjectValue(field._meta)
946+
): _*
947+
)
948+
)
949+
)
950+
else ListMap.empty[String, Value[_]]) ++ (if (lineage.nonEmpty) {
951+
// ✅ Lineage as map of paths
952+
ListMap(
953+
"lineage" -> ObjectValue(
954+
lineage.map { case (pathId, chain) =>
955+
pathId -> ObjectValues(
956+
chain.map {
957+
case (
958+
table,
959+
column
960+
) =>
961+
ObjectValue(
962+
ListMap(
963+
"table" -> StringValue(
964+
table
965+
),
966+
"column" -> StringValue(
967+
column
968+
)
969+
)
970+
)
971+
}
972+
)
973+
}
974+
)
975+
)
976+
} else ListMap.empty)
934977
}
935978

936979
def updateStruct(): Column = {
@@ -1443,9 +1486,11 @@ package object schema {
14431486
"columns" -> ObjectValue(cols.map { case (name, col) => name -> ObjectValue(col._meta) })
14441487
) ++ ListMap(
14451488
"type" -> StringValue(tableType.name)
1446-
) ++ ListMap(
1447-
"materialized_views" -> StringValues(materializedViews.map(StringValue))
1448-
)
1489+
) ++ (if (materializedViews.nonEmpty)
1490+
ListMap(
1491+
"materialized_views" -> StringValues(materializedViews.map(StringValue))
1492+
)
1493+
else ListMap.empty[String, Value[_]])
14491494

14501495
def update(): Table = {
14511496
val updated =
@@ -1490,7 +1535,7 @@ package object schema {
14901535
""
14911536
}
14921537
val separator = if (partitionBy.nonEmpty) "," else ""
1493-
s"$separator\nOPTIONS = ${Seq(
1538+
s"$separator\nOPTIONS ${Seq(
14941539
mappingOpts,
14951540
settingsOpts,
14961541
aliasesOpts

0 commit comments

Comments
 (0)