Tag Archive for: Distributed Computing

Big Data mit Hadoop und Map Reduce!

Foto von delfi de la Rua auf Unsplash.

Hadoop ist ein Softwareframework, mit dem sich große Datenmengen auf verteilten Systemen schnell verarbeiten lassen. Es verfügt über Mechanismen, welche eine stabile und fehlertolerante Funktionalität sicherstellen, sodass das Tool für die Datenverarbeitung im Big Data Umfeld bestens geeignet ist. In diesen Fällen ist eine normale relationale Datenbank oft nicht ausreichend, um die unstrukturierten Datenmengen kostengünstig und effizient abzuspeichern.

Unterschiede zwischen Hadoop und einer relationalen Datenbank

Hadoop unterscheidet sich in einigen grundlegenden Eigenschaften von einer vergleichbaren relationalen Datenbank.

Eigenschaft Relationale Datenbank Hadoop
Datentypen ausschließlich strukturierte Daten alle Datentypen (strukturiert, semi-strukturiert und unstrukturiert)
Datenmenge wenig bis mittel (im Bereich von einigen GB) große Datenmengen (im Bereich von Terrabyte oder Petabyte)
Abfragesprache SQL HQL (Hive Query Language)
Schema Statisches Schema (Schema on Write) Dynamisches Schema (Schema on Read)
Kosten Lizenzkosten je nach Datenbank Kostenlos
Datenobjekte Relationale Tabellen Key-Value Pair
Skalierungstyp Vertikale Skalierung (Computer muss hardwaretechnisch besser werden) Horizontale Skalierung (mehr Computer können dazugeschaltet werden, um Last abzufangen)

Vergleich Hadoop und Relationale Datenbank

Bestandteile von Hadoop

Das Softwareframework selbst ist eine Zusammenstellung aus insgesamt vier Komponenten.

Hadoop Common ist eine Sammlung aus verschiedenen Modulen und Bibliotheken, welche die anderen Bestandteile unterstützt und deren Zusammenarbeit ermöglicht. Unter anderem sind hier die Java Archive Dateien (JAR Files) abgelegt, die zum Starten von Hadoop benötigt werden. Darüber hinaus ermöglicht die Sammlung die Bereitstellung von grundlegenden Services, wie beispielsweise das File System.

Der Map-Reduce Algorithmus geht in seinen Ursprüngen auf Google zurück und hilft komplexe Rechenaufgaben in überschaubarere Teilprozesse aufzuteilen und diese dann über mehrere Systeme zu verteilen, also horizontal zu skalieren. Dadurch verringert sich die Rechenzeit deutlich. Am Ende müssen die Ergebnisse der Teilaufgaben wieder zu seinem Gesamtresultat zusammengefügt werden.

Der Yet Another Resource Negotiator (YARN) unterstützt den Map-Reduce Algorithmus, indem er die Ressourcen innerhalb eines Computer Clusters im Auge behält und die Teilaufgaben auf die einzelnen Rechner verteilt. Darüber hinaus ordnet er den einzelnen Prozessen die Kapazitäten dafür zu.

Das Hadoop Distributed File System (HDFS) ist ein skalierbares Dateisystem zur Speicherung von Zwischen- oder Endergebnissen. Innerhalb des Clusters ist es über mehrere Rechner verteilt, um große Datenmengen schnell und effizient verarbeiten zu können. Die Idee dahinter war, dass Big Data Projekte und Datenanalysen auf großen Datenmengen beruhen. Somit sollte es ein System geben, welches die Daten auch stapelweise speichert und dadurch schnell verarbeitet. Das HDFS sorgt auch dafür, dass Duplikate von Datensätzen abgelegt werden, um den Ausfall eines Rechners verkraften zu können.

Map Reduce am Beispiel

Angenommen wir haben alle Teile der Harry Potter Romane in Hadoop PDF abgelegt und möchten nun die einzelnen Wörter zählen, die in den Büchern vorkommen. Dies ist eine klassische Aufgabe bei der uns die Aufteilung in eine Map-Funktion und eine Reduce Funktion helfen kann.

