I want to transform a column. The new column should contain only a partition of the original column. I defined the following UDF:

    def extract(index: Integer) = udf((v: Seq[Double]) => v.grouped(16).toSeq(index))
to use it in a loop later with:

    mydf = mydf.withColumn("measurement_" + i, extract(i)($"vector"))
The original vector column was created with:

    var vectors: Seq[Seq[Double]] = myvectors
    vectors.toDF("vector")
But in the end I get the following error:

    Failed to execute user defined function(anonfun$user$sparkapp$myclass$$extract$2$1: (array<double>) => array<double>)
Have I defined the UDF incorrectly?
I can reproduce the error when I try to extract elements that don't exist, i.e. when I give an index that is larger than the length of the grouped sequence:

    val mydf = Seq(Seq(1.0, 2.0, 3, 4.0), Seq(4.0, 3, 2, 1)).toDF("vector")
    // mydf: org.apache.spark.sql.DataFrame = [vector: array<double>]

    def extract(index: Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq(index))
    // extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction

    val i = 2
    mydf.withColumn("measurement_" + i, extract(i)($"vector")).show
This gives the error:

    org.apache.spark.SparkException: Failed to execute user defined function($anonfun$extract$1: (array<double>) => array<double>)
Most likely you have the same problem when calling toSeq(index). Try toSeq.lift(index) instead, which returns None if the index is out of bounds:

    def extract(index: Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq.lift(index))
    // extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction
Normal index:

    val i = 1
    mydf.withColumn("measurement_" + i, extract(i)($"vector")).show

    +--------------------+-------------+
    |              vector|measurement_1|
    +--------------------+-------------+
    |[1.0, 2.0, 3.0, 4.0]|   [3.0, 4.0]|
    |[4.0, 3.0, 2.0, 1.0]|   [2.0, 1.0]|
    +--------------------+-------------+
Index out of bounds:

    val i = 2
    mydf.withColumn("measurement_" + i, extract(i)($"vector")).show

    +--------------------+-------------+
    |              vector|measurement_2|
    +--------------------+-------------+
    |[1.0, 2.0, 3.0, 4.0]|         null|
    |[4.0, 3.0, 2.0, 1.0]|         null|
    +--------------------+-------------+
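
The difference between the two calls can also be checked without Spark. The following is only a minimal sketch in plain Scala; the names LiftDemo and chunkAt are hypothetical and used purely for illustration. It assumes the same chunking as above (groups of 2) and shows that applying an out-of-range index throws, while lift wraps the lookup in an Option:

```scala
import scala.util.Try

object LiftDemo {
  // Hypothetical helper: split `v` into chunks of `size` elements and
  // return chunk number `index`, or None if there is no such chunk.
  def chunkAt(v: Seq[Double], size: Int, index: Int): Option[Seq[Double]] =
    v.grouped(size).toSeq.lift(index)

  def main(args: Array[String]): Unit = {
    val v = Seq(1.0, 2.0, 3.0, 4.0)

    // Index 1 exists: the second chunk is returned inside a Some.
    assert(chunkAt(v, 2, 1) == Some(Seq(3.0, 4.0)))

    // Index 2 does not exist: lift yields None instead of throwing.
    assert(chunkAt(v, 2, 2).isEmpty)

    // Direct application with the same index throws, which is what
    // surfaces in Spark as a failed UDF execution.
    assert(Try(v.grouped(2).toSeq(2)).isFailure)

    println("all checks passed")
  }
}
```

Inside a UDF, a None result is what shows up as null in the output column, as in the second table above.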