I'm trying to insert data from Kafka into Cassandra using a single-node Spark cluster and a single-node Cassandra cluster. A snippet of my code follows:
/**
 * Spark Streaming job that consumes the Kafka topic "fleet", turns every JSON
 * message into an `event` row and stores the rows in the Cassandra tables
 * `trackfleet_db.locationinfo` (every row of the batch) and
 * `trackfleet_db.locationinfo_recent` (one row per `imei` in the batch).
 */
object liveData extends Serializable {
  // UTC formatters for the date-only and the date-time columns.
  // SimpleDateFormat is NOT thread-safe; these instances are shared by every
  // task running in the same JVM, so always format through `formatWith`.
  var datezone = new SimpleDateFormat("yyyy-MM-dd")
  var timezone = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  datezone.setTimeZone(TimeZone.getTimeZone("UTC"))
  timezone.setTimeZone(TimeZone.getTimeZone("UTC"))

  // json4s formats used by `extract` in `parser`.
  implicit val formats: DefaultFormats.type = DefaultFormats

  /** Formats `when` with `fmt`, holding the formatter's lock so concurrent tasks cannot corrupt its state. */
  private def formatWith(fmt: SimpleDateFormat, when: Date): String =
    fmt.synchronized(fmt.format(when))

  /** Current time rendered as "yyyy-MM-dd HH:mm:ss" in the JVM default time zone (used for progress logging only). */
  private def localTimestamp(): String =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())

  /**
   * Converts one Kafka message payload into an `event` row.
   *
   * Timestamps are rendered in UTC, the two coordinate-like fields `lT` and
   * `lN` are rounded to 6 decimal places (HALF_UP), and the row key-like field
   * is the upper-cased MD5 hex digest of `id + evT`.
   *
   * No exception is handled here: any failure raised by `parse` / `extract`
   * (or by `BigDecimal` construction) propagates to the caller.
   *
   * @param json JSON text expected to be extractable into a `liveevent`
   * @return the `event` built from the extracted record
   */
  def parser(json: String): event = {
    val fleetrecord = parse(json).extract[liveevent]
    val eventTime = new Date(fleetrecord.evT)
    event(
      fleetrecord.id, formatWith(datezone, eventTime),
      formatWith(timezone, eventTime), formatWith(timezone, new Date()),
      DigestUtils.md5Hex(fleetrecord.id + fleetrecord.evT).toUpperCase(), formatWith(timezone, new Date(fleetrecord.lstevT)),
      BigDecimal(fleetrecord.lT).setScale(6, BigDecimal.RoundingMode.HALF_UP).toDouble,
      BigDecimal(fleetrecord.lN).setScale(6, BigDecimal.RoundingMode.HALF_UP).toDouble,
      fleetrecord.d, fleetrecord.dFrmD, fleetrecord.s, fleetrecord.agl, 0, fleetrecord.port,
      Map("d1" -> fleetrecord.d1, "d2" -> fleetrecord.d2, "d3" -> fleetrecord.d3, "d4" -> fleetrecord.d4,
        "ebt" -> fleetrecord.eBt, "ibt" -> fleetrecord.iBt, "a1" -> fleetrecord.a1, "a2" -> fleetrecord.a2,
        "a3" -> fleetrecord.a3, "a4" -> fleetrecord.a4), fleetrecord.evT)
  }

  /**
   * Entry point: configures Spark, opens a receiver-based Kafka stream with a
   * 10 second batch interval and, for every non-empty batch, writes the parsed
   * events to Cassandra. Blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // .setMaster("local[3]")
      .setAppName("Fleet Live Data")
      .set("spark.cassandra.connection.host", "some ip")
      .set("spark.cassandra.connection.keep_alive_ms", "20000")
      .set("spark.cassandra.auth.username", "user")
      .set("spark.cassandra.auth.password", "pass")
      .set("spark.executor.memory", "2g")
      // NOTE(review): driver memory/cores and the deploy mode are normally fixed by
      // spark-submit before this code runs - confirm these three settings take effect.
      .set("spark.driver.memory", "2g")
      .set("spark.submit.deployMode", "cluster")
      .set("spark.executor.instances", "9")
      .set("spark.executor.cores", "1")
      .set("spark.cores.max", "9")
      .set("spark.driver.cores", "9")
      .set("spark.streaming.unpersist", "true")
      .set("spark.locality.wait", "2s")
    println("Spark Configuration Done")

    val spark = SparkSession
      .builder
      .appName("Fleet Live Data")
      .config(conf)
      .getOrCreate()
    println("Spark Session Config Done")

    // Reuse the context owned by the session instead of looking it up a second time.
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(10))

    // Topic name -> number of consumer threads used by the receiver.
    val topics = Map("fleet" -> 1)
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "192.168.0.40:2181,192.168.0.106:2181,192.168.0.113:2181",
      "group.id" -> "live1",
      "auto.offset.reset" -> "largest")

    import spark.implicits._ // needed for rdd.toDF()

    val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
    val collection = kafkaStream.map(_._2).map(parser)

    collection.foreachRDD { rdd =>
      if (!rdd.partitions.isEmpty) {
        println("RDD collected")
        // The batch feeds three separate actions below. Persisting it means the JSON
        // is parsed once per batch, and every action sees the same parsed rows
        // (parser stamps the current time into each row).
        rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
        try {
          val sortedEvents = rdd.sortBy(f => (f.imei, f.gpsdt))
          println("time before " + localTimestamp())
          sortedEvents.toDF().write.format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "locationinfo", "keyspace" -> "trackfleet_db", "output.consistency.level" -> "ANY"))
            .mode(SaveMode.Append).save()

          // Keep a single event per imei for the "recent" table.
          val onePerImei = rdd.keyBy(_.imei).reduceByKey((x, _) => x).values
          onePerImei.saveToCassandra("trackfleet_db", "locationinfo_recent",
            writeConf = WriteConf(timestamp = TimestampOption.perRow("gpsdtt")))
          println("time after " + localTimestamp())

          // Send a mail when the batch holds fewer than 5000 events.
          if (rdd.count() < 5000) {
            Mail.sendMail()
          }
        } catch {
          // Best effort: log and continue with the next batch, but let fatal
          // errors and interrupts propagate instead of swallowing them.
          case scala.util.control.NonFatal(e) => e.printStackTrace()
        } finally {
          // Always release the cached blocks so they cannot pile up across batches.
          rdd.unpersist()
        }
      } else {
        println("blank rdd")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Now, when I run this application, it works fine for around a week (sometimes only 4-5 days). After that it starts getting slow: Kafka lag keeps increasing, and records build up in a long queue in Spark waiting to be processed. What is missing in my code? Is there anything that uses up memory and hangs my application after a certain period of time? I have checked storage, executors and memory; usage was very low, yet the application still hangs. As soon as I start a new instance of the same application on the same machine, but on a different port, it picks up all the pending records and clears the queue. I just want to understand why the existing application hangs after running for a certain period. Thanks,
Comments
Post a Comment