Fault Tolerant Distributed Hash-Join in Relational Databases

1st Arsen Nasibullin
Saint-Petersburg State University
Saint-Petersburg, Russia
nevskyarseny@yandex.ru

2nd Boris Novikov
HSE University
Saint-Petersburg, Russia
bnovikov@hseu.ru

Abstract—Businesses today face immense challenges due to complex applications and rapid growth in data volumes. Many applications use data to compute statistics that feed decisions in other applications, such as machine learning or social networking. Most of these applications perform sophisticated client queries with operators such as aggregation and join. State-of-the-art distributed relational databases cope with these challenges. Unfortunately, distributed database management systems suffer from failures: a failure forces sophisticated queries joining large tables to be re-executed, so that an enormous volume of resources must be spent. In this paper, we introduce a new fault tolerant join algorithm for distributed RDBMS. The algorithm is based on data replication and heartbeat messages. We compare our algorithm with the traditional, unreliable distributed join algorithm and show how we achieve a trade-off between the time required to redo the tasks of failed sites and the extra resources needed to carry this out.

Index Terms—databases, hash-join, fault-tolerance, query processing, replication

Copyright © 2020 for this paper by its authors. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

I. INTRODUCTION

Let us first consider the definition of distributed database systems. A distributed database system is a database management system composed of local database systems [1], [2]. These systems, each with its own disks, are dispersed over a network of interconnected computers. In this paper, we deal with the shared-nothing architecture of distributed database systems.

Fault tolerance is the property of a system to keep carrying out its tasks in the event of a failure [1], [3]. A fault tolerant system is able to detect a failure and handle it properly without affecting the correct execution of tasks.

Distributed systems based on the MapReduce framework provide high availability and improved performance. They also provide fault tolerance in case any site goes down. In contrast, parallel RDBMS cannot handle such failures, so the entire work has to be re-executed. In this work we focus on how to achieve fault tolerance for RDBMS.

Modern distributed database systems are applicable to a variety of tasks such as business intelligence [1], [4]. Most of these tasks perform sophisticated client queries with operators such as aggregation and join. The join operation has been studied and discussed extensively because it is one of the most time-consuming and data-intensive operations in relational query processing [4]–[7].

The main goal of a distributed database system is to process an enormous amount of data in a short time by transmitting data, processing it, and passing the final outcome to applications. In failure-free scenarios, database systems may achieve good results. Given that failures are common at large scale, distributed database systems need fault tolerance as a built-in feature. For example, modern data processing systems have algorithms for recovering the tasks of a failed site [8].

According to studies [9], [10], the most common failure types are the following:

• Communication failure. This type of failure is specific to distributed systems. There are many kinds of communication failures: errors in messages, unordered messages, undelivered messages, and lost connections. The first two are the responsibility of the computer network. Lost messages and/or connections can be the consequence of failed sites or of a communication failure.
• System failure. A hardware or software failure may cause a system failure, for example a CPU crash or bugs in the application code. Once a system failure has occurred, the content of main memory is lost.
• Media failure. This refers to failures of the secondary storage devices that store the database. Such failures may be due to operating system errors, as well as to hardware faults such as head crashes or controller failures. The important point from the perspective of DBMS reliability is that all or part of the database residing on secondary storage is considered destroyed and inaccessible.
• Transaction failure. Transactions can fail for a number of reasons, for instance because of incorrect input data or a potential deadlock.
As stated in [9], hardware and software failures make up more than 90% of all failures. This percentage has not changed significantly since the early days of computing. What is more interesting is that the occurrence of software failures is significantly higher than that of hardware failures. In contrast to system failures, the rate of transaction failures, as stated in [10], is 3%. Modern database management systems are capable of recovering the execution of abruptly interrupted transactions in a short time. In this paper, we do not consider transaction failures.

Unfortunately, state-of-the-art data processing systems do not have algorithms for handling failures when a client performs a query joining two or more tables. Currently, modern relational and NoSQL distributed database systems re-execute an entire query even if only one node of the system becomes inactive. For example, a node with stored data may become unavailable while transmitting data to the sites where the join operation is performed. The resulting re-execution of a query with a join requires enormous resources and time.

This paper addresses the challenge of recovering tasks at failed sites. Our solution achieves fault tolerant execution of join queries by making a trade-off between completing the recovery tasks in the least time and the extra resources to be spent on this. Data replication ensures that a client query can be reallocated to an available site that stores the data. Sending heartbeat messages allows a failure of any site to be detected and the required steps to be undertaken to recover the undone work.

Table I lists the notations used in the paper.

TABLE I
NOTATIONS USED IN THE PAPER

  Symbol   Explanation
  R        Input relation R
  S        Input relation S
  b        Block size
  B(R)     The number of blocks of relation R
  B(S)     The number of blocks of relation S
  T(R)     The number of tuples of relation R
  T(S)     The number of tuples of relation S
  |K|      The number of keepers
  |W|      The number of workers
  I(A)     The number of distinct join attribute values
  Keepers  Nodes where data is stored
  Workers  Nodes where the join operation is performed

This paper is organized as follows. Section II describes related work. The distributed cost model used in the paper is given in Section III. In Section IV, we describe the classical distributed hash-join algorithm and examine its execution cost. The fault tolerant distributed hash-join algorithm and its cost are introduced in Section V. Section VI describes and evaluates how the fault tolerant algorithm handles failures. A comparison of the estimations is presented in Section VII. Section VIII concludes the paper.

II. RELATED WORK

In this section, we briefly review works dedicated to different approaches to tackling failures in the modern framework for processing vast amounts of data in parallel, Hadoop MapReduce [8]. We examined parallel join algorithms [6], [11]–[13] designed for different kinds of systems. The analyzed works do not consider the task of handling failures: the authors assume the algorithms run on failure-free systems.

The survey [14] describes state-of-the-art approaches to improving the performance of parallel query processing using MapReduce. Moreover, the authors discuss significant weaknesses and limitations of the framework and provide a classification of existing studies over MapReduce focused on the optimization objective.

The authors of [15] proposed a strategy of doubling each task in execution. If one of the tasks fails, the second, backup task still finishes on time, reducing the job completion time at the cost of a larger amount of resources. Intuitively, doubling the tasks leads to approximately doubling the resources.

In [16], the authors devised two strategies to improve failure detection in Hadoop via heartbeat messages on the worker side. The first strategy is an adaptive interval that dynamically configures the expiry time according to the various sizes of jobs; as the authors state, the adaptive interval is advantageous for small jobs. The second strategy is to evaluate the reputation of each worker according to the fetch-error reports from other workers. When a worker fails, its reputation decreases; once the reputation reaches a given bound, the master node marks this worker as failed.

To remove the single point of failure in Hadoop, a metadata replication based solution was proposed in [17]. The solution involves three major phases: in the initialization phase, each secondary node is registered with the primary node and its initial metadata, such as the version file and the file system image, are caught up with those of the active/primary node; in the replication phase, the runtime metadata (such as outstanding operations and lease states) needed for future failover are replicated; in the failover phase, the standby/newly elected primary node takes over all communications.
III. DISTRIBUTED COST MODEL

One of the challenges of multi-objective query optimization in distributed database management systems is to find the Pareto set of solutions, that is, the best possible trade-offs among the objective functions [18]. The objective functions are defined as the total time of query execution, I/O operations, CPU instructions, and the number of messages to be transmitted.

In distributed database systems, the total time of query execution is expressed as a weighted average of the time to perform I/O operations, CPU instructions, and the time to exchange messages among the involved sites. In this work we look for a trade-off between the least execution time in the case of a failure and the extra resources required to recover the failed tasks. We evaluate the time to perform I/O operations, CPU instructions, and communication separately.
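To make the weighted-average model concrete, the following minimal Python sketch combines the three objective functions into a single time estimate. The weights, names, and sample values are our own illustration and are not defined in the paper; the per-unit times would have to be calibrated for a concrete system.

```python
from dataclasses import dataclass

@dataclass
class CostWeights:
    """Assumed per-unit times: seconds per block I/O, per CPU operation, per message."""
    io: float
    cpu: float
    msg: float

def total_time(io_ops: float, cpu_ops: float, messages: float, w: CostWeights) -> float:
    """Weighted combination of the three objective functions into one execution time."""
    return w.io * io_ops + w.cpu * cpu_ops + w.msg * messages

# Illustrative weights only; the paper evaluates the three components separately.
print(total_time(io_ops=1_000, cpu_ops=100_000, messages=512,
                 w=CostWeights(io=1e-3, cpu=1e-7, msg=1e-4)))
```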
In this paper, we consider joining two relations R and S, where R.A = S.B is the join condition. We assume that the data of each relation is evenly distributed throughout the system; each node stores T(R)/|K| and T(S)/|K| tuples, respectively.

IV. CLASSICAL DISTRIBUTED HASH-JOIN ALGORITHM

In this section, we describe the distributed hash-join algorithm. We will often refer to it as the classical join algorithm.

At the highest level, the distributed hash join in a shared-nothing distributed database system works as follows:

1) A coordinator receives a client query. The coordinator then broadcasts the client query to all keepers.
2) Each keeper reads its partitions of relation R and applies a hash function h1 to the join attribute of each tuple. Hash function h1 has the range of values 0 ... |W| − 1. If a tuple hashes to value i, it is sent to worker Wi. Once a keeper finishes reading its partitions of relation R, it notifies the coordinator about the status of its work.
3) Each worker Wi builds a hash table in memory and fills it with the tuples received in step 2. In this step, each worker uses a hash function h2 different from the one used in step 2.
4) Once all keepers have finished reading their partitions of relation R, the coordinator initiates the probing phase by sending notifications to the keepers.
5) Each keeper reads its partitions of relation S and applies hash function h1 to the join attribute of each tuple, as in step 2. If a tuple hashes to value i, it is sent to worker Wi.
6) A worker receives a tuple of relation S and probes the hash table built in step 3 to find out whether this tuple joins with any tuple of relation R. If so, the tuples are joined and an outcome tuple is generated.
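The following single-process Python sketch illustrates the build and probe logic of steps 2-6 under our own simplifications: the messaging layer is omitted, worker state is modeled as a list of per-worker dictionaries (which play the role of the hash tables built with h2), and all identifiers are ours, not the paper's.

```python
from collections import defaultdict

def h1(key, num_workers):
    # Steps 2 and 5: partitioning hash; a tuple with join value `key`
    # is routed to worker h1(key, |W|).
    return hash(key) % num_workers

def build_phase(r_tuples, num_workers):
    # Step 3: each worker keeps an in-memory hash table on R.A.
    tables = [defaultdict(list) for _ in range(num_workers)]
    for r in r_tuples:
        tables[h1(r["A"], num_workers)][r["A"]].append(r)
    return tables

def probe_phase(s_tuples, tables, num_workers):
    # Step 6: route each S tuple with the same h1 and probe the worker's table.
    for s in s_tuples:
        for r in tables[h1(s["B"], num_workers)].get(s["B"], []):
            yield {**r, **s}  # outcome tuple

# Tiny illustrative relations (our own data, not from the paper).
R = [{"A": i % 3, "r_payload": i} for i in range(6)]
S = [{"B": i % 3, "s_payload": i} for i in range(4)]
print(list(probe_phase(S, build_phase(R, num_workers=4), num_workers=4)))
```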
Figure 1 illustrates the scheme of the distributed join algorithm. The red lines stand for the communication between the coordinator and a keeper at the two phases, building and probing. The direction of a red line from a keeper to the coordinator represents sending a notification to the coordinator that the keeper has stopped reading its partitions at a particular phase. The blue lines between keepers and workers denote the process of sending tuples at the two phases. The green lines, directed from the workers to the coordinator, represent submitting the outcome tuples.

Fig. 1. Scheme of working of distributed join (coordinator C, keepers K1 ... KM, workers W1 ... WN).

Here and further in the paper we do not assume that a single coordinator receives all client queries. Instead, a new coordinator is assigned for each query.

A. Cost of Classical Distributed Hash-Join

1) Communication cost: the coordinator sends a message to the |K| keepers to start reading their partitions of relation R, and one more message to start reading the partitions of relation S. At the building phase, keepers send T(R) messages to workers, and at the probing phase they send T(S) messages. The total communication cost is

  T_msg = 2 + T(R) + T(S)    (1)

2) I/O cost: keepers read B(R) blocks at the building phase and workers write B(R) blocks to their disks. At the probing phase, keepers read B(S) blocks of relation S. Writing the result of matching takes

  IO_matches = (B(R)T(S) + B(S)T(R)) / I(A)    (2)

I/O operations. The total I/O cost is

  T_I/O = 2B(R) + B(S) + IO_matches    (3)

3) CPU cost: at the building and probing phases, the coordinator sends 2 messages to the keepers, and each keeper receives 2 messages. For reading both relations, keepers perform B(R) + B(S) operations. Writing the blocks of relation R on each worker requires B(R) operations. At the building and probing phases, keepers perform T(R) + T(S) operations to submit messages to workers, and workers execute T(R) + T(S) operations to receive them. Each worker executes

  CPU_compare = (B(R)T(S) + B(S)T(R) + T(R)T(S)) / (I(A)|W|)    (4)

operations comparing tuples of both relations during matching and writing the join result. The total CPU cost is

  T_CPU = B(S) + 2(T(R) + T(S) + B(R) + |K| + 1) + CPU_compare    (5)
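As a worked example of formulas (1)-(5), the sketch below evaluates the classical-join cost for the smallest configuration used in the evaluation of Section VII (T(R) = T(S) = 256, B(R) = B(S) = 16, |W| = |K| = 5). The paper does not give I(A) for this configuration, so the value used here is an assumption for illustration only.

```python
def classical_join_cost(T_R, T_S, B_R, B_S, K, W, I_A):
    t_msg = 2 + T_R + T_S                                          # formula (1)
    io_matches = (B_R * T_S + B_S * T_R) / I_A                     # formula (2)
    t_io = 2 * B_R + B_S + io_matches                              # formula (3)
    cpu_compare = (B_R * T_S + B_S * T_R + T_R * T_S) / (I_A * W)  # formula (4)
    t_cpu = B_S + 2 * (T_R + T_S + B_R + K + 1) + cpu_compare      # formula (5)
    return t_msg, t_io, t_cpu

# Smallest configuration of Table II; I_A = 64 is an assumed value.
print(classical_join_cost(T_R=256, T_S=256, B_R=16, B_S=16, K=5, W=5, I_A=64))
```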
V. FAULT TOLERANT DISTRIBUTED HASH-JOIN

We identified a few causes for which the classical distributed join may be interrupted at any phase of the algorithm:

• The coordinator became unreachable, for example due to a communication or a system failure. All stored data may be lost.
• A media or system failure occurred at a keeper or a worker performing operations over its disk.
• A site was suddenly turned off in the middle of the probing phase, so that the entire work has to be re-executed.

To detect and properly handle the failures above, we decided to leverage the concept of heartbeat messages. This concept is widely used in the Raft consensus algorithm [19]. In a heartbeat message, a site may inform another site about the status of the tasks it carries out or pass crucial metadata.

The heartbeat approach helps detect any failure of a site. The coordinator's duty is to send heartbeat messages to all sites and receive responses. Once the coordinator receives the heartbeat responses, it has to sort out the inputs. If an input contains an error message, an error occurred at the corresponding site, for example a software or media failure. If there is no response from a site, either the site is unreachable due to a communication failure or hardware issues have occurred.

We integrated the heartbeat approach into our algorithm. Based on the content of a received message, the coordinator undertakes the steps required to recover the work of a failed site.
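A minimal sketch of the coordinator-side classification just described, assuming a messaging primitive send_heartbeat(site, timeout) exists; the primitive, the timeout value, and the exception types are our assumptions, not part of the paper.

```python
def check_sites(sites, send_heartbeat, timeout=1.0):
    """Classify sites by their heartbeat responses.

    `send_heartbeat(site, timeout)` is an assumed primitive that returns the
    site's status report, raises RuntimeError when the site reports an error
    (e.g. a software or media failure), and raises TimeoutError when no
    response arrives (a communication failure or a hardware failure).
    """
    healthy, failed, unreachable = {}, [], []
    for site in sites:
        try:
            healthy[site] = send_heartbeat(site, timeout)
        except RuntimeError:
            failed.append(site)        # response contained an error message
        except TimeoutError:
            unreachable.append(site)   # no response at all
    return healthy, failed, unreachable
```

The coordinator would call such a routine periodically and start the recovery steps of Section VI for every site reported in failed or unreachable.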
Apart from communication and system failures, consider failures of disks. A disk failure can be caused by power loss, media faults, I/O errors, or corruption of information on the disk. In particular, a disk failure at a worker may cause the loss of the intermediate join result.

To prevent the loss of data stored at the sites, we leverage the strategy of full data replication [20], [21]. Keepers form a ring: keeper (i + 1) mod |K| stores a full copy of the data of the previous keeper i mod |K|. To protect the intermediate data of the workers from loss, each keeper sends the data for joining to two workers. The first worker executes the join operation of both tables, whereas the second worker stands by until the coordinator signals it to join the reserved data.

A. Algorithm

Summarizing all of the above, we introduce the fault tolerant distributed hash-join algorithm. The algorithm is similar to the classical hash-join for distributed database systems with a shared-nothing architecture.

1) Building. A coordinator receives a client query. To initiate the build phase, the coordinator broadcasts the client query to all nodes. Once the messages are sent, the coordinator sets the status of the client query to processing for all keepers.
2) Each keeper reads its partitions of relation R and applies a hash function h1 to the join attribute of each tuple. Hash function h1 has the range of values 0 ... |W| − 1. If a tuple hashes to value i, it is sent to workers i mod |W| and (i + 1) mod |W|; for the latter, the message has to be marked as reserved data (see the routing sketch after Figure 2). Once a keeper finishes reading its partitions of relation R, it notifies the coordinator about the status of its work.
3) Each worker builds a hash table in memory and fills it with the tuples received in step 2. In this step, each worker uses a hash function h2 different from the one used in step 2.
4) Once all keepers have finished reading their partitions of relation R, the coordinator initiates the probing phase by sending notifications to the keepers.
5) Probing. Each keeper reads its partitions of relation S and applies hash function h1 to the join attribute of each tuple, as in step 2. If a tuple hashes to value i, it is sent to workers i mod |W| and (i + 1) mod |W|.
6) Worker i mod |W| receives a tuple of relation S and probes the hash table built in step 3 to find out whether this tuple joins with any tuple of relation R. If so, the tuples are joined and an outcome tuple is generated. The other worker, (i + 1) mod |W|, puts the reserved data onto its disk.
7) Once an outcome tuple is generated, the worker sends a heartbeat message to the following worker. In this message, it indicates the position of the last successfully joined tuple of relation S.

Figure 2 shows the scheme of the fault tolerant distributed join algorithm. The working principle is the same as the one shown in Figure 1 for the classical join algorithm. The difference is the added reserved coordinator RC, which synchronizes with the primary coordinator C. The workers form a ring of nodes, and each worker is aware of the following node. This allows a worker to pass information about the work done during the join to the following worker. If worker i mod |W| fails, worker (i + 1) mod |W| takes over the tasks of the failed worker.

Fig. 2. Scheme of working of fault tolerant distributed join (coordinator C, reserved coordinator RC, keepers K1 ... KM, workers W1 ... WN).
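The sketch below (our naming) shows how the replica placement and the duplicated routing described above can be computed: keeper i's partitions are mirrored on keeper (i + 1) mod |K|, and a tuple hashed to value i goes to the primary worker i mod |W| and, marked as reserved, to the backup worker (i + 1) mod |W|.

```python
def backup_keeper(i, num_keepers):
    # Keepers form a ring: keeper (i + 1) mod |K| stores a full copy
    # of the data of keeper i mod |K|.
    return (i + 1) % num_keepers

def route_tuple(hash_value, num_workers):
    # Steps 2 and 5: a tuple hashed to value i is sent to the primary
    # worker i mod |W| and, as reserved data, to worker (i + 1) mod |W|.
    primary = hash_value % num_workers
    backup = (hash_value + 1) % num_workers
    return [(primary, "live"), (backup, "reserved")]

# Example with |K| = |W| = 5: keeper 4 is mirrored on keeper 0, and a tuple
# hashing to 4 is joined on worker 4 while worker 0 stores the reserved copy.
assert backup_keeper(4, 5) == 0
assert route_tuple(4, 5) == [(4, "live"), (0, "reserved")]
```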
B. Cost of Fault Tolerant Distributed Hash-Join Algorithm

1) Communication cost: similarly to the classical distributed join algorithm, the coordinator sends one message to the keepers to start reading their partitions of relation R and one message to start reading the partitions of relation S. At the building phase, keepers send T(R) messages to workers, and at the probing phase they send T(S) messages.

  T_msg = 2 + T(R) + T(S)    (6)

2) I/O cost: keepers read B(R) blocks at the building phase, and workers write B(R) blocks to their disks in parallel, including the reserved blocks of relation R. At the probing phase, keepers read B(S) blocks and workers write B(S) blocks as reserved. Writing the result of matching requires

  IO_matches = (B(R)T(S) + B(S)T(R)) / I(A)    (7)

I/O operations. The total I/O cost is

  T_I/O = 2(B(S) + B(R)) + IO_matches    (8)

3) CPU cost: the coordinator carries out 2 operations to submit messages to the keepers at the two phases. In total, the keepers perform 2|K| operations to receive messages. Reading and writing both relations by keepers at the two phases takes

  IO_RW = 2(B(S) + B(R))    (9)

operations. At the building and probing phases, submitting and receiving the messages with tuples takes

  C_SR = 2(T(R) + T(S))    (10)

operations. Each worker executes

  CPU_compare = T(R)T(S) / (I(A)|W|)    (11)

operations comparing tuples of both relations during matching, and

  CPU_join = (B(R)T(S) + B(S)T(R)) / (I(A)|W|)    (12)

operations to write the join result. After an outcome tuple is generated and sent to the coordinator, two operations are needed to send a message from worker i mod |W| to worker (i + 1) mod |W|. The total CPU cost is

  T_CPU = IO_RW + C_SR + 2(|K| + 1) + CPU_compare + CPU_join    (13)

VI. HANDLING FAILURES

In this section, we examine the steps taken to handle failures during the execution of the fault tolerant hash-join algorithm.

A. Failure of The Coordinator

As we said before, the coordinator may fail at any phase. For instance, the coordinator may become unreachable before the probing phase, or even in the middle of the building phase. To tolerate any of these scenarios, the secondary coordinator takes over the work of the failed coordinator. Both keepers and workers have to be aware of the secondary coordinator and be able to quickly switch to it.

We suggest storing both coordinators' addresses at the keepers and workers. When a heartbeat message sent from a site to the failed coordinator remains unanswered, the node should handle this by re-sending the message to the secondary coordinator and continuing to work with it. A database manager should be made aware of the failed coordinator and recover it.

Figure 3 depicts the case when the coordinator has failed and all messages from the nodes are re-routed to the reserved coordinator.

Fig. 3. All messages route to the new coordinator.

B. Failure of a Keeper

A keeper may fail at any phase:

1) Before performing a query. Once the coordinator knows that a keeper has failed, it redirects its tasks to another keeper where the reserved data of the failed keeper is stored.
2) During building or probing. Regardless of the phase, the keeper stops sending heartbeat messages to the coordinator. The coordinator reassigns the task to another keeper that stores the data of the inactive keeper and tells it to continue scanning the corresponding relation.

To handle the first case, all the coordinator needs to do is mark the failed keeper as unreachable and stop submitting messages to it. We do not calculate the cost of this case because we assume the coordinator is aware of the address of the failed keeper, and this cost does not affect the query execution.

Let us consider the second case. As we pointed out in Section III, the total cost of execution is the sum of the communication cost, I/O cost, and CPU cost. The communication cost comprises sending a message from the coordinator to a keeper and submitting the messages of one relation to the workers:

  T_msg = 1 + (T(R) + T(S)) / |K|    (14)

The I/O cost implies scanning the blocks of the two relations, writing the blocks of relation R twice in parallel, and writing the blocks of relation S once:

  T_I/O = 2(B(R) + B(S)) / |K|    (15)

As for the CPU cost, it comprises two operations to send and receive the message between the coordinator and the keeper, reading and writing the blocks of the two relations, and submitting and receiving the messages with tuples:

  T_CPU = 2 + 2(B(R) + B(S) + T(R) + T(S)) / |K|    (16)

The total cost of recovering the work of a failed keeper is the sum

  T_recovery = (14) + (15) + (16)    (17)

C. Failure of a Worker

If a worker becomes unavailable at the building phase, the coordinator submits, in the following heartbeat message, a notification to stop communicating with the unavailable worker. If worker i mod |W| is unreachable at the probing phase, the coordinator sends a message to worker (i + 1) mod |W| so that it can take over the task of joining tuples.

The cost of communication is

  T_msg = 1    (18)

The I/O cost is based on the following assumption. During the join operation, once worker i mod |W| sends an outcome tuple to the coordinator, it also submits the index V of the tuple that has just been joined and sent to the coordinator. Worker (i + 1) mod |W| starts reading from tuple V + 1:

  T_I/O = 2(T(S)/|W| − V) / b    (19)

The CPU cost is the sum of sending and receiving messages, reading the untreated tuples, and comparing them:

  T_CPU = 2 + (T(S)/|W| − V) / b + (T(R)/|W| − V)(T(S)/|W| − V) / I(A)    (20)

The total cost of recovering the work of a failed worker is the sum

  T_recovery = (18) + (19) + (20)    (21)
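The takeover just described can be sketched as follows (our naming, not the paper's): worker (i + 1) mod |W| already holds the reserved copies of R and S on its disk, learns the index V of the last successfully joined S tuple from the failed worker's last heartbeat, and continues the probe from tuple V + 1.

```python
from collections import defaultdict

def build_table_from_reserved(reserved_r_tuples):
    # Rebuild the in-memory hash table from the reserved copy of R
    # that was written to this worker's disk during the building phase.
    table = defaultdict(list)
    for r in reserved_r_tuples:
        table[r["A"]].append(r)
    return table

def resume_probe(reserved_s_tuples, table, last_joined_index):
    # Continue probing from tuple V + 1, where V (`last_joined_index`) was
    # reported in the failed worker's last heartbeat message.
    for s in reserved_s_tuples[last_joined_index + 1:]:
        for r in table.get(s["B"], []):
            yield {**r, **s}  # the remaining outcome tuples
```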
VII. EVALUATION AND COMPARISON

In this section, we compute the costs of both algorithms. The average execution time of joining both tables is defined by introducing the probability P of failure-free execution: the expected time is the weighted sum of the time of failure-free execution and the total time of the work required to recover the work of a failed site at any phase. The formula below gives the average execution time:

  The average time = P · NW + (1 − P) · RW    (22)

where NW is the total time of failure-free execution of an algorithm and RW is the total time of the recovery work. For the fault tolerant join algorithm, RW is the cost of the recovery work at any phase, as described in Section VI. For the classical distributed join, the recovery work is the re-execution of the entire query across all sites.

We consider the following cases: failure-free work, a keeper fails, and a worker fails. For simplicity, we assume that the probability of failure-free work is the same for every site. As we said in Section III, we analyze the different objective functions separately. Table II provides the parameters used during the evaluations.

TABLE II
PARAMETERS USED FOR COMPUTING THE COSTS

  T(R) = T(S)   256   50,000   100,000
  |W| = |K|     5     5        5
  B(R)          16    1,000    5,000
  B(S)          16    500      1,000
  b             16    20       100

Working with disks in the failure-free mode, the execution of the fault tolerant join takes from 5% to 11% more time than the classical join, because the tuples of the second table have to be written and marked as reserved. In the remaining cases, the fault tolerant join algorithm is 97% faster in disk I/O. As for the CPU cost, the classical join wins in execution time in the failure-free mode. Recovering the execution of a client query takes 81% less time when a keeper fails; when a worker fails, assigning the task of the failed site to another worker takes 93% less time.

The most striking results come from comparing the communication costs. In the failure-free case, the time required to pass data across all sites is the same for both algorithms. When a keeper fails, the fault tolerant algorithm is 80% faster than the classical algorithm. When a worker fails, the fault tolerant join algorithm demonstrates an impressive execution time: it is 99% faster than its competitor. The results of the communication evaluations are shown in Figures 4, 5, and 6.

Fig. 4. Comparison of communication time required to execute both algorithms. T(R) = T(S) = 256.

Fig. 5. Comparison of communication time required to execute both algorithms. T(R) = T(S) = 50,000.

Fig. 6. Comparison of communication time required to execute both algorithms. T(R) = T(S) = 100,000.
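Formula (22) can be transcribed directly; the example below uses, for illustration only, the communication times shown in Figure 4 for the keeper-failure case (both algorithms need about 514 ms of failure-free communication, while the fault tolerant recovery takes about 103 ms versus a full 514 ms re-execution for the classical join) together with an assumed probability P = 0.95.

```python
def expected_time(p_fail_free, normal_work, recovery_work):
    # Formula (22): weighted sum of failure-free work (NW) and recovery work (RW).
    return p_fail_free * normal_work + (1 - p_fail_free) * recovery_work

P = 0.95  # assumed probability of failure-free execution
print(expected_time(P, normal_work=514, recovery_work=103))  # fault tolerant join
print(expected_time(P, normal_work=514, recovery_work=514))  # classical join (full re-execution)
```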
VIII. CONCLUSION

In this paper, we introduced a fault tolerant distributed hash-join algorithm. A cost model has been provided, and the classical and fault tolerant algorithms have been compared with each other. In the failure-free case, the unreliable algorithm demonstrates better results. In the presence of failures, the estimations showed that the fault tolerant join algorithm takes precedence over the unreliable distributed hash-join algorithm. By copying the tuples of the relations to two of the servers where the join is performed, we can survive abrupt failures of workers and keep joining tuples on the available ones.

Future work includes adapting our algorithm to data skew. We will also consider implementing the fault tolerant algorithm and conducting experiments with other RDBMS.

REFERENCES

[1] A. S. Tanenbaum and M. van Steen, Distributed Systems: Principles and Paradigms, 2nd ed. Upper Saddle River, NJ, USA: Prentice-Hall, Inc., 2006.
[2] S. K. Rahimi and F. S. Haug, Distributed Database Management Systems: A Practical Approach. Wiley-IEEE Computer Society Press, 2010.
[3] A. Avizienis, J.-C. Laprie, B. Randell, and C. Landwehr, "Basic concepts and taxonomy of dependable and secure computing," IEEE Trans. Dependable Secur. Comput., vol. 1, no. 1, pp. 11–33, Jan. 2004. [Online]. Available: https://doi.org/10.1109/TDSC.2004.2
[4] B. Catania and L. Jain, Advanced Query Processing: An Introduction, Jan. 2012, vol. 36, pp. 1–13.
[5] G. Graefe, "Query evaluation techniques for large databases," ACM Comput. Surv., vol. 25, no. 2, pp. 73–169, Jun. 1993. [Online]. Available: http://doi.acm.org/10.1145/152610.152611
[6] C. Barthels, I. Müller, T. Schneider, G. Alonso, and T. Hoefler, "Distributed join algorithms on thousands of cores," Proc. VLDB Endow., vol. 10, no. 5, pp. 517–528, Jan. 2017. [Online]. Available: https://doi.org/10.14778/3055540.3055545
[7] C. Kim, T. Kaldewey, V. W. Lee, E. Sedlar, A. D. Nguyen, N. Satish, J. Chhugani, A. Di Blas, and P. Dubey, "Sort vs. hash revisited: Fast join implementation on modern multi-core CPUs," Proc. VLDB Endow., vol. 2, no. 2, pp. 1378–1389, Aug. 2009. [Online]. Available: https://doi.org/10.14778/1687553.1687564
[8] Apache Software Foundation. (2019) Apache Hadoop. [Online]. Available: https://hadoop.apache.org/
[9] M. T. Özsu and P. Valduriez, Principles of Distributed Database Systems, 3rd ed. Springer Publishing Company, Incorporated, 2011.
[10] J. Gray, P. McJones, M. Blasgen, B. Lindsay, R. Lorie, T. Price, F. Putzolu, and I. Traiger, "The recovery manager of the System R database manager," ACM Comput. Surv., vol. 13, no. 2, pp. 223–242, Jun. 1981. [Online]. Available: http://doi.acm.org/10.1145/356842.356847
[11] G. Gardarin and P. Valduriez, "Join and semijoin algorithms for a multiprocessor database machine," ACM Transactions on Database Systems, vol. 9, Mar. 1984.
[12] C. Balkesen, G. Alonso, J. Teubner, and M. T. Özsu, "Multi-core, main-memory joins: Sort vs. hash revisited," Proc. VLDB Endow., vol. 7, no. 1, pp. 85–96, Sep. 2013. [Online]. Available: https://doi.org/10.14778/2732219.2732227
[13] J. Teubner and G. Alonso, "Main-memory hash joins on modern processor architectures," IEEE Transactions on Knowledge and Data Engineering, vol. 27, no. 7, pp. 1754–1766, Jul. 2015.
[14] C. Doulkeridis and K. Nørvåg, "A survey of large-scale analytical query processing in MapReduce," The VLDB Journal, vol. 23, no. 3, pp. 355–380, Jun. 2014. [Online]. Available: http://dx.doi.org/10.1007/s00778-013-0319-9
[15] P. Costa, M. Pasin, A. Bessani, and M. Correia, "Byzantine fault-tolerant MapReduce: Faults are not just crashes," Nov. 2011, pp. 32–39.
[16] H. Zhu and H. Chen, "Adaptive failure detection via heartbeat under Hadoop," Dec. 2011, pp. 231–238.
[17] F. Wang, J. Qiu, J. Yang, B. Dong, X. Li, and Y. Li, "Hadoop high availability through metadata replication," in Proceedings of the First International Workshop on Cloud Data Management (CloudDB '09). New York, NY, USA: ACM, 2009, pp. 37–44. [Online]. Available: http://doi.acm.org/10.1145/1651263.1651271
[18] V. Singh, "Multi-objective parametric query optimization for distributed database systems," in Proceedings of Fifth International Conference on Soft Computing for Problem Solving, M. Pant, K. Deep, J. C. Bansal, A. Nagar, and K. N. Das, Eds. Singapore: Springer Singapore, 2016, pp. 219–233.
[19] D. Ongaro and J. Ousterhout, "In search of an understandable consensus algorithm," in Proceedings of the 2014 USENIX Annual Technical Conference (USENIX ATC '14). Berkeley, CA, USA: USENIX Association, 2014, pp. 305–320. [Online]. Available: http://dl.acm.org/citation.cfm?id=2643634.2643666
[20] S. H. Son, "Replicated data management in distributed database systems," SIGMOD Rec., vol. 17, no. 4, pp. 62–69, Nov. 1988. [Online]. Available: http://doi.acm.org/10.1145/61733.61738
[21] G. Alonso and B. Kemme, "How to select a replication protocol according to scalability, availability and communication overhead," Aug. 2001.