=Paper=
{{Paper
|id=None
|storemode=property
|title=SPARQL Query Answering on a Shared-Nothing Architecture
|pdfUrl=https://ceur-ws.org/Vol-637/paper1.pdf
|volume=Vol-637
}}
==SPARQL Query Answering on a Shared-Nothing Architecture==
Spyros Kotoulas (kot@few.vu.nl) and Jacopo Urbani (j.urbani@few.vu.nl)
Department of Computer Science, VU University Amsterdam

ABSTRACT

The amount of Semantic Web data is outgrowing the capacity of Semantic Web stores. Similar to traditional databases, scaling up RDF stores is faced with a design dilemma: increase the number of nodes at the cost of increased complexity, or use sophisticated, and expensive, hardware that can support large amounts of memory, high disk bandwidth and low seek latency. In this paper, we propose a technique to do distributed and join-less RDF query answering based on query pattern-driven indexing. To this end, we first propose an extension of SPARQL to specify query patterns. These patterns are used to build query-specific indexes using MapReduce, which are later queried using a NoSQL store. We provide a preliminary evaluation of our results using Hadoop and HBase, indicating that, for a predefined query pattern, our system offers very high query throughput and fast response times.

Categories and Subject Descriptors

E.1 [Data Structures]: Distributed data structures; C.2.4 [Distributed Systems]: Distributed applications

General Terms

Algorithms, Design, Experimentation, Performance

Keywords

RDF, SPARQL, MapReduce

1. INTRODUCTION

The amount of data in the Semantic Web is growing at a faster pace than the computational power of computers. To cope with this disparity, we either need more efficient single-machine algorithms for data management or scalable distributed methods to tap on the computational resources of more machines.

When designing an RDF store, one is often confronted with a design dilemma: use centralized or clustered storage. The former allows easier management but only scales by using more powerful hardware, which is often infeasible or prohibitively expensive. The latter allows adding additional computational resources to increase performance, at the cost of higher complexity and, often, large query response times.

Typically, clustered stores split their indexes across the nodes at loading time. When resolving queries, nodes may need to exchange data. These data exchanges should be minimized since they increase load and response time. Ideally, only data that will be part of the answer should be exchanged. This is a very difficult task in the Semantic Web, because the semi-structured nature of the data makes access patterns difficult to predict.

There is another problem that we must consider. In relational databases, the information that is inserted in the system is carefully selected. In Semantic Web stores, we often encounter the situation that, while an entire dataset is included in the knowledge base, only a small subset is eventually used for a given application.

In this work, we focus on clustered storage and we claim that we should only index the part of the data that we need, and index for specific access patterns. We propose a technique to decrease the complexity by calculating query pattern-driven indexes; this will enable us to exploit clustered architectures without the need to combine data from different nodes at query time.

There are several scenarios where the user is able to define the queries in advance. For example, application developers very often know the query patterns of their applications, or these patterns can be easily extracted from the application source code. Furthermore, analytical workloads typically have a fixed set of query patterns. For instance, when analysing social network data, the user may only be interested in resolving queries with the predicate "foaf:knows". Finally, such an approach could be combined with a standard store: it would answer queries that match some popular query patterns, offloading the standard store that would handle the rest.

In section 2, we will introduce SPARQLP, an extension of the SPARQL [10] query language that allows for defining SPARQL query patterns. These query patterns serve as a mapping between a subset of the unbound variables of the query, called the pattern variables, and the result set of the query.

In section 3, we will present a method for building the indexes based on the query patterns. This method constructs only the indexes required for each SPARQLP query pattern using the MapReduce [6] parallel programming paradigm.
To this end, we have adapted some standard database techniques to MapReduce/RDF and developed some new ones. Furthermore, we will show how we query these indexes.

In section 4, we will present an implementation of our methods using the Hadoop MapReduce framework and the HBase NoSQL database. Our results indicate that our approach has good performance on large datasets, where it is able to answer specific queries in a few milliseconds.

2. SPARQLP

In this section, we will define an extension to SPARQL for query patterns. We explain the details of this extension with an example. Consider an application that needs to retrieve the names and email addresses of all professors that work for a university in a given country. The corresponding SPARQL query (for brevity, we omit namespaces for the rest of this paper) would be:

SELECT ?name ?email where { ?x a professor. ?x worksfor ?y. ?y locatedin ?country. ?x email ?email. ?x name ?name. }

The application will repeatedly post the query above, replacing "?country" with the country it is interested in, and retrieve the corresponding values of (?name, ?email). SPARQLP aims at capturing such usage patterns by rewriting the query as a function

?country → (?name, ?email)

In SPARQLP we define such a function using the DEFINE construct and execute it using the GET construct. Looking at our example, the mapping function is defined as:

DEFINE professorsOfCountry(?country) AS
SELECT ?name ?email where { ?x a professor. ?x worksfor ?y. ?y locatedin ?country. ?x email ?email. ?x name ?name. }

Using this definition, an RDF store prepares to handle requests on the given function. Once this operation is completed, we use the GET construct to execute this query for specific countries. For example, in order to retrieve the professors from the Netherlands we would launch the query:

GET professorsOfCountry(Netherlands)

In Appendix A, we formally define the SPARQLP grammar by extending the EBNF notation.

3. SPARQLP ON A SHARED-NOTHING ARCHITECTURE

To optimally support the querying model presented in the previous section, for each SPARQLP DEFINE query, we can maintain a pattern index, i.e. an index from the pattern variables to the other unbound variables. In fact, these are the only indexes we need to answer SPARQLP GET queries.

Typically, RDF stores maintain a set of triple indexes over the entire input. These indexes consist of permutations of the triple terms (Subject-Predicate-Object, Predicate-Object-Subject etc.). In our approach, we do not maintain full triple indexes, so that we avoid loading costs. Instead, whenever we process a SPARQLP DEFINE query, we usually read the entire input and construct only the indexes that we need for that query.

Calculating these indexes implies resolving the embedded SPARQL query within the SPARQLP DEFINE queries. We consider the following in the resolution of these queries:

• The embedded queries will always be more expensive to resolve and will produce more bindings than the corresponding standard SPARQL queries, since they have fewer bound variables.

• We are interested in the time to retrieve all results. Thus time-to-first-result is not relevant, neither is streaming of results. The performance goal is to maximize result throughput.

• We are focusing on answering queries over vast amounts of data rather than on the response time of individual SPARQLP DEFINE queries. Query response time is more relevant for SPARQLP GET queries, which we will discuss in section 3.2.

Considering the above, we will use the MapReduce [6] paradigm to construct pattern indexes. In Appendix B, we provide a short introduction to MapReduce, which is essential for comprehending section 3.1.

In Figure 1, we give an overview of our approach. For DEFINE queries, we extract the embedded query and we execute it using MapReduce. We index the results on the pattern variables and store the name of the query in a separate table. The indexes are loaded in a NoSQL database. For SPARQLP GET queries, we retrieve a reference to the index using the query name, and we query it using the pattern variables. Since the variables are used as keys for the index, we do not need to perform any joins to retrieve the results.

Figure 1: Overview of the approach

In the next subsections, we will describe how we deal with SPARQLP DEFINE and GET queries.

3.1 SPARQLP DEFINE

Resolving a SPARQLP DEFINE query implies resolving the embedded SPARQL query, which will typically be an expensive query.

Querying Semantic Web data with MapReduce and without a-priori constructed indexes differs from querying in traditional stores in the following:
• Load-balancing: Clustered databases typically share data across nodes, and they aim at placing data that is used together on the same node. This decreases response time, since less data needs to be accessed over the network. In contrast, MapReduce focuses mainly on dividing computation equally across nodes, aiming at maximising parallelisation rather than response time. MapReduce achieves this by dynamically partitioning the data.

• Setting up MapReduce jobs is an expensive operation in terms of coordination effort. Thus, it incurs high latency.

• Without a-priori indexing, selectivity estimation is difficult, since accurately calculating expression cardinality is impossible without a-priori data processing.

• Passing information between running Map or Reduce tasks in the same job is not allowed by the paradigm. Therefore, the query plan can only be modified between different jobs. This is practical because SPARQLP DEFINE queries are expensive, and the overhead of starting new jobs is high.

• Semantic Web triples are small in size, typically containing three or four terms and occupying dozens of bytes if dictionary encoding [12] is used, and hundreds of bytes otherwise. Furthermore, table scans are scheduled across all processors. Thus, scanning the entire input is relatively fast.

Based on the above observations, we propose the following for the resolution of MapReduce SPARQLP DEFINE queries:

3.1.1 Tuple-at-a-time vs operator-at-a-time

Typical RDF stores perform tuple-at-a-time evaluation. In MapReduce, it is only possible to partition data once per job. Thus, tuple-at-a-time evaluation would pose significant limitations on the supported queries. Therefore, within jobs, we process one tuple at a time, while each job implements several operations. Jobs are run in sequence, so our method uses a mix of tuple-at-a-time and operator-at-a-time processing.

3.1.2 Selectivity estimation

Since, in a MapReduce architecture, full table scans are relatively cheap, we use an accurate selectivity estimation algorithm that performs a full table scan. During this scan, we calculate the cardinality of each statement pattern in the query. Furthermore, for statement patterns with few bindings, we also store the bindings.
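One possible shape for this scan, sketched here only as an illustration and not as the paper's actual implementation: a map phase (omitted) would emit, for every input triple and every statement pattern of the query it matches, the pattern identifier as key and the matched binding as value; the reduce function below then counts the bindings per pattern and additionally keeps them when the pattern turns out to be selective. The class name and the threshold are assumptions made for this example.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// One reduce group per statement pattern: count how many triples matched it,
// and keep the concrete bindings only when the pattern is selective enough.
public class PatternCardinalityReducer extends Reducer<Text, Text, Text, Text> {

  // assumed cut-off for "few bindings"; not taken from the paper
  private static final int KEEP_BINDINGS_THRESHOLD = 100000;

  @Override
  protected void reduce(Text patternId, Iterable<Text> bindings, Context context)
      throws IOException, InterruptedException {
    long cardinality = 0;
    List<String> kept = new ArrayList<>();
    for (Text b : bindings) {
      cardinality++;
      if (kept != null) {
        kept.add(b.toString());
        if (kept.size() > KEEP_BINDINGS_THRESHOLD) {
          kept = null; // too many bindings: only the count is reported
        }
      }
    }
    context.write(patternId, new Text("cardinality=" + cardinality));
    if (kept != null) {
      for (String b : kept) {
        context.write(patternId, new Text("binding=" + b)); // selective pattern: store its bindings too
      }
    }
  }
}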
3.1.3 Joins during the Map phase

Typically, joins are performed in the Reduce phase. This gives rise to load balancing problems, since making equally-sized partitions for the Reduce phase is not trivial. Furthermore, it incurs significant overhead, since in a MapReduce job we can only have a single Reduce function.

Neither of the above is true for Maps. Thus, if one side of the join is small enough to fit in main memory, we perform a Grace hash-join [8]. This implies all nodes loading the small side in memory and iterating through the large side in parallel. The advantage of this approach is that we do not need to re-partition our data, eliminating load balancing problems and the need for an additional job.

If all sides of a join are large, then we resort to a hybrid of hash join and sort-merge join. First the input tuples are partitioned among nodes using a hash function. Then, each partition is sorted locally at each node and a sort-merge join is performed. Note that for larger joins, the load balancing problem is dissipated by the fact that we have more bindings per join variable.
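To make the map-side variant concrete, below is a rough Hadoop sketch of such a join, assuming both relations are stored as tab-separated bindings keyed on the join variable; the configuration property name and the file layout are illustrative choices, not the paper's actual implementation.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-side hash join: every mapper loads the small relation into memory and
// probes it while streaming over the large relation, so no shuffle (and no
// re-partitioning) is needed for this join.
public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

  // join key -> tuples of the small side (e.g. the extracted bindings of ?Y)
  private final Map<String, List<String>> smallSide = new HashMap<>();

  @Override
  protected void setup(Context context) throws IOException {
    // "join.small.input" is a hypothetical configuration property
    Path small = new Path(context.getConfiguration().get("join.small.input"));
    FileSystem fs = small.getFileSystem(context.getConfiguration());
    try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(small)))) {
      String line;
      while ((line = in.readLine()) != null) {
        String[] kv = line.split("\t", 2); // join key TAB rest-of-binding
        if (kv.length < 2) continue;
        smallSide.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
      }
    }
  }

  @Override
  protected void map(LongWritable offset, Text value, Context context)
      throws IOException, InterruptedException {
    String[] kv = value.toString().split("\t", 2); // large-side tuple, keyed the same way
    if (kv.length < 2) return;
    List<String> matches = smallSide.get(kv[0]);
    if (matches == null) return; // no join partner: drop the tuple
    for (String m : matches) {
      context.write(new Text(kv[0]), new Text(kv[1] + "\t" + m)); // emit the joined binding
    }
  }
}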
3.1.4 Recycling

Since we are not maintaining traditional indexes, and disk storage in a MapReduce cluster is cheap, our method uses intermediate result recycling [7]. Intermediate results are not indexed, but stored in parallel as a collection of local files. In [7], it was shown that this method improves performance for systems doing operator-at-a-time processing.

3.1.5 Index construction

As the last step of SPARQLP DEFINE query evaluation, we construct indexes mapping template variables to the results of the query. To this end, we tap on the massive sorting functionality of MapReduce frameworks to construct indexes in a scalable manner.

This is accomplished by a job that groups tuples according to the template variables, performs a global sort and writes the results to files. Once a suitable partitioning of the data has been calculated, this task is highly parallelisable. The sorted files are written to the local disk and accessed by the SPARQLP GET method, described in the following section.
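In Hadoop terms, such a globally sorted job could be configured roughly as in the driver sketch below, which samples the key space to compute split points and then uses a total-order partitioner so that the output files are globally sorted by the pattern-variable key. This is a sketch of the general technique against a recent Hadoop release, not the configuration used in the paper; the input format, sampling rates and reducer count are assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// Driver for a globally sorted index-construction job: the input tuples are
// assumed to be keyed on the pattern variables (key TAB value per line), and
// the job writes them out partitioned and sorted by that key.
public class IndexConstructionDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "pattern index construction");
    job.setJarByClass(IndexConstructionDriver.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class); // key \t value lines
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // identity map/reduce: the framework itself performs grouping and sorting
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(32); // assumed; one sorted output file per reducer

    // sample the keys to compute split points, then sort globally across reducers
    Path partitionFile = new Path(args[1] + "_partitions");
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
    InputSampler.writePartitionFile(job,
        new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10));
    job.setPartitionerClass(TotalOrderPartitioner.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}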
3.2 SPARQLP GET

The indexes created during the SPARQLP DEFINE query resolution are queried using a NoSQL store.

For each SPARQLP DEFINE, a new table is created in the NoSQL database. We use the bindings of the pattern variables as key and the results as values. In case we have a large number of results for a given variable, it is more efficient to store a pointer to a file in the Distributed File System instead of the values themselves. This implies an additional lookup for queries, but this cost is amortised over the number of retrieved results and the reduced size of the table.

Lacking any joins, SPARQLP GET queries are very fast to evaluate: a single lookup is enough to retrieve all results for a given query. Furthermore, clustered NoSQL implementations assure high fault tolerance, horizontal scalability and no single point of failure [4].
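With HBase as the NoSQL store (as in the prototype of section 4), the lookup performed for a GET query could look roughly like the following sketch. The table name, column family and row-key layout are assumptions made for illustration, not the prototype's actual schema.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Answering GET professorsOfCountry(Netherlands): a single row lookup, keyed
// on the binding of the pattern variable, returns all result tuples.
public class PatternIndexLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         // one table per DEFINE query, named after it (assumed naming scheme)
         Table index = conn.getTable(TableName.valueOf("professorsOfCountry"))) {

      Get get = new Get(Bytes.toBytes("Netherlands")); // pattern variable binding = row key
      Result row = index.get(get);

      if (!row.isEmpty()) {
        // each result tuple is stored in its own column of an assumed family "r"
        row.getFamilyMap(Bytes.toBytes("r")).forEach((qualifier, value) ->
            System.out.println(Bytes.toString(value))); // e.g. one (?name, ?email) pair per column
      }
    }
  }
}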
4. PRELIMINARY EVALUATION

We have performed a preliminary study to analyze the benefits of our approach. A more thorough evaluation and comparison to existing approaches is deferred to future work. This section is organized as follows: in subsection 4.1, we describe the testbed and the implementation details of our prototype. We continue by reporting the performance of the DEFINE query in subsection 4.2. Finally, we report the performance of the GET query in subsection 4.3.

4.1 Implementation and Testbed
We have implemented a prototype of our methods. We have used Sesame [3] for query parsing and basic query planning, the Hadoop framework for MapReduce, and the HBase NoSQL database for answering GET queries.

As a preprocessing step, we compress our input data using the method described in [12] and, optionally, materialize the closure under the OWL ter Horst semantics using previous work in WebPIE [11].

For the DEFINE queries, we do the following: Firstly, we extract and index the name and template variables of the query and extract the embedded SPARQL query. We use Sesame to parse the SPARQL query and launch our selectivity estimation algorithm. The results of the selectivity estimation are passed back to Sesame, which creates a static query plan. This query plan is executed in the MapReduce framework, and is dynamically adjusted to cater for recycling and bindings already calculated during the selectivity estimation step. Finally, indexes are created for the query plans. Currently, we use a very simple index construction mechanism that uses a single node, but it can be easily extended by carefully partitioning the data. The index is then loaded into HBase.

For GET queries, we first retrieve the query index, using the name of the query as a key. Then, we post a query consisting of the template variables on HBase against the retrieved index and retrieve the results.

The performance of our prototype was evaluated using the DAS-3 cluster (http://www.cs.vu.nl/das3) of the Vrije Universiteit Amsterdam. We set up a Hadoop and HBase cluster using 32 data nodes, each with 4 cores, a single SATA drive and 4GB of memory.

We have used the LUBM benchmark for our evaluation. We chose query number 8, since it is one of the most challenging queries in the dataset, and used the University as a parameter. We have generated the LUBM(8000) dataset, consisting of 1 billion triples. Dictionary encoding took 46 minutes and materializing the closure took 37 minutes, yielding a total of 1.52 billion triples. We created a variant of query number 8 of the standard LUBM query set as a DEFINE query and executed it against the 1.52 billion triples. The query is reported below:

DEFINE getStudentsOfUniversity(?P) AS
SELECT
{?X type Student .
?Y type Department .
?X memberOf ?Y .
?Y subOrganizationOf ?P .
}

For GET queries, we have extracted the URIs for all universities and evaluated by posting queries for the students of random universities. Note that, for our prototype, we do not perform dictionary decoding of the results.

In the following two subsections we describe the performance obtained from first launching a DEFINE query and later a sequence of GET queries.

4.2 Performance of the DEFINE query

For this task, our prototype produced four MapReduce jobs in a sequence. In Table 1, we report the execution time for each job.

Job                               Runtime (in sec.)
job 1: selectivity estimation      61
job 2: first join job              36
job 3: second join job             89
job 4: HBase upload               305

Table 1: Execution time of the DEFINE query

Job 1 performed the selectivity estimation and extracted the patterns which are small enough to fit in main memory. The ordering of the patterns according to their increasing selectivity was: (?X type Student), (?X memberOf ?Y), (?Y type Department), (?Y subOrganizationOf ?P). The bindings for the last two patterns were small enough to fit in memory, thus they were extracted.

Job 2 loaded the extracted bindings, (?Y type Department) and (?Y subOrganizationOf ?P), and joined them with the input during the Map phase. In the Reduce phase, the results were joined with (?X memberOf ?Y).

Job 3 performed a hybrid join with (?X type Student), since both sets were too large to fit in memory.

Job 4 grouped the tuples according to ?P and uploaded the bindings to HBase. The poor performance is attributed to the fact that we did not create suitable partitions for the data. Also, we used the HBase APIs to create the index, while we could generate it using MapReduce and then load it into HBase (see http://hbase.apache.org/docs/current/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk). The second approach would give about an order of magnitude better loading time, compared to our current implementation.

4.3 Performance of the GET query

After we have generated the index, we can efficiently query the knowledge base with the GET query. This query receives as input the university and returns all the students for that university.

A single query took on average 3.83 milliseconds to be executed. We decided to stress the system by launching queries simultaneously from different nodes. The purpose of this stress test was to evaluate the scalability of our method as the number of queries per second increases. We started by launching the queries from a single client and we increased the number of clients up to 128. Queries were posted from
several nodes, and we have assigned a CPU core to each client.

We report the results in Figure 2. We see how the throughput of the system increases until it can serve about 1100 queries per second (i.e. 1ms per query) and then it stabilizes regardless of the number of clients. Again, the performance could be dramatically increased if we tuned the framework to our needs. For example, our index is spread over 8 different regions, which were served by only 5 nodes, effectively using only 5 of the 32 available nodes. If we further distribute the table we could reach even higher throughput.

Figure 2: Total number of GET queries per second (GET queries per second vs. number of clients, 1 to 128 clients)
5. RELATED WORK

This work is related to RDF stores, MapReduce processing and view materialization in SQL databases.

HadoopDB [1] is a hybrid MapReduce/database implementation, focused on the processing of SQL queries. Data is maintained by traditional databases in the data nodes. For every SQL query received by the system, a MapReduce query plan is generated. The input for the corresponding MapReduce job is retrieved from the SQL databases maintained by the data nodes. In contrast, our approach does not build indexes at loading time and does not perform joins at runtime.

Relational views [5] are a mature concept in the field of SQL databases, providing functionality to define a virtual table. Furthermore, SQLServer, DB2 and Oracle [2] also materialize these tables. Our work captures SPARQL data usage patterns in greater detail, since we can embed any kind of query in our DEFINE queries and we specify the key on which the results of this query should be indexed. On the other hand, SQL views have the advantage that they use the same model as normal SQL tables.

RDF stores typically employ tuple-at-a-time processing and avoid materialisation of intermediate results. Furthermore, they rely on doing joins at run-time. Our preliminary results indicate that our approach vastly outperforms these stores in terms of throughput and response time of the SPARQLP GET queries, since the latter require no joins. Nevertheless, our system is at a disadvantage when processing arbitrary SPARQL queries. Furthermore, updating data in our system is expected to be an expensive operation.

The work reported in [13] demonstrates a method to reduce a dataset to an interesting subset using a supercomputer. It is focused on extracting a relatively small portion of the input, which can be queried using a conventional RDF store (with the associated limitations). In comparison, our architecture is not limited by the size of the indexed data (since the indexes created in our NoSQL store can be very large) but it is limited by the access patterns to this data.

6. FUTURE WORK AND CONCLUSION

We intend to perform a thorough evaluation of our system using larger datasets and more queries. Our results should be compared with those of standard RDF stores. Moreover, we expect that our method will also be useful for answering standard SPARQL queries. Nevertheless, we expect to get good performance only for very expensive queries, since the overhead of the platform is very high. Furthermore, research should be carried out on update mechanisms for the indexes. To this end, techniques for materialised SQL views can be used.

In this paper, we have presented a method for RDF data management based on a shared-nothing architecture. Instead of maintaining indexes over the entire dataset, only the indexes required for given query patterns are generated. We have implemented a first prototype and we tested the performance on a sample query. Our preliminary evaluation shows that the system can reach high throughput but, though this approach is promising, further work is necessary for a complete evaluation.

This work was supported by the EU-IST project LarKC (FP7-215535).

7. REFERENCES

[1] A. Abouzeid, K. Bajda-Pawlikowski, D. J. Abadi, A. Rasin, and A. Silberschatz. HadoopDB: An architectural hybrid of MapReduce and DBMS technologies for analytical workloads. PVLDB, 2(1):922-933, 2009.

[2] R. G. Bello, K. Dias, A. Downing, J. J. Feenan, Jr., J. L. Finnerty, W. D. Norcott, H. Sun, A. Witkowski, and M. Ziauddin. Materialized views in Oracle. In VLDB '98: Proceedings of the 24th International Conference on Very Large Data Bases, pages 659-664, San Francisco, CA, USA, 1998. Morgan Kaufmann Publishers Inc.

[3] J. Broekstra, A. Kampman, and F. van Harmelen. Sesame: A generic architecture for storing and querying RDF and RDF Schema. In Proceedings of the International Semantic Web Conference (ISWC), pages 54-68, 2002.

[4] F. Chang, J. Dean, S. Ghemawat, W. Hsieh, D. Wallach, M. Burrows, T. Chandra, A. Fikes, and R. Gruber. Bigtable: A distributed storage system for structured data. ACM Transactions on Computer Systems (TOCS), 26(2):4, 2008.

[5] E. F. Codd. Recent investigations in relational data base systems. In IFIP Congress, pages 1017-1021, 1974.

[6] J. Dean and S. Ghemawat. MapReduce: Simplified data processing on large clusters. In Proceedings of the USENIX Symposium on Operating Systems Design & Implementation (OSDI), pages 137-147, 2004.

[7] M. G. Ivanova, M. L. Kersten, N. J. Nes, and R. A. Gonçalves. An architecture for recycling intermediates in a column-store. In SIGMOD '09: Proceedings of the 35th SIGMOD International Conference on Management of Data, pages 309-320, New York, NY, USA, 2009. ACM.

[8] M. Kitsuregawa, H. Tanaka, and T. Moto-Oka. Application of hash to data base machine and its architecture. New Generation Computing, 1(1):63-74, 1983.

[9] E. Maler, J. Paoli, C. M. Sperberg-McQueen, F. Yergeau, and T. Bray. Extensible Markup Language (XML) 1.0 (third edition). W3C Recommendation, Feb. 2004. http://www.w3.org/TR/2004/REC-xml-20040204.

[10] E. Prud'hommeaux and A. Seaborne. SPARQL query language for RDF. W3C Recommendation, Jan. 2008. http://www.w3.org/TR/2008/REC-rdf-sparql-query-20080115/.
[11] J. Urbani, S. Kotoulas, J. Maassen, F. van Harmelen, and H. E. Bal. OWL reasoning with WebPIE: Calculating the closure of 100 billion triples. In L. Aroyo, G. Antoniou, E. Hyvönen, A. ten Teije, H. Stuckenschmidt, L. Cabral, and T. Tudorache, editors, ESWC (1), volume 6088 of Lecture Notes in Computer Science, pages 213-227. Springer, 2010.

[12] J. Urbani, J. Maassen, and H. Bal. Massive Semantic Web data compression with MapReduce. In Proceedings of the MapReduce Workshop at HPDC, 2010.

[13] G. T. Williams, J. Weaver, M. Atre, and J. A. Hendler. Scalable reduction of large datasets to interesting subsets. In 8th International Semantic Web Conference (Billion Triples Challenge), 2009.

APPENDIX

A. SPARQLP GRAMMAR

In this appendix we formalize the SPARQLP grammar using the EBNF notation defined in the W3C recommendation for XML [9]. We do not define the grammar from scratch but we extend the SPARQL grammar [10] with the following constructs:

DefineQuery ::= 'DEFINE' ( IRIref+ )

B. MAPREDUCE

MapReduce is a programming paradigm for parallel and distributed processing of batch jobs. Each job consists of two phases: a map and a reduce. The mapping phase partitions the input data by associating each element with a key. The reduce phase processes each partition independently. All data is processed based on key/value pairs: the map function processes a key/value pair and produces a set of new key/value pairs; the reduce merges all intermediate values with the same key into final results.

B.1 MapReduce example: term count

We illustrate the use of MapReduce through an example application that counts the occurrences of each term in a collection of triples. As shown in Algorithm 1, the map function partitions these triples based on each term. Thus, it emits intermediate key/value pairs, using the triple terms (s,p,o) as keys and a blank, irrelevant value. The framework will group all the intermediate pairs with the same key, and invoke the reduce function with the corresponding list of values. The reduce function will sum the number of values into an aggregate term count (one value was emitted for each term occurrence) and return the result as output.

Algorithm 1 Counting term occurrences in RDF NTriples files

map(key, value):
  // key: line number
  // value: triple
  // emit a blank value, since only the number of terms matters
  emit(value.subject, blank);
  emit(value.predicate, blank);
  emit(value.object, blank);

reduce(key, iterator values):
  // key: triple term (URI or literal)
  // values: list of irrelevant values (one for each term occurrence)
  int count = 0;
  for (value in values)
    count++; // count the number of values, equalling the occurrences
  emit(key, count);
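For concreteness, a minimal runnable Hadoop counterpart of Algorithm 1 might look as follows; instead of emitting a blank value and counting, it emits a 1 per occurrence and sums, which is equivalent. The class names and the whitespace-based N-Triples parsing are simplifications assumed for this example.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hadoop version of Algorithm 1: count how often each term occurs.
public class TermCount {

  public static class TermMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable lineOffset, Text line, Context context)
        throws IOException, InterruptedException {
      // naive split of an N-Triples line: subject predicate object .
      String[] terms = line.toString().split("\\s+", 4);
      if (terms.length < 3) return;
      context.write(new Text(terms[0]), ONE); // subject
      context.write(new Text(terms[1]), ONE); // predicate
      context.write(new Text(terms[2]), ONE); // object
    }
  }

  public static class TermReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text term, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable c : counts) sum += c.get(); // one value per occurrence
      context.write(term, new IntWritable(sum));
    }
  }
}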
This job could be executed as shown in Figure 3. The input data is split in several blocks. Each computation node operates on one or more blocks, and performs the map function on that block. All intermediate values with the same key are sent to one node, where the reduce is applied.

Figure 3: MapReduce processing

B.2 Characteristics of MapReduce

This simple example illustrates some important elements of the MapReduce programming model:

• Since the map operates on single pieces of data without dependencies, partitions can be created arbitrarily and can be scheduled in parallel across many nodes. In this example, the input triples can be split across nodes arbitrarily, since the computations on these triples (emitting the key/value pairs) are independent of each other.

• The reduce operates on an iterator of values because the set of values is typically far too large to fit in memory. This means that the reducer can only partially use correlations between these items while processing: it receives them as a stream instead of a set. In this example, operating on the stream is trivial, since the reducer simply increments the counter for each item.

• The reduce operates on all pieces of data that share some key. By assigning proper keys to data items during the map, the data is partitioned for the reduce phase. A skewed partitioning (i.e. a skewed key distribution) will lead to imbalances in the load of the compute nodes. If term x is relatively popular, the node performing the reduce for term x will be slower than others. To use MapReduce efficiently, we must find balanced partitions of the data.

• Since the data resides on local nodes, and the physical location of data is known, the platform performs locality-aware scheduling: if possible, map and reduce tasks are scheduled on the machine holding the relevant data, moving computation instead of data. Remote data can be accessed through a Distributed File System (DFS) maintained by the framework.