Bevor es die Möglichkeit gab, solche aufwendigen Abfragen auf ein ganzes Computer-Cluster aufzuteilen und parallel berechnen zu können, war man gezwungen, den kompletten Datensatz nacheinander zu durchlaufen. Dadurch wurde die Abfragezeit auch umso länger, umso größer der Datensatz wurde. Der einzige Weg, um die Ausführung der Funktion zu beschleunigen ist es, einen Computer mit einem leistungsfähigeren Prozessor (CPU) auszustatten, also dessen Hardware zu verbessern. Wenn man versucht, die Ausführung eines Algorithmus zu beschleunigen, indem man die Hardware des Gerätes verbessert, nennt man das vertikale Skalieren.

Mithilfe von MapReduce ist es möglich eine solche Abfrage deutlich zu beschleunigen, indem man die Aufgabe in kleinere Teilaufgaben aufsplittet. Das hat dann wiederum den Vorteil, dass die Teilaufgaben auf viele verschiedene Computer aufgeteilt und von ihnen ausgeführt werden kann. Dadurch müssen wir nicht die Hardware eines einzigen Gerätes verbessern, sondern können viele, vergleichsweise leistungsschwächere, Computer nutzen und trotzdem die Abfragezeit verringern. Ein solches Vorgehen nennt man horizontales Skalieren.

Kommen wir zurück zu unserem Beispiel: Bisher waren wir bildlich so vorgegangen, dass wir alle Harry Potter Teile gelesen haben und nach jedem gelesenen Wort die Strichliste mit den einzelnen Wörtern einfach um einen Strich erweitert haben. Das Problem daran ist, dass wir diese Vorgehensweise nicht parallelisieren können. Angenommen eine zweite Person will uns unterstützen, dann kann sie das nicht tun, weil sie die Strichliste, mit der wir gerade arbeiten, benötigt, um weiterzumachen. Solange sie diese nicht hat, kann sie nicht unterstützen.

Sie kann uns aber unterstützen, indem sie bereits mit dem zweiten Teil der Harry Potter Reihe beginnt und eine eigene Strichliste nur für das zweite Buch erstellt. Zum Schluss können wir dann alle einzelnen Strichlisten zusammenführen und beispielsweise die Häufigkeit des Wortes “Harry” auf allen Strichlisten zusammenaddieren.

MapReduce am Beispiel von Wortzählungen in Harry Potter Büchern

MapReduce am Beispiel von Wortzählungen in Harry Potter Büchern | Source: Data Basecamp

Dadurch lässt sich die Aufgabe auch relativ einfach horizontal skalieren, indem jeweils eine Person pro Harry Potter Buch arbeitet. Wenn wir noch schneller arbeiten wollen, können wir auch mehrere Personen mit einbeziehen und jede Person ein einziges Kapitel bearbeiten lassen. Am Schluss müssen wir dann nur alle Ergebnisse der einzelnen Personen zusammennehmen, um so zu einem Gesamtergebnis zu gelangen.

Das ausführliche Beispiel und die Umsetzung in Python findest Du hier.

Aufbau eines Hadoop Distributed File Systems

Der Kern des Hadoop Distributed File Systems besteht darin die Daten auf verschiedene Dateien und Computer zu verteilen, sodass Abfragen schnell bearbeitet werden können und der Nutzer keine langen Wartezeiten hat. Damit der Ausfall einer einzelnen Maschine im Cluster nicht zum Verlust der Daten führt, gibt es gezielte Replikationen auf verschiedenen Computern, um eine Ausfallsicherheit zu gewährleisten.

Hadoop arbeitet im Allgemeinen nach dem sogenannten Master-Slave-Prinzip. Innerhalb des Computerclusters haben wir einen Knoten, der die Rolle des sogenannten Masters übernimmt. Dieser führt in unserem Beispiel keine direkte Berechnung durch, sondern verteilt lediglich die Aufgaben auf die sogenannten Slave Knoten und koordiniert den ganzen Prozess. Die Slave Knoten wiederum lesen die Bücher aus und speichern die Worthäufigkeit und die Wortverteilung.

