Integrating Massive Data Streams

George Siachamis
supervised by Geert-Jan Houben, Arie van Deursen and Asterios Katsifodimos
Delft University of Technology
g.siachamis@tudelft.nl

Proceedings of the VLDB 2021 PhD Workshop, August 16th, 2021, Copenhagen, Denmark. Copyright (C) 2021 for this paper by its authors. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

ABSTRACT

Data Integration has been a long-standing and challenging problem for enterprises and researchers. Data residing in multiple heterogeneous sources must be integrated and prepared so that the valuable information it carries can be extracted and analysed. However, the volume and the velocity of the produced data, in addition to the modern business need for real-time results, have pushed data analytics, and therefore data integration, towards data streams. While data integration is a hard problem in and of itself, integrating data streams becomes even more challenging. Streams are characterized by their high velocity, infinite nature and predisposition to concept drift.

The goal of this doctoral work is to design and provide scalable methods to support data integration tasks on massive data streams, i.e., to support streaming data integration. The aim of this work is threefold. First, we aim at developing and proposing streaming methods to compute temporal stream data-profiles and summaries that can describe the dynamic state of a stream in the course of time. Second, we aim at developing methods and metrics of stream similarity. Those methods and metrics can serve as means to detect similar or complementary streams in a streaming data lake. Finally, we aim at optimizing distributed streaming similarity joins, a very important operation that precedes entity linking and resolution. This paper discusses exciting challenges and open problems in the field, and a research plan for tackling them.

1 INTRODUCTION

Modern enterprises gather huge volumes of data, either to perform business analytics or to manage their assets efficiently. This data resides in disparate sources and, although it can convey the same or related information, it can differ considerably in structure and representation based on the different conventions of the managing teams. This untamed heterogeneity leads to the so-called data integration problem. Traditionally, data integration has been a manual process over targeted static sources. In the last decades, plenty of research effort has been invested in automating and improving the accuracy of data integration tasks [11, 24, 25]. At the same time, a fair amount of work [14, 15, 19] has been done towards improving the efficiency of data integration tasks in static and dynamic databases. However, due to the ever-growing volume and velocity of produced data, as well as the demand for data-driven real-time applications, streaming data analytics have emerged. To ensure the quality of these analytics, streaming data must be prepared and integrated in a real-time fashion.

Recognizing the growing need for efficient streaming data integration, in this project we aim at providing scalable streaming methods to facilitate integrating massive data streams. Essentially, a streaming data integration pipeline consists of multiple basic data integration tasks adapted and optimized to handle data streams. For this doctoral work, our efforts focus on three main tasks: stream profiling, stream discovery and similarity joins. We recognize that these tasks play a major role in data integration pipelines and are integral to the efficiency of those pipelines. On the one hand, with stream profiling and stream discovery, temporal profiles can be computed and used for discovering possibly related streams to optimize downstream data integration tasks. On the other hand, similarity joins are an integral data integration task with applications to data cleaning and entity resolution, but not limited to those. Summarizing, we aim at adapting and optimizing these basic tasks for data streams in order to improve the efficiency of a streaming data integration pipeline. Our ultimate goal is to enable the real-time results needed to ensure the quality of real-time data analytics. We plan on evaluating our work on real-world use cases provided by our industrial partner, ING.

2 MOTIVATION & CHALLENGES

In many modern enterprises, different teams publish their streaming datasets to an internal streaming data lake where other teams can access them. However, it is very rare for the published streams to come with valuable time-related metadata. This results in hundreds or even thousands of data streams published in an internal repository but never harnessed because of lacking documentation and valuable metadata. Unfortunately, at the moment of writing, the only way of organising and harnessing all these streams is through long hours of valuable manual labour for exploration and integration.

Existing tools cannot deal with the massive rate of thousands of incoming records per second that usually characterizes modern data streams. Another major challenge is the fact that streams are possibly unbounded datasets, meaning that their volume grows infinitely. This also means that they are not at our disposal in full prior to their processing, i.e., the whole dataset is not available when its processing starts. All of this, in combination with the fact that data streams might suffer from concept drift every other day, renders the task of handling these data streams too complex for existing tools.

An inspiring example, which we also recognized through our industrial collaboration with ING, is the monitoring of crucial infrastructure. Modern enterprises consist of multiple teams which monitor their own assets and provide alerts for the incident teams. Although these assets might differ considerably in their representation in the data provided by each team, they are often closely related. For any enterprise, it is crucial that an occurring incident will be resolved as fast as possible. It is also crucial that, in the process of resolving any occurring issue, the involved engineers will not miss any relevant information. In other words, the monitoring streams must be integrated in real-time and in an exact manner.

In this project, our goal is to face these challenges and provide methods that bring us one step closer to a tool that helps a company tame its streams and take advantage of their rich information. To ensure the usefulness of our work, we also plan on evaluating it on a monitoring case relevant to the given example, in the industrial environment of our partner, ING.

[Figure 1: A streaming data lake applying our streaming methods. For each stream a temporal profile is computed. These profiles are used to find similar streams (streams with the same coloring). The green arrows indicate the matching records that the similarity join between the related streams outputs.]

3 SCALABLE METHODS FOR STREAMING DATA INTEGRATION

In this section, we discuss the three main work packages of this doctoral work, our three scalable streaming methods. First, we give an overview of the envisioned methods. Then we present each task in detail and discuss the related work.

3.1 Overview of Streaming Methods

Our envisioned methods can be used either individually or in various combinations, depending on the use case at hand. An overview of these methods is the following.

Profiling Streams. Our proposed stream profiling method is designed to work on top of a streaming data lake and provide approximate temporal profiles that describe statistical and semantic properties of the current state of a stream. These profiles are constantly updated in an incremental manner to reflect the changes in the content of the streams.

Finding Related Streams. Based on previously created profiles, our stream discovery method computes the similarity between streams in our streaming data lake, indicating streams that have a high chance of being related. These indications of relatedness are adaptively updated as the streams change over time. The resulting related streams are either presented to the end user or used to guide the downstream tasks of a workflow.

Joining Similar Records. Our streaming similarity join method takes as input two or more streams and outputs all possible record matches. In order to identify and join the similar records, our method computes a similarity score through a similarity function. In this project, the primary focus is to optimize this similarity comparison to significantly improve the efficiency of the streaming integration tasks. The results of our similarity join task can be used as part of one of the entity resolution pipelines described in [17].

In the rest of the section, we go into detail about the individual tasks that the methods perform and we discuss the related work.

3.2 Profiling Streams

An important step before integrating streams is to compute data profiles for all the streams in a streaming data lake. Depending on the nature of the downstream tasks, different types of data profiles can be of interest. In this project we mainly focus on two types: basic statistics-based profiles and summaries/sketches.

Basic statistics-based profiles are easier to compute but also less informative for tasks like a similarity join. These profiles typically contain information like the cardinalities, the value distributions or the data types of columns. This information can be used to reduce the combinations of items to be checked by downstream tasks, e.g., combinations of attributes or streams to be checked for similarity. This can be done either at the stream level, by identifying streams with common statistical properties, or at the column level, by narrowing down the column combinations to be examined. For example, the value distributions of columns can help identify candidate pairs of columns on which a downstream similarity join will be performed.

On the other hand, sketches and summaries [7, 13] are harder to create, but they can give a good estimate of the contents of a stream. Sketches and summaries can be very useful in various problems like approximate and streaming query processing or dataset discovery. In [16], summaries are computed for approximate query answering based on a probabilistic technique that makes use of Maximum Entropy. In [13], sketches are created for dataset discovery by leveraging Bloom filters and a skip list. However, none of the above techniques incorporates the time factor in their sketches. In addition, [16] does not target data streams, and an adaptation is far from trivial due to the complexity of the technique.

According to [1], there is still much more ground to cover to perform incremental, online and temporal profiling. For this project, it is important that our profiles are computed online and are constantly updated to capture the temporal properties of our streams. This is not an easy task when data streams are possibly endless, fast and massive, and their statistical values can change often. Time needs to be incorporated into the profiles, and every process must be incremental to ensure efficiency.

In what follows, we discuss the main work in the related fields. Similarity joins, in particular, have been studied extensively in MapReduce environments; we return to them in Section 3.4.
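The kind of incremental, temporal profile discussed in this section can be made concrete with a small sketch. The class below is only an illustration under assumptions of ours (records arriving as timestamped dictionaries, tumbling windows, exact distinct counts instead of sketches); it is not the paper's actual method, which targets approximate, massively parallel computation.

```python
# Illustrative sketch of an incremental, temporal stream profile.
# Assumptions (ours, hypothetical): records arrive as (timestamp, {attribute: value})
# pairs and profiles are kept per tumbling window; a production version would use
# approximate sketches (e.g., Bloom filters, HyperLogLog) instead of exact sets.
from collections import defaultdict

class TemporalProfile:
    """Keeps one basic statistics-based profile per tumbling window."""

    def __init__(self, window_size_s):
        self.window_size_s = window_size_s
        # window start -> attribute -> {"count": int, "distinct": set}
        self.windows = defaultdict(
            lambda: defaultdict(lambda: {"count": 0, "distinct": set()})
        )

    def update(self, timestamp, record):
        # Incremental update: O(#attributes) per record, no re-scan of history.
        window = timestamp - (timestamp % self.window_size_s)
        for attr, value in record.items():
            stats = self.windows[window][attr]
            stats["count"] += 1
            stats["distinct"].add(value)

    def cardinality(self, window_start, attr):
        # Returns (record count, distinct-value count) for one window and column.
        stats = self.windows[window_start][attr]
        return stats["count"], len(stats["distinct"])

profile = TemporalProfile(window_size_s=60)
profile.update(0, {"host": "a", "status": 200})
profile.update(10, {"host": "a", "status": 500})
profile.update(65, {"host": "b", "status": 200})
print(profile.cardinality(0, "host"))  # (2, 1): two records, one distinct host
```

Keeping one profile per window is what makes temporal queries possible later: the profile of an old window is never recomputed, only read.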
3.3 Finding Related Streams

After computing the desired profile for each stream, we must identify which of our streams are related, based on the profiled information. Depending on the type of the collected profiles, different strategies can be employed.

A simple example, in the case of sketches, is the procedure described in [13]. There, the authors suggest using the computed sketches to obtain a simple estimate of the overlapping values between two sources, by computing the overlap between the much smaller corresponding sketches. In this way, an estimate can be made of whether two sources must be integrated or not, as well as of the cost of this integration. However, the described scenario is rather simple; it uses only sketches and does not incorporate time-related capabilities like temporal queries.

A better example to showcase the use of profiles, though, is the data discovery system Aurum [5]. In Aurum, the authors propose creating a knowledge graph based on previously computed profiles. The knowledge graph is afterwards queried for dataset discovery purposes. The computed profiles consist of multiple statistical properties, discovered dependencies and sketches. These profiles are used by Aurum to prune the search space and reduce the similarity comparisons needed to build the knowledge graph. The authors have opted for a scalable parallel solution that reads the input once, and they also provide a mechanism for keeping both the profiles and the knowledge graph up-to-date. However, Aurum is not designed for streams and does not provide temporal features in either its profiles or its knowledge base. Thus, temporal queries are not supported. In a streaming environment, and especially for a crucial task like monitoring infrastructure, such a capability is essential.

3.4 Joining Similar Streaming Records

Similarity Joins in MapReduce. In general, MapReduce-based similarity join methods require their inputs in full before processing, and most of them leverage statistics and properties of the datasets to optimize the task and reduce the transmission and computation costs. They provide a one-off partitioning scheme which cannot be updated adaptively at runtime. Thus, they are not trivially applicable in a streaming environment, but they are a great inspiration towards a distributed streaming solution.

There are two main approaches which MapReduce methods usually follow: Filter & Verification and General Metric Space. The Filter & Verification methods [20], [10] rely on prefixes and signatures, which they leverage to scale out the similarity computations and filter unnecessary comparisons. On the other hand, General Metric Space methods [21], [8] divide the metric space into partitions in which similar objects are grouped. However, they require that the similarity function is a metric, or at least a semi-metric. More specifically, [8] and [21] select random centroids, create an inner and an outer partition for each by using centroid proximity and filters, and compute a) the similarity of all pairs of items in an inner partition, and b) the similarity of each item of an outer partition with all the items within the corresponding inner partition.

Similarity Joins for Data Streams. On the other hand, research on similarity joins in a streaming environment is very limited. [9] introduces the problem of the streaming similarity self-join. It proposes a similarity measure which filters out old items, and a streaming framework that leverages a state-of-the-art inverted index optimized for streaming data. However, the proposed solution runs on a single machine and is thus unable to handle multiple massive streams, for scalability reasons.

To the best of our knowledge, the only work dealing with distributed streaming similarity joins is [23]. It proposes a distributed streaming similarity join framework that employs a length-based filter to distribute the data across a cluster of nodes.
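To illustrate why a length-based filter helps, consider set similarity under a Jaccard threshold t: a pair (x, y) can only reach the threshold if t * |y| <= |x| <= |y| / t, so records can be routed by length and many pairs pruned without computing any intersection. The sketch below is a minimal single-machine illustration of this idea, not the distributed implementation of [23]:

```python
# Minimal illustration of length-based filtering for set-similarity joins.
# For Jaccard(x, y) >= t it must hold that t * |y| <= |x| <= |y| / t, since
# |intersection| <= min(|x|, |y|) and |union| >= max(|x|, |y|).
# This is a toy, in-memory version; [23] applies the idea to route records
# across a cluster and adapts the length segments online.

def jaccard(x, y):
    return len(x & y) / len(x | y)

def candidate_by_length(x, y, t):
    # Cheap necessary condition: no set intersection is computed.
    return t * len(y) <= len(x) <= len(y) / t

def similarity_join(left, right, t):
    out = []
    for x in left:
        for y in right:
            # Verify the expensive predicate only for surviving candidates.
            if candidate_by_length(x, y, t) and jaccard(x, y) >= t:
                out.append((x, y))
    return out

left = [{"a", "b", "c"}, {"a"}]
right = [{"a", "b", "c", "d"}, {"z"}]
# {"a"} vs. the 4-element set is pruned by length alone when t = 0.5.
print(len(similarity_join(left, right, 0.5)))  # 1 matching pair
```

The filter is only a necessary condition, which is exactly why it degrades when all incoming sets have similar lengths: almost nothing is pruned and every pair must still be verified.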
Similarity join [3, 6, 22] is the problem of identifying all pairs of similar records that reside in two or more datasets. A pair of records is considered similar if the similarity score given by a similarity function is above a given threshold. In a stream processing model, the similarity join operation between two given streams is expected to join similar records from the incoming streams based on the values of one or more target attributes. In a streaming environment, we can distinguish two types of similarity joins: full-history joins and windowed joins.

Similarity joins are difficult and time-consuming operations. The brute-force approach has to compare all the data of the first dataset against the data of the second, leading to a quadratic time complexity, O(n^2). When we take into account that a data stream is a possibly unbounded dataset, it is clear that a brute-force solution is infeasible in a streaming environment. Thus, it is essential that the performed comparisons between records are reduced by avoiding unnecessary computations. Additionally, due to the dynamic nature of data streams, any occurring concept drift might result in obsolete partitions and load skew. To ensure the high efficiency of the task, we must adaptively partition the data online.

Because the lengths of the incoming tuples might change over time, [23] also proposes an adaptive algorithm that recalculates the bounds of the length segments based on the online-collected statistics. In addition, in order to reduce the computations within a node, an inverted index is built, accompanied by a bundle structure to reduce the indexed records. However, the authors consider full-history joins without proposing any retention policy, which is crucial when dealing with endless streams. In addition, the proposed length-based filter will struggle to scale out efficiently when all the incoming sets are of similar length.

Load Balancing on Streams. Load balancing is a native concern in distributed stream processing environments, since the statistical properties of the data change frequently and the systems need to adapt to achieve their full potential. To ensure load balancing, [12] proposes a new dataflow join operator that can adaptively distribute records to nodes and perform state repartitioning through a locality-aware migration strategy. In [18], a streaming variation of the HyperCube algorithm [2] is presented. The authors divide the incoming records into heavy and light hitters and process each heavy hitter in a HyperCube grid. The characterization of a tuple as a heavy or light hitter, as well as the size of each HyperCube grid, is adapted to the statistics that are gathered online for the join values. These works focus on distributing the load and do not present solutions for reducing the needed computations. However, a unified solution with a load balancing scheme tailored to the distribution scheme can achieve both goals and provide better results.

[Figure 2: The Similarity Join process. Input streams flow through physical partitioning, logical partitioning and similarity computation to output joined records, with statistics capturing and partitioning tuning as a feedback loop.]

Our approach. To tackle the streaming challenges, we propose a partitioning scheme based on inner and outer partitions, inspired by the general metric space approaches from MapReduce. First, we partition the incoming records from our input streams to different physical nodes, based on their proximity to a node's representative centroid. Afterwards, we create tighter partitions within a node in a logical way, by leveraging the provided similarity threshold and creating new logical partitions based on the incoming records. The last step of our workflow is the actual similarity computation and the output of the wanted joined pairs. The similarity computations are restricted to candidate pairs through our tight logical partitions. In addition, in order to ensure adaptivity, we collect statistics from the similarity computation sub-task to re-calibrate both our logical and physical partitions. It is important to note that in this project we optimize for high-dimensional data. Based on the latest advances in Deep Learning-based Entity Resolution [4, 11, 25], word embeddings can be used to capture the similarities between records more effectively. However, due to their high dimensionality, similarity computations between two embeddings are very inefficient. We aim to tackle this inefficiency, enabling the use of word embeddings for similarity joins on streams.

Evaluation. We plan to evaluate our solution on datasets from [23], real-world datasets provided by our industrial partner, and synthetic datasets. We plan to compare it against the discussed state-of-the-art solution [23], as well as against baselines from the general metric space approaches. Since our goal is to provide real-time results, our evaluation will be based on metrics like throughput and latency.

4 CONCLUSION

Summarizing, in this paper we present the exciting challenges and open problems of streaming data integration, and a research plan that aims to provide scalable methods to tackle them. We discuss three envisioned methods to perform profiling, discovery and similarity joins on streams. In addition, we present related work for each task and discuss its limitations when dealing with streams. Finally, we shortly present our approach for distributed streaming similarity joins, leveraging adaptive data partitions.

REFERENCES
[1] Z. Abedjan, L. Golab, and F. Naumann. Data profiling: A tutorial. In Proceedings of the 2017 ACM International Conference on Management of Data, SIGMOD '17, pages 1747-1751, New York, NY, USA, 2017. Association for Computing Machinery.
[2] F. N. Afrati and J. D. Ullman. Optimizing joins in a map-reduce environment. In Proceedings of the 13th International Conference on Extending Database Technology, EDBT '10, pages 99-110, New York, NY, USA, 2010. Association for Computing Machinery.
[3] A. Arasu, V. Ganti, and R. Kaushik. Efficient exact set-similarity joins. In Proceedings of the 32nd International Conference on Very Large Data Bases, pages 918-929, 2006.
[4] R. Cappuzzo, P. Papotti, and S. Thirumuruganathan. Creating embeddings of heterogeneous relational datasets for data integration tasks. In Proceedings of the 2020 ACM SIGMOD International Conference on Management of Data, SIGMOD '20, pages 1335-1349, New York, NY, USA, 2020. Association for Computing Machinery.
[5] R. Castro Fernandez, Z. Abedjan, F. Koko, G. Yuan, S. Madden, and M. Stonebraker. Aurum: A data discovery system. In 2018 IEEE 34th International Conference on Data Engineering (ICDE), pages 1001-1012, 2018.
[6] S. Chaudhuri, V. Ganti, and R. Kaushik. A primitive operator for similarity joins in data cleaning. In 22nd International Conference on Data Engineering (ICDE '06), 2006.
[7] G. Cormode, M. Garofalakis, P. J. Haas, and C. Jermaine. Synopses for massive data: Samples, histograms, wavelets, sketches. Foundations and Trends in Databases, 4(1-3):1-294, 2011.
[8] A. Das Sarma, Y. He, and S. Chaudhuri. ClusterJoin: A similarity joins framework using map-reduce. Proc. VLDB Endow., 7(12):1059-1070, 2014.
[9] G. De Francisci Morales and A. Gionis. Streaming similarity self-join. Proc. VLDB Endow., 9(10):792-803, 2016.
[10] D. Deng, G. Li, S. Hao, J. Wang, and J. Feng. MassJoin: A MapReduce-based method for scalable string similarity joins. In 2014 IEEE 30th International Conference on Data Engineering, pages 340-351, 2014.
[11] M. Ebraheem, S. Thirumuruganathan, S. Joty, M. Ouzzani, and N. Tang. Distributed representations of tuples for entity resolution. Proc. VLDB Endow., 11(11):1454-1467, 2018.
[12] M. ElSeidy, A. Elguindy, A. Vitorovic, and C. Koch. Scalable and adaptive online joins. Proc. VLDB Endow., 2014.
[13] D. Karapiperis, A. Gkoulalas-Divanis, and V. S. Verykios. Summarization algorithms for record linkage. In Proceedings of the 21st International Conference on Extending Database Technology, EDBT 2018, Vienna, Austria, March 26-29, 2018, pages 73-84. OpenProceedings.org, 2018.
[14] L. Kolb, A. Thor, and E. Rahm. Dedoop: Efficient deduplication with Hadoop. Proc. VLDB Endow., 5(12):1878-1881, 2012.
[15] L. Kolb, A. Thor, and E. Rahm. Multi-pass sorted neighborhood blocking with MapReduce. Comput. Sci., 27(1):45-63, 2012.
[16] L. Orr, M. Balazinska, and D. Suciu. EntropyDB: A probabilistic approach to approximate query processing. The VLDB Journal, 29(1), 2020.
[17] G. Papadakis, G. Mandilaras, L. Gagliardelli, G. Simonini, E. Thanos, G. Giannakopoulos, S. Bergamaschi, T. Palpanas, and M. Koubarakis. Three-dimensional entity resolution with JedAI. Information Systems, 93:101565, 2020.
[18] Y. Qiu, S. Papadias, and K. Yi. Streaming hypercube: A massively parallel stream join algorithm. In Advances in Database Technology - 22nd International Conference on Extending Database Technology, EDBT 2019, Lisbon, Portugal, March 26-29, 2019, pages 642-645. OpenProceedings.org, 2019.
[19] A. Saeedi, M. Nentwig, E. Peukert, and E. Rahm. Scalable matching and clustering of entities with FAMER. Complex Systems Informatics and Modeling Quarterly, (16):61-83, 2018.
[20] R. Vernica, M. J. Carey, and C. Li. Efficient parallel set-similarity joins using MapReduce. In SIGMOD '10, pages 495-506, New York, NY, USA, 2010. Association for Computing Machinery.
[21] Y. Wang, A. Metwally, and S. Parthasarathy. Scalable all-pairs similarity search in metric spaces. In KDD '13, pages 829-837, New York, NY, USA, 2013. Association for Computing Machinery.
[22] C. Xiao, W. Wang, X. Lin, J. X. Yu, and G. Wang. Efficient similarity joins for near-duplicate detection. ACM Trans. Database Syst., 36(3), 2011.
[23] J. Yang, W. Zhang, X. Wang, Y. Zhang, and X. Lin. Distributed streaming set similarity join. In 2020 IEEE 36th International Conference on Data Engineering (ICDE), pages 565-576. IEEE, 2020.
[24] M. Zhang, M. Hadjieleftheriou, B. C. Ooi, C. M. Procopiuc, and D. Srivastava. Automatic discovery of attributes in relational databases. In Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data, SIGMOD '11, pages 109-120, New York, NY, USA, 2011. Association for Computing Machinery.
[25] W. Zhang, H. Wei, B. Sisman, X. L. Dong, C. Faloutsos, and D. Page. AutoBlock: A hands-off blocking framework for entity matching. In Proceedings of the 13th International Conference on Web Search and Data Mining, WSDM '20, pages 744-752, New York, NY, USA, 2020. Association for Computing Machinery.