Mongo Spark connector does not allow scale and precision of Decimal128 in schema when loading mongo collection into Spark

I have already posted and answered a question on Stack Overflow regarding this issue.

The documentation for the Mongo Spark connector describes two ways of determining the schema when loading a collection from Mongo into Spark: i) schema inference via sampling, and ii) explicitly specifying the schema using a case class.

When working with large decimal values, neither of these methods is satisfactory. The fundamental issue is that there is not a 1-1 mapping between Spark schema types and Scala/Java types.

When using schema inference with a NumberDecimal attribute, the connector randomly samples the collection and derives the precision and scale of the corresponding BigDecimal type from the largest value it happens to see. Because only a sample is inspected, the true maximum can easily be missed, in which case the inferred precision and scale are too small. Any values exceeding the incorrectly inferred precision and scale overflow the BigDecimal and are silently set to None. Moreover, the problem is very hard to troubleshoot because schema inference is non-deterministic: the same job can succeed on one run and lose values on the next. I propose an enhancement request to output a warning when using schema inference on NumberDecimal attributes.
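To make the failure mode concrete, here is a toy model in plain Scala. It is not the connector's inference code; the object name, the helper names and the data are all made up for illustration, and the "sample" is simply a fixed subset that happens to miss the one large value.

```scala
object SampledPrecision {
  // Precision (number of significant digits) needed to hold the widest value seen.
  def inferPrecision(values: Seq[BigDecimal]): Int = values.map(_.precision).max

  // True if v can be stored in a decimal with the given precision.
  def fits(v: BigDecimal, precision: Int): Boolean = v.precision <= precision

  // Hypothetical collection: many small values and a single 30-digit outlier.
  val outlier: BigDecimal = BigDecimal("123456789012345678901234567890")
  val collection: Seq[BigDecimal] = Seq.fill(999)(BigDecimal(12345)) :+ outlier

  // Stand-in for a sample that happens to miss the outlier.
  val sample: Seq[BigDecimal] = collection.take(10)

  def main(args: Array[String]): Unit = {
    val inferred = inferPrecision(sample)
    println(s"precision inferred from sample: $inferred")
    println(s"precision actually required:    ${inferPrecision(collection)}")
    println(s"outlier fits inferred type:     ${fits(outlier, inferred)}")
  }
}
```

In this sketch the sample suggests a much narrower decimal than the collection needs, so the outlier does not fit the inferred type. With real random sampling, whether the outlier is seen changes from run to run, which is what makes the issue hard to reproduce.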

Alternatively, we can specify a schema using the type parameter of the load helper method. However, in this case there is no way to specify the precision and scale of the BigDecimal, and there is no way to specify that the attribute is not nullable. The BigDecimal type is always translated into decimal(38, 18) (nullable = true) which is not always desired.
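The mapping can be checked without touching Mongo at all. The sketch below uses Spark's own Encoders.product to derive a schema from a case class; I am assuming the connector's case class reflection yields the same result, which matches what I observed. The Output case class here is a hypothetical stand-in with the same fields as the schema shown further down.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Hypothetical case class; field names mirror the explicit schema used in this report.
case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)

object CaseClassSchema {
  // Schema Spark derives from the case class by reflection.
  val schema: StructType = Encoders.product[Output].schema

  def main(args: Array[String]): Unit = {
    // Prints one line per field with its Spark SQL type and nullability.
    schema.printTreeString()
  }
}
```

The line printed for the value field shows the default decimal type and nullability that Spark picks for BigDecimal. A case class gives no hook to override either of them.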

The only sensible way I could find to deal with large decimal values was to bypass the load helper methods and call toDF(schema) on the MongoSpark instance, e.g.:

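import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.types.{DecimalType, LongType, StructField, StructType}

// Explicit Spark SQL schema; "value" is a non-nullable decimal(30, 0).
val schema = StructType(List(
  StructField("time", LongType, nullable = false),
  StructField("pubKeyId", LongType, nullable = false),
  StructField("value", DecimalType(30, 0), nullable = false),
  StructField("outIndex", LongType, nullable = false),
  StructField("outTxId", LongType, nullable = false)
))

// sc is the SparkContext, rc the ReadConfig; .as[Output] needs the session's implicits in scope.
val outputs =
  MongoSpark.builder().sparkContext(sc).readConfig(rc).build().toDF(schema).as[Output]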

However, this is a bit cumbersome, and is not explicitly documented. It would be nice if there were an additional load method with a signature taking a Spark SQL schema as a parameter so that we can properly specify the schema explicitly when loading a collection…
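For illustration, the requested method could be as thin as the wrapper sketched here. MongoSchemaLoad and loadWithSchema are hypothetical names of my own, not part of the connector; the body only chains the builder and toDF(schema) calls already used in the workaround.

```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

object MongoSchemaLoad {
  // Hypothetical helper: load a collection using an explicit Spark SQL schema,
  // so that precision, scale and nullability are taken exactly as given.
  def loadWithSchema(sc: SparkContext, rc: ReadConfig, schema: StructType): DataFrame =
    MongoSpark.builder().sparkContext(sc).readConfig(rc).build().toDF(schema)
}
```

With something like this in the API, the example would reduce to loadWithSchema(sc, rc, schema).as[Output], and no sampling would be involved.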