Tag Archive for: Apache Spark

5 Apache Spark Best Practices

Already familiar with the term big data, right? Despite the fact that we would all discuss Big Data, it takes a very long time before you confront it in your career. Apache Spark is a Big Data tool that aims to handle large datasets in a parallel and distributed manner. Apache Spark began as a research project at UC Berkeley’s AMPLab, a student, researcher, and faculty collaboration centered on data-intensive application domains, in 2009. 

Introduction

Spark’s aim is to create a new framework that was optimized for quick iterative processing, such as machine learning and interactive data analysis while retaining Hadoop MapReduce’s scalability and fault-tolerant. Spark outperforms Hadoop in many ways, reaching performance levels that are nearly 100 times higher in some cases. Spark has a number of components for various types of processing, all of which are based on Spark Core. Today we will be going to discuss in brief the Apache  Spark and 5 of its best practices to look forward to-

What is Apache Spark?

Apache Spark is an open-source distributed system for big data workforces. For fast analytic queries against another size of data, it uses in-memory caching and optimised query execution. It is a parallel processing framework for grouped computers to operate large-scale data analytics applications. This could handle packet and real-time data processing and predictive analysis workloads.

It claims to support code reuse all over multiple workloads—batch processing, interactive queries, real-time analytics, machine learning, and graph processing—and offers development APIs in Java, Scala, Python, and R. With 365,000 meetup members in 2017, Apache Spark is becoming one of the most renowned big data distributed processing frameworks. Explore for Apache Spark Tutorial for more information.

5 best practices of Apache Spark

1. Begin with a small sample of the data.

Because we want to make big data work, we need to start with a small sample of data to see if we’re on the right track. In my project, I sampled 10% of the data and verified that the pipelines were working properly. This allowed me to use the SQL section of the Spark UI to watch the numbers grow throughout the flow while not having to wait too long for it to complete.

In my experience, if you attain your preferred runtime with a small sample, scaling up is usually simple.

2. Spark troubleshooting

For transformations, Spark seems to have a lazy loading behaviour. That is, it will not initiate the transformation computation; instead, it will keep records of the transformation requested. This makes it difficult to determine where in our code there are bugs or areas that need to be optimised. Splitting the code into sections with df.cache() and then using df.count() to force Spark to calculate the df at every section was one practise that we found useful.

Spark actions seem to be keen in that they cause the underlying action to perform a computation. So, if you’ve had a Spark action which you only call when it’s required, pay attention. A Spark action, for instance, is count() on a dataset. You can now inspect the computation of each section using the spark UI and identify any issues. It’s important to note that if you don’t use the sampling we mentioned in (1), you’ll probably end up with a very long runtime that’s difficult to debug.

Check out Apache Spark Training & Certification Course to get yourself certified in Apache Spark with industry-level skills.

3. Finding and resolving Skewness is a difficult task.

Having to look at the stage specifics in the spark UI and looking for just a major difference between both the max and median can help you find the Skewness:

Let’s begin with a definition of Skewness. As previously stated, our data is divided into partitions, and the size of each partition will most likely change as the progress of transformation. This can result in a large difference in size between partitions, indicating that our data is skew. This implies that a few of the tasks were markedly slower than the rest.

Why is this even a bad thing? Because it may cause other stages to stand in line for these few tasks, leaving cores idle. If you understand where all the Skewness has been coming from, you can fix it right away by changing the partitioning.

4. Appropriately cache

Spark allows you to cache datasets in memory. There are a variety of options to choose from:

  • Since the same operation has been computed several times in the pipeline flow, cache it.
  • To allow the required cache setting, use the persist API to enable caching (persist to disc or not; serialized or not).
  • Be cognizant of lazy loading and, if necessary, prime cache up front. Some APIs are eager, while others aren’t.
  • To see information about the datasets you’ve cached, go to the Storage tab in the Spark UI.
  • It’s a good idea to unpersist your cached datasets after you’ve finished using them to free up resources, especially if other people are using the cluster.

5. Spark has issues with iterative code.