Dieses Prinzip wird auch bei der Datenspeicherung genutzt. Der Master verteilt Informationen aus dem Datensatz auf verschiedenen Slave Nodes und merkt sich, auf welchen Computern er welche Partitionen abgespeichert hat. Dabei legt er die Daten auch redundant ab, um Ausfälle kompensieren zu können. Bei einer Abfrage der Daten durch den Nutzer entscheidet der Masterknoten dann, welche Slaveknoten er anfragen muss, um die gewünschten Informationen zu erhalten.

Distributed Computing – MapReduce Algorithmus

Sollen große Datenmengen analysiert werden, ist die Hardware eines leistungsfähigen Computers schnell überfordert und die Analysezeiten werden zu lang. Die Lösung zur Bewältigung von Big Data Analytics sind Konzepte des verteilten Rechnens (Distributed Computing).

Vertikale Skalierung – Der Klassiker der leistungsstarken Datenverarbeitung

Die meisten Unternehmen setzen heute noch auf leistungsstarke und aufrüstbare Einzelserver. Sollten Datenmengen größer und Analysen rechenaufwändiger werden, werden Festplatten (Storage), Arbeitsspeicher (RAM) und Prozessoren (CPU) aufgerüstet oder der Server direkt durch einen leistungsstärkeren ersetzt.

Diese Form der sogenannten vertikalen Skalierung (Vergrößerung der Server-Komponenten) ist für viele Unternehmen heute noch gängige Praxis, auch weil sie leicht zu administrieren ist und sie mit nahezu jeder Software funktioniert. Jedoch sind der Erweiterbarkeit gewisse Grenzen gesetzt und auch der Wechsel zu noch leistungsfähigerer Hardware würde den Einsatz von neuester High-End-Hardware bedeuten, der Kostenanstieg wäre exponentiell. Ferner bedarf es einer durchdachten Backup-Strategie mit gespiegelten Festplatten oder einem ganzen Backup-Server.

Leistungsstarke Server sind teuer und können zwar große Datenmengen weitaus schneller auswerten als Consumer-Computer, jedoch sind auch sie eher nicht dazu in der Lage, Big Data zu verarbeiten, also beispielsweise 100 Terabyte Daten binnen Sekunden statistisch auszuwerten.

Horizontale Skalierung – Skalierbare Speicher-/Rechenleistung

Ein alternatives Konzept zur vertikalen Skalierung ist die horizontale Skalierung. Dabei werden mehrere Computer, die im Vergleich oftmals über nur mittelmäßige Leistungsmerkmale verfügen, über ein Computer-Netzwerk verbunden und parallel angesteuert.

Der große Vorteil der horizontalen Skalierung ist der kostengünstige Einstieg, denn praktisch könnte bereits mit einem einzelnen Computer (Node) begonnen werden und dann nach und nach mit weiteren Nodes die Leistungsfähigkeit des Clusters (Verbund von Nodes) linear gesteigert werden. Ungefähr linear wachsen auch die Kosten an, so dass diese weitaus besser planbar sind. Cluster können weitaus höhere Leistungen erreichen als es einzelne Server könnten, daher gibt die horizontale Skalierung als diejenige, die sich für Big Data Analytics eignet, denn sie ermöglicht verteiltes Rechnen (Distributed Computing). Mit einem ausreichend großen Cluster lassen sich auch 100 Terabyte und mehr in wenigen Augenblicken statistisch auswerten.

Ferner ermöglichen horizontale Lösungen integrierte Backup-Strategien, indem jeder Node des Clusters über ein Backup der Daten eines anderen Nodes verfügt. Verfügt ein Node sogar über mehrere Backups, lässt sich eine sehr hohe Ausfallsicherheit – Datenverfügbarkeit im Cluster – erzielen.

