
Spark on Mesos Part 2: The Great Disk Leak


After ramping up our usage of Spark, we found that our Mesos agents were running out of disk space.

It was happening rapidly on some of our agents with small disks:

The issue turned out to be that Spark was leaving behind binaries and jars in both driver and executor directories:

Each uncompressed Spark binary directory contains 248MB, so to sum this up:

For a small pipeline with one driver and one executor, this adds up to 957MB. At our level of usage, this was 100GB of dead weight added every day.
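Roughly, that total breaks down (reconstructing from the numbers later in this post, not exact measurements) as a compressed Spark tarball (~217MB) plus an uncompressed Spark directory (248MB) in each of the driver and executor sandboxes, plus the driver's uberjar (~27MB): 2 × (217MB + 248MB) + 27MB ≈ 957MB.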

I looked into ways to at least avoid storing the compressed Spark binaries, since Spark only really needs the uncompressed version. It turns out that Spark uses the Mesos fetcher to copy and extract files. By enabling caching on the Mesos fetcher, Mesos will store only one cached copy of the compressed Spark binaries, then extract it directly into each sandbox directory. According to the Spark documentation, this should be solved by setting the spark.mesos.fetcherCache.enable option to true:

If set to true, all URIs (example: spark.executor.uri, spark.mesos.uris) will be cached by the Mesos Fetcher Cache.
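For context, here is a minimal sketch of what enabling that looks like in an application's conf; the app name and executor URI below are placeholders, not our actual values:

import org.apache.spark.sql.SparkSession

// Enable the Mesos fetcher cache so the compressed Spark binaries are cached
// once per agent instead of being re-downloaded into every sandbox.
val sparkSession = SparkSession.builder()
  .appName("example-pipeline")                                       // placeholder app name
  .config("spark.executor.uri", "hdfs:///deps/spark-x.y.z-bin.tgz")  // placeholder URI to the compressed binaries
  .config("spark.mesos.fetcherCache.enable", "true")
  .getOrCreate()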

Adding this to our Spark application confs, we found that the cache option was turned on for the executor, but not for the driver:

This brought our disk leak down to 740MB per Spark application. Reading through the Spark code, I found that the driver’s fetch configuration is defined by the MesosClusterScheduler, whereas the executors’ is defined by the MesosCoarseGrainedSchedulerBackend. There were two oddities about the MesosClusterScheduler:

  • It reads options from the dispatcher’s configuration instead of the submitted application’s configuration
  • It uses the spark.mesos.fetchCache.enable option instead of spark.mesos.fetcherCache.enable

So bizarre! Finding no documentation for either of these issues online, I filed two bugs. By now, my PRs to fix them have been merged and should show up in upcoming releases.

In the meantime, I implemented a workaround by adding the spark.mesos.fetchCache.enable = true option to the dispatcher.
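A sketch of that workaround, assuming the dispatcher picks up its settings from the spark-defaults.conf in its own SPARK_CONF_DIR (or from a properties file passed when starting the MesosClusterDispatcher):

# In the dispatcher's spark-defaults.conf, not the application's conf.
# Note the misspelled option name that MesosClusterScheduler actually reads.
spark.mesos.fetchCache.enable  true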

Now the driver also used caching, reducing the disk leak to 523MB per Spark application:

Finally, I took advantage of Spark’s listener hooks (onApplicationEnd) to manually clean up the driver’s uberjar and uncompressed Spark binaries when the application finishes:

// Clean up the driver's uberjar and uncompressed Spark binaries from the
// Mesos sandbox once the application ends.
import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

sys.env.get("MESOS_SANDBOX").foreach { sandboxDirectory =>
  sparkSession.sparkContext.addSparkListener(new SparkListener {
    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
      val sandboxItems = new File(sandboxDirectory).listFiles()
      // Match the uncompressed Spark binary directory and the application uberjar.
      val regexes = Array(
        """^spark-\d+\.\d+\.\d+-bin""".r,
        """^hive-spark_.*\.jar""".r
      )
      sandboxItems
        .filter(item => regexes.exists(regex => regex.findFirstIn(item.getName).isDefined))
        .foreach(item => FileUtils.forceDelete(item))
    }
  })
}

This reduced the disk leak to just 248MB per application:

This still isn’t perfect, but I don’t think there will be a way to delete the uncompressed Spark binaries from the Mesos executor sandbox directories until Spark adds more complete Mesos functionality. For now, it’s a 74% reduction in the disk leak.

Last, and perhaps most importantly, we reduced the time to live for our completed Mesos frameworks and sandboxes from one month to one day. This effectively cut our equilibrium disk usage by 97%. Our Mesos agents’ disk usage now stays at a healthy level.
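For reference, sandbox garbage collection on a Mesos agent can be tuned with its --gc_delay flag; a minimal sketch of a one-day setting (the master URL and work_dir below are placeholders):

mesos-agent --master=zk://zk1:2181/mesos \
            --work_dir=/var/lib/mesos \
            --gc_delay=1days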