It was particularly difficult. Spark uses lazy evaluation so that when the code is run, it only creates a computational graph, a DAG. Once you have an iterative process, however, this method can be very problematic so because DAG finally opens the prior iteration and then becomes extremely large, we mean extremely large. This may be too large for the driver to remember. Because the application is stuck, this makes it appear in the spark UI as if no jobs are running (which is correct) for an extended period of time — until the driver crashes.

This seems to be presently an obvious issue with Spark, and the workaround that worked for me was to use df.checkpoint() / df.reset() / df.reset() / df.reset() / df.reset() / df. every 5–6 iterations, call localCheckpoint() (find your number by experimenting a bit). This works because, unlike cache(), checkpoint() breaks the lineage and the DAG, saves the results and starts from a new checkpoint. The disadvantage is that you don’t have the entire DAG to recreate the df if something goes wrong.

Conclusion

Spark is now one of the most popular projects inside the Hadoop ecosystem, with many companies using it in conjunction with Hadoop to process large amounts of data. In June 2013, Spark was acknowledged into the Apache Software Foundation’s (ASF) entrepreneurial context, and in February 2014, it was designated as an Apache Top-Level Project. Spark could indeed run by itself, on Apache Mesos, or on Apache Hadoop, which is the most common. Spark is used by large enterprises working with big data applications because of its speed and ability to connect multiple types of databases and run various types of analytics applications.

Learning how to make Spark work its magic takes time, but these 5 practices will help you move your project forward and sprinkle some spark charm on your code.

Die Abschätzung von Pi mit Apache Spark

Auf den Berliner Data Science/Big Data/Data Analytics/…-Meetups auf denen ich in letzter Zeit des Öfteren zugegen war, tauchte immer wieder der Begriff Spark auf. Ich wollte wissen was es hiermit auf sich hat. Nachdem ich Spark 1.5.1 lokal auf meinem Mac installiert hatte, fing ich an Wörter in frei verfügbaren Texten zu zählen. Da es mir aber zu aufwändig schien, extrem lange Texte im Internet zu suchen und ich ein Gefühl für die Leistungsfähigkeit von Spark bekommen wollte, widmete ich mich einem skalierbaren Problem: der Abschätzung von Pi mit der Monte Carlo-Methode.

 1000 Zufallspunkte lokal auf Mac

spark-scala-interface-pi-example

Dies war wie zu erwarten keine Herausforderung für meine Hardware. Was passiert bei 10^6/ 10^7/ 10^8/ 10^9… Zufallspunkten?

dataset-spark-pi-example-1

An dieser Stelle stieß ich auf ein “Integer-Problem“. Weil 3*10^9 > 2^31 – 1, kann in diesem Fall nicht mehr der Datentyp Integer verwendet werden, sondern man müsste „long Integer“ (64 bit) nehmen. Was mich nun jedoch viel mehr interessierte als mit Zufallspunkten > 2^31 – 1  zu experimentieren, war eine Spark-Installation auf AWS und die entsprechenden Berechnungszeiten. Ich installierte Spark 1.5.0 (auf Hadoop 2.6.0 YARN) auf einem AWS-Cluster (2 Core/1 Master x m3.xlarge). Zu meiner Überraschung ergab sich Folgendes:

dataset-spark-pi-example-2

Warum war mein Mac schneller als ein AWS-Cluster? Eine m3.xlarge-Instanz hat 4 Kerne und 15 GB Arbeitsspeicher, mein Mac ziemlich genau die Hälfte… Gut, dann probieren wir das Ganze mal mit einem 4 Core/1 Master x m3.xlarge-Cluster.

dataset-spark-pi-example-3

Es ergibt sich kein signifikanter Unterschied. Erst die Verwendung von einem 3 Core/1 Master x r3.2xlarge-Cluster brachte eine Beschleunigung. Wo ist der Flaschenhals? Um Netzwerkeffekte zu prüfen, habe ich schließlich eine 0 Core/1 Master-AWS-Installation getestet.

dataset-spark-pi-example-4

Dieser letzte Test skalierte zu meinen vorherigen Tests auf dem AWS-System, und er wies darauf hin, dass der Flaschenhals kein Netzwerkeffekt war.