Jedoch gibt es auch Nachteile der horizontalen Skalierung: Die Administration eines Clusters ist weitaus herausfordernder als ein einzelner Server, egal wie leistungsstark dieser sein mag. Auch Bedarf es viel räumlichen Platz für einen (oder gar mehrere) Cluster. Die Kompatibilität der Nodes sollte auch für die nächsten Jahr gesichert sein und nicht zuletzt ist es eine große Hürde, dass die einzusetzende Software (Datenbank- und Analyse-Software) für den Einsatz auf Clustern geeignet sein muss. Verbreite Software-Lösungen für verteiltes Speichern und Rechnen kommen beispielsweise von der Apache Foundation als Open Source Software: Hadoop, Spark und Flink.

Map Reduce Processing

Damit verteiltes Rechnung funktioniert, bedarf es der richtigen Software, die wiederum Algorithmen einsetzt, die sich dafür eignen. Der bekannteste und immer noch am weitesten verbreitete Algorithmus ist MapReduce. MapReduce ist ein sehr einfacher Algorithmus und dürfte von der grundsätzlichen Vorgehensweise jedem Software-Entwickler oder Analyst vertraut sein. Das Prinzip entspricht dem folgenden SQL-Statement, dass die am häufigsten vorkommende Sprache aus dem Datensatz (Tabelle Customers) abfragt:

Es gibt eine Tabelle (es könnte eine Tabelle in einer relationalen Datenbank sein oder eine CSV-Datei), die durch eine SELECT-Query abgefragt (map), groupiert (combine) und sortiert (sort). Dieser Schritt kann vereinfacht als Map-Funktion betrachtet werden, die in einer Liste Paaren aus Schlüssel (Keys) und Werten (Values) resultiert. Ist diese Liste vorhanden, kann diese auf die gewünschten Ergebnisse entspechend einer Logik (z. B. max(), min(), mean(), sum()) auf wenige oder nur einen einzigen Wert reduziert werden (Reduce-Funktion). Zu beachten ist dabei, dass der Map-Prozess sehr viel speicher- und rechen-aufwändiger als der Reduce-Prozess ist. Führen wir diese Abfrage auf einer Maschine aus, fassen wir die beiden Abfragen als ein Statement aus:

SELECT TOP 1 [Language], COUNT(*)
FROM Customers
GROUP BY [Language]
ORDER BY COUNT(*) DESC

Betrachten wir jedoch die einzelnen Schritte, können wir sie wieder zumindest in einen Map- und einen Reduce-Schritt unterteilen. Diese Aufteilung machen wir uns für das verteilte Rechnen zunutze: Wenn jeder Computer (Node; oft auch Client Node oder Data Node) einen Teil der Daten besitzt, kann jeder Node für sich einen Map-Prozess durchführen, die Ergebnisse dann an einen Master-Node (oder in Hadoop-Sprache: Name Node) senden, der den Reduce-Prozess durchführt. Der Großteil der Aufgabe findet somit auf dem Cluster statt, nur der simple Reduce-Schritt auf einem einzelnen Computer.

Oftmals reicht ein parallel ablaufender Map-Prozess auf dem Cluster nicht aus, um Daten effizient auswerten zu können. Die Maßgabe sollte stets sein, den Reduce-Aufwand so gering wie möglich zu halten und soviel Arbeit wie möglich auf den Cluster zu verlagern. Daher sollte jeder Node im Cluster soweit aggregieren wie möglich: Dafür gibt es den Combine-Schritt.

Die zuvor erwähnte SQL-Abfrage als MapReduce würde bedeuten, dass ein Node über den Datensatz verfügt (und andere Nodes über weitere Datensätze) und jeder Node für sich seine Daten über einen Map-Prozess herausarbeitet, über einen Combine-Prozess aggregiert und die Aggregationsergebnisse an den Master-Node (Name Node) sendet. Hat der Master-Node alle Ergebnisse erhalten, berechnet dieser daraus das Endergebnis (Reduce).

Zusammenfassung: Map Reduce

