Building a Scalable Distributed Online Media Processing Environment

Shadi A. Noghabi
advised by: Roy H. Campbell, Indranil Gupta
University of Illinois at Urbana-Champaign, {abdolla2, rhc, indy}@illinois.edu

Proceedings of the VLDB 2016 PhD Workshop, September 9, 2016, New Delhi, India. Copyright (c) 2016 for this paper by its authors. Copying permitted for private and academic purposes.

ABSTRACT
Media has become dominant in all aspects of human lives, from critical applications such as medical, military, and security (e.g., surveillance cameras) to entertainment applications such as social media and media sharing websites. Billions of massive media objects (e.g., videos, photos, and documents) are generated every second, with high diversity among them in terms of sizes and formats. These objects have to be stored and retrieved reliably, with low latency, and in a scalable yet efficient fashion. Additionally, various types of processing are done on media objects, from simple compression and format conversion to more complex machine learning algorithms detecting certain patterns and objects.

Existing large-scale storage and processing systems face several challenges when handling media objects. My research focuses on building a unified storage and processing environment tailored specifically for media objects, while maintaining high efficiency and scalability. I have built a scalable, load-balanced, efficient storage system optimized for media objects based on their unique access patterns. Currently, I am working on developing an efficient media processing system and integrating these two systems into one framework.

Keywords
Media Processing, Distributed Storage, Online Processing

1. INTRODUCTION
During the past decade, media has become extremely popular. Based on [7], hundreds of hours of videos (roughly hundreds of GBs) are uploaded per minute to YouTube, the largest Internet video database. These videos have to be processed, reformatted, compressed, verified, and categorized while being uploaded. Moreover, hundreds of thousands of hours of videos are viewed per minute, from all around the globe. All these videos have to be stored and retrieved reliably (with no data loss or unavailability) from a distributed storage at YouTube. This is only one example of the need for processing and storing large media objects. Many other examples can be found in a wide range of applications, from medical imagery and surveillance cameras to social networks and online shopping.

Handling this massive amount of media poses a number of unique challenges. First, a diverse range of media (photos, videos, documents, etc.) with various sizes (from a few KBs to a few GBs) should be processed efficiently at the same time. Second, there is an ever-growing amount of data that needs to be stored, served, and processed in a linearly scalable fashion. Third, in many applications (e.g., machine learning applications), processing is both data and CPU intensive, causing several difficulties in resource scheduling. My research focuses on building a unified online environment tailored specifically for storing and processing these diverse massive media objects, while maintaining efficiency and scalability.

First, we need to store this ever-growing, enormous amount of media efficiently. Existing distributed storage systems, including file systems [21, 26] and key value stores [12, 16], face several challenges when serving media objects. These systems impose unnecessary additional overhead (such as rich metadata), and are not efficient at handling both massive GB objects and many small objects at the same time. Therefore, we need a scalable storage system tailored specifically for diverse media objects.

Additionally, media objects have to be processed in a timely manner. Various types of processing are conducted on media objects, including: pattern matching (e.g., detecting pornography), categorization (e.g., tagging photos), correlation detection (e.g., recognizing a burglary in a network of surveillance cameras), compression and resizing, format conversion, and matching media objects (e.g., deduplication and copyright checking). There has been extensive research on optimizing each of these applications, such as OpenCV and ShapeLogic [3, 4], focusing on a single machine. Recently, the trend has moved toward distributed environments. HIPI, MIPr, and 4Quant [22, 23, 2, 6] have been developed as distributed offline media processing systems. However, in many use cases, especially in sensitive areas such as security and medical imaging, we need real-time processing. Even in less critical areas, such as social networks, delays of more than a few seconds or minutes could cost millions of dollars. There has been some effort to perform real-time media processing by building libraries on top of existing distributed online frameworks [5]. However, current online frameworks are not optimized for massive media objects since they do not focus on minimizing data movement. Therefore, we need an online processing framework designed and optimized with the goal of processing large media objects.
2. MEDIA STORAGE
As the first step in my research, I have been focusing on designing an efficient storage for media objects. Handling media poses a number of unique challenges. First, due to the diversity in media types, media object sizes vary significantly, from tens of KBs (e.g., profile pictures) to a few GBs (e.g., videos). The system needs to store both massive media objects and a large number of small media objects efficiently. Second, there is an ever-growing amount of media that needs to be stored and served. This rapid growth in requests magnifies the necessity for a linearly scalable system (with low overhead). Third, variability in the workload and cluster expansions can create unbalanced load, degrading the latency and throughput of the system. This creates a need for load balancing. Finally, data has to be stored and retrieved in a fast, durable, and highly available fashion. For example, when a user uploads a media object in a social network, all his/her friends from all around the globe should be able to see the media object with very low latency, even if parts of the internal infrastructure fail. To provide these properties, data has to be reliably replicated in multiple datacenters, while maintaining low latency for each request.

Several systems have been designed for handling a large amount of data, but none of them satisfactorily meet the requirements and scale of media processing needs. There has been extensive research into distributed file systems [20, 17, 14, 21, 26]. As pointed out by [11, 15], the unnecessary additional capabilities these systems provide, such as the hierarchical directory structure and rich metadata, are overkill for a media storage.

Many key value stores [13, 16, 10, 12] have also been designed for storing a large number of objects. Although these systems can handle many small objects, they are not optimized for storing large objects (tens of MBs to GBs). Further, they impose extra overhead for providing consistency guarantees, while these are typically not needed for immutable data. Examples of these overheads include vector clocks, conflict resolution mechanisms, logging, and central coordinators.

A few systems have been designed specifically for large immutable objects, including Facebook's Haystack [11] along with f4 [18], and Twitter's Blob Store [25]. However, these systems either become imbalanced (under-utilizing some of the nodes) or do not scale beyond a point.

Thus, through a collaboration with LinkedIn, we developed a scalable, load-balanced distributed storage system built specifically for media objects, described below (the project is open-source and the code can be found at [1]). Ambry has been running in LinkedIn's production environment for the past 2 years, serving up to 10K requests per second across more than 400 million users. We have published our work "Ambry: LinkedIn's Scalable Geo-Distributed Object Store" in SIGMOD 2016 [8].

Social networks are one of the biggest sources of media objects, with hundreds of millions of users continually uploading and viewing photos, videos, etc. Typically, these media objects are written once, frequently accessed, never modified, and rarely deleted. We leveraged this immutable, read-heavy access pattern of media objects in designing Ambry. Ambry is a scalable distributed storage designed for efficiently handling both massive media objects (GBs) and large numbers of small media objects (KBs). Ambry utilizes techniques such as a decentralized design, asynchronous replication, rebalancing mechanisms, zero-cost failure detection, and OS caching. Using these techniques, Ambry provides high throughput (utilizing up to 88% of the network) and low latency (less than 50 ms for a 1 MB object), while maintaining load balance amongst nodes.
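To make the write-once, read-heavy usage pattern concrete, the sketch below shows the kind of minimal client interface such a blob store exposes. This is an illustrative sketch only, not Ambry's actual API: the `BlobStoreClient` name and method signatures are hypothetical, and a real system would hide replication, routing, and chunking of GB-sized objects behind this surface.

```java
import java.io.InputStream;

/**
 * Hypothetical client-side view of an immutable blob store
 * (illustrative sketch; not Ambry's actual API).
 * Objects are written once and afterwards only read or (rarely) deleted,
 * matching the immutable, read-heavy access pattern of media objects.
 */
public interface BlobStoreClient {

    /**
     * Stores an immutable blob and returns an opaque, globally unique id.
     * There is no update or overwrite operation: a "modified" object is
     * simply a new blob with a new id.
     */
    String put(InputStream data, long sizeInBytes, String contentType);

    /** Streams back the blob identified by the given id. */
    InputStream get(String blobId);

    /** Marks the blob as deleted (deletes are rare for media workloads). */
    void delete(String blobId);
}
```

Because blobs behind such an interface are immutable, the store can avoid the consistency machinery (vector clocks, conflict resolution, central coordination) that general-purpose key value stores pay for, and replicas can be kept in sync with simple asynchronous replication.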
3. PROCESSING MEDIA OBJECTS
As the next step, I am currently working on building a distributed online processing system optimized for media objects. Distributed stream processing systems have been designed for processing enormous amounts of data in a near real-time fashion [24, 9, 19, 27]. Conceptually, these systems are a great fit for media processing: they are capable of processing massive amounts of data in parallel, as the data is generated, in a near real-time fashion.

However, existing systems are not optimized for media and incur a lot of data movement. Many of these systems include multiple stages of data copying and/or fetching data over the network. Although data movement may not be a dominant factor when processing small data items, this is not true for massive media objects.

One of the main causes of data movement is reading and writing data from an external storage system (remote state), as opposed to supporting fault-tolerant locally stored data (local state). For example, to provide exactly-once guarantees (processing each message exactly once, even in the presence of failures and late arrivals), MillWheel [9] queries Bigtable [12] on each message it receives to confirm that the message has not been processed before. Although an external storage provides faster bootstrap and recovery times, it increases latency, consumes network bandwidth, and can cause denial of service (DOS) for the external storage.

Currently, I am collaborating with LinkedIn on developing Samza, a scalable distributed stream processing system supporting local state. Samza provides fault-tolerant local state by using local database instances in each node, along with a compacted changelog for failure recovery. Each local database instance stores data on disk, providing TBs of space per machine. Additionally, by batching writes and caching popular data in memory, it reaches performance close to an in-memory storage. By using local state, we can implement exactly-once guarantees by storing processed message ids locally (with very low latency) and replaying the changelog if failures happen.
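The sketch below illustrates this pattern with a task that deduplicates and counts messages against local state. It is a minimal sketch: the `LocalStore` interface and the task class are hypothetical stand-ins for Samza's changelog-backed local stores, not Samza's exact API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a changelog-backed, fault-tolerant local store. */
interface LocalStore<K, V> {
    V get(K key);
    void put(K key, V value); // in Samza-style designs, puts are also appended to a compacted changelog
}

/** In-memory stand-in, used only to keep this sketch self-contained. */
class InMemoryStore<K, V> implements LocalStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public V get(K key) { return map.get(key); }
    public void put(K key, V value) { map.put(key, value); }
}

/**
 * Illustrative exactly-once word count: each message id is recorded in local
 * state, so duplicates (e.g., replays after a failure) are skipped without
 * querying a remote database for every message.
 */
class DedupingWordCountTask {
    private final LocalStore<String, Boolean> processedIds;
    private final LocalStore<String, Long> counts;

    DedupingWordCountTask(LocalStore<String, Boolean> processedIds,
                          LocalStore<String, Long> counts) {
        this.processedIds = processedIds;
        this.counts = counts;
    }

    void process(String messageId, String word) {
        if (Boolean.TRUE.equals(processedIds.get(messageId))) {
            return;                      // already processed: drop the duplicate
        }
        Long current = counts.get(word); // local read: microseconds, not a network round trip
        counts.put(word, (current == null ? 0L : current) + 1);
        processedIds.put(messageId, Boolean.TRUE);
    }
}
```

On failure, a restarted container would rebuild both stores by replaying their compacted changelogs (or by reusing on-disk state under the host affinity mechanism described below) before resuming processing.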
We ran a performance benchmark to evaluate the effect of using local state compared to using remote state. We used two workloads:

• ReadWrite: similar to a word count application, reading the current count of a specific word and updating it.

• ReadOnly: similar to a large join of an infinite stream and a table. This workload reads data from the storage and concatenates it with the input message.

We ran the workloads on a 4-node cluster of beefy machines, using multiple stores. Based on our initial results, using local state improves throughput by up to 100x compared to using remote state, as shown in Figure 1. A local RocksDB store with caching enabled reaches almost the performance of a local in-memory store, and both cases saturate the network. Even with caching disabled, RocksDB reaches almost half the throughput of an in-memory store. However, the remote storage is orders of magnitude (up to 100x) slower.

[Figure 1: Comparison of different stores in Samza under two different workloads (ReadWrite and ReadOnly). The graph compares the maximum throughput (messages/s, log scale) achievable by the network, a local in-memory store, a local RocksDB store, a local RocksDB store with caching disabled, and a remote store.]

We also measured the latency of each test, and it followed a similar pattern. When using a local store (RocksDB or in-memory), the latency per input message was a few microseconds in all tests (8 to 30 microseconds). However, using a remote store this latency was 3 orders of magnitude higher (4-10 milliseconds). This is mainly because of the added delay of going over the network and extra data copies, and the overhead of providing consistency in a fault-tolerant distributed store (with multiple replicas).

Moreover, we have developed a host affinity mechanism that reduces failure recovery time by leveraging the state already stored on disk. This mechanism favors bringing up failed containers on the machines they were placed on before, reusing the state stored on disk. Using host affinity, we are able to reduce the recovery time 5-10x, from several minutes to only a few seconds. Also, with host affinity the overhead of recovery is an almost constant value irrespective of the size of the state to be rebuilt, because only the fraction of the state not yet flushed to disk has to be rebuilt. Therefore, using this mechanism together with local state, we are able to provide failure recovery nearly as fast as with remote state, while not incurring the overhead of querying a remote database for each input message processed.
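The placement decision behind host affinity can be summarized by the small sketch below, which prefers a failed container's previous host when that host is still healthy, and only then falls back to any other host. This is a simplified illustration of the policy, not Samza's actual scheduler code; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;

/** Simplified host-affinity-aware container placement (illustrative only). */
class HostAffinityPlacer {
    /** containerId -> host it last ran on, derived from persisted placement metadata. */
    private final Map<String, String> lastKnownHost;

    HostAffinityPlacer(Map<String, String> lastKnownHost) {
        this.lastKnownHost = lastKnownHost;
    }

    /**
     * Picks a host for a restarting container. Preferring the previous host lets
     * the container reuse the local state already flushed to that machine's disk,
     * so only the unflushed tail of the changelog has to be replayed.
     */
    String placeContainer(String containerId, List<String> healthyHosts) {
        String previous = lastKnownHost.get(containerId);
        if (previous != null && healthyHosts.contains(previous)) {
            return previous;             // reuse on-disk state: recovery in seconds
        }
        // Fallback: any healthy host; state is rebuilt by replaying the full changelog.
        return healthyHosts.isEmpty() ? null : healthyHosts.get(0);
    }
}
```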
4. FUTURE RESEARCH
Although local state significantly improves performance (especially for applications sensitive to data movement), it is not sufficient for providing an efficient media-processing environment. Because media objects are large, the local state may not be enough to store all the data associated with an application.

For example, assume we have a continuously changing reference set of fraud media, and a fraud detection application that compares recently posted media against a subset of the reference set (based on a similarity metric such as RGB ratio). Using local state, we can partition the reference set based on the similarity metric across multiple machines, access the set locally, and update the reference set whenever needed. However, if the reference set grows too large (e.g., for videos), the local state capacity will not be enough unless we scale to a bigger cluster with many underutilized machines. I plan to overcome this issue by integrating the distributed storage system and the processing system into one framework. This framework will utilize local state as a cache for storing and retrieving media, offloading data to the storage system whenever needed.
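A minimal sketch of the proposed local-state-as-cache layer is shown below. The `LocalStore` and `RemoteBlobStore` interfaces are hypothetical stand-ins for the Samza local store and an Ambry-like remote store; this is a design illustration of the planned integration, not an existing implementation.

```java
/**
 * Illustrative sketch of the planned integration: bounded local state acts as a
 * cache in front of a durable distributed media store (hypothetical interfaces).
 */
class CachingMediaState {
    /** Stand-in for the changelog-backed local store with bounded capacity. */
    interface LocalStore {
        byte[] get(String id);
        void put(String id, byte[] data);
        void delete(String id);
    }

    /** Stand-in for the remote, durable blob store. */
    interface RemoteBlobStore {
        byte[] get(String id);
        void put(String id, byte[] data);
    }

    private final LocalStore local;
    private final RemoteBlobStore remote;

    CachingMediaState(LocalStore local, RemoteBlobStore remote) {
        this.local = local;
        this.remote = remote;
    }

    /** Reads hit local state first; misses are fetched remotely and cached. */
    byte[] read(String id) {
        byte[] cached = local.get(id);
        if (cached != null) {
            return cached;
        }
        byte[] fetched = remote.get(id);
        if (fetched != null) {
            local.put(id, fetched);  // warm the cache for subsequent local accesses
        }
        return fetched;
    }

    /** Writes are made durable remotely and kept locally while they are hot. */
    void write(String id, byte[] data) {
        remote.put(id, data);
        local.put(id, data);
    }

    /** Evicts a cold entry locally; the durable copy remains in the remote store. */
    void evict(String id) {
        local.delete(id);
    }
}
```

Under this design, an oversized reference set (such as the fraud detection example above) can spill to the remote store instead of forcing the cluster to grow just to hold state.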
Additionally, processing even a single massive media object (e.g., a high quality video) in a timely manner can go beyond the capabilities of a single machine. Chunking large media objects into smaller ones and processing the chunks in parallel mitigates this issue. However, chunking introduces many challenges, including: rebuilding the large object in a system where data chunks can be processed out of order or infinitely delayed; processing data without losing accuracy; and handling data dependencies amongst chunks. As my future direction, I plan to provide a built-in chunking and rebuilding mechanism for large media types, without affecting accuracy.

In a nutshell, as future directions, I plan to further optimize Samza to efficiently handle diverse media types and sizes (ranging from a few KBs to a few GBs), and to integrate Samza and Ambry into a unified, scalable media storage and processing environment.

5. REFERENCES
[1] Ambry. http://www.github.com/linkedin/ambry, accessed Mar. 2016.
[2] HIPI: Hadoop image processing interface. http://hipi.cs.virginia.edu/index.html, accessed Mar. 2016.
[3] OpenCV: Open source computer vision. http://www.opencv.org/, accessed Mar. 2016.
[4] ShapeLogic. http://www.shapelogic.org, accessed Mar. 2016.
[5] StormCV. https://github.com/sensorstorm/StormCV, accessed Mar. 2016.
[6] Transforming images into information. http://4quant.com, accessed Mar. 2016.
[7] YouTube statistics. https://www.youtube.com/yt/press/en-GB/statistics.html, accessed Mar. 2016.
[8] S. A. Noghabi, S. Subramanian, P. Narayanan, S. Narayanan, G. Holla, M. Zadeh, T. Li, I. Gupta, and R. H. Campbell. Ambry: LinkedIn's scalable geo-distributed object store. In Proceedings of the ACM Special Interest Group on Management of Data (SIGMOD), 2016.
[9] T. Akidau, A. Balikov, K. Bekiroğlu, S. Chernyak, J. Haberman, R. Lax, S. McVeety, D. Mills, P. Nordstrom, and S. Whittle. MillWheel: Fault-tolerant stream processing at internet scale. In Proceedings of the Very Large Data Bases Endowment (VLDB), 2013.
[10] A. Auradkar, C. Botev, S. Das, D. De Maagd, A. Feinberg, P. Ganti, L. Gao, B. Ghosh, K. Gopalakrishna, et al. Data infrastructure at LinkedIn. In Proceedings of the IEEE International Conference on Data Engineering (ICDE), 2012.
[11] D. Beaver, S. Kumar, H. C. Li, J. Sobel, and P. Vajgel. Finding a needle in Haystack: Facebook's photo storage. In Proceedings of the USENIX Symposium on Operating Systems Design and Implementation (OSDI), 2010.
[12] F. Chang, J. Dean, S. Ghemawat, W. C. Hsieh, D. A. Wallach, M. Burrows, T. Chandra, A. Fikes, and R. E. Gruber. Bigtable: A distributed storage system for structured data. ACM Transactions on Computer Systems (TOCS), 26(2), 2008.
[13] G. DeCandia, D. Hastorun, M. Jampani, G. Kakulapati, A. Lakshman, A. Pilchin, S. Sivasubramanian, P. Vosshall, and W. Vogels. Dynamo: Amazon's highly available key-value store. In Proceedings of the ACM SIGOPS Operating Systems Review (OSR), 2007.
[14] S. Ghemawat, H. Gobioff, and S.-T. Leung. The Google File System. In Proceedings of the ACM SIGOPS Operating Systems Review (OSR), 2003.
[15] Hortonworks. Ozone: An object store in HDFS. http://hortonworks.com/blog/ozone-object-store-hdfs/, 2014, accessed Mar. 2016.
[16] A. Lakshman and P. Malik. Cassandra: A decentralized structured storage system. ACM SIGOPS Operating Systems Review (OSR), 44(2), 2010.
[17] J. H. Morris, M. Satyanarayanan, M. H. Conner, J. H. Howard, D. S. Rosenthal, and F. D. Smith. Andrew: A distributed personal computing environment. Communications of the ACM (CACM), 29(3), 1986.
[18] S. Muralidhar, W. Lloyd, S. Roy, C. Hill, E. Lin, W. Liu, S. Pan, S. Shankar, V. Sivakumar, et al. f4: Facebook's warm BLOB storage system. In Proceedings of the USENIX Symposium on Operating Systems Design and Implementation (OSDI), 2014.
[19] L. Neumeyer, B. Robbins, A. Nair, and A. Kesari. S4: Distributed stream computing platform. In Proceedings of the IEEE International Conference on Data Mining Workshops (ICDMW), 2010.
[20] R. Sandberg, D. Goldberg, S. Kleiman, D. Walsh, and B. Lyon. Design and implementation of the Sun network file system. In Proceedings of the USENIX Summer Technical Conference, 1985.
[21] K. Shvachko, H. Kuang, S. Radia, and R. Chansler. The Hadoop Distributed File System. In Proceedings of the IEEE Mass Storage Systems and Technologies (MSST), 2010.
[22] A. Sozykin and T. Epanchintsev. MIPr: A framework for distributed image processing using Hadoop. In Proceedings of the IEEE Application of Information and Communication Technologies (AICT), 2015.
[23] C. Sweeney, L. Liu, S. Arietta, and J. Lawrence. HIPI: A Hadoop image processing interface for image-based MapReduce tasks. University of Virginia, 2011.
[24] A. Toshniwal, S. Taneja, A. Shukla, K. Ramasamy, J. M. Patel, S. Kulkarni, J. Jackson, K. Gade, M. Fu, J. Donham, et al. Storm @Twitter. In Proceedings of the ACM Special Interest Group on Management of Data (SIGMOD), 2014.
[25] Twitter. Blobstore: Twitter's in-house photo storage system. https://blog.twitter.com/2012/blobstore-twitter-s-in-house-photo-storage-system, 2011, accessed Mar. 2016.
[26] S. A. Weil, S. A. Brandt, E. L. Miller, D. D. Long, and C. Maltzahn. Ceph: A scalable, high-performance distributed file system. In Proceedings of the USENIX Symposium on Operating Systems Design and Implementation (OSDI), 2006.
[27] M. Zaharia, T. Das, H. Li, T. Hunter, S. Shenker, and I. Stoica. Discretized streams: Fault-tolerant streaming computation at scale. In Proceedings of the USENIX Symposium on Operating Systems Design and Implementation (OSDI), 2013.