Bei heise Developer fand ich einen sehr interessanten Artikel, welcher sich dem Thema „optimale Konfiguration der virtualisierten Cloud-Hardware für den jeweiligen Anwendungsfall finden“ widmet: Benchmarking Spark: Wie sich unterschiedliche Hardware-Parameter auf Big-Data-Anwendungen auswirken

Für heute belasse ich es bei dem vorgestellten Experiment.

To be continued…,

Kontrolle und Steuerung von Spark Applikationen über REST

Apache Spark erfreut sich zunehmender Beliebtheit in der Data Science Szene da es in Geschwindigkeit und Funktionalität eine immense Verbesserung bzw. Erweiterung des reinen Hadoop MapReduce Programmiermodells ist. Jedoch bleibt Spark ebenso wie Hadoop eine Technologie für Experten. Es erfordert zumindest Kenntnisse von Unix-Skripten und muss über die Command-Line gesteuert werden. Die vorhandenen Weboberflächen bieten nur sehr rudimentäre Einblicke in den Status von Spark Applikationen:

spark basic ui

Der Spark JobServer ist ein Open-Source Projekt, das eine REST-Schnittstelle (Representational State Transfer) für Spark anbietet. (In diesem YouTube Video wird anschaulich erläutert, was ein REST API ist und wozu es verwendet werden kann.) Vereinfacht gesagt, ermöglicht es der JobServer, Spark über diese REST-Schnittstelle als Webservice zu nutzen. Es ist möglich, über den JobServer Spark Kontexte und Applikationen (Jobs) zu managen und Kontexte über verschiedene Aufrufe der REST-Schnittstelle hinweg wiederzuverwenden. Jar Files mit Job Implementierungen können vorab über die gleiche Schnittstelle installiert werden, so dass es z.B. möglich ist, auch sehr feingranulare Jobs über die Schnittstelle zu steuern (vollständige Liste der Features).

Der Spark JobServer ist bereits bei verschiedenen Organisationen (u.a. Netflix, Zed Worldwide, KNIME, Azavea und Maana) im Einsatz. Diese Nutzer des JobServers verwenden ihn meist versteckt „unter der Haube“, um so ihre jeweiligen Werkzeuge Big-Data tauglich zu machen. So nutzt KNIME ab dem nächsten Release (Oktober 2015) den JobServer. Anwendern können dann Spark Jobs über eine grafische Oberfläche bequem von ihrem lokalen Rechner aus starten, monitoren und stoppen. In der folgenden Abbildung sehen Sie, wie Trainingsdaten auf den Server hochgeladen werden, um daraus verschiedene Machine Learning Modelle zu erstellen. Diese Modelle können dann auf Testdaten angewandt werden, die z.B. aus einer HIVE-Tabelle nach Spark importiert werden:

spark knime hive jobs

Jeder der dargestellten Knoten mit der Überschrift „Spark ***“, wie z.B. „Spark Decision Tree“, ist ein Spark Job im Sinne des JobServers. Weitere Beispiele für Spark Jobs sind verschiedene Vorverarbeitungsaufgaben wie das Sampling einer Tabelle oder ein Join über mehrere Tabellen.

Spark kann über den JobServer im Standalone-, Mesos- oder im Yarn-Client-Modus angesteuert werden. Eine sehr hilfreiche Erweiterung der eigentlichen Spark-Funktionalität bietet der JobServer über die sogenannten „Named RDDs“ an. Ein Resilient Distributed Dataset (RDD) ist im Prinzip ein Datensatz bzw. eine Tabelle in Spark. „Named RDDs“ erlauben die Weiterverwendung von RDDs über einzelne Jobs hinweg. So kann man Jobs modularer aufbauen und leichter Zwischenergebnisse inspizieren.

Ich kann aus eigener Erfahrung sagen, dass der JobServer die geeignete Middleware zwischen einer benutzerfreundlichen Oberfläche und Spark ist. Die Open-Source Community ist hier sehr aktiv und der JobServer lässt sich bei Bedarf gut erweitern.