MapReduce ist der bekannteste Algorithmus zur verteilten Verarbeitung von Daten und eignet sich für die Durchführung von komplexen Datenanalysen. Liegen Datensätze auf mehreren Computern (Client Nodes) vor, läuft der Algorithmus in der Regel in drei Schritten ab:

  1. Map – Selektierung der Datensätze auf den Computern im gewünschten Format und Durchführung einer Berechnung, beispielsweise der Bildung einer Summe. Dieser Schritt ist ermöglich das Prinzip Schema on Read, das aus Hadoop ein Tool zur Verarbeitung von unstrukturierten Daten macht.
  2. Combine – Durchführung einer Aggregation, die ebenfalls auf jeden Client Node durchgeführt wird, zur Zusammenfassung von Map-Ergebnissen.
  3. Reduce – Aggregation aller Ergebnisse auf dem zentralen Rechner (Name Node)

MapReduce ist dazu geeignet, unstrukturierte Daten zu verarbeiten, denn das Format der Daten wird über einen Map-Algorithmus bestimmt, der sehr flexibel programmiert werden kann. MapReduce ist kein eng definierter Algorithmus, sondern eine Hülle, die mit Inhalt befüllt werden muss. So müssen MapReduce-Algorithmen individuell über eine Programmiersprache wie Java, Scala oder Python programmiert werden.

Ein Beispiel eines in Java programmierten Word-Count-Algorithmus nach der MapReduce-Logik in Hadoop findet sich hier:

1. 	package org.myorg;
2. 	
3. 	import java.io.IOException;
4. 	import java.util.*;
5. 	
6. 	import org.apache.hadoop.fs.Path;
7. 	import org.apache.hadoop.conf.*;
8. 	import org.apache.hadoop.io.*;
9. 	import org.apache.hadoop.mapred.*;
10. 	import org.apache.hadoop.util.*;
11. 	
12. 	public class WordCount {
13. 	
14. 	   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {  // Map-Process on Cluster
15. 	     private final static IntWritable one = new IntWritable(1);
16. 	     private Text word = new Text();
17. 	
18. 	     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19. 	       String line = value.toString();
20. 	       StringTokenizer tokenizer = new StringTokenizer(line);
21. 	       while (tokenizer.hasMoreTokens()) {
22. 	         word.set(tokenizer.nextToken());
23. 	         output.collect(word, one);
24. 	       }
25. 	     }
26. 	   }
27. 	
28. 	   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {  // Reduce-Process on Name Node
29. 	     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30. 	       int sum = 0;
31. 	       while (values.hasNext()) {
32. 	         sum += values.next().get();
33. 	       }
34. 	       output.collect(key, new IntWritable(sum));
35. 	     }
36. 	   }
37. 	
38. 	   public static void main(String[] args) throws Exception {  // Setting up the MapReduce-Job "wordcount"
39. 	     JobConf conf = new JobConf(WordCount.class);
40. 	     conf.setJobName("wordcount");
41. 	
42. 	     conf.setOutputKeyClass(Text.class);
43. 	     conf.setOutputValueClass(IntWritable.class);
44. 	
45. 	     conf.setMapperClass(Map.class);
46. 	     conf.setCombinerClass(Reduce.class);
47. 	     conf.setReducerClass(Reduce.class);
48. 	
49. 	     conf.setInputFormat(TextInputFormat.class);
50. 	     conf.setOutputFormat(TextOutputFormat.class);
51. 	
52. 	     FileInputFormat.setInputPaths(conf, new Path(args[0]));
53. 	     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54. 	
55. 	     JobClient.runJob(conf);
57. 	   }
58. 	}

MapReduce und Advanced Analytics

MapReduce spielt seine Vorteile auf Computer-Clustern aus und eignet sich sehr zur Analyse von Daten nach dem Schema on Read. Für kompliziertere Analysealgorithmen ist MapReduce jedoch nur bedingt geeignet, denn bereits einfache Join-Anweisungen benötigen mehrere MapReduce-Ketten.

