River IQ

Spark UDF with withColumn

  Ashish Kumar      Spark February 14, 2020
Image

import org.apache.spark.sql.functions._

// Sample input: five rows with ID 1..5; every row carries amt1=1, amt2=2, amt3=3, amt4=4.
val events = (1 to 5).map(id => (id, 1, 2, 3, 4)).toDF("ID", "amt1", "amt2", "amt3", "amt4")


// Result of the most recent getamt5value call; feeds the running total of the next call.
var prev_amt5 = 0

// Call counter: 1 until getamt5value has been invoked once, incremented on every call.
var i = 1

/** Computes the "amt5" value for one row, carrying state over from the previous call.
  *
  * Rules, in order:
  *   - `ID == 0`: the result is 1 when `amt1 == 0`, otherwise `amt2 + amt3`.
  *   - `amt4 == 0`, or both the previous result and `amt1` are 0: the result is 0.
  *   - otherwise: previous result + `amt2 + amt3 + amt4`.
  *
  * Side effects: increments `i` and stores the result in `prev_amt5`. On the very
  * first call (`i == 1`) `prev_amt5` is reset to 0 before the rules are applied.
  *
  * NOTE(review): the result depends on call order because of the shared mutable
  * state. When used from a Spark UDF this presumably only yields a meaningful
  * running total if all rows are evaluated sequentially, in the intended order,
  * inside a single task -- confirm against how the DataFrame is partitioned.
  *
  * @param ID   row identifier; 0 selects the "start" rule
  * @param amt1 first amount; only compared against 0
  * @param amt2 second amount
  * @param amt3 third amount
  * @param amt4 fourth amount; 0 forces the result to 0 when `ID != 0`
  * @return the amt5 value for this row
  */
def getamt5value(ID: Int, amt1: Int, amt2: Int, amt3: Int, amt4: Int): Int = {
  // Start from a clean running total on the first invocation.
  if (i == 1) prev_amt5 = 0
  i += 1

  val cur_amt5 =
    if (ID == 0) {
      if (amt1 == 0) 1 else amt2 + amt3
    } else if (amt4 == 0 || (prev_amt5 == 0 && amt1 == 0)) {
      0
    } else {
      prev_amt5 + amt2 + amt3 + amt4
    }

  // Remember this result so the next call can build on it.
  prev_amt5 = cur_amt5
  cur_amt5
}

// Spark UDF wrapper around getamt5value.
// getamt5value mutates shared state, so the same inputs can yield different results;
// marking the UDF nondeterministic tells the optimizer not to duplicate, reorder or
// eliminate invocations.
val getamt5 = udf { (ID: Int, amt1: Int, amt2: Int, amt3: Int, amt4: Int) =>
  getamt5value(ID, amt1, amt2, amt3, amt4)
}.asNondeterministic()

// Append the UDF result as column "amnt5" to the `events` DataFrame and print it.
// (The original referenced `myDF`, which is never defined in this snippet.)
events.withColumn("amnt5", getamt5(events.col("ID"), events.col("amt1"), events.col("amt2"), events.col("amt3"), events.col("amt4"))).show()


0 Comments

Be the first to comment on this post.