Has anyone worked with Spark Structured Streaming and Solace using Scala? I'm getting the error below.

    <scala.version>2.12.12</scala.version>
    <spark.version>3.1.2</spark.version>

Solace jar: pubsubplus-connector-spark-1.2.0-jar-with-dependencies.jar

com.solacesystems.jcsmp.JCSMPTransportException: CertificateException - java.security.cert.CertPathValidatorException: Path does not chain with any of the trust anchors
at com.solacesystems.jcsmp.protocol.impl.TcpChannel.executePostOnce(TcpChannel.java:248)
at com.solacesystems.jcsmp.protocol.impl.ChannelOpStrategyClient.performOpen(ChannelOpStrategyClient.java:101)
at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.performOpenSingle(TcpClientChannel.java:428)
at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.access$800(TcpClientChannel.java:124)
at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel$ClientChannelConnect.call(TcpClientChannel.java:2661)
at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.open(TcpClientChannel.java:404)
at com.solacesystems.jcsmp.impl.JCSMPBasicSession.sniffRouter(JCSMPBasicSession.java:423)
at com.solacesystems.jcsmp.impl.JCSMPBasicSession.createFlow(JCSMPBasicSession.java:1059)
at com.solacecoe.connectors.spark.streaming.solace.SolaceBroker.setReceiver(SolaceBroker.java:66)
at com.solacecoe.connectors.spark.streaming.solace.SolaceBroker.addReceiver(SolaceBroker.java:48)
at com.solacecoe.connectors.spark.streaming.SolaceMicroBatch.(SolaceMicroBatch.java:115)
at com.solacecoe.connectors.spark.SolaceScan.toMicroBatchStream(SolaceScan.java:44)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:104)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:82)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:307)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:82)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:62)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:326)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)

Hi @ibneyhasan - The error in the logs indicates an issue when trying to read certificates. Is the connection to Solace TLS-based? Can you share the connector configuration you used for the setup?

Here is connector configuration.

    val df = spark.readStream.format("solace")
      .option("host", "tcps://.aws.com:55443,tcps://.aws.com:55443")
      .option("vpn", "")
      .option("username", "")
      .option("password", "")
      .option("queue", "")
      .option("connectRetries", 2)
      .option("reconnectRetries", -1)
      .option("reconnectRetryWaitInMillis", 5000)
      .option("keepAliveIntervalInMillis", 30000)
      .option("batchSize", 100)
      .option("includeHeaders", true)
      // .option("solace.oauth.client.auth-server.ssl.validate-certificate", false)
      .option("partitions", 10)
      .option("solace.apiProperties.SSL_TRUST_STORE", "//solace_keystore.jks")
      .option("solace.apiProperties.SSL_TRUST_STORE_FORMAT", "jks")
      .option("solace.apiProperties.SSL_TRUST_STORE_PASSWORD", "*")
      .load()

Thanks for sharing the configuration. I believe the TLS handshake is failing because the certificate chain presented by the server cannot be validated against your truststore; the CA certificate may not match. Can you also try providing an absolute path in the SSL_TRUST_STORE configuration, to make sure the JKS path is resolved correctly?

Also, the latest version of the connector is 3.1.2, and it is available on Maven Central: https://mvnrepository.com/artifact/com.solacecoe.connectors/pubsubplus-connector-spark
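For reference, based on the Maven Central coordinates in the link above, the dependency for 3.1.2 would presumably look like this in a `pom.xml`:

```xml
<dependency>
  <groupId>com.solacecoe.connectors</groupId>
  <artifactId>pubsubplus-connector-spark</artifactId>
  <version>3.1.2</version>
</dependency>
```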