Während statistische Auswertungen und Join-Anweisungen mit MapReduce noch gut möglich sind, werden Algorithmen des maschinellen Lernens schwierig bis kaum möglich, da diese viele Iterationen, z. B. zur Anpassung von Gewichten, benötigen.

Hyperkonvergenz: Mehr Intelligenz für das Rechenzentrum

Wer heute dafür verantwortlich ist, die IT-Infrastruktur seines Unternehmens oder einer Organisation zu steuern, der steht vor einer ganzen Reihe Herausforderungen: Skalierbar, beliebig flexibel und mit möglichst kurzer „time-to-market“ für neue Services – so sollte es sein. Die Anforderungen an Kapazität und Rechenpower können sich schnell ändern. Mit steigenden Nutzerzahlen oder neuen Anwendungen, die geliefert werden sollen. Weder Kunden noch Management haben Zeit oder Verständnis dafür, dass neue Dienste wegen neuer Hardwareanforderungen nur langsam oder mit langem Vorlauf ausgerollt werden können.

Unternehmen wollen deshalb schnell und flexibel auf neue Anforderungen und Produkterweiterungen reagieren können. Dabei kommt in der Praxis häufig sehr heterogene Infrastruktur zum Einsatz: On-Premise-Systeme vor Ort, externe Data Center und Cloud-Lösungen müssen zuverlässig, nahtlos und insbesondere auch sicher die Services bereit stellen, die Kunden oder Mitarbeiter nutzen. Wichtig dabei: die Storage- und Computing-Kapazität sollte flexibel skalierbar sein und sich auch kurzfristig geänderten Anforderungen und Prioritäten anpassen können. Zum Beispiel: Innerhalb von kurzer Zeit deutlich mehr virtuelle Desktopsysteme für User bereit stellen.

Smarte Software für Rechenzentren

Der beste Weg für den CIO und die IT-Abteilung, diese neuen Herausforderungen zu lösen, sind „Hyperkonvergenz“-Systeme. Dabei handelt es sich um kombinierte Knoten für Storage und Computing-Leistung im Rechenzentrum, die dank smarter Software beliebig erweitert oder ausgetauscht werden können. Hierbei handelt es sich um SDS-Systeme („Software defined Storage“) – die Speicherkapazität und Rechenleistung der einzelnen Systeme wird von der Software smart abstrahiert und gebündelt.

Das Unternehmen Cisco zeigt, wie die Zukunft im Rechenzentrum aussehen wird: die neue Plattform HyperFlex setzt genau hier an. Wie der Name andeutet, bietet HyperFlex eine Hyperkonvergenz-Plattform für das Rechenzentrum auf Basis von Intel® Xeon® Prozessoren*. Der Kern ist hier die Software, die auf dem eigenen Filesystem „HX Data Platform“ aufsetzt. Damit erweitern Kunden ihr bestehendes System schnell und einfach. Diese Hyperkonvergenz-Lösung ist darauf ausgelegt, nicht als Silo parallel zu bereits bestehender Infrastruktur zu stehen, sondern zu einem Teil der bestehenden Hard- und Software zu werden.

Denn die Verwaltung von HyperFlex-Knoten ist in Ciscos bestehendem UCS Management integriert. So dauert es nur wenige Minuten, bis neue Nodes zu einem System hinzugefügt sind. Nach wenigen Klicks sind die zusätzlichen Knoten installiert, konfiguriert, provisioniert und somit live in Betrieb. Besonders hilfreich für dynamische Unternehmen: HyperFlex macht es sehr einfach möglich, im Betrieb selektiv Storage-, RAM-c oder Computing-Kapazität zu erweitern – unabhängig voneinander.  Sollten Knoten ausfallen, verkraftet das System dies ohne Ausfall oder Datenverlust.

Weiterführende Informationen zu den Cisco HyperFlex Systemen finden Sie mit einem Klick hier.

Dieser Sponsored Post entstand in Zusammenarbeit mit Cisco & Intel.

*Intel, the Intel logo, Xeon, and Xeon Inside are trademarks or registered trademarks of Intel Corporation in the U.S. and/or other countries.